provider_object_list = [
skytools.DBLanguage("plpgsql"),
- skytools.DBFunction('get_current_snapshot', 0, sql_file = "txid.sql"),
+ skytools.DBFunction('txid_current_snapshot', 0, sql_file = "txid.sql"),
skytools.DBSchema('pgq', sql_file = "pgq.sql"),
skytools.DBSchema('londiste', sql_file = "londiste.sql")
]
self.real_copy(src_curs, dst_curs, tbl_stat)
# get snapshot
- src_curs.execute("select get_current_snapshot()")
+ src_curs.execute("select txid_current_snapshot()")
snapshot = src_curs.fetchone()[0]
src_db.commit()
__all__ = ['SmartTicker']
def is_txid_sane(curs):
- curs.execute("select get_current_txid()")
+ curs.execute("select txid_current()")
txid = curs.fetchone()[0]
# on 8.2 theres no such table
def installer(self):
objs = [
skytools.DBLanguage("plpgsql"),
- skytools.DBFunction("get_current_txid", 0, sql_file="txid.sql"),
+ skytools.DBFunction("txid_current_snapshot", 0, sql_file="txid.sql"),
skytools.DBSchema("pgq", sql_file="pgq.sql"),
]
--
-- Simplest solution would be
-- > WHERE ev_txid >= xmin1 AND ev_txid <= xmax2
--- > AND NOT txid_in_snapshot(ev_txid, sn1)
--- > AND txid_in_snapshot(ev_txid, sn2)
+-- > AND NOT txid_visible_in_snapshot(ev_txid, sn1)
+-- > AND txid_visible_in_snapshot(ev_txid, sn2)
--
-- The simple solution has a problem with long transactions (xmin1 very low).
-- All the batches that happen when the long tx is active will need
batch record;
begin
select s.sub_last_tick, s.sub_next_tick, s.sub_id, s.sub_queue,
- get_snapshot_xmax(last.tick_snapshot) as tx_start,
- get_snapshot_xmax(cur.tick_snapshot) as tx_end,
+ txid_snapshot_xmax(last.tick_snapshot) as tx_start,
+ txid_snapshot_xmax(cur.tick_snapshot) as tx_end,
last.tick_snapshot as last_snapshot,
cur.tick_snapshot as cur_snapshot
into batch
for rec in
-- active tx-es in prev_snapshot that were committed in cur_snapshot
select id1 from
- get_snapshot_active(batch.last_snapshot) id1 left join
- get_snapshot_active(batch.cur_snapshot) id2 on (id1 = id2)
+ txid_snapshot_xip(batch.last_snapshot) id1 left join
+ txid_snapshot_xip(batch.cur_snapshot) id2 on (id1 = id2)
where id2 is null
order by 1 desc
loop
|| ' and last.tick_queue = ' || batch.sub_queue
|| ' and ev.ev_txid >= ' || batch.tx_start
|| ' and ev.ev_txid <= ' || batch.tx_end
- || ' and txid_in_snapshot(ev.ev_txid, cur.tick_snapshot)'
- || ' and not txid_in_snapshot(ev.ev_txid, last.tick_snapshot)'
+ || ' and txid_visible_in_snapshot(ev.ev_txid, cur.tick_snapshot)'
+ || ' and not txid_visible_in_snapshot(ev.ev_txid, last.tick_snapshot)'
|| retry_expr;
-- now include older tx-es, that were ongoing
-- at the time of prev_snapshot
batch record;
begin
select
- get_snapshot_xmin(last.tick_snapshot) as tx_min, -- absolute minimum
- get_snapshot_xmax(cur.tick_snapshot) as tx_max, -- absolute maximum
+ txid_snapshot_xmin(last.tick_snapshot) as tx_min, -- absolute minimum
+ txid_snapshot_xmax(cur.tick_snapshot) as tx_max, -- absolute maximum
q.queue_data_pfx, q.queue_ntables,
q.queue_cur_table, q.queue_switch_step1, q.queue_switch_step2
into batch
-- check if any consumer is on previous table
select coalesce(count(*), 0) into badcnt
from pgq.subscription, pgq.tick
- where get_snapshot_xmin(tick_snapshot) < cf.queue_switch_step2
+ where txid_snapshot_xmin(tick_snapshot) < cf.queue_switch_step2
and sub_queue = cf.queue_id
and tick_queue = cf.queue_id
and tick_id = (select tick_id from pgq.tick
update pgq.queue
set queue_cur_table = nr,
queue_switch_time = current_timestamp,
- queue_switch_step1 = get_current_txid(),
+ queue_switch_step1 = txid_current(),
queue_switch_step2 = NULL
where queue_id = cf.queue_id;
-- clean ticks - avoid partial batches
delete from pgq.tick
where tick_queue = cf.queue_id
- and get_snapshot_xmin(tick_snapshot) < cf.queue_switch_step2;
+ and txid_snapshot_xmin(tick_snapshot) < cf.queue_switch_step2;
return 1;
end;
-- tranaction than step1
begin
update pgq.queue
- set queue_switch_step2 = get_current_txid()
+ set queue_switch_step2 = txid_current()
where queue_switch_step2 is null;
return 1;
end;
queue_ntables integer not null default 3,
queue_cur_table integer not null default 0,
queue_rotation_period interval not null default '2 hours',
- queue_switch_step1 bigint not null default get_current_txid(),
- queue_switch_step2 bigint default get_current_txid(),
+ queue_switch_step1 bigint not null default txid_current(),
+ queue_switch_step2 bigint default txid_current(),
queue_switch_time timestamptz not null default now(),
queue_external_ticker boolean not null default false,
tick_queue int4 not null,
tick_id bigint not null,
tick_time timestamptz not null default now(),
- tick_snapshot txid_snapshot not null default get_current_snapshot(),
+ tick_snapshot txid_snapshot not null default txid_current_snapshot(),
constraint tick_pkey primary key (tick_queue, tick_id),
constraint tick_queue_fkey foreign key (tick_queue)
ev_id bigint not null,
ev_time timestamptz not null,
- ev_txid bigint not null default get_current_txid(),
+ ev_txid bigint not null default txid_current(),
ev_owner int4,
ev_retry int4,
+include ../../config.mak
+
+PGVER := $(shell $(PG_CONFIG) --version | sed 's/PostgreSQL //')
+
+ifeq ($(PGVER),)
+$(error skytools not configured, cannot continue)
+else
+# postgres >= manages epoch itself, so skip epoch tables
+pg83 = $(shell test $(PGVER) "<" "8.3" && echo "false" || echo "true")
+pg82 = $(shell test $(PGVER) "<" "8.2" && echo "false" || echo "true")
+endif
+
+ifeq ($(pg83),true) # we have 8.3 with internal txid
+
+# nothing to do
+
+else # 8.2 or 8.1
+#
+# pg < 8.3 needs this module
+#
MODULE_big = txid
SRCS = txid.c epoch.c
OBJS = $(SRCS:.c=.o)
-
-DATA_built = txid.sql
+REGRESS = txid
+REGRESS_OPTS = --load-language=plpgsql
DATA = uninstall_txid.sql
DOCS = README.txid
+DATA_built = txid.sql
EXTRA_CLEAN = txid.sql.in
-REGRESS = txid
-REGRESS_OPTS = --load-language=plpgsql
+ifeq ($(pg82),true)
+# 8.2 tracks epoch internally
+TXID_SQL = txid.std.sql
+else
+# 8.1 needs epoch-tracking code
+TXID_SQL = txid.std.sql txid.schema.sql
+endif # ! 8.2
+endif # ! 8.3
-include ../../config.mak
+# PGXS build procedure
include $(PGXS)
# additional deps
txid.o: txid.h
epoch.o: txid.h
-# postgres >= manages epoch itself, so skip epoch tables
-pgnew = $(shell test $(VERSION) "<" "8.2" && echo "false" || echo "true")
-ifeq ($(pgnew),true)
-TXID_SQL = txid.std.sql
-else
-TXID_SQL = txid.std.sql txid.schema.sql
-endif
txid.sql.in: $(TXID_SQL)
cat $(TXID_SQL) > $@
Also it contains type 'txid_snapshot' and following functions:
-get_current_txid() returns int8
+txid_current() returns int8
Current transaction ID
-get_current_snapshot() returns txid_snapshot
+txid_current_snapshot() returns txid_snapshot
Current snapshot
-get_snapshot_xmin( snap ) returns int8 --
+txid_snapshot_xmin( snap ) returns int8
Smallest TXID in snapshot. TXID's smaller than this
are all visible in snapshot.
-get_snapshot_xmax( snap ) returns int8
+txid_snapshot_xmax( snap ) returns int8
Largest TXID in snapshot. TXID's starting from this one are
all invisible in snapshot.
-get_snapshot_values( snap ) setof int8
+txid_snapshot_xip( snap ) setof int8
List of uncommitted TXID's in snapshot, that are invisible
in snapshot. Values are between xmin and xmax.
-txid_in_snapshot(id, snap) returns bool
+txid_visible_in_snapshot(id, snap) returns bool
Is TXID visible in snapshot?
-txid_not_in_snapshot(id, snap) returns bool
-
- Is TXID invisible in snapshot?
-
Problems
--------
100001:100009:100005,100007,100008
(3 rows)
-select get_snapshot_xmin(snap),
- get_snapshot_xmax(snap),
- get_snapshot_active(snap)
+select txid_snapshot_xmin(snap),
+ txid_snapshot_xmax(snap),
+ txid_snapshot_xip(snap)
from snapshot_test order by nr;
- get_snapshot_xmin | get_snapshot_xmax | get_snapshot_active
--------------------+-------------------+---------------------
- 12 | 20 | 13
- 12 | 20 | 15
- 12 | 20 | 18
- 100001 | 100009 | 100005
- 100001 | 100009 | 100007
- 100001 | 100009 | 100008
+ txid_snapshot_xmin | txid_snapshot_xmax | txid_snapshot_xip
+--------------------+--------------------+-------------------
+ 12 | 20 | 13
+ 12 | 20 | 15
+ 12 | 20 | 18
+ 100001 | 100009 | 100005
+ 100001 | 100009 | 100007
+ 100001 | 100009 | 100008
(6 rows)
-select id, txid_in_snapshot(id, snap),
- txid_not_in_snapshot(id, snap)
+select id, txid_visible_in_snapshot(id, snap)
from snapshot_test, generate_series(11, 21) id
where nr = 2;
- id | txid_in_snapshot | txid_not_in_snapshot
-----+------------------+----------------------
- 11 | t | f
- 12 | t | f
- 13 | f | t
- 14 | t | f
- 15 | f | t
- 16 | t | f
- 17 | t | f
- 18 | f | t
- 19 | t | f
- 20 | f | t
- 21 | f | t
+ id | txid_visible_in_snapshot
+----+--------------------------
+ 11 | t
+ 12 | t
+ 13 | f
+ 14 | t
+ 15 | f
+ 16 | t
+ 17 | t
+ 18 | f
+ 19 | t
+ 20 | f
+ 21 | f
(11 rows)
-- test current values also
-select get_current_txid() >= get_snapshot_xmin(get_current_snapshot());
+select txid_current() >= txid_snapshot_xmin(txid_current_snapshot());
?column?
----------
t
(1 row)
-select get_current_txid() < get_snapshot_xmax(get_current_snapshot());
- ?column?
-----------
- t
-(1 row)
-
-select txid_in_snapshot(get_current_txid(), get_current_snapshot()),
- txid_not_in_snapshot(get_current_txid(), get_current_snapshot());
- txid_in_snapshot | txid_not_in_snapshot
-------------------+----------------------
- t | f
-(1 row)
-
+-- select txid_current_txid() < txid_snapshot_xmax(txid_current_snapshot());
+-- select txid_in_snapshot(txid_current_txid(), txid_current_snapshot()),
+-- txid_not_in_snapshot(txid_current_txid(), txid_current_snapshot());
select snap from snapshot_test order by nr;
-select get_snapshot_xmin(snap),
- get_snapshot_xmax(snap),
- get_snapshot_active(snap)
+select txid_snapshot_xmin(snap),
+ txid_snapshot_xmax(snap),
+ txid_snapshot_xip(snap)
from snapshot_test order by nr;
-select id, txid_in_snapshot(id, snap),
- txid_not_in_snapshot(id, snap)
+select id, txid_visible_in_snapshot(id, snap)
from snapshot_test, generate_series(11, 21) id
where nr = 2;
-- test current values also
-select get_current_txid() >= get_snapshot_xmin(get_current_snapshot());
-select get_current_txid() < get_snapshot_xmax(get_current_snapshot());
+select txid_current() >= txid_snapshot_xmin(txid_current_snapshot());
+-- select txid_current_txid() < txid_snapshot_xmax(txid_current_snapshot());
-select txid_in_snapshot(get_current_txid(), get_current_snapshot()),
- txid_not_in_snapshot(get_current_txid(), get_current_snapshot());
+-- select txid_in_snapshot(txid_current_txid(), txid_current_snapshot()),
+-- txid_not_in_snapshot(txid_current_txid(), txid_current_snapshot());
#include "access/xact.h"
#include "funcapi.h"
#include "lib/stringinfo.h"
+#include "libpq/pqformat.h"
#include "txid.h"
#define SET_VARSIZE(x, len) VARATT_SIZEP(x) = len
#endif
+/* txid will be signed int8 in database, so must limit to 63 bits */
+#define MAX_TXID UINT64CONST(0x7FFFFFFFFFFFFFFF)
+
/*
* If defined, use bsearch() function for searching
* txid's inside snapshots that have more than given values.
PG_FUNCTION_INFO_V1(txid_current);
PG_FUNCTION_INFO_V1(txid_snapshot_in);
PG_FUNCTION_INFO_V1(txid_snapshot_out);
-PG_FUNCTION_INFO_V1(txid_in_snapshot);
-PG_FUNCTION_INFO_V1(txid_not_in_snapshot);
+PG_FUNCTION_INFO_V1(txid_snapshot_recv);
+PG_FUNCTION_INFO_V1(txid_snapshot_send);
PG_FUNCTION_INFO_V1(txid_current_snapshot);
PG_FUNCTION_INFO_V1(txid_snapshot_xmin);
PG_FUNCTION_INFO_V1(txid_snapshot_xmax);
+
+/* new API in 8.3 */
+PG_FUNCTION_INFO_V1(txid_visible_in_snapshot);
+PG_FUNCTION_INFO_V1(txid_snapshot_xip);
+
+/* old API */
+PG_FUNCTION_INFO_V1(txid_in_snapshot);
+PG_FUNCTION_INFO_V1(txid_not_in_snapshot);
PG_FUNCTION_INFO_V1(txid_snapshot_active);
/*
PG_RETURN_CSTRING(str.data);
}
+/*
+ * txid_snapshot_recv(internal) returns txid_snapshot
+ *
+ * binary input function for type txid_snapshot
+ *
+ * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
+ */
+Datum
+txid_snapshot_recv(PG_FUNCTION_ARGS)
+{
+ StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
+ TxidSnapshot *snap;
+ txid last = 0;
+ int nxip;
+ int i;
+ int avail;
+ int expect;
+ txid xmin, xmax;
+
+ /*
+ * load nxip and check for nonsense.
+ *
+ * (nxip > avail) check is against int overflows in 'expect'.
+ */
+ nxip = pq_getmsgint(buf, 4);
+ avail = buf->len - buf->cursor;
+ expect = 8 + 8 + nxip * 8;
+ if (nxip < 0 || nxip > avail || expect > avail)
+ goto bad_format;
+
+ xmin = pq_getmsgint64(buf);
+ xmax = pq_getmsgint64(buf);
+ if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
+ goto bad_format;
+
+ snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
+ snap->xmin = xmin;
+ snap->xmax = xmax;
+ snap->nxip = nxip;
+ SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
+
+ for (i = 0; i < nxip; i++)
+ {
+ txid cur = pq_getmsgint64(buf);
+ if (cur <= last || cur < xmin || cur >= xmax)
+ goto bad_format;
+ snap->xip[i] = cur;
+ last = cur;
+ }
+ PG_RETURN_POINTER(snap);
+
+bad_format:
+ elog(ERROR, "invalid snapshot data");
+ return (Datum)NULL;
+}
+
+/*
+ * txid_snapshot_send(txid_snapshot) returns bytea
+ *
+ * binary output function for type txid_snapshot
+ *
+ * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
+ */
+Datum
+txid_snapshot_send(PG_FUNCTION_ARGS)
+{
+ TxidSnapshot *snap = (TxidSnapshot *)PG_GETARG_VARLENA_P(0);
+ StringInfoData buf;
+ uint32 i;
+
+ pq_begintypsend(&buf);
+ pq_sendint(&buf, snap->nxip, 4);
+ pq_sendint64(&buf, snap->xmin);
+ pq_sendint64(&buf, snap->xmax);
+ for (i = 0; i < snap->nxip; i++)
+ pq_sendint64(&buf, snap->xip[i]);
+ PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
+}
+
static int
_txid_in_snapshot(txid value, const TxidSnapshot *snap)
PG_RETURN_BOOL(res);
}
+/*
+ * changed api
+ */
+Datum
+txid_visible_in_snapshot(PG_FUNCTION_ARGS)
+{
+ txid value = PG_GETARG_INT64(0);
+ TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
+ int res;
+
+ res = _txid_in_snapshot(value, snap) ? true : false;
+
+ PG_FREE_IF_COPY(snap, 1);
+ PG_RETURN_BOOL(res);
+}
+
/*
* txid_not_in_snapshot - is txid invisible in snapshot ?
* txid_snapshot_active - returns uncommitted TXID's in snapshot.
*/
Datum
-txid_snapshot_active(PG_FUNCTION_ARGS)
+txid_snapshot_xip(PG_FUNCTION_ARGS)
{
FuncCallContext *fctx;
struct snap_state *state;
}
}
+/* old api */
+Datum
+txid_snapshot_active(PG_FUNCTION_ARGS)
+{
+ return txid_snapshot_xip(fcinfo);
+}
+
txid xip[1];
} TxidSnapshot;
+#define TXID_SNAPSHOT_SIZE(nxip) (offsetof(TxidSnapshot, xip) + sizeof(txid) * nxip)
typedef struct {
uint64 last_value;
Datum txid_snapshot_in(PG_FUNCTION_ARGS);
Datum txid_snapshot_out(PG_FUNCTION_ARGS);
+Datum txid_snapshot_recv(PG_FUNCTION_ARGS);
+Datum txid_snapshot_send(PG_FUNCTION_ARGS);
-Datum txid_in_snapshot(PG_FUNCTION_ARGS);
-Datum txid_not_in_snapshot(PG_FUNCTION_ARGS);
Datum txid_snapshot_xmin(PG_FUNCTION_ARGS);
Datum txid_snapshot_xmax(PG_FUNCTION_ARGS);
+Datum txid_snapshot_xip(PG_FUNCTION_ARGS);
+Datum txid_visible_in_snapshot(PG_FUNCTION_ARGS);
+
Datum txid_snapshot_active(PG_FUNCTION_ARGS);
+Datum txid_in_snapshot(PG_FUNCTION_ARGS);
+Datum txid_not_in_snapshot(PG_FUNCTION_ARGS);
#endif /* _TXID_H_ */
RETURNS cstring
AS 'MODULE_PATHNAME' LANGUAGE C
IMMUTABLE STRICT;
+CREATE OR REPLACE FUNCTION txid_snapshot_recv(internal)
+ RETURNS txid_snapshot
+ AS 'MODULE_PATHNAME' LANGUAGE C
+ IMMUTABLE STRICT;
+CREATE OR REPLACE FUNCTION txid_snapshot_send(txid_snapshot)
+ RETURNS bytea
+ AS 'MODULE_PATHNAME' LANGUAGE C
+ IMMUTABLE STRICT;
--
-- The data type itself
CREATE TYPE txid_snapshot (
INPUT = txid_snapshot_in,
OUTPUT = txid_snapshot_out,
+ RECEIVE = txid_snapshot_recv,
+ SEND = txid_snapshot_send,
INTERNALLENGTH = variable,
STORAGE = extended,
ALIGNMENT = double
);
-CREATE OR REPLACE FUNCTION get_current_txid()
+--CREATE OR REPLACE FUNCTION get_current_txid()
+CREATE OR REPLACE FUNCTION txid_current()
RETURNS bigint
AS 'MODULE_PATHNAME', 'txid_current' LANGUAGE C
- SECURITY DEFINER;
+ STABLE SECURITY DEFINER;
-CREATE OR REPLACE FUNCTION get_current_snapshot()
+-- CREATE OR REPLACE FUNCTION get_current_snapshot()
+CREATE OR REPLACE FUNCTION txid_current_snapshot()
RETURNS txid_snapshot
AS 'MODULE_PATHNAME', 'txid_current_snapshot' LANGUAGE C
- SECURITY DEFINER;
+ STABLE SECURITY DEFINER;
-CREATE OR REPLACE FUNCTION get_snapshot_xmin(txid_snapshot)
+--CREATE OR REPLACE FUNCTION get_snapshot_xmin(txid_snapshot)
+CREATE OR REPLACE FUNCTION txid_snapshot_xmin(txid_snapshot)
RETURNS bigint
AS 'MODULE_PATHNAME', 'txid_snapshot_xmin' LANGUAGE C
IMMUTABLE STRICT;
-CREATE OR REPLACE FUNCTION get_snapshot_xmax(txid_snapshot)
+-- CREATE OR REPLACE FUNCTION get_snapshot_xmax(txid_snapshot)
+CREATE OR REPLACE FUNCTION txid_snapshot_xmax(txid_snapshot)
RETURNS bigint
AS 'MODULE_PATHNAME', 'txid_snapshot_xmax' LANGUAGE C
IMMUTABLE STRICT;
-CREATE OR REPLACE FUNCTION get_snapshot_active(txid_snapshot)
+-- CREATE OR REPLACE FUNCTION get_snapshot_active(txid_snapshot)
+CREATE OR REPLACE FUNCTION txid_snapshot_xip(txid_snapshot)
RETURNS setof bigint
- AS 'MODULE_PATHNAME', 'txid_snapshot_active' LANGUAGE C
+ AS 'MODULE_PATHNAME', 'txid_snapshot_xip' LANGUAGE C
IMMUTABLE STRICT;
-- Special comparision functions used by the remote worker
-- for sync chunk selection
--
+CREATE OR REPLACE FUNCTION txid_visible_in_snapshot(bigint, txid_snapshot)
+ RETURNS boolean
+ AS 'MODULE_PATHNAME', 'txid_visible_in_snapshot' LANGUAGE C
+ IMMUTABLE STRICT;
+/*
CREATE OR REPLACE FUNCTION txid_in_snapshot(bigint, txid_snapshot)
RETURNS boolean
AS 'MODULE_PATHNAME', 'txid_in_snapshot' LANGUAGE C
RETURNS boolean
AS 'MODULE_PATHNAME', 'txid_not_in_snapshot' LANGUAGE C
IMMUTABLE STRICT;
-
+*/
DROP DOMAIN txid;
DROP TYPE txid_snapshot cascade;
DROP SCHEMA txid CASCADE;
-DROP FUNCTION get_current_txid();
-DROP FUNCTION get_snapshot_xmin();
-DROP FUNCTION get_snapshot_xmax();
-DROP FUNCTION get_snapshot_active();
+DROP FUNCTION txid_current();