white noise
authormartinko <gamato@users.sf.net>
Fri, 19 Oct 2012 10:19:28 +0000 (12:19 +0200)
committermartinko <gamato@users.sf.net>
Fri, 19 Oct 2012 10:19:28 +0000 (12:19 +0200)
python/pgq/consumer.py
python/pgq/localconsumer.py
python/skytools/dbservice.py
python/skytools/scripting.py

index b3b45d18445cc602707233543998ce6e1aa563e1..aa60ffa8bf11c66504db4f71447038b394908461 100644 (file)
@@ -15,9 +15,9 @@ __all__ = ['Consumer']
 
 class _WalkerEvent(Event):
     """Redirects status flags to BatchWalker.
-    
-    That way event data can gc-d immidiately and
-    tag_done() events dont need to be remembered.
+
+    That way event data can be gc'd immediately and
+    tag_done() events don't need to be remembered.
     """
     def __init__(self, walker, queue, row):
         Event.__init__(self, queue, row)
@@ -122,8 +122,8 @@ class Consumer(skytools.DBScript):
         # the actual user script on top of pgq.Consumer must also support it
         #pgq_autocommit = 0
 
-        # whether to wait for specified number of events, before
-        # assigning a batch (0 disables)
+        # whether to wait for specified number of events,
+        # before assigning a batch (0 disables)
         #pgq_batch_collect_events = 0
 
         # whether to wait specified amount of time,
@@ -131,7 +131,7 @@ class Consumer(skytools.DBScript):
         #pgq_batch_collect_interval =
 
         # whether to stay behind queue top (postgres interval)
-        #pgq_keep_lag = 
+        #pgq_keep_lag =
 
         # in how many seconds to write keepalive stats for idle consumers
         # this stats is used for detecting that consumer is still running
@@ -167,7 +167,7 @@ class Consumer(skytools.DBScript):
 
     def __init__(self, service_name, db_name, args):
         """Initialize new consumer.
-        
+
         @param service_name: service_name for DBScript
         @param db_name: name of database for get_database()
         @param args: cmdline args for DBScript
@@ -242,15 +242,15 @@ class Consumer(skytools.DBScript):
     def process_event(self, db, event):
         """Process one event.
 
-        Should be overrided by user code.
+        Should be overridden by user code.
         """
         raise Exception("needs to be implemented")
 
     def process_batch(self, db, batch_id, event_list):
         """Process all events in batch.
-        
+
         By default calls process_event for each.
-        Can be overrided by user code.
+        Can be overridden by user code.
         """
         for ev in event_list:
             self.process_event(db, ev)
@@ -275,7 +275,7 @@ class Consumer(skytools.DBScript):
         # load events
         ev_list = self._load_batch_events(curs, batch_id)
         db.commit()
-        
+
         # process events
         self._launch_process_batch(db, batch_id, ev_list)
 
@@ -302,7 +302,7 @@ class Consumer(skytools.DBScript):
         db = self.get_database(self.db_name)
         cx = db.cursor()
         cx.execute("select pgq.unregister_consumer(%s, %s)",
-                    [self.queue_name, self.consumer_name])
+                [self.queue_name, self.consumer_name])
         db.commit()
 
     def _launch_process_batch(self, db, batch_id, list):
@@ -398,4 +398,4 @@ class Consumer(skytools.DBScript):
         self.stat_put('duration', round(t - self.stat_batch_start,4))
         if count > 0: # reset timer if we got some events
             self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4))
-            self.idle_start = t
\ No newline at end of file
+            self.idle_start = t
index 55ea15b266dff9314bbf30a79bc0f2b9b0db75ae..72fe1578a22eaf5941a921a544c0323f52d91391 100644 (file)
@@ -23,7 +23,7 @@ class LocalConsumer(pgq.Consumer):
 
     Features:
      - Can detect if several batches are already applied to dest db.
-     - If some ticks are lost. allows to seek back on queue.
+     - If some ticks are lost, allows to seek back on queue.
        Whether it succeeds, depends on pgq configuration.
 
     Config options::
@@ -81,7 +81,7 @@ class LocalConsumer(pgq.Consumer):
                 q = "select * from pgq.register_consumer(%s, %s)"
                 curs.execute(q, [self.queue_name, self.consumer_name])
         elif local_tick < 0:
-            self.log.info("Local tick missing, storing queueu tick %d", queue_tick)
+            self.log.info("Local tick missing, storing queue tick %d", queue_tick)
             self.save_local_tick(queue_tick)
         elif local_tick > queue_tick:
             self.log.warning("Tracking out of sync: queue=%d local=%d.  Repositioning on queue.  [Database failure?]",
@@ -172,14 +172,14 @@ class LocalConsumer(pgq.Consumer):
             src_db = self.get_database(self.db_name)
             src_curs = src_db.cursor()
 
-            self.log.info("Rewinding queue to tick local tick %d", dst_tick)
+            self.log.info("Rewinding queue to local tick %d", dst_tick)
             q = "select pgq.register_consumer_at(%s, %s, %s)"
             src_curs.execute(q, [self.queue_name, self.consumer_name, dst_tick])
 
             src_db.commit()
         else:
             self.log.error('Cannot rewind, no tick found in local file')
-        
+
     def dst_reset(self):
         self.log.info("Removing local tracking file")
         try:
@@ -208,4 +208,3 @@ class LocalConsumer(pgq.Consumer):
         """Store tick in local file."""
         data = str(tick_id)
         skytools.write_atomic(self.local_tracking_file, data)
-
index d1ddb666e3e03d338cd44447d65f7b74fa0994da..7ca5956cd2e0d07afa14a25843d86d9d6d1b1c37 100755 (executable)
@@ -22,7 +22,7 @@ def transform_fields(rows, key_fields, name_field, data_field):
     """Convert multiple-rows per key input array
     to one-row, multiple-column output array.  The input arrays
     must be sorted by the key fields.
-    
+
     >>> rows = []
     >>> rows.append({'time': '22:00', 'metric': 'count', 'value': 100})
     >>> rows.append({'time': '22:00', 'metric': 'dur', 'value': 7})
@@ -152,21 +152,21 @@ def log_result(log, list):
 
 
 class DBService:
-    """  Wrap parametrisized query handling and multiset stored procedure writing
+    """  Wrap parameterized query handling and multiset stored procedure writing
     """
     ROW = "_row"            # name of the fake field where internal record id is stored
     FIELD = "_field"        # parameter name for the field in record that is related to current message
     PARAM = "_param"        # name of the parameter to which message relates
     SKIP = "skip"           # used when record is needed for it's data but is not been updated
-    INSERT = "insert" 
+    INSERT = "insert"
     UPDATE = "update"
     DELETE = "delete"
-    INFO = "info"           # just informative message for the user             
+    INFO = "info"           # just informative message for the user
     NOTICE = "notice"       # more than info less than warning
     WARNING = "warning"     # warning message, something is out of ordinary
     ERROR = "error"         # error found but execution continues until check then error is raised
     FATAL = "fatal"         # execution is terminated at once and all found errors returned
-    
+
     def __init__(self, context, global_dict = None):
         """ This object must be initiated in the beginning of each db service
         """
@@ -175,19 +175,19 @@ class DBService:
         self.global_dict = global_dict      # used for cacheing query plans
         self._retval = []                   # used to collect return resultsets
         self._is_test = 'is_test' in rec    # used to convert output into human readable form
-        
-        self.sqls = None                    # if sqls stays None then no recording of sqls is done 
+
+        self.sqls = None                    # if sqls stays None then no recording of sqls is done
         if "show_sql" in rec:               # api must add exected sql to resultset
             self.sqls = []                  # sql's executed by dbservice, used for dubugging
-        
+
         self.can_save = True                # used to keep value most severe error found so far
         self.messages = []                  # used to hold list of messages to be returned to the user
+
     # error and message handling
-    
+
     def tell_user(self, severity, code, message, params = None, **kvargs):
         """ Adds another message to the set of messages to be sent back to user
-            If error message then can_save is set false 
+            If error message then can_save is set false
             If fatal message then error or found errors are raised at once
         """
         params = params or kvargs
@@ -210,14 +210,14 @@ class DBService:
             msgs = "Dbservice error(s): " + make_record_array( self.messages )
             plpy.error( msgs )
 
-    # run sql meant mostly for select but not limited to 
+    # run sql meant mostly for select but not limited to
 
     def create_query(self, sql, params = None, **kvargs):
         """ Returns initialized querybuilder object for building complex dynamic queries
         """
         params = params or kvargs
         return skytools.PLPyQueryBuilder(sql, params, self.global_dict, self.sqls )
-                            
+
     def run_query(self, sql, params = None, **kvargs):
         """ Helper function if everything you need is just paramertisized execute
             Sets rows_found that is coneninet to use when you don't need result just
@@ -265,7 +265,7 @@ class DBService:
         return row.values()[0]
 
      # resultset handling
-     
+
     def return_next(self, rows, res_name, severity = None):
         """ Adds given set of rows to resultset
         """
@@ -273,7 +273,7 @@ class DBService:
         if severity is not None and len(rows) == 0:
             self.tell_user(severity, "dbsXXXX", "No matching records found")
         return rows
-        
+
     def return_next_sql(self, sql, params, res_name, severity = None):
         """ Exectes query and adds recors resultset
         """
@@ -587,4 +587,3 @@ class ServiceContext(DBService):
             If dict was not provied with call it is created
         """
         return fields
-
index 12a72da9100022af5232f6e1ace6b84074309d22..004cacfb73ddbcff50aa27fe2ccc1059399b8811 100644 (file)
@@ -249,7 +249,7 @@ class BaseScript(object):
 
         @param service_name: unique name for script.
             It will be also default job_name, if not specified in config.
-        @param args: cmdline args (sys.argv[1:]), but can be overrided
+        @param args: cmdline args (sys.argv[1:]), but can be overridden
         """
         self.service_name = service_name
         self.go_daemon = 0
@@ -339,7 +339,7 @@ class BaseScript(object):
         """Loads and returns skytools.Config instance.
 
         By default it uses first command-line argument as config
-        file name.  Can be overrided.
+        file name.  Can be overridden.
         """
 
         if len(self.args) < 1:
@@ -354,7 +354,7 @@ class BaseScript(object):
         """Initialize a OptionParser() instance that will be used to
         parse command line arguments.
 
-        Note that it can be overrided both directions - either DBScript
+        Note that it can be overridden both directions - either DBScript
         will initialize a instance and passes to user code or user can
         initialize and then pass to DBScript.init_optparse().
 
@@ -647,7 +647,7 @@ class DBScript(BaseScript):
 
         @param service_name: unique name for script.
             It will be also default job_name, if not specified in config.
-        @param args: cmdline args (sys.argv[1:]), but can be overrided
+        @param args: cmdline args (sys.argv[1:]), but can be overridden
         """
         self.db_cache = {}
         self._db_defaults = {}
@@ -873,7 +873,6 @@ class DBScript(BaseScript):
             # error is already logged
             sys.exit(1)
 
-
     def listen(self, dbname, channel):
         """Make connection listen for specific event channel.