Support for Drizzle. Untested.
authorGreg Sabino Mullane <greg@endpoint.com>
Thu, 30 Jun 2011 13:25:32 +0000 (09:25 -0400)
committerGreg Sabino Mullane <greg@endpoint.com>
Thu, 30 Jun 2011 13:25:32 +0000 (09:25 -0400)
Bucardo.pm
MANIFEST
bucardo
bucardo.schema
t/20-drizzle.t [new file with mode: 0644]

index dd9ade2a5dcf10f71b558972aeff9a7bb8be7750..5764d40d1747f0ec3dfec824708d9331f0ea6384 100644 (file)
@@ -1797,6 +1797,7 @@ sub start_kid {
         push @dbs_dbi => $dbname
             if $x->{dbtype} eq 'postgres'
             or $x->{dbtype} eq 'mysql'
+            or $x->{dbtype} eq 'drizzle'
             or $x->{dbtype} eq 'oracle';
 
         push @dbs_connectable => $dbname
@@ -2469,6 +2470,10 @@ sub start_kid {
                 $self->glog(qq{Set db "$dbname" to serializable}, LOG_DEBUG);
             }
 
+            if ($x->{dbtype} eq 'drizzle') {
+                ## Drizzle does not appear to have anything to control this yet
+            }
+
             if ($x->{dbtype} eq 'oracle') {
                 $x->{dbh}->do('SET TRANSACTION READ WRITE');
                 $x->{dbh}->do('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE');
@@ -2516,6 +2521,12 @@ sub start_kid {
                         $x->{dbh}->do("LOCK TABLE $com");
                     }
 
+                    if ($x->{dbtype} eq 'drizzle') {
+                        my $com = "$g->{safetable} WRITE";
+                        $self->glog("Database $dbname: Locking table $com", LOG_TERSE);
+                        $x->{dbh}->do("LOCK TABLE $com");
+                    }
+
                     if ($x->{dbtype} eq 'oracle') {
                         my $com = "$g->{safeschema}.$g->{safetable} IN EXCLUSIVE MODE";
                         $self->glog("Database $dbname: Locking table $com", LOG_TERSE);
@@ -4117,6 +4128,9 @@ sub connect_database {
         elsif ('mysql' eq $dbtype) {
             $dsn = "dbi:mysql:database=$d->{dbname}";
         }
+        elsif ('drizzle' eq $dbtype) {
+            $dsn = "dbi:drizzle:database=$d->{dbname}";
+        }
         elsif ('oracle' eq $dbtype) {
             $dsn = "dbi:Oracle:$d->{dbname}";
         }
@@ -5217,11 +5231,8 @@ sub validate_sync {
             ## Mongo is skipped because it can create things on the fly
             next if $self->{sdb}{$dbname}{dbtype} =~ /mongo/o;
 
-            ## MySQL is skipped for now, but should be added later
-            next if $self->{sdb}{$dbname}{dbtype} =~ /mysql/o;
-
-            ## Oracle is skipped for now, but should be added later
-            next if $self->{sdb}{$dbname}{dbtype} =~ /oracle/o;
+            ## MySQL/Drizzle/Oracle is skipped for now, but should be added later
+            next if $self->{sdb}{$dbname}{dbtype} =~ /mysql|drizzle|oracle/o;
 
             ## Respond to ping here and now for very impatient watchdog programs
             $maindbh->commit();
@@ -6936,8 +6947,8 @@ sub delete_rows {
                 next;
             }
 
-            ## For MySQL, we simply truncate the table name without the schema
-            if ('mysql' eq $type) {
+            ## For MySQL and Drizzle, we simply truncate the table name without the schema
+            if ('mysql' eq $type or 'drizzle' eq $type) {
                 my $tdbh = $t->{dbh};
                 $tdbh->do("TRUNCATE TABLE $tname");
             }
@@ -6984,7 +6995,7 @@ sub delete_rows {
         if ('postgres' eq $type) {
             $sqltype = (1 == $numpks) ? 'ANY' : 'PGIN';
         }
-        elsif ('mysql' eq $type) {
+        elsif ('mysql' eq $type or 'drizzle' eq $type) {
             $sqltype = 'MYIN';
         }
         elsif ('oracle' eq $type) {
@@ -7122,7 +7133,7 @@ sub delete_rows {
             next;
         }
 
-        if ('mysql' eq $type) {
+        if ('mysql' eq $type or 'drizzle' eq $type) {
             my $tdbh = $t->{dbh};
             ($count{$t} = $tdbh->do($SQL{IN})) =~ s/0E0/0/o;
             next;
@@ -7249,7 +7260,7 @@ sub push_rows {
         elsif ('mongo' eq $type) {
             $self->{collection} = $t->{dbh}->get_collection($tname);
         }
-        elsif ('mysql' eq $type) {
+        elsif ('mysql' eq $type or 'drizzle' eq $type) {
             my $tgtcmd = "INSERT INTO $tname VALUES (";
             $tgtcmd .= '?,' x keys %{ $goat->{columnhash} };
             $tgtcmd =~ s/,$/)/o;
@@ -7321,8 +7332,8 @@ sub push_rows {
                 }
                 $self->{collection}->insert($object, { safe => 1 });
             }
-            ## For MySQL and Oracle, do some basic INSERTs
-            elsif ('mysql' eq $type or 'oracle' eq $type) {
+            ## For MySQL, Drizzle, and Oracle, do some basic INSERTs
+            elsif ('mysql' eq $type or 'drizzle' eq $type or 'oracle' eq $type) {
                 my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/ => $buffer;
                 $count += $t->{sth}->execute(@cols);
             }
index 5e12460321c45229117e37c09af5f0708c70ce35..56c924e09023a3efd2b32fd155d6b430fdac25c0 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -33,6 +33,7 @@ t/02-bctl-db.t
 t/02-bctl-dbg.t
 t/09-uniqueconstraint.t
 t/15-star.t
+t/20-drizzle.t
 t/20-mongo.t
 t/20-mysql.t
 t/20-oracle.t
diff --git a/bucardo b/bucardo
index 2ebbd69e8f8b2779d4bf0ec6571faa5b12c7216f..aee0b107b7d9f248b66fd87ecae4c914803860b0 100755 (executable)
--- a/bucardo
+++ b/bucardo
@@ -805,6 +805,7 @@ sub add_database {
     $vals->{dbtype} =~ s/postgres.*/postgres/io;
     $vals->{dbtype} =~ s/mongo.*/mongo/io;
     $vals->{dbtype} =~ s/mysql.*/mysql/io;
+    $vals->{dbtype} =~ s/drizzle.*/drizzle/io;
     $vals->{dbtype} =~ s/oracle.*/oracle/io;
 
     ## Attempt to insert this into the database
@@ -832,7 +833,7 @@ sub add_database {
         last TESTCONN if 'flatfile' eq $vals->{dbtype};
 
         ## Must have a valid type
-        if ($vals->{dbtype} !~ /^(?:postgres|mongo|mysql|oracle)$/) {
+        if ($vals->{dbtype} !~ /^(?:postgres|mongo|mysql|drizzle|oracle)$/) {
             die qq{Unknown database type: $vals->{dbtype}\n};
         }
 
@@ -950,6 +951,33 @@ sub add_database {
 
         } ## end mysql
 
+        if ('drizzle' eq $vals->{dbtype}) {
+
+            my ($type,$dsn,$user,$pass) = split /\n/ => $dbconn;
+
+            my $found = 0;
+            eval {
+                require DBD::drizzle;
+                $found = 1;
+            };
+            if (!$found) {
+                die "Cannot add unless the DBD::drizzle module is available\n";
+            }
+
+            my $testdbh;
+            $found = 0;
+            eval {
+                $testdbh = DBI->connect($dsn, $user, $pass, {AutoCommit=>0,RaiseError=>1,PrintError=>0});
+                $found = 1;
+            };
+            if (! $found) {
+                die "Drizzle connection test failed. You can force add it with the --force argument. Error was: $@\n\n";
+            }
+
+            $testdbh->disconnect();
+
+        } ## end drizzle
+
         if ('oracle' eq $vals->{dbtype}) {
 
             my ($type,$dsn,$user,$pass) = split /\n/ => $dbconn;
@@ -1379,6 +1407,15 @@ sub list_databases {
                 $showhost,
                 $showport;
         }
+        if ($dbtype eq 'drizzle') {
+            my $showport = (length $info->{dbport} and $info->{dbport} != 3306)
+                ? " --port $info->{dbport}" : '';
+            printf "Conn: drizzle -u %s -D %s%s%s",
+                $info->{dbuser},
+                $info->{dbname},
+                $showhost,
+                $showport;
+        }
         if ($dbtype eq 'oracle') {
             printf "Conn: sqlplus %s%s",
                 $info->{dbuser},
@@ -4509,6 +4546,7 @@ sub process_args {
         $arg{type} =~ s/postgres.*/postgres/io;
         $arg{type} =~ s/mongo.*/mongo/io;
         $arg{type} =~ s/mysql.*/mysql/io;
+        $arg{type} =~ s/drizzle.*/drizzle/io;
         $arg{type} =~ s/oracle.*/oracle/io;
     }
 
index 021f4227981c85d37222df77a5fa1b528da7422e..bab47646aa800fd1860326e01d2676d8837a5d1c 100644 (file)
@@ -611,6 +611,7 @@ AS $bc$
 ## Postgres: a connection string, username, password, and attribs
 ## Mongo: "foo: bar" style connection information, one per line
 ## MySQL: a connection string, username, and password
+## Drizzle: a connection string, username, and password
 ## Oracle: a connection string, username, and password
 
 use strict;
@@ -691,6 +692,21 @@ if ($dbtype eq 'mysql') {
 
 } ## end mysql
 
+if ($dbtype eq 'drizzle') {
+
+    length $db{name} or elog(ERROR, qq{Database name is mandatory\n});
+    length $db{user} or elog(ERROR, qq{Database username is mandatory\n});
+
+    my $connstring = "dbi:drizzle:database=$db{name}";
+    $db{host} ||= ''; $db{port} ||= ''; $db{pass} ||= '';
+    length $db{host} and $connstring .= ";host=$db{host}";
+    length $db{port} and $connstring .= ";port=$db{port}";
+    length $db{conn} and $connstring .= ";$db{conn}";
+
+    return "$dbtype\n$connstring\n$db{user}\n$db{pass}";
+
+} ## end drizzle
+
 if ($dbtype eq 'oracle') {
 
     ## We should loosen this up somewhere
@@ -1210,6 +1226,9 @@ for my $dbname (sort { ($db{$b}{role} eq 'source') <=> ($db{$a}{role} eq 'source
     ## Skip if this is mysql
     next if $db{$dbname}{dbtype} eq 'mysql';
 
+    ## Skip if this is drizzle
+    next if $db{$dbname}{dbtype} eq 'drizzle';
+
     ## Skip if this is oracle
     next if $db{$dbname}{dbtype} eq 'oracle';
 
diff --git a/t/20-drizzle.t b/t/20-drizzle.t
new file mode 100644 (file)
index 0000000..80bdd3b
--- /dev/null
@@ -0,0 +1,353 @@
+#!/usr/bin/env perl
+# -*-mode:cperl; indent-tabs-mode: nil-*-
+
+## Test using Drizzle as a database target
+
+use 5.008003;
+use strict;
+use warnings;
+use Data::Dumper;
+use lib 't','.';
+use DBD::Pg;
+use Test::More;
+use MIME::Base64;
+
+use vars qw/ $bct $dbhX $dbhA $dbhB $dbhC $dbhD $res $command $t %pkey $SQL %sth %sql/;
+
+## Must have the DBD::drizzle module
+my $evalok = 0;
+eval {
+    require DBD::drizzle;
+    $evalok = 1;
+};
+if (!$evalok) {
+       plan (skip_all =>  'Cannot test Drizzle unless the Perl module DBD::drizzle is installed');
+}
+
+## Drizzle must be up and running
+$evalok = 0;
+my $dbh;
+my $dbuser = 'root';
+eval {
+    $dbh = DBI->connect('dbi:drizzle:database=test', $dbuser, '',
+                         {AutoCommit=>1, PrintError=>0, RaiseError=>1});
+    $evalok = 1;
+};
+if (!$evalok) {
+       plan (skip_all =>  "Cannot test Drizzle as we cannot connect to a running Drizzle database: $@");
+}
+
+use BucardoTesting;
+
+## For now, remove the bytea table type as we don't have full Drizzle support yet
+delete $tabletype{bucardo_test8};
+
+my $numtabletypes = keys %tabletype;
+plan tests => 119;
+
+## Drop the Drizzle database if it exists
+my $dbname = 'bucardo_test';
+eval {
+    $dbh->do("DROP DATABASE $dbname");
+};
+## Create the Drizzle database
+$dbh->do("CREATE DATABASE $dbname");
+
+## Reconnect to the new database
+$dbh = DBI->connect("dbi:drizzle:database=$dbname", $dbuser, '',
+                    {AutoCommit=>1, PrintError=>0, RaiseError=>1});
+
+## Create one table for each table type
+## For now, we use the same data types as MySQL
+for my $table (sort keys %tabletype) {
+
+    my $pkeyname = $table =~ /test5/ ? q{`id space`} : 'id';
+    my $pkindex = $table =~ /test2/ ? '' : 'PRIMARY KEY';
+    $SQL = qq{
+            CREATE TABLE $table (
+                $pkeyname    $tabletypemysql{$table} NOT NULL $pkindex};
+    $SQL .= $table =~ /X/ ? "\n)" : qq{,
+                data1 VARCHAR(100)           NULL,
+                inty  SMALLINT               NULL,
+                bite1 VARBINARY(999)         NULL,
+                bite2 VARBINARY(999)         NULL,
+                email VARCHAR(100)           NULL UNIQUE
+            )
+            };
+
+    $dbh->do($SQL);
+
+    if ($table =~ /test2/) {
+        $dbh->do("ALTER TABLE $table ADD CONSTRAINT multipk PRIMARY KEY ($pkeyname,data1)");
+    }
+
+}
+
+$bct = BucardoTesting->new() or BAIL_OUT "Creation of BucardoTesting object failed\n";
+$location = 'drizzle';
+
+pass("*** Beginning drizzle tests");
+
+END {
+    $bct and $bct->stop_bucardo($dbhX);
+    $dbhX and  $dbhX->disconnect();
+    $dbhA and $dbhA->disconnect();
+    $dbhB and $dbhB->disconnect();
+    $dbhC and $dbhC->disconnect();
+}
+
+## Get Postgres database A and B and C created
+$dbhA = $bct->repopulate_cluster('A');
+$dbhB = $bct->repopulate_cluster('B');
+$dbhC = $bct->repopulate_cluster('C');
+
+## Create a bucardo database, and install Bucardo into it
+$dbhX = $bct->setup_bucardo('A');
+
+## Tell Bucardo about these databases
+
+## Three Postgres databases will be source, source, and target
+for my $name (qw/ A B C /) {
+    $t = "Adding database from cluster $name works";
+    my ($dbuser,$dbport,$dbhost) = $bct->add_db_args($name);
+    $command = "bucardo add db $name dbname=bucardo_test user=$dbuser port=$dbport host=$dbhost";
+    $res = $bct->ctl($command);
+    like ($res, qr/Added database "$name"/, $t);
+}
+
+$t = 'Adding drizzle database Q works';
+$command =
+"bucardo add db Q dbname=$dbname type=drizzle dbuser=$dbuser";
+$res = $bct->ctl($command);
+like ($res, qr/Added database "Q"/, $t);
+
+## Teach Bucardo about all pushable tables, adding them to a new herd named "therd"
+$t = q{Adding all tables on the master works};
+$command =
+"bucardo add tables all db=A herd=therd pkonly";
+$res = $bct->ctl($command);
+like ($res, qr/Creating herd: therd.*New tables added: \d/s, $t);
+
+## Add all sequences, and add them to the newly created herd
+$t = q{Adding all sequences on the master works};
+$command =
+"bucardo add sequences all db=A herd=therd";
+$res = $bct->ctl($command);
+like ($res, qr/New sequences added: \d/, $t);
+
+## Create a new database group
+$t = q{Created a new database group};
+$command =
+"bucardo add dbgroup qx A:source B:source C Q";
+$res = $bct->ctl($command);
+like ($res, qr/Created database group "qx"/, $t);
+
+## Create a new sync
+$t = q{Created a new sync};
+$command =
+"bucardo add sync drizzle herd=therd dbs=qx ping=false";
+$res = $bct->ctl($command);
+like ($res, qr/Added sync "drizzle"/, $t);
+
+## Create a second sync, solely for multi-sync interaction issues
+$bct->ctl('bucardo add dbgroup t1 A:source B C');
+$bct->ctl('bucardo add sync tsync1 herd=therd dbs=t1 ping=false status=inactive');
+
+## Start up Bucardo with these new syncs
+$bct->restart_bucardo($dbhX);
+
+## Get the statement handles ready for each table type
+for my $table (sort keys %tabletype) {
+
+    $pkey{$table} = $table =~ /test5/ ? q{"id space"} : 'id';
+
+    ## INSERT
+    for my $x (1..6) {
+        $SQL = $table =~ /X/
+            ? "INSERT INTO $table($pkey{$table}) VALUES (?)"
+                : "INSERT INTO $table($pkey{$table},data1,inty) VALUES (?,'foo',$x)";
+        $sth{insert}{$x}{$table}{A} = $dbhA->prepare($SQL);
+        if ('BYTEA' eq $tabletype{$table}) {
+            $sth{insert}{$x}{$table}{A}->bind_param(1, undef, {pg_type => PG_BYTEA});
+        }
+    }
+
+    ## SELECT
+    $sql{select}{$table} = "SELECT inty FROM $table ORDER BY $pkey{$table}";
+    $table =~ /X/ and $sql{select}{$table} =~ s/inty/$pkey{$table}/;
+
+    ## DELETE ALL
+    $SQL = "DELETE FROM $table";
+    $sth{deleteall}{$table}{A} = $dbhA->prepare($SQL);
+
+    ## DELETE ONE
+    $SQL = "DELETE FROM $table WHERE inty = ?";
+    $sth{deleteone}{$table}{A} = $dbhA->prepare($SQL);
+
+    ## TRUNCATE
+    $SQL = "TRUNCATE TABLE $table";
+    $sth{truncate}{$table}{A} = $dbhA->prepare($SQL);
+    ## UPDATE
+    $SQL = "UPDATE $table SET inty = ?";
+    $sth{update}{$table}{A} = $dbhA->prepare($SQL);
+}
+
+## Add one row per table type to A
+for my $table (keys %tabletype) {
+    my $type = $tabletype{$table};
+    my $val1 = $val{$type}{1};
+    $sth{insert}{1}{$table}{A}->execute($val1);
+}
+
+## Before the commit on A, B and C should be empty
+for my $table (sort keys %tabletype) {
+    my $type = $tabletype{$table};
+    $t = qq{B has not received rows for table $table before A commits};
+    $res = [];
+    bc_deeply($res, $dbhB, $sql{select}{$table}, $t);
+    bc_deeply($res, $dbhC, $sql{select}{$table}, $t);
+}
+
+## Commit, then kick off the sync
+$dbhA->commit();
+$bct->ctl('bucardo kick drizzle 0');
+$bct->ctl('bucardo kick drizzle 0');
+
+## Check B and C for the new rows
+for my $table (sort keys %tabletype) {
+
+    my $type = $tabletype{$table};
+    $t = qq{Row with pkey of type $type gets copied to B};
+
+    $res = [[1]];
+    bc_deeply($res, $dbhB, $sql{select}{$table}, $t);
+    bc_deeply($res, $dbhC, $sql{select}{$table}, $t);
+}
+
+## Check that Drizzle has the new rows
+for my $table (sort keys %tabletype) {
+    $t = "Drizzle table $table has correct number of rows after insert";
+    $SQL = "SELECT * FROM $table";
+    my $sth = $dbh->prepare($SQL);
+    my $count = $sth->execute();
+    is ($count, 1, $t);
+
+    $t = "Drizzle table $table has correct entries";
+    my $info = $sth->fetchall_arrayref({})->[0];
+    my $type = $tabletype{$table};
+    my $id = $val{$type}{1};
+    my $pkeyname = $table =~ /test5/ ? 'id space' : 'id';
+
+    ## For now, binary is stored in escaped form, so we skip this one
+    next if $table =~ /test8/;
+
+    ## Datetime has no time zone thingy at the end
+    $tabletypemysql{$table} =~ /DATETIME/ and $id =~ s/\+.*//;
+
+    is_deeply(
+        $info,
+        {
+            $pkeyname => $id,
+            inty => 1,
+            email => undef,
+            bite1 => undef,
+            bite2 => undef,
+            data1 => 'foo',
+        },
+
+        $t);
+}
+
+## Update each row
+for my $table (keys %tabletype) {
+    $sth{update}{$table}{A}->execute(42);
+}
+$dbhA->commit();
+$bct->ctl('bucardo kick drizzle 0');
+
+for my $table (keys %tabletype) {
+    $t = "Drizzle table $table has correct number of rows after update";
+    $SQL = "SELECT * FROM $table";
+    my $sth = $dbh->prepare($SQL);
+    my $count = $sth->execute();
+    is ($count, 1, $t);
+
+    $t = "Drizzle table $table has updated value";
+    my $info = $sth->fetchall_arrayref({})->[0];
+    is ($info->{inty}, 42, $t);
+}
+
+## Delete each row
+for my $table (keys %tabletype) {
+    $sth{deleteall}{$table}{A}->execute();
+}
+$dbhA->commit();
+$bct->ctl('bucardo kick drizzle 0');
+
+for my $table (keys %tabletype) {
+    $t = "Drizzle table $table has correct number of rows after delete";
+    $SQL = "SELECT * FROM $table";
+    my $sth = $dbh->prepare($SQL);
+    (my $count = $sth->execute()) =~ s/0E0/0/;
+    $sth->finish();
+    is ($count, 0, $t);
+}
+
+## Insert two rows, then delete one of them
+## Add one row per table type to A
+for my $table (keys %tabletype) {
+    my $type = $tabletype{$table};
+    my $val1 = $val{$type}{1};
+    $sth{insert}{1}{$table}{A}->execute($val1);
+    my $val2 = $val{$type}{2};
+    $sth{insert}{2}{$table}{A}->execute($val2);
+}
+$dbhA->commit();
+$bct->ctl('bucardo kick drizzle 0');
+
+for my $table (keys %tabletype) {
+    $t = "Drizzle table $table has correct number of rows after double insert";
+    $SQL = "SELECT * FROM $table";
+    my $sth = $dbh->prepare($SQL);
+    my $count = $sth->execute();
+    $sth->finish();
+    is ($count, 2, $t);
+}
+
+## Delete one of the rows
+for my $table (keys %tabletype) {
+    $sth{deleteone}{$table}{A}->execute(2); ## inty = 2
+}
+$dbhA->commit();
+$bct->ctl('bucardo kick drizzle 0');
+
+for my $table (keys %tabletype) {
+    $t = "Drizzle table $table has correct number of rows after single deletion";
+    $SQL = "SELECT * FROM $table";
+    my $sth = $dbh->prepare($SQL);
+    my $count = $sth->execute();
+    $sth->finish();
+    is ($count, 1, $t);
+}
+
+## Insert two more rows, then truncate
+for my $table (keys %tabletype) {
+    my $type = $tabletype{$table};
+    my $val3 = $val{$type}{3};
+    $sth{insert}{3}{$table}{A}->execute($val3);
+    my $val4 = $val{$type}{4};
+    $sth{insert}{4}{$table}{A}->execute($val4);
+}
+$dbhA->commit();
+$bct->ctl('bucardo kick drizzle 0');
+
+for my $table (keys %tabletype) {
+    $t = "Drizzle table $table has correct number of rows after more inserts";
+    $SQL = "SELECT * FROM $table";
+    my $sth = $dbh->prepare($SQL);
+    my $count = $sth->execute();
+    $sth->finish();
+    is ($count, 3, $t);
+}
+
+exit;