our %config;
our %config_about;
+## Sequence columns we care about and how to change them via ALTER:
+my @sequence_columns = (
+ ['last_value' => ''],
+ ['start_value' => 'START WITH'],
+ ['increment_by' => 'INCREMENT BY'],
+ ['max_value' => 'MAXVALUE'],
+ ['min_value' => 'MINVALUE'],
+ ['is_cycled' => 'BOOL CYCLE'],
+ ['is_called' => ''],
+);
+
+my $sequence_columns = join ',' => map { $_->[0] } @sequence_columns;
+
## Everything else is subroutines
sub new {
} ## end if reltype is table
- ## If a sequence, grab all info as a hash
- ## Saves us from worrying about future changes or version specific columns
- if ($g->{reltype} eq 'sequence') {
- $SQL = "SELECT * FROM $g->{safeschema}.$g->{safetable}";
- $sth = $srcdbh->prepare($SQL);
- $sth->execute();
- $g->{sequenceinfo} = $sth->fetchall_arrayref({})->[0];
- }
+ ## Grab sequence information for comparison to target further down
+ my $sourceseq = $g->{reltype} eq 'sequence'
+ ? $self->get_sequence_info($srcdbh, $g->{safeschema}, $g->{safetable})
+ : {};
## Customselect may be null, so force to a false value
$g->{customselect} ||= '';
## If a sequence, verify the information and move on
if ($g->{reltype} eq 'sequence') {
- $SQL = "SELECT * FROM $g->{safeschema}.$g->{safetable}";
- $sth = $dbh->prepare($SQL);
- $sth->execute();
- $info = $sth->fetchall_arrayref({})->[0];
- for my $key (sort keys %$info) {
- next if $key eq 'log_cnt';
- if (! exists $g->{sequenceinfo}{$key}) {
+ my $targetseq = $self->get_sequence_info($dbh, $g->{safeschema}, $g->{safetable});
+ for my $key (sort keys %$targetseq) {
+ if (! exists $sourceseq->{$key}) {
$self->glog(qq{Warning! Sequence on target has item $key, but source does not!}, LOG_WARN);
next;
}
- my $sseq = $g->{sequenceinfo}{$key};
- if ($info->{$key} ne $sseq) {
- $self->glog("Warning! Sequence mismatch. Source $key=$sseq, target is $info->{$key}", LOG_WARN);
+ if ($targetseq->{$key} ne $sourceseq->{$key}) {
+ $self->glog("Warning! Sequence mismatch. Source $key=$sourceseq->{$key}, target is $targetseq->{$key}", LOG_WARN);
next;
}
}
- ## Grab oid of the sequence on the remote database
- $sth = $dbh->prepare($SQL{checktable});
- $count = $sth->execute($g->{schemaname},$g->{tablename});
- if ($count != 1) {
- my $msg = qq{Could not find remote sequence $g->{schemaname}.$g->{tablename} on $db\n};
- $self->glog($msg, LOG_TERSE);
- warn $msg;
- return 0;
- }
- $g->{targetoid}{$db} = $sth->fetchall_arrayref()->[0][0];
-
next;
- }
+
+ } ## end if sequence
## Grab oid and quoted information about the table on the remote database
$sth = $dbh->prepare($SQL{checktable});
if ($synctype eq 'pushdelta' or $synctype eq 'swap') {
- $SQL = 'DELETE FROM bucardo.bucardo_sequences WHERE tablename=? AND syncname=? AND targetdb=?';
- $sth{source}{deletesequence} = $sourcedbh->prepare($SQL);
- $sth{target}{deletesequence} = $targetdbh->prepare($SQL) if $synctype eq 'swap';
-
- $SQL = 'INSERT INTO bucardo.bucardo_sequences '
- . '(tablename,syncname,targetdb,last_value,start_value,'
- . '(increment_by,max_value,min_value,is_cycled,is_called) '
- . 'VALUES (?,?,?,?,?,?,?,?,?,?)';
- $sth{source}{insertsequence} = $sourcedbh->prepare($SQL);
- $sth{target}{insertsequence} = $targetdbh->prepare($SQL) if $synctype eq 'swap';
-
-
for my $g (@$goatlist) {
next if $g->{reltype} ne 'table';
($S,$T) = ($g->{safeschema},$g->{safetable});
+ ## If pushdelta, handle any sequence changes right away
+ if ($synctype eq 'pushdelta' and $g->{reltype} eq 'sequence') {
+ $deltacount{sequences} +=
+ $self->adjust_sequence($sourcedbh, $targetdbh, $S, $T, $sync->{name}, $targetdb);
+ }
+
+ ## No need to continue unless we are a table
+ next if $g->{reltype} ne 'table';
+
## If this table was truncated on the source, we do nothing here
if ($g->{source}{needstruncation}) {
$self->glog(qq{Bypassing normal pushdelta counting for $S.$T as this is a truncate}, LOG_DEBUG);
next;
}
- ## We'll handle sequence changes here and now (pushdelta only)
- if ($synctype eq 'pushdelta' and $g->{reltype} eq 'sequence') {
-
- ## Compare the current sequence information with out stored values
- ## to see if we need to replicate it.
- $SQL = "SELECT * FROM $S.$T";
- $sth = $sourcedbh->prepare($SQL);
- $sth->execute();
- my $currseq = $sth->fetchall_arrayref({})->[0];
-
- $SQL = 'SELECT * FROM bucardo.bucardo_sequences WHERE tablename=? AND syncname=? AND targetdb=?';
- $sth = $sourcedbh->prepare($SQL);
- $count = $sth->execute($g->{oid}, $sync->{name}, $targetdb);
- my $newval = $count < 1 ? 1 : 0; ## Do we insert or update bucardo_sequences?
- my $oldseq = $sth->fetchall_arrayref({})->[0];
-
- ## Do we need to call setval?
- if ($newval
- or $currseq->{last_value} != $oldseq->{last_value}
- or $currseq->{is_called} != $oldseq->{is_called}
- ) {
- $self->glog("Setting sequence $S.$T to value of $currseq->{last_value}, is_called is $currseq->{is_called}", LOG_NORMAL);
- $SQL = "SELECT setval('$S.$T', $currseq->{last_value}, '$currseq->{is_called}')";
- $targetdbh->do($SQL);
- $newval = 1;
- }
-
- ## Do we need to call ALTER SEQUENCE?
- for my $val (sort keys %$currseq) {
- next if $currseq->{$val} eq $oldseq->{$val};
- $self->glog("Sequence $S.$T has a different $val value: was $oldseq->{$val}, now $currseq->{$val}", LOG_DEBUG);
- $SQL = "ALTER SEQUENCE $S.$T";
- $newval =1;
- }
-
- if ($newval) {
- $sth = $sth{source}{deletesequence};
- $sth->execute($g->{oid},$sync->{name},$targetdb);
-
- $sth = $sth{source}{insertsequence};
- $sth->execute($g->{oid},$sync->{name},$targetdb,@$currseq{qw/
- last_value start_value increment_by max_value min_value is_cycled is_called /});
- $deltacount{sequences}++;
- }
-
- } ## end sequence checking
-
- ## No need to continue unless we are a table
- next if $g->{reltype} ne 'table';
-
$deltacount{allsource} += $deltacount{source}{$S}{$T} = $sth{source}{$g}{getdelta}->execute();
$sth{source}{$g}{getdelta}->finish() if $deltacount{source}{$S}{$T} =~ s/0E0/0/o;
$self->glog(qq{Source delta count for $S.$T: $deltacount{source}{$S}{$T}},
next;
}
- ## Handle sequences first, by simply forcing a setval
+ ## Handle sequences first
if ($g->{reltype} eq 'sequence') {
- $SQL = "SELECT last_value, is_called FROM $S.$T";
- my ($lastval, $iscalled) = @{$sourcedbh->selectall_arrayref($SQL)->[0]};
+
+ my $currseq = $self->get_sequence_info($sourcedbh, $S, $T);
+
+ my ($lastval, $iscalled) = @$currseq{qw/last_value is_called/};
$self->glog("Setting sequence $S.$T to value of $lastval, is_called is $iscalled", LOG_NORMAL);
$SQL = "SELECT setval('$S.$T', $lastval, '$iscalled')";
$targetdbh->do($SQL);
+ ## Just in case, ALTER the sequence as well
+ $SQL = "ALTER SEQUENCE $S.$T";
+ my @alter;
+ for my $col (@sequence_columns) {
+ my ($name,$syntax) = @$col;
+ next if ! $syntax;
+ if ($syntax =~ s/BOOL //) {
+ $SQL .= sprintf ' %s%s',
+ $currseq->{$name} ? '' : 'NO ',
+ $syntax;
+ }
+ else {
+ $SQL .= " $syntax $currseq->{$name}";
+ }
+ }
+ $targetdbh->do($SQL);
+
## No need to continue any further
next;
}
if ($g->{reltype} eq 'sequence') {
my $action = 0; ## 0 = skip, 1 = source->target, 2 = target->source
- $g->{tempschema} = {};
- my $SEQUENCESQL = "SELECT * FROM $S.$T";
- my $currseq;
if (exists $g->{code_conflict}) {
$self->glog('No support for custom conflict handlers for sequences yet!', LOG_WARN);
}
$action = 2;
}
elsif ('lowest' eq $sc or 'highest' eq $sc) {
- $SQL = "SELECT * FROM $S.$T";
- $sth = $sourcedbh->prepare($SQL);
- $sth->execute();
- $currseq = $sth->fetchall_arrayref({})->[0];
-
- ($g->{tempschema}{s}{lastval},$g->{tempschema}{s}{iscalled}) =
- @{$sourcedbh->selectall_arrayref($SEQUENCESQL)->[0]};
- ($g->{tempschema}{t}{lastval},$g->{tempschema}{t}{iscalled}) =
- @{$targetdbh->selectall_arrayref($SEQUENCESQL)->[0]};
- if ($g->{tempschema}{s}{lastval} > $g->{tempschema}{t}{lastval}) {
+ my $sourceseq = $self->get_sequence_info($sourcedbh, $S, $T);
+ my $targetseq = $self->get_sequence_info($targetdbh, $S, $T);
+ if ($sourceseq->{last_value} > $targetseq->{last_value}) {
$action = 'lowest' eq $sc ? 2 : 1;
}
- elsif ($g->{tempschema}{s}{lastval} < $g->{tempschema}{t}{lastval}) {
+ elsif ($sourceseq->{last_value} < $targetseq->{last_value}) {
$action = 'lowest' eq $sc ? 1 : 2;
}
else {
$deltacount{sequences}++;
## Get the last seen value
- my $LASTSEQUENCESQL = 'SELECT * FROM bucardo.bucardo_sequences WHERE tablename = ? AND syncname = ? AND targetdb = ?';
+ my $LASTSEQUENCESQL = 'SELECT * FROM bucardo.bucardo_sequences WHERE tablename = ? AND syncname = ? AND targetname = ?';
## Source wins - copy its value to the target
if (1 == $action) {
- $self->glog("Copying value of $S.$T from source to target", LOG_VERBOSE);
-
- if (! exists $g->{tempschema}{s}) {
- ($g->{tempschema}{s}{lastval},$g->{tempschema}{s}{iscalled}) =
- @{$sourcedbh->selectall_arrayref($SEQUENCESQL)->[0]};
- }
-
- my $lastval = $g->{tempschema}{s}{lastval};
- my $iscalled = $g->{tempschema}{s}{iscalled};
-
- ## Has it changed since last visit?
- $sth = $sourcedbh->prepare($LASTSEQUENCESQL);
- $count = $sth->execute($g->{oid}, $sync->{name}, $targetdb);
- my $newval = 0;
- if ($count < 1) {
- $newval = 1; ## Never before seen, so add to the table
- $sth->finish();
- }
- else {
- my ($oldval,$oldcalled) = @{$sth->fetchall_arrayref()->[0]};
- if ($oldval != $lastval) {
- $newval = 2; ## Value has changed
- }
- elsif ($oldcalled ne $iscalled) {
- $newval = 3; ## is_called has changed
- }
- }
- ## Has not changed, so we simply move on to the next goat
- next if ! $newval;
-
- ## Apply to the target
- $self->glog("Setting sequence $S.$T on target to value of $lastval, is_called is $iscalled", LOG_NORMAL);
- $SQL = "SELECT setval('$S.$T', $lastval, '$iscalled')";
- $targetdbh->do($SQL);
-
- ## Save to the target's internal table
- ## Rather than worry about upserts, we'll just delete/insert every time
- $sth = $sth{target}{deletesequence};
- $sth->execute($g->{targetoid}{$targetdb},$sync->{name},$sourcedb);
-
- $sth = $sth{target}{insertsequence};
- $sth->execute($g->{targetoid}{$targetdb},$sync->{name},$sourcedb,@$currseq{qw/
- last_value start_value increment_by max_value min_value is_cycled is_called /});
- $deltacount{sequences}++;
-
- ## Internal note so we know things have changed
- $deltacount{sequences}++;
-
- ## Done: jump to the next goat
- next;
- }
-
- ## Target wins - copy its value to the source
- $self->glog("Copying value of $S.$T from target to source", LOG_VERBOSE);
-
- if (! exists $g->{tempschema}{t}) {
- ($g->{tempschema}{t}{lastval},$g->{tempschema}{t}{iscalled}) =
- @{$targetdbh->selectall_arrayref($SEQUENCESQL)->[0]};
- }
-
- my $lastval = $g->{tempschema}{t}{lastval};
- my $iscalled = $g->{tempschema}{t}{iscalled};
-
- ## Has it changed since last visit?
- $sth = $sourcedbh->prepare($LASTSEQUENCESQL);
- $count = $sth->execute($g->{oid}, $sync->{name}, $targetdb);
- my $newval = 0;
- if ($count < 1) {
- $newval = 1; ## Never before seen, so add to the table
- $sth->finish();
+ $deltacount{sequences} +=
+ $self->adjust_sequence($sourcedbh, $targetdbh, $S, $T, $syncname, $targetdb);
}
else {
- my ($oldval,$oldcalled) = @{$sth->fetchall_arrayref()->[0]};
- if ($oldval != $lastval) {
- $newval = 2; ## Value has changed
- }
- elsif ($oldcalled ne $iscalled) {
- $newval = 3; ## is_called has changed
- }
+ $deltacount{sequences} +=
+ $self->adjust_sequence($targetdbh, $sourcedbh, $S, $T, $syncname, $sourcedb);
}
- ## Has not changed, so we simply move on to the next goat
- next if ! $newval;
-
- ## Apply to the source
- $self->glog("Setting sequence $S.$T on source to value of $lastval, is_called is $iscalled", LOG_NORMAL);
- $SQL = "SELECT setval('$S.$T', $lastval, '$iscalled')";
- $sourcedbh->do($SQL);
-
- ## Save to the source's internal table
- ## Rather than worry about upserts, we'll just delete/insert every time
- $sth = $sth{source}{deletesequence};
- $sth->execute($g->{oid},$sync->{name},$targetdb);
-
- $sth = $sth{source}{insertsequence};
- $sth->execute($g->{oid},$sync->{name},$targetdb,@$currseq{qw/
- last_value start_value increment_by max_value min_value is_cycled is_called /});
- $deltacount{sequences}++;
## Proceed to the next goat
next;
} ## end of cleanup_kid
+sub get_sequence_info {
+
+ ## Get sequence information
+ ## Not technically MVCC but good enough for our purposes
+ ## Arguments:
+ ## 1. Database handle
+ ## 2. Schema name
+ ## 3. Sequence name
+ ## 4. (optional) Name of the sync
+ ## 5. (optional) Target database name
+ ## Returns: hashref of information
+
+ ## If five arguments are given, look up the "old" information in bucardo_sequences
+ ## With only three arguments, pull directly from the sequence
+
+ my ($self,$ldbh,$schemaname,$seqname,$syncname,$targetname) = @_;
+
+ if (defined $syncname) {
+ ## Pull "old" sequence information. May be empty.
+ $SQL = "SELECT $sequence_columns FROM bucardo.bucardo_sequences "
+ . ' WHERE schemaname=? AND seqname = ? AND syncname=? AND targetname=?';
+ $sth = $ldbh->prepare($SQL);
+ $sth->execute($schemaname,$seqname, $syncname, $targetname);
+ }
+ else {
+ ## Pull directly from a named sequence
+ $SQL = "SELECT $sequence_columns FROM $schemaname.$seqname";
+ $sth = $ldbh->prepare($SQL);
+ $sth->execute();
+ }
+
+ return $sth->fetchall_arrayref({})->[0];
+
+} ## end of get_sequence_info
+
+
+sub adjust_sequence {
+
+ ## Change a sequence if needed
+ ## If changed, update the bucardo_sequences table
+ ## Arguments:
+ ## 1. source database handle (where bucardo_sequences lives)
+ ## 2. target database handle (the sequence to be updated/altered)
+ ## 3. Schema name
+ ## 4. Sequence name
+ ## 5. Name of the current sync
+ ## 6. Name of the current target
+ ## Returns: number of changes made for this sequence
+
+ my ($self,$sdbh,$tdbh,$schemaname,$seqname,$syncname,$targetname) = @_;
+
+ my $changes = 0;
+
+ my $currseq = $self->get_sequence_info($sdbh, $schemaname, $seqname);
+
+ my $oldseq = $self->get_sequence_info($sdbh, $schemaname, $seqname, $syncname, $targetname);
+
+$self->glog("currseq for $seqname: " . (Dumper $currseq));
+$self->glog("oldseq for $seqname: " . (Dumper $oldseq));
+
+
+ ## Call SETVAL as needed
+ if (! exists $oldseq->{last_value}
+ or $currseq->{last_value} != $oldseq->{last_value}
+ or $currseq->{is_called} != $oldseq->{is_called}
+ ) {
+ $self->glog("Setting sequence $schemaname.$seqname to value of $currseq->{last_value}, is_called is $currseq->{is_called}", LOG_NORMAL);
+ $SQL = "SELECT setval('$schemaname.$seqname', $currseq->{last_value}, '$currseq->{is_called}')";
+ $tdbh->do($SQL);
+ $changes++;
+ }
+
+ ## Call ALTER SEQUENCE as needed
+ my @alter;
+ for my $col (@sequence_columns) {
+ my ($name,$syntax) = @$col;
+
+ next if ! $syntax;
+
+ next if exists $oldseq->{$name} and $currseq->{$name} eq $oldseq->{$name};
+
+ ## No output if we've not seen it before: just apply everything
+ if (exists $oldseq->{$name}) {
+ $self->glog("Sequence $schemaname.$seqname has a different $name value: was $oldseq->{$name}, now $currseq->{$name}", LOG_NORMAL);
+ }
+
+ if ($syntax =~ s/BOOL //) {
+ push @alter => sprintf '%s%s',
+ $currseq->{$name} ? '' : 'NO ',
+ $syntax;
+ }
+ else {
+ push @alter => "$syntax $currseq->{$name}";
+ }
+ $changes++;
+ }
+ if (@alter) {
+ $SQL = "ALTER SEQUENCE $schemaname.$seqname ";
+ $SQL .= join ' ' => @alter;
+ $self->glog("Running on target $targetname: $SQL", LOG_DEBUG);
+ $tdbh->do($SQL);
+ }
+
+ return 0 if ! $changes;
+
+ ## Need to update the local bucardo_sequences table
+ $SQL = 'DELETE FROM bucardo.bucardo_sequences WHERE schemaname=? AND seqname=? AND syncname=? AND targetname=?';
+ $sth = $sdbh->prepare_cached($SQL);
+ $sth->execute($schemaname,$seqname,$syncname,$targetname);
+
+ $SQL = 'INSERT INTO bucardo.bucardo_sequences (schemaname,seqname,syncname,targetname,'
+ . 'last_value,start_value, increment_by,max_value,min_value, is_cycled,is_called) '
+ . 'VALUES (?,?,?,?, ?,?, ?,?,?, ?,?)';
+ $sth = $sdbh->prepare_cached($SQL);
+ $sth->execute($schemaname,$seqname,$syncname,$targetname,
+ $currseq->{'last_value'},
+ $currseq->{'start_value'},
+ $currseq->{'increment_by'},
+ $currseq->{'max_value'},
+ $currseq->{'min_value'},
+ $currseq->{'is_cycled'},
+ $currseq->{'is_called'});
+
+ return $changes;
+
+} ## end of adjust_sequence
+
+
sub send_mail {
## Send out an email message