Cleanup of conflict code
authorGreg Sabino Mullane <greg@endpoint.com>
Sun, 18 Mar 2012 12:45:21 +0000 (08:45 -0400)
committerGreg Sabino Mullane <greg@endpoint.com>
Sun, 18 Mar 2012 12:45:21 +0000 (08:45 -0400)
Bucardo.pm

index 0ed3b05113298adc6be4e0e58d6bd5cc99eede2f..a8dfaf61e67601a28f298184c152bcd0c6c52967 100644 (file)
@@ -2752,6 +2752,7 @@ sub start_kid {
         ## Reset some things at the per-database level
         for my $dbname (keys %{ $sync->{db} }) {
 
+            ## This must be set, as it is used by the standard_conflict below
             $deltacount{$dbname} = 0;
             $dmlcount{allinserts}{$dbname} = 0;
             $dmlcount{alldeletes}{$dbname} = 0;
@@ -3025,6 +3026,7 @@ sub start_kid {
 
                     ## Gets all relevant rows from bucardo_deltas: runs asynchronously
                     $sth{getdelta}{$dbname}{$g}->execute();
+
                 }
 
                 ## Grab all results as they finish.
@@ -3162,8 +3164,8 @@ sub start_kid {
             ## Only need to turn off triggers and rules once via pg_class
             my $disabled_via_pg_class = 0;
 
-            ## Cached epoch value per database for the standard conflict:
-            delete $self->{dbhightime};
+            ## The overall winning database for conflicts
+            delete $self->{conflictwinner};
 
             ## Do each goat in turn
 
@@ -3297,6 +3299,7 @@ sub start_kid {
                     $dmlcount{conflicts} += $count;
 
                     $self->glog("Conflicts for $S.$T: $count", LOG_NORMAL);
+$self->glog(Dumper \%conflict);
 
                     ## If we have a custom conflict handler for this goat, invoke it
                     if ($g->{code_conflict}) {
@@ -3328,15 +3331,25 @@ sub start_kid {
                     }
 
                     ## If we are grabbing the 'latest', figure out which it is
+                    ## For this handler, we want to treat all the tables in the sync
+                    ## as deeply linked to each other, and this we have one winning
+                    ## database for *all* tables in the sync.
+                    ## Thus, the only things arriving from other databases will be inserts
                     elsif ('bucardo_latest' eq $g->{standard_conflict}) {
 
-                        ## If this is the first time with a conflict, grab all values
-                        ## We want the latest txntime across all tables for each database
-                        if (! exists $self->{dbhightime}) {
-                            for my $dbname (keys %{ $self->{db_hasconflict} }) {
+                        ## We only need to figure out the winning database once
+                        ## The winner is the latest one to touch any of our tables
+                        ## In theory, this is a little crappy.
+                        ## In practice, it works out quite well. :)
+                        if (! exists $self->{conflictwinner}) {
+
+                            for my $dbname (@dbs_delta) {
+
                                 $x = $sync->{db}{$dbname};
 
-                                $self->{dbhightime}{$dbname} = 0;
+                                ## Start by assuming this DB has no changes
+                                $x->{lastmod} = 0;
+
                                 for my $g (@$goatlist) {
 
                                     ## This only makes sense for tables
@@ -3349,88 +3362,142 @@ sub start_kid {
                                     }
                                     $sth = $x->{dbh}->prepare($g->{sql_max_delta});
                                     $count = $sth->execute();
-                                    my $epoch = 0;
-                                    if ($count < 1) { ## No rows at all!
+                                    if ($count < 1) { ## No changes means we keep the default of "0"
                                         $sth->finish();
                                     }
                                     else {
-                                        $epoch = $sth->fetchall_arrayref()->[0][0];
-                                    }
-                                    if ($epoch > $self->{dbhightime}{$dbname}) {
-                                        $self->{dbhightime}{$dbname} = $epoch;
+                                        ## Keep in mind we don't really care which table this is
+                                        my $epoch = $sth->fetchall_arrayref()->[0][0];
+                                        if ($epoch > $x->{lastmod}) {
+                                            $x->{lastmod} = $epoch;
+                                        }
                                     }
-                                }
-                            }
-                        }
 
-                        ## Walk through each pkey conflict and declare a winner
-                        for my $key (keys %conflict) {
-                            ## Highest one wins
-                            my $highest = 0;
-                            ## The name of the winning database
-                            my $winner;
-                            for my $dbname (sort keys %{ $conflict{$key} }) {
-                                ## We sort the database names so even in the (very!) unlikely
-                                ## chance of a tie, the same database always wins
-
-                                if ($self->{dbhightime}{$dbname} > $highest) {
-                                    $highest = $self->{dbhightime}{$dbname};
-                                    $winner = $dbname;
+                                } ## end checking each table in the sync
+
+                            } ## end checking each source database
+
+                            ## Now we declare the overall winner
+                            ## We sort the database names so even in the (very!) unlikely
+                            ## chance of a tie, the same database always wins
+                            my $highest = -1;
+                            for my $dbname (sort @dbs_delta) {
+
+                                $x = $sync->{db}{$dbname};
+
+                                if ($x->{lastmod} > $highest) {
+                                    $highest = $x->{lastmod};
+                                    $self->{conflictwinner} = $dbname;
                                 }
                             }
-                            if (! defined $winner) {
-                                die "Could not determine a winner using 'latest' standard_conflict for $S.$T";
-                            }
-                            $conflict{$key} = $winner;
-                        }
+
+                            ## We now have a winning database inside self -> conflictwinner
+                            ## This means we do not need to update %conflict at all
+
+                        } ## end conflictwinner not set yet
+
                     }
                     else {
 
                         ## Use the standard conflict: a list of database names
-
-                        ## Optimize for a single database name
-                        my $sc = $g->{standard_conflict};
-                        if (index($sc, ' ') < 1) {
-                            ## Sanity check
-                            if (! exists $deltabin{$sc}) {
-                                die "Invalid standard_conflict '$sc' used for $S.$T";
-                            }
-                            for my $key (keys %conflict) {
-                                $conflict{$key} = $sc;
-                            }
-                        }
-                        ## Not a single name? See which one wins for each key
-                        else {
-                            my @dbs = split / +/ => $sc;
-                            ## Make sure they all exist
-                            for my $db (@dbs) {
-                                if (! exists $deltabin{$db}) {
-                                    die qq{Invalid database "$db" found in standard conflict for $S.$T};
+                        ## Basically, we use the first valid one we find
+                        ## The only reason *not* to use an entry is if it had
+                        ## no updates at all for this run. Note: this does not
+                        ## mean no conflicts, it means no insert/update/delete
+
+                        if (! exists $self->{conflictwinner}) {
+
+                            ## Optimize for a single database name
+                            my $sc = $g->{standard_conflict};
+                            if (index($sc, ' ') < 1) {
+                                ## Sanity check
+                                if (! exists $deltacount{$sc}) {
+                                    die "Invalid standard_conflict '$sc' used for $S.$T";
                                 }
+                                $self->{conflictwinner} = $sc;
                             }
-                            ## Loop through each collision and set the winning database
-                            for my $key (keys %conflict) {
-                                my $winner;
-                                ## Check each database in order to see if it has this key
-                                for my $db (@dbs) {
-                                    if (exists $conflict{$key}{$db}) {
-                                        $winner = $db;
-                                        last;
+                            else {
+                                ## Have more than one, so figure out the best one to use
+                                my @dbs = split / +/ => $sc;
+                                ## Make sure they all exist
+                                for my $dbname (@dbs) {
+                                    if (! exists $deltacount{$dbname}) {
+                                        die qq{Invalid database "$dbname" found in standard conflict for $S.$T};
                                     }
                                 }
-                                $conflict{$key} = $winner;
+
+                                ## Check each candidate in turn
+                                ## It wins, unless it has no changes at all
+                                for my $dbname (@dbs) {
+
+                                    my $found_delta = 0;
+
+                                    ## Walk through but stop at the first found delta
+                                    for my $g (@$goatlist) {
+
+                                        ## This only makes sense for tables
+                                        next if $g->{reltype} ne 'table';
+
+                                        ## Prep our SQL: find the epoch of the latest transaction for this table
+                                        if (!exists $g->{sql_got_delta}) {
+                                            ## We need to know if any have run since the last time we ran this sync
+                                            ## In other words, any deltas newer than the highest track entry
+                                            $SQL = qq{SELECT COUNT(*) FROM bucardo.$g->{deltatable} d }
+                                                 . qq{WHERE d.txntime > }
+                                                 . qq{(SELECT MAX(txntime) FROM bucardo.$g->{tracktable} }
+                                                 . qq{WHERE target = '$x->{TARGETNAME}')};
+                                            $g->{sql_got_delta} = $SQL;
+                                            $self->glog("AAAA Running $SQL");
+                                        }
+                                        $sth = $x->{dbh}->prepare($g->{sql_got_delta});
+                                        $count = $sth->execute();
+                                        $sth->finish();
+                                        if ($count >= 1) {
+                                            $found_delta = 1;
+                                            last;
+                                        }
+                                    }
+
+                                    if (! $found_delta) {
+                                        $self->glog("No rows changed, so discarding conflict winner '$dbname'", LOG_VERBOSE);
+                                        next;
+                                    }
+
+                                    $self->{conflictwinner} = $dbname;
+                                    last;
+
+                                }
+
+                                ## No match at all? Must be a non-inclusive list
+                                if (! exists $self->{conflictwinner}) {
+                                    die qq{Invalid standard conflict '$sc': no matching database found!};
+                                }
                             }
-                        }
+                        } ## end conflictwinner not set yet
+
                     } ## end standard conflict
 
-                    ## At this point, %conflict should hold the keys and the winning database
+                    ## At this point, conflictwinner should be set, OR
+                    ## %conflict should hold the winning database per key
                     ## Walk through and apply to the %deltabin hash
+
+                    ## We want to walk through each primary key for this table
+                    ## We figure out who the winning database is
+                    ## Then we remove all rows for all databases with this key
+                    ## Finally, we add the winning databases/key combo to deltabin
+                    ## We do it this way as we cannot be sure that the combo existed.
+                    ## It could be the case that the winning database made
+                    ## no changes to this table!
                     for my $key (keys %conflict) {
-                        my $winner = $conflict{$key};
+                        my $winner = $self->{conflictwinner} || $conflict{$key};
+
+                        ## Delete everyone for this primary key
                         for my $dbname (keys %deltabin) {
-                            next if $winner eq $dbname;
                             delete $deltabin{$dbname}{$key};
                         }
+
+                        ## Add (or re-add) the winning one
+                        $deltabin{$winner}{$key} = 1;
                     }
 
                     $self->glog('Conflicts have been resolved', LOG_NORMAL);
@@ -6025,7 +6092,7 @@ sub activate_sync {
     $self->{sync}{$syncname}{mcp_active} = 1;
 
     ## Let any listeners know we are done
-    $self->db_notify($maindbh, "activated_sync_$syncname");
+    $self->db_notify($maindbh, "activated_sync_$syncname", 1);
     ## We don't need to listen for activation requests anymore
     $self->db_unlisten($maindbh, "activate_sync_$syncname", '', 1);
     ## But we do need to listen for deactivate and kick requests