From 3a6ddd93cd75740b8f4941eb73aaa66b8411f7a4 Mon Sep 17 00:00:00 2001 From: "David E. Wheeler" Date: Mon, 10 Dec 2012 17:27:43 -0800 Subject: [PATCH] First stab at getting makedelta working again. Add code to push_rows() that inserts the proper delta and track records if the target is PostgreSQL. Tests fail just as before because nothing tickles the autokick, which is disabled by the replication KID. There is a NOTIFY for a makedelta job, though, so perhaps that can be used? --- Bucardo.pm | 49 +++++++++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/Bucardo.pm b/Bucardo.pm index e794c0d44..140635c0b 100644 --- a/Bucardo.pm +++ b/Bucardo.pm @@ -3886,7 +3886,6 @@ sub start_kid { ## Update bucardo_track table so that the bucardo_delta rows we just processed ## are marked as "done" and ignored by subsequent runs - ## We also rely on this section to do makedelta related bucardo_track inserts ## Reset our pretty-printer count $maxcount = 0; @@ -3922,7 +3921,6 @@ sub start_kid { for my $dbname (@dbs_source) { if ($deltacount{dbtable}{$dbname}{$S}{$T}) { - ## XXX account for makedelta! ## This is async: if ($x->{trackstage}) { $sth{stage}{$dbname}{$g}->execute(); @@ -5692,16 +5690,6 @@ sub validate_sync { } ## end checking each custom code - ## Consolidate some things that are set at both sync and goat levels - - ## The makedelta settings indicates which sides (source/target) get manual delta rows - ## This is required if other syncs going to other targets need to see the changed data - ## Note that fullcopy is always 0, and pushdelta can only change target_makedelta - ## The db is on or off, and the sync then inherits, forces it on, or forces it off - ## Each goat then does the same: inherits, forces on, forces off - - ## XXX Revisit the whole makedelta idea - ## Go through each goat in this sync, adjusting items and possibly bubbling up info to sync for my $g (@{$s->{goatlist}}) { ## None of this applies to non-tables @@ -8497,8 +8485,8 @@ sub push_rows { my $pcount = @pkvals; ## Loop through each chunk of primary keys to copy over - for my $pkvs (@pkvals) { - $pkvs = join ',' => @{ $pkvs }; + for my $pk_values (@pkvals) { + my $pkvs = join ',' => @{ $pk_values }; ## Message to prepend to the statement if chunking my $pre = $pcount <= 1 ? '' : "/* $loop of $pcount */"; @@ -8639,19 +8627,44 @@ sub push_rows { ## Perform final cleanups for each target for my $t (@{ $srccmd{$clause} }) { - my $type = $t->{dbtype}; - my $tname = $newname->{$t->{name}}; if ('postgres' eq $type) { - $t->{dbh}->pg_putcopyend(); + my $dbh = $t->{dbh}; + $dbh->pg_putcopyend(); ## Same bug as above if ($self->{dbdpgversion} < 21801) { - $t->{dbh}->do('SELECT 1'); + $dbh->do('SELECT 1'); } $self->glog(qq{Rows copied to $t->{name}.$tname: $total}, LOG_VERBOSE); $count += $total; + ## If this goat is set to makedelta, add rows to bucardo_delta to simulate the + ## normal action of a tigger, and add rows to bucardo_track so they changed + ## rows cannot flow back to us + if ($t->{makedelta} && !$fullcopy) { + my ($cols, $vals); + if ($numpks == 1) { + $cols = "($pkcols)"; + $vals = join ',', map { "($_)" } map { @{ $_ } } @pkvals; + } else { + $cols = $pkcols; + $vals = join ',', map { @{ $_ } } @pkvals; + } + $dbh->do(qq{ + INSERT INTO bucardo.$goat->{deltatable} $cols + VALUES $vals + }); + $dbh->do(qq{ + INSERT INTO bucardo.$goat->{tracktable} + VALUES (NOW(), ?) + }, undef, $t->{TARGETNAME}); + + # Notify that the sync was done; add payload with host/table name? + $self->db_notify($dbh, "deltadone_${syncname}"); + + # XXX Kick it! + } } elsif ('flatpg' eq $type) { print {$t->{filehandle}} "\\\.\n\n"; -- 2.39.5