})
if not curs.rowcount:
# Nothing in the queue, so we're done here.
- conn.rollback()
return
siteid, url, cryptkey, include_ssh = curs.fetchone()
if not rows:
# This shouldn't happen
logging.error("Re-querying for updates returned no rows! Aborting.")
- conn.rollback()
return
# Build the update structure
}, timeout=10)
except Exception as e:
logging.error("Exception pushing changes to {}: {}".format(url, e))
- conn.rollback()
site_stoplist.append(siteid)
continue
if r.status_code == 200:
# Success! Whee!
# This is a really silly way to do it, but meh.
+ # Also psycopg2 really doesn't like mixing transaction modes, but here we go..
+ conn.autocommit = False
curs.executemany("DELETE FROM account_communityauthchangelog WHERE site_id=%(siteid)s AND user_id=%(userid)s AND changedat=%(changedat)s", [
{
'siteid': siteid,
)
logging.info("Successfully pushed {} changes to {}".format(len(rows), url))
conn.commit()
+ conn.autocommit = True
continue
logging.error("Failed to push changes to {}: status {}, initial: {}".format(url, r.status_code, r.text[:100]))
- conn.rollback()
site_stoplist.append(siteid)
conn = psycopg2.connect(sys.argv[1])
curs = conn.cursor()
- curs.execute("LISTEN communityauth_changetrack")
- conn.commit()
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ)
+ conn.autocommit = True
+
+ curs.execute("LISTEN communityauth_changetrack")
while True:
process_queue(conn)