## Reset some things at the per-database level
for my $dbname (keys %{ $sync->{db} }) {
+ ## This must be set, as it is used by the standard_conflict below
$deltacount{$dbname} = 0;
$dmlcount{allinserts}{$dbname} = 0;
$dmlcount{alldeletes}{$dbname} = 0;
## Gets all relevant rows from bucardo_deltas: runs asynchronously
$sth{getdelta}{$dbname}{$g}->execute();
+
}
## Grab all results as they finish.
## Only need to turn off triggers and rules once via pg_class
my $disabled_via_pg_class = 0;
- ## Cached epoch value per database for the standard conflict:
- delete $self->{dbhightime};
+ ## The overall winning database for conflicts
+ delete $self->{conflictwinner};
## Do each goat in turn
$dmlcount{conflicts} += $count;
$self->glog("Conflicts for $S.$T: $count", LOG_NORMAL);
+$self->glog(Dumper \%conflict);
## If we have a custom conflict handler for this goat, invoke it
if ($g->{code_conflict}) {
}
## If we are grabbing the 'latest', figure out which it is
+ ## For this handler, we want to treat all the tables in the sync
+ ## as deeply linked to each other, and this we have one winning
+ ## database for *all* tables in the sync.
+ ## Thus, the only things arriving from other databases will be inserts
elsif ('bucardo_latest' eq $g->{standard_conflict}) {
- ## If this is the first time with a conflict, grab all values
- ## We want the latest txntime across all tables for each database
- if (! exists $self->{dbhightime}) {
- for my $dbname (keys %{ $self->{db_hasconflict} }) {
+ ## We only need to figure out the winning database once
+ ## The winner is the latest one to touch any of our tables
+ ## In theory, this is a little crappy.
+ ## In practice, it works out quite well. :)
+ if (! exists $self->{conflictwinner}) {
+
+ for my $dbname (@dbs_delta) {
+
$x = $sync->{db}{$dbname};
- $self->{dbhightime}{$dbname} = 0;
+ ## Start by assuming this DB has no changes
+ $x->{lastmod} = 0;
+
for my $g (@$goatlist) {
## This only makes sense for tables
}
$sth = $x->{dbh}->prepare($g->{sql_max_delta});
$count = $sth->execute();
- my $epoch = 0;
- if ($count < 1) { ## No rows at all!
+ if ($count < 1) { ## No changes means we keep the default of "0"
$sth->finish();
}
else {
- $epoch = $sth->fetchall_arrayref()->[0][0];
- }
- if ($epoch > $self->{dbhightime}{$dbname}) {
- $self->{dbhightime}{$dbname} = $epoch;
+ ## Keep in mind we don't really care which table this is
+ my $epoch = $sth->fetchall_arrayref()->[0][0];
+ if ($epoch > $x->{lastmod}) {
+ $x->{lastmod} = $epoch;
+ }
}
- }
- }
- }
- ## Walk through each pkey conflict and declare a winner
- for my $key (keys %conflict) {
- ## Highest one wins
- my $highest = 0;
- ## The name of the winning database
- my $winner;
- for my $dbname (sort keys %{ $conflict{$key} }) {
- ## We sort the database names so even in the (very!) unlikely
- ## chance of a tie, the same database always wins
-
- if ($self->{dbhightime}{$dbname} > $highest) {
- $highest = $self->{dbhightime}{$dbname};
- $winner = $dbname;
+ } ## end checking each table in the sync
+
+ } ## end checking each source database
+
+ ## Now we declare the overall winner
+ ## We sort the database names so even in the (very!) unlikely
+ ## chance of a tie, the same database always wins
+ my $highest = -1;
+ for my $dbname (sort @dbs_delta) {
+
+ $x = $sync->{db}{$dbname};
+
+ if ($x->{lastmod} > $highest) {
+ $highest = $x->{lastmod};
+ $self->{conflictwinner} = $dbname;
}
}
- if (! defined $winner) {
- die "Could not determine a winner using 'latest' standard_conflict for $S.$T";
- }
- $conflict{$key} = $winner;
- }
+
+ ## We now have a winning database inside self -> conflictwinner
+ ## This means we do not need to update %conflict at all
+
+ } ## end conflictwinner not set yet
+
}
else {
## Use the standard conflict: a list of database names
-
- ## Optimize for a single database name
- my $sc = $g->{standard_conflict};
- if (index($sc, ' ') < 1) {
- ## Sanity check
- if (! exists $deltabin{$sc}) {
- die "Invalid standard_conflict '$sc' used for $S.$T";
- }
- for my $key (keys %conflict) {
- $conflict{$key} = $sc;
- }
- }
- ## Not a single name? See which one wins for each key
- else {
- my @dbs = split / +/ => $sc;
- ## Make sure they all exist
- for my $db (@dbs) {
- if (! exists $deltabin{$db}) {
- die qq{Invalid database "$db" found in standard conflict for $S.$T};
+ ## Basically, we use the first valid one we find
+ ## The only reason *not* to use an entry is if it had
+ ## no updates at all for this run. Note: this does not
+ ## mean no conflicts, it means no insert/update/delete
+
+ if (! exists $self->{conflictwinner}) {
+
+ ## Optimize for a single database name
+ my $sc = $g->{standard_conflict};
+ if (index($sc, ' ') < 1) {
+ ## Sanity check
+ if (! exists $deltacount{$sc}) {
+ die "Invalid standard_conflict '$sc' used for $S.$T";
}
+ $self->{conflictwinner} = $sc;
}
- ## Loop through each collision and set the winning database
- for my $key (keys %conflict) {
- my $winner;
- ## Check each database in order to see if it has this key
- for my $db (@dbs) {
- if (exists $conflict{$key}{$db}) {
- $winner = $db;
- last;
+ else {
+ ## Have more than one, so figure out the best one to use
+ my @dbs = split / +/ => $sc;
+ ## Make sure they all exist
+ for my $dbname (@dbs) {
+ if (! exists $deltacount{$dbname}) {
+ die qq{Invalid database "$dbname" found in standard conflict for $S.$T};
}
}
- $conflict{$key} = $winner;
+
+ ## Check each candidate in turn
+ ## It wins, unless it has no changes at all
+ for my $dbname (@dbs) {
+
+ my $found_delta = 0;
+
+ ## Walk through but stop at the first found delta
+ for my $g (@$goatlist) {
+
+ ## This only makes sense for tables
+ next if $g->{reltype} ne 'table';
+
+ ## Prep our SQL: find the epoch of the latest transaction for this table
+ if (!exists $g->{sql_got_delta}) {
+ ## We need to know if any have run since the last time we ran this sync
+ ## In other words, any deltas newer than the highest track entry
+ $SQL = qq{SELECT COUNT(*) FROM bucardo.$g->{deltatable} d }
+ . qq{WHERE d.txntime > }
+ . qq{(SELECT MAX(txntime) FROM bucardo.$g->{tracktable} }
+ . qq{WHERE target = '$x->{TARGETNAME}')};
+ $g->{sql_got_delta} = $SQL;
+ $self->glog("AAAA Running $SQL");
+ }
+ $sth = $x->{dbh}->prepare($g->{sql_got_delta});
+ $count = $sth->execute();
+ $sth->finish();
+ if ($count >= 1) {
+ $found_delta = 1;
+ last;
+ }
+ }
+
+ if (! $found_delta) {
+ $self->glog("No rows changed, so discarding conflict winner '$dbname'", LOG_VERBOSE);
+ next;
+ }
+
+ $self->{conflictwinner} = $dbname;
+ last;
+
+ }
+
+ ## No match at all? Must be a non-inclusive list
+ if (! exists $self->{conflictwinner}) {
+ die qq{Invalid standard conflict '$sc': no matching database found!};
+ }
}
- }
+ } ## end conflictwinner not set yet
+
} ## end standard conflict
- ## At this point, %conflict should hold the keys and the winning database
+ ## At this point, conflictwinner should be set, OR
+ ## %conflict should hold the winning database per key
## Walk through and apply to the %deltabin hash
+
+ ## We want to walk through each primary key for this table
+ ## We figure out who the winning database is
+ ## Then we remove all rows for all databases with this key
+ ## Finally, we add the winning databases/key combo to deltabin
+ ## We do it this way as we cannot be sure that the combo existed.
+ ## It could be the case that the winning database made
+ ## no changes to this table!
for my $key (keys %conflict) {
- my $winner = $conflict{$key};
+ my $winner = $self->{conflictwinner} || $conflict{$key};
+
+ ## Delete everyone for this primary key
for my $dbname (keys %deltabin) {
- next if $winner eq $dbname;
delete $deltabin{$dbname}{$key};
}
+
+ ## Add (or re-add) the winning one
+ $deltabin{$winner}{$key} = 1;
}
$self->glog('Conflicts have been resolved', LOG_NORMAL);
$self->{sync}{$syncname}{mcp_active} = 1;
## Let any listeners know we are done
- $self->db_notify($maindbh, "activated_sync_$syncname");
+ $self->db_notify($maindbh, "activated_sync_$syncname", 1);
## We don't need to listen for activation requests anymore
$self->db_unlisten($maindbh, "activate_sync_$syncname", '', 1);
## But we do need to listen for deactivate and kick requests