Deprecated the Londiste command line parameter --copy-condition.
The Londiste handler part.py must be used instead, as it leaves
the configuration in a maintainable state.
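For illustration, a table that was previously registered with the deprecated
switch would now be added through the handler. This is only a sketch: the ini
file name and the WHERE expression are placeholders, and the key column follows
the howto below.
----
# deprecated way: filter rows during initial copy with an ad-hoc condition
londiste3 node.ini add-table pgbench_accounts --copy-condition="<where-expr>"
# handler-based way: let the part handler derive the condition from partconf
londiste3 node.ini add-table pgbench_accounts --handler=part --handler-arg=key=aid
----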
Fixed Londiste compare and repair for one-to-many and
many-to-one replication use cases. The filtering
condition is now applied on both ends, so the compared
datasets should be the same.
Cleaned up database splitting howto.
The partconf schema described below is usually used to drive PL/Proxy.
Here it is used simply to provide configuration to the `part` handler.
+== Prerequisites ==
+
+Obviously skytools must be installed, but in addition we need pghashlib and
+pgbench.
+
== Setting up the Root Database ==
-=== Create database ===
+=== Create databases ===
+
+Create the root database that will contain all the data, and two shard databases.
Run the following commands:
----
-CREATE DATABASE l3part_root;
+psql -c "CREATE DATABASE rootdb;"
+psql -c "CREATE DATABASE sharddb_0;"
+psql -c "CREATE DATABASE sharddb_1;"
----
+Deploy the hash function everywhere. This is needed because the internal hashtext
+function was changed between versions 8.3 and 8.4 and may be changed again
+in the future without consideration for its users.
+
+----
+psql rootdb < /usr/share/postgresql/8.4/contrib/hashlib.sql
+psql sharddb_0 < /usr/share/postgresql/8.4/contrib/hashlib.sql
+psql sharddb_1 < /usr/share/postgresql/8.4/contrib/hashlib.sql
+----
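To sanity-check the install you can call the hash function directly; a quick
check, assuming pghashlib's `hash_string` with the `lookup2` algorithm that the
wrapper function below uses:
----
psql rootdb -c "select hash_string('1', 'lookup2');"
----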
+
=== Set up pgbench schema ===
In this HowTo we are using pgbench for setting up the schema,
populating it with sample data and later running SQL loads to be replicated.
-
This command will create the pgbench tables and fill them with data:
----
-/usr/lib/postgresql/9.1/bin/pgbench -i -s 2 -F 80 l3part_root
-----
-
-=== Change primary key columns to text ===
-
-Standard pgbench schema has integer primary key columns for its tables.
-The standard partitioning handler is able to partition only text columns,
-so we change the primary key column types to text
-
-
-----
-alter table pgbench_accounts alter column aid type text;
-alter table pgbench_branches alter column bid type text;
-alter table pgbench_tellers alter column tid type text;
-----
-
-Now create the partition databases to replicate to.
-Each of these will get roughly half of the individual data rows.
-
-
-Create database for partition #0:
-----
-createdb l3part_part0;
+/usr/lib/postgresql/8.4/bin/pgbench -i -s 2 -F 80 rootdb
----
-And create a partition configuration table in this database
+Write a partconf.sql file that will be deployed to all databases:
----
-
CREATE SCHEMA partconf;
CREATE TABLE partconf.conf (
    part_nr integer,
    max_part integer,
    max_slot integer,
    cluster_name text
);
-insert into partconf.conf(part_nr, max_part) values(0,1);
+CREATE FUNCTION partconf.get_hash_raw
+( i_input integer
+) RETURNS integer
+AS
+$_$
+-- used to wrap hashtext so that we can replace it in 8.4 with
+-- older implementation to keep compatibility
+select hash_string($1::text, 'lookup2');
+$_$
+LANGUAGE sql;
----
+Deploy partconf.sql everywhere and populate the shard configuration tables. These values are used inside part.py:
-Create database for partition #1:
----
-CREATE DATABASE l3part_part1;
-----
-
-
-----
-
-CREATE SCHEMA partconf;
-CREATE TABLE partconf.conf (
- part_nr integer,
- max_part integer,
- db_code bigint,
- is_primary boolean,
- max_slot integer,
- cluster_name text
-);
-insert into partconf.conf(part_nr, max_part) values(1,1);
+psql rootdb < partconf.sql
+psql sharddb_0 < partconf.sql
+psql sharddb_1 < partconf.sql
+psql sharddb_0 -c "insert into partconf.conf(part_nr, max_part) values(0,1);"
+psql sharddb_1 -c "insert into partconf.conf(part_nr, max_part) values(1,1);"
----
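With these values in place the `part` handler can derive its filtering
condition from the key's hash, the shard's `max_part` mask and its `part_nr`
(see `get_copy_condition` in part.py further below). As a sketch, assuming the
default hash expression is built on `partconf.get_hash_raw`, shard 0 keeps the
rows where:
----
(partconf.get_hash_raw(aid) & 1) = 0
----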
Next create configuration files for the root node and both partitions:
-st3partsplit/st3_l3part_root.ini
+st3partsplit/st3_rootdb.ini
----
[londiste3]
-job_name = st3_l3part_root
-db = dbname=l3part_root
+job_name = st3_rootdb
+db = dbname=rootdb
queue_name = replika
-logfile = st3partsplit/log/st3_l3part_root.log
-pidfile = st3partsplit/pid/st3_l3part_root.pid
+logfile = st3partsplit/log/st3_rootdb.log
+pidfile = st3partsplit/pid/st3_rootdb.pid
----
-st3partsplit/st3_l3part_part0.ini
+st3partsplit/st3_sharddb_0.ini
----
[londiste3]
-job_name = st3_l3part_part0
-db = dbname=l3part_part0
+job_name = st3_sharddb_0
+db = dbname=sharddb_0
queue_name = replika
-logfile = st3partsplit/log/st3_l3part_part0.log
-pidfile = st3partsplit/pid/st3_l3part_part0.pid
+logfile = st3partsplit/log/st3_sharddb_0.log
+pidfile = st3partsplit/pid/st3_sharddb_0.pid
----
-st3partsplit/st3_l3part_part1.ini
+st3partsplit/st3_sharddb_1.ini
----
[londiste3]
-job_name = st3_l3part_part1
-db = dbname=l3part_part1
+job_name = st3_sharddb_1
+db = dbname=sharddb_1
queue_name = replika
-logfile = st3partsplit/log/st3_l3part_part1.log
-pidfile = st3partsplit/pid/st3_l3part_part1.pid
+logfile = st3partsplit/log/st3_sharddb_1.log
+pidfile = st3partsplit/pid/st3_sharddb_1.pid
----
Then create root node:
----
-londiste3 st3partsplit/st3_l3part_root.ini create-root node1 dbname=l3part_root
+londiste3 st3partsplit/st3_rootdb.ini create-root node1 dbname=rootdb
----
And start the worker on root:
----
-londiste3 -d st3partsplit/st3_l3part_root.ini worker
+londiste3 -d st3partsplit/st3_rootdb.ini worker
----
-
-
And create the leaf nodes and start the workers on the partitions:
----
-londiste3 st3partsplit/st3_l3part_part0.ini create-leaf node2_0 dbname=l3part_part0 --provider=dbname=l3part_root
-londiste3 -d st3partsplit/st3_l3part_part0.ini worker
+londiste3 st3partsplit/st3_sharddb_0.ini create-leaf node2_0 dbname=sharddb_0 --provider=dbname=rootdb
+londiste3 -d st3partsplit/st3_sharddb_0.ini worker
----
Second node:
----
-londiste3 st3partsplit/st3_l3part_part1.ini create-leaf node2_1 dbname=l3part_part1 --provider=dbname=l3part_root
-londiste3 -d st3partsplit/st3_l3part_part1.ini worker
+londiste3 st3partsplit/st3_sharddb_1.ini create-leaf node2_1 dbname=sharddb_1 --provider=dbname=rootdb
+londiste3 -d st3partsplit/st3_sharddb_1.ini worker
----
-
-
Create the config file st3partsplit/pgqd.ini for `pgqd` ("the ticker"):
----
[pgqd]
----
-
Start the ticker process:
----
pgqd -d st3partsplit/pgqd.ini
----
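At this point it may be useful to verify that all nodes and workers are up; a
quick check, assuming londiste3's `status` command:
----
londiste3 st3partsplit/st3_rootdb.ini status
----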
-
-
Now add the replicated tables to the root and the partitions.
Here we use the `--create` switch when adding them to the partitions,
which makes Londiste take the table definition from the root node
and create the tables on the leaf nodes automatically.
Run the following commands:
----
-londiste3 st3partsplit/st3_l3part_root.ini add-table pgbench_accounts --handler=part --handler-arg=key=aid
-londiste3 st3partsplit/st3_l3part_part0.ini add-table pgbench_accounts --create --handler=part --handler-arg=key=aid
-londiste3 st3partsplit/st3_l3part_part1.ini add-table pgbench_accounts --create --handler=part --handler-arg=key=aid
+londiste3 st3partsplit/st3_rootdb.ini add-table pgbench_accounts --handler=part --handler-arg=key=aid
+londiste3 st3partsplit/st3_sharddb_0.ini add-table pgbench_accounts --create --handler=part --handler-arg=key=aid
+londiste3 st3partsplit/st3_sharddb_1.ini add-table pgbench_accounts --create --handler=part --handler-arg=key=aid
-londiste3 st3partsplit/st3_l3part_root.ini add-table pgbench_branches --handler=part --handler-arg=key=bid
-londiste3 st3partsplit/st3_l3part_part0.ini add-table pgbench_branches --create --handler=part --handler-arg=key=bid
-londiste3 st3partsplit/st3_l3part_part1.ini add-table pgbench_branches --create --handler=part --handler-arg=key=bid
+londiste3 st3partsplit/st3_rootdb.ini add-table pgbench_branches --handler=part --handler-arg=key=bid
+londiste3 st3partsplit/st3_sharddb_0.ini add-table pgbench_branches --create --handler=part --handler-arg=key=bid
+londiste3 st3partsplit/st3_sharddb_1.ini add-table pgbench_branches --create --handler=part --handler-arg=key=bid
-londiste3 st3partsplit/st3_l3part_root.ini add-table pgbench_tellers --handler=part --handler-arg=key=tid
-londiste3 st3partsplit/st3_l3part_part0.ini add-table pgbench_tellers --create --handler=part --handler-arg=key=tid
-londiste3 st3partsplit/st3_l3part_part1.ini add-table pgbench_tellers --create --handler=part --handler-arg=key=tid
+londiste3 st3partsplit/st3_rootdb.ini add-table pgbench_tellers --handler=part --handler-arg=key=tid
+londiste3 st3partsplit/st3_sharddb_0.ini add-table pgbench_tellers --create --handler=part --handler-arg=key=tid
+londiste3 st3partsplit/st3_sharddb_1.ini add-table pgbench_tellers --create --handler=part --handler-arg=key=tid
----
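You can list the registered tables and their sync state on each node;
assuming londiste3's `tables` command:
----
londiste3 st3partsplit/st3_sharddb_0.ini tables
londiste3 st3partsplit/st3_sharddb_1.ini tables
----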
The following command will run pgbench full speed with 5 parallel
database connections for 10 seconds.
----
-/usr/lib/postgresql/9.1/bin/pgbench -T 10 -c 5 l3part_root
+/usr/lib/postgresql/8.4/bin/pgbench -T 10 -c 5 rootdb
----
After this is done, you can check that the tables on both sides have the same data with:
----
-londiste3 st3partsplit/st3_l3part_part0.ini compare
-londiste3 st3partsplit/st3_l3part_part0.ini compare
+londiste3 st3partsplit/st3_sharddb_0.ini compare
+londiste3 st3partsplit/st3_sharddb_1.ini compare
----
Except of course that they don't - each partition will only have roughly half of the root table's rows.
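To see the split you can simply count the rows on the root and on each shard;
the two shard counts should add up to the root's total (a quick check, not part
of the original howto):
----
psql rootdb -c "select count(*) from pgbench_accounts;"
psql sharddb_0 -c "select count(*) from pgbench_accounts;"
psql sharddb_1 -c "select count(*) from pgbench_accounts;"
----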
help = "add: walk upstream to find node to copy from")
g.add_option("--copy-node", dest="copy_node",
help = "add: use NODE as source for initial COPY")
- g.add_option("--copy-condition", dest="copy_condition",
- help = "add: set WHERE expression for copy")
g.add_option("--merge-all", action="store_true",
help="merge tables from all source queues", default=False)
g.add_option("--no-merge", action="store_true",
"""Simple checker based in Syncer.
When tables are in sync runs simple SQL query on them.
"""
- def process_sync(self, src_tbl, dst_tbl, src_db, dst_db):
+ def process_sync(self, t1, t2, src_db, dst_db):
"""Actual comparision."""
+ src_tbl = t1.dest_table
+ dst_tbl = t2.dest_table
+
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
+ src_where = t1.plugin.get_copy_condition(src_curs, dst_curs)
+ dst_where = t2.plugin.get_copy_condition(src_curs, dst_curs)
+
self.log.info('Counting %s' % dst_tbl)
# get common cols
q = self.cf.get('compare_sql', q)
q = q.replace("_COLS_", cols)
src_q = q.replace('_TABLE_', skytools.quote_fqident(src_tbl))
+ if src_where:
+ src_q = src_q + " WHERE " + src_where
dst_q = q.replace('_TABLE_', skytools.quote_fqident(dst_tbl))
+ if dst_where:
+ dst_q = dst_q + " WHERE " + dst_where
f = "%(cnt)d rows, checksum=%(chksum)s"
f = self.cf.get('compare_fmt', f)
"""Called when batch finishes."""
pass
- def real_copy(self, src_tablename, src_curs, dst_curs, column_list, cond_list):
+ def get_copy_condition(self, src_curs, dst_curs):
+ """ Use if you want to filter data """
+ return ''
+
+ def real_copy(self, src_tablename, src_curs, dst_curs, column_list):
"""do actual table copy and return tuple with number of bytes and rows
copied
"""
- condition = ' and '.join(cond_list)
+ condition = self.get_copy_condition(src_curs, dst_curs)
return skytools.full_copy(src_tablename, src_curs, dst_curs,
column_list, condition,
dst_tablename = self.dest_table)
return self.enc.validate_dict(row, self.table_name)
return row
- def real_copy(self, src_tablename, src_curs, dst_curs, column_list, cond_list):
+ def real_copy(self, src_tablename, src_curs, dst_curs, column_list):
"""do actual table copy and return tuple with number of bytes and rows
copied
"""
-
- condition = ' and '.join(cond_list)
-
+
if self.enc:
def _write_hook(obj, data):
return self.enc.validate_copy(data, column_list, src_tablename)
else:
_write_hook = None
-
+ condition = self.get_copy_condition(src_curs, dst_curs)
return skytools.full_copy(src_tablename, src_curs, dst_curs,
column_list, condition,
dst_tablename = self.dest_table,
exec_with_vals(self.conf.post_part)
self.log.info("Created table: %s" % dst)
- def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
+ def real_copy(self, tablename, src_curs, dst_curs, column_list):
"""do actual table copy and return tuple with number of bytes and rows
copied
"""
_src_cols = _dst_cols = column_list
- condition = ' and '.join(cond_list)
+ condition = ''
if self.conf.skip_fields:
_src_cols = [col for col in column_list
self.log.debug('part.process_event: my event, processing')
TableHandler.process_event(self, ev, sql_queue_func, arg)
- def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
- """Copy only slots needed locally."""
+ def get_copy_condition(self, src_curs, dst_curs):
+ """Prepare the where condition for copy and replay filtering"""
self.load_part_info(dst_curs)
w = "(%s & %d) = %d" % (self.hashexpr, self.max_part, self.local_part)
self.log.debug('part: copy_condition=%s' % w)
- cond_list.append(w)
-
- return TableHandler.real_copy(self, tablename, src_curs, dst_curs,
- column_list, cond_list)
+ return w
def load_part_info(self, curs):
"""Load slot info from database."""
trigger_arg_list.append('SKIP')
trigger_arg_list.append('expect_sync')
- def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
+ def real_copy(self, tablename, src_curs, dst_curs, column_list):
"""Force copy not to start"""
return (0,0)
p.add_option("--apply", action="store_true", help="apply fixes")
return p
- def process_sync(self, src_tbl, dst_tbl, src_db, dst_db):
+ def process_sync(self, t1, t2, src_db, dst_db):
"""Actual comparision."""
apply_db = None
self.apply_curs = apply_db.cursor()
self.apply_curs.execute("set session_replication_role = 'replica'")
+ src_tbl = t1.dest_table
+ dst_tbl = t2.dest_table
+
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
dump_dst = dst_tbl + ".dst"
self.log.info("Dumping src table: %s" % src_tbl)
- self.dump_table(src_tbl, src_curs, dump_src)
+ src_where = t1.plugin.get_copy_condition(src_curs, dst_curs)
+ self.dump_table(src_tbl, src_curs, dump_src, src_where)
src_db.commit()
self.log.info("Dumping dst table: %s" % dst_tbl)
- self.dump_table(dst_tbl, dst_curs, dump_dst)
+ dst_where = t2.plugin.get_copy_condition(src_curs, dst_curs)
+ self.dump_table(dst_tbl, dst_curs, dump_dst, dst_where)
dst_db.commit()
self.log.info("Sorting src table: %s" % dump_src)
cols = ",".join(fqlist)
self.log.debug("using columns: %s" % cols)
- def dump_table(self, tbl, curs, fn):
+ def dump_table(self, tbl, curs, fn, whr):
"""Dump table to disk."""
cols = ','.join(self.fq_common_fields)
- q = "copy %s (%s) to stdout" % (skytools.quote_fqident(tbl), cols)
-
+ if len(whr) == 0:
+ whr = 'true'
+ q = "copy (SELECT %s FROM %s WHERE %s) to stdout" % (cols, skytools.quote_fqident(tbl), whr)
+ self.log.debug("Query: %s" % q)
f = open(fn, "w", 64*1024)
curs.copy_expert(q, f)
size = f.tell()
help = "add: find table source for copy by walking upwards")
p.add_option("--copy-node", dest="copy_node",
help = "add: use NODE as source for initial copy")
- p.add_option("--copy-condition", dest="copy_condition",
- help = "copy: where expression")
p.add_option("--force", action="store_true",
help="force", default=False)
p.add_option("--all", action="store_true",
if not self.options.expect_sync:
if self.options.skip_truncate:
attrs['skip_truncate'] = 1
- if self.options.copy_condition:
- attrs['copy_condition'] = self.options.copy_condition
if self.options.max_parallel_copy:
attrs['max_parallel_copy'] = self.options.max_parallel_copy
self.check_consumer(setup_db)
- self.check_table(t1.dest_table, t2.dest_table, lock_db, src_db, dst_db, setup_db)
+ self.check_table(t1, t2, lock_db, src_db, dst_db, setup_db)
lock_db.commit()
src_db.commit()
dst_db.commit()
if dur > 10 and not self.options.force:
raise Exception("Ticker seems dead")
- def check_table(self, src_tbl, dst_tbl, lock_db, src_db, dst_db, setup_db):
+ def check_table(self, t1, t2, lock_db, src_db, dst_db, setup_db):
"""Get transaction to same state, then process."""
+ src_tbl = t1.dest_table
+ dst_tbl = t2.dest_table
+
lock_curs = lock_db.cursor()
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
self.unlock_table_branch(lock_db, setup_db)
# do work
- bad = self.process_sync(src_tbl, dst_tbl, src_db, dst_db)
+ bad = self.process_sync(t1, t2, src_db, dst_db)
if bad:
self.bad_tables += 1
setup_curs = setup_db.cursor()
self.resume_consumer(setup_curs, self.provider_node['worker_name'])
- def process_sync(self, src_tbl, dst_tbl, src_db, dst_db):
+ def process_sync(self, t1, t2, src_db, dst_db):
"""It gets 2 connections in state where tbl should be in same state.
"""
raise Exception('process_sync not implemented')
tbl_stat.dropped_ddl = ddl
# do truncate & copy
- self.real_copy(src_curs, dst_curs, tbl_stat, common_cols, src_real_table)
+ self.log.info("%s: start copy" % tbl_stat.name)
+ p = tbl_stat.get_plugin()
+ stats = p.real_copy(src_real_table, src_curs, dst_curs, common_cols)
+ if stats:
+ self.log.info("%s: copy finished: %d bytes, %d rows" % (
+ tbl_stat.name, stats[0], stats[1]))
# get snapshot
src_curs.execute("select txid_current_snapshot()")
src_curs.execute(q, [self.queue_name])
src_db.commit()
- def real_copy(self, srccurs, dstcurs, tbl_stat, col_list, src_real_table):
- "Actual copy."
-
- tablename = tbl_stat.name
- # do copy
- self.log.info("%s: start copy" % tablename)
- p = tbl_stat.get_plugin()
- cond_list = []
- cond = tbl_stat.table_attrs.get('copy_condition')
- if cond:
- cond_list.append(cond)
- stats = p.real_copy(src_real_table, srccurs, dstcurs, col_list, cond_list)
- if stats:
- self.log.info("%s: copy finished: %d bytes, %d rows" % (
- tablename, stats[0], stats[1]))
-
def work(self):
if not self.reg_ok:
# check if needed? (table, not existing reg)