From 68e8bb7775a846ce7fda3a70395e66778f36a0ed Mon Sep 17 00:00:00 2001 From: Magnus Hagander Date: Thu, 29 Nov 2018 17:01:22 +0100 Subject: [PATCH] Use advisory lock around load_message.py Avoid loading two messages at the same time. In particular this can cause issues if it's two copies of the same message on different lists, which can cause a UNIQUE violation in the loader. It could also be a problem if two messages on a new thread arrives in parallel, which could cause two separate threads to be created. This could be made more efficient by properly ordering the operations on storage and using ON CONFLICT, but it's a very rare occassion and it doesn't matter that we have to wait for a second or two for a previous storage to complete. --- loader/load_message.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/loader/load_message.py b/loader/load_message.py index 2c400b1..b466860 100755 --- a/loader/load_message.py +++ b/loader/load_message.py @@ -85,9 +85,19 @@ if __name__ == "__main__": connstr = 'need_connstr' conn = psycopg2.connect(connstr) + curs = conn.cursor() + + # Take an advisory lock to force serialization. + # We could do this "properly" by reordering operations and using ON CONFLICT, + # but concurrency is not that important and this is easier... + try: + curs.execute("SET statement_timeout='30s'") + curs.execute("SELECT pg_advisory_xact_lock(8059944559669076)") + except Exception, e: + print("Failed to wait on advisory lock: %s" % e) + sys.exit(1) # Get the listid we're working on - curs = conn.cursor() curs.execute("SELECT listid FROM lists WHERE listname=%(list)s", { 'list': opt.list }) -- 2.39.5