londiste: added dispatch handler arg retention_period
authormartinko <gamato@users.sf.net>
Wed, 20 Feb 2013 16:10:55 +0000 (17:10 +0100)
committermartinko <gamato@users.sf.net>
Wed, 20 Feb 2013 16:10:55 +0000 (17:10 +0100)
It controls how long to keep partitions around. Examples: '3 months', '1 year'

python/londiste/handlers/dispatch.py
sql/londiste/structure/functions.sql
sql/londiste/structure/grants.ini

index 17e88a249b5f595d856b28bc976751c9f27871ca..ff367bd9e0ca364cea5e4000f704d14f83188453 100644 (file)
@@ -135,6 +135,9 @@ post_part:
     sql statement(s) to execute after creating partition table. Usable
     variables are the same as in part_template
 
+retention_period:
+    how long to keep partitions around. examples: '3 months', '1 year'
+
 encoding:
     name of destination encoding. handler replaces all invalid encoding symbols
     and logs them as warnings
@@ -684,6 +687,7 @@ class Dispatcher(BaseHandler):
             conf.pre_part = self.args.get('pre_part')
             conf.post_part = self.args.get('post_part')
             conf.part_func = self.args.get('part_func', PART_FUNC_NEW)
+            conf.retention_period = self.args.get('retention_period')
         # set row mode and event types to process
         conf.row_mode = self.get_arg('row_mode', ROW_MODES)
         event_types = self.args.get('event_types', '*')
@@ -787,6 +791,8 @@ class Dispatcher(BaseHandler):
         if self.conf.table_mode == 'part':
             dst, part_time = self.split_format(ev, data)
             if dst not in self.row_handler.table_map:
+                if self.conf.retention_period:
+                    self.drop_obsolete_partitions (self.dest_table, self.conf.retention_period, self.conf.period)
                 self.check_part(dst, part_time)
         else:
             dst = self.dest_table
@@ -880,7 +886,7 @@ class Dispatcher(BaseHandler):
                 have_func = skytools.exists_function(curs, PART_FUNC_OLD, len(PART_FUNC_ARGS))
 
             if have_func:
-                self.log.debug('check_part.exec: func:%s, args: %s' % (pfcall, vals))
+                self.log.debug('check_part.exec: func: %s, args: %s' % (pfcall, vals))
                 curs.execute(pfcall, vals)
             else:
                 #
@@ -897,6 +903,14 @@ class Dispatcher(BaseHandler):
         exec_with_vals(self.conf.post_part)
         self.log.info("Created table: %s" % dst)
 
+    def drop_obsolete_partitions (self, parent_table, retention_period, partition_period):
+        """ Drop obsolete partitions of partition-by-date parent table. """
+        curs = self.dst_curs
+        func = "londiste.drop_obsolete_partitions"
+        args = [parent_table, retention_period, partition_period]
+        self.log.debug("func: %s, args: %s" % (func, args))
+        curs.callproc (func, args)
+
     def real_copy(self, tablename, src_curs, dst_curs, column_list):
         """do actual table copy and return tuple with number of bytes and rows
         copied
index 560185ae136ba1bbb3af4d6835ba505e4ba9ddb1..5936536bc928073c72ec7f86249e4668b88be0c8 100644 (file)
@@ -52,4 +52,5 @@ select londiste.upgrade_schema();
 
 -- Group: Utility functions for handlers
 \i functions/londiste.create_partition.sql
+\i functions/londiste.drop_obsolete_partitions.sql
 
index 0c7040141d16c601883568b1e4b5794893ad5333..5f9dc6b76bc9c46fbf5374b421f569ec30ec56a5 100644 (file)
@@ -85,5 +85,6 @@ londiste_local_fns =
        londiste.local_set_table_struct(text, text, text),
        londiste.drop_table_triggers(text, text),
        londiste.table_info_trigger(),
-       londiste.create_partition(text,text,text,text,timestamptz,text)
+       londiste.create_partition(text, text, text, text, timestamptz, text),
+       londiste.drop_obsolete_partitions (text, interval, text)