Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 54 additions & 14 deletions modules/python/clients/kubernetes_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,10 +797,10 @@ def apply_manifest_from_url(self, manifest_url, namespace: Optional[str] = None)
# Parse YAML content (can contain multiple documents)
manifests = list(yaml.safe_load_all(response.text))

for manifest in manifests:
if not manifest: # Skip empty documents
continue
# Validate and expand manifests (handles List kind and non-dict manifests)
expanded_manifests = self._expand_and_validate_manifests(manifests)

for manifest in expanded_manifests:
self._apply_single_manifest(manifest, namespace=namespace)

logger.info("Successfully applied manifest from %s", manifest_url)
Expand All @@ -825,13 +825,13 @@ def delete_manifest_from_url(self, manifest_url, ignore_not_found: bool = True,
# Parse YAML content (can contain multiple documents)
manifests = list(yaml.safe_load_all(response.text))

# Delete manifests in reverse order (to handle dependencies)
manifests.reverse()
# Validate and expand manifests (handles List kind and non-dict manifests)
expanded_manifests = self._expand_and_validate_manifests(manifests)

for manifest in manifests:
if not manifest: # Skip empty documents
continue
# Delete manifests in reverse order (to handle dependencies)
expanded_manifests.reverse()

for manifest in expanded_manifests:
self._delete_single_manifest(manifest, ignore_not_found=ignore_not_found, namespace=namespace)

logger.info("Successfully deleted manifest from %s", manifest_url)
Expand Down Expand Up @@ -906,14 +906,14 @@ def apply_manifest_from_file(self, manifest_path: str = None, manifest_dict: dic
# Load manifests from various sources
manifests_to_apply, applied_sources = self._load_manifests_from_sources(manifest_path, manifest_dict)

# Validate and expand manifests (handles List kind and non-dict manifests)
manifests_to_apply = self._expand_and_validate_manifests(manifests_to_apply)

# Apply all manifests
namespace_info = f" in namespace '{namespace}'" if namespace else ""
logger.info(f"Applying {len(manifests_to_apply)} manifest(s) from: {', '.join(applied_sources)}{namespace_info}")

for i, manifest in enumerate(manifests_to_apply):
if not manifest: # Skip empty documents
continue

logger.info(f"Applying manifest {i+1}/{len(manifests_to_apply)}: {manifest.get('kind', 'Unknown')}/{manifest.get('metadata', {}).get('name', 'Unknown')}")
self._apply_single_manifest(manifest=manifest, namespace=namespace)

Expand All @@ -938,15 +938,15 @@ def delete_manifest_from_file(self, manifest_path: str = None, manifest_dict: di
# Load manifests from various sources
manifests_to_delete, deleted_sources = self._load_manifests_from_sources(manifest_path, manifest_dict)

# Validate and expand manifests (handles List kind and non-dict manifests)
manifests_to_delete = self._expand_and_validate_manifests(manifests_to_delete)

# Delete all manifests in reverse order (to handle dependencies)
manifests_to_delete.reverse()
namespace_info = f" in namespace '{namespace}'" if namespace else ""
logger.info(f"Deleting {len(manifests_to_delete)} manifest(s) from: {', '.join(deleted_sources)}{namespace_info}")

for i, manifest in enumerate(manifests_to_delete):
if not manifest: # Skip empty documents
continue

logger.info(f"Deleting manifest {i+1}/{len(manifests_to_delete)}: {manifest.get('kind', 'Unknown')}/{manifest.get('metadata', {}).get('name', 'Unknown')}")
self._delete_single_manifest(manifest=manifest, ignore_not_found=ignore_not_found, namespace=namespace)

Expand Down Expand Up @@ -1097,6 +1097,46 @@ def _is_deployment_condition_met(self, deployment, condition_type: str) -> bool:

return False

def _expand_and_validate_manifests(self, manifests):
"""
Validate and expand manifests, handling List kind and non-dict manifests.

:param manifests: List of manifests (can be dicts, lists, or scalars)
:return: List of valid manifest dictionaries
"""
expanded = []
for manifest in manifests:
# Skip None or empty manifests
if not manifest:
continue

# Validate that manifest is a dictionary
if not isinstance(manifest, dict):
logger.warning(
"Skipping non-dictionary manifest (type: %s). "
"YAML documents must be mappings (dictionaries), not lists or scalars.",
type(manifest).__name__
)
continue

# Handle kind: List manifests by expanding their items
kind = manifest.get("kind")
if kind == "List":
items = manifest.get("items", [])
if not isinstance(items, list):
logger.warning(
"Skipping List manifest with invalid 'items' field (expected list, got %s)",
type(items).__name__
)
continue
logger.info("Expanding List manifest containing %d items", len(items))
Copy link

Copilot AI Jan 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This info-level log message for expanding List manifests may create excessive noise in logs, especially for large deployments or frequent manifest operations. Consider using debug level instead: logger.debug(...) to keep info logs focused on significant operations.

Suggested change
logger.info("Expanding List manifest containing %d items", len(items))
logger.debug("Expanding List manifest containing %d items", len(items))

Copilot uses AI. Check for mistakes.
# Recursively expand in case items contain more Lists
expanded.extend(self._expand_and_validate_manifests(items))
else:
expanded.append(manifest)

return expanded

def _apply_single_manifest(self, manifest, namespace=None):
"""
Apply a single Kubernetes manifest using the appropriate API client.
Expand Down
250 changes: 250 additions & 0 deletions modules/python/tests/clients/test_kubernetes_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5993,6 +5993,256 @@ def test_get_config_map_with_labels_and_annotations(self, mock_read_config_map):
self.assertEqual(result.metadata.annotations["description"], "Test configuration")
self.assertEqual(result.data["database.url"], "mysql://localhost:3306/testdb")

def test_expand_and_validate_manifests_with_valid_dict(self):
"""Test _expand_and_validate_manifests with valid dictionary manifest."""
manifests = [
{
'apiVersion': 'v1',
'kind': 'Namespace',
'metadata': {'name': 'test-namespace'}
}
]

result = self.client._expand_and_validate_manifests(manifests)
Copy link

Copilot AI Jan 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing private methods (prefixed with _) directly is generally discouraged as it couples tests to implementation details. While the extensive testing here is valuable, consider also adding tests that exercise this functionality through the public API methods (apply_manifest_from_url, etc.) to ensure the integration works correctly end-to-end.

Copilot uses AI. Check for mistakes.

self.assertEqual(len(result), 1)
self.assertEqual(result[0]['kind'], 'Namespace')
self.assertEqual(result[0]['metadata']['name'], 'test-namespace')

def test_expand_and_validate_manifests_with_list_kind(self):
"""Test _expand_and_validate_manifests with kind: List manifest."""
manifests = [
{
'apiVersion': 'v1',
'kind': 'List',
'items': [
{
'apiVersion': 'v1',
'kind': 'Namespace',
'metadata': {'name': 'test-ns-1'}
},
{
'apiVersion': 'v1',
'kind': 'Service',
'metadata': {'name': 'test-service', 'namespace': 'default'}
}
]
}
]

result = self.client._expand_and_validate_manifests(manifests)

self.assertEqual(len(result), 2)
self.assertEqual(result[0]['kind'], 'Namespace')
self.assertEqual(result[0]['metadata']['name'], 'test-ns-1')
self.assertEqual(result[1]['kind'], 'Service')
self.assertEqual(result[1]['metadata']['name'], 'test-service')

def test_expand_and_validate_manifests_with_nested_list_kind(self):
"""Test _expand_and_validate_manifests with nested kind: List manifests."""
manifests = [
{
'apiVersion': 'v1',
'kind': 'List',
'items': [
{
'apiVersion': 'v1',
'kind': 'Namespace',
'metadata': {'name': 'test-ns-1'}
},
{
'apiVersion': 'v1',
'kind': 'List',
'items': [
{
'apiVersion': 'v1',
'kind': 'ConfigMap',
'metadata': {'name': 'test-cm', 'namespace': 'default'}
},
{
'apiVersion': 'v1',
'kind': 'Secret',
'metadata': {'name': 'test-secret', 'namespace': 'default'}
}
]
}
]
}
]

result = self.client._expand_and_validate_manifests(manifests)

self.assertEqual(len(result), 3)
self.assertEqual(result[0]['kind'], 'Namespace')
self.assertEqual(result[1]['kind'], 'ConfigMap')
self.assertEqual(result[2]['kind'], 'Secret')

def test_expand_and_validate_manifests_with_non_dict_list(self):
"""Test _expand_and_validate_manifests with list (not dict) manifest - should be skipped."""
manifests = [
['item1', 'item2', 'item3'],
{
'apiVersion': 'v1',
'kind': 'Namespace',
'metadata': {'name': 'test-namespace'}
}
]

with self.assertLogs('clients.kubernetes_client', level='WARNING') as log:
result = self.client._expand_and_validate_manifests(manifests)

# Only the valid dict should be returned
self.assertEqual(len(result), 1)
self.assertEqual(result[0]['kind'], 'Namespace')

# Check warning was logged
self.assertTrue(any('Skipping non-dictionary manifest' in message for message in log.output))
self.assertTrue(any('list' in message for message in log.output))

def test_expand_and_validate_manifests_with_scalar(self):
"""Test _expand_and_validate_manifests with scalar manifest - should be skipped."""
manifests = [
'just a string',
42,
{
'apiVersion': 'v1',
'kind': 'Namespace',
'metadata': {'name': 'test-namespace'}
}
]

with self.assertLogs('clients.kubernetes_client', level='WARNING') as log:
result = self.client._expand_and_validate_manifests(manifests)

# Only the valid dict should be returned
self.assertEqual(len(result), 1)
self.assertEqual(result[0]['kind'], 'Namespace')

# Check warnings were logged for both scalar values
warning_messages = [msg for msg in log.output if 'Skipping non-dictionary manifest' in msg]
self.assertEqual(len(warning_messages), 2)

def test_expand_and_validate_manifests_with_none_and_empty(self):
"""Test _expand_and_validate_manifests with None and empty manifests - should be skipped."""
manifests = [
None,
{},
{
'apiVersion': 'v1',
'kind': 'Namespace',
'metadata': {'name': 'test-namespace'}
},
None
]

result = self.client._expand_and_validate_manifests(manifests)

# Both None and empty dict are skipped, only valid Namespace is kept
self.assertEqual(len(result), 1)
self.assertEqual(result[0]['kind'], 'Namespace')

def test_expand_and_validate_manifests_with_list_kind_invalid_items(self):
"""Test _expand_and_validate_manifests with kind: List but invalid items field."""
manifests = [
{
'apiVersion': 'v1',
'kind': 'List',
'items': 'not-a-list' # Invalid: should be a list
}
]

with self.assertLogs('clients.kubernetes_client', level='WARNING') as log:
result = self.client._expand_and_validate_manifests(manifests)

# No items should be returned since items field is invalid
self.assertEqual(len(result), 0)

# Check warning was logged
self.assertTrue(any('invalid' in message.lower() and 'items' in message for message in log.output))

def test_expand_and_validate_manifests_with_list_kind_missing_items(self):
"""Test _expand_and_validate_manifests with kind: List but missing items field."""
manifests = [
{
'apiVersion': 'v1',
'kind': 'List'
# items field is missing
}
]

result = self.client._expand_and_validate_manifests(manifests)

# No items should be returned since items field is missing (defaults to [])
self.assertEqual(len(result), 0)

@patch('requests.get')
@patch('clients.kubernetes_client.KubernetesClient._apply_single_manifest')
def test_apply_manifest_from_url_with_list_kind(self, mock_apply_single, mock_requests_get):
"""Test apply_manifest_from_url with kind: List manifest."""
mock_response = MagicMock()
mock_response.text = """
apiVersion: v1
kind: List
items:
- apiVersion: v1
kind: Namespace
metadata:
name: test-namespace
- apiVersion: v1
kind: ConfigMap
metadata:
name: test-config
namespace: test-namespace
data:
key: value
"""
mock_response.raise_for_status.return_value = None
mock_requests_get.return_value = mock_response

self.client.apply_manifest_from_url("https://example.com/list-manifest.yaml")

# Verify both items from the List were applied
self.assertEqual(mock_apply_single.call_count, 2)

# Check first item (Namespace)
first_manifest = mock_apply_single.call_args_list[0][0][0]
self.assertEqual(first_manifest['kind'], 'Namespace')
self.assertEqual(first_manifest['metadata']['name'], 'test-namespace')

# Check second item (ConfigMap)
second_manifest = mock_apply_single.call_args_list[1][0][0]
self.assertEqual(second_manifest['kind'], 'ConfigMap')
self.assertEqual(second_manifest['metadata']['name'], 'test-config')

@patch('requests.get')
@patch('clients.kubernetes_client.KubernetesClient._apply_single_manifest')
def test_apply_manifest_from_url_with_non_dict_manifest(self, mock_apply_single, mock_requests_get):
"""Test apply_manifest_from_url with non-dict YAML document - should be skipped."""
mock_response = MagicMock()
mock_response.text = """
- item1
- item2
- item3
---
apiVersion: v1
kind: Namespace
metadata:
name: test-namespace
"""
mock_response.raise_for_status.return_value = None
mock_requests_get.return_value = mock_response

with self.assertLogs('clients.kubernetes_client', level='WARNING') as log:
self.client.apply_manifest_from_url("https://example.com/mixed-manifest.yaml")

# Only the valid Namespace should be applied
self.assertEqual(mock_apply_single.call_count, 1)
applied_manifest = mock_apply_single.call_args[0][0]
self.assertEqual(applied_manifest['kind'], 'Namespace')

# Check warning was logged
self.assertTrue(any('Skipping non-dictionary manifest' in message for message in log.output))


if __name__ == '__main__':
unittest.main()
Loading