From ebf68b8563d3000dc9f81c4980f01975dabec16a Mon Sep 17 00:00:00 2001
From: Greg Sabino Mullane
Date: Sun, 2 Nov 2014 13:42:14 -0500
Subject: [PATCH] Remove the terrible, awful $x global var, and fix some bugs
 in the process

---
 Bucardo.pm | 1029 +++++++++++++++++++++++++++------------------------
 1 file changed, 537 insertions(+), 492 deletions(-)

diff --git a/Bucardo.pm b/Bucardo.pm
index d7993e1a3..a30eaffe6 100644
--- a/Bucardo.pm
+++ b/Bucardo.pm
@@ -59,10 +59,10 @@ use constant {
 
 ## Map system signal numbers to standard names
 ## This allows us to say kill $signumber{HUP} => $pid
-my $x = 0;
+my $i = 0;
 my %signumber;
 for (split(' ', $Config{sig_name})) {
-    $signumber{$_} = $x++;
+    $signumber{$_} = $i++;
 }
 
 ## Prevent buffering of output:
@@ -920,23 +920,24 @@ sub mcp_main {
 
         ## Check each (pingable) remote database in undefined order
         for my $dbname (keys %{ $self->{sdb} }) {
 
-            $x = $self->{sdb}{$dbname};
-            next if $x->{dbtype} =~ /flat|mongo|redis/o;
+            my $d = $self->{sdb}{$dbname};
 
-            next if $x->{status} eq 'stalled';
+            next if $d->{dbtype} =~ /flat|mongo|redis/o;
 
-            if (! $x->{dbh}->ping) {
+            next if $d->{status} eq 'stalled';
+
+            if (! $d->{dbh}->ping) {
                 ## Database is not reachable, so we'll try and reconnect
                 $self->glog("Ping failed for database $dbname, trying to reconnect", LOG_NORMAL);
 
                 ## Sleep a hair so we don't reloop constantly
                 sleep 0.5;
-                ($x->{backend}, $x->{dbh}) = $self->connect_database($dbname);
-                if (defined $x->{backend}) {
-                    $self->glog(qq{Database "$dbname" backend PID: $x->{backend}}, LOG_VERBOSE);
-                    $self->show_db_version_and_time($x->{dbh}, qq{Database "$dbname" });
+                ($d->{backend}, $d->{dbh}) = $self->connect_database($dbname);
+                if (defined $d->{backend}) {
+                    $self->glog(qq{Database "$dbname" backend PID: $d->{backend}}, LOG_VERBOSE);
+                    $self->show_db_version_and_time($d->{dbh}, qq{Database "$dbname" });
                 }
                 else {
                     $self->glog("Unable to reconnect to database $dbname!", LOG_WARN);
@@ -957,14 +958,15 @@ sub mcp_main {
 
         ## Add in any messages from each remote database
         for my $dbname (keys %{ $self->{sdb} }) {
 
-            $x = $self->{sdb}{$dbname};
-            next if $x->{dbtype} ne 'postgres';
+            my $d = $self->{sdb}{$dbname};
+
+            next if $d->{dbtype} ne 'postgres';
 
-            next if $x->{status} eq 'stalled';
+            next if $d->{status} eq 'stalled';
 
-            my $nlist = $self->db_get_notices($x->{dbh});
-            $x->{dbh}->rollback();
+            my $nlist = $self->db_get_notices($d->{dbh});
+            $d->{dbh}->rollback();
             for my $name (keys %{ $nlist } ) {
                 if (! exists $notice->{$name}) {
                     $notice->{$name} = $nlist->{$name};
@@ -1868,11 +1870,13 @@ sub start_controller {
     ## Count the number of gangs in use by this sync
     my %gang;
     for my $dbname (sort keys %{ $sync->{db} }) {
-        $x = $sync->{db}{$dbname};
+
+        my $d = $sync->{db}{$dbname};
+
         ## Makes no sense to specify gangs for source databases!
-        next if $x->{role} eq 'source';
+        next if $d->{role} eq 'source';
 
-        $gang{$x->{gang}}++;
+        $gang{$d->{gang}}++;
     }
 
     $sync->{numgangs} = keys %gang;
@@ -1916,20 +1920,21 @@ sub start_controller {
 
     ## Reconnect to all databases we care about: overwrites existing dbhs
    for my $dbname (sort keys %{ $sync->{db} }) {
-        $x = $sync->{db}{$dbname};
-        if ($x->{dbtype} =~ /flat/o) {
+
+        my $d = $sync->{db}{$dbname};
+
+        if ($d->{dbtype} =~ /flat/o) {
             $self->glog(qq{Not connecting to flatfile database "$dbname"}, LOG_NORMAL);
             next;
         }
 
         ## Do not need non-Postgres handles for the controller
-        next if $x->{dbtype} ne 'postgres';
+        next if $d->{dbtype} ne 'postgres';
 
         ## Establish a new database handle
-        ($x->{backend}, $x->{dbh}) = $self->connect_database($dbname);
-        $self->glog(qq{Database "$dbname" backend PID: $x->{backend}}, LOG_NORMAL);
-        $self->{pidmap}{$x->{backend}} = "DB $dbname";
+        ($d->{backend}, $d->{dbh}) = $self->connect_database($dbname);
+        $self->glog(qq{Database "$dbname" backend PID: $d->{backend}}, LOG_NORMAL);
+        $self->{pidmap}{$d->{backend}} = "DB $dbname";
     }
 
     ## Adjust the target table names as needed and store in the goat hash
@@ -2012,14 +2017,16 @@ sub start_controller {
         $g->{newname}{$syncname} = {};
         $g->{newcols}{$syncname} = {};
         for my $dbname (sort keys %{ $sync->{db} }) {
-            $x = $sync->{db}{$dbname};
-            my $type= $x->{dbtype};
+
+            my $d = $sync->{db}{$dbname};
+
+            my $type= $d->{dbtype};
             my $cname;
             my $ccols = '';
 
             ## We only ever change table names (or cols) for true targets
-            if ($x->{role} ne 'source') {
+            if ($d->{role} ne 'source') {
 
                 ## Save local copies for this database only
                 $cname = $customname;
@@ -2063,12 +2070,12 @@ sub start_controller {
                 $g->{newname}{$syncname}{$dbname} = $cname;
             }
             ## Only a few use schemas:
-            elsif ($x->{dbtype} eq 'postgres'
-                   or $x->{dbtype} eq 'flatpg') {
+            elsif ($d->{dbtype} eq 'postgres'
+                   or $d->{dbtype} eq 'flatpg') {
                 $g->{newname}{$syncname}{$dbname} = "$S.$T";
             }
             ## Some always get the raw table name
-            elsif ($x->{dbtype} eq 'redis') {
+            elsif ($d->{dbtype} eq 'redis') {
                 $g->{newname}{$syncname}{$dbname} = $g->{tablename};
             }
             else {
@@ -2421,22 +2428,22 @@ sub start_kid {
 
     for my $dbname (sort keys %{ $sync->{db} }) {
 
-        $x = $sync->{db}{$dbname};
+        my $d = $sync->{db}{$dbname};
 
         ## First, do some exclusions
 
         ## If this is a onetimecopy sync, the fullcopy targets are dead to us
-        next if $sync->{onetimecopy} and $x->{role} eq 'fullcopy';
+        next if $sync->{onetimecopy} and $d->{role} eq 'fullcopy';
 
         ## If this is a onetimecopy sync, we only need to connect to a single source
-        if ($sync->{onetimecopy} and $x->{role} eq 'source') {
+        if ($sync->{onetimecopy} and $d->{role} eq 'source') {
             next if $found_first_source;
             $found_first_source = 1;
         }
 
         ## If this is inactive, we've already checked that if it is a source in validate_sync
         ## Thus, if we made it this far, it is a target and should be skipped
-        if ($x->{status} eq 'inactive') {
+        if ($d->{status} eq 'inactive') {
             $self->glog(qq{Skipping inactive database "$dbname" entirely}, LOG_NORMAL);
             ## Don't just skip it: nuke it from orbit! It's the only way to be sure.
             delete $sync->{db}{$dbname};
@@ -2446,99 +2453,99 @@ sub start_kid {
         ## Now set the default attributes
 
         ## Is this a SQL database?
-        $x->{does_sql} = 0;
+        $d->{does_sql} = 0;
 
         ## Can it do truncate?
-        $x->{does_truncate} = 0;
+        $d->{does_truncate} = 0;
 
         ## Does it support asynchronous queries well?
-        $x->{does_async} = 0;
+        $d->{does_async} = 0;
 
         ## Does it have good support for ANY()?
-        $x->{does_ANY_clause} = 0;
+        $d->{does_ANY_clause} = 0;
 
         ## Can it do savepoints (and roll them back)?
-        $x->{does_savepoints} = 0;
+        $d->{does_savepoints} = 0;
 
         ## Does it support truncate cascade?
-        $x->{does_cascade} = 0;
+        $d->{does_cascade} = 0;
 
         ## Does it support a LIMIT clause?
-        $x->{does_limit} = 0;
+        $d->{does_limit} = 0;
 
         ## Can it be queried?
-        $x->{does_append_only} = 0;
+        $d->{does_append_only} = 0;
 
         ## List of tables in this database that need makedelta inserts
-        $x->{does_makedelta} = {};
+        $d->{does_makedelta} = {};
 
         ## Does it have that annoying timestamp +dd bug?
-        $x->{has_mysql_timestamp_issue} = 0;
+        $d->{has_mysql_timestamp_issue} = 0;
 
         ## Start clumping into groups and adjust the attributes
 
         ## Postgres
-        if ('postgres' eq $x->{dbtype}) {
+        if ('postgres' eq $d->{dbtype}) {
             push @dbs_postgres => $dbname;
-            $x->{does_sql} = 1;
-            $x->{does_truncate} = 1;
-            $x->{does_savepoints} = 1;
-            $x->{does_cascade} = 1;
-            $x->{does_limit} = 1;
-            $x->{does_async} = 1;
-            $x->{does_ANY_clause} = 1;
+            $d->{does_sql} = 1;
+            $d->{does_truncate} = 1;
+            $d->{does_savepoints} = 1;
+            $d->{does_cascade} = 1;
+            $d->{does_limit} = 1;
+            $d->{does_async} = 1;
+            $d->{does_ANY_clause} = 1;
         }
 
         ## Drizzle
-        if ('drizzle' eq $x->{dbtype}) {
+        if ('drizzle' eq $d->{dbtype}) {
             push @dbs_drizzle => $dbname;
-            $x->{does_sql} = 1;
-            $x->{does_truncate} = 1;
-            $x->{does_savepoints} = 1;
-            $x->{does_limit} = 1;
-            $x->{has_mysql_timestamp_issue} = 1;
+            $d->{does_sql} = 1;
+            $d->{does_truncate} = 1;
+            $d->{does_savepoints} = 1;
+            $d->{does_limit} = 1;
+            $d->{has_mysql_timestamp_issue} = 1;
         }
 
         ## MongoDB
-        if ('mongo' eq $x->{dbtype}) {
+        if ('mongo' eq $d->{dbtype}) {
             push @dbs_mongo => $dbname;
         }
 
         ## MySQL (and MariaDB)
-        if ('mysql' eq $x->{dbtype} or 'mariadb' eq $x->{dbtype}) {
+        if ('mysql' eq $d->{dbtype} or 'mariadb' eq $d->{dbtype}) {
             push @dbs_mysql => $dbname;
-            $x->{does_sql} = 1;
-            $x->{does_truncate} = 1;
-            $x->{does_savepoints} = 1;
-            $x->{does_limit} = 1;
-            $x->{has_mysql_timestamp_issue} = 1;
+            $d->{does_sql} = 1;
+            $d->{does_truncate} = 1;
+            $d->{does_savepoints} = 1;
+            $d->{does_limit} = 1;
+            $d->{has_mysql_timestamp_issue} = 1;
         }
 
         ## Oracle
-        if ('oracle' eq $x->{dbtype}) {
+        if ('oracle' eq $d->{dbtype}) {
             push @dbs_oracle => $dbname;
-            $x->{does_sql} = 1;
-            $x->{does_truncate} = 1;
-            $x->{does_savepoints} = 1;
+            $d->{does_sql} = 1;
+            $d->{does_truncate} = 1;
+            $d->{does_savepoints} = 1;
         }
 
         ## Redis
-        if ('redis' eq $x->{dbtype}) {
+        if ('redis' eq $d->{dbtype}) {
             push @dbs_redis => $dbname;
         }
 
         ## SQLite
-        if ('sqlite' eq $x->{dbtype}) {
+        if ('sqlite' eq $d->{dbtype}) {
             push @dbs_sqlite => $dbname;
-            $x->{does_sql} = 1;
-            $x->{does_truncate} = 1;
-            $x->{does_savepoints} = 1;
-            $x->{does_limit} = 1;
+            $d->{does_sql} = 1;
+            $d->{does_truncate} = 1;
+            $d->{does_savepoints} = 1;
+            $d->{does_limit} = 1;
         }
 
         ## Flat files
-        if ($x->{dbtype} =~ /flat/) {
-            $x->{does_append_only} = 1;
+        if ($d->{dbtype} =~ /flat/) {
+            $d->{does_append_only} = 1;
         }
 
         ## Everyone goes into this bucket
@@ -2546,24 +2553,24 @@ sub start_kid {
 
         ## Databases we read data from
         push @dbs_source => $dbname
-            if $x->{role} eq 'source';
+            if $d->{role} eq 'source';
 
         ## Target databases
         push @dbs_target => $dbname
-            if $x->{role} ne 'source';
+            if $d->{role} ne 'source';
 
         ## Databases that (potentially) get written to
         ## This is all of them, unless we are a source
         ## and a fullcopy sync or in onetimecopy mode
         push @dbs_write => $dbname
             if (!$sync->{fullcopy} and !$sync->{onetimecopy})
-                or $x->{role} ne 'source';
+                or $d->{role} ne 'source';
         ## Databases that get deltas
         ## If in onetimecopy mode, this is always forced to be empty
         ## Likewise, no point in populating if this is a fullcopy sync
         push @dbs_delta => $dbname
-            if $x->{role} eq 'source'
+            if $d->{role} eq 'source'
                 and ! $sync->{onetimecopy}
                 and ! $sync->{fullcopy};
 
@@ -2571,24 +2578,24 @@ sub start_kid {
         ## In normal mode, this means a role of 'fullcopy'
         ## In onetimecopy mode, this means a role of 'target'
         push @dbs_fullcopy => $dbname
-            if ($sync->{onetimecopy} and $x->{role} eq 'target')
-                or ($sync->{fullcopy} and $x->{role} eq 'fullcopy');
+            if ($sync->{onetimecopy} and $d->{role} eq 'target')
+                or ($sync->{fullcopy} and $d->{role} eq 'fullcopy');
 
         ## Non-fullcopy databases. Basically dbs_source + dbs_target
         push @dbs_non_fullcopy => $dbname
-            if $x->{role} ne 'fullcopy';
+            if $d->{role} ne 'fullcopy';
 
         ## Databases with Perl DBI support
         push @dbs_dbi => $dbname
-            if $x->{dbtype} eq 'postgres'
-                or $x->{dbtype} eq 'drizzle'
-                or $x->{dbtype} eq 'mariadb'
-                or $x->{dbtype} eq 'mysql'
-                or $x->{dbtype} eq 'oracle'
-                or $x->{dbtype} eq 'sqlite';
+            if $d->{dbtype} eq 'postgres'
+                or $d->{dbtype} eq 'drizzle'
+                or $d->{dbtype} eq 'mariadb'
+                or $d->{dbtype} eq 'mysql'
+                or $d->{dbtype} eq 'oracle'
+                or $d->{dbtype} eq 'sqlite';
 
         push @dbs_connectable => $dbname
-            if $x->{dbtype} !~ /flat/;
+            if $d->{dbtype} !~ /flat/;
     }
 
     ## Connect to the main database
@@ -2641,14 +2648,15 @@ sub start_kid {
         $msg .= "\nMain DB state: " . ($maindbh->state || '?');
         $msg .= ' Error: ' . ($maindbh->err || 'none');
         for my $dbname (@dbs_dbi) {
-            $x = $sync->{db}{$dbname};
-            my $dbh = $x->{dbh};
+
+            my $d = $sync->{db}{$dbname};
+
+            my $dbh = $d->{dbh};
             my $state = $dbh->state || '?';
-            $msg .= "\nDB $dbname state: $state";
+            $msg .= "\nDB $dbname state: $state";
             $msg .= ' Error: ' . ($dbh->err || 'none');
             ## If this was a deadlock problem, try and gather more information
-            if ($state eq '40P01' and $x->{dbtype} eq 'postgres') {
+            if ($state eq '40P01' and $d->{dbtype} eq 'postgres') {
                 $msg .= $self->get_deadlock_details($dbh, $msg);
                 $moresub = ' (deadlock)';
                 last;
             }
@@ -2674,8 +2682,10 @@ sub start_kid {
 
         ## Drop all open database connections, clear out the dbrun table
         for my $dbname (@dbs_dbi) {
-            $x = $sync->{db}{$dbname};
-            my $dbh = $x->{dbh} or do {
+
+            my $d = $sync->{db}{$dbname};
+
+            my $dbh = $d->{dbh} or do {
                 $self->glog("Missing $dbname database handle", LOG_WARN);
                 next;
             };
@@ -2694,7 +2704,7 @@ sub start_kid {
             }
 
             ## Make sure we don't think we are still in the middle of an async query
-            $x->{async_active} = 0;
+            $d->{async_active} = 0;
 
             ## Make sure we never access this connection again
             undef $dbh;
@@ -2711,9 +2721,11 @@ sub start_kid {
         if ($config{semaphore_table}) {
             my $tname = $config{semaphore_table};
             for my $dbname (@dbs_connectable) {
-                $x = $sync->{db}{$dbname};
-                if ($x->{dbtype} eq 'mongo') {
-                    my $collection = $x->{dbh}->get_collection($tname);
+
+                my $d = $sync->{db}{$dbname};
+
+                if ($d->{dbtype} eq 'mongo') {
+                    my $collection = $d->{dbh}->get_collection($tname);
                     my $object = {
                         sync => $syncname,
                         status => 'failed',
@@ -2850,13 +2862,13 @@ sub start_kid {
     ## This main list has already been pruned by the controller as needed
     for my $dbname (@dbs_connectable) {
 
-        $x = $sync->{db}{$dbname};
+        my $d = $sync->{db}{$dbname};
 
-        ($x->{backend}, $x->{dbh}) = $self->connect_database($dbname);
-        $self->glog(qq{Database "$dbname" backend PID: $x->{backend}}, LOG_VERBOSE);
+        ($d->{backend}, $d->{dbh}) = $self->connect_database($dbname);
+        $self->glog(qq{Database "$dbname" backend PID: $d->{backend}}, LOG_VERBOSE);
 
         ## Register ourself with the MCP (if we are Postgres)
-        if ($x->{dbtype} eq 'postgres') {
+        if ($d->{dbtype} eq 'postgres') {
             $self->db_notify($maindbh, 'kid_pid_start', 1, $dbname);
         }
     }
@@ -2932,10 +2944,10 @@ sub start_kid {
         ## For each source database, prepare the queries above
         for my $dbname (@dbs_source) {
 
-            $x = $sync->{db}{$dbname};
+            my $d = $sync->{db}{$dbname};
 
             ## Set the DBGROUP for each database: the bucardo.track_* target entry
-            $x->{DBGROUPNAME} = "dbgroup $dbs";
+            $d->{DBGROUPNAME} = "dbgroup $dbs";
 
             for my $g (@$goatlist) {
@@ -2944,22 +2956,22 @@ sub start_kid {
                 ($S,$T) = ($g->{safeschema},$g->{safetable});
 
                 ## Replace with the target name for source delta querying
-                ($SQL = $SQL{delta}{$g}) =~ s/DBGROUP/'$x->{DBGROUPNAME}'/o;
+                ($SQL = $SQL{delta}{$g}) =~ s/DBGROUP/'$d->{DBGROUPNAME}'/o;
 
                 ## As these can be expensive, make them asynchronous
-                $sth{getdelta}{$dbname}{$g} = $x->{dbh}->prepare($SQL, {pg_async => PG_ASYNC});
+                $sth{getdelta}{$dbname}{$g} = $d->{dbh}->prepare($SQL, {pg_async => PG_ASYNC});
 
                 ## We need to update either the track table or the stage table
                 ## There is no way to know beforehand which we will need, so we prepare both
 
                 ## Replace with the target name for source track updating
-                ($SQL = $SQL{track}{$g}) =~ s/DBGROUP/'$x->{DBGROUPNAME}'/go;
+                ($SQL = $SQL{track}{$g}) =~ s/DBGROUP/'$d->{DBGROUPNAME}'/go;
 
                 ## Again, async as they may be slow
-                $sth{track}{$dbname}{$g} = $x->{dbh}->prepare($SQL, {pg_async => PG_ASYNC});
+                $sth{track}{$dbname}{$g} = $d->{dbh}->prepare($SQL, {pg_async => PG_ASYNC});
 
                 ## Same thing for stage
-                ($SQL = $SQL{stage}{$g}) =~ s/DBGROUP/'$x->{DBGROUPNAME}'/go;
-                $sth{stage}{$dbname}{$g} = $x->{dbh}->prepare($SQL, {pg_async => PG_ASYNC});
+                ($SQL = $SQL{stage}{$g}) =~ s/DBGROUP/'$d->{DBGROUPNAME}'/go;
+                $sth{stage}{$dbname}{$g} = $d->{dbh}->prepare($SQL, {pg_async => PG_ASYNC});
 
             } ## end each table
@@ -2970,7 +2982,7 @@ sub start_kid {
         ## Set all makedelta tables (can be target databases too, as another sync may have them as a source)
         for my $dbname (@dbs) {
 
-            $x = $sync->{db}{$dbname};
+            my $d = $sync->{db}{$dbname};
 
             for my $g (@$goatlist) {
@@ -2981,7 +2993,7 @@ sub start_kid {
                 ## Set the per database/per table makedelta setting now
                 if (defined $g->{makedelta}) {
                     if ($g->{makedelta} eq 'on' or $g->{makedelta} =~ /\b$dbname\b/) {
-                        $x->{does_makedelta}{$S}{$T} = 1;
+                        $d->{does_makedelta}{$S}{$T} = 1;
                         $self->glog("Set table $dbname.$S.$T to makedelta", LOG_NORMAL);
                     }
                 }
@@ -2990,7 +3002,6 @@ sub start_kid {
 
         } ## end all databases
 
-
     } ## end if delta databases
 
     ## We disable and enable triggers and rules in one of two ways
@@ -3008,21 +3019,21 @@ sub start_kid {
     my $anyone_does_pgclass = 0;
     for my $dbname (@dbs_write) {
 
-        $x = $sync->{db}{$dbname};
+        my $d = $sync->{db}{$dbname};
 
-        next if $x->{dbtype} ne 'postgres';
+        next if $d->{dbtype} ne 'postgres';
 
-        my $ver = $x->{dbh}{pg_server_version};
+        my $ver = $d->{dbh}{pg_server_version};
         if ($ver >= 80300) {
-            $x->{disable_trigrules} = 'replica';
+            $d->{disable_trigrules} = 'replica';
         }
         else {
-            $x->{disable_trigrules} = 'pg_class';
+            $d->{disable_trigrules} = 'pg_class';
             $anyone_does_pgclass = 1;
         }
 
         ## If 8.2 or higher, we can use COPY (SELECT *)
-        $x->{modern_copy} = $ver >= 80200 ? 1 : 0;
+        $d->{modern_copy} = $ver >= 80200 ? 1 : 0;
     }
 
     ## We don't bother building these statements unless we need to
@@ -3084,36 +3095,36 @@ sub start_kid {
     ## First, things that are common to databases, irrespective of read/write:
     for my $dbname (@dbs) {
 
-        $x = $sync->{db}{$dbname};
+        my $d = $sync->{db}{$dbname};
 
-        my $xdbh = $x->{dbh};
+        my $dbh = $d->{dbh};
 
-        if ($x->{dbtype} eq 'postgres') {
+        if ($d->{dbtype} eq 'postgres') {
 
             ## We never want to timeout
-            $xdbh->do('SET statement_timeout = 0');
+            $dbh->do('SET statement_timeout = 0');
 
             ## Using the same time zone everywhere keeps us sane
-            $xdbh->do(q{SET TIME ZONE 'GMT'});
+            $dbh->do(q{SET TIME ZONE 'GMT'});
 
             ## Rare, but allow for tcp fiddling
             if ($config{tcp_keepalives_idle}) { ## e.g. not 0, should always exist
-                $xdbh->do("SET tcp_keepalives_idle = $config{tcp_keepalives_idle}");
-                $xdbh->do("SET tcp_keepalives_interval = $config{tcp_keepalives_interval}");
-                $xdbh->do("SET tcp_keepalives_count = $config{tcp_keepalives_count}");
+                $dbh->do("SET tcp_keepalives_idle = $config{tcp_keepalives_idle}");
+                $dbh->do("SET tcp_keepalives_interval = $config{tcp_keepalives_interval}");
+                $dbh->do("SET tcp_keepalives_count = $config{tcp_keepalives_count}");
             }
 
-            $xdbh->commit();
+            $dbh->commit();
 
         } ## end postgres
 
-        elsif ($x->{dbtype} eq 'mysql' or $x->{dbtype} eq 'mariadb') {
+        elsif ($d->{dbtype} eq 'mysql' or $d->{dbtype} eq 'mariadb') {
 
             ## ANSI mode: mostly because we want ANSI_QUOTES
-            $xdbh->do(q{SET sql_mode = 'ANSI'});
+            $dbh->do(q{SET sql_mode = 'ANSI'});
 
             ## Use the same time zone everywhere
-            $xdbh->do(q{SET time_zone = '+0:00'});
-            $xdbh->commit();
+            $dbh->do(q{SET time_zone = '+0:00'});
+            $dbh->commit();
 
         } ## end mysql/mariadb
@@ -3122,26 +3133,26 @@ sub start_kid {
     ## Now things that apply only to databases we are writing to:
     for my $dbname (@dbs_write) {
 
-        $x = $sync->{db}{$dbname};
+        my $d = $sync->{db}{$dbname};
 
-        my $xdbh = $x->{dbh};
+        my $dbh = $d->{dbh};
 
-        if ($x->{dbtype} eq 'postgres') {
+        if ($d->{dbtype} eq 'postgres') {
 
             ## Note: no need to turn these back to what they were: we always want to stay in replica mode
             ## If doing old school pg_class hackery, we defer until much later
-            if ($x->{disable_trigrules} eq 'replica') {
-                $xdbh->do(q{SET session_replication_role = 'replica'});
-                $xdbh->commit();
+            if ($d->{disable_trigrules} eq 'replica') {
+                $dbh->do(q{SET session_replication_role = 'replica'});
+                $dbh->commit();
             }
 
         } ## end postgres
 
-        elsif ($x->{dbtype} eq 'mysql' or $x->{dbtype} eq 'mariadb') {
+        elsif ($d->{dbtype} eq 'mysql' or $d->{dbtype} eq 'mariadb') {
 
             ## No foreign key checks, please
-            $xdbh->do('SET foreign_key_checks = 0');
-            $xdbh->commit();
+            $dbh->do('SET foreign_key_checks = 0');
+            $dbh->commit();
 
         } ## end mysql/mariadb
@@ -3152,25 +3163,25 @@ sub start_kid {
 
    for my $dbname (@dbs_postgres) {
 
-        $x = $sync->{db}{$dbname};
+        my $d = $sync->{db}{$dbname};
 
        my $darg;
        if ($sync->{need_safe_dbh_strict}) {
-            for my $arg (sort keys %{ $dbix{ $x->{role} }{strict} }) {
-                next if ! length $dbix{ $x->{role} }{strict}{$arg};
-                $darg->{$arg} = $dbix{ $x->{role} }{strict}{$arg};
+            for my $arg (sort keys %{ $dbix{ $d->{role} }{strict} }) {
+                next if ! length $dbix{ $d->{role} }{strict}{$arg};
+                $darg->{$arg} = $dbix{ $d->{role} }{strict}{$arg};
            }
-            $darg->{dbh} = $x->{dbh};
+            $darg->{dbh} = $d->{dbh};
            $self->{safe_dbh_strict}{$dbname} = DBIx::Safe->new($darg);
        }
 
        if ($sync->{need_safe_dbh}) {
            undef $darg;
-            for my $arg (sort keys %{ $dbix{ $x->{role} }{notstrict} }) {
-                next if ! length $dbix{ $x->{role} }{notstrict}{$arg};
-                $darg->{$arg} = $dbix{ $x->{role} }{notstrict}{$arg};
+            for my $arg (sort keys %{ $dbix{ $d->{role} }{notstrict} }) {
+                next if ! length $dbix{ $d->{role} }{notstrict}{$arg};
+                $darg->{$arg} = $dbix{ $d->{role} }{notstrict}{$arg};
            }
-            $darg->{dbh} = $x->{dbh};
+            $darg->{dbh} = $d->{dbh};
            $self->{safe_dbh}{$dbname} = DBIx::Safe->new($darg);
        }
    }
@@ -3255,10 +3266,10 @@ sub start_kid {
                 $maindbh->ping or die qq{Ping failed for main database\n};
                 for my $dbname (@dbs_dbi) {
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
-                    $x->{dbh}->ping or die qq{Ping failed for database "$dbname"\n};
-                    $x->{dbh}->rollback();
+                    $d->{dbh}->ping or die qq{Ping failed for database "$dbname"\n};
+                    $d->{dbh}->rollback();
                 }
                 $lastpingcheck = time();
             }
@@ -3304,13 +3315,14 @@ sub start_kid {
             ## Reset some things at the per-database level
             for my $dbname (keys %{ $sync->{db} }) {
 
+                my $d = $sync->{db}{$dbname};
+
                 ## This must be set, as it is used by the conflict_strategy below
                 $deltacount{$dbname} = 0;
                 $dmlcount{allinserts}{$dbname} = 0;
                 $dmlcount{alldeletes}{$dbname} = 0;
 
-                $x = $sync->{db}{$dbname};
-                delete $x->{truncatewinner};
+                delete $d->{truncatewinner};
 
             }
@@ -3333,8 +3345,10 @@ sub start_kid {
             ## Populate the dbrun table so others know we are using these databases
             $self->glog('Populating the dbrun table', LOG_DEBUG);
             for my $dbname (@dbs_connectable) {
-                $x = $sync->{db}{$dbname};
-                $sth{dbrun_insert}->execute($syncname, $dbname, $x->{backend});
+
+                my $d = $sync->{db}{$dbname};
+
+                $sth{dbrun_insert}->execute($syncname, $dbname, $d->{backend});
             }
 
             ## Add a note to the syncrun table
@@ -3357,38 +3371,38 @@ sub start_kid {
             ## (last action was commit or rollback)
             for my $dbname (@dbs_dbi) {
 
-                $x = $sync->{db}{$dbname};
+                my $d = $sync->{db}{$dbname};
 
                 ## Just in case:
-                $x->{dbh}->rollback();
+                $d->{dbh}->rollback();
 
-                if ($x->{dbtype} eq 'postgres') {
-                    $x->{dbh}->do(qq{SET TRANSACTION ISOLATION LEVEL $isolation_level READ WRITE});
+                if ($d->{dbtype} eq 'postgres') {
+                    $d->{dbh}->do(qq{SET TRANSACTION ISOLATION LEVEL $isolation_level READ WRITE});
                     $self->glog(qq{Set database "$dbname" to $isolation_level read write}, LOG_DEBUG);
                 }
 
-                if ($x->{dbtype} eq 'mysql' or $x->{dbtype} eq 'mariadb') {
-                    $x->{dbh}->do('SET TRANSACTION READ WRITE ISOLATION LEVEL SERIALIZABLE');
+                if ($d->{dbtype} eq 'mysql' or $d->{dbtype} eq 'mariadb') {
+                    $d->{dbh}->do('SET TRANSACTION READ WRITE ISOLATION LEVEL SERIALIZABLE');
                     $self->glog(qq{Set database "$dbname" to serializable}, LOG_DEBUG);
                 }
 
-                if ($x->{dbtype} eq 'drizzle') {
+                if ($d->{dbtype} eq 'drizzle') {
                     ## Drizzle does not appear to have anything to control this yet
                 }
 
-                if ($x->{dbtype} eq 'oracle') {
-                    $x->{dbh}->do('SET TRANSACTION READ WRITE');
-                    $x->{dbh}->do(q{SET TRANSACTION ISOLATION LEVEL SERIALIZABLE NAME 'bucardo'});
+                if ($d->{dbtype} eq 'oracle') {
+                    $d->{dbh}->do('SET TRANSACTION READ WRITE');
+                    $d->{dbh}->do(q{SET TRANSACTION ISOLATION LEVEL SERIALIZABLE NAME 'bucardo'});
                     $self->glog(qq{Set database "$dbname" to serializable and read write}, LOG_DEBUG);
                 }
 
-                if ($x->{dbtype} eq 'sqlite') {
+                if ($d->{dbtype} eq 'sqlite') {
                     ## Nothing needed here, the default seems okay
                 }
 
-                if ($x->{dbtype} eq 'redis') {
+                if ($d->{dbtype} eq 'redis') {
                     ## Implement MULTI, when the driver supports it
-                    ##$x->{dbh}->multi();
+                    ##$d->{dbh}->multi();
                 }
             }
@@ -3419,28 +3433,28 @@ sub start_kid {
 
                 for my $dbname (@dbs_write) {
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
                     ## Figure out which table name to use
                     my $tname = $g->{newname}{$syncname}{$dbname};
 
-                    if ('postgres' eq $x->{dbtype}) {
+                    if ('postgres' eq $d->{dbtype}) {
                         my $com = "$tname IN $lock_table_mode MODE";
                         $self->glog("Database $dbname: Locking table $com", LOG_TERSE);
-                        $x->{dbh}->do("LOCK TABLE $com");
+                        $d->{dbh}->do("LOCK TABLE $com");
                     }
-                    elsif ('mysql' eq $x->{dbtype } or 'drizzle' eq $x->{dbtype} or 'mariadb' eq $x->{dbtype}) {
+                    elsif ('mysql' eq $d->{dbtype } or 'drizzle' eq $d->{dbtype} or 'mariadb' eq $d->{dbtype}) {
                         my $com = "$tname WRITE";
                         $self->glog("Database $dbname: Locking table $com", LOG_TERSE);
-                        $x->{dbh}->do("LOCK TABLE $com");
+                        $d->{dbh}->do("LOCK TABLE $com");
                     }
-                    elsif ('oracle' eq $x->{dbtype}) {
+                    elsif ('oracle' eq $d->{dbtype}) {
                         my $com = "$tname IN EXCLUSIVE MODE";
                         $self->glog("Database $dbname: Locking table $com", LOG_TERSE);
-                        $x->{dbh}->do("LOCK TABLE $com");
+                        $d->{dbh}->do("LOCK TABLE $com");
                     }
-                    elsif ('sqlite' eq $x->{dbtype}) {
-                        $x->{dbh}->do('BEGIN EXCLUSIVE TRANSACTION');
+                    elsif ('sqlite' eq $d->{dbtype}) {
+                        $d->{dbh}->do('BEGIN EXCLUSIVE TRANSACTION');
                     }
                 }
             }
@@ -3472,11 +3486,11 @@ sub start_kid {
 
                 for my $dbname (@dbs_source) {
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
                     ## Grab the latest truncation time for each table, for this source database
                     $self->glog(qq{Checking truncate_trigger table on database "$dbname"}, LOG_VERBOSE);
-                    $sth = $x->{dbh}->prepare($SQL);
+                    $sth = $d->{dbh}->prepare($SQL);
                     $self->{has_truncation} += $sth->execute($syncname);
                     for my $row (@{ $sth->fetchall_arrayref() }) {
                         my ($s,$t,$time) = @{ $row };
@@ -3495,8 +3509,8 @@ sub start_kid {
                 for my $s (keys %{ $self->{truncateinfo} }) {
                     for my $t (keys %{ $self->{truncateinfo}{$s} }) {
                         my $dbname = $self->{truncateinfo}{$s}{$t}{maxdb};
-                        $x = $sync->{db}{$dbname};
-                        $x->{truncatewinner}{$s}{$t} = 1;
+                        my $d = $sync->{db}{$dbname};
+                        $d->{truncatewinner}{$s}{$t} = 1;
                         $self->glog("Truncate winner for $s.$t is database $dbname", LOG_DEBUG);
                     }
                 }
@@ -3530,17 +3544,17 @@ sub start_kid {
                     my $maxvalue = -1;
                     for my $dbname (@dbs_non_fullcopy) {
 
-                        $x = $sync->{db}{$dbname};
+                        my $d = $sync->{db}{$dbname};
 
-                        next if $x->{dbtype} ne 'postgres';
+                        next if $d->{dbtype} ne 'postgres';
 
-                        $sth = $x->{dbh}->prepare($SQL);
+                        $sth = $d->{dbh}->prepare($SQL);
                         $sth->execute();
                         my $info = $sth->fetchall_arrayref({})->[0];
                         $g->{sequenceinfo}{$dbname} = $info;
 
                         ## Only the source databases matter for the max value comparison
-                        next if $x->{role} ne 'source';
+                        next if $d->{role} ne 'source';
 
                         if ($info->{last_value} > $maxvalue) {
                             $maxvalue = $info->{last_value};
@@ -3553,11 +3567,11 @@ sub start_kid {
                     ## Now that we have a winner, apply the changes to every other (non-fullcopy) PG database
                     for my $dbname (@dbs_non_fullcopy) {
 
-                        $x = $sync->{db}{$dbname};
+                        my $d = $sync->{db}{$dbname};
 
-                        next if $x->{dbtype} ne 'postgres';
+                        next if $d->{dbtype} ne 'postgres';
 
-                        $x->{adjustsequence} = 1;
+                        $d->{adjustsequence} = 1;
                     }
 
                     $deltacount{sequences} += $self->adjust_sequence($g, $sync, $S, $T, $syncname);
@@ -3574,26 +3588,27 @@ sub start_kid {
 
            if ($config{quick_delta_check}) {
                for my $dbname (@dbs_source) {
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
                    $SQL = 'SELECT * FROM bucardo.bucardo_delta_check(?,?)';
-                    $sth = $x->{dbh}->prepare($SQL);
-                    $sth->execute($syncname, $x->{DBGROUPNAME});
-                    $x->{deltazero} = $x->{deltatotal} = 0;
+                    $sth = $d->{dbh}->prepare($SQL);
+                    $sth->execute($syncname, $d->{DBGROUPNAME});
+                    $d->{deltazero} = $d->{deltatotal} = 0;
                    for my $row (@{$sth->fetchall_arrayref()}) {
                        my ($number,$tablename) = split /,/ => $row->[0], 2;
-                        $x->{deltaquick}{$tablename} = $number;
+                        $d->{deltaquick}{$tablename} = $number;
                        if ($number) {
-                            $x->{deltatotal}++;
+                            $d->{deltatotal}++;
                            $deltacount{table}{$tablename}++;
                        }
                        else {
-                            $x->{deltazero}++;
+                            $d->{deltazero}++;
                        }
                    }
-                    $self->glog("Tables with deltas on $dbname: $x->{deltatotal} Without: $x->{deltazero}", LOG_VERBOSE);
+                    $self->glog("Tables with deltas on $dbname: $d->{deltatotal} Without: $d->{deltazero}", LOG_VERBOSE);
 
                } ## end quick delta check for each database
+
            } ## end quick delta check
 
            ## Grab the delta information for each table from each source database
@@ -3616,15 +3631,15 @@ sub start_kid {
                    ## We still need these, as we want to respect changes made after the truncation!
                    next if exists $g->{truncatewinner} and $g->{truncatewinner} ne $dbname;
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
                    ## No need to grab information if we know there are no deltas for this table
                    if ($config{quick_delta_check}) {
-                        next if ! $x->{deltaquick}{"$S.$T"};
+                        next if ! $d->{deltaquick}{"$S.$T"};
                    }
 
                    ## Gets all relevant rows from bucardo_deltas: runs asynchronously
-                    $x->{async_active} = time;
+                    $d->{async_active} = time;
                    $sth{getdelta}{$dbname}{$g}->execute();
                }
@@ -3635,19 +3650,19 @@ sub start_kid {
                    ## Skip if truncating and this one is not the winner
                    next if exists $g->{truncatewinner} and $g->{truncatewinner} ne $dbname;
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
                    ## If we skipped this, set the deltacount to zero and move on
                    if ($config{quick_delta_check}) {
-                        if (! $x->{deltaquick}{"$S.$T"}) {
+                        if (! $d->{deltaquick}{"$S.$T"}) {
                            $deltacount{dbtable}{$dbname}{$S}{$T} = 0;
                            next;
                        }
                    }
 
                    ## pg_result tells us to wait for the query to finish
-                    $count = $x->{dbh}->pg_result();
-                    $x->{async_active} = 0;
+                    $count = $d->{dbh}->pg_result();
+                    $d->{async_active} = 0;
 
                    ## Call finish() and change the ugly 0E0 to a true zero
                    $sth{getdelta}{$dbname}{$g}->finish() if $count =~ s/0E0/0/o;
@@ -3684,8 +3699,6 @@ sub start_kid {
                    ## Skip if truncating and this one is not the winner
                    next if exists $g->{truncatewinner} and $g->{truncatewinner} ne $dbname;
 
-                    $x = $sync->{db}{$dbname};
-
                    $self->glog((sprintf q{Delta count for %-*s : %*d},
                        $self->{maxdbstname},
                        "$dbname.$S.$T",
@@ -3796,18 +3809,18 @@ sub start_kid {
 
                ## Just to be safe, rollback everything
                for my $dbname (@dbs_dbi) {
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
-                    $x->{dbh}->rollback();
+                    $d->{dbh}->rollback();
                }
 
                ## Clear out the entries from the dbrun table
                for my $dbname (@dbs_connectable) {
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
                    ## We never do native fullcopy targets here
-                    next if $x->{role} eq 'fullcopy';
+                    next if $d->{role} eq 'fullcopy';
 
                    $sth = $sth{dbrun_delete};
                    $sth->execute($syncname, $dbname);
@@ -3873,10 +3886,10 @@ sub start_kid {
                ## The list of primary key columns
                if (! $g->{pkeycols}) { ## only do this once
                    $g->{pkeycols} = '';
-                    $x=0;
+                    my $i=0;
                    for my $qpk (@{$g->{qpkey}}) {
-                        $g->{pkeycols} .= sprintf '%s,', $g->{binarypkey}{$x} ? qq{ENCODE($qpk,'base64')} : $qpk;
-                        $x++;
+                        $g->{pkeycols} .= sprintf '%s,', $g->{binarypkey}{$i} ? qq{ENCODE($qpk,'base64')} : $qpk;
+                        $i++;
                    }
                    chop $g->{pkeycols};
                    $g->{numpkcols} > 1 and $g->{pkeycols} = "($g->{pkeycols})";
@@ -3899,8 +3912,6 @@ sub start_kid {
 
                for my $dbname (@dbs_source) {
 
-                    $x = $sync->{db}{$dbname};
-
                    ## Skip if we are truncating and this is not the winner
                    next if exists $g->{truncatewinner} and $g->{truncatewinner} ne $dbname;
@@ -4012,14 +4023,16 @@ sub start_kid {
                    ## Provide detailed information on all databases, but elide the dbh
                    for my $dbname (@dbs_connectable) {
-                        $x = $sync->{db}{$dbname};
+
+                        my $d = $sync->{db}{$dbname};
+
                        ## Make a shallow copy, excluding the actual dbh handle
-                        for my $name (keys %$x) {
+                        for my $name (keys %$d) {
 
                            ## We provide DBIx::Safe versions elsewhere
                            next if $name eq 'dbh';
 
-                            $code->{info}{dbinfo}{$dbname}{$name} = $x->{$name};
+                            $code->{info}{dbinfo}{$dbname}{$name} = $d->{$name};
                        }
                    }
@@ -4122,16 +4135,20 @@ sub start_kid {
            ## Check every database that generates deltas
            for my $dbname (@dbs_delta) {
-                $x = $sync->{db}{$dbname};
-                $x->{sth} = $x->{dbh}->prepare($SQL, {pg_async => PG_ASYNC});
-                $x->{async_active} = time;
-                $x->{sth}->execute();
+
+                my $d = $sync->{db}{$dbname};
+
+                $d->{sth} = $d->{dbh}->prepare($SQL, {pg_async => PG_ASYNC});
+                $d->{async_active} = time;
+                $d->{sth}->execute();
            }
            for my $dbname (@dbs_delta) {
-                $x = $sync->{db}{$dbname};
-                $x->{dbh}->pg_result();
-                $x->{async_active} = 0;
-                $x->{lastmod} = $x->{sth}->fetchall_arrayref()->[0][0] || 0;
+
+                my $d = $sync->{db}{$dbname};
+
+                $d->{dbh}->pg_result();
+                $d->{async_active} = 0;
+                $d->{lastmod} = $d->{sth}->fetchall_arrayref()->[0][0] || 0;
            }
 
            ## Now we can put them in rank order
@@ -4247,20 +4264,20 @@ sub start_kid {
            ## If there is more than one source, then everyone gets written to!
            for my $dbname (keys %{ $sync->{db} }) {
 
-                $x = $sync->{db}{$dbname};
+                my $d = $sync->{db}{$dbname};
 
                ## Again: everyone is written to unless there is a single source
                ## A truncation source may have an empty deltabin, but it will exist
-                $x->{writtento} = (1==$numsources and exists $deltabin{$dbname}) ? 0 : 1;
-                next if ! $x->{writtento};
+                $d->{writtento} = (1==$numsources and exists $deltabin{$dbname}) ? 0 : 1;
+                next if ! $d->{writtento};
 
-                next if $x->{dbtype} ne 'postgres';
+                next if $d->{dbtype} ne 'postgres';
 
                ## Should we use the stage table for this database?
-                $x->{trackstage} = ($numsources > 1 and exists $deltabin{$dbname}) ? 1 : 0;
+                $d->{trackstage} = ($numsources > 1 and exists $deltabin{$dbname}) ? 1 : 0;
 
                ## Disable triggers and rules the 'old way'
-                if ($x->{disable_trigrules} eq 'pg_class' and ! $disabled_via_pg_class) {
+                if ($d->{disable_trigrules} eq 'pg_class' and ! $disabled_via_pg_class) {
 
                    ## Run all 'before_trigger_disable' code
                    if (exists $sync->{code_before_trigger_disable}) {
@@ -4272,7 +4289,7 @@ sub start_kid {
                    }
 
                    $self->glog(qq{Disabling triggers and rules on db "$dbname" via pg_class}, LOG_VERBOSE);
-                    $x->{dbh}->do($SQL{disable_trigrules});
+                    $d->{dbh}->do($SQL{disable_trigrules});
 
                    ## Run all 'after_trigger_disable' code
                    if (exists $sync->{code_after_trigger_disable}) {
@@ -4297,11 +4314,11 @@ sub start_kid {
                    ## No index means no manipulation
                    ## We don't cache the value, but simply set index_disabled below
                    $SQL = "SELECT relhasindex FROM pg_class WHERE oid = '$S.$T'::regclass";
-                    if ($x->{dbh}->selectall_arrayref($SQL)->[0][0]) {
+                    if ($d->{dbh}->selectall_arrayref($SQL)->[0][0]) {
                        $self->glog("Turning off indexes for $dbname.$S.$T", LOG_NORMAL);
                        $SQL = "UPDATE pg_class SET relhasindex = 'f' WHERE oid = '$S.$T'::regclass";
-                        $x->{dbh}->do($SQL);
-                        $x->{index_disabled} = 1;
+                        $d->{dbh}->do($SQL);
+                        $d->{index_disabled} = 1;
                    }
                }
@@ -4310,31 +4327,31 @@ sub start_kid {
 
    ## Create filehandles for any flatfile databases
    for my $dbname (keys %{ $sync->{db} }) {
 
-        $x = $sync->{db}{$dbname};
+        my $d = $sync->{db}{$dbname};
 
-        next if $x->{dbtype} !~ /flat/o;
+        next if $d->{dbtype} !~ /flat/o;
 
        ## Figure out and set the filename
        my $date = strftime('%Y%m%d_%H%M%S', localtime());
-        $x->{filename} = "$config{flatfile_dir}/bucardo.flatfile.$self->{syncname}.$date.sql";
+        $d->{filename} = "$config{flatfile_dir}/bucardo.flatfile.$self->{syncname}.$date.sql";
 
        ## Does this already exist? It's possible we got so quick the old one exists
        ## Since we want the names to be unique, come up with a new name
-        if (-e $x->{filename}) {
+        if (-e $d->{filename}) {
            my $tmpfile;
            my $extension = 1;
            {
-                $tmpfile = "$x->{filename}.$extension";
+                $tmpfile = "$d->{filename}.$extension";
                last if -e $tmpfile;
                $extension++;
                redo;
            }
-            $x->{filename} = $tmpfile;
+            $d->{filename} = $tmpfile;
        }
-        $x->{filename} .= '.tmp';
+        $d->{filename} .= '.tmp';
 
-        open $x->{filehandle}, '>>', $x->{filename}
-            or die qq{Could not open flatfile "$x->{filename}": $!\n};
+        open $d->{filehandle}, '>>', $d->{filename}
+            or die qq{Could not open flatfile "$d->{filename}": $!\n};
    }
 
    ## Populate the semaphore table if the setting is non-empty
@@ -4342,10 +4359,10 @@ sub start_kid {
        my $tname = $config{semaphore_table};
        for my $dbname (@dbs_connectable) {
 
-            $x = $sync->{db}{$dbname};
+            my $d = $sync->{db}{$dbname};
 
-            if ($x->{dbtype} eq 'mongo') {
-                my $collection = $x->{dbh}->get_collection($tname);
+            if ($d->{dbtype} eq 'mongo') {
+                my $collection = $d->{dbh}->get_collection($tname);
                my $object = {
                    sync => $syncname,
                    status => 'started',
@@ -4370,13 +4387,14 @@ sub start_kid {
                ## Thus, if we have exception handling code, we create savepoints to rollback to
                if ($g->{has_exception_code}) {
                    for my $dbname (keys %{ $sync->{db} }) {
-                        $x = $sync->{db}{$dbname};
+
+                        my $d = $sync->{db}{$dbname};
 
                        ## No need to rollback if we didn't make any changes
-                        next if ! $x->{writtento};
+                        next if ! $d->{writtento};
 
                        $self->glog(qq{Creating savepoint on database "$dbname" for exception handler(s)}, LOG_DEBUG);
-                        $x->{dbh}->do("SAVEPOINT bucardo_$$")
+                        $d->{dbh}->do("SAVEPOINT bucardo_$$")
                            or die qq{Savepoint creation failed for bucardo_$$};
                    }
                }
@@ -4411,10 +4429,10 @@ sub start_kid {
                            ## Grab the real target name
                            my $tname = $g->{newname}{$syncname}{$dbnamet};
 
-                            $x = $sync->{db}{$dbnamet};
+                            my $d = $sync->{db}{$dbnamet};
 
                            my $do_cascade = 0;
-                            $self->truncate_table($x, $tname, $do_cascade);
+                            $self->truncate_table($d, $tname, $do_cascade);
                        }
                        ## We keep going, in case the source has post-truncation items
                    }
@@ -4456,31 +4474,33 @@ sub start_kid {
                    ## - run a REINDEX
                    ## - release the savepoint
                    for my $dbname (sort keys %{ $sync->{db} }) {
-                        $x = $sync->{db}{$dbname};
-                        next if ! $x->{writtento};
+                        my $d = $sync->{db}{$dbname};
 
-                        if ($x->{index_disabled}) {
+                        next if ! $d->{writtento};
+
+                        if ($d->{index_disabled}) {
                            $self->glog("Re-enabling indexes for $dbname.$S.$T", LOG_NORMAL);
                            $SQL = "UPDATE pg_class SET relhasindex = 't' WHERE oid = '$S.$T'::regclass";
-                            $x->{dbh}->do($SQL);
+                            $d->{dbh}->do($SQL);
                            $self->glog("Reindexing table $dbname.$S.$T", LOG_NORMAL);
                            ## We do this asynchronously so we don't wait on each db
-                            $x->{async_active} = time;
-                            $x->{dbh}->do( "REINDEX TABLE $S.$T", {pg_async => PG_ASYNC} );
+                            $d->{async_active} = time;
+                            $d->{dbh}->do( "REINDEX TABLE $S.$T", {pg_async => PG_ASYNC} );
                        }
                    }
 
                    ## Wait for all REINDEXes to finish
                    for my $dbname (sort keys %{ $sync->{db} }) {
-                        $x = $sync->{db}{$dbname};
-                        next if ! $x->{writtento};
+                        my $d = $sync->{db}{$dbname};
+
+                        next if ! $d->{writtento};
 
-                        if ($x->{index_disabled}) {
-                            $x->{dbh}->pg_result();
-                            $x->{async_active} = 0;
-                            $x->{index_disabled} = 0;
+                        if ($d->{index_disabled}) {
+                            $d->{dbh}->pg_result();
+                            $d->{async_active} = 0;
+                            $d->{index_disabled} = 0;
                        }
                    }
@@ -4535,18 +4555,19 @@ sub start_kid {
                ## Time to let the exception handling custom code do its work
                ## First, we rollback to our savepoint on all databases that are using them
                for my $dbname (keys %{ $sync->{db} }) {
-                    $x = $sync->{db}{$dbname};
-                    next if ! $x->{writtento};
+                    my $d = $sync->{db}{$dbname};
+
+                    next if ! $d->{writtento};
 
                    ## Just in case, clear out any existing async queries
-                    if ($x->{async_active}) {
-                        $x->{dbh}->pg_cancel();
-                        $x->{async_active} = 0;
+                    if ($d->{async_active}) {
+                        $d->{dbh}->pg_cancel();
+                        $d->{async_active} = 0;
                    }
 
                    $self->glog("Rolling back to savepoint on database $dbname", LOG_DEBUG);
-                    $x->{dbh}->do("ROLLBACK TO SAVEPOINT bucardo_$$");
+                    $d->{dbh}->do("ROLLBACK TO SAVEPOINT bucardo_$$");
                }
 
                ## Prepare information to pass to the handler about this run
@@ -4596,7 +4617,6 @@ sub start_kid {
            ## Make sure the Postgres database connections are still clean
            for my $dbname (@dbs_postgres) {
 
-                $x = $sync->{db}{$dbname};
                my $ping = $sync->{db}{$dbname}{dbh}->ping();
                if ($ping !~ /^[123]$/o) {
@@ -4654,36 +4674,36 @@ sub start_kid {
                ## For each database that had delta changes, insert rows to bucardo_track
                for my $dbname (@dbs_source) {
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
-                    $x->{needs_track} = 0;
+                    $d->{needs_track} = 0;
 
                    if ($deltacount{dbtable}{$dbname}{$S}{$T}) {
-                        $x->{needs_track} = 1;
+                        $d->{needs_track} = 1;
                        ## Kick off the track or stage update asynchronously
-                        if ($x->{trackstage}) {
+                        if ($d->{trackstage}) {
                            ## The stage table can only have rows if a previous version failed
                            ## This can happen if this kid committed, but another failed
                            ## Thus, we always want to make sure the stage table is empty:
                            $SQL = "TRUNCATE TABLE bucardo.$g->{stagetable}";
-                            $x->{dbh}->do($SQL);
+                            $d->{dbh}->do($SQL);
                            $sth{stage}{$dbname}{$g}->execute();
                        }
                        else {
                            $sth{track}{$dbname}{$g}->execute();
                        }
-                        $x->{async_active} = time;
+                        $d->{async_active} = time;
                    }
                }
 
                ## Loop through again and let everyone finish
                for my $dbname (@dbs_source) {
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
-                    if ($x->{needs_track}) {
-                        ($count = $x->{dbh}->pg_result()) =~ s/0E0/0/o;
-                        $x->{async_active} = 0;
+                    if ($d->{needs_track}) {
+                        ($count = $d->{dbh}->pg_result()) =~ s/0E0/0/o;
+                        $d->{async_active} = 0;
                        $self->{insertcount}{dbname}{$S}{$T} = $count;
                        $maxcount = $count if $count > $maxcount;
                    }
                }
@@ -4699,7 +4719,6 @@ sub start_kid {
                next if $g->{reltype} ne 'table';
                ($S,$T) = ($g->{safeschema},$g->{safetable});
                for my $dbname (keys %{ $sync->{db} }) {
-                    $x = $sync->{db}{$dbname};
                    next if ! $deltacount{dbtable}{$dbname}{$S}{$T};
                    $maxsize = length " $dbname.$S.$T" if length " $dbname.$S.$T" > $maxsize;
                    $maxcount2 = length $count if length $count > $maxcount2;
@@ -4712,11 +4731,13 @@ sub start_kid {
                ($S,$T) = ($g->{safeschema},$g->{safetable});
                for my $dbname (keys %{ $sync->{db} }) {
-                    $x = $sync->{db}{$dbname};
+
+                    my $d = $sync->{db}{$dbname};
+
                    if ($deltacount{dbtable}{$dbname}{$S}{$T}) {
                        $count = $self->{insertcount}{dbname}{$S}{$T};
                        $self->glog((sprintf 'Rows inserted to bucardo_%s for %-*s: %*d',
-                            $x->{trackstage} ? 'stage' : 'track',
+                            $d->{trackstage} ? 'stage' : 'track',
                            $maxsize,
                            "$dbname.$S.$T",
                            length $maxcount2,
@@ -4736,11 +4757,11 @@ sub start_kid {
            my ($sourcename, $sourcedbh, $sourcex);
            for my $dbname (@dbs_source) {
 
-                $x = $sync->{db}{$dbname};
+                my $d = $sync->{db}{$dbname};
 
                $sourcename = $dbname;
-                $sourcedbh = $x->{dbh};
-                $sourcex = $x;
+                $sourcedbh = $d->{dbh};
+                $sourcex = $d;
                $self->glog(qq{For fullcopy, we are using source database "$sourcename"}, LOG_VERBOSE);
                last;
@@ -4794,12 +4815,12 @@ sub start_kid {
                my $have_targets = 0;
                for my $dbname (@dbs_fullcopy) {
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
                    my $tname = $g->{newname}{$syncname}{$dbname};
 
                    ## If this target table has rows, skip it
-                    if ($self->table_has_rows($x, $tname)) {
+                    if ($self->table_has_rows($d, $tname)) {
                        $sync->{otc}{skip}{$dbname} = 1;
                        $self->glog(qq{Target table "$dbname.$tname" has rows and we are in onetimecopy if empty mode, so we will not COPY}, LOG_NORMAL);
                    }
@@ -4820,8 +4841,6 @@ sub start_kid {
 
                for my $dbname (@dbs_fullcopy) {
 
-                    $x = $sync->{db}{$dbname};
-
                    ## Skip if onetimecopy was two and this target had rows
                    next if exists $sync->{otc}{skip}{$dbname};
@@ -4834,36 +4853,36 @@ sub start_kid {
 
                for my $dbname (@dbs_copytarget) {
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
                    ## Grab the actual target table name
                    my $tname = $g->{newname}{$syncname}{$dbname};
 
-                    if ($x->{dbtype} eq 'postgres') {
+                    if ($d->{dbtype} eq 'postgres') {
                        ## TODO: Cache this information earlier
                        $SQL = "SELECT relhasindex FROM pg_class WHERE oid = '$tname'::regclass";
-                        if ($x->{dbh}->selectall_arrayref($SQL)->[0][0]) {
+                        if ($d->{dbh}->selectall_arrayref($SQL)->[0][0]) {
                            $self->glog("Turning off indexes for $tname on $dbname", LOG_NORMAL);
                            ## Do this without pg_class manipulation when Postgres supports that
                            $SQL = "UPDATE pg_class SET relhasindex = 'f' WHERE oid = '$S.$T'::regclass";
-                            $x->{dbh}->do($SQL);
-                            $x->{index_disabled} = 1;
+                            $d->{dbh}->do($SQL);
+                            $d->{index_disabled} = 1;
                        }
                    }
 
-                    if ($x->{dbtype} eq 'mysql' or $x->{dbtype} eq 'mariadb') {
+                    if ($d->{dbtype} eq 'mysql' or $d->{dbtype} eq 'mariadb') {
                        $SQL = "ALTER TABLE $tname DISABLE KEYS";
                        $self->glog("Disabling keys for $tname on $dbname", LOG_NORMAL);
-                        $x->{dbh}->do($SQL);
-                        $x->{index_disabled} = 1;
+                        $d->{dbh}->do($SQL);
+                        $d->{index_disabled} = 1;
                    }
 
-                    if ($x->{dbtype} eq 'sqlite') {
+                    if ($d->{dbtype} eq 'sqlite') {
                        ## May be too late to do this here
                        $SQL = q{PRAGMA foreign_keys = OFF};
                        $self->glog("Disabling foreign keys on $dbname", LOG_NORMAL);
-                        $x->{dbh}->do($SQL);
-                        $x->{index_disabled} = 1;
+                        $d->{dbh}->do($SQL);
+                        $d->{index_disabled} = 1;
                    }
                }
            }
@@ -4874,17 +4893,17 @@ sub start_kid {
                ## Truncate the table on all target databases, and fallback to delete if that fails
                for my $dbname (@dbs_copytarget) {
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
                    ## Nothing to do here for flatfiles
-                    next if $x->{dbtype} =~ /flat/;
+                    next if $d->{dbtype} =~ /flat/;
 
                    ## Grab the real target name
                    my $tname = $g->{newname}{$syncname}{$dbname};
 
-                    if ('postgres' eq $x->{dbtype}) {
+                    if ('postgres' eq $d->{dbtype}) {
                        ## Disable triggers and rules the 'old way'
-                        if ($x->{disable_trigrules} eq 'pg_class' and ! $disabled_via_pg_class) {
+                        if ($d->{disable_trigrules} eq 'pg_class' and ! $disabled_via_pg_class) {
 
                            ## Run all 'before_trigger_disable' code
                            if (exists $sync->{code_before_trigger_disable}) {
@@ -4896,7 +4915,7 @@ sub start_kid {
                            }
 
                            $self->glog(qq{Disabling triggers and rules on db "$dbname" via pg_class}, LOG_VERBOSE);
-                            $x->{dbh}->do($SQL{disable_trigrules});
+                            $d->{dbh}->do($SQL{disable_trigrules});
 
                            ## Run all 'after_trigger_disable' code
                            if (exists $sync->{code_after_trigger_disable}) {
@@ -4920,7 +4939,7 @@ sub start_kid {
                    if ($sync->{deletemethod} =~ /truncate/io) {
                        my $do_cascade = $sync->{deletemethod} =~ /cascade/io ? 1 : 0;
-                        if ($self->truncate_table($x, $tname, $do_cascade)) {
+                        if ($self->truncate_table($d, $tname, $do_cascade)) {
                            $self->glog("Truncated table $tname", LOG_VERBOSE);
                            $use_delete = 0;
                        }
@@ -4936,7 +4955,7 @@ sub start_kid {
                        $maindbh->commit();
 
                        ## Note: even though $tname is the actual name, we still track stats with $S.$T
-                        $dmlcount{D}{target}{$S}{$T} = $self->delete_table($x, $tname);
+                        $dmlcount{D}{target}{$S}{$T} = $self->delete_table($d, $tname);
                        $dmlcount{alldeletes}{target} += $dmlcount{D}{target}{$S}{$T};
                        $self->glog("Rows deleted from $tname: $dmlcount{D}{target}{$S}{$T}", LOG_VERBOSE);
                    }
@@ -4956,9 +4975,9 @@ sub start_kid {
                ## Restore the indexes and run REINDEX where needed
                for my $dbname (@dbs_copytarget) {
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
-                    next if ! $x->{index_disabled};
+                    next if ! $d->{index_disabled};
 
                    my $tname = $g->{newname}{$syncname}{$dbname};
@@ -4966,31 +4985,31 @@ sub start_kid {
                    $sth{kid_syncrun_update_status}->execute("REINDEX $tname (KID $$)", $syncname);
                    $maindbh->commit();
 
-                    if ($x->{dbtype} eq 'postgres') {
+                    if ($d->{dbtype} eq 'postgres') {
                        $SQL = "UPDATE pg_class SET relhasindex = 't' WHERE oid = '$tname'::regclass";
-                        $x->{dbh}->do($SQL);
+                        $d->{dbh}->do($SQL);
 
                        ## Do the reindex, and time how long it takes
                        my $t0 = [gettimeofday];
                        $self->glog("Reindexing table $dbname.$tname", LOG_NORMAL);
-                        $x->{dbh}->do("REINDEX TABLE $tname");
+                        $d->{dbh}->do("REINDEX TABLE $tname");
                        $self->glog((sprintf(q{(OTC: %s) REINDEX TABLE %s},
                            $self->pretty_time(tv_interval($t0), 'day'),
                            $tname)), LOG_NORMAL);
                    }
 
-                    if ($x->{dbtype} eq 'mysql' or $x->{dbtype} eq 'mariadb') {
+                    if ($d->{dbtype} eq 'mysql' or $d->{dbtype} eq 'mariadb') {
                        $SQL = "ALTER TABLE $tname ENABLE KEYS";
                        $self->glog("Enabling keys for $tname on $dbname", LOG_NORMAL);
-                        $x->{dbh}->do($SQL);
+                        $d->{dbh}->do($SQL);
                    }
 
-                    if ($x->{dbtype} eq 'sqlite') {
+                    if ($d->{dbtype} eq 'sqlite') {
                        $SQL = q{PRAGMA foreign_keys = ON};
                        $self->glog("Enabling keys on $dbname", LOG_NORMAL);
-                        $x->{dbh}->do($SQL);
+                        $d->{dbh}->do($SQL);
                    }
 
-                    $x->{index_disabled} = 0;
+                    $d->{index_disabled} = 0;
 
                } ## end each target to be reindexed
@@ -5007,19 +5026,20 @@ sub start_kid {
            ## Close filehandles for any flatfile databases
            for my $dbname (keys %{ $sync->{db} }) {
 
-                $x = $sync->{db}{$dbname};
-                next if $x->{dbtype} !~ /flat/o;
+                my $d = $sync->{db}{$dbname};
 
-                close $x->{filehandle}
-                    or warn qq{Could not close flatfile "$x->{filename}": $!\n};
+                next if $d->{dbtype} !~ /flat/o;
+
+                close $d->{filehandle}
+                    or warn qq{Could not close flatfile "$d->{filename}": $!\n};
 
                ## Atomically rename it so other processes can pick it up
-                (my $newname = $x->{filename}) =~ s/\.tmp$//;
-                rename $x->{filename}, $newname;
+                (my $newname = $d->{filename}) =~ s/\.tmp$//;
+                rename $d->{filename}, $newname;
 
                ## Remove the old ones, just in case
-                delete $x->{filename};
-                delete $x->{filehandle};
+                delete $d->{filename};
+                delete $d->{filehandle};
            }
 
            ## If using semaphore tables, mark the status as 'complete'
@@ -5029,10 +5049,10 @@ sub start_kid {
 
                for my $dbname (@dbs_connectable) {
 
-                    $x = $sync->{db}{$dbname};
+                    my $d = $sync->{db}{$dbname};
 
-                    if ($x->{dbtype} eq 'mongo') {
-                        my $collection = $x->{dbh}->get_collection($tname);
+                    if ($d->{dbtype} eq 'mongo') {
+                        my $collection = $d->{dbh}->get_collection($tname);
                        my $object = {
                            sync => $syncname,
                            status => 'complete',
@@ -5053,16 +5073,20 @@ sub start_kid {
                ## For each source database that had a truncate entry, mark them all as done
                $SQL = 'UPDATE bucardo.bucardo_truncate_trigger SET replicated = now() WHERE sync = ? AND replicated IS NULL';
                for my $dbname (@dbs_source) {
-                    $x = $sync->{db}{$dbname};
-                    $x->{sth} = $x->{dbh}->prepare($SQL, {pg_async => PG_ASYNC});
-                    $x->{sth}->execute($syncname);
-                    $x->{async_active} = time;
+
+                    my $d = $sync->{db}{$dbname};
+
+                    $d->{sth} = $d->{dbh}->prepare($SQL, {pg_async => PG_ASYNC});
+                    $d->{sth}->execute($syncname);
+                    $d->{async_active} = time;
                }
                for my $dbname (@dbs_source) {
-                    $x = $sync->{db}{$dbname};
-                    $x->{dbh}->pg_result();
-                    $x->{async_active} = 0;
+
+                    my $d = $sync->{db}{$dbname};
+
+                    $d->{dbh}->pg_result();
+                    $d->{async_active} = 0;
                }
            }
@@ -5079,22 +5103,22 @@ sub start_kid {
            ## Bring the db back to normal
            for my $dbname (@dbs_write) {
 
-                $x = $sync->{db}{$dbname};
+                my $d = $sync->{db}{$dbname};
 
-                next if ! $x->{writtento};
+                next if ! $d->{writtento};
 
                ## Turn triggers and rules back on if using old-school pg_class hackery
-                if ($x->{dbtype} eq 'postgres') {
+                if ($d->{dbtype} eq 'postgres') {
 
-                    next if $x->{disable_trigrules} ne 'pg_class';
+                    next if $d->{disable_trigrules} ne 'pg_class';
 
                    $self->glog(qq{Enabling triggers and rules on $dbname via pg_class}, LOG_VERBOSE);
-                    $x->{dbh}->do($SQL{enable_trigrules});
+                    $d->{dbh}->do($SQL{enable_trigrules});
                }
-                elsif ($x->{dbtype} eq 'mysql' or $x->{dbtype} eq 'mariadb') {
+                elsif ($d->{dbtype} eq 'mysql' or $d->{dbtype} eq 'mariadb') {
                    $self->glog(qq{Turning foreign key checks back on for $dbname}, LOG_VERBOSE);
-                    $x->{dbh}->do('SET foreign_key_checks = 1');
+                    $d->{dbh}->do('SET foreign_key_checks = 1');
                }
            }
@@ -5124,13 +5148,21 @@ sub start_kid {
            ## If we only have a single source, this ensures we don't mark rows as done
            ## in the track tables before everyone has reported back
            for my $dbname (@dbs_dbi) {
-                next if ! $x->{writtento};
-                $sync->{db}{$dbname}{dbh}->commit();
+
+                my $d = $sync->{db}{$dbname};
+
+                next if ! $d->{writtento};
+
+                $d->{dbh}->commit();
            }
 
            ## Now we can commit anyone else
            for my $dbname (@dbs_dbi) {
-                next if $x->{writtento};
-                $sync->{db}{$dbname}{dbh}->commit();
+
+                my $d = $sync->{db}{$dbname};
+
+                next if $d->{writtento};
+
+                $d->{dbh}->commit();
            }
 
            for my $dbname (@dbs_redis) {
                ## Implement EXEC when the client supports it
@@ -5143,11 +5175,11 @@ sub start_kid {
            ## This is the safest way to ensure we never miss any changes
            for my $dbname (@dbs_dbi) {
 
-                $x = $sync->{db}{$dbname};
+                my $d = $sync->{db}{$dbname};
 
-                next if ! $x->{trackstage};
+                next if ! $d->{trackstage};
 
-                my $dbh = $sync->{db}{$dbname}{dbh};
+                my $dbh = $d->{dbh};
 
                for my $g (@$goatlist) {
@@ -5233,12 +5265,12 @@ sub start_kid {
            $maindbh->commit();
 
            for my $dbname (@dbs_fullcopy) {
-                $x = $sync->{db}{$dbname};
+                my $d = $sync->{db}{$dbname};
                for my $g (@$goatlist) {
                    next if ! $g->{vacuum_after_copy} or $g->{reltype} ne 'table';
                    my $tablename = $g->{newname}{$syncname}{$dbname};
-                    $self->vacuum_table($kid_start_time, $x->{dbtype}, $x->{dbh}, $x->{name}, $tablename);
+                    $self->vacuum_table($kid_start_time, $d->{dbtype}, $d->{dbh}, $d->{name}, $tablename);
                }
            }
        }
@@ -5247,7 +5279,7 @@ sub start_kid {
            $maindbh->commit();
 
            for my $dbname (@dbs_fullcopy) {
-                $x = $sync->{db}{$dbname};
+                my $d = $sync->{db}{$dbname};
                for my $g (@$goatlist) {
                    next if ! $g->{analyze_after_copy} or $g->{reltype} ne 'table';
@@ -5256,7 +5288,7 @@ sub start_kid {
                        next;
                    }
                    my $tablename = $g->{newname}{$syncname}{$dbname};
-                    $self->analyze_table($kid_start_time, $x->{dbtype}, $x->{dbh}, $x->{name}, $tablename);
+                    $self->analyze_table($kid_start_time, $d->{dbtype}, $d->{dbh}, $d->{name}, $tablename);
                }
            }
        }
@@ -5402,12 +5434,14 @@ sub start_kid {
 
    ## Roll everyone back
    for my $dbname (@dbs_dbi) {
-        my $x = $sync->{db}{$dbname};
-        my $dbh = $x->{dbh};
+
+        my $d = $sync->{db}{$dbname};
+
+        my $dbh = $d->{dbh};
 
        ## If we are async, clear it out
-        if ($x->{async_active}) {
+        if ($d->{async_active}) {
            $dbh->pg_cancel();
-            $x->{async_active} = 0;
+            $d->{async_active} = 0;
        }
 
        ## Seperate eval{} for the rollback as we are probably still connected to the transaction.
        eval { $dbh->rollback; };
@@ -6421,9 +6455,11 @@ sub validate_sync {
 
        ## If we've not already populated sdb, do so now
        if (! exists $self->{sdb}{$dbname}) {
-            $x = $self->{sdb}{$dbname} = $d;
-            my $role = $x->{role};
-            if ($x->{dbtype} =~ /flat/o) {
+
+            $self->{sdb}{$dbname} = $d;
+
+            my $role = $d->{role};
+            if ($d->{dbtype} =~ /flat/o) {
                $self->glog(qq{Skipping flatfile database "$dbname"}, LOG_NORMAL);
                next;
            }
@@ -6431,9 +6467,9 @@ sub validate_sync {
            eval {
                ## We do not want the MCP handler here
                local $SIG{__DIE__} = undef;
-                ($x->{backend}, $x->{dbh}) = $self->connect_database($dbname);
+                ($d->{backend}, $d->{dbh}) = $self->connect_database($dbname);
            };
-            if (!defined $x->{backend}) {
+            if (!defined $d->{backend}) {
                ## If this was already stalled, we can simply reject the validation
                if ($d->{status} eq 'stalled') {
                    $self->glog("Stalled db $dbname failed again: $@", LOG_VERBOSE);
@@ -6445,8 +6481,8 @@ sub validate_sync {
                return 0;
            }
 
-            $self->glog(qq{Database "$dbname" backend PID: $x->{backend}}, LOG_VERBOSE);
-            $self->show_db_version_and_time($x->{dbh}, qq{DB "$dbname" });
+            $self->glog(qq{Database "$dbname" backend PID: $d->{backend}}, LOG_VERBOSE);
+            $self->show_db_version_and_time($d->{dbh}, qq{DB "$dbname" });
 
            ## If this db was previously stalled, restore it
            if ($d->{status} eq 'stalled') {
@@ -6499,10 +6535,11 @@ sub validate_sync {
    ## If we have more than one source, then everyone is a target
    ## Otherwise, only non-source databases are
    for my $dbname (keys %{ $s->{db} }) {
-        $x = $s->{db}{$dbname};
-        $x->{istarget} =
-            ($x->{role} ne 'source' or $role{source} > 1) ? 1 : 0;
-        $x->{issource} = $x->{role} eq 'source' ? 1 : 0;
+
+        my $d = $s->{db}{$dbname};
+
+        $d->{istarget} = ($d->{role} ne 'source' or $role{source} > 1) ? 1 : 0;
+        $d->{issource} = $d->{role} eq 'source' ? 1 : 0;
    }
 
    ## Grab the authoritative list of goats in this herd
@@ -6701,19 +6738,19 @@ sub validate_sync {
            $g->{pkeytype} = [split /\|/o => $g->{pkeytype}];
            $g->{numpkcols} = @{$g->{pkey}};
            $g->{hasbinarypk} = 0; ## Not used anywhere?
-            $x=0;
+            my $i = 0;
            for (@{$g->{pkey}}) {
-                $g->{binarypkey}{$x++} = 0;
+                $g->{binarypkey}{$i++} = 0;
            }
 
            ## All pks together for the main delta query
            ## We change bytea to base64 so we don't have to declare binary args anywhere
            $g->{pklist} = '';
-            for ($x = 0; defined $g->{pkey}[$x]; $x++) {
+            for ($i = 0; defined $g->{pkey}[$i]; $i++) {
                $g->{pklist} .= sprintf '%s,',
-                    $g->{pkeytype}[$x] eq 'bytea'
-                        ? qq{ENCODE("$g->{pkey}[$x]", 'base64')}
-                        : qq{"$g->{pkey}[$x]"};
+                    $g->{pkeytype}[$i] eq 'bytea'
+                        ? qq{ENCODE("$g->{pkey}[$i]", 'base64')}
+                        : qq{"$g->{pkey}[$i]"};
            }
            ## Remove the final comma:
            chop $g->{pklist};
@@ -6742,14 +6779,14 @@ sub validate_sync {
            $sth->execute(qq{"$g->{schemaname}"."$g->{tablename}"});
            $colinfo = $sth->fetchall_hashref('attname');
            ## Allow for 'dead' columns in the attnum ordering
-            $x=1;
+            $i = 1;
            for (sort { $colinfo->{$a}{attnum} <=> $colinfo->{$b}{attnum} } keys %$colinfo) {
-                $colinfo->{$_}{realattnum} = $x++;
+                $colinfo->{$_}{realattnum} = $i++;
            }
            $g->{columnhash} = $colinfo;
 
            ## Build lists of columns
-            $x = 1;
+            $i = 1;
            $g->{cols} = [];
            $g->{safecols} = [];
          COL: for my $colname (sort { $colinfo->{$a}{attnum} <=> $colinfo->{$b}{attnum} } keys %$colinfo) {
@@ -6759,7 +6796,7 @@ sub validate_sync {
                }
                push @{$g->{cols}}, $colname;
                push @{$g->{safecols}}, $colinfo->{$colname}{qattname};
-                $colinfo->{$colname}{order} = $x++;
+                $colinfo->{$colname}{order} = $i++;
            }
 
            ## Stringified versions of the above lists, for ease later on
@@ -6770,14 +6807,14 @@ sub validate_sync {
          BCOL: for my $colname (keys %$colinfo) {
                my $c = $colinfo->{$colname};
                next if $c->{atttypid} != 17; ## Yes, it's hardcoded, no sweat
-                $x = 0;
+                $i = 0;
                for my $pk (@{$g->{pkey}}) {
                    if ($colname eq $pk) {
-                        $g->{binarypkey}{$x} = 1;
+                        $g->{binarypkey}{$i} = 1;
                        $g->{hasbinarypk} = 1;
                        next BCOL;
                    }
-                    $x++;
+                    $i++;
                }
                ## This is used to bind_param these as binary during inserts and updates
                push @{$g->{binarycols}}, $colinfo->{$colname}{order};
@@ -6800,21 +6837,21 @@ sub validate_sync {
                ## Only ones for this sync, please
                next if ! exists $s->{db}{$dbname};
 
-                $x = $self->{sdb}{$dbname};
+                my $d = $self->{sdb}{$dbname};
 
-                next if $x->{role} eq 'source';
+                next if $d->{role} eq 'source';
 
                ## Flat files are obviously skipped as we create them de novo
-                next if $x->{dbtype} =~ /flat/o;
+                next if $d->{dbtype} =~ /flat/o;
 
                ## Mongo is skipped because it can create schemas on the fly
-                next if $x->{dbtype} =~ /mongo/o;
+                next if $d->{dbtype} =~ /mongo/o;
 
                ## Redis is skipped because we can create keys on the fly
-                next if $x->{dbtype} =~ /redis/o;
+                next if $d->{dbtype} =~ /redis/o;
 
                ## MySQL/MariaDB/Drizzle/Oracle/SQLite is skipped for now, but should be added later
-                next if $x->{dbtype} =~ /mysql|mariadb|drizzle|oracle|sqlite/o;
+                next if $d->{dbtype} =~ /mysql|mariadb|drizzle|oracle|sqlite/o;
 
                if ($self->{quickstart}) {
                    $self->glog("  quickstart: Skipping table check for $dbname.$S.$T", LOG_VERBOSE);
@@ -6839,7 +6876,7 @@ sub validate_sync {
                }
 
                ## Get a handle for the remote database
-                my $dbh = $x->{dbh};
+                my $dbh = $d->{dbh};
 
                ## If a sequence, verify the information and move on
                if ($g->{reltype} eq 'sequenceSKIP') {
@@ -6890,9 +6927,9 @@ sub validate_sync {
                $sth->execute("$RS.$RT");
                my $targetcolinfo = $sth->fetchall_hashref('attname');
                ## Allow for 'dead' columns in the attnum ordering
-                $x=1;
+                $i = 1;
                for (sort { $targetcolinfo->{$a}{attnum} <=> $targetcolinfo->{$b}{attnum} } keys %$targetcolinfo) {
-                    $targetcolinfo->{$_}{realattnum} = $x++;
+                    $targetcolinfo->{$_}{realattnum} = $i++;
                }
 
                $dbh->do('RESET search_path');
@@ -7082,10 +7119,12 @@ sub validate_sync {
    if ($s->{autokick}) {
        my $l = "kick_sync_$syncname";
        for my $dbname (sort keys %{ $s->{db} }) {
-            $x = $s->{db}{$dbname};
-            next if $x->{status} ne 'active';
-            $self->glog("Listen for $l on $dbname ($x->{role})", LOG_DEBUG);
-            next if $x->{role} ne 'source';
+
+            my $d = $s->{db}{$dbname};
+
+            next if $d->{status} ne 'active';
+            $self->glog("Listen for $l on $dbname ($d->{role})", LOG_DEBUG);
+            next if $d->{role} ne 'source';
            my $dbh = $self->{sdb}{$dbname}{dbh};
            $self->db_listen($dbh, $l, $dbname, 0);
            $dbh->commit;
@@ -7191,17 +7230,19 @@ sub deactivate_sync {
 
    ## If we are listening for kicks on the source, stop doing so
    for my $dbname (sort keys %{ $self->{sdb} }) {
 
-        $x = $self->{sdb}{$dbname};
-        next if $x->{dbtype} ne 'postgres';
-        next if $x->{role} ne 'source';
+        my $d = $self->{sdb}{$dbname};
+
+        next if $d->{dbtype} ne 'postgres';
 
-        $x->{dbh} ||= $self->connect_database($dbname);
-        $x->{dbh}->commit();
+        next if $d->{role} ne 'source';
+
+        $d->{dbh} ||= $self->connect_database($dbname);
+        $d->{dbh}->commit();
 
        if ($s->{autokick}) {
            my $l = "kick_sync_$syncname";
-            $self->db_unlisten($x->{dbh}, $l, $dbname, 0);
-            $x->{dbh}->commit();
+            $self->db_unlisten($d->{dbh}, $l, $dbname, 0);
+            $d->{dbh}->commit();
        }
    }
@@ -7428,19 +7469,19 @@ sub fork_vac {
 
    ## Reconnect to all databases we care about
    for my $dbname (keys %{ $self->{sdb} }) {
 
-        $x = $self->{sdb}{$dbname};
+        my $d = $self->{sdb}{$dbname};
 
        ## We looped through all the syncs earlier to determine which databases
        ## really need to be vacuumed. The criteria:
        ## not a fullcopy sync, dbtype is postgres, role is source
-        next if ! $x->{needsvac};
@@ -7428,19 +7469,19 @@ sub fork_vac {
 
     ## Reconnect to all databases we care about
     for my $dbname (keys %{ $self->{sdb} }) {
 
-        $x = $self->{sdb}{$dbname};
+        my $d = $self->{sdb}{$dbname};
 
         ## We looped through all the syncs earlier to determine which databases
         ## really need to be vacuumed. The criteria:
         ## not a fullcopy sync, dbtype is postgres, role is source
-        next if ! $x->{needsvac};
+        next if ! $d->{needsvac};
 
         ## Establish a new database handle
-        ($x->{backend}, $x->{dbh}) = $self->connect_database($dbname);
-        $self->glog(qq{Connected to database "$dbname" with backend PID of $x->{backend}}, LOG_NORMAL);
-        $self->{pidmap}{$x->{backend}} = "DB $dbname";
+        ($d->{backend}, $d->{dbh}) = $self->connect_database($dbname);
+        $self->glog(qq{Connected to database "$dbname" with backend PID of $d->{backend}}, LOG_NORMAL);
+        $self->{pidmap}{$d->{backend}} = "DB $dbname";
 
         ## We don't want details about the purging
-        $x->{dbh}->do(q{SET client_min_messages = 'warning'});
+        $d->{dbh}->do(q{SET client_min_messages = 'warning'});
     }
 
     ## Track how long since we last came to life for vacuuming
@@ -7499,32 +7540,32 @@ sub fork_vac {
         ## Kick each one off async
         for my $dbname (sort keys %{ $self->{sdb}} ) {
 
-            $x = $self->{sdb}{$dbname};
+            my $d = $self->{sdb}{$dbname};
 
-            next if ! $x->{needsvac};
+            next if ! $d->{needsvac};
 
-            my $xdbh = $x->{dbh};
+            my $dbh = $d->{dbh};
 
             ## Safety check: if the bucardo schema is not there, we don't want to vacuum
-            if (! exists $x->{hasschema}) {
+            if (! exists $d->{hasschema}) {
                 $SQL = q{SELECT count(*) FROM pg_namespace WHERE nspname = 'bucardo'};
-                $x->{hasschema} = $xdbh->selectall_arrayref($SQL)->[0][0];
-                if (! $x->{hasschema} ) {
+                $d->{hasschema} = $dbh->selectall_arrayref($SQL)->[0][0];
+                if (! $d->{hasschema} ) {
                     $self->glog("Warning! Cannot vacuum db $dbname unless we have a bucardo schema", LOG_WARN);
                 }
             }
 
             ## No schema? We've already complained, so skip it silently
-            next if ! $x->{hasschema};
+            next if ! $d->{hasschema};
 
             $valid_backends++;
 
             ## Async please
             $self->glog(qq{Running bucardo_purge_delta on database "$dbname"}, LOG_VERBOSE);
             $SQL = q{SELECT bucardo.bucardo_purge_delta('45 seconds')};
-            $sth{"vac_$dbname"} = $xdbh->prepare($SQL, { pg_async => PG_ASYNC } );
+            $sth{"vac_$dbname"} = $dbh->prepare($SQL, { pg_async => PG_ASYNC } );
            $sth{"vac_$dbname"}->execute();
-            $x->{async_active} = time;
+            $d->{async_active} = time;
 
         } ## end each source database
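The { pg_async => PG_ASYNC } attribute above is what lets the VAC daemon start bucardo_purge_delta on every source database before collecting any results. A minimal sketch of that DBD::Pg async pattern, with a hypothetical DSN and pg_sleep standing in for the real function:

    use DBI;
    use DBD::Pg qw( :async );

    my $dbh = DBI->connect('dbi:Pg:dbname=bucardo', '', '',
        { AutoCommit => 0, RaiseError => 1 });

    ## Fire the query without blocking
    my $sth = $dbh->prepare('SELECT pg_sleep(5)', { pg_async => PG_ASYNC });
    $sth->execute();

    ## ...kick off queries on other databases here...
    ## ($dbh->pg_ready is the non-blocking check; pg_result blocks until done)

    my $rows = $sth->pg_result();
    $dbh->commit();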
@@ -7543,21 +7584,21 @@ sub fork_vac {
         ## Finish each one up
         for my $dbname (sort keys %{ $self->{sdb}} ) {
 
-            $x = $self->{sdb}{$dbname};
+            my $d = $self->{sdb}{$dbname};
 
             ## As above, skip if not a source or no schema available
-            next if ! $x->{needsvac};
+            next if ! $d->{needsvac};
 
-            next if ! $x->{hasschema};
+            next if ! $d->{hasschema};
 
-            my $xdbh = $x->{dbh};
+            my $dbh = $d->{dbh};
 
             $self->glog(qq{Finish and fetch bucardo_purge_delta on database "$dbname"}, LOG_DEBUG);
             $count = $sth{"vac_$dbname"}->pg_result();
-            $x->{async_active} = 0;
+            $d->{async_active} = 0;
 
             my $info = $sth{"vac_$dbname"}->fetchall_arrayref()->[0][0];
-            $xdbh->commit();
+            $dbh->commit();
 
             $self->glog(qq{Purge on db "$dbname" gave: $info}, LOG_VERBOSE);
@@ -7856,15 +7897,16 @@ sub terminate_old_goats {
     my $SQLC = 'SELECT pg_cancel_backend(?)';
     my $total = 0;
     for my $dbname (sort keys %{ $self->{sdb} }) {
-        $x = $self->{sdb}{$dbname};
+
+        my $d = $self->{sdb}{$dbname};
 
         ## All of this is very Postgres specific
-        next if $x->{dbtype} ne 'postgres';
+        next if $d->{dbtype} ne 'postgres';
 
         ## Loop through each backend PID found for this database
         for my $pid (sort keys %{ $dbpid{$dbname} }) {
             my $time = $dbpid{$dbname}{$pid};
-            $sth = $x->{dbh}->prepare($SQL);
+            $sth = $d->{dbh}->prepare($SQL);
 
             ## See if the process is still around by matching PID and query_start time
             $count = $sth->execute($pid, $time);
@@ -7874,7 +7916,7 @@ sub terminate_old_goats {
             next if $count < 1;
 
             ## If we got a match, try and kill it
-            $sth = $x->{dbh}->prepare($SQLC);
+            $sth = $d->{dbh}->prepare($SQLC);
             $count = $sth->execute($pid);
             my $res = $count < 1 ? 'failed' : 'ok';
             $self->glog("Attempted to kill backend $pid on db $dbname, started $time. Result: $res", LOG_NORMAL);
@@ -8505,38 +8547,38 @@ sub table_has_rows {
     ## 2. Name of the table
     ## Returns: true or false
 
-    my ($self,$x,$tname) = @_;
+    my ($self,$d,$tname) = @_;
 
     ## Some types do not have a count
-    return 0 if $x->{does_append_only};
+    return 0 if $d->{does_append_only};
 
-    if ($x->{does_limit}) {
+    if ($d->{does_limit}) {
         $SQL = "SELECT 1 FROM $tname LIMIT 1";
-        $sth = $x->{dbh}->prepare($SQL);
+        $sth = $d->{dbh}->prepare($SQL);
         $sth->execute();
         $count = $sth->rows();
         $sth->finish();
         return $count >= 1 ? 1 : 0;
     }
-    elsif ('mongo' eq $x->{dbtype}) {
-        my $collection = $x->{dbh}->get_collection($tname);
+    elsif ('mongo' eq $d->{dbtype}) {
+        my $collection = $d->{dbh}->get_collection($tname);
         $count = $collection->count({});
         return $count >= 1 ? 1 : 0;
     }
-    elsif ('oracle' eq $x->{dbtype}) {
+    elsif ('oracle' eq $d->{dbtype}) {
         $SQL = "SELECT 1 FROM $tname WHERE rownum > 1";
-        $sth = $x->{dbh}->prepare($SQL);
+        $sth = $d->{dbh}->prepare($SQL);
         $sth->execute();
         $count = $sth->rows();
         $sth->finish();
         return $count >= 1 ? 1 : 0;
     }
-    elsif ('redis' eq $x->{dbtype}) {
+    elsif ('redis' eq $d->{dbtype}) {
         ## No sense in returning anything here
         return 0;
     }
     else {
-        die "Cannot handle database type $x->{dbtype} yet!";
+        die "Cannot handle database type $d->{dbtype} yet!";
     }
 
     return 0;
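One caveat in the context lines above, untouched by this patch: the Oracle branch probes with WHERE rownum > 1, which appears to never match, since Oracle assigns ROWNUM only as rows are accepted, so the first candidate row always carries ROWNUM 1 and is filtered out. The conventional existence probe, offered here only as a suggested follow-up:

    ## Suggested probe for the Oracle branch: matches at most one row,
    ## so $sth->rows() comes back as 0 or 1
    $SQL = "SELECT 1 FROM $tname WHERE rownum = 1";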
@@ -8608,14 +8650,14 @@ sub adjust_sequence {
 
         next if $dbname eq $winner; ## Natch
 
-        $x = $sync->{db}{$dbname};
+        my $d = $sync->{db}{$dbname};
 
-        next if $x->{dbtype} ne 'postgres';
+        next if $d->{dbtype} ne 'postgres';
 
-        next if ! $x->{adjustsequence};
+        next if ! $d->{adjustsequence};
 
         ## Reset the flag in case this sub is called more than once
-        $x->{adjustsequence} = 0;
+        $d->{adjustsequence} = 0;
 
         my $targetinfo = $g->{sequenceinfo}{$dbname} || {};
@@ -8628,7 +8670,7 @@ sub adjust_sequence {
             $self->glog("Set sequence $dbname.$S.$T to $sourceinfo->{last_value} (is_called to $sourceinfo->{is_called})", LOG_DEBUG);
             $SQL = qq{SELECT setval('$S.$T', $sourceinfo->{last_value}, '$sourceinfo->{is_called}')};
 
-            $x->{dbh}->do($SQL);
+            $d->{dbh}->do($SQL);
             $changes++;
         }
@@ -8665,7 +8707,7 @@ sub adjust_sequence {
             $SQL = "ALTER SEQUENCE $S.$T ";
             $SQL .= join ' ' => @alter;
             $self->glog("Running on target $dbname: $SQL", LOG_DEBUG);
-            $x->{dbh}->do($SQL);
+            $d->{dbh}->do($SQL);
         }
 
     } ## end each database
@@ -8823,42 +8865,42 @@ sub truncate_table {
     ## 3. Boolean if we should CASCADE the truncate or not
     ## Returns: true if the truncate succeeded without error, false otherwise
 
-    my ($self, $x, $tname, $cascade) = @_;
+    my ($self, $d, $tname, $cascade) = @_;
 
     ## Override any existing handlers so we can cleanly catch the eval
     local $SIG{__DIE__} = sub {};
 
-    if ($x->{does_sql}) {
-        if ($x->{does_savepoints}) {
-            $x->{dbh}->do('SAVEPOINT truncate_attempt');
+    if ($d->{does_sql}) {
+        if ($d->{does_savepoints}) {
+            $d->{dbh}->do('SAVEPOINT truncate_attempt');
         }
         $SQL = sprintf 'TRUNCATE TABLE %s%s',
            $tname,
-            ($cascade and $x->{does_cascade}) ? ' CASCADE' : '';
+            ($cascade and $d->{does_cascade}) ? ' CASCADE' : '';
         my $truncate_ok = 0;
 
         eval {
-            $x->{dbh}->do($SQL);
+            $d->{dbh}->do($SQL);
            $truncate_ok = 1;
         };
         if (! $truncate_ok) {
-            $x->{does_savepoints} and $x->{dbh}->do('ROLLBACK TO truncate_attempt');
-            $self->glog("Truncate error for db $x->{name}.$x->{dbname}.$tname: $@", LOG_NORMAL);
+            $d->{does_savepoints} and $d->{dbh}->do('ROLLBACK TO truncate_attempt');
+            $self->glog("Truncate error for db $d->{name}.$d->{dbname}.$tname: $@", LOG_NORMAL);
            return 0;
        }
        else {
-            $x->{does_savepoints} and $x->{dbh}->do('RELEASE truncate_attempt');
+            $d->{does_savepoints} and $d->{dbh}->do('RELEASE truncate_attempt');
            return 1;
        }
    }
 
-    if ('mongo' eq $x->{dbtype}) {
-        my $collection = $x->{dbh}->get_collection($tname);
+    if ('mongo' eq $d->{dbtype}) {
+        my $collection = $d->{dbh}->get_collection($tname);
        $collection->remove({}, { safe => 1} );
        return 1;
    }
-    elsif ('redis' eq $x->{dbtype}) {
+    elsif ('redis' eq $d->{dbtype}) {
        ## No real equivalent here, as we do not map tables 1:1 to redis keys
        ## In theory, we could walk through all keys and delete ones that match the table
        ## We will hold off until someone actually needs that, however :)
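The SAVEPOINT / ROLLBACK TO / RELEASE sequence above keeps a failed TRUNCATE from aborting the whole surrounding transaction on databases that support savepoints. The same pattern in isolation, assuming a hypothetical table name and DSN:

    use DBI;

    my $dbh = DBI->connect('dbi:Pg:dbname=bucardo', '', '',
        { AutoCommit => 0, RaiseError => 1 });

    $dbh->do('SAVEPOINT truncate_attempt');
    my $truncate_ok = eval { $dbh->do('TRUNCATE TABLE mytable'); 1 };
    if ($truncate_ok) {
        $dbh->do('RELEASE truncate_attempt');      ## Keep the work
    }
    else {
        $dbh->do('ROLLBACK TO truncate_attempt');  ## Transaction stays usable
        warn "Truncate failed: $@";
    }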
@@ -8878,24 +8920,24 @@ sub delete_table {
     ## 2. Table name
     ## Returns: number of rows deleted
 
-    my ($self, $x, $tname) = @_;
+    my ($self, $d, $tname) = @_;
 
     my $count = 0;
 
-    if ($x->{does_sql}) {
-        ($count = $x->{dbh}->do("DELETE FROM $tname")) =~ s/0E0/0/o;
+    if ($d->{does_sql}) {
+        ($count = $d->{dbh}->do("DELETE FROM $tname")) =~ s/0E0/0/o;
     }
-    elsif ('mongo' eq $x->{dbtype}) {
+    elsif ('mongo' eq $d->{dbtype}) {
         ## Same as truncate, really, except we return the number of rows
-        my $collection = $x->{dbh}->get_collection($tname);
+        my $collection = $d->{dbh}->get_collection($tname);
         my $res = $collection->remove({}, { safe => 1} );
         $count = $res->{n};
     }
-    elsif ('redis' eq $x->{dbtype}) {
+    elsif ('redis' eq $d->{dbtype}) {
         ## Nothing relevant here, as the table is only part of the key name
     }
     else {
-        die "Do not know how to delete a dbtype of $x->{dbtype}";
+        die "Do not know how to delete a dbtype of $d->{dbtype}";
     }
 
     return $count;
@@ -8939,28 +8981,28 @@ sub delete_rows {
     if (exists $self->{truncateinfo} and exists $self->{truncateinfo}{$S}{$T}) {
 
         ## Try and truncate each target
-        for my $t (@$deldb) {
+        for my $d (@$deldb) {
 
-            my $type = $t->{dbtype};
+            my $type = $d->{dbtype};
 
-            my $tname = $newname->{$t->{name}};
+            my $tname = $newname->{$d->{name}};
 
             ## Postgres is a plain and simple TRUNCATE, with an async flag
             ## TRUNCATE CASCADE is not needed as everything should be in one
             ## sync (herd), and we have turned all FKs off
             if ('postgres' eq $type) {
-                my $tdbh = $t->{dbh};
+                my $tdbh = $d->{dbh};
                 $tdbh->do("TRUNCATE table $tname", { pg_async => PG_ASYNC });
-                $t->{async_active} = time;
+                $d->{async_active} = time;
             } ## end postgres database
 
             ## For all other SQL databases, we simply truncate
-            elsif ($x->{does_sql}) {
-                $t->{dbh}->do("TRUNCATE TABLE $tname");
+            elsif ($d->{does_sql}) {
+                $d->{dbh}->do("TRUNCATE TABLE $tname");
             }
 
             ## For MongoDB, we simply remove everything from the collection
             ## This keeps the indexes around (which is why we don't "drop")
             elsif ('mongo' eq $type) {
-                $self->{collection} = $t->{dbh}->get_collection($tname);
+                $self->{collection} = $d->{dbh}->get_collection($tname);
                 $self->{collection}->remove({}, { safe => 1} );
             }
 
             ## For Redis, do nothing
@@ -8968,19 +9010,19 @@ sub delete_rows {
             }
 
             ## For flatfiles, write out a basic truncate statement
             elsif ($type =~ /flat/o) {
-                printf {$t->{filehandle}} qq{TRUNCATE TABLE %S;\n\n},
+                printf {$d->{filehandle}} qq{TRUNCATE TABLE %s;\n\n},
                     'flatpg' eq $type ? $tname : $tname;
-                $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE);
+                $self->glog(qq{Appended to flatfile "$d->{filename}"}, LOG_VERBOSE);
             }
 
         } ## end each database to be truncated
 
         ## Final cleanup for each target
-        for my $t (@$deldb) {
-            if ('postgres' eq $t->{dbtype}) {
+        for my $d (@$deldb) {
+            if ('postgres' eq $d->{dbtype}) {
                 ## Wrap up all the async truncate call
-                $t->{dbh}->pg_result();
-                $t->{async_active} = 0;
+                $d->{dbh}->pg_result();
+                $d->{async_active} = 0;
             }
         }
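The s/0E0/0/ in delete_table above exists because DBI's do() returns the string '0E0' when a statement affects zero rows: true in boolean context (the statement succeeded) but zero when used as a number. A small illustration, with a hypothetical table and an already-connected $dbh:

    ## DELETE succeeds but touches nothing: do() returns '0E0'
    my $count = $dbh->do('DELETE FROM mytable WHERE 1 = 0');
    print "delete worked\n" if $count;   ## fires: '0E0' is a true string
    $count =~ s/0E0/0/;                  ## normalize for reporting
    print "rows deleted: $count\n";      ## prints 0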
@@ -9661,11 +9703,11 @@ sub push_rows {
                     my $pkeyval = join ':' => @pkey;
                     ## Build a list of non-null key/value pairs to set in the hash
                     my @add;
-                    my $x = $goat->{numpkcols} - 1;
+                    my $i = $goat->{numpkcols} - 1;
                     for my $val (@colvals) {
-                        $x++;
+                        $i++;
                         next if ! defined $val;
-                        push @add, $cols->[$x], $val;
+                        push @add, $cols->[$i], $val;
                     }
 
                     $t->{dbh}->hmset("$tname:$pkeyval", @add);
@@ -9687,7 +9729,9 @@ sub push_rows {
 
     ## Perform final cleanups for each target
     for my $t (@{ $srccmd{$clause} }) {
+
         my $type = $t->{dbtype};
+
         my $tname = $newname->{$t->{name}};
 
         if ('postgres' eq $type) {
@@ -9702,8 +9746,8 @@ sub push_rows {
             ## If this table is set to makedelta, add rows to bucardo.delta to simulate the
             ## normal action of a trigger and add a row to bucardo.track to indicate that
             ## it has already been replicated here.
-            my $dbinfo = $sync->{db}{ $t->{name} };
-            if (!$fullcopy and $dbinfo->{does_makedelta}{$S}{$T}) {
+            my $d = $sync->{db}{ $t->{name} };
+            if (!$fullcopy and $d->{does_makedelta}{$S}{$T}) {
                 $self->glog("Using makedelta to populate delta and track tables for $t->{name}.$tname", LOG_VERBOSE);
                 my ($cols, $vals);
                 if ($numpks == 1) {
@@ -9721,7 +9765,7 @@ sub push_rows {
                 $dbh->do(qq{
                     INSERT INTO bucardo.$goat->{tracktable}
                     VALUES (NOW(), ?)
-                }, undef, $x->{DBGROUPNAME});
+                }, undef, $d->{DBGROUPNAME});
 
                 $self->glog("Signalling all other syncs that this table has changed", LOG_DEBUG);
 
                 ## Cache this
@@ -9874,12 +9918,12 @@ sub msg { ## no critic
         die qq{Invalid message "$name" from line $line\n};
     }
 
-    my $x=1;
+    my $i = 1;
     {
-        my $val = $_[$x-1];
+        my $val = $_[$i-1];
         $val = '?' if ! defined $val;
-        last unless $msg =~ s/\$$x/$val/g;
-        $x++;
+        last unless $msg =~ s/\$$i/$val/g;
+        $i++;
         redo;
     }
     return $msg;
-- 
2.39.5