my ($self,$rows,$Table,$Sync,$SourceDB,$TargetDB) = @_;
+ ## Build a list of all PK values to bind into our source COPY queries
+ ## This is an array in case we go over $chunksize
+ my @pkvals;
+
## If fullcopy, $rows will be the scalar 'fullcopy' instead of a hashref
- ## In which case, we create a dummy hashref
my $fullcopy = 0;
if (! ref $rows) {
if ($rows eq 'fullcopy') {
$fullcopy = 1;
}
else {
die "Invalid rows passed to push_rows: $rows\n";
}
$rows = {};
+ $pkvals[0] = 'fullcopy';
}
- ## Make sure TargetDB is an arrayref (may come as a single TargetDB object)
- if (ref $TargetDB ne 'ARRAY') {
- $TargetDB = [$TargetDB];
+ ## This will be zero for fullcopy of course
+ my $total_rows = keys %$rows;
+
+ if (!$total_rows and !$fullcopy) {
+ return 0; ## Can happen on a truncation
}
- ## This will be zero for fullcopy of course
- my $total = keys %$rows;
+ my $syncname = $Sync->{name} || '';
+
+ ## We may want to change the target table based on the customname table
+ ## It is up to the caller to populate these, even if the syncname is ''
+ my $customname = $Table->{newname}{$syncname} || {};
- ## Total number of rows written
- $count = 0;
+ ## We may want to change the SELECT based on the customcols table
+ my $customcols = $Table->{newcols}{$syncname} || {};
- my $syncname = $Sync->{name};
- my $newname = $Table->{newname}{$syncname};
my $numpks = $Table->{numpkcols};
## As with delete, we may break this into more than one step
## Should only be a factor for very large numbers of keys
my $chunksize = $config{statement_chunk_size} || $default_statement_chunk_size;
- ## Build a list of all PK values to feed to IN clauses
- my @pkvals;
- my $round = 0;
- my $roundtotal = 0;
- for my $key (keys %$rows) {
- my $inner = length $key
- ? (join ',' => map { s{\'}{''}go; s{\\}{\\\\}go; qq{'$_'}; } split '\0', $key, -1)
- : q{''};
- push @{ $pkvals[$round] ||= [] } => $numpks > 1 ? "($inner)" : $inner;
- if (++$roundtotal >= $chunksize) {
- $roundtotal = 0;
- $round++;
+ ## If there is only one primary key, and a sane number of rows, we can use '= ANY(?)'
+ if (! $fullcopy) {
+ if ($numpks == 1 and $total_rows < $chunksize) {
+ $pkvals[0] = 'allrows';
}
- }
+ ## Otherwise, we push the raw PK values into bins, one bin per chunk
+ else {
+ my $pkrounds = 1;
+ my $current_row = 1;
- ## Example: 1234, 221
- ## Example MCPK: ('1234','Don''t Stop','2008-01-01'),('221','foobar','2008-11-01')
+ ## Loop through each row and add its primary key values to the current bin
+ for my $key (keys %$rows) {
- ## This can happen if we truncated but had no delta activity
- return 0 if (! $pkvals[0] or ! length $pkvals[0]->[0] ) and ! $fullcopy;
+ push @{ $pkvals[$pkrounds-1] ||= [] } => split '\0', $key, -1;
- ## Put dummy data into @pkvals if using fullcopy
- if ($fullcopy) {
- push @pkvals => ['fullcopy'];
+ ## Make sure our SQL statement doesn't grow too large
+ if (++$current_row > $chunksize) {
+ $current_row = 1;
+ $pkrounds++;
+ }
+ }
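+
+ ## For example (illustrative values), with a two-column pkey and $chunksize = 2,
+ ## three changed rows would be binned as:
+ ## $pkvals[0] = [12, 'alpha', 13, 'beta']
+ ## $pkvals[1] = [14, 'gamma']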
+ }
}
- ## Get ready to export from the source
- ## This may have multiple versions depending on the customcols table
- my $newcols = $Table->{newcols}{$syncname} || {};
+ ## Make sure TargetDB is an arrayref (may come as a single TargetDB object)
+ if (ref $TargetDB ne 'ARRAY') {
+ $TargetDB = [$TargetDB];
+ }
- ## Walk through and grab which SQL is needed for each target
- ## Cache this earlier on - controller?
+ ## Figure out the different SELECT clauses, and assign targets to them
my %srccmd;
for my $t (@$TargetDB ) {
- ## The SELECT clause we use (may be empty)
- my $clause = $newcols->{$t->{name}};
+ ## The SELECT clause we use (usually an empty string unless customcols is being used)
+ my $select_clause = $customcols->{$t->{name}} || '';
## Associate this target with this clause
- push @{$srccmd{$clause}} => $t;
+ push @{$srccmd{$select_clause}} => $t;
}
- my $tablename = "$Table->{safeschema}.$Table->{safetable}";
- my $fromdbh = $SourceDB->{dbh};
+ my $source_tablename = "$Table->{safeschema}.$Table->{safetable}";
+ my $sourcedbh = $SourceDB->{dbh};
+
+ ## The total number of source rows returned
+ my $total_source_rows = 0;
- ## Loop through each source command and push it out to all targets
- ## that are associated with it
- for my $clause (sort keys %srccmd) {
+ ## Loop through each select command and push it out to all targets that are associated with it
+ for my $select_clause (sort keys %srccmd) {
## Build the SELECT clause; an empty one means we want all columns
- my $SELECT = $clause || 'SELECT *';
+ my $SELECT = $select_clause || 'SELECT *';
- ## Prepare each target in turn
- for my $t (@{ $srccmd{$clause} }) {
+ ## Prepare each target that is using this select clause
+ for my $target (@{ $srccmd{$select_clause} }) {
## Internal name of this target
- my $targetname = $t->{name};
+ my $targetname = $target->{name};
- ## Name of the table we are pushing to on this target
- my $tname = $newname->{$targetname};
+ ## The actual target table name. Depends on dbtype and customname table entries
+ my $target_tablename = $customname->{$targetname};
## The columns we are pushing to, both as an arrayref and a CSV:
my $cols = $Table->{tcolumns}{$SELECT};
- my $columnlist = $t->{does_sql} ?
- ('(' . (join ',', map { $t->{dbh}->quote_identifier($_) } @$cols) . ')')
+ my $columnlist = $target->{does_sql} ?
+ ('(' . (join ',', map { $target->{dbh}->quote_identifier($_) } @$cols) . ')')
: ('(' . (join ',' => @$cols) . ')');
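+ ## e.g. '("id","name")' for SQL-speaking targets, '(id,name)' otherwise (names illustrative)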
- my $type = $t->{dbtype};
+ my $type = $target->{dbtype};
## Use columnlist below so we never have to worry about the order
## of the columns on the target
if ('postgres' eq $type) {
- my $tgtcmd = "$self->{sqlprefix}COPY $tname$columnlist FROM STDIN";
- $t->{dbh}->do($tgtcmd);
+ my $tgtcmd = "$self->{sqlprefix}COPY $target_tablename$columnlist FROM STDIN";
+ $target->{dbh}->do($tgtcmd);
}
elsif ('flatpg' eq $type) {
- print {$t->{filehandle}} "COPY $tname$columnlist FROM STDIN;\n";
- $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE);
+ print {$target->{filehandle}} "COPY $target_tablename$columnlist FROM STDIN;\n";
+ $self->glog(qq{Appended to flatfile "$target->{filename}"}, LOG_VERBOSE);
}
elsif ('flatsql' eq $type) {
- print {$t->{filehandle}} "INSERT INTO $tname$columnlist VALUES\n";
- $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE);
+ print {$target->{filehandle}} "INSERT INTO $target_tablename$columnlist VALUES\n";
+ $self->glog(qq{Appended to flatfile "$target->{filename}"}, LOG_VERBOSE);
}
elsif ('mongo' eq $type) {
- $self->{collection} = $t->{dbh}->get_collection($tname);
+ $self->{collection} = $target->{dbh}->get_collection($target_tablename);
}
elsif ('redis' eq $type) {
- ## No prep needed, other than to reset our count of changes
- $t->{redis} = 0;
+ ## No setup needed
}
- elsif ('mysql' eq $type or 'drizzle' eq $type or 'mariadb' eq $type) {
- my $tgtcmd = "INSERT INTO $tname$columnlist VALUES (";
- $tgtcmd .= '?,' x @$cols;
- $tgtcmd =~ s/,$/)/o;
- $t->{sth} = $t->{dbh}->prepare($tgtcmd);
- }
- elsif ('oracle' eq $type) {
- my $tgtcmd = "INSERT INTO $tname$columnlist VALUES (";
- $tgtcmd .= '?,' x @$cols;
- $tgtcmd =~ s/,$/)/o;
- $t->{sth} = $t->{dbh}->prepare($tgtcmd);
- }
- elsif ('sqlite' eq $type) {
- my $tgtcmd = "INSERT INTO $tname$columnlist VALUES (";
- $tgtcmd .= '?,' x @$cols;
- $tgtcmd =~ s/,$/)/o;
- $t->{sth} = $t->{dbh}->prepare($tgtcmd);
- }
+ ## MySQL, MariaDB, Drizzle, Oracle, and SQLite all use a plain INSERT with placeholders
+ elsif ('mysql' eq $type or 'mariadb' eq $type or 'drizzle' eq $type
+ or 'oracle' eq $type or 'sqlite' eq $type) {
+ my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
+ $tgtcmd .= '?,' x @$cols;
+ chop $tgtcmd; ## replace the trailing comma...
+ $tgtcmd .= ')'; ## ...with a closing paren
+ $target->{sth} = $target->{dbh}->prepare($tgtcmd);
+ }
else {
die qq{No support for database type "$type" yet!};
}
- } ## end preparing each target for this clause
+ } ## end preparing each target for this select clause
my $loop = 1;
my $pcount = @pkvals;
+
+ ## Total number of rows read from the source for this SELECT clause
+ my $source_rows = 0;
## Loop through each chunk of primary keys to copy over
for my $pk_values (@pkvals) {
- my $pkvs = join ',' => @{ $pk_values };
- ## Message to prepend to the statement if chunking
- my $pre = $pcount <= 1 ? '' : "/* $loop of $pcount */ ";
- $loop++;
+ ## Start streaming rows from the source
+ $self->glog(qq{Copying from $fromname.$source_tablename}, LOG_VERBOSE);
- ## Kick off the copy on the source
- $self->glog(qq{${pre}Copying from $fromname.$tablename}, LOG_VERBOSE);
- my $srccmd = sprintf '%s%sCOPY (%s FROM %s %s) TO STDOUT%s',
- $pre,
+ ## If we are doing a small batch of single primary keys, use ANY
+ ## If we are doing fullcopy, leave out the WHERE clause completely
+ if (! ref $pk_values ) {
+ my $srccmd = sprintf '%sCOPY (%s FROM %s %s) TO STDOUT%s',
$self->{sqlprefix},
$SELECT,
- $tablename,
- $fullcopy ? '' : " WHERE $Table->{pkcols} IN ($pkvs)",
+ $source_tablename,
+ $pk_values eq 'fullcopy' ? '' : " WHERE $Table->{pklist} = ANY(?)",
$Sync->{copyextra} ? " $Sync->{copyextra}" : '';
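+ ## e.g. COPY (SELECT * FROM public.sales WHERE id = ANY(?)) TO STDOUT
+ ## (table and column names are illustrative)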
- $fromdbh->do($srccmd);
-
- my $buffer = '';
-
- ## Loop through all changed rows on the source, and push to the target(s)
- my $multirow = 0;
-
- ## If in fullcopy mode, we don't know how many rows will get copied,
- ## so we count as we go along
- if ($fullcopy) {
- $total = 0;
+ my $srcsth = $sourcedbh->prepare($srccmd);
+ $pk_values eq 'fullcopy' ? $srcsth->execute()
+ : $srcsth->execute( [ keys %$rows ]);
}
+ else {
+ my $baseq = '?';
+ if ($numpks > 1) {
+ $baseq = '?,' x $numpks;
+ chop $baseq; ## remove the trailing comma
+ $baseq = "($baseq)";
+ }
+ my $number_values = @$pk_values;
+ my $placeholders = "$baseq," x ($number_values / $numpks);
+ chop $placeholders;
+ my $srccmd = sprintf '%s%sCOPY (%s FROM %s WHERE %s IN (%s)) TO STDOUT%s',
+ "/* $loop of $pcount */ ",
+ $self->{sqlprefix},
+ $SELECT,
+ $source_tablename,
+ $Table->{pkeycols},
+ $placeholders,
+ $Sync->{copyextra} ? " $Sync->{copyextra}" : '';
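+ ## e.g. for two rows of a two-column pkey (names illustrative):
+ ## /* 1 of 3 */ COPY (SELECT * FROM public.sales WHERE (id,region) IN ((?,?),(?,?))) TO STDOUT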
+ my $srcsth = $sourcedbh->prepare($srccmd);
+ $srcsth->execute( @$pk_values );
+ $loop++;
+ }
+
## Loop through each row output from the source, storing it in $buffer
- while ($fromdbh->pg_getcopydata($buffer) >= 0) {
-
- $total++ if $fullcopy;
+ ## Future optimization: slurp in X rows at a time, then process them
+ my $buffer = '';
+ while ($sourcedbh->pg_getcopydata($buffer) >= 0) {
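+
+ ## Each row arrives in COPY text format: tab-separated columns,
+ ## \N for NULL, and a trailing newline, e.g. qq{12\talpha\t\\N\n}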
- ## For each target using this particular COPY statement
- for my $t (@{ $srccmd{$clause} }) {
+ $source_rows++;
- my $type = $t->{dbtype};
- my $cols = $Table->{tcolumns}{$SELECT};
- my $tname = $newname->{$t->{name}};
+ ## For each target using this particular SELECT clause
+ for my $target (@{ $srccmd{$select_clause} }) {
- chomp $buffer;
+ my $type = $target->{dbtype};
## For Postgres, we simply do COPY to COPY
if ('postgres' eq $type) {
- $t->{dbh}->pg_putcopydata("$buffer\n");
+ $target->{dbh}->pg_putcopydata($buffer);
}
## For flat files destined for Postgres, just do a tab-delimited dump
elsif ('flatpg' eq $type) {
- print {$t->{filehandle}} "$buffer\n";
+ print {$target->{filehandle}} $buffer;
}
## For other flat files, make a standard VALUES list
elsif ('flatsql' eq $type) {
- if ($multirow++) {
- print {$t->{filehandle}} ",\n";
+ ## Work on a copy so we do not clobber $buffer for any remaining targets
+ chomp(my $row = $buffer);
+ if ($source_rows > 1) {
+ print {$target->{filehandle}} ",\n";
}
- print {$t->{filehandle}} '(' .
- (join ',' => map { $self->{masterdbh}->quote($_) } split /\t/, $buffer, -1) . ')';
+ print {$target->{filehandle}} '(' .
+ (join ',' => map { $self->{masterdbh}->quote($_) } split /\t/, $row, -1) . ')';
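+ ## The finished file will look like this (illustrative values):
+ ## INSERT INTO sales(id,name) VALUES
+ ## (12,'alpha'),
+ ## (13,'beta');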
}
## For Mongo, do some mongomagic
elsif ('mongo' eq $type) {
## Have to map these values back to their names
+ chomp(my $row = $buffer);
- my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
+ my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $row, -1;
+ my $targetcols = $Table->{tcolumns}{$SELECT};
+
## Our object consists of the primary keys, plus all other fields
my $object = {};
- for my $cname (@{ $cols }) {
+ for my $cname (@{ $targetcols }) {
$object->{$cname} = shift @cols;
}
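+ ## e.g. $object = { id => 12, name => 'alpha' } (illustrative values)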
## Coerce non-strings into different objects
}
}
elsif ($Table->{columnhash}{$key}{ftype} =~ /real|double|numeric/o) {
- $object->{$key} = strtod($object->{$key});
+ $object->{$key} = strtod($object->{$key});
}
}
$self->{collection}->insert($object, { safe => 1 });
}
## For MySQL, MariaDB, Drizzle, Oracle, and SQLite, do some basic INSERTs
elsif ('mysql' eq $type
or 'mariadb' eq $type
or 'drizzle' eq $type
or 'oracle' eq $type
or 'sqlite' eq $type) {
+ chomp(my $row = $buffer);
- my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
+ my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $row, -1;
+ my $targetcols = $Table->{tcolumns}{$SELECT};
for my $cindex (0..$#cols) {
next unless defined $cols[$cindex];
- if ($Table->{columnhash}{$cols->[$cindex]}{ftype} eq 'boolean') {
+ if ($Table->{columnhash}{$targetcols->[$cindex]}{ftype} eq 'boolean') {
## BOOLEAN support is inconsistent, but almost everyone will coerce 1/0 to TRUE/FALSE
$cols[$cindex] = ( $cols[$cindex] =~ /^[1ty]/i )? 1 : 0;
}
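+ ## e.g. 't', 'true', 'y', and '1' all become 1, while 'f', 'false', and '0' become 0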
}
- $count += $t->{sth}->execute(@cols);
+ $target->{sth}->execute(@cols);
}
elsif ('redis' eq $type) {
## We are going to set a Redis hash, in which the key is "tablename:pkeyvalue"
+ chomp(my $row = $buffer);
- my @colvals = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
+ my @colvals = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $row, -1;
my @pkey;
for (1 .. $Table->{numpkcols}) {
push @pkey => shift @colvals;
}
my $pkeyval = join ':' => @pkey;

## Build a list of non-null key/value pairs to set in the hash
my @add;
my $i = $Table->{numpkcols} - 1;
+ my $targetcols = $Table->{tcolumns}{$SELECT};
for my $val (@colvals) {
$i++;
next if ! defined $val;
- push @add, $cols->[$i], $val;
+ push @add, $targetcols->[$i], $val;
}
- $t->{dbh}->hmset("$tname:$pkeyval", @add);
- $count++;
- $t->{redis}++;
+ my $target_tablename = $customname->{$target->{name}};
+ $target->{dbh}->hmset("$target_tablename:$pkeyval", @add);
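+ ## e.g. HMSET sales:12 name alpha (illustrative; multi-column pkeys are joined with ':')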
}
} ## end each target
} ## end each row pulled from the source
} ## end each pklist
+
+ ## Add this clause's row count to the grand total
+ $total_source_rows += $source_rows;
## Workaround for DBD::Pg bug
## Once we require a minimum version of 2.18.1 or better, we can remove this!
- if ($self->{dbdpgversion} < 21801) {
- $fromdbh->do('SELECT 1');
+ if ($SourceDB->{dbtype} eq 'postgres' and $self->{dbdpgversion} < 21801) {
+ $sourcedbh->do('SELECT 1');
}
## Perform final cleanups for each target
- for my $t (@{ $srccmd{$clause} }) {
+ for my $target (@{ $srccmd{$select_clause} }) {
- my $type = $t->{dbtype};
+ my $type = $target->{dbtype};
- my $tname = $newname->{$t->{name}};
+ my $tname = $customname->{$target->{name}};
if ('postgres' eq $type) {
- my $dbh = $t->{dbh};
+ my $dbh = $target->{dbh};
$dbh->pg_putcopyend();
## Same bug as above
if ($self->{dbdpgversion} < 21801) {
$dbh->do('SELECT 1');
}
- $self->glog(qq{Rows copied to $t->{name}.$tname: $total}, LOG_VERBOSE);
- $count += $total;
+ $self->glog(qq{Rows copied to $target->{name}.$tname: $source_rows}, LOG_VERBOSE);
## If this table is set to makedelta, add rows to bucardo.delta to simulate the
## normal action of a trigger and add a row to bucardo.track to indicate that
## it has already been replicated here.
- my $d = $Sync->{db}{ $t->{name} };
- if (!$fullcopy and $d->{does_makedelta}{$tablename}) {
- $self->glog("Using makedelta to populate delta and track tables for $t->{name}.$tname", LOG_VERBOSE);
- my $vals;
- if ($numpks == 1) {
- $vals = join ',', map { "($_)" } map { @{ $_ } } @pkvals;
- }
- else {
- $vals = join ',', map { @{ $_ } } @pkvals;
- }
+ my $d = $Sync->{db}{ $target->{name} };
+ if (!$fullcopy and $d->{does_makedelta}{$source_tablename} ) {
+
+ $self->glog("Using makedelta to populate delta and track tables for $target->{name}.$tname", LOG_VERBOSE);
+
my $cols = join ',' => @{ $Table->{qpkey} };
- $dbh->do(qq{
- INSERT INTO bucardo.$Table->{deltatable} ($cols)
- VALUES $vals
- });
+ for my $pk_values (@pkvals) {
+
+ my $baseq = '?';
+ if ($numpks > 1) {
+ $baseq = '?,' x $numpks;
+ chop $baseq;
+ }
+ my $number_values = ref $pk_values ? @$pk_values : keys %$rows;
+ my $placeholders = "($baseq)," x ($number_values / $numpks);
+ chop $placeholders;
+
+ my $SQL = sprintf 'INSERT INTO bucardo.%s (%s) VALUES %s',
+ $Table->{deltatable},
+ $cols,
+ $placeholders;
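+
+ ## e.g. INSERT INTO bucardo.delta_public_sales ("id","region") VALUES (?,?),(?,?)
+ ## (delta table name illustrative)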
+ $self->glog("Running makedelta SQL: $SQL", LOG_DEBUG);
+ my $sth = $dbh->prepare($SQL);
+ $sth->execute(ref $pk_values ? @$pk_values : (keys %$rows));
+ }
+
## Make sure we track it - but only if this sync already acts as a source!
- if ($t->{role} eq 'source') {
+ if ($target->{role} eq 'source') {
$dbh->do(qq{
INSERT INTO bucardo.$Table->{tracktable}
VALUES (NOW(), ?)
## We want to send a kick signal to other syncs that are using this table
## However, we do not want to kick unless they are set to autokick and active
-
+ ## This works even if we do not have a real sync, as $syncname will be ''
$self->glog('Signalling other syncs that this table has changed', LOG_DEBUG);
if (! exists $self->{kick_othersyncs}{$syncname}{$tname}) {
$SQL = 'SELECT name FROM sync WHERE herd IN (SELECT herd FROM herdmap WHERE goat IN (SELECT id FROM goat WHERE schemaname=? AND tablename = ?)) AND name <> ? AND autokick AND status = ?';
}
}
elsif ('flatpg' eq $type) {
- print {$t->{filehandle}} "\\\.\n\n";
+ print {$target->{filehandle}} "\\\.\n\n";
}
elsif ('flatsql' eq $type) {
- print {$t->{filehandle}} ";\n\n";
+ print {$target->{filehandle}} ";\n\n";
}
elsif ('redis' eq $type) {
- $self->glog(qq{Rows copied to Redis $t->{name}.$tname:<pkeyvalue>: $t->{redis}}, LOG_VERBOSE);
+ $self->glog(qq{Rows copied to Redis $target->{name}.$tname:<pkeyvalue>: $source_rows}, LOG_VERBOSE);
+ }
+ else {
+ ## Nothing to be done for mongo, mysql, mariadb, drizzle, sqlite, oracle
}
}
} ## end of each clause in the source command list
- return $count;
+ return $total_source_rows;
} ## end of push_rows