From 93c7e5f50d110502c39a145b57f0ac7fffbea432 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Tue, 10 Nov 2009 13:19:27 +0000 Subject: [PATCH] New SPLIT statement. It will split incoming array(s) into per-partition arrays. Roughly based on design here: http://lists.pgfoundry.org/pipermail/plproxy-users/2008-June/000093.html Written by Martin Pihlak --- Makefile | 2 +- doc/syntax.txt | 56 ++++++ doc/todo.txt | 1 - expected/plproxy_split.out | 125 +++++++++++++ sql/plproxy_split.sql | 87 +++++++++ src/cluster.c | 2 +- src/execute.c | 356 +++++++++++++++++++++++++++++-------- src/function.c | 63 ++++++- src/main.c | 4 + src/parser.y | 24 ++- src/plproxy.h | 50 +++++- src/query.c | 48 ++--- src/scanner.l | 1 + src/type.c | 4 + 14 files changed, 708 insertions(+), 115 deletions(-) create mode 100644 expected/plproxy_split.out create mode 100644 sql/plproxy_split.sql diff --git a/Makefile b/Makefile index 4adcce3..c1ac40f 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ DIST_FILES = Makefile src/plproxy.h src/rowstamp.h src/scanner.l src/parser.y \ # regression testing setup REGRESS = plproxy_init plproxy_test plproxy_select plproxy_many \ plproxy_errors plproxy_clustermap plproxy_dynamic_record \ - plproxy_encoding + plproxy_encoding plproxy_split REGRESS_OPTS = --load-language=plpgsql # load PGXS makefile diff --git a/doc/syntax.txt b/doc/syntax.txt index 2fc120a..72de3c6 100644 --- a/doc/syntax.txt +++ b/doc/syntax.txt @@ -71,6 +71,62 @@ tagged, query will be sent in parallel to them. Take hash value directly from function argument. + +== SPLIT == + + SPLIT array_arg_1 [ , array_arg_2 ... ] ; + +Split the input arrays based on RUN ON statement into per-partition arrays. +This is done by evaluating RUN ON condition for each array element and building +per-partition parameter arrays for each matching partition. During execution +each tagged partition then gets its own subset of the array to process. + +The semantics of RUN ON statement is slightly changed with SPLIT arrays: + + RUN ON partition_func(..); + +The array is split between the partitions matching `partition_func()`. Any +SPLIT parameters passed to the function are actually replaced with the +individual array elements. + + RUN ON argname; RUN ON $1; + +An array of partition numbers (or hashes) can be passed as `argname`. The function +shall be run on the partitions specified in the array. + + RUN ON ANY; + +Each element is assigned to random partition. + + RUN ON ALL; + RUN ON ; + +Unaffected, except for the added overhead of array copying. + +Example: + + CREATE FUNCTION set_profiles(i_users text[], i_profiles text[]) + RETURNS SETOF text AS $$ + CLUSTER 'userdb'; + SPLIT i_users, i_profiles; + RUN ON hashtext(i_users); + $$ LANGUAGE plproxy; + +Given query: + + SELECT * FROM set_profiles(ARRAY['foo', 'bar'], ARRAY['a', 'b']); + +The hash function is called 2 times: + + SELECT * FROM hashtext('foo'); + SELECT * FROM hashtext('bar'); + +And target partitions get queries: + + SELECT * FROM set_profiles(ARRAY['foo'], ARRAY['a']); + SELECT * FROM set_profiles(ARRAY['bar'], ARRAY['b']); + + == SELECT == SELECT .... ; diff --git a/doc/todo.txt b/doc/todo.txt index 0f1bad5..16ed61e 100644 --- a/doc/todo.txt +++ b/doc/todo.txt @@ -13,7 +13,6 @@ == Just thoughts == - * SPREAD BY clause for OLAP loads. * Drop plproxy.get_cluster_config()... * Dynamic connstr loading for CONNECT functions? diff --git a/expected/plproxy_split.out b/expected/plproxy_split.out new file mode 100644 index 0000000..4001b86 --- /dev/null +++ b/expected/plproxy_split.out @@ -0,0 +1,125 @@ +-- partition functions +\c test_part0 +create or replace function test_array(a text[], b text[], c text) returns text as +$$ +select current_database() || ' $1:' || array_to_string($1, ',') + || ' $2:' || array_to_string($2, ',') + || ' $3:' || $3; +$$ language sql; +\c test_part1 +create or replace function test_array(a text[], b text[], c text) returns text as +$$ +select current_database() || ' $1:' || array_to_string($1, ',') + || ' $2:' || array_to_string($2, ',') + || ' $3:' || $3; +$$ language sql; +\c test_part2 +create or replace function test_array(a text[], b text[], c text) returns text as +$$ +select current_database() || ' $1:' || array_to_string($1, ',') + || ' $2:' || array_to_string($2, ',') + || ' $3:' || $3; +$$ language sql; +\c test_part3 +create or replace function test_array(a text[], b text[], c text) returns text as +$$ +select current_database() || ' $1:' || array_to_string($1, ',') + || ' $2:' || array_to_string($2, ',') + || ' $3:' || $3; +$$ language sql; +\c regression +-- invalid arg reference +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split $4; cluster 'testcluster'; run on 0;$$ language plproxy; +select * from test_array(array['a'], array['g'], 'foo'); +ERROR: PL/Proxy function public.test_array(3): Compile error at line 1: invalid argument reference: $4 +-- invalid arg name +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split x; cluster 'testcluster'; run on 0; $$ language plproxy; +select * from test_array(array['a'], array['b', 'c'], 'foo'); +ERROR: PL/Proxy function public.test_array(3): Compile error at line 1: invalid argument reference: x +-- cannot split more than once +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split a, b, b; cluster 'testcluster'; run on 0; $$ language plproxy; +select * from test_array(array['a'], array['b', 'c'], 'foo'); +ERROR: PL/Proxy function public.test_array(3): SPLIT parameter specified more than once: b +-- attempt to split non-array +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split $3; cluster 'testcluster'; run on 0;$$ language plproxy; +select * from test_array(array['a'], array['g'], 'foo'); +ERROR: PL/Proxy function public.test_array(3): SPLIT parameter is not an array: $3 +-- array size/dimensions mismatch +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split a, b; cluster 'testcluster'; run on 0; $$ language plproxy; +select * from test_array(array['a'], array['b', 'c'], 'foo'); +ERROR: PL/Proxy function public.test_array(3): split arrays must be of identical lengths +select * from test_array(array['a','b','c','d'], null, 'foo'); +ERROR: PL/Proxy function public.test_array(3): split arrays must be of identical lengths +select * from test_array(null, array['e','f','g','h'], 'foo'); +ERROR: PL/Proxy function public.test_array(3): split arrays must be of identical lengths +select * from test_array(array[array['a1'],array['a2']], array[array['b1'],array['b2']], 'foo'); +ERROR: PL/Proxy function public.test_array(3): split multi-dimensional arrays are not supported +-- run on array hash, split one array +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split a; cluster 'testcluster'; run on ascii(a);$$ language plproxy; +select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo'); + test_array +----------------------------------- + test_part0 $1:d $2:e,f,g,h $3:foo + test_part1 $1:a $2:e,f,g,h $3:foo + test_part2 $1:b $2:e,f,g,h $3:foo + test_part3 $1:c $2:e,f,g,h $3:foo +(4 rows) + +-- run on text hash, split two arrays (nop split) +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split a, b; cluster 'testcluster'; run on ascii(c);$$ language plproxy; +select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo'); + test_array +----------------------------------------- + test_part2 $1:a,b,c,d $2:e,f,g,h $3:foo +(1 row) + +-- run on array hash, split two arrays +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split a, b; cluster 'testcluster'; run on ascii(a);$$ language plproxy; +select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo'); + test_array +----------------------------- + test_part0 $1:d $2:h $3:foo + test_part1 $1:a $2:e $3:foo + test_part2 $1:b $2:f $3:foo + test_part3 $1:c $2:g $3:foo +(4 rows) + +select * from test_array(null, null, null); + test_array +------------ +(0 rows) + +select * from test_array('{}'::text[], '{}'::text[], 'foo'); + test_array +------------ +(0 rows) + +-- run on arg +create or replace function test_array_direct(a integer[], b text[], c text) returns setof text as +$$ split a; cluster 'testcluster'; run on a; select test_array('{}'::text[], b, c);$$ language plproxy; +select * from test_array_direct(array[2,3], array['a','b','c','d'], 'foo'); + test_array_direct +---------------------------------- + test_part2 $1: $2:a,b,c,d $3:foo + test_part3 $1: $2:a,b,c,d $3:foo +(2 rows) + +create or replace function test_array_direct(a integer[], b text[], c text) returns setof text as +$$ split a, b; cluster 'testcluster'; run on a; select test_array('{}'::text[], b, c);$$ language plproxy; +select * from test_array_direct(array[0,1,2,3], array['a','b','c','d'], 'foo'); + test_array_direct +---------------------------- + test_part0 $1: $2:a $3:foo + test_part1 $1: $2:b $3:foo + test_part2 $1: $2:c $3:foo + test_part3 $1: $2:d $3:foo +(4 rows) + diff --git a/sql/plproxy_split.sql b/sql/plproxy_split.sql new file mode 100644 index 0000000..d17d723 --- /dev/null +++ b/sql/plproxy_split.sql @@ -0,0 +1,87 @@ +-- partition functions +\c test_part0 +create or replace function test_array(a text[], b text[], c text) returns text as +$$ +select current_database() || ' $1:' || array_to_string($1, ',') + || ' $2:' || array_to_string($2, ',') + || ' $3:' || $3; +$$ language sql; +\c test_part1 +create or replace function test_array(a text[], b text[], c text) returns text as +$$ +select current_database() || ' $1:' || array_to_string($1, ',') + || ' $2:' || array_to_string($2, ',') + || ' $3:' || $3; +$$ language sql; +\c test_part2 +create or replace function test_array(a text[], b text[], c text) returns text as +$$ +select current_database() || ' $1:' || array_to_string($1, ',') + || ' $2:' || array_to_string($2, ',') + || ' $3:' || $3; +$$ language sql; +\c test_part3 +create or replace function test_array(a text[], b text[], c text) returns text as +$$ +select current_database() || ' $1:' || array_to_string($1, ',') + || ' $2:' || array_to_string($2, ',') + || ' $3:' || $3; +$$ language sql; + +\c regression + +-- invalid arg reference +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split $4; cluster 'testcluster'; run on 0;$$ language plproxy; +select * from test_array(array['a'], array['g'], 'foo'); + +-- invalid arg name +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split x; cluster 'testcluster'; run on 0; $$ language plproxy; +select * from test_array(array['a'], array['b', 'c'], 'foo'); + +-- cannot split more than once +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split a, b, b; cluster 'testcluster'; run on 0; $$ language plproxy; +select * from test_array(array['a'], array['b', 'c'], 'foo'); + +-- attempt to split non-array +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split $3; cluster 'testcluster'; run on 0;$$ language plproxy; +select * from test_array(array['a'], array['g'], 'foo'); + +-- array size/dimensions mismatch +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split a, b; cluster 'testcluster'; run on 0; $$ language plproxy; +select * from test_array(array['a'], array['b', 'c'], 'foo'); +select * from test_array(array['a','b','c','d'], null, 'foo'); +select * from test_array(null, array['e','f','g','h'], 'foo'); +select * from test_array(array[array['a1'],array['a2']], array[array['b1'],array['b2']], 'foo'); + +-- run on array hash, split one array +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split a; cluster 'testcluster'; run on ascii(a);$$ language plproxy; +select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo'); + +-- run on text hash, split two arrays (nop split) +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split a, b; cluster 'testcluster'; run on ascii(c);$$ language plproxy; +select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo'); + +-- run on array hash, split two arrays +create or replace function test_array(a text[], b text[], c text) returns setof text as +$$ split a, b; cluster 'testcluster'; run on ascii(a);$$ language plproxy; +select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo'); +select * from test_array(null, null, null); +select * from test_array('{}'::text[], '{}'::text[], 'foo'); + +-- run on arg +create or replace function test_array_direct(a integer[], b text[], c text) returns setof text as +$$ split a; cluster 'testcluster'; run on a; select test_array('{}'::text[], b, c);$$ language plproxy; + +select * from test_array_direct(array[2,3], array['a','b','c','d'], 'foo'); + +create or replace function test_array_direct(a integer[], b text[], c text) returns setof text as +$$ split a, b; cluster 'testcluster'; run on a; select test_array('{}'::text[], b, c);$$ language plproxy; + +select * from test_array_direct(array[0,1,2,3], array['a','b','c','d'], 'foo'); diff --git a/src/cluster.c b/src/cluster.c index f314ee5..3feb063 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -406,7 +406,7 @@ resolve_query(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *query) HeapTuple row; TupleDesc desc; - plproxy_query_exec(func, fcinfo, query); + plproxy_query_exec(func, fcinfo, query, NULL, 0); if (SPI_processed != 1) plproxy_error(func, "'%s' returned %d rows, expected 1", diff --git a/src/execute.c b/src/execute.c index f28114a..b8517e9 100644 --- a/src/execute.c +++ b/src/execute.c @@ -439,7 +439,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster) for (i = 0; i < cluster->conn_count; i++) { conn = &cluster->conn_list[i]; - if (!conn->run_on) + if (!conn->run_tag) continue; /* decide what to do */ @@ -482,7 +482,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster) for (i = 0; i < cluster->conn_count; i++) { conn = &cluster->conn_list[i]; - if (!conn->run_on) + if (!conn->run_tag) continue; switch (conn->state) @@ -545,33 +545,32 @@ check_timeouts(ProxyFunction *func, ProxyCluster *cluster, ProxyConnection *conn /* Run the query on all tagged connections in parallel */ static void -remote_execute(ProxyFunction *func, - const char **values, int *plengths, int *pformats) +remote_execute(ProxyFunction *func) { ExecStatusType err; ProxyConnection *conn; ProxyCluster *cluster = func->cur_cluster; int i, - pending; + pending = 0; struct timeval now; /* either launch connection or send query */ for (i = 0; i < cluster->conn_count; i++) { conn = &cluster->conn_list[i]; - if (!conn->run_on) + if (!conn->run_tag) continue; /* check if conn is alive, and launch if not */ prepare_conn(func, conn); + pending++; /* if conn is ready, then send query away */ if (conn->state == C_READY) - send_query(func, conn, values, plengths, pformats); + send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats); } /* now loop until all results are arrived */ - pending = 1; while (pending) { /* allow postgres to cancel processing */ @@ -587,12 +586,12 @@ remote_execute(ProxyFunction *func, for (i = 0; i < cluster->conn_count; i++) { conn = &cluster->conn_list[i]; - if (!conn->run_on) + if (!conn->run_tag) continue; /* login finished, send query */ if (conn->state == C_READY) - send_query(func, conn, values, plengths, pformats); + send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats); if (conn->state != C_DONE) pending++; @@ -606,11 +605,11 @@ remote_execute(ProxyFunction *func, { conn = &cluster->conn_list[i]; - if ((conn->run_on || conn->res) - && !(conn->run_on && conn->res)) - plproxy_error(func, "run_on does not match res"); + if ((conn->run_tag || conn->res) + && !(conn->run_tag && conn->res)) + plproxy_error(func, "run_tag does not match res"); - if (!conn->run_on) + if (!conn->run_tag) continue; if (conn->state != C_DONE) @@ -661,9 +660,14 @@ remote_cancel(ProxyFunction *func) } } -/* Run hash function and tag connections */ +/* + * Run hash function and tag connections. If any of the hash function + * arguments are mentioned in the split_arrays an element of the array + * is used instead of the actual array. + */ static void -tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo) +tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag, + DatumArray **array_params, int array_row) { int i; TupleDesc desc; @@ -671,7 +675,7 @@ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo) ProxyCluster *cluster = func->cur_cluster; /* execute cached plan */ - plproxy_query_exec(func, fcinfo, func->hash_sql); + plproxy_query_exec(func, fcinfo, func->hash_sql, array_params, array_row); /* get header */ desc = SPI_tuptable->tupdesc; @@ -698,7 +702,7 @@ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo) plproxy_error(func, "Hash result must be int2, int4 or int8"); hashval &= cluster->part_mask; - cluster->part_map[hashval]->run_on = 1; + cluster->part_map[hashval]->run_tag = tag; } /* sanity check */ @@ -708,109 +712,305 @@ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo) " allows hashcount <> 1"); } -/* Clean old results and prepare for new one */ -void -plproxy_clean_results(ProxyCluster *cluster) +/* + * Deconstruct an array type to array of Datums, note NULL elements + * and determine the element type information. + */ +static DatumArray * +make_datum_array(ProxyFunction *func, ArrayType *v, Oid elem_type) { - int i; - ProxyConnection *conn; + DatumArray *da = palloc0(sizeof(*da)); - if (!cluster) - return; - - cluster->ret_total = 0; - cluster->ret_cur_conn = 0; + da->type = plproxy_find_type_info(func, elem_type, true); - for (i = 0; i < cluster->conn_count; i++) - { - conn = &cluster->conn_list[i]; - if (conn->res) - { - PQclear(conn->res); - conn->res = NULL; - } - conn->pos = 0; - conn->run_on = 0; - } - /* conn state checks are done in prepare_conn */ + if (v) + deconstruct_array(v, + da->type->type_oid, da->type->length, da->type->by_value, + da->type->alignment, + &da->values, &da->nulls, &da->elem_count); + return da; } -/* Select partitions and execute query on them */ -void -plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo) +/* + * Evaluate the run condition. Tag the matching connections with the specified + * tag. + * + * Note that we don't allow nested plproxy calls on the same cluster (ie. + * remote hash functions). The cluster and connection state are global and + * would easily get messed up. + */ +static void +tag_run_on_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag, + DatumArray **array_params, int array_row) { - const char *values[FUNC_MAX_ARGS]; - int plengths[FUNC_MAX_ARGS]; - int pformats[FUNC_MAX_ARGS]; - int i; - int gotbin; - ProxyCluster *cluster = func->cur_cluster; - - /* clean old results */ - plproxy_clean_results(cluster); + ProxyCluster *cluster = func->cur_cluster; + int i; - /* tag interesting partitions */ switch (func->run_type) { case R_HASH: - tag_hash_partitions(func, fcinfo); + tag_hash_partitions(func, fcinfo, tag, array_params, array_row); break; case R_ALL: for (i = 0; i < cluster->part_count; i++) - cluster->part_map[i]->run_on = 1; + cluster->part_map[i]->run_tag = tag; break; case R_EXACT: i = func->exact_nr; if (i < 0 || i >= cluster->part_count) plproxy_error(func, "part number out of range"); - cluster->part_map[i]->run_on = 1; + cluster->part_map[i]->run_tag = tag; break; case R_ANY: i = random() & cluster->part_mask; - cluster->part_map[i]->run_on = 1; + cluster->part_map[i]->run_tag = tag; break; default: plproxy_error(func, "uninitialized run_type"); } +} - /* prepare args */ - gotbin = 0; - for (i = 0; i < func->remote_sql->arg_count; i++) +/* + * Tag the partitions to be run on, if split is requested prepare the + * per-partition split array parameters. + * + * This is done by looping over all of the split arrays side-by-side, for each + * tuple see if it satisfies the RUN ON condition. If so, copy the tuple + * to the partition's private array parameters. + */ +static void +prepare_and_tag_partitions(ProxyFunction *func, FunctionCallInfo fcinfo) +{ + int i, row, col; + int split_array_len = -1; + int split_array_count = 0; + ProxyCluster *cluster = func->cur_cluster; + DatumArray *arrays_to_split[FUNC_MAX_ARGS]; + + /* + * See if we have any arrays to split. If so, make them manageable by + * converting them to Datum arrays. During the process verify that all + * the arrays are of the same length. + */ + for (i = 0; i < func->arg_count; i++) { - int idx = func->remote_sql->arg_lookup[i]; - plengths[i] = 0; - pformats[i] = 0; - if (PG_ARGISNULL(idx)) + ArrayType *v; + + if (!IS_SPLIT_ARG(func, i)) { - values[i] = NULL; + arrays_to_split[i] = NULL; + continue; } + + if (PG_ARGISNULL(i)) + v = NULL; else { - bool bin = cluster->config.disable_binary ? 0 : 1; + v = PG_GETARG_ARRAYTYPE_P(i); - values[i] = plproxy_send_type(func->arg_types[idx], - PG_GETARG_DATUM(idx), - bin, - &plengths[i], - &pformats[i]); + if (ARR_NDIM(v) > 1) + plproxy_error(func, "split multi-dimensional arrays are not supported"); + } + + arrays_to_split[i] = make_datum_array(func, v, func->arg_types[i]->elem_type); + + /* Check that the element counts match */ + if (split_array_len < 0) + split_array_len = arrays_to_split[i]->elem_count; + else if (arrays_to_split[i]->elem_count != split_array_len) + plproxy_error(func, "split arrays must be of identical lengths"); + + ++split_array_count; + } - if (pformats[i]) - gotbin = 1; + /* If nothing to split, just tag the partitions and be done with it */ + if (!split_array_count) + { + tag_run_on_partitions(func, fcinfo, 1, NULL, 0); + return; + } + + /* Need to split, evaluate the RUN ON condition for each of the elements. */ + for (row = 0; row < split_array_len; row++) + { + int part; + int my_tag = row+1; + + /* + * Tag the run-on partitions with a tag that allows us us to identify + * which partitions need the set of elements from this row. + */ + tag_run_on_partitions(func, fcinfo, my_tag, arrays_to_split, row); + + /* Add the array elements to the partitions tagged in previous step */ + for (part = 0; part < cluster->conn_count; part++) + { + ProxyConnection *conn = &cluster->conn_list[part]; + + if (conn->run_tag != my_tag) + continue; + + if (!conn->bstate) + conn->bstate = palloc0(func->arg_count * sizeof(*conn->bstate)); + + /* Add this set of elements to the partition specific arrays */ + for (col = 0; col < func->arg_count; col++) + { + if (!IS_SPLIT_ARG(func, col)) + continue; + + conn->bstate[col] = accumArrayResult(conn->bstate[col], + arrays_to_split[col]->values[row], + arrays_to_split[col]->nulls[row], + arrays_to_split[col]->type->type_oid, + CurrentMemoryContext); + } } } /* - * Run query. On cancel, send cancel request to partitions too. + * Finally, copy the accumulated arrays to the actual connections + * to be used as parameters. + */ + for (i = 0; i < cluster->conn_count; i++) + { + ProxyConnection *conn = &cluster->conn_list[i]; + + if (!conn->run_tag) + continue; + + conn->split_params = palloc(func->arg_count * sizeof(*conn->split_params)); + + for (col = 0; col < func->arg_count; col++) + { + if (!IS_SPLIT_ARG(func, col)) + conn->split_params[col] = PointerGetDatum(NULL); + else + conn->split_params[col] = makeArrayResult(conn->bstate[col], + CurrentMemoryContext); + } + } +} + +/* + * Prepare parameters for the query. + */ +static void +prepare_query_parameters(ProxyFunction *func, FunctionCallInfo fcinfo) +{ + int i; + ProxyCluster *cluster = func->cur_cluster; + + for (i = 0; i < func->remote_sql->arg_count; i++) + { + int idx = func->remote_sql->arg_lookup[i]; + bool bin = cluster->config.disable_binary ? 0 : 1; + const char *fixed_param_val = NULL; + int fixed_param_len, fixed_param_fmt; + int part; + + /* Avoid doing multiple conversions for fixed parameters */ + if (!IS_SPLIT_ARG(func, idx) && !PG_ARGISNULL(idx)) + { + fixed_param_val = plproxy_send_type(func->arg_types[idx], + PG_GETARG_DATUM(idx), + bin, + &fixed_param_len, + &fixed_param_fmt); + } + + /* Add the parameters to partitions */ + for (part = 0; part < cluster->conn_count; part++) + { + ProxyConnection *conn = &cluster->conn_list[part]; + + if (!conn->run_tag) + continue; + + if (PG_ARGISNULL(idx)) + { + conn->param_values[i] = NULL; + conn->param_lengths[i] = 0; + conn->param_formats[i] = 0; + } + else + { + if (IS_SPLIT_ARG(func, idx)) + { + conn->param_values[i] = plproxy_send_type(func->arg_types[idx], + conn->split_params[idx], + bin, + &conn->param_lengths[i], + &conn->param_formats[i]); + } + else + { + conn->param_values[i] = fixed_param_val; + conn->param_lengths[i] = fixed_param_len; + conn->param_formats[i] = fixed_param_fmt; + } + } + } + } +} + +/* Clean old results and prepare for new one */ +void +plproxy_clean_results(ProxyCluster *cluster) +{ + int i; + ProxyConnection *conn; + + if (!cluster) + return; + + cluster->ret_total = 0; + cluster->ret_cur_conn = 0; + + for (i = 0; i < cluster->conn_count; i++) + { + conn = &cluster->conn_list[i]; + if (conn->res) + { + PQclear(conn->res); + conn->res = NULL; + } + conn->pos = 0; + conn->run_tag = 0; + conn->bstate = NULL; + } + /* conn state checks are done in prepare_conn */ +} + +/* Select partitions and execute query on them */ +void +plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo) +{ + /* + * Prepare parameters and run query. On cancel, send cancel request to + * partitions too. */ PG_TRY(); { - if (gotbin) - remote_execute(func, values, plengths, pformats); - else - remote_execute(func, values, NULL, NULL); + func->cur_cluster->busy = true; + + /* clean old results */ + plproxy_clean_results(func->cur_cluster); + + /* tag the partitions and prepare per-partition parameters */ + prepare_and_tag_partitions(func, fcinfo); + + /* prepare the target query parameters */ + prepare_query_parameters(func, fcinfo); + + remote_execute(func); + + func->cur_cluster->busy = false; } PG_CATCH(); { + func->cur_cluster->busy = false; + if (geterrcode() == ERRCODE_QUERY_CANCELED) remote_cancel(func); PG_RE_THROW(); diff --git a/src/function.c b/src/function.c index a948925..42a97e2 100644 --- a/src/function.c +++ b/src/function.c @@ -73,6 +73,63 @@ plproxy_func_strdup(ProxyFunction *func, const char *s) return res; } +/* Find the index of a named parameter, -1 if not found */ +int +plproxy_get_parameter_index(ProxyFunction *func, const char *ident) +{ + int i; + + if (ident[0] == '$') + { + /* Probably a $# parameter reference */ + i = atoi(ident + 1) - 1; + if (i >= 0 && i < func->arg_count) + return i; + } + else if (func->arg_names) + { + /* Named parameter, go through the argument names */ + for (i = 0; i < func->arg_count; i++) + { + if (!func->arg_names[i]) + continue; + if (pg_strcasecmp(ident, func->arg_names[i]) == 0) + return i; + } + } + + return -1; +} + +/* Add a new split argument */ +bool +plproxy_split_add_ident(ProxyFunction *func, const char *ident) +{ + int argindex; + + if ((argindex = plproxy_get_parameter_index(func, ident)) < 0) + return false; + + /* Already split? */ + if (IS_SPLIT_ARG(func, argindex)) + plproxy_error(func, "SPLIT parameter specified more than once: %s", ident); + + /* Is it an array? */ + if (!func->arg_types[argindex]->is_array) + plproxy_error(func, "SPLIT parameter is not an array: %s", ident); + + if (!func->split_args) + { + size_t alloc_size = sizeof(*func->split_args) * func->arg_count; + + func->split_args = plproxy_func_alloc(func, alloc_size); + MemSet(func->split_args, 0, alloc_size); + } + + func->split_args[argindex] = true; + + return true; +} /* Initialize PL/Proxy function cache */ void @@ -413,11 +470,11 @@ fn_compile(FunctionCallInfo fcinfo, /* prepare local queries */ if (f->cluster_sql) - plproxy_query_prepare(f, fcinfo, f->cluster_sql); + plproxy_query_prepare(f, fcinfo, f->cluster_sql, false); if (f->hash_sql) - plproxy_query_prepare(f, fcinfo, f->hash_sql); + plproxy_query_prepare(f, fcinfo, f->hash_sql, true); if (f->connect_sql) - plproxy_query_prepare(f, fcinfo, f->connect_sql); + plproxy_query_prepare(f, fcinfo, f->connect_sql, false); /* sanity check */ if (f->run_type == R_ALL && !fcinfo->flinfo->fn_retset) diff --git a/src/main.c b/src/main.c index d0a3672..36441d9 100644 --- a/src/main.c +++ b/src/main.c @@ -140,6 +140,10 @@ compile_and_execute(FunctionCallInfo fcinfo) /* get actual cluster to run on */ cluster = plproxy_find_cluster(func, fcinfo); + /* Don't allow nested calls on the same cluster */ + if (cluster->busy) + plproxy_error(func, "Nested PL/Proxy calls to the same cluster are not supported."); + /* fetch PGresults */ func->cur_cluster = cluster; plproxy_exec(func, fcinfo); diff --git a/src/parser.y b/src/parser.y index f6c331a..987ef7a 100644 --- a/src/parser.y +++ b/src/parser.y @@ -35,7 +35,7 @@ void plproxy_yy_scan_bytes(const char *bytes, int len); static ProxyFunction *xfunc; /* remember what happened */ -static int got_run, got_cluster, got_connect; +static int got_run, got_cluster, got_connect, got_split; static QueryBuffer *cluster_sql; static QueryBuffer *select_sql; @@ -48,7 +48,7 @@ static QueryBuffer *cur_sql; /* keep the resetting code together with variables */ static void reset_parser_vars(void) { - got_run = got_cluster = got_connect = 0; + got_run = got_cluster = got_connect = got_split = 0; cur_sql = select_sql = cluster_sql = hash_sql = connect_sql = NULL; xfunc = NULL; } @@ -58,7 +58,7 @@ static void reset_parser_vars(void) %name-prefix="plproxy_yy" %token CONNECT CLUSTER RUN ON ALL ANY SELECT -%token IDENT NUMBER FNCALL STRING +%token IDENT NUMBER FNCALL SPLIT STRING %token SQLIDENT SQLPART %union @@ -70,7 +70,7 @@ static void reset_parser_vars(void) body: | body stmt ; -stmt: cluster_stmt | run_stmt | select_stmt | connect_stmt ; +stmt: cluster_stmt | split_stmt | run_stmt | select_stmt | connect_stmt ; connect_stmt: CONNECT connect_spec ';' { if (got_connect) @@ -117,6 +117,22 @@ cluster_func: FNCALL { cluster_sql = plproxy_query_start(xfunc, false); cluster_name: STRING { xfunc->cluster_name = plproxy_func_strdup(xfunc, $1); } ; +split_stmt: SPLIT split_param_list ';' { + if (got_split) + yyerror("Only one SPLIT statement allowed"); + got_split = 1; + } + ; + +split_param_list: split_param + | split_param_list ',' split_param + ; + +split_param: IDENT { + if (!plproxy_split_add_ident(xfunc, $1)) + yyerror("invalid argument reference: %s", $1); + } + run_stmt: RUN ON run_spec ';' { if (got_run) yyerror("Only one RUN statement allowed"); got_run = 1; } diff --git a/src/plproxy.h b/src/plproxy.h index 37d97fd..8b8d20e 100644 --- a/src/plproxy.h +++ b/src/plproxy.h @@ -36,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -64,6 +65,11 @@ #endif +/* + * Determine if this argument is to SPLIT + */ +#define IS_SPLIT_ARG(func, arg) ((func)->split_args && (func)->split_args[arg]) + /* * Maintenece period in seconds. Connnections will be freed * from stale results, and checked for lifetime. @@ -118,9 +124,25 @@ typedef struct ConnState state; /* Connection state */ time_t connect_time; /* When connection was started */ time_t query_time; /* When last query was sent */ - bool run_on; /* True it this connection should be used */ bool same_ver; /* True if dest backend has same X.Y ver */ bool tuning; /* True if tuning query is running on conn */ + + /* + * Nonzero if this connection should be used. The actual tag value is only + * used by SPLIT processing, others should treat it as a boolean value. + */ + int run_tag; + + /* + * Per-connection parameters. These are a assigned just before the + * remote call is made. + */ + + Datum *split_params; /* Split array parameters */ + ArrayBuildState **bstate; /* Temporary build state */ + const char *param_values[FUNC_MAX_ARGS]; /* Parameter values */ + int param_lengths[FUNC_MAX_ARGS]; /* Parameter lengths (binary io) */ + int param_formats[FUNC_MAX_ARGS]; /* Parameter formats (binary io) */ } ProxyConnection; /* Info about one cluster */ @@ -142,6 +164,8 @@ typedef struct ProxyCluster int ret_cur_conn; /* Result walking: index of current conn */ int ret_cur_pos; /* Result walking: index of current row */ int ret_total; /* Result walking: total rows left */ + + bool busy; /* True if the cluster is already involved in execution */ } ProxyCluster; /* @@ -161,6 +185,10 @@ typedef struct ProxyType bool has_send; /* Has binary output */ bool has_recv; /* Has binary input */ bool by_value; /* False if Datum is a pointer to data */ + char alignment; /* Type alignment */ + bool is_array; /* True if array */ + Oid elem_type; /* Array element type */ + short length; /* Type length */ /* I/O functions */ union @@ -207,6 +235,17 @@ typedef struct ProxyQuery void *plan; /* Optional prepared plan for local queries */ } ProxyQuery; +/* + * Deconstructed array parameters + */ +typedef struct DatumArray +{ + ProxyType *type; + Datum *values; + bool *nulls; + int elem_count; +} DatumArray; + /* * Complete info about compiled function. * @@ -224,6 +263,8 @@ typedef struct ProxyFunction char **arg_names; /* Argument names, may contain NULLs */ short arg_count; /* Argument count of proxy function */ + bool *split_args; /* Map of arguments to split */ + /* if the function returns untyped RECORD that needs AS clause */ bool dynamic_record; @@ -272,6 +313,8 @@ void plproxy_error(ProxyFunction *func, const char *fmt,...); void plproxy_function_cache_init(void); void *plproxy_func_alloc(ProxyFunction *func, int size); char *plproxy_func_strdup(ProxyFunction *func, const char *s); +int plproxy_get_parameter_index(ProxyFunction *func, const char *ident); +bool plproxy_split_add_ident(ProxyFunction *func, const char *ident); ProxyFunction *plproxy_compile(FunctionCallInfo fcinfo, bool validate); /* execute.c */ @@ -312,8 +355,9 @@ bool plproxy_query_add_const(QueryBuffer *q, const char *data); bool plproxy_query_add_ident(QueryBuffer *q, const char *ident); ProxyQuery *plproxy_query_finish(QueryBuffer *q); ProxyQuery *plproxy_standard_query(ProxyFunction *func, bool add_types); -void plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q); -void plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q); +void plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q, bool split_support); +void plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q, + DatumArray **array_params, int array_row); void plproxy_query_freeplan(ProxyQuery *q); #endif diff --git a/src/query.c b/src/query.c index c734bf0..d0e0577 100644 --- a/src/query.c +++ b/src/query.c @@ -90,25 +90,8 @@ plproxy_query_add_ident(QueryBuffer *q, const char *ident) fn_idx = -1, sql_idx = -1; - if (ident[0] == '$') - { - fn_idx = atoi(ident + 1) - 1; - if (fn_idx < 0 || fn_idx >= q->func->arg_count) - return false; - } - else if (q->func->arg_names) - { - for (i = 0; i < q->func->arg_count; i++) - { - if (!q->func->arg_names[i]) - continue; - if (pg_strcasecmp(ident, q->func->arg_names[i]) == 0) - { - fn_idx = i; - break; - } - } - } + fn_idx = plproxy_get_parameter_index(q->func, ident); + if (fn_idx >= 0) { for (i = 0; i < q->arg_count; i++) @@ -127,7 +110,12 @@ plproxy_query_add_ident(QueryBuffer *q, const char *ident) add_ref(q->sql, sql_idx, q->func, fn_idx, q->add_types); } else + { + if (ident[0] == '$') + return false; appendStringInfoString(q->sql, ident); + } + return true; } @@ -149,6 +137,7 @@ plproxy_query_finish(QueryBuffer *q) len = q->arg_count * sizeof(int); pq->arg_lookup = palloc(len); pq->plan = NULL; + memcpy(pq->arg_lookup, q->arg_lookup, len); MemoryContextSwitchTo(old); @@ -247,7 +236,7 @@ plproxy_standard_query(ProxyFunction *func, bool add_types) * Prepare ProxyQuery for local execution */ void -plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q) +plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q, bool split_support) { int i; Oid types[FUNC_MAX_ARGS]; @@ -258,7 +247,11 @@ plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery * { int idx = q->arg_lookup[i]; - types[i] = func->arg_types[idx]->type_oid; + if (split_support && IS_SPLIT_ARG(func, idx)) + /* for SPLIT arguments use array element type instead */ + types[i] = func->arg_types[idx]->elem_type; + else + types[i] = func->arg_types[idx]->type_oid; } /* prepare & store plan */ @@ -272,12 +265,12 @@ plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery * * Result will be in SPI_tuptable. */ void -plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q) +plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q, + DatumArray **array_params, int array_row) { int i, idx, err; - ProxyType *type; char arg_nulls[FUNC_MAX_ARGS]; Datum arg_values[FUNC_MAX_ARGS]; @@ -285,12 +278,19 @@ plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q) for (i = 0; i < q->arg_count; i++) { idx = q->arg_lookup[i]; - type = func->arg_types[idx]; + if (PG_ARGISNULL(idx)) { arg_nulls[i] = 'n'; arg_values[i] = (Datum) NULL; } + else if (array_params && IS_SPLIT_ARG(func, idx)) + { + DatumArray *ats = array_params[idx]; + + arg_nulls[i] = ats->nulls[array_row] ? 'n' : ' '; + arg_values[i] = ats->nulls[array_row] ? (Datum) NULL : ats->values[array_row]; + } else { arg_nulls[i] = ' '; diff --git a/src/scanner.l b/src/scanner.l index cb203b7..36166c9 100644 --- a/src/scanner.l +++ b/src/scanner.l @@ -179,6 +179,7 @@ run { return RUN; } on { return ON; } all { return ALL; } any { return ANY; } +split { return SPLIT; } select { BEGIN(sql); yylval.str = yytext; return SELECT; } /* function call */ diff --git a/src/type.c b/src/type.c index dd7165d..3679d83 100644 --- a/src/type.c +++ b/src/type.c @@ -253,6 +253,10 @@ plproxy_find_type_info(ProxyFunction *func, Oid oid, bool for_send) type->for_send = for_send; type->by_value = s_type->typbyval; type->name = plproxy_func_strdup(func, namebuf); + type->is_array = (s_type->typelem != 0 && s_type->typlen == -1); + type->elem_type = s_type->typelem; + type->alignment = s_type->typalign; + type->length = s_type->typlen; /* decide what function is needed */ if (for_send) -- 2.39.5