Rewrite the delete_rows function.
authorGreg Sabino Mullane <greg@endpoint.com>
Thu, 10 Apr 2014 18:00:08 +0000 (14:00 -0400)
committerGreg Sabino Mullane <greg@endpoint.com>
Thu, 10 Apr 2014 18:00:08 +0000 (14:00 -0400)
Loop through and make sure we have the maximum number of async targets running at all times.
Moved much of the logic to per-target attribs.

Bucardo.pm

index a6cc81d51771749932b746aa6c95385a4ca4f654..d6b00bdf0f7153dc73a613443ec1f399a1f65ef4 100644 (file)
@@ -2064,6 +2064,12 @@ sub start_kid {
         ## Can it do truncate?
         $x->{does_truncate} = 0;
 
+        ## Does it support asynchronous queries well?
+        $x->{does_async} = 0;
+
+        ## Does it have good support for ANY()?
+        $x->{does_ANY_clause} = 0;
+
         ## Can it do savepoints (and roll them back)?
         $x->{does_savepoints} = 0;
 
@@ -2079,6 +2085,9 @@ sub start_kid {
         ## List of tables in this database that need makedelta inserts
         $x->{needs_makedelta} = {};
 
+        ## Does it have that annoying timestamp +dd bug?
+        $x->{has_mysql_timestamp_issue} = 0;
+
         ## Start clumping into groups and adjust the attributes
 
         ## Postgres
@@ -2089,6 +2098,8 @@ sub start_kid {
             $x->{does_savepoints} = 1;
             $x->{does_cascade}    = 1;
             $x->{does_limit}      = 1;
+            $x->{does_async}      = 1;
+            $x->{does_ANY_clause} = 1;
         }
 
         ## Drizzle
@@ -2098,6 +2109,7 @@ sub start_kid {
             $x->{does_truncate}   = 1;
             $x->{does_savepoints} = 1;
             $x->{does_limit}      = 1;
+            $x->{has_mysql_timestamp_issue} = 1;
         }
 
         ## MongoDB
@@ -2112,6 +2124,7 @@ sub start_kid {
             $x->{does_truncate}   = 1;
             $x->{does_savepoints} = 1;
             $x->{does_limit}      = 1;
+            $x->{has_mysql_timestamp_issue} = 1;
         }
 
         ## Oracle
@@ -5010,7 +5023,6 @@ sub connect_database {
             $dsn .= join ';', map {
                 ($_ eq 'dbservice' ? 'service' : $_ ) . "=$d->{$_}";
             } grep { defined $d->{$_} and length $d->{$_} } qw/dbname dbservice/;
-            $self->glog("DDSSNN=$dsn");
         }
         elsif ('drizzle' eq $dbtype) {
             $dsn = "dbi:drizzle:database=$d->{dbname}";
@@ -5079,7 +5091,6 @@ sub connect_database {
 
     $self->glog("DSN: $dsn", LOG_NORMAL) if exists $config{log_level};
 
-warn "DSN=$dsn\n";
     $dbh = DBI->connect
         (
          $dsn,
@@ -8312,21 +8323,19 @@ sub delete_rows {
     my $pkcolsraw = $goat->{pkeycolsraw};
     my $numpks = $goat->{numpkcols};
 
-    ## Keep track of exact number of rows deleted from each target
-    my %count;
+    ## Have we already truncated this table? If yes, skip and reset the flag
+    if (exists $goat->{truncatewinner}) {
+        return 0;
+    }
 
-    ## Allow for non-arrays by forcing to an array
+    ## Ensure the target database argument is always an array
     if (ref $deldb ne 'ARRAY') {
         $deldb = [$deldb];
     }
 
+    ## We may be going from one table to another - this is the mapping hash
     my $newname = $goat->{newname}{$self->{syncname}};
 
-    ## Have we already truncated this table? If yes, skip and reset the flag
-    if (exists $goat->{truncatewinner}) {
-        return 0;
-    }
-
     ## Are we truncating?
     if (exists $self->{truncateinfo}{$S}{$T}) {
 
@@ -8343,6 +8352,7 @@ sub delete_rows {
             if ('postgres' eq $type) {
                 my $tdbh = $t->{dbh};
                 $tdbh->do("TRUNCATE table $tname", { pg_async => PG_ASYNC });
+                $t->{async_active} = time;
             } ## end postgres database
             ## For all other SQL databases, we simply truncate
             elsif ($x->{does_sql}) {
@@ -8353,11 +8363,9 @@ sub delete_rows {
             elsif ('mongo' eq $type) {
                 $self->{collection} = $t->{dbh}->get_collection($tname);
                 $self->{collection}->remove({}, { safe => 1} );
-                next;
             }
             ## For Redis, do nothing
             elsif ('redis' eq $type) {
-                next;
             }
             ## For flatfiles, write out a basic truncate statement
             elsif ($type =~ /flat/o) {
@@ -8370,10 +8378,10 @@ sub delete_rows {
 
         ## Final cleanup for each target
         for my $t (@$deldb) {
-            my $type = $t->{dbtype};
-            if ('postgres' eq $type) {
+            if ('postgres' eq $t->{dbtype}) {
                 ## Wrap up all the async truncate call
                 $t->{dbh}->pg_result();
+                $t->{async_active} = 0;
             }
         }
 
@@ -8384,6 +8392,7 @@ sub delete_rows {
     ## The number of items before we break it into a separate statement
     ## This is inexact, as we don't know how large each key is,
     ## but should be good enough as long as not set too high.
+    ## For now, all targets have the same chunksize
     my $chunksize = $config{statement_chunk_size} || 8_000;
 
     ## Setup our deletion SQL as needed
@@ -8392,96 +8401,57 @@ sub delete_rows {
 
         my $type = $t->{dbtype};
 
+        ## Track the number of rows actually deleted from this target
+        $t->{deleted_rows} = 0;
+
+        ## Set to true when all rounds completed
+        $t->{delete_complete} = 0;
+
         ## No special preparation for mongo or redis
         next if $type =~ /mongo|redis/;
 
-        ## Set the type of SQL we are using: IN vs ANY
-        my $sqltype = '';
-        if ('postgres' eq $type) {
-            $sqltype = (1 == $numpks) ? 'ANY' : 'IN';
-        }
-        elsif ('mysql' eq $type or 'drizzle' eq $type or 'mariadb' eq $type) {
-            $sqltype = 'MYIN';
-        }
-        elsif ('oracle' eq $type) {
-            $sqltype = 'IN';
-        }
-        elsif ('sqlite' eq $type) {
-            $sqltype = 'IN';
-        }
-        elsif ($type =~ /flatpg/o) {
-            ## XXX Worth the trouble to allow building an ANY someday for flatpg?
-            $sqltype = 'IN';
-        }
-        elsif ($type =~ /flat/o) {
-            $sqltype = 'IN';
+        ## Set the type of SQL we are using: IN vs ANY. Default is IN
+        my $sqltype = 'IN';
+
+        ## Use of ANY is greatly preferred, but can only use if the
+        ## underlying database supports it, and if we have a single column pk
+        if ($t->{does_ANY_clause} and 1==$numpks) {
+            $sqltype = 'ANY';
         }
 
+        ## The actual target table name: may differ from the source!
         my $tname = $newname->{$t->{name}};
 
-        ## We may want to break this up into separate rounds if large
-        my $round = 0;
-
-        ## Internal counter of how many items we've processed this round
-        my $roundtotal = 0;
+        ## Internal counters to help us break queries into chunks if needed
+        my ($round, $roundtotal) = (0,0);
 
-        ## Postgres-specific optimization for a single primary key:
-        if ($sqltype eq 'ANY') {
-            $SQL{ANY}{$tname} ||= "$self->{sqlprefix}DELETE FROM $tname WHERE $pkcols = ANY(?)";
-            ## The array where we store each chunk
-            my @SQL;
+        ## Array to store each chunk of SQL
+        my @chunk;
+        ## Optimization for a single primary key using ANY(?)
+        if ('ANY' eq $sqltype and ! exists $SQL{ANY}{$tname}) {
+            $SQL{ANY}{$tname} = "$self->{sqlprefix}DELETE FROM $tname WHERE $pkcols = ANY(?)";
             for my $key (keys %$rows) {
-                push @{$SQL[$round]} => length $key ? ([split '\0', $key, -1]) : [''];
+                push @{$chunk[$round]} => length $key ? ([split '\0', $key, -1]) : [''];
                 if (++$roundtotal >= $chunksize) {
                     $roundtotal = 0;
                     $round++;
                 }
             }
-            $SQL{ANYargs} = \@SQL;
+            $SQL{ANYargs} = \@chunk;
         }
         ## Normal DELETE call with IN() clause
-        elsif ($sqltype eq 'IN') {
-            $SQL = sprintf '%sDELETE FROM %s WHERE %s IN (',
+        elsif ('IN' eq $sqltype and ! exists $SQL{IN}{$tname}) {
+            $SQL{IN}{$tname} = sprintf '%sDELETE FROM %s WHERE %s IN (',
                 $self->{sqlprefix},
                 $tname,
                 $pkcols;
-            ## The array where we store each chunk
-            my @SQL;
-            for my $key (keys %$rows) {
-                my $inner = length $key
-                    ? (join ',' => map { s/\'/''/go; s{\\}{\\\\}; qq{'$_'}; } split '\0', $key, -1)
-                    : q{''};
-                $SQL[$round] .= "($inner),";
-                if (++$roundtotal >= $chunksize) {
-                    $roundtotal = 0;
-                    $round++;
-                }
-            }
-            ## Cleanup
-            for (@SQL) {
-                chop;
-                $_ = "$SQL $_)";
-            }
-            $SQL{IN} = \@SQL;
-        }
-        ## MySQL IN clause
-        elsif ($sqltype eq 'MYIN') {
-            (my $safepk = $pkcols) =~ s/\"/`/go;
-            $SQL = sprintf '%sDELETE FROM %s WHERE %s IN (',
-                $self->{sqlprefix},
-                $tname,
-                $safepk;
-
-            ## The array where we store each chunk
-            my @SQL;
-
-            ## Quick workaround for a more standard timestamp
-            if ($goat->{pkeytype}[0] =~ /timestamptz/) {
+            my $inner;
+            if ($t->{has_mysql_timestamp_issue}) {
                 for my $key (keys %$rows) {
-                    my $inner = length $key
+                    $inner = length $key
                         ? (join ',' => map { s/\'/''/go; s{\\}{\\\\}; s/\+\d\d$//; qq{'$_'}; } split '\0', $key, -1)
                         : q{''};
-                    $SQL[$round] .= "($inner),";
+                    $chunk[$round] .= "($inner),";
                     if (++$roundtotal >= $chunksize) {
                         $roundtotal = 0;
                         $round++;
@@ -8490,10 +8460,10 @@ sub delete_rows {
             }
             else {
                 for my $key (keys %$rows) {
-                    my $inner = length $key
+                    $inner = length $key
                         ? (join ',' => map { s/\'/''/go; s{\\}{\\\\}; qq{'$_'}; } split '\0', $key, -1)
                         : q{''};
-                    $SQL[$round] .= "($inner),";
+                    $chunk[$round] .= "($inner),";
                     if (++$roundtotal >= $chunksize) {
                         $roundtotal = 0;
                         $round++;
@@ -8501,235 +8471,296 @@ sub delete_rows {
                 }
             }
             ## Cleanup
-            for (@SQL) {
+            for (@chunk) {
                 chop;
-                $_ = "$SQL $_)";
+                $_ = "$SQL{IN}{$tname} $_)";
+            }
+            $SQL{IN}{$tname} = \@chunk;
+        }
+
+        $t->{delete_rounds} = @chunk;
+
+        ## If we bypassed because of a cached version, use the cached delete_rounds too
+        if ('ANY' eq $sqltype) {
+            if (exists $SQL{ANYrounds}{$tname}) {
+                $t->{delete_rounds} = $SQL{ANYrounds}{$tname};
+            }
+            else {
+                $SQL{ANYrounds}{$tname} = $t->{delete_rounds};
             }
-            $SQL{MYIN} = \@SQL;
         }
+        elsif ('IN' eq $sqltype) {
+            if (exists $SQL{INrounds}{$tname}) {
+                $t->{delete_rounds} = $SQL{INrounds}{$tname};
+            }
+            else {
+                $SQL{INrounds}{$tname} = $t->{delete_rounds};
+            }
+        }
+
+        ## Empty our internal tracking items that may have been set previously
+        $t->{delete_round} = 0;
+        delete $t->{delete_sth};
+
     }
 
-    ## Do each target in turn
-    for my $t (@$deldb) {
+    ## Start the main deletion loop
+    ## The idea is to be efficient as possible by always having as many
+    ## async targets running as possible. We run one non-async at a time
+    ## before heading back to check on the asyncs.
 
-        my $type = $t->{dbtype};
+    my $done = 0;
+    my $did_something;
+    while (!$done) {
 
-        my $tname = $newname->{$t->{name}};
+        $did_something = 0;
 
-        if ('postgres' eq $type) {
-            my $tdbh = $t->{dbh};
+        ## Wrap up any async targets that have finished
+        for my $t (@$deldb) {
+            next if !$t->{async_active} or $t->{delete_complete};
+            if ('postgres' eq $t->{dbtype}) {
+                if ($t->{dbh}->pg_ready) {
+                    ## If this was a do(), we already have the number of rows
+                    if (1 == $numpks) {
+                        $t->{deleted_rows} += $t->{dbh}->pg_result();
+                    }
+                    else {
+                        $t->{dbh}->pg_result();
+                    }
+                    $t->{async_active} = 0;
+                }
+            }
+            ## Don't need to check for invalid types: happens on the kick off below
+        }
+
+        ## Kick off all dormant async targets
+        for my $t (@$deldb) {
+
+            ## Skip if this target does not support async, or is in the middle of a query
+            next if !$t->{does_async} or $t->{async_active} or $t->{delete_complete};
 
-            ## Only the last will be async
-            ## In most cases, this means always async
-            my $count = 1==$numpks ? @{ $SQL{ANYargs} } : @{ $SQL{IN} };
-            for my $loop (1..$count) {
-                my $async = PG_ASYNC;
-                my $pre = $count > 1 ? "/* $loop of $count */ " : '';
+            ## The actual target name
+            my $tname = $newname->{$t->{name}};
+
+            if ('postgres' eq $t->{dbtype}) {
+
+                ## Which chunk we are processing.
+                $t->{delete_round}++;
+                if ($t->{delete_round} > $t->{delete_rounds}) {
+                    $t->{delete_complete} = 1;
+                    next;
+                }
+                my $dbname = $t->{name};
+                $self->glog("Deleting from target $dbname.$tname. $t->{delete_round} of $t->{delete_rounds}", LOG_DEBUG);
 
-                $self->glog("Deleting target $tname. $loop of $count", LOG_DEBUG);
+                $did_something++;
 
+                ## Single primary key, so delete using the ANY(?) format
                 if (1 == $numpks) {
-                    $t->{deletesth} = $tdbh->prepare("$pre$SQL{ANY}{$tname}", { pg_async => $async });
-                    my $res = $t->{deletesth}->execute($SQL{ANYargs}->[$loop-1]);
-                    $count{$t} += $res unless $async;
+                    ## Use the or-equal so we only prepare this once
+                    $t->{delete_sth} ||= $t->{dbh}->prepare("$SQL{ANY}{$tname}", { pg_async => PG_ASYNC });
+                    $t->{delete_sth}->execute($SQL{ANYargs}->[$t->{delete_round}-1]);
                 }
+                ## Multiple primary keys, so delete old school via IN ((x,y),(a,b))
                 else {
-                    $count{$t} += $tdbh->do($pre.$SQL{IN}->[$loop-1], { pg_direct => 1, pg_async => $async });
-                    $t->{deletesth} = 0;
+                    my $pre = $t->{delete_rounds} > 1 ? "/* $t->{delete_round} of $t->{delete_rounds} */ " : '';
+                    ## The pg_direct tells DBD::Pg there are no placeholders, and to use PQexec directly
+                    $t->{deleted_rows} += $t->{dbh}->
+                        do($pre.$SQL{IN}{$tname}->[$t->{delete_round}-1], { pg_async => PG_ASYNC, pg_direct => 1 });
                 }
-            }
 
-            next;
+                $t->{async_active} = time;
+            } ## end postgres
+            else {
+                die qq{Do not know how to do async for type $t->{dbtype}!\n};
+            }
 
-        } ## end postgres database
+        } ## end all async targets
 
-        if ('mongo' eq $type) {
+        ## Kick off a single non-async target
+        for my $t (@$deldb) {
 
-            ## Grab the collection name and store it
-            $self->{collection} = $t->{dbh}->get_collection($tname);
+            ## Skip if this target is async, or has no more rounds
+            next if $t->{does_async} or $t->{delete_complete};
 
-            ## Because we may have multi-column primary keys, and each key may need modifying,
-            ## we have to put everything into an array of arrays.
-            ## The first level is the primary key number, the next is the actual values
-            my @delkeys = [];
+            $did_something++;
 
-            ## The pkcolsraw variable is a simple comma-separated list of PK column names
-            ## The rows variable is a hash with the PK values as keys (the values can be ignored)
+            my $type = $t->{dbtype};
 
-            ## Binary PKs are easy: all we have to do is decode
-            ## We can assume that binary PK means not a multi-column PK
-            if ($goat->{hasbinarypkey}) {
-                @{ $delkeys[0] } = map { decode_base64($_) } keys %$rows;
-            }
-            else {
+            ## The actual target name
+            my $tname = $newname->{$t->{name}};
 
-                ## Break apart the primary keys into an array of arrays
-                my @fullrow = map { length($_) ? [split '\0', $_, -1] : [''] } keys %$rows;
+            if ('mongo' eq $type) {
 
-                ## Which primary key column we are currently using
-                my $pknum = 0;
+                ## Grab the collection name and store it
+                $self->{collection} = $t->{dbh}->get_collection($tname);
 
-                ## Walk through each column making up the primary key
-                for my $realpkname (split /,/, $pkcolsraw, -1) {
+                ## Because we may have multi-column primary keys, and each key may need modifying,
+                ## we have to put everything into an array of arrays.
+                ## The first level is the primary key number, the next is the actual values
+                my @delkeys = [];
 
-                    ## Grab what type this column is
-                    ## We need to map non-strings to correct types as best we can
-                    my $type = $goat->{columnhash}{$realpkname}{ftype};
+                ## The pkcolsraw variable is a simple comma-separated list of PK column names
+                ## The rows variable is a hash with the PK values as keys (the values can be ignored)
 
-                    ## For integers, we simply force to a Perlish int
-                    if ($type =~ /smallint|integer|bigint/o) {
-                        @{ $delkeys[$pknum] } = map { int $_->[$pknum] } @fullrow;
-                    }
-                    ## Non-integer numbers get set via the strtod command from the 'POSIX' module
-                    elsif ($type =~ /real|double|numeric/o) {
-                        @{ $delkeys[$pknum] } = map { strtod $_->[$pknum] } @fullrow;
-                    }
-                    ## Boolean becomes true Perlish booleans via the 'boolean' module
-                    elsif ($type eq 'boolean') {
-                        @{ $delkeys[$pknum] } = map { $_->[$pknum] eq 't' ? true : false } @fullrow;
-                    }
-                    ## Everything else gets a direct mapping
-                    else {
-                        @{ $delkeys[$pknum] } = map { $_->[$pknum] } @fullrow;
-                    }
-                    $pknum++;
+                ## Binary PKs are easy: all we have to do is decode
+                ## We can assume that binary PK means not a multi-column PK
+                if ($goat->{hasbinarypkey}) {
+                    @{ $delkeys[0] } = map { decode_base64($_) } keys %$rows;
                 }
-            } ## end of multi-column PKs
+                else {
 
-            ## How many items we end up actually deleting
-            $count{$t} = 0;
+                    ## Break apart the primary keys into an array of arrays
+                    my @fullrow = map { length($_) ? [split '\0', $_, -1] : [''] } keys %$rows;
 
-            ## We may need to batch these to keep the total message size reasonable
-            my $max = keys %$rows;
-            $max--;
+                    ## Which primary key column we are currently using
+                    my $pknum = 0;
 
-            ## The bottom of our current array slice
-            my $bottom = 0;
+                    ## Walk through each column making up the primary key
+                    for my $realpkname (split /,/, $pkcolsraw, -1) {
 
-            ## This loop limits the size of our delete requests to mongodb
-          MONGODEL: {
-                ## Calculate the current top of the array slice
-                my $top = $bottom + $chunksize;
+                        ## Grab what type this column is
+                        ## We need to map non-strings to correct types as best we can
+                        my $type = $goat->{columnhash}{$realpkname}{ftype};
 
-                ## Stop at the total number of rows
-                $top = $max if $top > $max;
+                        ## For integers, we simply force to a Perlish int
+                        if ($type =~ /smallint|integer|bigint/o) {
+                            @{ $delkeys[$pknum] } = map { int $_->[$pknum] } @fullrow;
+                        }
+                        ## Non-integer numbers get set via the strtod command from the 'POSIX' module
+                        elsif ($type =~ /real|double|numeric/o) {
+                            @{ $delkeys[$pknum] } = map { strtod $_->[$pknum] } @fullrow;
+                        }
+                        ## Boolean becomes true Perlish booleans via the 'boolean' module
+                        elsif ($type eq 'boolean') {
+                            @{ $delkeys[$pknum] } = map { $_->[$pknum] eq 't' ? true : false } @fullrow;
+                        }
+                        ## Everything else gets a direct mapping
+                        else {
+                            @{ $delkeys[$pknum] } = map { $_->[$pknum] } @fullrow;
+                        }
+                        $pknum++;
+                    }
+                } ## end of multi-column PKs
 
-                ## If we have a single key, we can use the '$in' syntax
-                if ($numpks <= 1) {
-                    my @newarray = @{ $delkeys[0] }[$bottom..$top];
-                    my $result = $self->{collection}->remove(
+                ## We may need to batch these to keep the total message size reasonable
+                my $max = keys %$rows;
+                $max--;
+
+                ## The bottom of our current array slice
+                my $bottom = 0;
+
+                ## This loop limits the size of our delete requests to mongodb
+              MONGODEL: {
+                    ## Calculate the current top of the array slice
+                    my $top = $bottom + $chunksize;
+
+                    ## Stop at the total number of rows
+                    $top = $max if $top > $max;
+
+                    ## If we have a single key, we can use the '$in' syntax
+                    if ($numpks <= 1) {
+                        my @newarray = @{ $delkeys[0] }[$bottom..$top];
+                        my $result = $self->{collection}->remove(
                         {$pkcolsraw => { '$in' => \@newarray }}, { safe => 1 });
-                    $count{$t} += $result->{n};
-                }
-                else {
-                    ## For multi-column primary keys, we cannot use '$in', sadly.
-                    ## Thus, we will just call delete once per row
+                        $t->{deleted_rows} += $result->{n};
+                    }
+                    else {
+                        ## For multi-column primary keys, we cannot use '$in', sadly.
+                        ## Thus, we will just call delete once per row
 
-                    ## Put the names into an easy to access array
-                    my @realpknames = split /,/, $pkcolsraw, -1;
+                        ## Put the names into an easy to access array
+                        my @realpknames = split /,/, $pkcolsraw, -1;
 
-                    my @find;
+                        my @find;
 
-                    ## Which row we are currently processing
-                    my $numrows = scalar keys %$rows;
-                    for my $rownumber (0..$numrows-1) {
-                        for my $pknum (0..$numpks-1) {
-                            push @find => $realpknames[$pknum], $delkeys[$pknum][$rownumber];
+                        ## Which row we are currently processing
+                        my $numrows = scalar keys %$rows;
+                        for my $rownumber (0..$numrows-1) {
+                            for my $pknum (0..$numpks-1) {
+                                push @find => $realpknames[$pknum], $delkeys[$pknum][$rownumber];
+                            }
                         }
-                    }
 
-                    my $result = $self->{collection}->remove(
+                        my $result = $self->{collection}->remove(
                         { '$and' => \@find }, { safe => 1 });
 
-                    $count{$t} += $result->{n};
-
-                    ## We do not need to loop, as we just went 1 by 1 through the whole list
-                    last MONGODEL;
+                        $t->{deleted_rows} += $result->{n};
 
-                }
+                        ## We do not need to loop, as we just went 1 by 1 through the whole list
+                        last MONGODEL;
 
-                ## Bail out of the loop if we've hit the max
-                last MONGODEL if $top >= $max;
+                    }
 
-                ## Assign the bottom of our array slice to be above the current top
-                $bottom = $top + 1;
+                    ## Bail out of the loop if we've hit the max
+                    last MONGODEL if $top >= $max;
 
-                redo MONGODEL;
-            }
+                    ## Assign the bottom of our array slice to be above the current top
+                    $bottom = $top + 1;
 
-            $self->glog("Mongo objects removed from $tname: $count{$t}", LOG_VERBOSE);
-            next;
-        }
+                    redo MONGODEL;
+                }
 
-        if ('mysql' eq $type or 'drizzle' eq $type or 'mariadb' eq $type) {
-            my $tdbh = $t->{dbh};
-            for (@{ $SQL{MYIN} }) {
-                ($count{$t} += $tdbh->do($_)) =~ s/0E0/0/o;
+                $self->glog("Mongo objects removed from $tname: $t->{deleted_rows}", LOG_VERBOSE);
             }
-            next;
-        }
-
-        if ('oracle' eq $type) {
-            my $tdbh = $t->{dbh};
-            for (@{ $SQL{IN} }) {
-                ($count{$t} += $tdbh->do($_)) =~ s/0E0/0/o;
+            elsif ('mysql' eq $type or 'drizzle' eq $type or 'mariadb' eq $type
+                       or 'oracle' eq $type or 'sqlite' eq $type) {
+                my $tdbh = $t->{dbh};
+                for (@{ $SQL{IN}{$tname} }) {
+                    $t->{deleted_rows} += $tdbh->do($_);
+                }
             }
-            next;
-        }
-
-        if ('redis' eq $type) {
-            ## We need to remove the entire tablename:pkey:column for each column we know about
-            my $cols = $goat->{cols};
-            for my $pk (keys %$rows) {
-                ## If this is a multi-column primary key, change our null delimiter to a colon
-                if ($goat->{numpkcols} > 1) {
-                    $pk =~ s{\0}{:}go;
+            elsif ('redis' eq $type) {
+                ## We need to remove the entire tablename:pkey:column for each column we know about
+                my $cols = $goat->{cols};
+                for my $pk (keys %$rows) {
+                    ## If this is a multi-column primary key, change our null delimiter to a colon
+                    if ($goat->{numpkcols} > 1) {
+                        $pk =~ s{\0}{:}go;
+                    }
+                    $t->{deleted_rows} += $t->{dbh}->del("$tname:$pk");
                 }
-                $count = $t->{dbh}->del("$tname:$pk");
             }
-            next;
-        }
-
-        if ('sqlite' eq $type) {
-            my $tdbh = $t->{dbh};
-            for (@{ $SQL{IN} }) {
-                ($count{$t} += $tdbh->do($_)) =~ s/0E0/0/o;
+            elsif ($type =~ /flat/o) { ## same as flatpg for now
+                for (@{ $SQL{IN}{$tname} }) {
+                    print {$t->{filehandle}} qq{$_;\n\n};
+                }
+                $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE);
             }
-            next;
-        }
-
-        if ($type =~ /flat/o) { ## same as flatpg for now
-            for (@{ $SQL{IN} }) {
-                print {$t->{filehandle}} qq{$_;\n\n};
+            else {
+                die qq{No support for database type "$type" yet!};
             }
-            $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE);
-            next;
-        }
 
-        die qq{No support for database type "$type" yet!};
-    }
+            $t->{delete_complete} = 1;
 
-    ## Final cleanup as needed (e.g. process async results)
-    for my $t (@$deldb) {
-        my $type = $t->{dbtype};
-
-        if ('postgres' eq $type) {
+            ## Only one target at a time, please: we need to check on the asyncs
+            last;
 
-            my $tdbh = $t->{dbh};
+        } ## end async target
 
-            ## Wrap up all the async queries
-            ($count{$t} += $tdbh->pg_result()) =~ s/0E0/0/o;
-
-            ## Call finish if this was a statement handle (as opposed to a do)
-            if ($t->{deletesth}) {
-                $t->{deletesth}->finish();
+        ## If we did nothing this round, and there are no asyncs running, we are done.
+        ## Otherwise, we will wait for the oldest async to finish
+        if (!$did_something) {
+            if (! grep { $_->{async_active} } @$deldb) {
+                $done = 1;
+            }
+            else {
+                ## Since nothing else is going on, let's wait for the oldest async to finish
+                my $t = ( sort { $a->{async_active} > $b->{async_active} } grep { $_->{async_active} } @$deldb)[0];
+                if (1 == $numpks) {
+                    $t->{deleted_rows} += $t->{dbh}->pg_result();
+                }
+                else {
+                    $t->{dbh}->pg_result();
+                }
+                $t->{async_active} = 0;
             }
-            delete $t->{deletesth};
         }
-    }
+
+    } ## end of main deletion loop
 
     ## Generate our final deletion counts
-    $count = 0;
+    my $count = 0;
     for my $t (@$deldb) {
 
         ## We do not delete from certain types of targets
@@ -8737,8 +8768,8 @@ sub delete_rows {
 
         my $tname = $newname->{$t->{name}};
 
-        $count += $count{$t};
-        $self->glog(qq{Rows deleted from $t->{name}.$tname: $count{$t}}, LOG_VERBOSE);
+        $count += $t->{deleted_rows};
+        $self->glog(qq{Rows deleted from $t->{name}.$tname: $t->{deleted_rows}}, LOG_VERBOSE);
     }
 
     return $count;