kc - Kafka CLI Tool
A lightweight command-line tool for debugging and managing Kafka clusters using the native Kafka protocol.
Features
- Direct Kafka protocol communication without heavyweight dependencies
- Automatic API version negotiation with broker compatibility detection
- Caching of broker-supported API versions to avoid repeated version lookups
- Support for topic management (create, delete, inspect)
- Message publishing and fetching capabilities
- Cluster metadata inspection
- Built with Rust for speed and reliability
Installation
Prerequisites
- Rust 1.70 or higher
- Cargo
Build from Source
cargo build --release
The binary will be available at target/release/kc.
Usage
Basic Syntax
kc <BROKER> <COMMAND> [ARGUMENTS]
All commands require the broker address as the first argument.
Commands
Get Metadata
Retrieve metadata information about the Kafka cluster, including brokers, topics, and partitions.
# Get metadata for all topics
kc localhost:9092 get-metadata
# Get metadata for a specific topic
kc localhost:9092 get-metadata --topic my-topic
# Connect to a remote broker
kc kafka.example.com:9092 get-metadata
Get Topic Information
Get detailed information about a specific topic.
kc localhost:9092 get-topic my-topic
Shows:
- Topic ID
- Partition count
- Partition details (leader, replicas, ISR)
- Internal topic flag
Create Topic
Create a new topic with an optional partition count.
# Create topic with 1 partition (default)
kc localhost:9092 create-topic my-topic
# Create topic with 3 partitions
kc localhost:9092 create-topic my-topic:3
Behavior:
- If topic exists with matching partitions: reports "Topic already exists"
- If topic exists with different partitions: asks for confirmation to recreate (y/n)
- Validates partition count before creation
Delete Topic
Delete an existing topic.
kc localhost:9092 delete-topic my-topic
Verifies the topic exists before attempting deletion.
Publish Message
Publish a message to a topic with a key and optional value.
# Publish to partition 0 (default)
kc localhost:9092 publish my-topic key1 value1
# Publish to specific partition
kc localhost:9092 publish my-topic:2 key2 value2
# Publish with key only (no value)
kc localhost:9092 publish my-topic key3
Features:
- Validates topic and partition exist before publishing
- Shows partition, offset, and timestamp on success
- Clear error messages with error codes
Fetch Messages
Fetch and display messages from a topic partition.
# Fetch from partition 0, starting at offset 0
kc localhost:9092 fetch my-topic:0
# Fetch from specific offset
kc localhost:9092 fetch my-topic:0:100
# Fetch from partition 2, offset 50
kc localhost:9092 fetch my-topic:2:50
Features:
- Validates topic and partition exist before fetching
- Displays key-value pairs for each message
- Default offset is 0 (beginning)
API Version Negotiation
The client automatically handles API version negotiation:
- On the first request, it fetches and caches all supported API versions from the broker
- For each subsequent API call, it negotiates the best compatible version
- The negotiated version is used for encoding requests and decoding responses
This ensures compatibility across different Kafka broker versions without hardcoding version numbers.
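The core of the negotiation is picking the highest version both sides understand. A minimal sketch of that rule, assuming a per-connection cache keyed by API key (types and names here are illustrative, not the tool's actual code):

```rust
use std::collections::HashMap;

/// Version range advertised by the broker for one API key.
struct VersionRange {
    min: i16,
    max: i16,
}

/// Cache of broker-supported versions, filled once per connection.
struct ApiVersionCache {
    versions: HashMap<i16, VersionRange>,
}

impl ApiVersionCache {
    /// Pick the highest version both sides support, capped at the highest
    /// version this client knows how to encode.
    fn negotiate(&self, api_key: i16, client_max: i16) -> Option<i16> {
        let range = self.versions.get(&api_key)?;
        let version = client_max.min(range.max);
        if version >= range.min {
            Some(version)
        } else {
            None // no overlap: the broker is too old or too new for this client
        }
    }
}
```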
Examples
Complete Workflow
# Set broker for convenience
BROKER=localhost:9092
# Check cluster metadata
kc $BROKER get-metadata
# Create a topic with 3 partitions
kc $BROKER create-topic orders:3
# Verify topic was created
kc $BROKER get-topic orders
# Publish some messages
kc $BROKER publish orders:0 order-1 '{"item":"laptop","price":999}'
kc $BROKER publish orders:1 order-2 '{"item":"mouse","price":25}'
kc $BROKER publish orders:2 order-3 '{"item":"keyboard","price":75}'
# Fetch messages
kc $BROKER fetch orders:0:0
kc $BROKER fetch orders:1:0
kc $BROKER fetch orders:2:0
# Delete the topic
kc $BROKER delete-topic orders
Debug Mode
Enable detailed logging with the RUST_LOG environment variable:
RUST_LOG=debug kc localhost:9092 create-topic my-topic:3
This will show:
- API version negotiation details
- Request/response sizes
- Wire protocol information
- Record batch encoding details
- Full error responses
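Under the hood, a RUST_LOG filter is typically honored by initializing a tracing subscriber at startup. A minimal sketch, assuming the tracing-subscriber crate with its env-filter feature (an assumption; only tracing itself appears in the dependency list below):

```rust
use tracing_subscriber::EnvFilter;

/// Initialize logging so RUST_LOG controls verbosity,
/// e.g. RUST_LOG=debug for the output described above.
fn init_logging() {
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();
}
```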
Architecture
KafkaClient
The core client implementation includes:
- Connection Management: TCP socket connection to Kafka brokers
- API Version Cache: Automatic caching of broker-supported API versions
- Version Negotiation: Intelligent selection of compatible API versions
- Protocol Handling: Request encoding and response decoding using the kafka-protocol crate
Request Flow
1. Client connects to broker
2. On first API call:
- Fetch API versions from broker
- Cache version information
3. For each request:
- Negotiate compatible API version
- Encode request with negotiated version
- Send request over TCP
- Receive and decode response
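A rough outline of the client pieces this flow relies on; the field and method names are illustrative assumptions, not the actual source in src/main.rs:

```rust
use std::collections::HashMap;

use anyhow::Result;
use tokio::net::TcpStream;

/// Illustrative outline of the client described above.
struct KafkaClient {
    /// TCP connection to a single broker (connection management).
    stream: TcpStream,
    /// Correlation id for matching responses to requests.
    correlation_id: i32,
    /// api_key -> (min, max) versions, populated on the first request
    /// and reused for every later call (API version cache).
    api_versions: Option<HashMap<i16, (i16, i16)>>,
}

impl KafkaClient {
    /// Step 1 of the flow: connect to the broker.
    async fn connect(addr: &str) -> Result<Self> {
        Ok(Self {
            stream: TcpStream::connect(addr).await?,
            correlation_id: 0,
            api_versions: None,
        })
    }
}
```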
Supported API Versions
For maximum compatibility, the tool uses conservative API versions:
- CreateTopics: Version 4
- DeleteTopics: Version 4
- Produce: Version 7
- Fetch: Version 11
- Metadata: Version 12
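Expressed as data, those caps look roughly like the following (an illustrative constant, not necessarily how the tool stores them); the negotiated version is the minimum of the client cap and the broker's advertised maximum:

```rust
/// Highest version this client will request for each API, per the table above.
const CLIENT_MAX_VERSIONS: &[(&str, i16)] = &[
    ("CreateTopics", 4),
    ("DeleteTopics", 4),
    ("Produce", 7),
    ("Fetch", 11),
    ("Metadata", 12),
];
```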
Dependencies
- tokio - Async runtime for I/O operations
- kafka-protocol - Kafka protocol serialization/deserialization
- anyhow - Error handling
- clap - Command-line argument parsing
- bytes - Byte buffer utilities
- tracing - Structured logging
Development
Running Tests
The project includes comprehensive unit tests covering:
- Argument parsing and validation
- Kafka error codes
- Record batch encoding/decoding
- API version negotiation logic
- Topic name and partition validation
- Broker address parsing
Run all tests:
cargo test
Run tests with output:
cargo test -- --nocapture
Run specific test:
cargo test test_record_batch_encoding
Test Coverage
Current test suite includes 16+ tests covering:
- Parsing: Topic specs, partition numbers, offsets, broker addresses
- Validation: Partition counts, topic names, offset ranges
- Protocol: Record encoding/decoding, error codes, API version negotiation
- Logic: Correlation ID management, bytes conversion
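As an illustration of the parsing tests, here is a hypothetical test in the same spirit; parse_topic_spec is an assumed helper written for this sketch, not necessarily the tool's actual function:

```rust
/// Parse "topic", "topic:partition", or "topic:partition:offset".
fn parse_topic_spec(spec: &str) -> (String, Option<i32>, Option<i64>) {
    let mut parts = spec.splitn(3, ':');
    let topic = parts.next().unwrap_or_default().to_string();
    let partition = parts.next().and_then(|p| p.parse().ok());
    let offset = parts.next().and_then(|o| o.parse().ok());
    (topic, partition, offset)
}

#[test]
fn test_parse_topic_spec() {
    assert_eq!(parse_topic_spec("orders"), ("orders".to_string(), None, None));
    assert_eq!(parse_topic_spec("orders:2"), ("orders".to_string(), Some(2), None));
    assert_eq!(parse_topic_spec("orders:2:50"), ("orders".to_string(), Some(2), Some(50)));
}
```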
Building for Release
cargo build --release
Code Structure
- src/main.rs - Main entry point, KafkaClient implementation, and tests
- Cargo.toml - Project dependencies and metadata
Protocol Details
This tool implements the Kafka wire protocol directly:
- Frame Format: [4 bytes: size][request/response data]
- Request Structure: [RequestHeader][Request Body]
- Response Structure: [ResponseHeader][Response Body]
- API Version Support: Dynamically queried from broker
- Record Format: Version 2 (record batches)
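A minimal sketch of that framing over a tokio TcpStream; request header/body encoding and response decoding (handled by the kafka-protocol crate) are out of scope here, and the function name is illustrative:

```rust
use anyhow::Result;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

/// Send one already-encoded request and read the raw response
/// using the 4-byte size-prefixed framing described above.
async fn round_trip(stream: &mut TcpStream, request: &[u8]) -> Result<Vec<u8>> {
    // Frame: [4 bytes: size][request data]
    stream.write_all(&(request.len() as i32).to_be_bytes()).await?;
    stream.write_all(request).await?;

    // Response frame: [4 bytes: size][response data]
    let mut size = [0u8; 4];
    stream.read_exact(&mut size).await?;
    let mut response = vec![0u8; i32::from_be_bytes(size) as usize];
    stream.read_exact(&mut response).await?;
    Ok(response)
}
```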
Error Codes
Common Kafka error codes you might encounter:
- 0: No error (success)
- 1: OFFSET_OUT_OF_RANGE - Requested offset is invalid
- 3: UNKNOWN_TOPIC_OR_PARTITION - Topic or partition doesn't exist
- 5: LEADER_NOT_AVAILABLE - Partition leader is not available
- 6: NOT_LEADER_FOR_PARTITION - Broker is not the leader
- 36: TOPIC_ALREADY_EXISTS - Topic creation failed (already exists)
- 37: INVALID_PARTITIONS - Invalid partition count
- 38: INVALID_REPLICATION_FACTOR - Invalid replication factor
Troubleshooting
Connection Issues
If you encounter connection problems:
# Verify broker is accessible
telnet localhost 9092
# Check with debug logging
RUST_LOG=debug kc localhost:9092 get-metadata
Topic Already Exists
When creating a topic that already exists:
- If partition count matches: Tool reports the topic exists
- If partition count differs: Tool asks if you want to recreate it
- Recreating deletes the old topic and creates a new one
Publishing Not Working
If publish succeeds but fetch returns no messages:
- Verify the topic and partition exist: kc <broker> get-topic <topic>
- Check you're fetching from the correct partition
- Run with RUST_LOG=debug to see detailed error information
- Ensure the broker is properly configured and has sufficient resources
API Version Errors
If you see API version errors:
- Check that your Kafka broker is running and accessible
- Verify the broker version supports the requested APIs
- Enable debug logging to see version negotiation details
Performance
The client caches API versions per connection, so:
- The first request makes two calls (the ApiVersions request plus the actual request)
- Subsequent requests on the same connection make only the actual API call
- Each command creates a new connection (the tool is stateless)
Contributing
Contributions are welcome! Please ensure:
- Code compiles without warnings
- All tests pass
- New features include appropriate documentation
- Code follows Rust best practices and idioms
License
Apache-2.0