## Can it do truncate?
$x->{does_truncate} = 0;
+ ## Does it support asynchronous queries well?
+ $x->{does_async} = 0;
+
+ ## Does it have good support for ANY()?
+ $x->{does_ANY_clause} = 0;
+
## Can it do savepoints (and roll them back)?
$x->{does_savepoints} = 0;
## List of tables in this database that need makedelta inserts
$x->{needs_makedelta} = {};
+ ## Does it have that annoying timestamp +dd bug?
+ $x->{has_mysql_timestamp_issue} = 0;
+
## Start clumping into groups and adjust the attributes
## Postgres
$x->{does_savepoints} = 1;
$x->{does_cascade} = 1;
$x->{does_limit} = 1;
+ $x->{does_async} = 1;
+ $x->{does_ANY_clause} = 1;
}
## Drizzle
$x->{does_truncate} = 1;
$x->{does_savepoints} = 1;
$x->{does_limit} = 1;
+ $x->{has_mysql_timestamp_issue} = 1;
}
## MongoDB
$x->{does_truncate} = 1;
$x->{does_savepoints} = 1;
$x->{does_limit} = 1;
+ $x->{has_mysql_timestamp_issue} = 1;
}
## Oracle
$dsn .= join ';', map {
($_ eq 'dbservice' ? 'service' : $_ ) . "=$d->{$_}";
} grep { defined $d->{$_} and length $d->{$_} } qw/dbname dbservice/;
- $self->glog("DDSSNN=$dsn");
}
elsif ('drizzle' eq $dbtype) {
$dsn = "dbi:drizzle:database=$d->{dbname}";
$self->glog("DSN: $dsn", LOG_NORMAL) if exists $config{log_level};
-warn "DSN=$dsn\n";
$dbh = DBI->connect
(
$dsn,
my $pkcolsraw = $goat->{pkeycolsraw};
my $numpks = $goat->{numpkcols};
- ## Keep track of exact number of rows deleted from each target
- my %count;
+ ## Have we already truncated this table? If yes, skip and reset the flag
+ if (exists $goat->{truncatewinner}) {
+ return 0;
+ }
- ## Allow for non-arrays by forcing to an array
+ ## Ensure the target database argument is always an array
if (ref $deldb ne 'ARRAY') {
$deldb = [$deldb];
}
+ ## We may be going from one table to another - this is the mapping hash
my $newname = $goat->{newname}{$self->{syncname}};
- ## Have we already truncated this table? If yes, skip and reset the flag
- if (exists $goat->{truncatewinner}) {
- return 0;
- }
-
## Are we truncating?
if (exists $self->{truncateinfo}{$S}{$T}) {
if ('postgres' eq $type) {
my $tdbh = $t->{dbh};
$tdbh->do("TRUNCATE table $tname", { pg_async => PG_ASYNC });
+ $t->{async_active} = time;
} ## end postgres database
## For all other SQL databases, we simply truncate
elsif ($x->{does_sql}) {
elsif ('mongo' eq $type) {
$self->{collection} = $t->{dbh}->get_collection($tname);
$self->{collection}->remove({}, { safe => 1} );
- next;
}
## For Redis, do nothing
elsif ('redis' eq $type) {
- next;
}
## For flatfiles, write out a basic truncate statement
elsif ($type =~ /flat/o) {
## Final cleanup for each target
for my $t (@$deldb) {
- my $type = $t->{dbtype};
- if ('postgres' eq $type) {
+ if ('postgres' eq $t->{dbtype}) {
## Wrap up all the async truncate call
$t->{dbh}->pg_result();
+ $t->{async_active} = 0;
}
}
## The number of items before we break it into a separate statement
## This is inexact, as we don't know how large each key is,
## but should be good enough as long as not set too high.
+ ## For now, all targets have the same chunksize
my $chunksize = $config{statement_chunk_size} || 8_000;
## Setup our deletion SQL as needed
my $type = $t->{dbtype};
+ ## Track the number of rows actually deleted from this target
+ $t->{deleted_rows} = 0;
+
+ ## Set to true when all rounds completed
+ $t->{delete_complete} = 0;
+
## No special preparation for mongo or redis
next if $type =~ /mongo|redis/;
- ## Set the type of SQL we are using: IN vs ANY
- my $sqltype = '';
- if ('postgres' eq $type) {
- $sqltype = (1 == $numpks) ? 'ANY' : 'IN';
- }
- elsif ('mysql' eq $type or 'drizzle' eq $type or 'mariadb' eq $type) {
- $sqltype = 'MYIN';
- }
- elsif ('oracle' eq $type) {
- $sqltype = 'IN';
- }
- elsif ('sqlite' eq $type) {
- $sqltype = 'IN';
- }
- elsif ($type =~ /flatpg/o) {
- ## XXX Worth the trouble to allow building an ANY someday for flatpg?
- $sqltype = 'IN';
- }
- elsif ($type =~ /flat/o) {
- $sqltype = 'IN';
+ ## Set the type of SQL we are using: IN vs ANY. Default is IN
+ my $sqltype = 'IN';
+
+ ## Use of ANY is greatly preferred, but can only use if the
+ ## underlying database supports it, and if we have a single column pk
+ if ($t->{does_ANY_clause} and 1==$numpks) {
+ $sqltype = 'ANY';
}
+ ## The actual target table name: may differ from the source!
my $tname = $newname->{$t->{name}};
- ## We may want to break this up into separate rounds if large
- my $round = 0;
-
- ## Internal counter of how many items we've processed this round
- my $roundtotal = 0;
+ ## Internal counters to help us break queries into chunks if needed
+ my ($round, $roundtotal) = (0,0);
- ## Postgres-specific optimization for a single primary key:
- if ($sqltype eq 'ANY') {
- $SQL{ANY}{$tname} ||= "$self->{sqlprefix}DELETE FROM $tname WHERE $pkcols = ANY(?)";
- ## The array where we store each chunk
- my @SQL;
+ ## Array to store each chunk of SQL
+ my @chunk;
+ ## Optimization for a single primary key using ANY(?)
+ if ('ANY' eq $sqltype and ! exists $SQL{ANY}{$tname}) {
+ $SQL{ANY}{$tname} = "$self->{sqlprefix}DELETE FROM $tname WHERE $pkcols = ANY(?)";
for my $key (keys %$rows) {
- push @{$SQL[$round]} => length $key ? ([split '\0', $key, -1]) : [''];
+ push @{$chunk[$round]} => length $key ? ([split '\0', $key, -1]) : [''];
if (++$roundtotal >= $chunksize) {
$roundtotal = 0;
$round++;
}
}
- $SQL{ANYargs} = \@SQL;
+ $SQL{ANYargs} = \@chunk;
}
## Normal DELETE call with IN() clause
- elsif ($sqltype eq 'IN') {
- $SQL = sprintf '%sDELETE FROM %s WHERE %s IN (',
+ elsif ('IN' eq $sqltype and ! exists $SQL{IN}{$tname}) {
+ $SQL{IN}{$tname} = sprintf '%sDELETE FROM %s WHERE %s IN (',
$self->{sqlprefix},
$tname,
$pkcols;
- ## The array where we store each chunk
- my @SQL;
- for my $key (keys %$rows) {
- my $inner = length $key
- ? (join ',' => map { s/\'/''/go; s{\\}{\\\\}; qq{'$_'}; } split '\0', $key, -1)
- : q{''};
- $SQL[$round] .= "($inner),";
- if (++$roundtotal >= $chunksize) {
- $roundtotal = 0;
- $round++;
- }
- }
- ## Cleanup
- for (@SQL) {
- chop;
- $_ = "$SQL $_)";
- }
- $SQL{IN} = \@SQL;
- }
- ## MySQL IN clause
- elsif ($sqltype eq 'MYIN') {
- (my $safepk = $pkcols) =~ s/\"/`/go;
- $SQL = sprintf '%sDELETE FROM %s WHERE %s IN (',
- $self->{sqlprefix},
- $tname,
- $safepk;
-
- ## The array where we store each chunk
- my @SQL;
-
- ## Quick workaround for a more standard timestamp
- if ($goat->{pkeytype}[0] =~ /timestamptz/) {
+ my $inner;
+ if ($t->{has_mysql_timestamp_issue}) {
for my $key (keys %$rows) {
- my $inner = length $key
+ $inner = length $key
? (join ',' => map { s/\'/''/go; s{\\}{\\\\}; s/\+\d\d$//; qq{'$_'}; } split '\0', $key, -1)
: q{''};
- $SQL[$round] .= "($inner),";
+ $chunk[$round] .= "($inner),";
if (++$roundtotal >= $chunksize) {
$roundtotal = 0;
$round++;
}
else {
for my $key (keys %$rows) {
- my $inner = length $key
+ $inner = length $key
? (join ',' => map { s/\'/''/go; s{\\}{\\\\}; qq{'$_'}; } split '\0', $key, -1)
: q{''};
- $SQL[$round] .= "($inner),";
+ $chunk[$round] .= "($inner),";
if (++$roundtotal >= $chunksize) {
$roundtotal = 0;
$round++;
}
}
## Cleanup
- for (@SQL) {
+ for (@chunk) {
chop;
- $_ = "$SQL $_)";
+ $_ = "$SQL{IN}{$tname} $_)";
+ }
+ $SQL{IN}{$tname} = \@chunk;
+ }
+
+ $t->{delete_rounds} = @chunk;
+
+ ## If we bypassed because of a cached version, use the cached delete_rounds too
+ if ('ANY' eq $sqltype) {
+ if (exists $SQL{ANYrounds}{$tname}) {
+ $t->{delete_rounds} = $SQL{ANYrounds}{$tname};
+ }
+ else {
+ $SQL{ANYrounds}{$tname} = $t->{delete_rounds};
}
- $SQL{MYIN} = \@SQL;
}
+ elsif ('IN' eq $sqltype) {
+ if (exists $SQL{INrounds}{$tname}) {
+ $t->{delete_rounds} = $SQL{INrounds}{$tname};
+ }
+ else {
+ $SQL{INrounds}{$tname} = $t->{delete_rounds};
+ }
+ }
+
+ ## Empty our internal tracking items that may have been set previously
+ $t->{delete_round} = 0;
+ delete $t->{delete_sth};
+
}
- ## Do each target in turn
- for my $t (@$deldb) {
+ ## Start the main deletion loop
+ ## The idea is to be efficient as possible by always having as many
+ ## async targets running as possible. We run one non-async at a time
+ ## before heading back to check on the asyncs.
- my $type = $t->{dbtype};
+ my $done = 0;
+ my $did_something;
+ while (!$done) {
- my $tname = $newname->{$t->{name}};
+ $did_something = 0;
- if ('postgres' eq $type) {
- my $tdbh = $t->{dbh};
+ ## Wrap up any async targets that have finished
+ for my $t (@$deldb) {
+ next if !$t->{async_active} or $t->{delete_complete};
+ if ('postgres' eq $t->{dbtype}) {
+ if ($t->{dbh}->pg_ready) {
+ ## If this was a do(), we already have the number of rows
+ if (1 == $numpks) {
+ $t->{deleted_rows} += $t->{dbh}->pg_result();
+ }
+ else {
+ $t->{dbh}->pg_result();
+ }
+ $t->{async_active} = 0;
+ }
+ }
+ ## Don't need to check for invalid types: happens on the kick off below
+ }
+
+ ## Kick off all dormant async targets
+ for my $t (@$deldb) {
+
+ ## Skip if this target does not support async, or is in the middle of a query
+ next if !$t->{does_async} or $t->{async_active} or $t->{delete_complete};
- ## Only the last will be async
- ## In most cases, this means always async
- my $count = 1==$numpks ? @{ $SQL{ANYargs} } : @{ $SQL{IN} };
- for my $loop (1..$count) {
- my $async = PG_ASYNC;
- my $pre = $count > 1 ? "/* $loop of $count */ " : '';
+ ## The actual target name
+ my $tname = $newname->{$t->{name}};
+
+ if ('postgres' eq $t->{dbtype}) {
+
+ ## Which chunk we are processing.
+ $t->{delete_round}++;
+ if ($t->{delete_round} > $t->{delete_rounds}) {
+ $t->{delete_complete} = 1;
+ next;
+ }
+ my $dbname = $t->{name};
+ $self->glog("Deleting from target $dbname.$tname. $t->{delete_round} of $t->{delete_rounds}", LOG_DEBUG);
- $self->glog("Deleting target $tname. $loop of $count", LOG_DEBUG);
+ $did_something++;
+ ## Single primary key, so delete using the ANY(?) format
if (1 == $numpks) {
- $t->{deletesth} = $tdbh->prepare("$pre$SQL{ANY}{$tname}", { pg_async => $async });
- my $res = $t->{deletesth}->execute($SQL{ANYargs}->[$loop-1]);
- $count{$t} += $res unless $async;
+ ## Use the or-equal so we only prepare this once
+ $t->{delete_sth} ||= $t->{dbh}->prepare("$SQL{ANY}{$tname}", { pg_async => PG_ASYNC });
+ $t->{delete_sth}->execute($SQL{ANYargs}->[$t->{delete_round}-1]);
}
+ ## Multiple primary keys, so delete old school via IN ((x,y),(a,b))
else {
- $count{$t} += $tdbh->do($pre.$SQL{IN}->[$loop-1], { pg_direct => 1, pg_async => $async });
- $t->{deletesth} = 0;
+ my $pre = $t->{delete_rounds} > 1 ? "/* $t->{delete_round} of $t->{delete_rounds} */ " : '';
+ ## The pg_direct tells DBD::Pg there are no placeholders, and to use PQexec directly
+ $t->{deleted_rows} += $t->{dbh}->
+ do($pre.$SQL{IN}{$tname}->[$t->{delete_round}-1], { pg_async => PG_ASYNC, pg_direct => 1 });
}
- }
- next;
+ $t->{async_active} = time;
+ } ## end postgres
+ else {
+ die qq{Do not know how to do async for type $t->{dbtype}!\n};
+ }
- } ## end postgres database
+ } ## end all async targets
- if ('mongo' eq $type) {
+ ## Kick off a single non-async target
+ for my $t (@$deldb) {
- ## Grab the collection name and store it
- $self->{collection} = $t->{dbh}->get_collection($tname);
+ ## Skip if this target is async, or has no more rounds
+ next if $t->{does_async} or $t->{delete_complete};
- ## Because we may have multi-column primary keys, and each key may need modifying,
- ## we have to put everything into an array of arrays.
- ## The first level is the primary key number, the next is the actual values
- my @delkeys = [];
+ $did_something++;
- ## The pkcolsraw variable is a simple comma-separated list of PK column names
- ## The rows variable is a hash with the PK values as keys (the values can be ignored)
+ my $type = $t->{dbtype};
- ## Binary PKs are easy: all we have to do is decode
- ## We can assume that binary PK means not a multi-column PK
- if ($goat->{hasbinarypkey}) {
- @{ $delkeys[0] } = map { decode_base64($_) } keys %$rows;
- }
- else {
+ ## The actual target name
+ my $tname = $newname->{$t->{name}};
- ## Break apart the primary keys into an array of arrays
- my @fullrow = map { length($_) ? [split '\0', $_, -1] : [''] } keys %$rows;
+ if ('mongo' eq $type) {
- ## Which primary key column we are currently using
- my $pknum = 0;
+ ## Grab the collection name and store it
+ $self->{collection} = $t->{dbh}->get_collection($tname);
- ## Walk through each column making up the primary key
- for my $realpkname (split /,/, $pkcolsraw, -1) {
+ ## Because we may have multi-column primary keys, and each key may need modifying,
+ ## we have to put everything into an array of arrays.
+ ## The first level is the primary key number, the next is the actual values
+ my @delkeys = [];
- ## Grab what type this column is
- ## We need to map non-strings to correct types as best we can
- my $type = $goat->{columnhash}{$realpkname}{ftype};
+ ## The pkcolsraw variable is a simple comma-separated list of PK column names
+ ## The rows variable is a hash with the PK values as keys (the values can be ignored)
- ## For integers, we simply force to a Perlish int
- if ($type =~ /smallint|integer|bigint/o) {
- @{ $delkeys[$pknum] } = map { int $_->[$pknum] } @fullrow;
- }
- ## Non-integer numbers get set via the strtod command from the 'POSIX' module
- elsif ($type =~ /real|double|numeric/o) {
- @{ $delkeys[$pknum] } = map { strtod $_->[$pknum] } @fullrow;
- }
- ## Boolean becomes true Perlish booleans via the 'boolean' module
- elsif ($type eq 'boolean') {
- @{ $delkeys[$pknum] } = map { $_->[$pknum] eq 't' ? true : false } @fullrow;
- }
- ## Everything else gets a direct mapping
- else {
- @{ $delkeys[$pknum] } = map { $_->[$pknum] } @fullrow;
- }
- $pknum++;
+ ## Binary PKs are easy: all we have to do is decode
+ ## We can assume that binary PK means not a multi-column PK
+ if ($goat->{hasbinarypkey}) {
+ @{ $delkeys[0] } = map { decode_base64($_) } keys %$rows;
}
- } ## end of multi-column PKs
+ else {
- ## How many items we end up actually deleting
- $count{$t} = 0;
+ ## Break apart the primary keys into an array of arrays
+ my @fullrow = map { length($_) ? [split '\0', $_, -1] : [''] } keys %$rows;
- ## We may need to batch these to keep the total message size reasonable
- my $max = keys %$rows;
- $max--;
+ ## Which primary key column we are currently using
+ my $pknum = 0;
- ## The bottom of our current array slice
- my $bottom = 0;
+ ## Walk through each column making up the primary key
+ for my $realpkname (split /,/, $pkcolsraw, -1) {
- ## This loop limits the size of our delete requests to mongodb
- MONGODEL: {
- ## Calculate the current top of the array slice
- my $top = $bottom + $chunksize;
+ ## Grab what type this column is
+ ## We need to map non-strings to correct types as best we can
+ my $type = $goat->{columnhash}{$realpkname}{ftype};
- ## Stop at the total number of rows
- $top = $max if $top > $max;
+ ## For integers, we simply force to a Perlish int
+ if ($type =~ /smallint|integer|bigint/o) {
+ @{ $delkeys[$pknum] } = map { int $_->[$pknum] } @fullrow;
+ }
+ ## Non-integer numbers get set via the strtod command from the 'POSIX' module
+ elsif ($type =~ /real|double|numeric/o) {
+ @{ $delkeys[$pknum] } = map { strtod $_->[$pknum] } @fullrow;
+ }
+ ## Boolean becomes true Perlish booleans via the 'boolean' module
+ elsif ($type eq 'boolean') {
+ @{ $delkeys[$pknum] } = map { $_->[$pknum] eq 't' ? true : false } @fullrow;
+ }
+ ## Everything else gets a direct mapping
+ else {
+ @{ $delkeys[$pknum] } = map { $_->[$pknum] } @fullrow;
+ }
+ $pknum++;
+ }
+ } ## end of multi-column PKs
- ## If we have a single key, we can use the '$in' syntax
- if ($numpks <= 1) {
- my @newarray = @{ $delkeys[0] }[$bottom..$top];
- my $result = $self->{collection}->remove(
+ ## We may need to batch these to keep the total message size reasonable
+ my $max = keys %$rows;
+ $max--;
+
+ ## The bottom of our current array slice
+ my $bottom = 0;
+
+ ## This loop limits the size of our delete requests to mongodb
+ MONGODEL: {
+ ## Calculate the current top of the array slice
+ my $top = $bottom + $chunksize;
+
+ ## Stop at the total number of rows
+ $top = $max if $top > $max;
+
+ ## If we have a single key, we can use the '$in' syntax
+ if ($numpks <= 1) {
+ my @newarray = @{ $delkeys[0] }[$bottom..$top];
+ my $result = $self->{collection}->remove(
{$pkcolsraw => { '$in' => \@newarray }}, { safe => 1 });
- $count{$t} += $result->{n};
- }
- else {
- ## For multi-column primary keys, we cannot use '$in', sadly.
- ## Thus, we will just call delete once per row
+ $t->{deleted_rows} += $result->{n};
+ }
+ else {
+ ## For multi-column primary keys, we cannot use '$in', sadly.
+ ## Thus, we will just call delete once per row
- ## Put the names into an easy to access array
- my @realpknames = split /,/, $pkcolsraw, -1;
+ ## Put the names into an easy to access array
+ my @realpknames = split /,/, $pkcolsraw, -1;
- my @find;
+ my @find;
- ## Which row we are currently processing
- my $numrows = scalar keys %$rows;
- for my $rownumber (0..$numrows-1) {
- for my $pknum (0..$numpks-1) {
- push @find => $realpknames[$pknum], $delkeys[$pknum][$rownumber];
+ ## Which row we are currently processing
+ my $numrows = scalar keys %$rows;
+ for my $rownumber (0..$numrows-1) {
+ for my $pknum (0..$numpks-1) {
+ push @find => $realpknames[$pknum], $delkeys[$pknum][$rownumber];
+ }
}
- }
- my $result = $self->{collection}->remove(
+ my $result = $self->{collection}->remove(
{ '$and' => \@find }, { safe => 1 });
- $count{$t} += $result->{n};
-
- ## We do not need to loop, as we just went 1 by 1 through the whole list
- last MONGODEL;
+ $t->{deleted_rows} += $result->{n};
- }
+ ## We do not need to loop, as we just went 1 by 1 through the whole list
+ last MONGODEL;
- ## Bail out of the loop if we've hit the max
- last MONGODEL if $top >= $max;
+ }
- ## Assign the bottom of our array slice to be above the current top
- $bottom = $top + 1;
+ ## Bail out of the loop if we've hit the max
+ last MONGODEL if $top >= $max;
- redo MONGODEL;
- }
+ ## Assign the bottom of our array slice to be above the current top
+ $bottom = $top + 1;
- $self->glog("Mongo objects removed from $tname: $count{$t}", LOG_VERBOSE);
- next;
- }
+ redo MONGODEL;
+ }
- if ('mysql' eq $type or 'drizzle' eq $type or 'mariadb' eq $type) {
- my $tdbh = $t->{dbh};
- for (@{ $SQL{MYIN} }) {
- ($count{$t} += $tdbh->do($_)) =~ s/0E0/0/o;
+ $self->glog("Mongo objects removed from $tname: $t->{deleted_rows}", LOG_VERBOSE);
}
- next;
- }
-
- if ('oracle' eq $type) {
- my $tdbh = $t->{dbh};
- for (@{ $SQL{IN} }) {
- ($count{$t} += $tdbh->do($_)) =~ s/0E0/0/o;
+ elsif ('mysql' eq $type or 'drizzle' eq $type or 'mariadb' eq $type
+ or 'oracle' eq $type or 'sqlite' eq $type) {
+ my $tdbh = $t->{dbh};
+ for (@{ $SQL{IN}{$tname} }) {
+ $t->{deleted_rows} += $tdbh->do($_);
+ }
}
- next;
- }
-
- if ('redis' eq $type) {
- ## We need to remove the entire tablename:pkey:column for each column we know about
- my $cols = $goat->{cols};
- for my $pk (keys %$rows) {
- ## If this is a multi-column primary key, change our null delimiter to a colon
- if ($goat->{numpkcols} > 1) {
- $pk =~ s{\0}{:}go;
+ elsif ('redis' eq $type) {
+ ## We need to remove the entire tablename:pkey:column for each column we know about
+ my $cols = $goat->{cols};
+ for my $pk (keys %$rows) {
+ ## If this is a multi-column primary key, change our null delimiter to a colon
+ if ($goat->{numpkcols} > 1) {
+ $pk =~ s{\0}{:}go;
+ }
+ $t->{deleted_rows} += $t->{dbh}->del("$tname:$pk");
}
- $count = $t->{dbh}->del("$tname:$pk");
}
- next;
- }
-
- if ('sqlite' eq $type) {
- my $tdbh = $t->{dbh};
- for (@{ $SQL{IN} }) {
- ($count{$t} += $tdbh->do($_)) =~ s/0E0/0/o;
+ elsif ($type =~ /flat/o) { ## same as flatpg for now
+ for (@{ $SQL{IN}{$tname} }) {
+ print {$t->{filehandle}} qq{$_;\n\n};
+ }
+ $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE);
}
- next;
- }
-
- if ($type =~ /flat/o) { ## same as flatpg for now
- for (@{ $SQL{IN} }) {
- print {$t->{filehandle}} qq{$_;\n\n};
+ else {
+ die qq{No support for database type "$type" yet!};
}
- $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE);
- next;
- }
- die qq{No support for database type "$type" yet!};
- }
+ $t->{delete_complete} = 1;
- ## Final cleanup as needed (e.g. process async results)
- for my $t (@$deldb) {
- my $type = $t->{dbtype};
-
- if ('postgres' eq $type) {
+ ## Only one target at a time, please: we need to check on the asyncs
+ last;
- my $tdbh = $t->{dbh};
+ } ## end async target
- ## Wrap up all the async queries
- ($count{$t} += $tdbh->pg_result()) =~ s/0E0/0/o;
-
- ## Call finish if this was a statement handle (as opposed to a do)
- if ($t->{deletesth}) {
- $t->{deletesth}->finish();
+ ## If we did nothing this round, and there are no asyncs running, we are done.
+ ## Otherwise, we will wait for the oldest async to finish
+ if (!$did_something) {
+ if (! grep { $_->{async_active} } @$deldb) {
+ $done = 1;
+ }
+ else {
+ ## Since nothing else is going on, let's wait for the oldest async to finish
+ my $t = ( sort { $a->{async_active} > $b->{async_active} } grep { $_->{async_active} } @$deldb)[0];
+ if (1 == $numpks) {
+ $t->{deleted_rows} += $t->{dbh}->pg_result();
+ }
+ else {
+ $t->{dbh}->pg_result();
+ }
+ $t->{async_active} = 0;
}
- delete $t->{deletesth};
}
- }
+
+ } ## end of main deletion loop
## Generate our final deletion counts
- $count = 0;
+ my $count = 0;
for my $t (@$deldb) {
## We do not delete from certain types of targets
my $tname = $newname->{$t->{name}};
- $count += $count{$t};
- $self->glog(qq{Rows deleted from $t->{name}.$tname: $count{$t}}, LOG_VERBOSE);
+ $count += $t->{deleted_rows};
+ $self->glog(qq{Rows deleted from $t->{name}.$tname: $t->{deleted_rows}}, LOG_VERBOSE);
}
return $count;