Overhaul testing suite, starting with 05-fullcopy.t
authorGreg Sabino Mullane <greg@endpoint.com>
Mon, 16 Aug 2010 20:48:14 +0000 (16:48 -0400)
committerGreg Sabino Mullane <greg@endpoint.com>
Mon, 16 Aug 2010 20:48:14 +0000 (16:48 -0400)
t/05-fullcopy.t
t/BucardoTesting.pm

index 1931ce6c42d79980346dc83332c98e5a782f4795..4fe64850e50a133ac03110259f1ace8a218594b9 100644 (file)
@@ -8,298 +8,466 @@ use warnings;
 use Data::Dumper;
 use lib 't','.';
 use DBD::Pg;
-use Test::More tests => 131;
+use Test::More;
 
 use BucardoTesting;
 my $bct = BucardoTesting->new() or BAIL_OUT "Creation of BucardoTesting object failed\n";
 $location = 'fullcopy';
 
-use vars qw/$SQL $sth $t $i $result $count %sql %val %pkey/;
+my $numtabletypes = keys %tabletype;
+plan tests => 29 + ($numtabletypes * 13);
 
 pass("*** Beginning 'fullcopy' tests");
 
-## Prepare a clean Bucardo database on A
-my $dbhA = $bct->blank_database('A');
-my $dbhX = $bct->setup_bucardo(A => $dbhA);
+use vars qw/ $dbhX $dbhA $dbhB $dbhC $dbhD $res $command $t %pkey $SQL %sth %sql/;
 
-## Server A is the master, the rest are slaves
-my $dbhB = $bct->blank_database('B');
-my $dbhC = $bct->blank_database('C');
+use vars qw/ $i $result /;
 
-## Tell Bucardo about these databases
-$bct->add_test_databases('A B C');
+END {
+       $bct->stop_bucardo($dbhX);
+       $dbhX->disconnect();
+       $dbhA->disconnect();
+       $dbhB->disconnect();
+       $dbhC->disconnect();
+}
 
-## Create a herd for 'A' and add all test tables to it
-$bct->add_test_tables_to_herd('A', 'testherd1');
+## Get A, B, and C created, emptied out, and repopulated with sample data
+$dbhA = $bct->repopulate_cluster('A');
+$dbhB = $bct->repopulate_cluster('B');
+$dbhC = $bct->repopulate_cluster('C');
+$dbhD = $bct->repopulate_cluster('D');
 
-## Create a new sync to fullcopy from A to B
-$t=q{Add sync works};
-$i = $bct->ctl("add sync fullcopytest source=testherd1 type=fullcopy targetdb=B");
-like($i, qr{Added sync}, $t);
+## Create a bucardo database, and install Bucardo into it
+$dbhX = $bct->setup_bucardo('A');
 
-## Tell sync kids to stay alive
-$dbhX->do(q{UPDATE bucardo.sync SET kidsalive = 't'});
-$dbhX->commit();
+## Tell Bucardo about these databases
 
-# Speed up checking for dead kids, so the resurrection test, at the end, works properly
-print $bct->ctl('set ctl_checkonkids_time=2 ctl_checkabortedkids_time=2') . "\n";
+$t = 'Adding database from cluster A works';
+my ($dbuser,$dbport,$dbhost) = $bct->add_db_args('A');
+$command =
+"bucardo_ctl add db bucardo_test name=A user=$dbuser port=$dbport host=$dbhost";
+$res = $bct->ctl($command);
+like ($res, qr/Added database "A"/, $t);
+
+$t = 'Adding database from cluster B works';
+($dbuser,$dbport,$dbhost) = $bct->add_db_args('B');
+$command =
+"bucardo_ctl add db bucardo_test name=B user=$dbuser port=$dbport host=$dbhost";
+$res = $bct->ctl($command);
+like ($res, qr/Added database "B"/, $t);
+
+$t = 'Adding database from cluster C works';
+($dbuser,$dbport,$dbhost) = $bct->add_db_args('C');
+$command =
+"bucardo_ctl add db bucardo_test name=C user=$dbuser port=$dbport host=$dbhost";
+$res = $bct->ctl($command);
+like ($res, qr/Added database "C"/, $t);
+
+$t = 'Adding database from cluster D works';
+($dbuser,$dbport,$dbhost) = $bct->add_db_args('D');
+$command =
+"bucardo_ctl add db bucardo_test name=D user=$dbuser port=$dbport host=$dbhost";
+$res = $bct->ctl($command);
+like ($res, qr/Added database "D"/, $t);
+
+## Teach Bucardo about all tables, adding them to a new herd named "therd"
+$t = q{Adding all tables on the master works};
+$command =
+"bucardo_ctl add tables all db=A herd=therd";
+$res = $bct->ctl($command);
+like ($res, qr/Creating herd: therd.*New tables added: \d+/s, $t);
+
+## Remove the 'droptest' table
+$command =
+"bucardo_ctl update herd therd remove droptest";
+$res = $bct->ctl($command);
+like ($res, qr/Removed from herd therd: public.droptest/, $t);
+
+## Add all sequences, and add them to the newly created herd
+$t = q{Adding all sequences on the master works};
+$command =
+"bucardo_ctl add sequences all db=A herd=therd";
+$res = $bct->ctl($command);
+like ($res, qr/New sequences added: \d/, $t);
+
+## Add a new fullcopy sync that goes from A to B
+$t = q{Adding a new fullcopy sync works};
+$command =
+"bucardo_ctl add sync fullcopyAB type=fullcopy source=therd targetdb=B";
+$res = $bct->ctl($command);
+like ($res, qr/Added sync "fullcopyAB"/, $t);
+
+## Create a database group consisting of A and B
+$t = q{Adding dbgroup 'slaves' works};
+$command =
+"bucardo_ctl add dbgroup slaves B C";
+$res = $bct->ctl($command);
+like ($res, qr/\QAdded database "B" to group "slaves"\E.*
+              \QAdded database "C" to group "slaves"\E.*
+              \QAdded database group "slaves"/xsm, $t);
+
+## We want to know when the sync has finished
+$dbhX->do(q{LISTEN "bucardo_syncdone_fullcopyAB"});
+$dbhX->commit();
 
+## Time to startup Bucardo
 $bct->restart_bucardo($dbhX);
 
-$dbhX->do('LISTEN bucardo_syncdone_fullcopytest');
-$dbhX->commit();
+## Now for the meat of the tests
 
+## Get the statement handles ready for each table type
 for my $table (sort keys %tabletype) {
 
-       my $type = $tabletype{$table};
-       my $val = $val{$type}{1};
-       if (!defined $val) {
-               BAIL_OUT "Could not determine value for $table $type\n";
-       }
-
        $pkey{$table} = $table =~ /test5/ ? q{"id space"} : 'id';
 
-       $SQL = $table =~ /0/
-               ? "INSERT INTO $table($pkey{$table}) VALUES (?)"
-                       : "INSERT INTO $table($pkey{$table},data1,inty) VALUES (?,'one',1)";
-       $sql{insert}{$table} = $dbhA->prepare($SQL);
-       if ($type eq 'BYTEA') {
-               $sql{insert}{$table}->bind_param(1, undef, {pg_type => PG_BYTEA});
+       ## INSERT
+       for my $x (1..4) {
+               $SQL = $table =~ /0/
+                       ? "INSERT INTO $table($pkey{$table}) VALUES (?)"
+                               : "INSERT INTO $table($pkey{$table},data1,inty) VALUES (?,'foo',$x)";
+               $sth{insert}{$x}{$table}{A} = $dbhA->prepare($SQL);
+               if ('BYTEA' eq $tabletype{$table}) {
+                       $sth{insert}{$x}{$table}{A}->bind_param(1, undef, {pg_type => PG_BYTEA});
+               }
        }
-       $val{$table} = $val;
 
-       $sql{insert}{$table}->execute($val{$table});
-}
+       ## SELECT
+       $sql{select}{$table} = "SELECT inty FROM $table ORDER BY $pkey{$table}";
+       $table =~ /0/ and $sql{select}{$table} =~ s/inty/$pkey{$table}/;
 
-$dbhA->commit();
+       ## DELETE
+       $SQL = "DELETE FROM $table";
+       $sth{deleteall}{$table}{A} = $dbhA->prepare($SQL);
 
-sub test_empty_drop {
-       my ($table, $dbh) = @_;
-       my $DROPSQL = 'SELECT * FROM droptest';
-       my $line = (caller)[2];
-       $t=qq{ Triggers and rules did NOT fire on remote table $table};
-       $result = [];
-       bc_deeply($result, $dbhB, $DROPSQL, $t, $line);
 }
 
-for my $table (sort keys %tabletype) {
-       $t=qq{ Second table $table still empty before commit };
-       $SQL = $table =~ /0/
-               ? "SELECT $pkey{$table} FROM $table"
-                       : "SELECT $pkey{$table},data1 FROM $table";
-       $result = [];
-       bc_deeply($result, $dbhB, $SQL, $t);
-
-       $t=q{ After insert, trigger and rule both populate droptest table };
-       my $qtable = $dbhX->quote($table);
-       my $LOCALDROPSQL = $table =~ /0/
-               ? "SELECT type,0 FROM droptest WHERE name = $qtable ORDER BY 1,2"
-                       : "SELECT type,inty FROM droptest WHERE name = $qtable ORDER BY 1,2";
-       my $tval = $table =~ /0/ ? 0 : 1;
-       $result = [['rule',$tval],['trigger',$tval]];
-       bc_deeply($result, $dbhA, $LOCALDROPSQL, $t);
-
-       test_empty_drop($table,$dbhB);
+## Add one row per table type to A
+for my $table (keys %tabletype) {
+       my $type = $tabletype{$table};
+       my $val1 = $val{$type}{1};
+       $sth{insert}{1}{$table}{A}->execute($val1);
 }
 
+## Before the commit on A, B should be empty
 for my $table (sort keys %tabletype) {
-       $t=qq{ Second table $table still empty before kick };
-       $sql{select}{$table} = "SELECT inty FROM $table ORDER BY $pkey{$table}";
-       $table =~ /0/ and $sql{select}{$table} =~ s/inty/$pkey{$table}/;
-       $result = [];
-       bc_deeply($result, $dbhB, $sql{select}{$table}, $t);
+       my $type = $tabletype{$table};
+       $t = qq{B has not received rows for table $table before A commits};
+       $res = [];
+       bc_deeply($res, $dbhB, $sql{select}{$table}, $t);
 }
+$dbhA->commit();
+
+## Have it vacuum afterwards
+$t = q{Value of vacuum_after_copy can be changed};
+$command =
+'bucardo_ctl update sync fullcopyAB vacuum_after_copy=1';
+$res = $bct->ctl($command);
+like ($res, qr{vacuum_after_copy}, $t);
 
-## Give the table some heft for speed tests
-## $sth = $dbhA->prepare("INSERT INTO bucardo_test2(id,inty) VALUES(?,?)");
-## for my $x (2..100000) {     $sth->execute($x,1000); }
-## $dbhA->commit();
+## Reload the sync
+$command =
+"bucardo_ctl reload sync fullcopyAB";
+$res = $bct->ctl($command);
 
-$bct->ctl("kick fullcopytest 0");
-wait_for_notice($dbhX, 'bucardo_syncdone_fullcopytest', 5);
+## Kick the sync and wait for it to finish
+$bct->ctl('kick sync fullcopyAB 0');
 
+## Check the second database for the new rows
 for my $table (sort keys %tabletype) {
-       $t=qq{ Second table $table got the fullcopy row};
-       $result = [[1]];
-       bc_deeply($result, $dbhB, $sql{select}{$table}, $t);
 
-       test_empty_drop($table,$dbhB);
+       my $type = $tabletype{$table};
+       $t = qq{Row with pkey of type $type gets copied to B};
+
+       $res = [[1]];
+       bc_deeply($res, $dbhB, $sql{select}{$table}, $t);
 }
 
+## The droptest table should be populated for A, but not for B
 for my $table (sort keys %tabletype) {
-       ## Make changes to B, have the sync blow them away
-       $i = $dbhB->do("UPDATE $table SET inty = 99");
-       $dbhB->do("DELETE FROM droptest");
-       $dbhB->commit();
+
+       $t = qq{Triggers and rules fired on A};
+       $SQL = qq{SELECT type FROM droptest WHERE name = '$table' ORDER BY 1};
+
+       $res = [['rule'],['trigger']];
+       bc_deeply($res, $dbhA, $SQL, $t);
+
+       $t = qq{Triggers and rules did not fire on B};
+       $res = [];
+       bc_deeply($res, $dbhB, $SQL, $t);
 }
 
-for my $table (sort keys %tabletype) {
-       $t=qq{ Second table $table can be changed directly};
-       $result = [[99]];
-       bc_deeply($result, $dbhB, $sql{select}{$table}, $t);
+## Delete the rows from A, make sure deletion makes it to B
+## Delete rows from A
+for my $table (keys %tabletype) {
+       $sth{deleteall}{$table}{A}->execute();
 }
+$dbhA->commit();
 
-$bct->ctl('kick fullcopytest 0');
-wait_for_notice($dbhX, 'bucardo_syncdone_fullcopytest', 5);
+## Kick the sync and wait for it to finish
+$bct->ctl('kick sync fullcopyAB 0');
 
+## Rows should be gone from B now
 for my $table (sort keys %tabletype) {
-       $t=qq{ Second table $table loses local changes on fullcopy};
-       $result = [[1]];
-       bc_deeply($result, $dbhB, $sql{select}{$table}, $t);
-}
 
-## Sequence testing
+       my $type = $tabletype{$table};
+       $t = qq{Row with pkey of type $type is deleted from B};
 
-$dbhA->do("SELECT setval('bucardo_test_seq1', 123)");
+       $res = [];
+       bc_deeply($res, $dbhB, $sql{select}{$table}, $t);
+}
+
+## Now add two rows at once
+for my $table (keys %tabletype) {
+       my $type = $tabletype{$table};
+       my $val2 = $val{$type}{2};
+       my $val3 = $val{$type}{3};
+       $sth{insert}{2}{$table}{A}->execute($val2);
+       $sth{insert}{3}{$table}{A}->execute($val3);
+}
 $dbhA->commit();
 
-$bct->ctl("kick fullcopytest 0");
-wait_for_notice($dbhX, 'bucardo_syncdone_fullcopytest', 5);
+## Kick the sync and wait for it to finish
+$bct->ctl('kick sync fullcopyAB 0');
 
-$SQL = q{SELECT nextval('bucardo_test_seq1')};
-$t='Fullcopy replicated a sequence properly';
-$result = [[123+1]];
-bc_deeply($result, $dbhB, $SQL, $t);
+## B should have the two new rows
+for my $table (sort keys %tabletype) {
 
-$dbhA->do("SELECT setval('bucardo_test_seq1', 223, false)");
+       my $type = $tabletype{$table};
+       $t = qq{Two rows with pkey of type $type are copied to B};
+
+       $res = [[2],[3]];
+       bc_deeply($res, $dbhB, $sql{select}{$table}, $t);
+}
+
+## Test out an update
+for my $table (keys %tabletype) {
+       my $type = $tabletype{$table};
+       $SQL = "UPDATE $table SET inty=inty+10";
+       $dbhA->do($SQL);
+}
 $dbhA->commit();
+$bct->ctl('kick sync fullcopyAB 0');
 
-$bct->ctl("kick fullcopytest 0");
-wait_for_notice($dbhX, 'bucardo_syncdone_fullcopytest', 5);
+## B should have the updated rows
+for my $table (sort keys %tabletype) {
 
-$SQL = q{SELECT nextval('bucardo_test_seq1')};
-$t='Fullcopy replicated a sequence properly with a false setval';
-$result = [[223]];
-bc_deeply($result, $dbhB, $SQL, $t);
+       my $type = $tabletype{$table};
+       $t = qq{Updates of two rows with pkey of type $type are copied to B};
 
-$dbhA->do("SELECT setval('bucardo_test_seq1', 345, true)");
-$dbhA->commit();
+       $res = [[12],[13]];
+       bc_deeply($res, $dbhB, $sql{select}{$table}, $t);
+}
 
-$bct->ctl("kick fullcopytest 0");
-wait_for_notice($dbhX, 'bucardo_syncdone_fullcopytest', 5);
+## Test insert, update, and delete all at once, across multiple transactions
+for my $table (keys %tabletype) {
+       my $type = $tabletype{$table};
+       $SQL = "UPDATE $table SET inty=inty-3";
+       $dbhA->do($SQL);
+       $dbhA->commit();
 
-$SQL = q{SELECT nextval('bucardo_test_seq1')};
-$t='Fullcopy replicated a sequence properly with a true setval';
-$result = [[345+1]];
-bc_deeply($result, $dbhB, $SQL, $t);
+       my $val4 = $val{$type}{4};
+       $sth{insert}{4}{$table}{A}->execute($val4);
+       $dbhA->commit();
 
-## Add another slave
-$t=q{Add dbgroup works};
-$i = $bct->ctl("add dbgroup tgroup B C");
-like($i, qr{Added database group}, $t);
+       $SQL = "DELETE FROM $table WHERE inty = 10";
+       $dbhA->do($SQL);
+       $dbhA->commit();
+}
+$bct->ctl('kick sync fullcopyAB 0');
 
-$t=q{Update sync works};
-$dbhB->commit();
-$i = $bct->ctl("update sync fullcopytest targetgroup=tgroup");
-like($i, qr{targetgroup : }, $t);
+## B should have the updated rows
+for my $table (sort keys %tabletype) {
 
-$dbhX->do("NOTIFY bucardo_reload_sync_fullcopytest");
-$dbhX->commit();
+       my $type = $tabletype{$table};
+       $t = qq{Updates of two rows with pkey of type $type are copied to B};
+
+       $res = [[9],[4]];
+       bc_deeply($res, $dbhB, $sql{select}{$table}, $t);
+}
 
 for my $table (sort keys %tabletype) {
-       $dbhA->do("DELETE FROM $table");
+       my $type = $tabletype{$table};
+       $dbhA->do("COPY $table($pkey{$table},inty,data1) FROM STDIN");
+       my $val5 = $val{$type}{5};
+       $val5 =~ s/\0//;
+       $dbhA->pg_putcopydata("$val5\t5\tfive");
+       $dbhA->pg_putcopyend();
+       $dbhA->commit();
 }
+$bct->ctl('kick sync fullcopyAB 0');
 
-$dbhA->commit();
-# XXX - Hack
-sleep 5;
-$bct->ctl('kick fullcopytest 0');
-wait_for_notice($dbhX, 'bucardo_syncdone_fullcopytest', 5);
+## B should have the new rows
+for my $table (sort keys %tabletype) {
+
+       my $type = $tabletype{$table};
+       $t = qq{COPY to A with pkey type $type makes it way to B};
 
+       $res = [[9],[4],[5]];
+       bc_deeply($res, $dbhB, $sql{select}{$table}, $t);
+}
+
+## Modify the sync and have it go to B *and* C
+$command =
+"bucardo_ctl update sync fullcopyAB set targetgroup=slaves";
+$res = $bct->ctl($command);
+
+## Before the sync reload, C should not have anything
 for my $table (sort keys %tabletype) {
-       $t=qq{ Second table $table was emptied out};
-       $result = [];
 
-       bc_deeply($result, $dbhB, $sql{select}{$table}, $t);
+       my $type = $tabletype{$table};
+       $t = qq{Row with pkey of type $type does not exist on C yet};
+
+       $res = [];
+       bc_deeply($res, $dbhC, $sql{select}{$table}, $t);
+}
+
+$command =
+"bucardo_ctl reload sync fullcopyAB";
+$res = $bct->ctl($command);
 
-       $t=qq{ Third table $table begins empty};
-       $result = [];
-       bc_deeply($result, $dbhC, $sql{select}{$table}, $t);
+$bct->ctl('kick sync fullcopyAB 0');
+
+## After the sync is reloaded and kicked, C will have all the rows
+for my $table (sort keys %tabletype) {
 
-       test_empty_drop($table,$dbhC);
+       my $type = $tabletype{$table};
+       $t = qq{Row with pkey of type $type is copied to C};
 
-       $sql{insert}{$table}->execute($val{$table});
+       $res = [[9],[4],[5]];
+       bc_deeply($res, $dbhC, $sql{select}{$table}, $t);
 }
 
+## Do an update, and have it appear on both sides
+for my $table (keys %tabletype) {
+       my $type = $tabletype{$table};
+       $SQL = "UPDATE $table SET inty=55 WHERE inty = 5";
+       $dbhA->do($SQL);
+}
 $dbhA->commit();
-
-$bct->ctl('kick fullcopytest 0');
-wait_for_notice($dbhX, 'bucardo_syncdone_fullcopytest', 5);
+$bct->ctl('kick sync fullcopyAB 0');
 
 for my $table (sort keys %tabletype) {
-       $t=qq{ Second table $table got the fullcopy row};
-       $result = [[1]];
-       bc_deeply($result, $dbhB, $sql{select}{$table}, $t);
 
-       $t=qq{ Third table $table got the fullcopy row};
-       $result = [[1]];
-       bc_deeply($result, $dbhC, $sql{select}{$table}, $t);
+       my $type = $tabletype{$table};
+       $t = qq{Row with pkey of type $type is replicated to B};
+
+       $res = [[9],[4],[55]];
+       bc_deeply($res, $dbhB, $sql{select}{$table}, $t);
+
+       $t = qq{Row with pkey of type $type is replicated to C};
+       $res = [[9],[4],[55]];
+       bc_deeply($res, $dbhC, $sql{select}{$table}, $t);
 }
 
-## Test out customselect - update just the id column
-$dbhX->do(q{UPDATE goat SET customselect='SELECT '||replace(qpkey,'|',',')||' FROM '||tablename});
-$dbhX->do(q{UPDATE sync SET usecustomselect = true});
-$dbhX->do("NOTIFY bucardo_reload_sync_fullcopytest");
-$dbhX->commit();
+## Sequence testing
 
-$dbhA->do("UPDATE bucardo_test1 SET id = id + 100, inty=inty + 100");
+$dbhA->do("SELECT setval('bucardo_test_seq1', 123)");
 $dbhA->commit();
 
-# XXX - Hack
-sleep 5;
-$bct->ctl('kick fullcopytest 0');
-wait_for_notice($dbhX, 'bucardo_syncdone_fullcopytest', 5);
+$bct->ctl("kick fullcopyAB 0");
 
-for my $table (sort keys %tabletype) {
-       $t=qq{ Second table $table got the fullcopy row};
-       $result = [[undef]];
-       bc_deeply($result, $dbhB, $sql{select}{$table}, $t);
+$SQL = q{SELECT nextval('bucardo_test_seq1')};
+$t='Fullcopy replicated a sequence properly to B';
+$result = [[123+1]];
+bc_deeply($result, $dbhB, $SQL, $t);
 
-       $t=qq{ Third table $table got the fullcopy row};
-       $result = [[undef]];
-       bc_deeply($result, $dbhC, $sql{select}{$table}, $t);
-}
+$t='Fullcopy replicated a sequence properly to C';
+bc_deeply($result, $dbhC, $SQL, $t);
 
+$dbhA->do("SELECT setval('bucardo_test_seq1', 223, false)");
+$dbhA->commit();
 
-KILLTEST: {
-sleep 1;
-}
+$bct->ctl("kick fullcopyAB 0");
 
-## Kill the Postgres backend for one of the kids to see how it is handled
-my $SQL = "SELECT * FROM bucardo.audit_pid WHERE target='B' ORDER BY id DESC LIMIT 1";
-my $info = $dbhX->prepare($SQL);
-$info->execute();
-$info = $info->fetchall_arrayref({})->[0];
-my $pid = $info->{'target_backend'};
-my $kidid = $info->{'id'};
-kill 15 => $pid;
+$SQL = q{SELECT nextval('bucardo_test_seq1')};
+$t='Fullcopy replicated a sequence properly with a false setval to B';
+$result = [[223]];
+bc_deeply($result, $dbhB, $SQL, $t);
 
-$dbhA->do("UPDATE bucardo_test1 SET id = id + 100, inty=inty + 100");
+$t='Fullcopy replicated a sequence properly with a false setval to C';
+bc_deeply($result, $dbhC, $SQL, $t);
+
+$dbhA->do("SELECT setval('bucardo_test_seq1', 345, true)");
 $dbhA->commit();
-$bct->ctl('kick fullcopytest 0');
 
-$SQL = "SELECT * FROM bucardo.audit_pid WHERE id = ?";
-$info = $dbhX->prepare($SQL);
-$info->execute($kidid);
-$info = $info->fetchall_arrayref({})->[0];
+$bct->ctl("kick fullcopyAB 0");
+wait_for_notice($dbhX, 'bucardo_syncdone_fullcopyAB', 5);
 
-$t = 'Kid death was detected and entered in audit_pid table';
-like ($info->{death}, qr{target error: 7}, $t);
+$SQL = q{SELECT nextval('bucardo_test_seq1')};
+$t='Fullcopy replicated a sequence properly with a true setval to B';
+$result = [[345+1]];
+bc_deeply($result, $dbhB, $SQL, $t);
 
-sleep 15;
-## Latest kid should have a life of 2
-$SQL = "SELECT * FROM bucardo.audit_pid WHERE target='B' ORDER BY id DESC LIMIT 1";
-$info = $dbhX->prepare($SQL);
-$info->execute();
-$info = $info->fetchall_arrayref({})->[0];
-$t = 'Kid was resurrected by the controller after untimely death';
-like ($info->{birth}, qr{Life: 2}, $t);
+$t='Fullcopy replicated a sequence properly with a true setval to C';
+$result = [[345+1]];
+bc_deeply($result, $dbhC, $SQL, $t);
 
-END {
-       $bct->stop_bucardo($dbhX);
-       $dbhX->disconnect();
-       $dbhA->disconnect();
-       $dbhB->disconnect();
-       $dbhC->disconnect();
-}
+## Add another slave on the fly
+$t = q{Added database D to group 'slaves'};
+$command =
+"bucardo_ctl add dbgroup slaves D";
+$res = $bct->ctl($command);
+like ($res, qr{Added database "D" to group "slaves"}, $t);
+
+## Test out customselect - update just the id column
+$t = q{Set customselect on table bucardo_test1};
+$command =
+"bucardo_ctl update table bucardo_test1 customselect='SELECT id FROM bucardo_test1'";
+$res = $bct->ctl($command);
+like ($res, qr{\Qcustomselect : changed from (null) to "SELECT id FROM bucardo_test1"}, $t);
+
+$t = q{Set usecustomselect to true for sync fullcopyAB};
+$command =
+"bucardo_ctl update sync fullcopyAB usecustomselect=true";
+$res = $bct->ctl($command);
+like ($res, qr{usecustomselect : changed from "f" to "true"}, $t);
+
+$t = q{Reloaded the sync fullcopyAB};
+$command =
+"bucardo_ctl reload sync fullcopyAB";
+$res = $bct->ctl($command);
+like ($res, qr{Reloading sync fullcopyAB...DONE!}, $t);
+
+## Update both id and inty, but only the former should get propagated
+$dbhA->do("UPDATE bucardo_test1 SET id=id + 100, inty=inty + 100");
+$dbhA->commit();
+
+$bct->ctl('kick sync fullcopyAB 0');
+
+$t = q{Table bucardo_test1 copied only some rows to B due to customselect};
+$SQL = 'SELECT id, inty FROM bucardo_test1';
+$result = [[102,undef], [104,undef], [105,undef]];
+bc_deeply($result, $dbhB, $SQL, $t);
+
+$t = q{Table bucardo_test2 copied all some rows to B due to lack of customselect};
+$SQL = 'SELECT id, inty FROM bucardo_test2';
+$result = [[1234569,9],[1234571,4],[1234572,55]];
+bc_deeply($result, $dbhB, $SQL, $t);
+
+## Now try truncate mode instead of delete
+$t = q{Reloaded the sync fullcopyAB};
+$command =
+"bucardo_ctl update sync fullcopyAB deletemethod=truncate usecustomselect=f";
+$res = $bct->ctl($command);
+like ($res, qr{Changes made to sync}, $t);
+
+$t = q{Reloaded the sync fullcopyAB};
+$command =
+"bucardo_ctl reload sync fullcopyAB";
+$res = $bct->ctl($command);
+like ($res, qr{Reloading sync fullcopyAB...DONE!}, $t);
+
+$bct->ctl("kick fullcopyAB 0");
+
+my $table = 'bucardo_test1';
+
+my $type = $tabletype{$table};
+$res = [[109],[104],[155]];
+
+$t = qq{Row with pkey of type $type is copied to B};
+bc_deeply($res, $dbhB, $sql{select}{$table}, $t);
+
+$t = qq{Row with pkey of type $type is copied to C};
+bc_deeply($res, $dbhC, $sql{select}{$table}, $t);
 
+exit;
index cc4c8afe7301bc86394326d659255045fb7c8d32..90a955a2c9b30573d63e84a964f8de4c4fbd2899 100644 (file)
@@ -14,16 +14,17 @@ use Data::Dumper;
 
 use vars qw/$SQL $sth $count $COM %dbh/;
 
-my $DEBUG = 0;
+my $DEBUG = 1;
 
 use base 'Exporter';
 our @EXPORT = qw/%tabletype %sequences %val compare_tables bc_deeply clear_notices wait_for_notice $location/;
 
+my $dbname = 'bucardo_test';
+
+
 our $location = 'setup';
 my $testmsg  = ' ?';
 my $testline = '?';
-my $showline = 1;
-my $showtime = 0;
 ## Sometimes, we want to stop as soon as we see an error
 my $bail_on_error = $ENV{BUCARDO_TESTBAIL} || 0;
 my $total_errors = 0;
@@ -51,6 +52,7 @@ our %tabletype =
         'bucardo_test6' => 'TIMESTAMP',
         'bucardo_test7' => 'NUMERIC',
         'bucardo_test8' => 'BYTEA',
+        'bucardo_test9' => 'int_unsigned',
         );
 
 our @tables2empty = (qw/droptest bucardo_test_multicol/);
@@ -119,10 +121,12 @@ for my $name ('A'..'Z') {
         initdb  => $linitdb,
                pgctl   => $lpgctl,
                version => $localver,
+               ver     => "$lmaj.$lmin",
                vmaj    => $lmaj,
         vmin    => $lmin,
         vrev    => $lrev,
-               dirname => "bucardo_test_database_${name}_$localver",
+               dirname => "bucardo_test_database_${name}_$lmaj.$lmin",
+               port    => $lport,
        };
 }
 
@@ -140,9 +144,6 @@ my $TIMEOUT_SLEEP = 0.1;
 ## How long to wait for a notice to be issued?
 my $TIMEOUT_NOTICE = 2;
 
-## Default test schema name.
-my $TEST_SCHEMA = 'bucardo_schema';
-
 ## Bail if the bucardo_ctl file does not exist / does not compile
 for my $file (qw/bucardo_ctl Bucardo.pm/) {
        if (! -e $file) {
@@ -169,38 +170,31 @@ for (1..30) {
        $val{TIMESTAMP}{$_} = $val{DATE}{$_} . " 12:34:56";
        $val{NUMERIC}{$_} = 0.7 + $_;
        $val{BYTEA}{$_} = "$_\0Z";
+       $val{int_unsigned}{$_} = 5000 + $_;
 }
 
-sub get_version {
-    return ($pg_major_version, $pg_minor_version, $pg_point_version);
-}
+
 
 sub new {
 
        ## Create a new BucardoTesting object.
-       ## Most defaults should be fine.
+       ## Arguments:
+       ## 1. Hashref of options (optional)
+       ## Returns: reference to a new BucardoTesting object
 
        my $class = shift;
-       my $arg = shift || {};
-       my $self = {};
-       $self->{file} = (caller)[1];
+       my $arg   = shift || {};
+       my $self  = {};
+       bless $self, $class;
 
-       ## Short name for this test. Should always be set.
-       $self->{name} = $arg->{name} || '?';
+       ## Make a note of which file invoked us for later debugging
+       $self->{file} = (caller)[1];
 
        ## Bail on first error? Default is ENV, then false.
        $self->{bail} = exists $arg->{bail} ? $arg->{bail} : $ENV{BUCARDO_TESTBAIL} || 0;
 
-       ## Whether to show what line an error came from. Defaults true.
-       $self->{showline} = exists $arg->{showline} ? $arg->{showline} : 1;
-
-       ## Whether to show a running time. Defaults false.
-       $self->{showtime} = $arg->{showtime} || 0;
-
-       ## Name of the test schema. Should rarely need to be set
-       $self->{schema} = $arg->{schema} || $TEST_SCHEMA;
-
-       bless $self, $class;
+       ## Name of the test schema
+       $self->{schema} = 'bucardo_schema';
 
        ## Let's find out where bucardo_ctl is. Prefer the blib ones, which are shebang adjusted
        if (-e 'blib/script/bucardo_ctl') {
@@ -220,66 +214,126 @@ sub new {
        }
 
        return $self;
-}
 
+} ## end of new
+
+
+sub debug {
 
-sub blank_database {
+       ## Simply internal debugging routine, prints a message if $DEBUG is set
+       ## Arguments:
+       ## 1. Message to print
+       ## 2. Optional level, defaults to 0
+       ## Returns: nothing
 
-       ## Create, start, and empty out a database ("server");
+       $DEBUG or return;
+
+       my $msg = shift || 'No message?!';
+       my $level = shift || 0;
+
+       return if $DEBUG < $level;
+
+       chomp $msg;
+       warn "DEBUG: $msg\n";
+
+       return;
+
+} ## end of debug
+
+
+sub empty_cluster {
+
+       ## Empty out a cluster's database
+       ## Creates the cluster and 'bucardo_test' database as needed
+       ## For existing databases, removes all known schemas
+       ## Always recreates the public schema
+       ## Arguments:
+       ## 1. Name of the cluster
+       ## Returns: database handle to the 'bucardo_test' database
 
        my $self = shift;
-       my $name = shift || 'A';
+       my $clustername = shift or die;
 
-       ## Does it exist? If not, create with initdb
-       $self->create_cluster($name);
+       ## Create the cluster if needed
+       $self->create_cluster($clustername);
 
-       ## Make sure it is started up
-    $self->start_cluster($name);
+       ## Start it up if needed
+    $self->start_cluster($clustername);
 
-       ## Empty it out (drop and recreate the test database)
-    my $dbh = $self->fresh_database($name);
+       ## Get a handle to the postgres database
+       my $dbh = $self->connect_database($clustername, 'postgres');
+
+       if (database_exists($dbh, $dbname)) {
+               $dbh = $self->connect_database($clustername, $dbname);
+               ## Remove any of our known schemas
+               my @slist;
+               for my $sname (qw/ public bucardo freezer /) {
+                       push @slist => $sname if $self->drop_schema($dbh, $sname);
+               }
+               debug(qq{Schemas dropped from $dbname on $clustername: } . join ',' => @slist);
 
-       ## Populate a test database
-       $self->add_test_schema($dbh,'foo');
+               ## Recreate the public schema
+               $dbh->do("CREATE SCHEMA public");
+               $dbh->commit();
+       }
+       else {
+               local $dbh->{AutoCommit} = 1;
+               debug(qq{Creating database $dbname});
+               $dbh->do("CREATE DATABASE $dbname");
+       }
 
        return $dbh;
 
-} ## end of blank_database
+} ## end of empty_cluster
 
 
 sub create_cluster {
 
        ## Create a cluster if it does not already exist
+       ## Runs initdb, then modifies postgresql.conf
+       ## Arguments:
+       ## 1. Name of the cluster
+       ## Returns: nothing
 
        my $self = shift;
-       my $name = shift || 'A';
-       my $arg = shift || ''; ## A string to append to initdb call
+       my $clustername = shift or die;
 
-       my $clusterinfo = $clusterinfo{$name}
-               or die qq{I do not know how to create a cluster named "$name"};
+       my $info = $pgver{$clustername}
+               or die qq{No such cluster as "$clustername"\n};
 
-       my $dirname = $pgver{$name}{dirname};
+       my $dirname = $info->{dirname};
 
        return if -d $dirname;
 
-       my $localinitdb = $pgver{$name}{initdb};
+       my $localinitdb = $info->{initdb};
 
-       $DEBUG and warn qq{Running $localinitdb for cluster "$name"\n};
+       debug(qq{Running $localinitdb for cluster "$clustername"});
 
-       qx{$localinitdb -D $dirname $arg 2>&1};
+       qx{$localinitdb -D $dirname 2>&1};
 
        ## Make some minor adjustments
        my $file = "$dirname/postgresql.conf";
        open my $fh, '>>', $file or die qq{Could not open "$file": $!\n};
-       printf $fh "\n\nport = %d\nmax_connections = 20\nrandom_page_cost = 2.5\nlog_statement = 'all'\nclient_min_messages = WARNING\nlog_line_prefix='%s[%s] '\nlisten_addresses = ''\nlog_duration = on\n\n",
-               $clusterinfo->{port}, '%m', '%p';
-
-    if ($pgver{$name}{vmin} =~ /devel/ or $pgver{$name}{vmaj} > 8 or ($pgver{$name}{vmaj} == 8 and int($pgver{$name}{vmin}) > 2)) {
-               # the int() call above prevents errors when the version is, for instance, '8.4devel'
-        print $fh "logging_collector = off\n";
+       printf {$fh} "
+
+port                       = %d
+max_connections            = 20
+random_page_cost           = 2.5
+log_statement              = 'all'
+log_min_duration_statement = 0
+client_min_messages        = WARNING
+log_line_prefix            = '%s[%s] '
+listen_addresses           = ''
+
+",
+       $info->{port}, '%m', '%p';
+
+       ## Make some per-version adjustments
+       if ($info->{ver} >= 8.3) {
+        print {$fh} "logging_collector = off\n";
     }
     else {
-        print $fh "redirect_stderr = off\n";
+        print {$fh} "redirect_stderr   = off\n";
     }
        close $fh or die qq{Could not close "$file": $!\n};
 
@@ -292,73 +346,92 @@ sub create_cluster {
 sub start_cluster {
 
        ## Startup a cluster if not already running
+       ## Arguments:
+       ## 1. Name of the cluster
+       ## Returns: nothing
 
        my $self = shift;
-       my $name = shift || 'A';
-       my $arg = shift || '';
+       my $clustername = shift || 'A';
 
-       my $dirname = $pgver{$name}{dirname};
+       ## Create the cluster if needed
+       $self->create_cluster($clustername);
 
-       ## Just in case
-       -d $dirname or $self->create_cluster($name);
+       my $info = $pgver{$clustername};
+
+       my $dirname = $info->{dirname};
 
+       ## Check the PID file. If it exists and is active, simply return
        my $pidfile = "$dirname/postmaster.pid";
        if (-e $pidfile) {
                open my $fh, '<', $pidfile or die qq{Could not open "$pidfile": $!\n};
                <$fh> =~ /(\d+)/ or die qq{No PID found in file "$pidfile"\n};
                my $pid = $1;
                close $fh or die qq{Could not close "$pidfile": $!\n};
-               ## Make sure it's still around
+               ## An active process should respond to a "ping kill"
                $count = kill 0 => $pid;
-               return if $count == 1;
-               $DEBUG and warn qq{Server seems to have died, removing file "$pidfile"\n};
+               #warn "GOT A count of $count for $pid!\n";
+               #my $count2 = kill 1 => $pid;
+               #warn "GOT A count of $count2 for $pid kill 1!\n";
+               return if 1 == $count;
+               ## If no response, remove the pidfile ourselves and go on
+               debug(qq{Server seems to have died, removing file "$pidfile"});
                unlink $pidfile or die qq{Could not remove file "$pidfile"\n};
        }
 
-       $DEBUG and warn qq{Starting cluster "$name"\n};
+       my $port = $info->{port};
+       debug(qq{Starting cluster "$clustername" on port $port});
 
+       ## If not Windows, we'll use Unix sockets with a custom socket dir
        my $option = '';
        if ($^O !~ /Win32/) {
                my $sockdir = "$dirname/socket";
                -e $sockdir or mkdir $sockdir;
                $option = q{-o '-k socket'};
                ## Older versions do not assume socket is right off of data dir
-               if ($pgver{$name}{vmin} !~ /devel/ and $pgver{$name}{vmaj}==8 and $pgver{$name}{vmin} < 1) {
+               if ($info->{ver} <= 8.0) {
                        $option = qq{-o '-k $dirname/socket'};
                }
        }
 
-       my $localpgctl = $pgver{$name}{pgctl};
+       ## Attempt to start it up with a pg_ctl call
+       my $localpgctl = $info->{pgctl};
 
        $COM = qq{$localpgctl $option -l $dirname/pg.log -D $dirname start};
-       $DEBUG and warn qq{Running: $COM\n};
+       debug(qq{Running: $COM});
        qx{$COM};
 
+       ## Wait for the pidfile to appear
+       my $maxwaitseconds = 20;
+       my $loops = 0;
        {
                last if -e $pidfile;
                sleep 0.1;
+               if ($loops++ > ($maxwaitseconds * 10)) {
+                       die "Failed to startup cluster $clustername, command was $COM\n";
+               }
                redo;
        }
 
-       ## Wait for the database to come up and accept connections
-       my $clusterinfo = $clusterinfo{$name}
-               or die qq{I do not know how to find a port for a cluster named "$name"};
-       my $port = $clusterinfo->{port};
+       ## Keep attempting to get a database connection until we get one or timeout
+       $maxwaitseconds = 10;
 
        my $dbhost = getcwd;
        $dbhost .= "/$dirname/socket";
 
+       ## Using the "invalidname" is a nice way to work around locale issues
        my $dsn = "dbi:Pg:dbname=invalidname;port=$port;host=$dbhost";
        my $dbh;
 
-       my $loops = 0;
+       debug(qq{Connecting as $dsn});
+
+       $loops = 0;
   LOOP: {
                eval {
-                       $dbh = DBI->connect($dsn, '', '', {PrintError=>0,RaiseError=>1});
+                       $dbh = DBI->connect($dsn, '', '', { AutoCommit=>0, RaiseError=>1, PrintError=>0 });
                };
                last if $@ =~ /"invalidname"/;
                sleep 0.1;
-               if ($loops++ > 50) {
+               if ($loops++ > ($maxwaitseconds * 10)) {
                        die "Database did not come up: dsn was $dsn\n";
                }
                redo;
@@ -369,186 +442,65 @@ sub start_cluster {
 } ## end of start_cluster
 
 
-sub fresh_database {
-
-       ## Drop and create the bucardo_test database
-       ## First arg is cluster name
-       ## Second arg is hashref, can be 'dropdb'
-
-       my $self = shift;
-       my $name = shift || 'A';
-       my $arg = shift || {};
-
-       my $dirname = $pgver{$name}{dirname};
-
-       ## Just in case
-       -d $dirname or $self->create_cluster($name);
-       -e "$dirname/postmaster.pid" or $self->start_cluster($name);
-
-       my $dbh = $self->connect_database($name, 'postgres');
-
-       my $dbname = 'bucardo_test';
-       my $brandnew = 0;
-       {
-               if (database_exists($dbh => $dbname) and $arg->{dropdb}) {
-                       local $dbh->{AutoCommit} = 1;
-                       $DEBUG and warn "Dropping database $dbname\n";
-                       $dbh->do("DROP DATABASE $dbname");
-               }
-               if (!database_exists($dbh => $dbname)) {
-                       local $dbh->{AutoCommit} = 1;
-                       $DEBUG and warn "Creating database $dbname\n";
-                       $dbh->do("CREATE DATABASE $dbname");
-                       $brandnew = 1;
-                       $dbh->disconnect();
-               }
-       }
-
-       $dbh = $self->connect_database($name, $dbname);
-
-       return $dbh if $brandnew;
-
-       $self->empty_test_database($dbh);
-
-       return $dbh;
-
-} ## end of fresh_database
-
-
-sub empty_test_database {
-
-       ## Wipe all data tables from a test database
-       ## Takes a database handle as only arg
-
-       my $self = shift;
-       my $dbh = shift;
-
-       if ($dbh->{pg_server_version} >= 80300) {
-               $dbh->do(q{SET session_replication_role = 'replica'});
-       }
-
-       for my $table (sort keys %tabletype) {
-               $dbh->do("TRUNCATE TABLE $table");
-       }
-
-       for my $table (@tables2empty) {
-               $dbh->do("TRUNCATE TABLE $table");
-       }
-
-       if ($dbh->{pg_server_version} >= 80300) {
-               $dbh->do(q{SET session_replication_role = 'origin'});
-       }
-       $dbh->commit;
-
-       return;
-
-} ## end of empty_test_database
-
-sub shutdown_cluster {
-
-       ## Shutdown a cluster if running
-       ## Takes the cluster name
-
-       my $self = shift;
-       my $name = shift;
-
-       my $dirname = $pgver{$name}{dirname};
-
-       return if ! -d $dirname;
-
-       my $pidfile = "$dirname/postmaster.pid";
-       return if ! -e $pidfile;
-
-       open my $fh, '<', $pidfile or die qq{Could not open "$pidfile": $!\n};
-       <$fh> =~ /(\d+)/ or die qq{No PID found in file "$pidfile"\n};
-       my $pid = $1;
-       close $fh or die qq{Could not close "$pidfile": $!\n};
-       ## Make sure it's still around
-       $count = kill 0 => $pid;
-       if ($count != 1) {
-               $DEBUG and warn "Removing $pidfile\n";
-               unlink $pidfile;
-       }
-       $count = kill 15 => $pid;
-       print "New count: $count\n";
-       {
-               $count = kill 0 => $pid;
-               last if $count != 1;
-               sleep 0.2;
-               redo;
-       }
-
-       return;
-
-} ## end of shutdown_cluster
-
-
-sub remove_cluster {
-
-       ## Remove a cluster, shutting it down first
-       ## Takes the cluster name
-
-       my $self = shift;
-       my $name = shift;
-
-       my $dirname = $pgver{$name}{dirname};
-
-       return if ! -d $dirname;
-
-       ## Just in case
-       $self->shutdown_cluster($name);
-
-       system("rm -fr $dirname");
-
-       return;
-
-} ## end of remove_cluster
-
 sub connect_database {
 
-       ## Given a cluster name, return a connection to it
-       ## Second arg is the database name, defaults to 'bucardo_test'
+       ## Return a connection to a database within a cluster
+       ## Arguments:
+       ## 1. Name of the cluster
+       ## 2. Name of the database (optional, defaults to 'bucardo_test')
+       ## Returns: database handle
 
        my $self = shift;
-       my $name = shift || 'A';
-       my $dbname = shift || 'bucardo_test';
-
-       my $clusterinfo = $clusterinfo{$name}
-               or die qq{I do not know about a cluster named "$name"};
+       my $clustername = shift or die;
+       my $ldbname = shift || $dbname;
 
-       my $dbport = $clusterinfo->{port};
-       my $dbhost = getcwd;
-       my $dirname = $pgver{$name}{dirname};
-       $dbhost .= "/$dirname/socket";
+       ## Create and start the cluster as needed
+       $self->start_cluster($clustername);
 
-       my $dsn = "dbi:Pg:dbname=$dbname;port=$dbport;host=$dbhost";
+       ## Build the DSN to connect with
+       my $info = $pgver{$clustername};
+       my $dbport = $info->{port};
+       my $dbhost = getcwd . "/$info->{dirname}/socket";
+       my $dsn = "dbi:Pg:dbname=$ldbname;port=$dbport;host=$dbhost";
 
+       ## If we already have a cached version and it responds, return it
        if (exists $dbh{$dsn}) {
                my $dbh = $dbh{$dsn};
                $dbh->ping and return $dbh;
+               ## No ping? Remove from the cache
+               $dbh->disconnect();
                delete $dbh{$dsn};
        }
 
        my $dbh;
        eval {
-               $dbh = DBI->connect($dsn, '', '', {AutoCommit=>0, RaiseError=>1, PrintError=>0});
+               $dbh = DBI->connect($dsn, '', '', { AutoCommit=>0, RaiseError=>1, PrintError=>0 });
        };
        if ($@) {
-               if ($@ =~ /database "postgres" does not exist/) {
+               if ($ldbname eq 'postgres' and $@ =~ /"postgres"/) {
+
                        ## Probably an older version that uses template1
                        (my $localdsn = $dsn) =~ s/dbname=postgres/dbname=template1/;
+
+                       ## Give up right away if we are already trying template1
                        die $@ if $localdsn eq $dsn;
-                       $dbh = DBI->connect($localdsn, '', '', {AutoCommit=>1, RaiseError=>1, PrintError=>0});
+
+                       debug(qq{Connection failed, trying to connect to template1 to create a postgres database});
+
+                       ## Connect as template1 and create a postgres database
+                       $dbh = DBI->connect($localdsn, '', '', { AutoCommit=>1, RaiseError=>1, PrintError=>0 });
                        $dbh->do('CREATE DATABASE postgres');
+                       $dbh->disconnect();
+
                        ## Reconnect to our new database
-                       $dbh = DBI->connect($dsn, '', '', {AutoCommit=>0, RaiseError=>1, PrintError=>0});
+                       $dbh = DBI->connect($dsn, '', '', { AutoCommit=>0, RaiseError=>1, PrintError=>0 });
                }
                else {
                        die "$@\n";
                }
        }
 
-       $dbh->ping();
+       ## Just in case, set the search path
        $dbh->do('SET search_path = public');
        $dbh->commit();
 
@@ -556,65 +508,115 @@ sub connect_database {
 
 } ## end of connect_database
 
+sub drop_schema {
+
+       ## Drop a schema if it exists
+       ## Two arguments:
+       ## 1. database handle
+       ## 2. name of the schema
+       ## Returns 1 if dropped, 0 if not
+
+       my ($self,$dbh,$sname) = @_;
+
+       return 0 if ! schema_exists($dbh, $sname);
+
+       local $dbh->{AutoCommit} = 1;
+       local $dbh->{Warn} = 0;
+       $dbh->do("DROP SCHEMA $sname CASCADE");
+
+       return 1;
+
+} ## end of drop_schema
+
+
+sub repopulate_cluster {
+
+       ## Make sure a cluster is empty, then add in the sample data
+       ## Arguments:
+       ## 1. Name of the cluster
+       ## Returns: database handle to the 'bucardo_test' database
+
+       my $self = shift;
+       my $clustername = shift or die;
+
+       my $dbh = $self->empty_cluster($clustername);
+
+       $self->add_test_schema($dbh);
+
+       return $dbh;
+
+} ## end of repopulate_cluster
+
 
 sub add_test_schema {
 
        ## Add an empty test schema to a database
-       ## Takes a database handle
+       ## Arguments:
+       ## 1. database handle (usually to 'bucardo_test')
+       ## Returns: nothing
 
        my $self = shift;
-       my $dbh = shift;
+       my $dbh = shift or die;
 
-       ## Assume it is empty and just load it in
+       my ($tcount,$scount,$fcount) = (0,0,0);
 
        ## Empty out or create the droptest table
        if (table_exists($dbh => 'droptest')) {
                $dbh->do('TRUNCATE TABLE droptest');
        }
        else {
+               $tcount++;
                $dbh->do(q{
             CREATE TABLE droptest (
               name TEXT NOT NULL,
-              type TEXT NOT NULL,
-              inty INTEGER NOT NULL
+              type TEXT NOT NULL
             )
         });
        }
 
        ## Create the language if needed
        if (!language_exists($dbh => 'plpgsql')) {
+               debug(q{Creating language plpgsql'});
                $dbh->do('CREATE LANGUAGE plpgsql');
        }
-       $dbh->commit();
+       $dbh->commit() if ! $dbh->{AutoCommit};
 
        ## Create supporting functions as needed
        if (!function_exists($dbh => 'trigger_test')) {
+               $fcount++;
                $dbh->do(q{
                 CREATE FUNCTION trigger_test()
                 RETURNS trigger
                 LANGUAGE plpgsql
                 AS $_$ BEGIN
-                INSERT INTO droptest(name,type,inty)
-                    VALUES (TG_RELNAME, 'trigger', NEW.inty);
+                INSERT INTO droptest(name,type)
+                    VALUES (TG_RELNAME, 'trigger');
                 RETURN NULL;
                 END;
                 $_$
             });
        }
        if (!function_exists($dbh => 'trigger_test_zero')) {
+               $fcount++;
                $dbh->do(q{
                 CREATE FUNCTION trigger_test_zero()
                 RETURNS trigger
                 LANGUAGE plpgsql
                 AS $_$ BEGIN
-                INSERT INTO droptest(name,type,inty)
-                    VALUES (TG_RELNAME, 'trigger', 0);
+                INSERT INTO droptest(name,type)
+                    VALUES (TG_RELNAME, 'trigger');
                 RETURN NULL;
                 END;
                 $_$;
             });
        }
 
+       ## Create our helper domain for pseudo-types
+       if (domain_exists($dbh => 'int_unsigned')) {
+               $dbh->do('DROP DOMAIN int_unsigned CASCADE');
+       }
+       $dbh->do('CREATE DOMAIN int_unsigned INTEGER CHECK (value >= 0)');
+
        ## Create one table for each table type
        for my $table (sort keys %tabletype) {
 
@@ -640,6 +642,7 @@ sub add_test_schema {
                        };
 
                $dbh->do($SQL);
+               $tcount++;
 
                if ($table =~ /test2/) {
                        $dbh->do("ALTER TABLE $table ADD CONSTRAINT multipk PRIMARY KEY ($pkeyname,data1)");
@@ -658,13 +661,14 @@ sub add_test_schema {
                $SQL = qq{
                        CREATE OR REPLACE RULE bcrule_$table
                        AS ON INSERT TO $table
-                       DO ALSO INSERT INTO droptest(name,type,inty) VALUES ('$table','rule',NEW.inty)
+                       DO ALSO INSERT INTO droptest(name,type) VALUES ('$table','rule')
                        };
                $table =~ /0/ and $SQL =~ s/NEW.inty/0/;
                $dbh->do($SQL);
-
        }
+
        if ( !table_exists($dbh => 'bucardo_test_multicol') ) {
+               $tcount++;
                $dbh->do(q{CREATE TABLE bucardo_test_multicol (
         id   INTEGER,
         id2  INTEGER,
@@ -673,7 +677,7 @@ sub add_test_schema {
         PRIMARY KEY (id, id2, id3))});
        }
 
-       ## Create one table for each table type
+       ## Create one sequence for each table type
        for my $seq (sort keys %sequences) {
 
                local $dbh->{Warn} = 0;
@@ -685,82 +689,267 @@ sub add_test_schema {
 
                $SQL = qq{CREATE SEQUENCE $seq};
                $dbh->do($SQL);
+               $scount++;
        }
 
-       $dbh->commit();
+       debug("Test objects created. Tables: $tcount  Sequences: $scount  Functions: $fcount");
+
+       $dbh->commit() if ! $dbh->{AutoCommit};
 
        return;
 
 } ## end of add_test_schema
 
-sub setup_bucardo {
 
-       ## Import the bucardo schema into a database named 'bucardo_control_test'
-       ## Takes a cluster name and an optional database handle
-       ## Returns a handle to the control database
+sub add_test_databases {
 
-       my $self = shift;
-       my $name = shift || 'A';
-       my $dbh = shift || $self->connect_database($name);
+       ## Add one or more databases to the bucardo.db table
+       ## Arguments:
+       ## 1. White-space separated db names
+       ## Returns: nothing
 
-       my $dbname = 'bucardo_control_test';
+       my $self = shift;
+       my $string = shift or die;
 
-       if (!database_exists($dbh => $dbname)) {
-               local $dbh->{AutoCommit} = 1;
-               $dbh->do("CREATE DATABASE $dbname");
-               $dbh->do("CREATE SCHEMA bucardo");
-               $dbh->do("CREATE SCHEMA freezer");
-               $dbh->do("ALTER DATABASE $dbname SET search_path = bucardo, freezer, public");
-               $DEBUG and warn "Creating database $dbname\n";
+       for my $db (split /\s+/ => $string) {
+               my $ctlargs = $self->add_db_args($db);
+               my $i = $self->ctl("add database bucardo_test $ctlargs");
+               die $i if $i =~ /ERROR/;
        }
 
-       ## Are we connected to this database? If not, connect to it
-       $SQL = "SELECT current_database()";
-       my $localdb = $dbh->selectall_arrayref($SQL)->[0][0];
-       if ($localdb ne $dbname) {
-               $dbh = $self->connect_database($name, $dbname);
-       }
+       return;
 
-       ## Create the languages if needed
-       if (!language_exists($dbh => 'plpgsql')) {
-               $dbh->do('CREATE LANGUAGE plpgsql');
-       }
-       if (!language_exists($dbh => 'plperlu')) {
-               $dbh->do('CREATE LANGUAGE plperlu');
-       }
-       $dbh->commit();
+} ## end of add_test_databases
 
-       ## Drop the existing schemas
-       if (schema_exists($dbh => 'bucardo')) {
-               local $dbh->{Warn};
-               $dbh->do('DROP SCHEMA bucardo CASCADE');
-               $dbh->do('DROP SCHEMA freezer CASCADE');
-       }
-       $dbh->commit();
 
-       add_bucardo_schema_to_database($dbh);
+sub add_db_args {
 
-       return $dbh;
+       ## Arguments:
+       ## 1. Name of a cluster
+       ## Returns: DSN-like string to connect to that cluster
+       ## May return string or array dependig on how it was called
 
-} ## end of setup_bucardo
+       my $self = shift;
+       my $clustername = shift or die;
 
-sub thing_exists {
-       my ($dbh,$name,$table,$column) = @_;
-       my $SQL = "SELECT 1 FROM $table WHERE $column = ?";
-       my $sth = $dbh->prepare($SQL);
-       $count = $sth->execute($name);
-       $sth->finish();
-       $dbh->commit();
-       return $count < 1 ? 0 : $count;
-}
+       ## Build the DSN to connect with
+       my $info = $pgver{$clustername};
+       my $dbport = $info->{port};
+       my $dbhost = getcwd . "/$info->{dirname}/socket";
+       my $dsn = "dbi:Pg:dbname=$dbname;port=$dbport;host=$dbhost";
 
-sub schema_exists   { return thing_exists(@_, 'pg_namespace', 'nspname'); }
-sub language_exists { return thing_exists(@_, 'pg_language',  'lanname'); }
-sub database_exists { return thing_exists(@_, 'pg_database',  'datname'); }
-sub user_exists     { return thing_exists(@_, 'pg_user',      'usename'); }
+       my $arg = 
+
+       return wantarray
+               ? ($user,$dbport,$dbhost)
+               : "name=$dbname user=$user port=$dbport host=$dbhost";
+
+} ## end of add_db_args
+
+
+sub stop_bucardo {
+
+       ## Stops Bucardo via a bucardo_ctl request
+       ## Arguments: none
+       ## Returns: 1
+
+       my $self = shift;
+
+       $self->ctl('stop testing');
+
+       sleep 0.2;
+
+       return 1;
+
+} ## end of stop_bucardo
+
+
+sub ctl {
+
+       ## Run a simple non-forking command against bucardo_ctl
+       ## Emulates a command-line invocation
+       ## Arguments:
+       ## 1. String to pass to bucardo_ctl
+       ## Returns: answer as a string
+
+       my ($self,$args) = @_;
+
+       my $info;
+       my $ctl = $self->{bucardo_ctl};
+
+       ## Build the connection options
+       my $bc = $self->{bcinfo};
+       my $connopts = '';
+       for my $arg (qw/host port pass/) {
+               my $val = 'DB' . (uc $arg) . '_bucardo';
+               next unless exists $bc->{$val} and length $bc->{$val};
+               $connopts .= " --db$arg=$bc->{$val}";
+       }
+       $connopts .= " --dbname=bucardo --debugfile=1";
+       $connopts .= " --dbuser=$user";
+       ## Just hard-code these, no sense in multiple Bucardo base dbs yet:
+       $connopts .= " --dbport=58921";
+       my $dbhost = getcwd;
+       my $dirname = $pgver{A}{dirname};
+       $dbhost .= "/$dirname/socket";
+       $connopts .= " --dbhost=$dbhost";
+
+       ## Whitespace cleanup
+       $args =~ s/^\s+//s;
+
+       ## Allow the caller to look better
+       $args =~ s/^bucardo_ctl//;
+
+       debug("Connection options: $connopts Args: $args", 3);
+       eval {
+               $info = qx{$ctl $connopts $args 2>&1};
+       };
+       if ($@) {
+               return "Error running bucardo_ctl: $@\n";
+       }
+       debug("bucardo_ctl said: $info", 3);
+
+       return $info;
+
+} ## end of ctl
+
+
+sub restart_bucardo {
+
+       ## Start Bucardo, but stop first if it is already running
+       ## Arguments:
+       ## 1. database handle to the bucardo_control_test db
+       ## 2. The notice we wait for, defaults to: bucardo_started
+       ## 3. The message to give to the "pass" function, defaults to: Bucardo was started
+       ## Returns: nothing
+
+       my ($self,$dbh,$notice,$passmsg) = @_;
+
+       $notice ||= 'bucardo_started';
+       $passmsg ||= 'Bucardo was started';
+
+       $self->stop_bucardo();
+
+       pass('Starting up Bucardo');
+       $dbh->do('LISTEN bucardo_boot');
+       $dbh->do('LISTEN bucardo_started');
+       $dbh->do('LISTEN bucardo_nosyncs');
+       $dbh->commit();
+
+       $self->ctl('start testing');
+
+       my $bail = 10;
+       my $n;
+  WAITFORIT: {
+               if ($bail--<0) {
+                       die "Bucardo did not start, but we waited!\n";
+               }
+               while ($n = $dbh->func('pg_notifies')) {
+                       last WAITFORIT if $n->[0] eq $notice;
+               }
+               $dbh->commit();
+               sleep 0.2;
+               redo;
+       }
+       pass($passmsg);
+
+       return 1;
+
+} ## end of restart_bucardo
+
+sub setup_bucardo {
+
+       ## Installs bucardo via "bucardo_ctl install" into a database
+       ## The database will be emptied out first if it already exists
+       ## If it does not exist, it will be created
+       ## If the cluster does not exist, it will be created
+       ## Arguments:
+       ## 1. Name of the cluster
+       ## Returns: database handle to the bucardo database
+
+       my $self = shift;
+       my $clustername = shift or die;
+
+       $self->create_cluster($clustername);
+       my $dbh = $self->connect_database($clustername, 'postgres');
+       if (database_exists($dbh,'bucardo')) {
+               ## Kick off all other people
+               $SQL = q{SELECT procpid FROM pg_stat_activity WHERE datname = 'bucardo' and procpid <> pg_backend_pid()};
+               for my $row ($dbh->selectall_arrayref($SQL)) {
+                       my $pid = $row->[0][0];
+                       $SQL = 'SELECT pg_terminate_backend(?)';
+                       $sth = $dbh->prepare($SQL);
+                       $sth->execute($pid);
+               }
+               $dbh->commit();
+               debug(qq{Dropping database bucardo from cluster $clustername});
+               local $dbh->{AutoCommit} = 1;
+               $dbh->do('DROP DATABASE bucardo');
+       }
+
+       ## Make sure we have a postgres role
+       if (! user_exists($dbh, 'postgres')) {
+               $dbh->do('CREATE USER postgres SUPERUSER');
+               $dbh->commit();
+       }
+
+       ## Now run the install. Timeout after a few seconds
+       debug(qq{Running bucardo_ctl install on cluster $clustername});
+       my $info;
+       eval {
+               local $SIG{ALRM} = sub { die "Alarum!\n"; };
+               alarm 5;
+               $info = $self->ctl('install --batch');
+               alarm 0;
+       };
+       if ($@ =~ /Alarum/ or $info =~ /Alarum/) {
+               warn "bucardo_ctl install never finished!\n";
+               exit;
+       }
+       $@ and die $@;
+
+       if ($info !~ /Installation is now complete/) {
+               die "Installation failed: $info\n";
+       }
+
+       ## Reconnect to the new database
+       $dbh = $self->connect_database($clustername, 'bucardo');
+
+       ## Make some adjustments
+       $sth = $dbh->prepare('UPDATE bucardo.bucardo_config SET value = $2 WHERE setting = $1');
+       $count = $sth->execute('piddir' => $PIDDIR);
+       $count = $sth->execute('reason_file' => "$PIDDIR/reason");
+       $count = $sth->execute('audit_pid' => 1);
+       $dbh->commit();
+
+       ## Adjust a second way
+       $self->ctl('set log_level=debug');
+
+       debug(qq{Install complete});
+
+       return $dbh;
+
+} ## end of setup_bucardo
+
+## Utility functions for object existences:
+sub thing_exists {
+       my ($dbh,$name,$table,$column) = @_;
+       my $SQL = "SELECT 1 FROM $table WHERE $column = ?";
+       my $sth = $dbh->prepare($SQL);
+       $count = $sth->execute($name);
+       $sth->finish();
+       $dbh->commit() if ! $dbh->{AutoCommit};
+       return $count < 1 ? 0 : $count;
+}
+sub schema_exists   { return thing_exists(@_, 'pg_namespace', 'nspname'); }
+sub language_exists { return thing_exists(@_, 'pg_language',  'lanname'); }
+sub database_exists { return thing_exists(@_, 'pg_database',  'datname'); }
+sub user_exists     { return thing_exists(@_, 'pg_user',      'usename'); }
 sub table_exists    { return thing_exists(@_, 'pg_class',     'relname'); }
 sub function_exists { return thing_exists(@_, 'pg_proc',      'proname'); }
+sub domain_exists   { return thing_exists(@_, 'pg_type',      'typname'); }
 
+## Hack to override some Test::More methods
 ## no critic
 {
        no warnings; ## Yes, we know they are being redefined!
@@ -819,6 +1008,239 @@ sub function_exists { return thing_exists(@_, 'pg_proc',      'proname'); }
        } ## end of ok
 }
 ## use critic
+## end of Test::More hacks
+
+
+sub wait_for_notice {
+
+       ## Wait until a named NOTIFY is issued
+       ## Arguments:
+       ## 1. The listen string
+       ## 2. Seconds until we give up
+       ## 3. Seconds we sleep between checks
+       ## 4. Boolean: bail out if not found (defaults to true)
+
+       my $dbh = shift;
+       my $text = shift;
+       my $timeout = shift || $TIMEOUT_NOTICE;
+       my $sleep = shift || $TIMEOUT_SLEEP;
+    my $bail = shift;
+    $bail = 1 if !defined($bail);
+       my $n;
+       eval {
+               local $SIG{ALRM} = sub { die "Lookout!\n"; };
+               alarm $timeout;
+         N: {
+                       while ($n = $dbh->func('pg_notifies')) {
+                               if ($n->[0] eq $text) {
+                    last N;
+                }
+                               else {
+                    debug("notice was $n->[0]", 1);
+                }
+                       }
+                       sleep $sleep;
+                       redo;
+               }
+               alarm 0;
+       };
+       if ($@) {
+               if ($@ =~ /Lookout/o) {
+                       my $line = (caller)[2];
+            my $notice = qq{Gave up waiting for notice "$text": timed out at $timeout from line $line};
+                       if ($bail) {
+                Test::More::BAIL_OUT ($notice);
+            }
+            else {
+                die $notice;
+            }
+                       return;
+               }
+       }
+       return;
+
+} ## end of wait_for_notice
+
+
+## Older methods:
+
+sub fresh_database {
+
+       ## Drop and create the bucardo_test database
+       ## First arg is cluster name
+       ## Second arg is hashref, can be 'dropdb'
+
+       my $self = shift;
+       my $name = shift || 'A';
+       my $arg = shift || {};
+
+       my $dirname = $pgver{$name}{dirname};
+
+       ## Just in case
+       -d $dirname or $self->create_cluster($name);
+       -e "$dirname/postmaster.pid" or $self->start_cluster($name);
+
+       my $dbh = $self->connect_database($name, 'postgres');
+
+       my $brandnew = 0;
+       {
+               if (database_exists($dbh => $dbname) and $arg->{dropdb}) {
+                       local $dbh->{AutoCommit} = 1;
+                       debug("Dropping database $dbname");
+                       $dbh->do("DROP DATABASE $dbname");
+               }
+               if (!database_exists($dbh => $dbname)) {
+                       local $dbh->{AutoCommit} = 1;
+                       debug("Creating database $dbname");
+                       $dbh->do("CREATE DATABASE $dbname");
+                       $brandnew = 1;
+                       $dbh->disconnect();
+               }
+       }
+
+       $dbh = $self->connect_database($name, $dbname);
+
+       return $dbh if $brandnew;
+
+       $self->empty_test_database($dbh);
+
+       return $dbh;
+
+} ## end of fresh_database
+
+
+
+sub create_database {
+
+       ## Create a new database
+       ## First argument is the cluster name
+       ## Second argument is the name of the database
+       ## If the database already exists, nothing will be done
+       ## Returns a database handle to the database
+
+       my $self = shift;
+       my $clustername = shift or die;
+       my $dbname = shift or die;
+
+       my $dirname = $pgver{$clustername}{dirname};
+
+       ## Create the cluster if needed
+       -d $dirname or $self->create_cluster($clustername);
+
+       ## Start the cluster up if needed
+       -e "$dirname/postmaster.pid" or $self->start_cluster($clustername);
+
+       ## Connect to the database
+
+       my $dbh = $self->connect_database($clustername, 'postgres');
+
+       if (! database_exists($dbh => $dbname)) {
+               local $dbh->{AutoCommit} = 1;
+               debug("Creating database $dbname");
+               $dbh->do("CREATE DATABASE $dbname");
+               $dbh->disconnect();
+       }
+
+       $dbh = $self->connect_database($clustername, $dbname);
+
+       return $dbh;
+
+} ## end of create_database
+
+
+sub empty_test_database {
+
+       ## Wipe all data tables from a test database
+       ## Takes a database handle as only arg
+
+       my $self = shift;
+       my $dbh = shift;
+
+       if ($dbh->{pg_server_version} >= 80300) {
+               $dbh->do(q{SET session_replication_role = 'replica'});
+       }
+
+       for my $table (sort keys %tabletype) {
+               $dbh->do("TRUNCATE TABLE $table");
+       }
+
+       for my $table (@tables2empty) {
+               $dbh->do("TRUNCATE TABLE $table");
+       }
+
+       if ($dbh->{pg_server_version} >= 80300) {
+               $dbh->do(q{SET session_replication_role = 'origin'});
+       }
+       $dbh->commit;
+
+       return;
+
+} ## end of empty_test_database
+
+sub shutdown_cluster {
+
+       ## Shutdown a cluster if running
+       ## Takes the cluster name
+
+       my $self = shift;
+       my $name = shift;
+
+       my $dirname = $pgver{$name}{dirname};
+
+       return if ! -d $dirname;
+
+       my $pidfile = "$dirname/postmaster.pid";
+       return if ! -e $pidfile;
+
+       open my $fh, '<', $pidfile or die qq{Could not open "$pidfile": $!\n};
+       <$fh> =~ /(\d+)/ or die qq{No PID found in file "$pidfile"\n};
+       my $pid = $1;
+       close $fh or die qq{Could not close "$pidfile": $!\n};
+       ## Make sure it's still around
+       $count = kill 0 => $pid;
+       if ($count != 1) {
+               debug("Removing $pidfile");
+               unlink $pidfile;
+       }
+       $count = kill 15 => $pid;
+       {
+               $count = kill 0 => $pid;
+               last if $count != 1;
+               sleep 0.2;
+               redo;
+       }
+
+       return;
+
+} ## end of shutdown_cluster
+
+
+sub remove_cluster {
+
+       ## Remove a cluster, shutting it down first
+       ## Takes the cluster name
+
+       my $self = shift;
+       my $name = shift;
+
+       my $dirname = $pgver{$name}{dirname};
+
+       return if ! -d $dirname;
+
+       ## Just in case
+       $self->shutdown_cluster($name);
+
+       system("rm -fr $dirname");
+
+       return;
+
+} ## end of remove_cluster
+
+
+
+
+
+
 
 
 sub tt {
@@ -826,7 +1248,7 @@ sub tt {
        my $name = shift or die qq{Need a name!\n};
        if (exists $timing{$name}) {
                my $newtime = tv_interval($timing{$name});
-               $DEBUG and warn "Timing for $name: $newtime\n";
+               debug("Timing for $name: $newtime");
                delete $timing{$name};
        }
        else {
@@ -836,20 +1258,19 @@ sub tt {
 } ## end of tt
 
 sub t {
+
        $testmsg = shift;
        $testline = shift || (caller)[2];
        $testmsg =~ s/^\s+//;
        if ($location) {
                $testmsg = "($location) $testmsg";
        }
-       if ($showline) {
-               $testmsg .= " [line: $testline]";
-       }
-       if ($showtime) {
-               my $time = time;
-               $testmsg .= " [time: $time]";
-       }
+       $testmsg .= " [line: $testline]";
+       my $time = time;
+       $testmsg .= " [time: $time]";
+
        return;
+
 } ## end of t
 
 sub add_bucardo_schema_to_database {
@@ -933,84 +1354,7 @@ sub add_bucardo_schema_to_database {
 
 } ## end of add_bucardo_schema_to_database
 
-sub add_db_args {
-
-       ## Return a DSN-like string for a particular named cluster
-       my ($self,$name) = @_;
-
-       my $clusterinfo = $clusterinfo{$name}
-               or die qq{I do not know how to create a cluster named "$name"};
-
-       my $port = $clusterinfo->{port};
 
-       my $host = getcwd;
-       my $dirname = $pgver{$name}{dirname};
-       $host .= "/$dirname/socket";
-
-       my $arg = "name=$name user=$user port=$port host=$host";
-
-       return $arg;
-
-} ## end of add_db_args
-
-
-sub ctl {
-
-       ## Run a simple non-forking command against bucardo_ctl, get the answer back as a string
-       ## Emulates a command-line invocation
-
-       my ($self,$args) = @_;
-
-       my $info;
-       my $ctl = $self->{bucardo_ctl};
-
-       ## Build the connection options
-       my $bc = $self->{bcinfo};
-       my $connopts = '';
-       for my $arg (qw/host port pass/) {
-               my $val = 'DB' . (uc $arg) . '_bucardo';
-               next unless exists $bc->{$val} and length $bc->{$val};
-               $connopts .= " --db$arg=$bc->{$val}";
-       }
-       $connopts .= " --dbname=bucardo_control_test --debugfile=1";
-       $connopts .= " --dbuser=$user";
-       ## Just hard-code these, no sense in multiple Bucardo base dbs yet:
-       $connopts .= " --dbport=58921";
-       my $dbhost = getcwd;
-       my $dirname = $pgver{A}{dirname};
-       $dbhost .= "/$dirname/socket";
-       $connopts .= " --dbhost=$dbhost";
-
-       $DEBUG >=3 and warn "Connection options: $connopts Args: $args\n";
-       eval {
-               $info = qx{$ctl $connopts $args 2>&1};
-       };
-       if ($@) {
-               return "Error running bucardo_ctl: $@\n";
-       }
-       $DEBUG >= 3 and warn "bucardo_ctl said: $info\n";
-
-       return $info;
-
-} ## end of ctl
-
-sub add_test_databases {
-
-       ## Add one or more databases to the bucardo.db table
-       ## Arg is a string containing white-space separated db names
-
-       my $self = shift;
-       my $string = shift;
-
-       for my $db (split /\s+/ => $string) {
-               my $ctlargs = $self->add_db_args($db);
-               my $i = $self->ctl("add database bucardo_test $ctlargs");
-               die $i if $i =~ /ERROR/;
-       }
-
-       return;
-
-} ## end of add_test_databases
 
 
 sub add_test_tables_to_herd {
@@ -1049,58 +1393,8 @@ sub add_test_tables_to_herd {
 
 
 
-sub restart_bucardo {
-
-       ## Start Bucardo, but stop first if it is already running
-       ## Pass in a database handle to the bucardo_control_test db
-
-       my ($self,$dbh,$notice,$passmsg) = @_;
-
-       ## Which notice is good enough?
-       $notice ||= 'bucardo_started';
-       $passmsg ||= 'Bucardo was started';
-
-       $self->stop_bucardo();
-
-       pass('Starting up Bucardo');
-       $dbh->do('LISTEN bucardo_boot');
-       $dbh->do('LISTEN bucardo_started');
-       $dbh->do('LISTEN bucardo_nosyncs');
-       $dbh->commit();
-
-       $self->ctl('start testing');
-
-       my $bail = 10;
-       my $n;
-  WAITFORIT: {
-               if ($bail--<0) {
-                       die "Bucardo did not start, but we waited!\n";
-               }
-               while ($n = $dbh->func('pg_notifies')) {
-                       last WAITFORIT if $n->[0] eq $notice;
-               }
-               $dbh->commit();
-               sleep 0.2;
-               redo;
-       }
-       pass($passmsg);
-
-       return 1;
-
-} ## end of restart_bucardo
-
-
-sub stop_bucardo {
-
-       my ($self,$dbh) = @_;
-
-       $self->ctl('stop testing');
-
-       sleep 0.2;
 
-       return 1;
 
-} ## end of stop_bucardo
 
 
 sub bc_deeply {
@@ -1133,47 +1427,6 @@ sub clear_notices {
     0 while (my $n = $dbh->func('pg_notifies'));
 }
 
-sub wait_for_notice {
-
-       my $dbh = shift;
-       my $text = shift;
-       my $timeout = shift || $TIMEOUT_NOTICE;
-       my $sleep = shift || $TIMEOUT_SLEEP;
-    my $bail = shift;
-    $bail = 1 if !defined($bail);
-       my $n;
-       eval {
-               local $SIG{ALRM} = sub { die "Lookout!\n"; };
-               alarm $timeout;
-         N: {
-                       while ($n = $dbh->func('pg_notifies')) {
-                               if ($n->[0] eq $text) {
-                    last N;
-                } else {
-                    print "$n->[0]\n" if $DEBUG;
-                }
-                       }
-                       sleep $sleep;
-                       redo;
-               }
-               alarm 0;
-       };
-       if ($@) {
-               if ($@ =~ /Lookout/o) {
-                       my $line = (caller)[2];
-            my $notice = qq{Gave up waiting for notice "$text": timed out at $timeout from line $line};
-                       if ($bail) {
-                Test::More::BAIL_OUT ($notice);
-            }
-            else {
-                die $notice;
-            }
-                       return;
-               }
-       }
-       return;
-
-} ## end of wait_for_notice
 
 sub get_pgctl_options {
     my $dirname = shift;
@@ -1240,4 +1493,98 @@ sub scrub_bucardo_tables {
 
 
 
+
+sub setup_monkey_data {
+
+       my ($self,$dbh) = @_;
+
+       ## Make sure the database is setup with the data for the "monkey" tests
+
+       ## If already there, reset everything
+
+       debug('Inside setup_monkey_data');
+
+       ## Database A contains bucardo itself and is the first "master"
+       $self->start_cluster('A');
+
+       my $dbhA = $self->connect_database('A', 'postgres');
+
+       $dbhA->{AutoCommit} = 1;
+
+       if ( !database_exists($dbh => 'bucardo')) {
+               
+       }
+
+
+
+
+       ## First, we need the clusters to be there.
+       $self->start_cluster('B');
+       $self->start_cluster('C');
+
+
+
+
+       ## Next, we install Bucardo into database A
+       if (schema_exists($dbhA => 'bucardo')) {
+               $dbhA->do('DROP SCHEMA bucardo CASCADE');
+               $dbhA->commit();
+       }
+       eval {
+               $dbhA->do('CREATE USER postgres SUPERUSER');
+       };
+       $dbhA->commit();
+
+       my $info;
+       eval {
+               local $SIG{ALRM} = sub { die "Alarum!\n"; };
+               alarm 3;
+               $info = $self->ctl('install --batch');
+               alarm 0;
+       };
+       if ($@ and $@ =~ /Alarum/ or $info =~ /Alarum/) {
+               warn "bucardo_ctl install never finished!\n";
+               exit;
+       }
+       $@ and die $@;
+
+       if ($info !~ /Installation is now complete/) {
+               die "Installation failed\n";
+       }
+die Dumper $info;
+exit;
+       debug('Bucardo has been installed on database A');
+
+       my $dbname = 'monkey';
+
+       ## If needed, create the monkey database
+       if (! database_exists($dbhA => $dbname)) {
+               debug("Creating database $dbname");
+               $dbhA->do("CREATE DATABASE $dbname");
+       }
+
+       ## Drop the public schema and recreate with all tables
+       if (schema_exists($dbhA => 'public')) {
+               debug('Dropping the public schema');
+               $dbhA->do('DROP SCHEMA public CASCADE');
+       }
+       $dbhA->do('CREATE SCHEMA public');
+
+       $self->add_test_schema($dbhA);
+
+       if (! database_exists($dbhA => "${dbname}_template")) {
+               $dbhA->do("CREATE DATABASE ${dbname}_template");
+       }
+
+
+
+       $dbhA->disconnect();
+
+       ## XXX: Make a template db for all of this
+
+       return;
+
+} ## End of setup_monkey_data
+
+
 1;