Closed
Description
PGSync version:
Version: 3.1.0
Postgres version:
PostgreSQL 16.3
Elasticsearch/OpenSearch version:
{
"name": "opensearch-node1",
"cluster_name": "opensearch-cluster",
"cluster_uuid": "zXLzc_O5SQGhuP0lQrhHNg",
"version": {
"distribution": "opensearch",
"number": "2.14.0",
"build_type": "tar",
"build_hash": "aaa555453f4713d652b52436874e11ba258d8f03",
"build_date": "2024-05-09T18:51:35.817208743Z",
"build_snapshot": false,
"lucene_version": "9.10.0",
"minimum_wire_compatibility_version": "7.10.0",
"minimum_index_compatibility_version": "7.0.0"
},
"tagline": "The OpenSearch Project: https://opensearch.org/"
}
Redis version:
Redis version=7.2.5
Python version:
Python 3.10.2
Problem Description:
PGSync crashes when a new row is added to the phone_numbers table.
Create the following tables in Postgres:
CREATE TYPE phone_number_type AS ENUM ('home', 'work', 'cell', 'fax');
CREATE TABLE customers
(
id uuid PRIMARY KEY,
date_of_birth date,
email varchar(255),
first_name varchar(255),
last_name varchar(255)
);
CREATE TABLE phone_numbers
(
id SERIAL PRIMARY KEY,
phone_type phone_number_type,
phone_number VARCHAR(20),
extension VARCHAR(10)
);
CREATE TABLE customer_phone_numbers
(
customer_id uuid REFERENCES customers (id) ON DELETE CASCADE,
phone_number_id INTEGER REFERENCES phone_numbers (id) ON DELETE CASCADE,
PRIMARY KEY (customer_id, phone_number_id)
);
When I add a new row to the phone_numbers table, PGSync crashes with this exception:
Exception in poll_redis() for thread Thread-17: (psycopg2.errors.UndefinedFunction) operator does not exist: uuid = integer
LINE 6: WHERE customers_1.id = 1
The error goes away if I run the bootstrap command again before starting PGSync, but then bootstrap has to be re-run after every new row added to phone_numbers.
This is my schema.json file:
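The failing statement binds the integer 1 (which looks like the SERIAL id of the new phone_numbers row, though that is an inference from the log) to a filter on customers.id, which is a uuid column. The sketch below only illustrates that mismatch outside the database. `can_filter_uuid_column` is a hypothetical helper, not part of PGSync, and the sample uuid string is made up.

```python
import uuid


def can_filter_uuid_column(value) -> bool:
    """Return True if `value` could be compared with a uuid column.

    Hypothetical diagnostic helper: it accepts uuid.UUID objects and
    strings that parse as a uuid, and rejects everything else.
    """
    if isinstance(value, uuid.UUID):
        return True
    if isinstance(value, str):
        try:
            uuid.UUID(value)
        except ValueError:
            return False
        return True
    # Integers (for example a SERIAL key) have no "uuid = integer"
    # operator in Postgres, which matches the error in this report.
    return False


# The value bound to id_1 in the failing query.
print(can_filter_uuid_column(1))
# A made-up, well-formed uuid string for comparison.
print(can_filter_uuid_column("123e4567-e89b-12d3-a456-426614174000"))
```

A check like this could be used while debugging to spot which payload value ends up in the filter on the root table.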
[
{
"database": "test",
"index": "customer",
"nodes": {
"table": "customers",
"columns": [
"id",
"first_name",
"last_name",
"date_of_birth",
"email"
],
"children": [
{
"table": "phone_numbers",
"label": "phone_number",
"columns": [
"id",
"phone_number"
],
"relationship": {
"variant": "scalar",
"type": "one_to_many",
"through_tables": [
"customer_phone_numbers"
]
}
}
],
"transform": {
"concat": [
{
"columns": [
"first_name",
"last_name"
],
"destination": "full_name",
"delimiter": " "
}
],
"mapping": {
"date_of_birth": {
"type": "date",
"format": "strict_date_optional_time || epoch_second"
},
"full_name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
},
"search_as_you_type": {
"type": "search_as_you_type",
"max_shingle_size": 3
}
}
},
"email": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
},
"search_as_you_type": {
"type": "search_as_you_type",
"max_shingle_size": 3
}
}
}
}
}
}
}
]
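To make the relationship in this schema easier to see, here is a minimal sketch that walks the node tree and lists each table with its parent and through tables. `SCHEMA_JSON` is a trimmed copy of the schema above (columns and transform omitted), and `walk` is a hypothetical helper, not a PGSync function.

```python
import json

# Trimmed copy of the schema.json from this report.
SCHEMA_JSON = """
[
  {
    "database": "test",
    "index": "customer",
    "nodes": {
      "table": "customers",
      "children": [
        {
          "table": "phone_numbers",
          "label": "phone_number",
          "relationship": {
            "variant": "scalar",
            "type": "one_to_many",
            "through_tables": ["customer_phone_numbers"]
          }
        }
      ]
    }
  }
]
"""


def walk(node, parent=None):
    """Yield (parent_table, table, through_tables) for every node in the tree."""
    through = node.get("relationship", {}).get("through_tables", [])
    yield parent, node["table"], through
    for child in node.get("children", []):
        yield from walk(child, node["table"])


for document in json.loads(SCHEMA_JSON):
    for parent, table, through in walk(document["nodes"]):
        print(f"table={table} parent={parent} through={through}")
```

This shows that phone_numbers reaches customers only through customer_phone_numbers, so a change to phone_numbers has to be mapped to a customers uuid via that table before the root document can be filtered.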
Error Message (if any):
- public.customers
- public.phone_number
[--------------------------------------------------] 0/0 0%
[--------------------------------------------------] 0/0 0%
0:00:00.358809 (0.36 sec)
Sync test:customer Xlog: [0] => Db: [0] => Redis: [0] => OpenSearch: [0]...
2024-06-11 22:03:58.520:ERROR:pgsync.search_client: Exception (psycopg2.errors.UndefinedFunction) operator does not exist: uuid = integer
LINE 6: WHERE customers_1.id = 1
^
HINT: No operator matches the given name and argument types. You might need to add explicit type casts.
[SQL: SELECT count(*) AS count_1
FROM public.customers AS customers_1 LEFT OUTER JOIN LATERAL (SELECT JSON_AGG(CAST(anon_2._keys AS JSONB) || CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_1)s, JSON_BUILD_ARRAY(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_2)s, JSON_BUILD_ARRAY(customer_phone_numbers_1.customer_id)), JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_3)s, JSON_BUILD_ARRAY(customer_phone_numbers_1.phone_number_id)))) AS JSONB)) AS _keys, JSON_AGG(anon_2.anon) AS phone_number, customer_phone_numbers_1.customer_id AS customer_id
FROM public.customer_phone_numbers AS customer_phone_numbers_1 LEFT OUTER JOIN LATERAL (SELECT CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_4)s, JSON_BUILD_ARRAY(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_5)s, JSON_BUILD_ARRAY(phone_numbers_1.id)))) AS JSONB) AS _keys, phone_numbers_1.id AS anon, phone_numbers_1.id AS id
FROM public.phone_numbers AS phone_numbers_1
WHERE phone_numbers_1.id = customer_phone_numbers_1.phone_number_id) AS anon_2 ON anon_2.id = customer_phone_numbers_1.phone_number_id GROUP BY customer_phone_numbers_1.customer_id) AS anon_1 ON anon_1.customer_id = customers_1.id
WHERE customers_1.id = %(id_1)s]
[parameters: {'JSON_BUILD_OBJECT_1': 'customer_phone_numbers', 'JSON_BUILD_OBJECT_2': 'customer_id', 'JSON_BUILD_OBJECT_3': 'phone_number_id', 'JSON_BUILD_OBJECT_4': 'phone_numbers', 'JSON_BUILD_OBJECT_5': 'id', 'id_1': 1}]
(Background on this error at: https://sqlalche.me/e/20/f405)
Traceback (most recent call last):
File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1969, in _exec_single_context
self.dialect.do_execute(
File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 922, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.UndefinedFunction: operator does not exist: uuid = integer
LINE 6: WHERE customers_1.id = 1
^
HINT: No operator matches the given name and argument types. You might need to add explicit type casts.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/homebrew/lib/python3.9/site-packages/pgsync/search_client.py", line 138, in bulk
self._bulk(
File "/opt/homebrew/lib/python3.9/site-packages/pgsync/search_client.py", line 194, in _bulk
for ok, _ in self.parallel_bulk(
File "/opt/homebrew/lib/python3.9/site-packages/opensearchpy/helpers/actions.py", line 487, in parallel_bulk
for result in pool.imap(
File "/opt/homebrew/Cellar/python@3.9/3.9.15/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 870, in next
raise value
File "/opt/homebrew/Cellar/python@3.9/3.9.15/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "/opt/homebrew/Cellar/python@3.9/3.9.15/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 144, in _helper_reraises_exception
raise ex
File "/opt/homebrew/Cellar/python@3.9/3.9.15/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 388, in _guarded_task_generation
for i, x in enumerate(iterable):
File "/opt/homebrew/lib/python3.9/site-packages/opensearchpy/helpers/actions.py", line 168, in _chunk_actions
for action, data in actions:
File "/opt/homebrew/lib/python3.9/site-packages/pgsync/sync.py", line 924, in _payloads
yield from self.sync(
File "/opt/homebrew/lib/python3.9/site-packages/pgsync/sync.py", line 966, in sync
count: int = self.fetchcount(node._subquery)
File "/opt/homebrew/lib/python3.9/site-packages/pgsync/base.py", line 895, in fetchcount
return conn.execute(
File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1416, in execute
return meth(
File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 517, in _execute_on_connection
return connection._execute_clauseelement(
File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1639, in _execute_clauseelement
ret = self._execute_context(
File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1848, in _execute_context
return self._exec_single_context(
File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1988, in _exec_single_context
self._handle_dbapi_exception(
File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2344, in _handle_dbapi_exception
raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1969, in _exec_single_context
self.dialect.do_execute(
File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 922, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedFunction) operator does not exist: uuid = integer
LINE 6: WHERE customers_1.id = 1
^
HINT: No operator matches the given name and argument types. You might need to add explicit type casts.
[SQL: SELECT count(*) AS count_1
FROM public.customers AS customers_1 LEFT OUTER JOIN LATERAL (SELECT JSON_AGG(CAST(anon_2._keys AS JSONB) || CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_1)s, JSON_BUILD_ARRAY(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_2)s, JSON_BUILD_ARRAY(customer_phone_numbers_1.customer_id)), JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_3)s, JSON_BUILD_ARRAY(customer_phone_numbers_1.phone_number_id)))) AS JSONB)) AS _keys, JSON_AGG(anon_2.anon) AS phone_number, customer_phone_numbers_1.customer_id AS customer_id
FROM public.customer_phone_numbers AS customer_phone_numbers_1 LEFT OUTER JOIN LATERAL (SELECT CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_4)s, JSON_BUILD_ARRAY(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_5)s, JSON_BUILD_ARRAY(phone_numbers_1.id)))) AS JSONB) AS _keys, phone_numbers_1.id AS anon, phone_numbers_1.id AS id
FROM public.phone_numbers AS phone_numbers_1
WHERE phone_numbers_1.id = customer_phone_numbers_1.phone_number_id) AS anon_2 ON anon_2.id = customer_phone_numbers_1.phone_number_id GROUP BY customer_phone_numbers_1.customer_id) AS anon_1 ON anon_1.customer_id = customers_1.id
WHERE customers_1.id = %(id_1)s]
[parameters: {'JSON_BUILD_OBJECT_1': 'customer_phone_numbers', 'JSON_BUILD_OBJECT_2': 'customer_id', 'JSON_BUILD_OBJECT_3': 'phone_number_id', 'JSON_BUILD_OBJECT_4': 'phone_numbers', 'JSON_BUILD_OBJECT_5': 'id', 'id_1': 1}]
(Background on this error at: https://sqlalche.me/e/20/f405)
Exception in poll_redis() for thread Thread-17: (psycopg2.errors.UndefinedFunction) operator does not exist: uuid = integer
LINE 6: WHERE customers_1.id = 1
^
HINT: No operator matches the given name and argument types. You might need to add explicit type casts.
Exiting...
/opt/homebrew/Cellar/python@3.9/3.9.15/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 2 leaked semaphore objects to clean up at shutdown
warnings.warn('resource_tracker: There appear to be %d '