1
+ """
2
+ Kafka client wrapper for Ducktape tests
3
+ Assumes Kafka is already running on localhost:9092
4
+ """
5
+ import time
6
+ from ducktape .services .service import Service
7
+
8
+
9
+ class KafkaClient (Service ):
10
+ """Kafka client wrapper - assumes external Kafka is running"""
11
+
12
+ def __init__ (self , context , bootstrap_servers = "localhost:9092" ):
13
+ # Use num_nodes=0 since we're not managing any nodes
14
+ super (KafkaClient , self ).__init__ (context , num_nodes = 0 )
15
+ self .bootstrap_servers_str = bootstrap_servers
16
+
17
+ def start_node (self , node ):
18
+ """No-op since we assume Kafka is already running"""
19
+ self .logger .info ("Assuming Kafka is already running on %s" , self .bootstrap_servers_str )
20
+
21
+ def stop_node (self , node ):
22
+ """No-op since we don't manage the Kafka service"""
23
+ self .logger .info ("Not stopping external Kafka service" )
24
+
25
+ def clean_node (self , node ):
26
+ """No-op since we don't manage any files"""
27
+ self .logger .info ("No cleanup needed for external Kafka service" )
28
+
29
+ def bootstrap_servers (self ):
30
+ """Get bootstrap servers string for clients"""
31
+ return self .bootstrap_servers_str
32
+
33
+ def verify_connection (self ):
34
+ """Verify that Kafka is accessible"""
35
+ try :
36
+ from confluent_kafka .admin import AdminClient
37
+ admin_client = AdminClient ({'bootstrap.servers' : self .bootstrap_servers_str })
38
+
39
+ # Try to get cluster metadata to verify connection
40
+ metadata = admin_client .list_topics (timeout = 10 )
41
+ self .logger .info ("Successfully connected to Kafka. Available topics: %s" ,
42
+ list (metadata .topics .keys ()))
43
+ return True
44
+ except Exception as e :
45
+ self .logger .error ("Failed to connect to Kafka at %s: %s" , self .bootstrap_servers_str , e )
46
+ return False
47
+
48
+ def create_topic (self , topic , partitions = 1 , replication_factor = 1 ):
49
+ """Create a topic using Kafka admin client"""
50
+ try :
51
+ from confluent_kafka .admin import AdminClient , NewTopic
52
+
53
+ admin_client = AdminClient ({'bootstrap.servers' : self .bootstrap_servers_str })
54
+
55
+ topic_config = NewTopic (
56
+ topic = topic ,
57
+ num_partitions = partitions ,
58
+ replication_factor = replication_factor
59
+ )
60
+
61
+ self .logger .info ("Creating topic: %s (partitions=%d, replication=%d)" ,
62
+ topic , partitions , replication_factor )
63
+
64
+ # Create topic
65
+ fs = admin_client .create_topics ([topic_config ])
66
+
67
+ # Wait for topic creation to complete
68
+ for topic_name , f in fs .items ():
69
+ try :
70
+ f .result (timeout = 30 ) # Wait up to 30 seconds
71
+ self .logger .info ("Topic %s created successfully" , topic_name )
72
+ except Exception as e :
73
+ if "already exists" in str (e ).lower ():
74
+ self .logger .info ("Topic %s already exists" , topic_name )
75
+ else :
76
+ self .logger .warning ("Failed to create topic %s: %s" , topic_name , e )
77
+
78
+ except ImportError :
79
+ self .logger .error ("confluent_kafka not available for topic creation" )
80
+ except Exception as e :
81
+ self .logger .error ("Failed to create topic %s: %s" , topic , e )
82
+
83
+ def list_topics (self ):
84
+ """List all topics using admin client"""
85
+ try :
86
+ from confluent_kafka .admin import AdminClient
87
+
88
+ admin_client = AdminClient ({'bootstrap.servers' : self .bootstrap_servers_str })
89
+ metadata = admin_client .list_topics (timeout = 10 )
90
+
91
+ topics = list (metadata .topics .keys ())
92
+ self .logger .debug ("Available topics: %s" , topics )
93
+ return topics
94
+
95
+ except ImportError :
96
+ self .logger .error ("confluent_kafka not available for listing topics" )
97
+ return []
98
+ except Exception as e :
99
+ self .logger .error ("Failed to list topics: %s" , e )
100
+ return []
101
+
102
+ def wait_for_topic (self , topic_name , max_wait_time = 30 , initial_wait = 0.1 ):
103
+ """
104
+ Wait for topic to be created with exponential backoff retry logic.
105
+ """
106
+ wait_time = initial_wait
107
+ total_wait = 0
108
+
109
+ self .logger .info ("Waiting for topic '%s' to be available..." , topic_name )
110
+
111
+ while total_wait < max_wait_time :
112
+ topics = self .list_topics ()
113
+ if topic_name in topics :
114
+ self .logger .info ("Topic '%s' is ready after %.2fs" , topic_name , total_wait )
115
+ return True
116
+
117
+ self .logger .debug ("Topic '%s' not found, waiting %.2fs (total: %.2fs)" ,
118
+ topic_name , wait_time , total_wait )
119
+ time .sleep (wait_time )
120
+ total_wait += wait_time
121
+
122
+ # Exponential backoff with max cap of 2 seconds
123
+ wait_time = min (wait_time * 2 , 2.0 )
124
+
125
+ self .logger .error ("Timeout waiting for topic '%s' after %ds" , topic_name , max_wait_time )
126
+ return False
0 commit comments