From 3a2596be2a55a7eed71ceb88eabc2abdd66d2227 Mon Sep 17 00:00:00 2001 From: Greg Sabino Mullane Date: Fri, 1 Jul 2011 12:50:43 -0400 Subject: [PATCH] Add in basic Redis support. Make sure we add new tables before new columns when upgrading. Rearrange all the databases, for consistent order of types. --- Bucardo.pm | 96 ++++++++++++++++++++------- bucardo | 172 ++++++++++++++++++++++++++++++++----------------- bucardo.schema | 39 +++++------ 3 files changed, 206 insertions(+), 101 deletions(-) diff --git a/Bucardo.pm b/Bucardo.pm index b575c4282..fee1d0213 100644 --- a/Bucardo.pm +++ b/Bucardo.pm @@ -718,9 +718,7 @@ sub mcp_main { for my $dbname (keys %{ $self->{sdb} }) { $x = $self->{sdb}{$dbname}; - next if $x->{dbtype} =~ /flat/o; - - next if $x->{dbtype} =~ /mongo/o; + next if $x->{dbtype} =~ /flat|mongo|redis/o; if (! $x->{dbh}->ping) { ## Database is not reachable, so we'll try and reconnect @@ -1780,7 +1778,7 @@ sub start_kid { ## Set up some common groupings of the databases inside sync->{db} ## Also setup common attributes my (@dbs, @dbs_source, @dbs_target, @dbs_connectable, @dbs_dbi, - @dbs_postgres, @dbs_drizzle, @dbs_mongo, @dbs_mysql, @dbs_oracle); + @dbs_postgres, @dbs_drizzle, @dbs_mongo, @dbs_mysql, @dbs_oracle, @dbs_redis); for my $dbname (sort keys %{ $sync->{db} }) { $x = $sync->{db}{$dbname}; @@ -1827,7 +1825,7 @@ sub start_kid { } ## MySQL - if ('postgres' eq $x->{dbtype}) { + if ('mysql' eq $x->{dbtype}) { push @dbs_mysql => $dbname; $x->{does_sql} = 1; $x->{does_truncate} = 1; @@ -1843,6 +1841,11 @@ sub start_kid { $x->{does_savepoints} = 1; } + ## Redis + if ('redis' eq $x->{dbtype}) { + push @dbs_redis => $dbname; + } + ## Flat files if ($x->{dbtype} =~ /flat/) { $x->{does_append_only} = 1; @@ -4206,7 +4209,13 @@ sub connect_database { return 0, 'flat'; } - if ('mongo' eq $dbtype) { + if ('postgres' eq $dbtype) { + $dsn = "dbi:Pg:dbname=$d->{dbname}"; + } + elsif ('drizzle' eq $dbtype) { + $dsn = "dbi:drizzle:database=$d->{dbname}"; + } + elsif ('mongo' eq $dbtype) { my $dsn = {}; for my $name (qw/ dbhost dbport dbuser dbpass /) { defined $d->{$name} and length $d->{$name} and $dsn->{$name} = $d->{$name}; @@ -4219,19 +4228,36 @@ sub connect_database { return $backend, $dbh; } - - if ('postgres' eq $dbtype) { - $dsn = "dbi:Pg:dbname=$d->{dbname}"; - } elsif ('mysql' eq $dbtype) { $dsn = "dbi:mysql:database=$d->{dbname}"; } - elsif ('drizzle' eq $dbtype) { - $dsn = "dbi:drizzle:database=$d->{dbname}"; - } elsif ('oracle' eq $dbtype) { $dsn = "dbi:Oracle:$d->{dbname}"; } + elsif ('redis' eq $dbtype) { + my $dsn = {}; + for my $name (qw/ dbhost dbport dbuser dbpass /) { + defined $d->{$name} and length $d->{$name} and $dsn->{$name} = $d->{$name}; + } + my @dsn; + my $server = ''; + if (defined $d->{host} and length $d->{host}) { + $server = $d->{host}; + } + if (defined $d->{port} and length $d->{port}) { + $server = ":$d->{port}"; + } + if ($server) { + push @dsn => 'server', $server; + } + + ## For now, we simply require it + require Redis; + $dbh = Redis->new(@dsn); + my $backend = 0; + + return $backend, $dbh; + } else { die qq{Cannot handle databases of type "$dbtype"\n}; } @@ -5326,9 +5352,12 @@ sub validate_sync { ## Flat files are obviously skipped as we create them de novo next if $self->{sdb}{$dbname}{dbtype} =~ /flat/o; - ## Mongo is skipped because it can create things on the fly + ## Mongo is skipped because it can create schemas on the fly next if $self->{sdb}{$dbname}{dbtype} =~ /mongo/o; + ## Redis is skipped because we can create keys on the fly + next if $self->{sdb}{$dbname}{dbtype} =~ /redis/o; + ## MySQL/Drizzle/Oracle is skipped for now, but should be added later next if $self->{sdb}{$dbname}{dbtype} =~ /mysql|drizzle|oracle/o; @@ -5726,8 +5755,7 @@ sub fork_controller { for my $dbname (keys %{ $self->{sdb} }) { $x = $self->{sdb}{$dbname}; - next if $x->{dbtype} =~ /flat/o; - next if $x->{dbtype} =~ /mongo/o; + next if $x->{dbtype} =~ /flat|mongo|redis/o; $x->{dbh}->{InactiveDestroy} = 1; } @@ -6677,6 +6705,11 @@ sub table_has_rows { $sth->finish(); return $count >= 1 ? 1 : 0; } + elsif ('mongo' eq $x->{dbtype}) { + my $collection = $x->{dbh}->get_collection($tname); + $count = $collection->count({}); + return $count >= 1 ? 1 : 0; + } elsif ('oracle' eq $x->{dbtype}) { $SQL = "SELECT 1 FROM $tname WHERE rownum > 1"; $sth = $x->{dbh}->prepare($SQL); @@ -6684,10 +6717,9 @@ sub table_has_rows { $sth->finish(); return $count >= 1 ? 1 : 0; } - elsif ('mongo' eq $x->{dbtype}) { - my $collection = $x->{dbh}->get_collection($tname); - $count = $collection->count({}); - return $count >= 1 ? 1 : 0; + elsif ('redis' eq $x->{dbtype}) { + ## No sense in returning anything here + return 0; } else { die "Cannot handle database type $x->{dbtype} yet!"; @@ -7051,6 +7083,9 @@ sub delete_table { my $res = $collection->remove({}, { safe => 1} ); $count = $res->{n}; } + elsif ('redis' eq $x->{dbtype}) { + ## Do nothing here yet + } else { die "Do not know how to delete a dbtype of $x->{dbtype}"; } @@ -7114,6 +7149,10 @@ sub delete_rows { $self->{collection}->remove({}, { safe => 1} ); next; } + ## For Redis, do nothing + elsif ('redis' eq $type) { + next; + } ## For flatfiles, write out a basic truncate statement elsif ($type =~ /flat/o) { printf {$t->{filehandle}} qq{TRUNCATE TABLE %S;\n\n}, @@ -7142,8 +7181,8 @@ sub delete_rows { my $type = $t->{dbtype}; - ## No special preparation for mongo - next if 'mongo' eq $type; + ## No special preparation for mongo or redis + next if $type =~ /mongo|redis/; ## Set the type of SQL we are using: IN vs ANY my $sqltype = ''; @@ -7300,6 +7339,11 @@ sub delete_rows { next; } + if ('redis' eq $type) { + ## TODO + next; + } + if ($type =~ /flatpg/o) { print {$t->{filehandle}} qq{$SQL{PGIN};\n\n}; $self->glog(qq{Appended to flatfile "$t->{filename}"}, LOG_VERBOSE); @@ -7336,7 +7380,7 @@ sub delete_rows { $count = 0; for my $t (@$deldb) { - next if $t->{dbtype} =~ /mongo|flat/o; + next if $t->{dbtype} =~ /mongo|flat|redis/o; $count += $count{$t}; $self->glog(qq{Rows deleted from $t->{name}.$S.$T: $count{$t}}, LOG_VERBOSE); } @@ -7415,6 +7459,9 @@ sub push_rows { elsif ('mongo' eq $type) { $self->{collection} = $t->{dbh}->get_collection($tname); } + elsif ('redis' eq $type) { + ## TODO + } elsif ('mysql' eq $type or 'drizzle' eq $type) { my $tgtcmd = "INSERT INTO $tname VALUES ("; $tgtcmd .= '?,' x keys %{ $goat->{columnhash} }; @@ -7492,6 +7539,9 @@ sub push_rows { my @cols = map { $_ = undef if $_ eq '\\N'; $_; } split /\t/ => $buffer; $count += $t->{sth}->execute(@cols); } + elsif ('redis' eq $type) { + ## TODO + } } } diff --git a/bucardo b/bucardo index aee0b107b..3c67281e5 100755 --- a/bucardo +++ b/bucardo @@ -803,10 +803,11 @@ sub add_database { ## Clean up and standardize the names $vals->{dbtype} = lc $vals->{dbtype}; $vals->{dbtype} =~ s/postgres.*/postgres/io; + $vals->{dbtype} =~ s/drizzle.*/drizzle/io; $vals->{dbtype} =~ s/mongo.*/mongo/io; $vals->{dbtype} =~ s/mysql.*/mysql/io; - $vals->{dbtype} =~ s/drizzle.*/drizzle/io; $vals->{dbtype} =~ s/oracle.*/oracle/io; + $vals->{dbtype} =~ s/redis.*/redis/io; ## Attempt to insert this into the database $SQL = "INSERT INTO bucardo.db ($cols) VALUES ($phs)"; @@ -833,7 +834,7 @@ sub add_database { last TESTCONN if 'flatfile' eq $vals->{dbtype}; ## Must have a valid type - if ($vals->{dbtype} !~ /^(?:postgres|mongo|mysql|drizzle|oracle)$/) { + if ($vals->{dbtype} !~ /^(?:postgres|drizzle|mongo|mysql|oracle|redis)$/) { die qq{Unknown database type: $vals->{dbtype}\n}; } @@ -843,34 +844,6 @@ sub add_database { $count = $sth->execute($item_name); my $dbconn = $sth->fetchall_arrayref()->[0][0]; - if ('mongo' eq $vals->{dbtype}) { - my $found = 0; - eval { - require MongoDB; - $found = 1; - }; - if (!$found) { - die "Cannot add unless the MongoDB perl driver is available\n"; - } - - my $testdbh; - $found = 0; - my $dsn = {}; - eval { - for my $line (split /\n/ => $dbconn) { - next if $line !~ /(\w+):\s+(.+)/; - $dsn->{$1} = $2; - } - $testdbh = MongoDB::Connection->new($dsn); - $found = 1; - }; - if (! $found) { - warn Dumper $dsn; - die qq{Connection to mongodb failed: $@\n}; - } - - } ## end of mongo - if ('postgres' eq $vals->{dbtype}) { my ($type,$dsn,$user,$pass,$ssp) = split /\n/ => $dbconn; @@ -924,17 +897,17 @@ sub add_database { } ## end postgres - if ('mysql' eq $vals->{dbtype}) { + if ('drizzle' eq $vals->{dbtype}) { my ($type,$dsn,$user,$pass) = split /\n/ => $dbconn; my $found = 0; eval { - require DBD::mysql; + require DBD::drizzle; $found = 1; }; if (!$found) { - die "Cannot add unless the DBD::mysql module is available\n"; + die "Cannot add unless the DBD::drizzle module is available\n"; } my $testdbh; @@ -944,24 +917,52 @@ sub add_database { $found = 1; }; if (! $found) { - die "MySQL connection test failed. You can force add it with the --force argument. Error was: $@\n\n"; + die "Drizzle connection test failed. You can force add it with the --force argument. Error was: $@\n\n"; } $testdbh->disconnect(); - } ## end mysql + } ## end drizzle - if ('drizzle' eq $vals->{dbtype}) { + if ('mongo' eq $vals->{dbtype}) { + my $found = 0; + eval { + require MongoDB; + $found = 1; + }; + if (!$found) { + die "Cannot add unless the MongoDB perl driver is available\n"; + } + + my $testdbh; + $found = 0; + my $dsn = {}; + for my $line (split /\n/ => $dbconn) { + next if $line !~ /(\w+):\s+(.+)/; + $dsn->{$1} = $2; + } + eval { + $testdbh = MongoDB::Connection->new($dsn); + $found = 1; + }; + if (! $found) { + warn Dumper $dsn; + die qq{Connection to mongodb failed: $@\n}; + } + + } ## end of mongo + + if ('mysql' eq $vals->{dbtype}) { my ($type,$dsn,$user,$pass) = split /\n/ => $dbconn; my $found = 0; eval { - require DBD::drizzle; + require DBD::mysql; $found = 1; }; if (!$found) { - die "Cannot add unless the DBD::drizzle module is available\n"; + die "Cannot add unless the DBD::mysql module is available\n"; } my $testdbh; @@ -971,12 +972,12 @@ sub add_database { $found = 1; }; if (! $found) { - die "Drizzle connection test failed. You can force add it with the --force argument. Error was: $@\n\n"; + die "MySQL connection test failed. You can force add it with the --force argument. Error was: $@\n\n"; } $testdbh->disconnect(); - } ## end drizzle + } ## end mysql if ('oracle' eq $vals->{dbtype}) { @@ -1005,6 +1006,45 @@ sub add_database { } ## end oracle + if ('redis' eq $vals->{dbtype}) { + my $found = 0; + eval { + require Redis; + $found = 1; + }; + if (!$found) { + die "Cannot add unless the Redis perl driver is available\n"; + } + + my $testdbh; + $found = 0; + my $tempdsn = {}; + for my $line (split /\n/ => $dbconn) { + next if $line !~ /(\w+):\s+(.+)/; + $tempdsn->{$1} = $2; + } + my $server; + if (exists $tempdsn->{host}) { + $server = $tempdsn->{host}; + } + if (exists $tempdsn->{port}) { + $server .= ":$tempdsn->{port}"; + } + my @dsn; + if (defined $server) { + push @dsn => 'server', $server; + } + eval { + $testdbh = Redis->new(@dsn); + $found = 1; + }; + if (! $found) { + warn Dumper \@dsn; + die qq{Connection to redis failed: $@\n}; + } + + } ## end of redis + } ## end of TESTCONN ## If we got a group, process that as well @@ -1390,6 +1430,15 @@ sub list_databases { print ' (SSP is off)'; } } + if ($dbtype eq 'drizzle') { + my $showport = (length $info->{dbport} and $info->{dbport} != 3306) + ? " --port $info->{dbport}" : ''; + printf "Conn: drizzle -u %s -D %s%s%s", + $info->{dbuser}, + $info->{dbname}, + $showhost, + $showport; + } if ($dbtype eq 'flatfile') { print "Prefix: $info->{dbname}"; } @@ -1407,20 +1456,24 @@ sub list_databases { $showhost, $showport; } - if ($dbtype eq 'drizzle') { - my $showport = (length $info->{dbport} and $info->{dbport} != 3306) - ? " --port $info->{dbport}" : ''; - printf "Conn: drizzle -u %s -D %s%s%s", - $info->{dbuser}, - $info->{dbname}, - $showhost, - $showport; - } if ($dbtype eq 'oracle') { printf "Conn: sqlplus %s%s", $info->{dbuser}, $showhost ? qq{\@$showhost} : ''; } + if ($dbtype eq 'redis') { + my $server = ''; + if (length $info->{dbhost}) { + $server .= $info->{dbhost}; + } + if (length $info->{dbport}) { + $server .= ":$info->{dbport}"; + } + if ($server) { + $server = "server=$server"; + print "Conn: $server"; + } + } print "\n"; @@ -4544,10 +4597,11 @@ sub process_args { ## Clean up and standardize the names if (exists $arg{type}) { $arg{type} =~ s/postgres.*/postgres/io; + $arg{type} =~ s/drizzle.*/drizzle/io; $arg{type} =~ s/mongo.*/mongo/io; $arg{type} =~ s/mysql.*/mysql/io; - $arg{type} =~ s/drizzle.*/drizzle/io; $arg{type} =~ s/oracle.*/oracle/io; + $arg{type} =~ s/redis.*/redis/io; } return \%arg; @@ -6604,6 +6658,15 @@ sub upgrade { $changes++; } + ## Check for any added tables + for my $row (@tablelist) { + my ($name,$body) = @$row; + next if relation_exists('bucardo', $name); + upgrade_and_log($body); + clog "Created table $name"; + $changes++; + } + ## Add new columns as needed from the schema for my $row (@collist) { my ($schema,$table,$column,$def) = @$row; @@ -6818,15 +6881,6 @@ sub upgrade { $changes++; } - ## Check for any added tables - for my $row (@tablelist) { - my ($name,$body) = @$row; - next if relation_exists('bucardo', $name); - upgrade_and_log($body); - clog "Created table $name"; - $changes++; - } - ## Check for any added triggers for my $row (@tlist) { my ($name,$body) = @$row; diff --git a/bucardo.schema b/bucardo.schema index bab47646a..afb25b023 100644 --- a/bucardo.schema +++ b/bucardo.schema @@ -609,10 +609,11 @@ AS $bc$ ## Given the name of a db, return the type, plus type-specific connection information ## Postgres: a connection string, username, password, and attribs +## Drizzle: a connection string, username, and password ## Mongo: "foo: bar" style connection information, one per line ## MySQL: a connection string, username, and password -## Drizzle: a connection string, username, and password ## Oracle: a connection string, username, and password +## Redis: "foo: bar" style connection information, one per line use strict; use warnings; @@ -677,12 +678,12 @@ if ($dbtype eq 'postgres') { } ## end postgres -if ($dbtype eq 'mysql') { +if ($dbtype eq 'drizzle') { length $db{name} or elog(ERROR, qq{Database name is mandatory\n}); length $db{user} or elog(ERROR, qq{Database username is mandatory\n}); - my $connstring = "dbi:mysql:database=$db{name}"; + my $connstring = "dbi:drizzle:database=$db{name}"; $db{host} ||= ''; $db{port} ||= ''; $db{pass} ||= ''; length $db{host} and $connstring .= ";host=$db{host}"; length $db{port} and $connstring .= ";port=$db{port}"; @@ -690,14 +691,23 @@ if ($dbtype eq 'mysql') { return "$dbtype\n$connstring\n$db{user}\n$db{pass}"; -} ## end mysql +} ## end drizzle -if ($dbtype eq 'drizzle') { +if ($dbtype eq 'mongo') { + my $connstring = "$dbtype\n"; + for my $name (qw/ host port user pass /) { + defined $db{$name} and length $db{$name} and $connstring .= "$name: $db{$name}\n"; + } + chomp $connstring; + return $connstring; +} + +if ($dbtype eq 'mysql') { length $db{name} or elog(ERROR, qq{Database name is mandatory\n}); length $db{user} or elog(ERROR, qq{Database username is mandatory\n}); - my $connstring = "dbi:drizzle:database=$db{name}"; + my $connstring = "dbi:mysql:database=$db{name}"; $db{host} ||= ''; $db{port} ||= ''; $db{pass} ||= ''; length $db{host} and $connstring .= ";host=$db{host}"; length $db{port} and $connstring .= ";port=$db{port}"; @@ -705,7 +715,7 @@ if ($dbtype eq 'drizzle') { return "$dbtype\n$connstring\n$db{user}\n$db{pass}"; -} ## end drizzle +} ## end mysql if ($dbtype eq 'oracle') { @@ -724,7 +734,7 @@ if ($dbtype eq 'oracle') { } ## end oracle -if ($dbtype eq 'mongo') { +if ($dbtype eq 'redis') { my $connstring = "$dbtype\n"; for my $name (qw/ host port user pass /) { defined $db{$name} and length $db{$name} and $connstring .= "$name: $db{$name}\n"; @@ -1220,17 +1230,8 @@ for my $dbname (sort { ($db{$b}{role} eq 'source') <=> ($db{$a}{role} eq 'source ## Skip if this is a flatfile next if $db{$dbname}{dbtype} =~ /flat/; - ## Skip if this is mongodb - next if $db{$dbname}{dbtype} =~ /mongo/; - - ## Skip if this is mysql - next if $db{$dbname}{dbtype} eq 'mysql'; - - ## Skip if this is drizzle - next if $db{$dbname}{dbtype} eq 'drizzle'; - - ## Skip if this is oracle - next if $db{$dbname}{dbtype} eq 'oracle'; + ## Skip if this is a non-supported database + next if $db{$dbname}{dbtype} =~ /drizzle|mongo|mysql|oracle|redis/; ## Figure out how to connect to this database my $rv = spi_exec_query("SELECT bucardo.db_getconn('$dbname') AS conn"); -- 2.39.5