Simplify transaction handling in the changetracker daemon
authorMagnus Hagander <magnus@hagander.net>
Wed, 12 Aug 2020 15:17:32 +0000 (17:17 +0200)
committerMagnus Hagander <magnus@hagander.net>
Wed, 12 Aug 2020 15:17:32 +0000 (17:17 +0200)
Previously we used a combination of optimistic concurrency control
(by DELETEing with both the id and the date included in the WHERE
clause) and REPEATABLE READ transactions. This would create
serialization conflicts when completely unnecessary. Since in this case
it doesn't matter if we happen to push the same thing twice, switch
completely to optimistic concurrency control. That gets rid of having to
deal with serialization issues.

tools/auth_changetrack/auth_changetrack.py

index 916127d300de437df9e802b12a0e8db729b9b700..58be15f198dc46916c2cec21e316151a01dbce54 100755 (executable)
@@ -25,7 +25,6 @@ def process_queue(conn):
         })
         if not curs.rowcount:
             # Nothing in the queue, so we're done here.
-            conn.rollback()
             return
 
         siteid, url, cryptkey, include_ssh = curs.fetchone()
@@ -48,7 +47,6 @@ LIMIT 100""",
         if not rows:
             # This shouldn't happen
             logging.error("Re-querying for updates returned no rows! Aborting.")
-            conn.rollback()
             return
 
         # Build the update structure
@@ -80,13 +78,14 @@ LIMIT 100""",
             }, timeout=10)
         except Exception as e:
             logging.error("Exception pushing changes to {}: {}".format(url, e))
-            conn.rollback()
             site_stoplist.append(siteid)
             continue
 
         if r.status_code == 200:
             # Success! Whee!
             # This is a really silly way to do it, but meh.
+            # Also psycopg2 really doesn't like mixing transaction modes, but here we go..
+            conn.autocommit = False
             curs.executemany("DELETE FROM account_communityauthchangelog WHERE site_id=%(siteid)s AND user_id=%(userid)s AND changedat=%(changedat)s", [
                 {
                     'siteid': siteid,
@@ -96,10 +95,10 @@ LIMIT 100""",
             )
             logging.info("Successfully pushed {} changes to {}".format(len(rows), url))
             conn.commit()
+            conn.autocommit = True
             continue
 
         logging.error("Failed to push changes to {}: status {}, initial: {}".format(url, r.status_code, r.text[:100]))
-        conn.rollback()
         site_stoplist.append(siteid)
 
 
@@ -112,10 +111,11 @@ if __name__ == "__main__":
 
     conn = psycopg2.connect(sys.argv[1])
     curs = conn.cursor()
-    curs.execute("LISTEN communityauth_changetrack")
-    conn.commit()
 
     conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ)
+    conn.autocommit = True
+
+    curs.execute("LISTEN communityauth_changetrack")
 
     while True:
         process_queue(conn)