--sync-watermark: limit watermark to specific nodes
author: Marko Kreen <markokr@gmail.com>
Tue, 31 Jan 2012 08:58:39 +0000 (10:58 +0200)
committer: Marko Kreen <markokr@gmail.com>
Tue, 31 Jan 2012 08:58:39 +0000 (10:58 +0200)
This will allow specified nodes to sync global watermark
with each other, without sending it upwards.

This isolates root node from lag on downstream nodes.

12 files changed:
python/londiste.py
python/pgq/cascade/admin.py
python/pgq/cascade/nodeinfo.py
python/pgq/cascade/worker.py
sql/pgq_node/expected/pgq_node_test.out
sql/pgq_node/functions/pgq_node.get_node_info.sql
sql/pgq_node/functions/pgq_node.set_node_attrs.sql [new file with mode: 0644]
sql/pgq_node/functions/pgq_node.upgrade_schema.sql
sql/pgq_node/sql/pgq_node_test.sql
sql/pgq_node/structure/functions.sql
sql/pgq_node/structure/tables.sql
tests/londiste/regen.sh

index 5ce717b9e43695c8078857b5e384df72a5e0c09f..b7bc99e0f617e9e2f22198e50a59c12bc78c3bce 100755 (executable)
@@ -97,6 +97,8 @@ class Londiste(skytools.DBScript):
                 help = "takeover: old node was root")
         g.add_option("--dead-branch", action = 'store_true',
                 help = "takeover: old node was branch")
+        g.add_option("--sync-watermark",
+                help = "create-branch: list of node names to sync wm with")
         p.add_option_group(g)
         g = optparse.OptionGroup(p, "repair queue position")
         g.add_option("--rewind", action = "store_true",
index 48f4e43aa3fb8e274a05d020928cb58b98f7c52f..964af08195521edd3b742e15efb31c1fe487faaf 100644 (file)
@@ -94,6 +94,8 @@ class CascadeAdmin(skytools.AdminScript):
                     help = "tag some node as dead")
         g.add_option("--dead-branch", action="store_true",
                     help = "tag some node as dead")
+        g.add_option("--sync-watermark",
+                    help = "list of node names to sync with")
         p.add_option_group(g)
         return p
 
@@ -147,6 +149,7 @@ class CascadeAdmin(skytools.AdminScript):
             return
 
         self.log.info("Initializing node")
+        node_attrs = {}
 
         worker_name = self.options.worker
         if not worker_name:
@@ -155,6 +158,11 @@ class CascadeAdmin(skytools.AdminScript):
         if combined_queue and node_type != 'leaf':
             raise Exception('--merge can be used only for leafs')
 
+        if self.options.sync_watermark:
+            if node_type != 'branch':
+                raise UsageError('--sync-watermark can be used only for branch nodes')
+            node_attrs['sync_watermark'] = self.options.sync_watermark
+
         # register member
         if node_type == 'root':
             global_watermark = None
@@ -221,6 +229,11 @@ class CascadeAdmin(skytools.AdminScript):
 
         self.extra_init(node_type, db, provider_db)
 
+        if node_attrs:
+            s_attrs = skytools.db_urlencode(node_attrs)
+            self.exec_cmd(db, "select * from pgq_node.set_node_attrs(%s, %s)",
+                          [self.queue_name, s_attrs])
+
         self.log.info("Done")
 
     def extra_init(self, node_type, node_db, provider_db):
index 48dea4b201d34b7f5d5f3086ca59af89e0647ba5..726b311e82636f78401125e98f5fc66f64872d0b 100644 (file)
@@ -6,6 +6,7 @@
 __all__ = ['MemberInfo', 'NodeInfo', 'QueueInfo']
 
 import datetime
+import skytools
 
 # node types
 ROOT = 'root'
@@ -49,6 +50,7 @@ class NodeInfo:
     combined_queue = None
     combined_type = None
     last_tick = None
+    node_attrs = {}
 
     def __init__(self, queue_name, row, main_worker = True, node_name = None):
         self.queue_name = queue_name
@@ -83,6 +85,12 @@ class NodeInfo:
         self.combined_type = row['combined_type']
         self.last_tick = row['worker_last_tick']
 
+        self.node_attrs = {}
+        if 'node_attrs' in row:
+            a = row['node_attrs']
+            if a:
+                self.node_attrs = skytools.db_urldecode(a)
+
     def __get_target_queue(self):
         qname = None
         if self.type == LEAF:
@@ -129,6 +137,10 @@ class NodeInfo:
             txt += ", NOT UPTODATE"
         lst.append(txt)
 
+        for k, v in self.node_attrs.items():
+            txt = "Attr: %s=%s" % (k, v)
+            lst.append(txt)
+
         for cname, row in self.cascaded_consumer_map.items():
             err = row['cur_error']
             if err:
index 1d3f83259e6ce370d65cbf86cc0fcc91d30ef322..a721eaa082b6bf9bb2abe312880fe83ae038ede0 100644 (file)
@@ -4,10 +4,11 @@ CascadedConsumer that also maintains node.
 
 """
 
-import sys, time
+import sys, time, skytools
 
 from pgq.cascade.consumer import CascadedConsumer
 from pgq.producer import bulk_insert_events
+from pgq.event import Event
 
 __all__ = ['CascadedWorker']
 
@@ -32,10 +33,20 @@ class WorkerState:
     filtered_copy = 0       # ok
     process_global_wm = 0   # ok
 
+    sync_watermark = 0      # ?
+    wm_sync_nodes = []
+
     def __init__(self, queue_name, nst):
         self.node_type = nst['node_type']
         self.node_name = nst['node_name']
         self.local_watermark = nst['local_watermark']
+        self.global_watermark = nst['global_watermark']
+
+        self.node_attrs = {}
+        attrs = nst.get('node_attrs', '')
+        if attrs:
+            self.node_attrs = skytools.db_urldecode(attrs)
+
         ntype = nst['node_type']
         ctype = nst['combined_type']
         if ntype == 'root':
@@ -49,7 +60,12 @@ class WorkerState:
             self.process_tick_event = 1
             self.keep_event_ids = 1
             self.create_tick = 1
-            self.process_global_wm = 1
+            if 'sync_watermark' in self.node_attrs:
+                slist = self.node_attrs['sync_watermark']
+                self.sync_watermark = 1
+                self.wm_sync_nodes = slist.split(',')
+            else:
+                self.process_global_wm = 1
         elif ntype == 'leaf' and not ctype:
             self.process_batch = 1
             self.process_events = 1
@@ -139,8 +155,6 @@ class CascadedWorker(CascadedConsumer):
                     self.process_remote_event(src_curs, dst_curs, ev)
             if ev.ev_id > max_id:
                 max_id = ev.ev_id
-        if st.local_wm_publish:
-            self.publish_local_wm(src_db)
         if max_id > self.cur_max_id:
             self.cur_max_id = max_id
 
@@ -195,22 +209,72 @@ class CascadedWorker(CascadedConsumer):
             self.create_branch_tick(dst_db, cur_tick, tick_time)
         return True
 
-    def publish_local_wm(self, src_db):
+    def publish_local_wm(self, src_db, dst_db):
         """Send local watermark to provider.
         """
-        if not self.main_worker:
-            return
+
         t = time.time()
         if t - self.local_wm_publish_time < self.local_wm_publish_period:
             return
 
         st = self._worker_state
-        self.log.debug("Publishing local watermark: %d" % st.local_watermark)
+        wm = st.local_watermark
+        if st.sync_watermark:
+            # dont send local watermark upstream
+            wm = self.batch_info['prev_tick_id']
+
+        self.log.debug("Publishing local watermark: %d" % wm)
         src_curs = src_db.cursor()
         q = "select * from pgq_node.set_subscriber_watermark(%s, %s, %s)"
-        src_curs.execute(q, [self.pgq_queue_name, st.node_name, st.local_watermark])
+        src_curs.execute(q, [self.pgq_queue_name, st.node_name, wm])
+
+        # if last part fails, dont repeat it immediately
         self.local_wm_publish_time = t
 
+        if st.sync_watermark:
+            # instead sync 'global-watermark' with specific nodes
+            dst_curs = dst_db.cursor()
+            nmap = self._get_node_map(dst_curs)
+            dst_db.commit()
+
+            wm = st.local_watermark
+            for node in st.wm_sync_nodes:
+                if node == st.node_name:
+                    continue
+                if node not in nmap:
+                    # dont ignore missing nodes - cluster may be partially set up
+                    self.log.warning('Unknown node in sync_watermark list: %s' % node)
+                    return
+                n = nmap[node]
+                if n['dead']:
+                    # ignore dead nodes
+                    continue
+                wmdb = self.get_database('wmdb', connstr = n['node_location'], autocommit = 1)
+                wmcurs = wmdb.cursor()
+                q = 'select local_watermark from pgq_node.get_node_info(%s)'
+                wmcurs.execute(q, [self.queue_name])
+                row = wmcurs.fetchone()
+                if not row:
+                    # partially set up node?
+                    self.log.warning('Node not working: %s' % node)
+                elif row['local_watermark'] < wm:
+                    # keep lowest wm
+                    wm = row['local_watermark']
+                self.close_database('wmdb')
+
+            # now we have lowest wm, store it
+            q = "select pgq_node.set_global_watermark(%s, %s)"
+            dst_curs.execute(q, [self.queue_name, wm])
+            dst_db.commit()
+
+    def _get_node_map(self, curs):
+        q = "select node_name, node_location, dead from pgq_node.get_queue_locations(%s)"
+        curs.execute(q, [self.queue_name])
+        res = {}
+        for row in curs.fetchall():
+            res[row['node_name']] = row
+        return res
+
     def process_remote_event(self, src_curs, dst_curs, ev):
         """Handle cascading events.
         """
@@ -245,7 +309,10 @@ class CascadedWorker(CascadedConsumer):
             q = "select * from pgq_node.unregister_location(%s, %s)"
             dst_curs.execute(q, [self.pgq_queue_name, node])
         elif t == "pgq.global-watermark":
-            if st.process_global_wm:
+            if st.sync_watermark:
+                tick_id = int(ev.ev_data)
+                self.log.info('Ignoring global watermark %s' % tick_id)
+            elif st.process_global_wm:
                 tick_id = int(ev.ev_data)
                 q = "select * from pgq_node.set_global_watermark(%s, %s)"
                 dst_curs.execute(q, [self.pgq_queue_name, tick_id])
@@ -288,6 +355,8 @@ class CascadedWorker(CascadedConsumer):
                 tick_id = self.batch_info['tick_id']
                 tick_time = self.batch_info['batch_end']
                 self.create_branch_tick(dst_db, tick_id, tick_time)
+            if st.local_wm_publish:
+                self.publish_local_wm(src_db, dst_db)
 
     def create_branch_tick(self, dst_db, tick_id, tick_time):
         q = "select pgq.ticker(%s, %s, %s, %s)"
@@ -308,6 +377,14 @@ class CascadedWorker(CascadedConsumer):
                 return
         if len(self.ev_buf) >= self.max_evbuf:
             self.flush_events(dst_curs)
+
+        if ev.type == 'pgq.global-watermark':
+            st = self._worker_state
+            if st.sync_watermark:
+                # replace payload with synced global watermark
+                row = ev._event_row.copy()
+                row['ev_data'] = str(st.global_watermark)
+                ev = Event(self.queue_name, row)
         self.ev_buf.append(ev)
 
     def flush_events(self, dst_curs):
index 1e69d3d93885700109529c4b8595cbd28bdbf1b9..1bd4ce55fd8d920b5b53b0ec805f53a9f9d6b2bb 100644 (file)
@@ -4,6 +4,10 @@
               0
 (1 row)
 
+ sub_consumer | sub_id | co_name 
+--------------+--------+---------
+(0 rows)
+
  upgrade_schema 
 ----------------
               0
@@ -159,10 +163,16 @@ select queue_name, consumer_name, last_tick from pgq.get_consumer_info();
  aqueue     | node2_worker      |         1
 (3 rows)
 
+select * from pgq_node.set_node_attrs('aqueue', 'test=1');
+ ret_code |        ret_note         
+----------+-------------------------
+      200 | Node attributes updated
+(1 row)
+
 select * from pgq_node.get_node_info('aqueue');
- ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name  | worker_paused | worker_uptodate | worker_last_tick 
-----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------
-      100 | Ok       | root      | node1     |                1 |               1 | node1         | dbname=node1      |                |               | node1_worker | f             | f               |                3
+ ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name  | worker_paused | worker_uptodate | worker_last_tick | node_attrs 
+----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------+------------
+      100 | Ok       | root      | node1     |                1 |               1 | node1         | dbname=node1      |                |               | node1_worker | f             | f               |                3 | test=1
 (1 row)
 
 select * from pgq_node.get_subscriber_info('aqueue');
@@ -218,28 +228,28 @@ select * from pgq_node.local_state;
 (4 rows)
 
 select * from pgq_node.node_info;
- queue_name | node_type | node_name | worker_name  | combined_queue 
-------------+-----------+-----------+--------------+----------------
- aqueue     | root      | node1     | node1_worker | 
- bqueue     | branch    | node2     | node2_worker | 
+ queue_name | node_type | node_name | worker_name  | combined_queue | node_attrs 
+------------+-----------+-----------+--------------+----------------+------------
+ aqueue     | root      | node1     | node1_worker |                | test=1
+ bqueue     | branch    | node2     | node2_worker |                | 
 (2 rows)
 
 select * from pgq_node.get_node_info('aqueue');
- ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name  | worker_paused | worker_uptodate | worker_last_tick 
-----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------
-      100 | Ok       | root      | node1     |                1 |               1 | node1         | dbname=node1      |                |               | node1_worker | f             | f               |                3
+ ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name  | worker_paused | worker_uptodate | worker_last_tick | node_attrs 
+----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------+------------
+      100 | Ok       | root      | node1     |                1 |               1 | node1         | dbname=node1      |                |               | node1_worker | f             | f               |                3 | test=1
 (1 row)
 
 select * from pgq_node.get_node_info('bqueue');
- ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name  | worker_paused | worker_uptodate | worker_last_tick 
-----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------
-      100 | Ok       | branch    | node2     |                1 |               1 | node1         | dbname=node1      |                |               | node2_worker | f             | f               |                1
+ ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name  | worker_paused | worker_uptodate | worker_last_tick | node_attrs 
+----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------+------------
+      100 | Ok       | branch    | node2     |                1 |               1 | node1         | dbname=node1      |                |               | node2_worker | f             | f               |                1 | 
 (1 row)
 
 select * from pgq_node.get_node_info('cqueue');
- ret_code |       ret_note        | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick 
-----------+-----------------------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+-------------+---------------+-----------------+------------------
-      404 | Unknown queue: cqueue |           |           |                  |                 |               |                   |                |               |             |               |                 |                 
+ ret_code |       ret_note        | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name | worker_paused | worker_uptodate | worker_last_tick | node_attrs 
+----------+-----------------------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+-------------+---------------+-----------------+------------------+------------
+      404 | Unknown queue: cqueue |           |           |                  |                 |               |                   |                |               |             |               |                 |                  | 
 (1 row)
 
 select * from pgq_node.get_worker_state('aqueue');
@@ -347,9 +357,9 @@ select * from pgq_node.get_consumer_state('bqueue', 'random_consumer2');
 (1 row)
 
 select * from pgq_node.get_node_info('bqueue');
- ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name  | worker_paused | worker_uptodate | worker_last_tick 
-----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------
-      100 | Ok       | branch    | node2     |                1 |               1 | node1         | dbname=node1      |                |               | node2_worker | f             | f               |                1
+ ret_code | ret_note | node_type | node_name | global_watermark | local_watermark | provider_node | provider_location | combined_queue | combined_type | worker_name  | worker_paused | worker_uptodate | worker_last_tick | node_attrs 
+----------+----------+-----------+-----------+------------------+-----------------+---------------+-------------------+----------------+---------------+--------------+---------------+-----------------+------------------+------------
+      100 | Ok       | branch    | node2     |                1 |               1 | node1         | dbname=node1      |                |               | node2_worker | f             | f               |                1 | 
 (1 row)
 
 set session_replication_role = 'replica';
index 5f00d693e224d29337db74a2e4ddfd7b16c1aa6a..a11dbfc33325af13ea05060a30d879a07e361cbb 100644 (file)
@@ -1,4 +1,6 @@
 
+drop function if exists pgq_node.get_node_info(text);
+
 create or replace function pgq_node.get_node_info(
     in i_queue_name text,
 
@@ -17,7 +19,8 @@ create or replace function pgq_node.get_node_info(
     out worker_name text,
     out worker_paused bool,
     out worker_uptodate bool,
-    out worker_last_tick bigint
+    out worker_last_tick bigint,
+    out node_attrs text
 ) returns record as $$
 -- ----------------------------------------------------------------------
 -- Function: pgq_node.get_node_info(1)
@@ -46,10 +49,12 @@ declare
 begin
     select 100, 'Ok', n.node_type, n.node_name,
            c.node_type, c.queue_name, w.provider_node, l.node_location,
-           n.worker_name, w.paused, w.uptodate, w.last_tick_id
+           n.worker_name, w.paused, w.uptodate, w.last_tick_id,
+           n.node_attrs
       into ret_code, ret_note, node_type, node_name,
            combined_type, combined_queue, provider_node, provider_location,
-           worker_name, worker_paused, worker_uptodate, worker_last_tick
+           worker_name, worker_paused, worker_uptodate, worker_last_tick,
+           node_attrs
       from pgq_node.node_info n
            left join pgq_node.node_info c on (c.queue_name = n.combined_queue)
            left join pgq_node.local_state w on (w.queue_name = n.queue_name and w.consumer_name = n.worker_name)
diff --git a/sql/pgq_node/functions/pgq_node.set_node_attrs.sql b/sql/pgq_node/functions/pgq_node.set_node_attrs.sql
new file mode 100644 (file)
index 0000000..fb015ed
--- /dev/null
@@ -0,0 +1,35 @@
+
+create or replace function pgq_node.set_node_attrs(
+    in i_queue_name text,
+    in i_node_attrs text,
+    out ret_code int4,
+    out ret_note  text)
+returns record as $$
+-- ----------------------------------------------------------------------
+-- Function: pgq_node.set_node_attrs(2)
+--
+--      Set node attributes.
+--
+-- Parameters:
+--      i_queue_name - cascaded queue name
+--      i_node_attrs - urlencoded node attrs
+--
+-- Returns:
+--      200 - ok
+--      404 - node not found
+-- ----------------------------------------------------------------------
+begin
+    update pgq_node.node_info
+        set node_attrs = i_node_attrs
+        where queue_name = i_queue_name;
+    if not found then
+        select 404, 'Node not found' into ret_code, ret_note;
+        return;
+    end if;
+
+    select 200, 'Node attributes updated'
+        into ret_code, ret_note;
+    return;
+end;
+$$ language plpgsql security definer;
+
index 61130cdbba0ebdc1f6e7470c32b9490d834fb53a..d9c1b6f42c2277c5949c47eee8e399a0204f9b48 100644 (file)
@@ -5,6 +5,16 @@ returns int4 as $$
 declare
     cnt int4 = 0;
 begin
+    -- node_info.node_attrs
+    perform 1 from information_schema.columns
+      where table_schema = 'pgq_node'
+        and table_name = 'node_info'
+        and column_name = 'node_attrs';
+    if not found then
+        alter table pgq_node.node_info add column node_attrs text;
+        cnt := cnt + 1;
+    end if;
+
     return cnt;
 end;
 $$ language plpgsql;
index 9f01271803e8c2b2cb19b4c07a37880ac3911725..1b2e1b92344a9b66c9b468e83cc9940a80e3042a 100644 (file)
@@ -35,6 +35,8 @@ select * from pgq.ticker('aqueue');
 select * from pgq_node.set_subscriber_watermark('aqueue', 'node2', 3);
 select queue_name, consumer_name, last_tick from pgq.get_consumer_info();
 
+select * from pgq_node.set_node_attrs('aqueue', 'test=1');
+
 select * from pgq_node.get_node_info('aqueue');
 select * from pgq_node.get_subscriber_info('aqueue');
 
index 821a5656aa9e3a7d9e92747da6a2adb98f22fcae..6b3cb1ff6f6717ced16d5d4191f7a8030fea5fd1 100644 (file)
@@ -51,6 +51,7 @@ select pgq_node.upgrade_schema();
 
 \i functions/pgq_node.demote_root.sql
 \i functions/pgq_node.promote_branch.sql
+\i functions/pgq_node.set_node_attrs.sql
 
 -- Group: Provider side operations - worker
 \i   functions/pgq_node.register_subscriber.sql
index 73abc774ffa6efe978d374d43bb34690247ff8f1..bd8d4e1d9f3a146a12cf7183484442a70b589041 100644 (file)
@@ -50,6 +50,7 @@ create table pgq_node.node_location (
 --      provider_node       - provider node name
 --      worker_name         - consumer name that maintains this node
 --      combined_queue      - on 'leaf' the target combined set name
+--      node_attrs          - urlencoded fields for worker
 --
 -- Node types:
 --      root            - data + batches is generated here
@@ -62,6 +63,7 @@ create table pgq_node.node_info (
     node_name       text not null,
     worker_name     text,
     combined_queue  text,
+    node_attrs      text,
 
     foreign key (queue_name, node_name) references pgq_node.node_location,
     check (node_type in ('root', 'branch', 'leaf')),
index a362b91b2921d9d85819e239cc36b35acee34ee1..46741f7e337423249af457968b499b73e78ff93b 100755 (executable)
@@ -71,15 +71,15 @@ msg "Install londiste3 and initialize nodes"
 run londiste3 $v conf/londiste_db1.ini create-root node1 'dbname=db1'
 run londiste3 $v conf/londiste_db2.ini create-branch node2 'dbname=db2' --provider='dbname=db1'
 run londiste3 $v conf/londiste_db3.ini create-branch node3 'dbname=db3' --provider='dbname=db1'
-run londiste3 $v conf/londiste_db4.ini create-branch node4 'dbname=db4' --provider='dbname=db2'
-run londiste3 $v conf/londiste_db5.ini create-branch node5 'dbname=db5' --provider='dbname=db3'
+run londiste3 $v conf/londiste_db4.ini create-branch node4 'dbname=db4' --provider='dbname=db2' --sync-watermark=node4,node5
+run londiste3 $v conf/londiste_db5.ini create-branch node5 'dbname=db5' --provider='dbname=db3' --sync-watermark=node4,node5
 
 msg "Run ticker"
 run pgqd $v -d conf/pgqd.ini
 run sleep 5
 
 msg "See topology"
-run londiste3 $v conf/londiste_db4.ini status
+run londiste3 $v conf/londiste_db1.ini status
 
 msg "Run londiste3 daemon for each node"
 for db in $db_list; do