Update nagios monitor to check for duplicated email addresses
authorMagnus Hagander <magnus@hagander.net>
Wed, 12 Aug 2020 15:14:19 +0000 (17:14 +0200)
committerMagnus Hagander <magnus@hagander.net>
Wed, 12 Aug 2020 15:14:19 +0000 (17:14 +0200)
This should not be possible if things are done through the appropriate
interfaces, and those interfaces are bug-free (right..). But really bad
things can probably happen if they don't, so put a monitor in place to
check for it.

This also adds a view in the db that shows all registered email
addresses and their accounts, regardless of if it's primary or secondary
address. This is used by the nagios check but can of course be useful to
manual checks as well.

pgweb/account/migrations/0007_all_emails_view.py [new file with mode: 0644]
tools/auth_changetrack/nagios_check.py

diff --git a/pgweb/account/migrations/0007_all_emails_view.py b/pgweb/account/migrations/0007_all_emails_view.py
new file mode 100644 (file)
index 0000000..937b667
--- /dev/null
@@ -0,0 +1,29 @@
+# Generated by Django 2.2.11 on 2020-08-12 14:50
+
+from django.db import migrations
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('account', '0006_communityauth_sync'),
+    ]
+
+    operations = [
+        migrations.RunSQL(
+            """
+CREATE OR REPLACE VIEW all_user_email_addresses AS
+ SELECT auth_user.username,
+    auth_user.email,
+    'primary'::text AS type
+   FROM auth_user
+UNION ALL
+ SELECT auth_user.username,
+    se.email,
+    'secondary'::text AS type
+   FROM auth_user
+     JOIN account_secondaryemail se ON se.user_id = auth_user.id
+            """,
+            "DROP VIEW all_user_email_addresses",
+        ),
+    ]
index 3b415bf48448f32249c24f9a746a7512a591f8aa..a9977f8be87de9248d7b945e71903caa0da01462 100755 (executable)
@@ -9,31 +9,47 @@ WARNING_THRESHOLD = timedelta(minutes=5)
 # More than 15 minutes something is definitely wrong
 CRITICAL_THRESHOLD = timedelta(minutes=15)
 
-if __name__ == "__main__":
-    if len(sys.argv) != 2:
-        print("Usage: nagios_check.py <dsn>")
-        sys.exit(1)
-
-    conn = psycopg2.connect(sys.argv[1])
-    curs = conn.cursor()
 
+def check_queue(curs):
     # Get the oldest entry that has not been completed, if any
-    curs.execute("SELECT COALESCE(max(now()-changedat), '0') FROM account_communityauthchangelog")
+    curs.execute("SELECT COALESCE(now()-changedat) FROM account_communityauthchangelog")
     rows = curs.fetchall()
-    conn.close()
 
     if len(rows) == 0:
-        print("OK, queue is empty")
+        return "queue is empty"
         sys.exit(0)
 
     age = rows[0][0]
 
     if age < WARNING_THRESHOLD:
-        print("OK, queue age is %s" % age)
-        sys.exit(0)
+        return "queue age is %s" % age
     elif age < CRITICAL_THRESHOLD:
         print("WARNING, queue age is %s" % age)
         sys.exit(1)
     else:
         print("CRITICAL, queue age is %s" % age)
         sys.exit(2)
+
+
+def check_mail(curs):
+    curs.execute("SELECT count(*) FROM (SELECT 1 FROM all_user_email_addresses GROUP BY email HAVING count(*) > 1) x")
+    num, = curs.fetchone()
+    if num > 0:
+        print("CRITICAL, {} email addresses have duplicate entries!".format(num))
+        sys.exit(2)
+    return ""
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print("Usage: nagios_check.py <dsn>")
+        sys.exit(1)
+
+    conn = psycopg2.connect(sys.argv[1])
+    curs = conn.cursor()
+
+    status = []
+    status.append(check_queue(curs))
+    status.append(check_mail(curs))
+
+    print("OK: {}".format('; '.join([s for s in status if s])))