Cleanup and partial rewrite of push_rows()
authorGreg Sabino Mullane <greg@endpoint.com>
Sun, 23 Aug 2015 14:54:43 +0000 (10:54 -0400)
committerGreg Sabino Mullane <greg@endpoint.com>
Sun, 23 Aug 2015 14:54:43 +0000 (10:54 -0400)
A major change is to use ANY(?) when possible, and to avoid do any quoting.

Bucardo.pm

index d4373b72c474aea82984cdf67011050a23e31037..33086258256e6271a70b8fa610f0d4b8276fd2d3 100644 (file)
@@ -9576,8 +9576,11 @@ sub push_rows {
 
     my ($self,$rows,$Table,$Sync,$SourceDB,$TargetDB) = @_;
 
+    ## Build a list of all PK values to feed to IN clauses
+    ## This is an array in case we go over $chunksize
+    my @pkvals = [];
+
     ## If fullcopy, $rows will be the scalar 'fullcopy' instead of a hashref
-    ## In which case, we create a dummy hashref
     my $fullcopy = 0;
     if (! ref $rows) {
         if ($rows eq 'fullcopy') {
@@ -9588,142 +9591,145 @@ sub push_rows {
             die "Invalid rows passed to push_rows: $rows\n";
         }
         $rows = {};
+        $pkvals[0] = 'fullcopy';
     }
 
-    ## Make sure TargetDB is an arrayref (may come as a single TargetDB object)
-    if (ref $TargetDB ne 'ARRAY') {
-        $TargetDB = [$TargetDB];
+    ## This will be zero for fullcopy of course
+    my $total_rows = keys %$rows;
+
+    if (!$total_rows and !$fullcopy) {
+        return 0; ## Can happen on a truncation
     }
 
-    ## This will be zero for fullcopy of course
-    my $total = keys %$rows;
+    my $syncname = $Sync->{name} || '';
+
+    ## We may want to change the target table based on the customname table
+    ## It is up to the caller to populate these, even if the syncname is ''
+    my $customname = $Table->{newname}{$syncname} || {};
 
-    ## Total number of rows written
-    $count = 0;
+    ## We may want to change the SELECT based on the customcols table
+    my $customcols = $Table->{newcols}{$syncname} || {};
 
-    my $syncname = $Sync->{name};
-    my $newname = $Table->{newname}{$syncname};
     my $numpks = $Table->{numpkcols};
 
     ## As with delete, we may break this into more than one step
     ## Should only be a factor for very large numbers of keys
     my $chunksize = $config{statement_chunk_size} || $default_statement_chunk_size;
 
-    ## Build a list of all PK values to feed to IN clauses
-    my @pkvals;
-    my $round = 0;
-    my $roundtotal = 0;
-    for my $key (keys %$rows) {
-        my $inner = length $key
-            ? (join ',' => map { s{\'}{''}go; s{\\}{\\\\}go; qq{'$_'}; } split '\0', $key, -1)
-            : q{''};
-        push @{ $pkvals[$round] ||= [] } => $numpks > 1 ? "($inner)" : $inner;
-        if (++$roundtotal >= $chunksize) {
-            $roundtotal = 0;
-            $round++;
+    ## If there is only one primary key, and a sane number of rows, we can use '= ANY(?)'
+    if (! $fullcopy) {
+        if ($numpks == 1 and $total_rows < $chunksize) {
+            $pkvals[0] = 'allrows';
         }
-    }
+        ## Otherwise, we push our completed SQL into bins
+        else {
+            my $pkrounds = 1;
+            my $current_row = 1;
 
-    ## Example: 1234, 221
-    ## Example MCPK: ('1234','Don''t Stop','2008-01-01'),('221','foobar','2008-11-01')
+            ## Loop through each row and create the needed SQL fragment
+            for my $key (keys %$rows) {
 
-    ## This can happen if we truncated but had no delta activity
-    return 0 if (! $pkvals[0] or ! length $pkvals[0]->[0] ) and ! $fullcopy;
+                push @{ $pkvals[$pkrounds-1] ||= [] } => split '\0', $key, -1;
 
-    ## Put dummy data into @pkvals if using fullcopy
-    if ($fullcopy) {
-        push @pkvals => ['fullcopy'];
+                ## Make sure our SQL statement doesn't grow too large
+                if (++$current_row > $chunksize) {
+                    $current_row = 1;
+                    $pkrounds++;
+                }
+            }
+        }
     }
 
-    ## Get ready to export from the source
-    ## This may have multiple versions depending on the customcols table
-    my $newcols = $Table->{newcols}{$syncname} || {};
+    ## Make sure TargetDB is an arrayref (may come as a single TargetDB object)
+    if (ref $TargetDB ne 'ARRAY') {
+        $TargetDB = [$TargetDB];
+    }
 
-    ## Walk through and grab which SQL is needed for each target
-    ## Cache this earlier on - controller?
 
+    ## Figure out the different SELECT clauses, and assign targets to them
     my %srccmd;
     for my $t (@$TargetDB ) {
 
-        ## The SELECT clause we use (may be empty)
-        my $clause = $newcols->{$t->{name}};
+        ## The SELECT clause we use (usually an empty string unless customcols is being used)
+        my $select_clause = $customcols->{$t->{name}} || '';
 
         ## Associate this target with this clause
-        push @{$srccmd{$clause}} => $t;
+        push @{$srccmd{$select_clause}} => $t;
     }
 
-    my $tablename = "$Table->{safeschema}.$Table->{safetable}";
-    my $fromdbh = $SourceDB->{dbh};
+    my $source_tablename = "$Table->{safeschema}.$Table->{safetable}";
+    my $sourcedbh = $SourceDB->{dbh};
+
+    ## The total number of source rows returned
+    my $total_source_rows = 0;
 
-    ## Loop through each source command and push it out to all targets
-    ## that are associated with it
-    for my $clause (sort keys %srccmd) {
+    ## Loop through each select command and push it out to all targets that are associated with it
+    for my $select_clause (sort keys %srccmd) {
 
         ## Build the clause (cache) and kick it off
-        my $SELECT = $clause || 'SELECT *';
+        my $SELECT = $select_clause || 'SELECT *';
 
-        ## Prepare each target in turn
-        for my $t (@{ $srccmd{$clause} }) {
+        ## Prepare each target that is using this select clause
+        for my $target (@{ $srccmd{$select_clause} }) {
 
             ## Internal name of this target
-            my $targetname = $t->{name};
+            my $targetname = $target->{name};
 
-            ## Name of the table we are pushing to on this target
-            my $tname = $newname->{$targetname};
+            ## The actual target table name. Depends on dbtype and customname table entries
+            my $target_tablename = $customname->{$targetname};
 
             ## The columns we are pushing to, both as an arrayref and a CSV:
             my $cols = $Table->{tcolumns}{$SELECT};
-            my $columnlist = $t->{does_sql} ?
-                ('(' . (join ',', map { $t->{dbh}->quote_identifier($_) } @$cols) . ')')
+            my $columnlist = $target->{does_sql} ?
+                ('(' . (join ',', map { $target->{dbh}->quote_identifier($_) } @$cols) . ')')
               : ('(' . (join ',', map { $_ } @$cols) . ')');
 
-            my $type = $t->{dbtype};
+            my $type = $target->{dbtype};
 
             ## Use columnlist below so we never have to worry about the order
             ## of the columns on the target
 
             if ('postgres' eq $type) {
-                my $tgtcmd = "$self->{sqlprefix}COPY $tname$columnlist FROM STDIN";
-                $t->{dbh}->do($tgtcmd);
+                my $tgtcmd = "$self->{sqlprefix}COPY $target_tablename$columnlist FROM STDIN";
+                $target->{dbh}->do($tgtcmd);
             }
             elsif ('flatpg' eq $type) {
-                print {$t->{filehandle}} "COPY $tname$columnlist FROM STDIN;\n";
-                $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE);
+                print {$target->{filehandle}} "COPY $target_tablename$columnlist FROM STDIN;\n";
+                $self->glog(qq{Appended to flatfile "$target->{filename}"}, LOG_VERBOSE);
             }
             elsif ('flatsql' eq $type) {
-                print {$t->{filehandle}} "INSERT INTO $tname$columnlist VALUES\n";
-                $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE);
+                print {$target->{filehandle}} "INSERT INTO $target_tablename$columnlist VALUES\n";
+                $self->glog(qq{Appended to flatfile "$target->{filename}"}, LOG_VERBOSE);
             }
             elsif ('mongo' eq $type) {
-                $self->{collection} = $t->{dbh}->get_collection($tname);
+                $self->{collection} = $target->{dbh}->get_collection($target_tablename);
             }
             elsif ('redis' eq $type) {
-                ## No prep needed, other than to reset our count of changes
-                $t->{redis} = 0;
+                ## No setup needed
             }
             elsif ('mysql' eq $type or 'drizzle' eq $type or 'mariadb' eq $type) {
-                my $tgtcmd = "INSERT INTO $tname$columnlist VALUES (";
+                my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
                 $tgtcmd .= '?,' x @$cols;
                 $tgtcmd =~ s/,$/)/o;
-                $t->{sth} = $t->{dbh}->prepare($tgtcmd);
+                $target->{sth} = $target->{dbh}->prepare($tgtcmd);
             }
             elsif ('oracle' eq $type) {
-                my $tgtcmd = "INSERT INTO $tname$columnlist VALUES (";
+                my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
                 $tgtcmd .= '?,' x @$cols;
                 $tgtcmd =~ s/,$/)/o;
-                $t->{sth} = $t->{dbh}->prepare($tgtcmd);
+                $target->{sth} = $target->{dbh}->prepare($tgtcmd);
             }
             elsif ('sqlite' eq $type) {
-                my $tgtcmd = "INSERT INTO $tname$columnlist VALUES (";
+                my $tgtcmd = "INSERT INTO $target_tablename$columnlist VALUES (";
                 $tgtcmd .= '?,' x @$cols;
                 $tgtcmd =~ s/,$/)/o;
-                $t->{sth} = $t->{dbh}->prepare($tgtcmd);
+                $target->{sth} = $target->{dbh}->prepare($tgtcmd);
             }
             else {
                 die qq{No support for database type "$type" yet!};
             }
 
-        } ## end preparing each target for this clause
+        } ## end preparing each target for this select clause
 
         my $loop = 1;
         my $pcount = @pkvals;
@@ -9731,72 +9737,88 @@ sub push_rows {
 
         ## Loop through each chunk of primary keys to copy over
         for my $pk_values (@pkvals) {
-            my $pkvs = join ',' => @{ $pk_values };
 
-            ## Message to prepend to the statement if chunking
-            my $pre = $pcount <= 1 ? '' : "/* $loop of $pcount */ ";
-            $loop++;
+            ## Start streaming rows from the source
+            $self->glog(qq{Copying from $fromname.$source_tablename}, LOG_VERBOSE);
 
-            ## Kick off the copy on the source
-            $self->glog(qq{${pre}Copying from $fromname.$tablename}, LOG_VERBOSE);
-            my $srccmd = sprintf '%s%sCOPY (%s FROM %s %s) TO STDOUT%s',
-                $pre,
+            ## If we are doing a small batch of single primary keys, use ANY
+            ## If we are doing fullcopy, leave out the WHERE clause completely
+            if (! ref $pk_values ) {
+                my $srccmd = sprintf '%sCOPY (%s FROM %s %s) TO STDOUT%s',
                 $self->{sqlprefix},
                 $SELECT,
-                $tablename,
-                $fullcopy ? '' : " WHERE $Table->{pkcols} IN ($pkvs)",
+                $source_tablename,
+                $pk_values eq 'fullcopy' ? '' : " WHERE $Table->{pklist} = ANY(?)",
                 $Sync->{copyextra} ? " $Sync->{copyextra}" : '';
-            $fromdbh->do($srccmd);
-
-            my $buffer = '';
-
-            ## Loop through all changed rows on the source, and push to the target(s)
-            my $multirow = 0;
-
-            ## If in fullcopy mode, we don't know how many rows will get copied,
-            ## so we count as we go along
-            if ($fullcopy) {
-                $total = 0;
+                my $srcsth = $sourcedbh->prepare($srccmd);
+                $pk_values eq 'fullcopy' ? $srcsth->execute()
+                    : $srcsth->execute( [ keys %$rows ]);
             }
+            else {
+                my $baseq = '?';
+                if ($numpks > 1) {
+                    $baseq = '?,' x $numpks;
+                    $baseq =~ s/(.+?).$/\($1\)/;
+                }
+                my $number_values = @$pk_values;
+                my $placeholders = "$baseq," x ($number_values / $numpks);
+                chop $placeholders;
+                my $srccmd = sprintf '%s%sCOPY (%s FROM %s WHERE %s IN (%s)) TO STDOUT%s',
+                    "/* $loop of $pcount */ ",
+                    $self->{sqlprefix},
+                    $SELECT,
+                    $source_tablename,
+                    $Table->{pkeycols},
+                    $placeholders,
+                    $Sync->{copyextra} ? " $Sync->{copyextra}" : '';
+                my $srcsth = $sourcedbh->prepare($srccmd);
+                $srcsth->execute( @$pk_values );
+                $loop++;
+            }
+
+            ## How many rows we read from the source database this chunk
+            my $source_rows = 0;
 
             ## Loop through each row output from the source, storing it in $buffer
-            while ($fromdbh->pg_getcopydata($buffer) >= 0) {
-
-                $total++ if $fullcopy;
+            ## Future optimzation: slurp in X rows at a time, then process them
+            my $buffer = '';
+            while ($sourcedbh->pg_getcopydata($buffer) >= 0) {
 
-                ## For each target using this particular COPY statement
-                for my $t (@{ $srccmd{$clause} }) {
+                $source_rows++;
 
-                    my $type = $t->{dbtype};
-                    my $cols = $Table->{tcolumns}{$SELECT};
-                    my $tname = $newname->{$t->{name}};
+                ## For each target using this particular SELECT clause
+                for my $target (@{ $srccmd{$select_clause} }) {
 
-                    chomp $buffer;
+                    my $type = $target->{dbtype};
 
                     ## For Postgres, we simply do COPY to COPY
                     if ('postgres' eq $type) {
-                        $t->{dbh}->pg_putcopydata("$buffer\n");
+                        $target->{dbh}->pg_putcopydata($buffer);
                     }
                     ## For flat files destined for Postgres, just do a tab-delimited dump
                     elsif ('flatpg' eq $type) {
-                        print {$t->{filehandle}} "$buffer\n";
+                        print {$target->{filehandle}} $buffer;
                     }
                     ## For other flat files, make a standard VALUES list
                     elsif ('flatsql' eq $type) {
-                        if ($multirow++) {
-                            print {$t->{filehandle}} ",\n";
+                        chomp $buffer;
+                        if ($source_rows > 1) {
+                            print {$target->{filehandle}} ",\n";
                         }
-                        print {$t->{filehandle}} '(' .
-                            (join ',' => map { $self->{masterdbh}->quote($_) } split /\t/, $buffer, -1) . ')';
+                        print {$target->{filehandle}} '(' .
+                             (join ',' => map { $self->{masterdbh}->quote($_) } split /\t/, $buffer, -1) . ')';
                     }
                     ## For Mongo, do some mongomagic
                     elsif ('mongo' eq $type) {
                         ## Have to map these values back to their names
+                        chomp $buffer;
                         my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
 
+                        my $targetcols = $Table->{tcolumns}{$SELECT};
+
                         ## Our object consists of the primary keys, plus all other fields
                         my $object = {};
-                        for my $cname (@{ $cols }) {
+                        for my $cname (@{ $targetcols }) {
                             $object->{$cname} = shift @cols;
                         }
                         ## Coerce non-strings into different objects
@@ -9814,7 +9836,7 @@ sub push_rows {
                                 }
                             }
                             elsif ($Table->{columnhash}{$key}{ftype} =~ /real|double|numeric/o) {
-                                $object->{$key} = strtod($object->{$key});
+                                $object->{$key} = strtod($object->{$key}); 
                             }
                         }
                         $self->{collection}->insert($object, { safe => 1 });
@@ -9825,18 +9847,21 @@ sub push_rows {
                             or 'drizzle' eq $type
                             or 'oracle' eq $type
                             or 'sqlite' eq $type) {
+                        chomp $buffer;
                         my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
+                        my $targetcols = $Table->{tcolumns}{$SELECT};
                         for my $cindex (0..@cols) {
                             next unless defined $cols[$cindex];
-                            if ($Table->{columnhash}{$cols->[$cindex]}{ftype} eq 'boolean') {
+                            if ($Table->{columnhash}{$targetcols->[$cindex]}{ftype} eq 'boolean') {
                                 # BOOLEAN support is inconsistent, but almost everyone will coerce 1/0 to TRUE/FALSE
                                 $cols[$cindex] = ( $cols[$cindex] =~ /^[1ty]/i )? 1 : 0;
                             }
                         }
-                        $count += $t->{sth}->execute(@cols);
+                        $target->{sth}->execute(@cols);
                     }
                     elsif ('redis' eq $type) {
                         ## We are going to set a Redis hash, in which the key is "tablename:pkeyvalue"
+                        chomp $buffer;
                         my @colvals = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/, $buffer, -1;
                         my @pkey;
                         for (1 .. $Table->{numpkcols}) {
@@ -9846,66 +9871,78 @@ sub push_rows {
                         ## Build a list of non-null key/value pairs to set in the hash
                         my @add;
                         $i = $Table->{numpkcols} - 1;
+                        my $targetcols = $Table->{tcolumns}{$SELECT};
                         for my $val (@colvals) {
                             $i++;
                             next if ! defined $val;
-                            push @add, $cols->[$i], $val;
+                            push @add, $targetcols->[$i], $val;
                         }
 
-                        $t->{dbh}->hmset("$tname:$pkeyval", @add);
-                        $count++;
-                        $t->{redis}++;
+                        my $target_tablename = $customname->{$target->{name}};
+                        $target->{dbh}->hmset("$target_tablename:$pkeyval", @add);
                     }
 
                 } ## end each target
 
             } ## end each row pulled from the source
 
+            $total_source_rows += $source_rows;
+
         } ## end each pklist
 
         ## Workaround for DBD::Pg bug
         ## Once we require a minimum version of 2.18.1 or better, we can remove this!
-        if ($self->{dbdpgversion} < 21801) {
-            $fromdbh->do('SELECT 1');
+        if ($SourceDB->{dbtype} eq 'postgres' and $self->{dbdpgversion} < 21801) {
+            $sourcedbh->do('SELECT 1');
         }
 
         ## Perform final cleanups for each target
-        for my $t (@{ $srccmd{$clause} }) {
+        for my $target (@{ $srccmd{$select_clause} }) {
 
-            my $type = $t->{dbtype};
+            my $type = $target->{dbtype};
 
-            my $tname = $newname->{$t->{name}};
+            my $tname = $customname->{$target->{name}};
 
             if ('postgres' eq $type) {
-                my $dbh = $t->{dbh};
+                my $dbh = $target->{dbh};
                 $dbh->pg_putcopyend();
                 ## Same bug as above
                 if ($self->{dbdpgversion} < 21801) {
                     $dbh->do('SELECT 1');
                 }
-                $self->glog(qq{Rows copied to $t->{name}.$tname: $total}, LOG_VERBOSE);
-                $count += $total;
+                $self->glog(qq{Rows copied to $target->{name}.$tname: $total_rows}, LOG_VERBOSE);
                 ## If this table is set to makedelta, add rows to bucardo.delta to simulate the
                 ##   normal action of a trigger and add a row to bucardo.track to indicate that
                 ##   it has already been replicated here.
-                my $d = $Sync->{db}{ $t->{name} };
-                if (!$fullcopy and $d->{does_makedelta}{$tablename}) {
-                    $self->glog("Using makedelta to populate delta and track tables for $t->{name}.$tname", LOG_VERBOSE);
-                    my $vals;
-                    if ($numpks == 1) {
-                        $vals = join ',', map { "($_)" } map { @{ $_ } } @pkvals;
-                    }
-                    else {
-                        $vals = join ',', map { @{ $_ } } @pkvals;
-                    }
+                my $d = $Sync->{db}{ $target->{name} };
+                if (!$fullcopy and $d->{does_makedelta}{$source_tablename} ) {
+
+                    $self->glog("Using makedelta to populate delta and track tables for $target->{name}.$tname", LOG_VERBOSE);
+
                     my $cols = join ',' => @{ $Table->{qpkey} };
 
-                    $dbh->do(qq{
-                        INSERT INTO bucardo.$Table->{deltatable} ($cols)
-                        VALUES $vals
-                    });
+                    for my $pk_values (@pkvals) {
+
+                        my $baseq = '?';
+                        if ($numpks > 1) {
+                            $baseq = '?,' x $numpks;
+                            chop $baseq;
+                        }
+                        my $number_values = ref $pk_values ? @$pk_values : keys %$rows;
+                        my $placeholders = "($baseq)," x ($number_values / $numpks);
+                        chop $placeholders;
+
+                        my $SQL = sprintf 'INSERT INTO bucardo.%s (%s) VALUES %s',
+                            $Table->{deltatable},
+                                $cols,
+                                    $placeholders;
+                        $self->glog("GOT $placeholders leading to $SQL", LOG_DEBUG);
+                        my $sth = $dbh->prepare($SQL);
+                        $sth->execute(ref $pk_values ? @$pk_values : (keys %$rows));
+                    }
+
                     # Make sure we track it - but only if this sync already acts as a source!
-                    if ($t->{role} eq 'source') {
+                    if ($target->{role} eq 'source') {
                         $dbh->do(qq{
                             INSERT INTO bucardo.$Table->{tracktable}
                             VALUES (NOW(), ?)
@@ -9914,7 +9951,7 @@ sub push_rows {
 
                     ## We want to send a kick signal to other syncs that are using this table
                     ## However, we do not want to kick unless they are set to autokick and active
-
+                    ## This works even if we do not have a real syncs, as $syncname will be ''
                     $self->glog('Signalling other syncs that this table has changed', LOG_DEBUG);
                     if (! exists $self->{kick_othersyncs}{$syncname}{$tname}) {
                         $SQL = 'SELECT name FROM sync WHERE herd IN (SELECT herd FROM herdmap WHERE goat IN (SELECT id FROM goat WHERE schemaname=? AND tablename = ?)) AND name <> ? AND autokick AND status = ?';
@@ -9929,19 +9966,22 @@ sub push_rows {
                 }
             }
             elsif ('flatpg' eq $type) {
-                print {$t->{filehandle}} "\\\.\n\n";
+                print {$target->{filehandle}} "\\\.\n\n";
             }
             elsif ('flatsql' eq $type) {
-                print {$t->{filehandle}} ";\n\n";
+                print {$target->{filehandle}} ";\n\n";
             }
             elsif ('redis' eq $type) {
-                $self->glog(qq{Rows copied to Redis $t->{name}.$tname:<pkeyvalue>: $t->{redis}}, LOG_VERBOSE);
+                $self->glog(qq{Rows copied to Redis $target->{name}.$tname:<pkeyvalue>: $total_source_rows}, LOG_VERBOSE);
+            }
+            else {
+                ## Nothing to be done for mongo, mysql, mariadb, sqlite, oracle
             }
         }
 
     } ## end of each clause in the source command list
 
-    return $count;
+    return $total_source_rows;
 
 } ## end of push_rows