Skip to content

Commit 198819f

Browse files
authored
Add missing endpoints (/compatibility, /mode) to SchemaRegistryClient (#2024)
* updaet * update * tests * fix integration tests * regenerate sync code + fix it * fix some unit tests broken from unasync * missing params and address feedback * lint * revert breaking chamges * update * update * update * mode * mode * updaste * linter * fix typos * update * add more * update * fix merge conflict errors * update
1 parent 58e6bee commit 198819f

File tree

6 files changed

+638
-28
lines changed

6 files changed

+638
-28
lines changed

.ducktape/metadata/session_id

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
2025-08-21--020

src/confluent_kafka/schema_registry/_async/schema_registry_client.py

Lines changed: 180 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -728,7 +728,6 @@ async def get_schema(
728728
query['format'] = fmt
729729
if reference_format is not None:
730730
query['reference_format'] = reference_format
731-
732731
response = await self._rest_client.get('schemas/ids/{}'.format(schema_id), query)
733732

734733
registered_schema = RegisteredSchema.from_dict(response)
@@ -1089,7 +1088,8 @@ async def get_version(
10891088
return registered_schema
10901089

10911090
async def get_referenced_by(
1092-
self, subject_name: str, version: Union[int, str] = "latest", offset: int = 0, limit: int = -1
1091+
self, subject_name: str, version: Union[int, str] = "latest",
1092+
offset: int = 0, limit: int = -1
10931093
) -> List[int]:
10941094
"""
10951095
Get a list of IDs of schemas that reference the schema with the given `subject_name` and `version`.
@@ -1115,7 +1115,8 @@ async def get_referenced_by(
11151115
_urlencode(subject_name), version), query)
11161116

11171117
async def get_versions(
1118-
self, subject_name: str, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1
1118+
self, subject_name: str, deleted: bool = False, deleted_only: bool = False,
1119+
offset: int = 0, limit: int = -1
11191120
) -> List[int]:
11201121
"""
11211122
Get a list of all versions registered with this subject.
@@ -1233,15 +1234,18 @@ async def get_compatibility(self, subject_name: Optional[str] = None) -> str:
12331234
return result['compatibilityLevel']
12341235

12351236
async def test_compatibility(
1236-
self, subject_name: str, schema: 'Schema',
1237-
version: Union[int, str] = "latest"
1237+
self, subject_name: str, schema: 'Schema', version: Union[int, str] = "latest",
1238+
normalize: bool = False, verbose: bool = False
12381239
) -> bool:
1239-
"""Test the compatibility of a candidate schema for a given subject and version
1240+
"""
1241+
Test the compatibility of a candidate schema for a given subject and version
12401242
12411243
Args:
12421244
subject_name (str): Subject name the schema is registered under
12431245
schema (Schema): Schema instance.
12441246
version (int or str, optional): Version number, or the string "latest". Defaults to "latest".
1247+
normalize (bool): Whether to normalize the input schema.
1248+
verbose (bool): Whether to return detailed error messages.
12451249
12461250
Returns:
12471251
bool: True if the schema is compatible with the specified version
@@ -1254,11 +1258,39 @@ async def test_compatibility(
12541258
""" # noqa: E501
12551259

12561260
request = schema.to_dict()
1257-
12581261
response = await self._rest_client.post(
1259-
'compatibility/subjects/{}/versions/{}'.format(_urlencode(subject_name), version), body=request
1262+
'compatibility/subjects/{}/versions/{}?normalize={}&verbose={}'.format(
1263+
_urlencode(subject_name), version, normalize, verbose),
1264+
body=request
12601265
)
1266+
return response['is_compatible']
12611267

1268+
async def test_compatibility_all_versions(
1269+
self, subject_name: str, schema: 'Schema',
1270+
normalize: bool = False, verbose: bool = False
1271+
) -> bool:
1272+
"""
1273+
Test the input schema against all schema versions under the subject (depending on the compatibility level set).
1274+
1275+
Args:
1276+
subject_name (str): Subject of the schema versions against which compatibility is to be tested.
1277+
schema (Schema): Schema instance.
1278+
normalize (bool): Whether to normalize the input schema.
1279+
verbose (bool): Whether to return detailed error messages.
1280+
1281+
Returns:
1282+
bool: True if the schema is compatible with all of the subject's schemas versions.
1283+
See Also:
1284+
`POST Test Compatibility Against All API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--compatibility-subjects-(string-%20subject)-versions>`_
1285+
""" # noqa: E501
1286+
1287+
request = schema.to_dict()
1288+
response = await self._rest_client.post(
1289+
'compatibility/subjects/{}/versions?normalize={}&verbose={}'.format(
1290+
_urlencode(subject_name), normalize, verbose
1291+
),
1292+
body=request,
1293+
)
12621294
return response['is_compatible']
12631295

12641296
async def set_config(
@@ -1297,6 +1329,30 @@ async def set_config(
12971329
'config/{}'.format(_urlencode(subject_name)), body=config.to_dict()
12981330
)
12991331

1332+
async def delete_config(self, subject_name: Optional[str] = None) -> 'ServerConfig':
1333+
"""
1334+
Delete the specified subject-level compatibility level config and revert to the global default.
1335+
1336+
Args:
1337+
subject_name (str, optional): Subject name. Deletes global config
1338+
if left unset.
1339+
1340+
Returns:
1341+
ServerConfig: The old deleted config
1342+
1343+
Raises:
1344+
SchemaRegistryError: if the request was unsuccessful.
1345+
1346+
See Also:
1347+
`DELETE Subject Config API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--config-(string- subject)>`_
1348+
""" # noqa: E501
1349+
if subject_name is not None:
1350+
url = 'config/{}'.format(_urlencode(subject_name))
1351+
else:
1352+
url = 'config'
1353+
result = await self._rest_client.delete(url)
1354+
return ServerConfig.from_dict(result)
1355+
13001356
async def get_config(self, subject_name: Optional[str] = None) -> 'ServerConfig':
13011357
"""
13021358
Get the current config.
@@ -1323,6 +1379,122 @@ async def get_config(self, subject_name: Optional[str] = None) -> 'ServerConfig'
13231379
result = await self._rest_client.get(url)
13241380
return ServerConfig.from_dict(result)
13251381

1382+
async def get_mode(self, subject_name: str) -> str:
1383+
"""
1384+
Get the mode for a subject.
1385+
1386+
Args:
1387+
subject_name (str): Subject name.
1388+
1389+
Returns:
1390+
str: Mode for the subject. Returns one of IMPORT, READONLY, READWRITE (default).
1391+
1392+
Raises:
1393+
SchemaRegistryError: if the request was unsuccessful.
1394+
1395+
See Also:
1396+
`GET Subject Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--mode-(string-%20subject)>`_
1397+
""" # noqa: E501
1398+
result = await self._rest_client.get('mode/{}'.format(_urlencode(subject_name)))
1399+
return result['mode']
1400+
1401+
async def update_mode(self, subject_name: str, mode: str, force: bool = False) -> str:
1402+
"""
1403+
Update the mode for a subject.
1404+
1405+
Args:
1406+
subject_name (str): Subject name.
1407+
mode (str): Mode to update.
1408+
force (bool): Whether to force a mode change even if the Schema Registry has existing schemas.
1409+
1410+
Returns:
1411+
str: New mode for the subject. Must be one of IMPORT, READONLY, READWRITE (default).
1412+
1413+
Raises:
1414+
SchemaRegistryError: if the request was unsuccessful.
1415+
1416+
See Also:
1417+
`PUT Subject Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#put--mode-(string-%20subject)>`_
1418+
""" # noqa: E501
1419+
result = await self._rest_client.put(
1420+
'mode/{}?force={}'.format(_urlencode(subject_name), force),
1421+
body={'mode': mode},
1422+
)
1423+
return result['mode']
1424+
1425+
async def delete_mode(self, subject_name: str) -> str:
1426+
"""
1427+
Delete the mode for a subject and revert to the global default
1428+
1429+
Args:
1430+
subject_name (str): Subject name.
1431+
1432+
Returns:
1433+
str: New mode for the subject. Must be one of IMPORT, READONLY, READWRITE (default).
1434+
1435+
Raises:
1436+
SchemaRegistryError: if the request was unsuccessful.
1437+
1438+
See Also:
1439+
`DELETE Subject Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--mode-(string-%20subject)>`_
1440+
""" # noqa: E501
1441+
result = await self._rest_client.delete('mode/{}'.format(_urlencode(subject_name)))
1442+
return result['mode']
1443+
1444+
async def get_global_mode(self) -> str:
1445+
"""
1446+
Get the current mode for Schema Registry at a global level.
1447+
1448+
Returns:
1449+
str: Schema Registry mode. Must be one of IMPORT, READONLY, READWRITE (default).
1450+
1451+
Raises:
1452+
SchemaRegistryError: if the request was unsuccessful.
1453+
1454+
See Also:
1455+
`GET Global Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--mode>`_
1456+
""" # noqa: E501
1457+
result = await self._rest_client.get('mode')
1458+
return result['mode']
1459+
1460+
async def update_global_mode(self, mode: str, force: bool = False) -> str:
1461+
"""
1462+
Update the mode for the Schema Registry at a global level.
1463+
1464+
Args:
1465+
mode (str): Mode to update.
1466+
force (bool): Whether to force a mode change even if the Schema Registry has existing schemas.
1467+
1468+
Returns:
1469+
str: New mode for the Schema Registry. Must be one of IMPORT, READONLY, READWRITE (default).
1470+
1471+
Raises:
1472+
SchemaRegistryError: if the request was unsuccessful.
1473+
1474+
See Also:
1475+
`PUT Global Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#put--mode>`_
1476+
""" # noqa: E501
1477+
result = await self._rest_client.put('mode?force={}'.format(force), body={'mode': mode})
1478+
return result['mode']
1479+
1480+
async def get_contexts(self, offset: int = 0, limit: int = -1) -> List[str]:
1481+
"""
1482+
Retrieves a list of contexts.
1483+
1484+
Args:
1485+
offset (int): Pagination offset for results.
1486+
limit (int): Pagination size for results. Ignored if negative.
1487+
1488+
Returns:
1489+
List[str]: List of contexts.
1490+
1491+
Raises:
1492+
SchemaRegistryError: if the request was unsuccessful.
1493+
""" # noqa: E501
1494+
1495+
result = await self._rest_client.get('contexts', query={'offset': offset, 'limit': limit})
1496+
return result
1497+
13261498
def clear_latest_caches(self):
13271499
self._latest_version_cache.clear()
13281500
self._latest_with_metadata_cache.clear()

0 commit comments

Comments
 (0)