my $maindbh = $self->{masterdbh};
$self->glog("Bucardo database backend PID: $self->{master_backend}", LOG_VERBOSE);
+ ## Setup mapping so we can report in the log which things came from this backend
+ $self->{pidmap}{$self->{master_backend}} = 'Bucardo DB';
+
## SQL to enter a new database in the dbrun table
$SQL = qq{
INSERT INTO bucardo.dbrun(sync,dbname,pgpid)
my $warn = $msg =~ /CTL.+request/ ? '' : 'Warning! ';
$self->glog(qq{${warn}Child for sync "$syncname" was killed at line $line: $msg}, LOG_WARN);
- ## Never display the database password
+ ## Never display the database passwords
for (values %{$self->{dbs}}) {
$_->{dbpass} = '???';
}
}
my $subject = qq{Bucardo kid for "$syncname" killed on $shorthost$moresub};
$self->send_mail({ body => "$body\n", subject => $subject });
- }
+
+ } ## end sending email
my $extrainfo = sprintf '%s%s%s',
qq{Sync "$syncname"},
}; ## end SIG{__DIE_} handler sub
- ## Setup mapping so we can report in the log which things came from this backend
- $self->{pidmap}{$self->{master_backend}} = 'Bucardo DB';
-
## Listen for the controller asking us to go again if persistent
if ($kidsalive) {
$self->db_listen( $maindbh, "kid_run_$syncname" );
$sth{kid_syncrun_update_status} = $maindbh->prepare($SQL);
## SQL to set the syncrun table as ended once complete
- ## We do not set ended, but let the controller handle that
$SQL = q{
UPDATE bucardo.syncrun
SET deletes=deletes+?, inserts=inserts+?, truncates=truncates+?,
};
$sth{kid_syncrun_end} = $maindbh->prepare($SQL);
- ## Connect to all databases we are responsible for
- ## This list has already been pruned by the controller as needed
+ ## Connect to all (connectable) databases we are responsible for
+ ## This main list has already been pruned by the controller as needed
for my $dbname (@dbs_connectable) {
$x = $sync->{db}{$dbname};
## Only tables get all this fuss: sequences are easy
next if $g->{reltype} ne 'table';
- ## Setup the pretty names (which is used by the exception handler too)
- ($S,$T) = ($g->{safeschema},$g->{safetable});
-
## This is the main query: grab all unique changed primary keys since the last sync
$SQL{delta}{$g} = qq{
SELECT DISTINCT $g->{pklist}
$x = $sync->{db}{$dbname};
## Set the TARGETNAME for each database: the bucardo.track_* target entry
- ## Until we start using gangs again, just use the dbgroup?
+ ## Unless we start using gangs again, just use the dbgroup
$x->{TARGETNAME} = "dbgroup $dbs";
for my $g (@$goatlist) {
$sth{getdelta}{$dbname}{$g} = $x->{dbh}->prepare($SQL, {pg_async => PG_ASYNC});
## We need to update either the track table or the stage table
- ## There is no way to know beforehand if which we will need, so we get both ready
+ ## There is no way to know beforehand which we will need, so we prepare both
## Replace with the target name for source track updating
($SQL = $SQL{track}{$g}) =~ s/TARGETNAME/'$x->{TARGETNAME}'/go;
## If we are 8.3 or higher, we simply use session_replication_role,
## which is completely safe, and faster (thanks Jan!)
##
- ## Note that each database within the same sync may have different methods
+ ## We also see if the version is modern enough to use COPY with subselects
+ ##
+ ## Note that each database within the same sync may have different methods,
+ ## so we need to see if anyone is doing things the old way
my $anyone_does_pgclass = 0;
for my $dbname (@dbs_postgres) {
$x = $sync->{db}{$dbname};
$x->{modern_copy} = $ver >= 80200 ? 1 : 0;
}
- ## The actual SQL to disable using pg_class
- $SQL{disable_trigrules} = $SQL{enable_trigrules} = '';
-
## We don't both building these statements unless we need to
if ($anyone_does_pgclass) {
- ## The SQL to disable all triggers and rules
- ## for the tables in this sync
+ ## The SQL to disable all triggers and rules for the tables in this sync
$SQL = q{
UPDATE pg_class c
SET reltriggers = 0, relhasrules = false
$SQL .= ')';
## We are adding all tables together in a single multi-statement query
- $SQL{disable_trigrules} .= ";\n" if $SQL{disable_trigrules};
- $SQL{disable_trigrules} .= $SQL;
+ $SQL{disable_trigrules} = $SQL;
my $setclause =
## no critic (RequireInterpolationOfMetachars)
grep { $_->{reltype} eq 'table' }
@$goatlist;
- $SQL{enable_trigrules} .= ";\n" if $SQL{enable_trigrules};
$SQL{enable_trigrules} .= $SQL;
} ## end anyone using pg_class to turn off triggers and rules
## Row counts from the delta tables:
my %deltacount;
- ## Count of changes made (insert/delete/summary):
+ ## Count of changes made (inserts,deletes,truncates,conflicts handled):
my %dmlcount;
## Create safe versions of the database handles if we are going to need them
$self->{safe_dbh}{$dbname} = DBIx::Safe->new($darg);
}
}
- }
+
+ } ## end DBIX::Safe creations
## Kids always start "auto-kicked"
$self->{runrightnow} = 1;
## Begin the main KID loop
KID: {
+ ## Leave right away if we find a stopfile
+ if (-e $self->{stopfile}) {
+ $self->glog(qq{Found stopfile "$self->{stopfile}": exiting}, LOG_WARN);
+ last KID;
+ }
+
## Should we actually do something this round?
my $dorun = 0;
$self->{runrightnow} = 0;
}
- ## Leave right away if we find a stopfile
- if (-e $self->{stopfile}) {
- $self->glog(qq{Found stopfile "$self->{stopfile}": exiting}, LOG_WARN);
- last KID;
- }
-
## If persistent, listen for messages and do an occasional ping of all databases
if ($kidsalive) {
my $nlist = $self->db_get_notices($maindbh);
redo KID;
}
- ## How many times have we handled this sync?
- $kidloop++;
+ ## From this point on, we are a live kid that is expected to run the sync
## Used to report on total times for the long-running parts, e.g. COPY
my $kid_start_time = [gettimeofday];
## Create an entry in the syncrun table to let people know we've started
$sth{kid_syncrun_insert}->execute($syncname, "Started (KID $$)");
+ ## Increment our count of how many times we have been here before
+ $kidloop++;
+
## Reset the numbers to track total bucardo_delta matches
undef %deltacount;
$deltacount{all} = 0;
- ## Reset our tracks of total inserts, deletes, and conflicts
+ ## Reset our counts of total inserts, deletes, truncates, and conflicts
undef %dmlcount;
$dmlcount{deletes} = 0;
$dmlcount{inserts} = 0;
## Run all 'before_txn' code
if (exists $sync->{code_before_txn}) {
+ ## Let external people know where we are
$sth{kid_syncrun_update_status}->execute("Code before_txn (KID $$)", $syncname);
$maindbh->commit();
for my $code (@{$sync->{code_before_txn}}) {
+ ## Check if the code has asked us to skip other before_txn codes
last if 'last' eq $self->run_kid_custom_code($sync, $code);
}
}
## This should be done just before we start transactions on all dbs
$maindbh->commit();
- ## Start the main transactions. From here on out, speed is important.
+ ## Start the main transactions by setting isolation levels.
+ ## From here on out, speed is important.
## Note that all database handles are currently not in a txn
## (last action was commit or rollback)
- for my $dbname (@dbs_postgres) {
+ for my $dbname (@dbs_dbi) {
$x = $sync->{db}{$dbname};
## Just in case:
$x->{dbh}->rollback();
- $x->{dbh}->do('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE READ WRITE');
- $self->glog(qq{Set db "$dbname" to serializable}, LOG_DEBUG);
+ if ($x->{dbtype} eq 'postgres') {
+ $x->{dbh}->do('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE READ WRITE');
+ $self->glog(qq{Set db "$dbname" to serializable read write}, LOG_DEBUG);
+ }
+
+ if ($x->{dbtype} eq 'mysql') {
+ $x->{dbh}->do('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE');
+ $self->glog(qq{Set db "$dbname" to serializable}, LOG_DEBUG);
+ }
}
- ## We may want to lock all the tables
- ## XXX TODO: alternate ways to trigger this
+ ## We may want to lock all the tables. Use sparingly
my $lock_table_mode = '';
my $force_lock_file = "/tmp/bucardo-force-lock-$syncname";
+ ## If the file exists, pull the mode from inside it
if (-e $force_lock_file) {
$lock_table_mode = 'EXCLUSIVE';
if (-s _ and (open my $fh, '<', "$force_lock_file")) {
close $fh or warn qq{Could not close "$force_lock_file": $!\n};
if (defined $newmode) {
chomp $newmode;
+ ## Quick sanity check: only set if looks like normal words
$lock_table_mode = $newmode if $newmode =~ /^\s*\w[ \w]+\s*$/o;
}
}
$self->glog("Locking all tables in $lock_table_mode MODE", LOG_TERSE);
for my $g (@$goatlist) {
next if $g->{reltype} ne 'table';
- my $com = "$g->{safeschema}.$g->{safetable} IN $lock_table_mode MODE";
- for my $dbname (@dbs_postgres) {
+
+ for my $dbname (@dbs_dbi) {
$x = $sync->{db}{$dbname};
- $self->glog("Database $dbname: Locking table $com", LOG_TERSE);
- $x->{dbh}->do("LOCK TABLE $com");
+ ## XXX customname
+
+ if ($x->{dbtype} eq 'postgres') {
+ my $com = "$g->{safeschema}.$g->{safetable} IN $lock_table_mode MODE";
+ $self->glog("Database $dbname: Locking table $com", LOG_TERSE);
+ $x->{dbh}->do("LOCK TABLE $com");
+ }
+
+ if ($x->{dbtype} eq 'mysql') {
+ my $com = "$g->{safetable} WRITE";
+ $self->glog("Database $dbname: Locking table $com", LOG_TERSE);
+ $x->{dbh}->do("LOCK TABLE $com");
+ }
}
}
}
$sth{kid_syncrun_update_status}->execute("Code before_check_rows (KID $$)", $syncname);
$maindbh->commit();
for my $code (@{$sync->{code_before_check_rows}}) {
+ ## Check if the code has asked us to skip other before_check_rows codes
last if 'last' eq $self->run_kid_custom_code($sync, $code);
}
}