Use a "semaphore table" to allow a mongo app to somewhat observe atomic behavior.
authorGreg Sabino Mullane <greg@endpoint.com>
Fri, 3 Jun 2011 02:55:42 +0000 (22:55 -0400)
committerGreg Sabino Mullane <greg@endpoint.com>
Fri, 3 Jun 2011 02:55:42 +0000 (22:55 -0400)
Bucardo.pm
bucardo.schema

index 533a09f786caeec523401165f76a78636e961def..0b34c92de927a64db1797b85540e85ebdadd3805 100644 (file)
@@ -1689,9 +1689,12 @@ sub start_kid {
     };
 
     ## Set up some common groupings of databases inside sync->{db}
-    my (@dbs_postgres, @dbs_mysql, @dbs_source, @dbs_target, @dbs_dbi, @dbs_connectable);
+    my (@dbs, @dbs_postgres, @dbs_mysql, @dbs_source, @dbs_target, @dbs_dbi, @dbs_connectable);
     for my $dbname (sort keys %{ $sync->{db} }) {
         $x = $sync->{db}{$dbname};
+
+        push @dbs => $dbname;
+
         push @dbs_postgres => $dbname
             if $x->{dbtype} eq 'postgres';
 
@@ -1804,6 +1807,30 @@ sub start_kid {
             $finaldbh->commit();
         }
 
+        ## If using semaphore tables, mark the status as 'failed'
+        ## At least in the Mongo case, it's pretty safe to do this,
+        ## as it is unlikely the error came from Mongo Land
+        if ($config{semaphore_table}) {
+            my $tname = $config{semaphore_table};
+            for my $dbname (@dbs_connectable) {
+                $x = $sync->{db}{$dbname};
+                if ($x->{dbtype} eq 'mongo') {
+                    my $collection = $x->{dbh}->get_collection($tname);
+                    my $object = {
+                        sync => $syncname,
+                        status => 'failed',
+                        endtime => scalar gmtime,
+                    };
+                    $collection->update
+                        (
+                            {sync => $syncname},
+                            $object,
+                            { upsert => 1, safe => 1 }
+                        );
+                }
+            }
+        }
+
         (my $flatmsg = $msg) =~ s/\n/ /g;
 
         ## Mark this syncrun as aborted if needed, replace the 'lastbad'
@@ -3245,6 +3272,28 @@ sub start_kid {
                         or die qq{Could not open flatfile "$x->{filename}": $!\n};
                 }
 
+                ## Populate the semaphore table if the setting is non-empty
+                if ($config{semaphore_table}) {
+                    my $tname = $config{semaphore_table};
+                    for my $dbname (@dbs_connectable) {
+                        $x = $sync->{db}{$dbname};
+                        if ($x->{dbtype} eq 'mongo') {
+                            my $collection = $x->{dbh}->get_collection($tname);
+                            my $object = {
+                                sync => $syncname,
+                                status => 'started',
+                                starttime => scalar gmtime,
+                                };
+                            $collection->update
+                                (
+                                    {sync => $syncname},
+                                    $object,
+                                    { upsert => 1, safe => 1 }
+                                );
+                        }
+                    }
+                }
+
                 ## This is where we want to 'rewind' to on a handled exception
               PUSH_SAVEPOINT: {
 
@@ -3568,6 +3617,28 @@ sub start_kid {
             delete $x->{filehandle};
         }
 
+        ## If using semaphore tables, mark the status as 'complete'
+        if ($config{semaphore_table}) {
+            my $tname = $config{semaphore_table};
+            for my $dbname (@dbs_connectable) {
+                $x = $sync->{db}{$dbname};
+                if ($x->{dbtype} eq 'mongo') {
+                    my $collection = $x->{dbh}->get_collection($tname);
+                    my $object = {
+                        sync => $syncname,
+                        status => 'complete',
+                        endtime => scalar gmtime,
+                    };
+                    $collection->update
+                        (
+                            {sync => $syncname},
+                            $object,
+                            { upsert => 1, safe => 1 }
+                        );
+                }
+            }
+        }
+
         ## If doing truncate, do some cleanup
         if (exists $self->{truncateinfo}) {
             ## For each source database that had a truncate entry, mark them all as done
index 4efd6a344aaf4a1991a8ceec7164b8f97b9151fb..937161561abef8bb721695c92a07ce94686c9cb6 100644 (file)
@@ -157,6 +157,7 @@ max_delete_clause|200|Maximum number of items to delete inside of IN() clauses
 max_select_clause|500|Maximum number of items to select inside of IN() clauses
 piddir|/var/run/bucardo|Directory holding Bucardo PID files
 reason_file|bucardo.restart.reason.log|File to hold reasons for stopping and starting
+semaphore_table|bucardo_status|Table to let apps know a sync is ongoing
 stats_script_url|http://www.bucardo.org/|Location of the stats script
 stopfile|fullstopbucardo|Name of the semaphore file used to stop Bucardo processes
 syslog_facility|LOG_LOCAL1|Which syslog facility level to use