Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .ducktape/metadata/session_id
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2025-08-21--020
188 changes: 180 additions & 8 deletions src/confluent_kafka/schema_registry/_async/schema_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,6 @@
query['format'] = fmt
if reference_format is not None:
query['reference_format'] = reference_format

response = await self._rest_client.get('schemas/ids/{}'.format(schema_id), query)

registered_schema = RegisteredSchema.from_dict(response)
Expand Down Expand Up @@ -1060,7 +1059,8 @@
return registered_schema

async def get_referenced_by(
self, subject_name: str, version: Union[int, str] = "latest", offset: int = 0, limit: int = -1
self, subject_name: str, version: Union[int, str] = "latest",
offset: int = 0, limit: int = -1
) -> List[int]:
"""
Get a list of IDs of schemas that reference the schema with the given `subject_name` and `version`.
Expand All @@ -1086,7 +1086,8 @@
_urlencode(subject_name), version), query)

async def get_versions(
self, subject_name: str, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1
self, subject_name: str, deleted: bool = False, deleted_only: bool = False,
offset: int = 0, limit: int = -1
) -> List[int]:
"""
Get a list of all versions registered with this subject.
Expand Down Expand Up @@ -1174,7 +1175,7 @@
)

return await self._rest_client.put(
'config/{}'.format(_urlencode(subject_name)), body={'compatibility': level.upper()}

Check failure on line 1178 in src/confluent_kafka/schema_registry/_async/schema_registry_client.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

src/confluent_kafka/schema_registry/_async/schema_registry_client.py#L1178

Define a constant instead of duplicating this literal 'config/{}' 5 times.
)

async def get_compatibility(self, subject_name: Optional[str] = None) -> str:
Expand Down Expand Up @@ -1204,15 +1205,18 @@
return result['compatibilityLevel']

async def test_compatibility(
self, subject_name: str, schema: 'Schema',
version: Union[int, str] = "latest"
self, subject_name: str, schema: 'Schema', version: Union[int, str] = "latest",
normalize: bool = False, verbose: bool = False
) -> bool:
"""Test the compatibility of a candidate schema for a given subject and version
"""
Test the compatibility of a candidate schema for a given subject and version

Args:
subject_name (str): Subject name the schema is registered under
schema (Schema): Schema instance.
version (int or str, optional): Version number, or the string "latest". Defaults to "latest".
normalize (bool): Whether to normalize the input schema.
verbose (bool): Whether to return detailed error messages.

Returns:
bool: True if the schema is compatible with the specified version
Expand All @@ -1225,11 +1229,39 @@
""" # noqa: E501

request = schema.to_dict()

response = await self._rest_client.post(
'compatibility/subjects/{}/versions/{}'.format(_urlencode(subject_name), version), body=request
'compatibility/subjects/{}/versions/{}?normalize={}&verbose={}'.format(
_urlencode(subject_name), version, normalize, verbose),
body=request
)
return response['is_compatible']

async def test_compatibility_all_versions(
self, subject_name: str, schema: 'Schema',
normalize: bool = False, verbose: bool = False
) -> bool:
"""
Test the input schema against all schema versions under the subject (depending on the compatibility level set).

Args:
subject_name (str): Subject of the schema versions against which compatibility is to be tested.
schema (Schema): Schema instance.
normalize (bool): Whether to normalize the input schema.
verbose (bool): Whether to return detailed error messages.

Returns:
bool: True if the schema is compatible with all of the subject's schemas versions.
See Also:
`POST Test Compatibility Against All API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--compatibility-subjects-(string-%20subject)-versions>`_
""" # noqa: E501

request = schema.to_dict()
response = await self._rest_client.post(
'compatibility/subjects/{}/versions?normalize={}&verbose={}'.format(
_urlencode(subject_name), normalize, verbose
),
body=request,
)
return response['is_compatible']

async def set_config(
Expand Down Expand Up @@ -1268,6 +1300,30 @@
'config/{}'.format(_urlencode(subject_name)), body=config.to_dict()
)

async def delete_config(self, subject_name: Optional[str] = None) -> 'ServerConfig':
"""
Delete the specified subject-level compatibility level config and revert to the global default.

Args:
subject_name (str, optional): Subject name. Deletes global config
if left unset.

Returns:
ServerConfig: The old deleted config

Raises:
SchemaRegistryError: if the request was unsuccessful.

See Also:
`DELETE Subject Config API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--config-(string- subject)>`_
""" # noqa: E501
if subject_name is not None:
url = 'config/{}'.format(_urlencode(subject_name))
else:
url = 'config'
result = await self._rest_client.delete(url)
return ServerConfig.from_dict(result)

async def get_config(self, subject_name: Optional[str] = None) -> 'ServerConfig':
"""
Get the current config.
Expand All @@ -1294,6 +1350,122 @@
result = await self._rest_client.get(url)
return ServerConfig.from_dict(result)

async def get_mode(self, subject_name: str) -> str:
"""
Get the mode for a subject.

Args:
subject_name (str): Subject name.

Returns:
str: Mode for the subject. Returns one of IMPORT, READONLY, READWRITE (default).

Raises:
SchemaRegistryError: if the request was unsuccessful.

See Also:
`GET Subject Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--mode-(string-%20subject)>`_
""" # noqa: E501
result = await self._rest_client.get('mode/{}'.format(_urlencode(subject_name)))
return result['mode']

async def update_mode(self, subject_name: str, mode: str, force: bool = False) -> str:
"""
Update the mode for a subject.

Args:
subject_name (str): Subject name.
mode (str): Mode to update.
force (bool): Whether to force a mode change even if the Schema Registry has existing schemas.

Returns:
str: New mode for the subject. Must be one of IMPORT, READONLY, READWRITE (default).

Raises:
SchemaRegistryError: if the request was unsuccessful.

See Also:
`PUT Subject Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#put--mode-(string-%20subject)>`_
""" # noqa: E501
result = await self._rest_client.put(
'mode/{}?force={}'.format(_urlencode(subject_name), force),
body={'mode': mode},
)
return result['mode']

async def delete_mode(self, subject_name: str) -> str:
"""
Delete the mode for a subject and revert to the global default

Args:
subject_name (str): Subject name.

Returns:
str: New mode for the subject. Must be one of IMPORT, READONLY, READWRITE (default).

Raises:
SchemaRegistryError: if the request was unsuccessful.

See Also:
`DELETE Subject Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--mode-(string-%20subject)>`_
""" # noqa: E501
result = await self._rest_client.delete('mode/{}'.format(_urlencode(subject_name)))
return result['mode']

async def get_global_mode(self) -> str:
"""
Get the current mode for Schema Registry at a global level.

Returns:
str: Schema Registry mode. Must be one of IMPORT, READONLY, READWRITE (default).

Raises:
SchemaRegistryError: if the request was unsuccessful.

See Also:
`GET Global Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--mode>`_
""" # noqa: E501
result = await self._rest_client.get('mode')
return result['mode']

async def update_global_mode(self, mode: str, force: bool = False) -> str:
"""
Update the mode for the Schema Registry at a global level.

Args:
mode (str): Mode to update.
force (bool): Whether to force a mode change even if the Schema Registry has existing schemas.

Returns:
str: New mode for the Schema Registry. Must be one of IMPORT, READONLY, READWRITE (default).

Raises:
SchemaRegistryError: if the request was unsuccessful.

See Also:
`PUT Global Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#put--mode>`_
""" # noqa: E501
result = await self._rest_client.put('mode?force={}'.format(force), body={'mode': mode})
return result['mode']

async def get_contexts(self, offset: int = 0, limit: int = -1) -> List[str]:
"""
Retrieves a list of contexts.

Args:
offset (int): Pagination offset for results.
limit (int): Pagination size for results. Ignored if negative.

Returns:
List[str]: List of contexts.

Raises:
SchemaRegistryError: if the request was unsuccessful.
""" # noqa: E501

result = await self._rest_client.get('contexts', query={'offset': offset, 'limit': limit})
return result

def clear_latest_caches(self):
self._latest_version_cache.clear()
self._latest_with_metadata_cache.clear()
Expand Down
Loading