
app kcli

A Kafka client CLI tool for debugging and inspecting Kafka clusters

1 unstable release

0.1.0 Nov 7, 2025


Apache-2.0

58KB
1K SLoC

kc - Kafka CLI Tool

A lightweight command-line tool for debugging and managing Kafka clusters using the native Kafka protocol.

Features

  • Direct Kafka protocol communication without heavyweight dependencies
  • Automatic API version negotiation with broker compatibility detection
  • Per-connection caching of API versions, so the broker is queried for them only once per connection
  • Support for topic management (create, delete, inspect)
  • Message publishing and fetching capabilities
  • Cluster metadata inspection
  • Built with Rust for speed and reliability

Installation

Prerequisites

  • Rust 1.70 or higher
  • Cargo

Build from Source

cargo build --release

The binary will be available at target/release/kc.

Usage

Basic Syntax

kc <BROKER> <COMMAND> [ARGUMENTS]

All commands require the broker address as the first argument.

Commands

Get Metadata

Retrieve metadata information about the Kafka cluster, including brokers, topics, and partitions.

# Get metadata for all topics
kc localhost:9092 get-metadata

# Get metadata for a specific topic
kc localhost:9092 get-metadata --topic my-topic

# Connect to a remote broker
kc kafka.example.com:9092 get-metadata

Get Topic Information

Get detailed information about a specific topic.

kc localhost:9092 get-topic my-topic

Shows:

  • Topic ID
  • Partition count
  • Partition details (leader, replicas, ISR)
  • Internal topic flag

Create Topic

Create a new topic with an optional partition count.

# Create topic with 1 partition (default)
kc localhost:9092 create-topic my-topic

# Create topic with 3 partitions
kc localhost:9092 create-topic my-topic:3

Behavior:

  • If topic exists with matching partitions: reports "Topic already exists"
  • If topic exists with different partitions: asks for confirmation to recreate (y/n)
  • Validates partition count before creation
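
The behavior listed above amounts to a small decision table. The following is a minimal sketch of that table, not the tool's actual code: the `CreateAction` enum and the `plan_create` function are hypothetical names, and the rule that a partition count must be at least 1 is an assumption about what "validates" means here.

```rust
/// Hypothetical outcome of comparing a requested topic with the broker's state.
#[derive(Debug, PartialEq)]
enum CreateAction {
    /// Topic is absent: create it.
    Create,
    /// Topic exists with the same partition count: nothing to do.
    AlreadyExists,
    /// Topic exists with a different partition count: ask the user (y/n).
    ConfirmRecreate { existing: i32, requested: i32 },
}

/// `existing` is the current partition count, or `None` if the topic is absent.
/// Assumption: a valid partition count is any value >= 1.
fn plan_create(existing: Option<i32>, requested: i32) -> Result<CreateAction, String> {
    if requested < 1 {
        return Err(format!("invalid partition count: {requested}"));
    }
    Ok(match existing {
        None => CreateAction::Create,
        Some(n) if n == requested => CreateAction::AlreadyExists,
        Some(n) => CreateAction::ConfirmRecreate { existing: n, requested },
    })
}
```

Under these assumptions, `plan_create(Some(1), 3)` returns `ConfirmRecreate { existing: 1, requested: 3 }`, which corresponds to the y/n prompt.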

Delete Topic

Delete an existing topic.

kc localhost:9092 delete-topic my-topic

Verifies the topic exists before attempting deletion.

Publish Message

Publish a message with a key and an optional value to a topic.

# Publish to partition 0 (default)
kc localhost:9092 publish my-topic key1 value1

# Publish to a specific partition
kc localhost:9092 publish my-topic:2 key2 value2

# Publish with key only (no value)
kc localhost:9092 publish my-topic key3

Features:

  • Validates topic and partition exist before publishing
  • Shows partition, offset, and timestamp on success
  • Clear error messages with error codes

Fetch Messages

Fetch and display messages from a topic partition.

# Fetch from partition 0, starting at offset 0
kc localhost:9092 fetch my-topic:0

# Fetch from a specific offset
kc localhost:9092 fetch my-topic:0:100

# Fetch from partition 2, offset 50
kc localhost:9092 fetch my-topic:2:50

Features:

  • Validates topic and partition exist before fetching
  • Displays key-value pairs for each message
  • Default offset is 0 (beginning)
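
The `topic:partition:offset` argument used above can be parsed with a simple split on `:`, which is safe because Kafka topic names cannot contain a colon. This is an illustrative sketch only: `FetchSpec` and `parse_fetch_spec` are hypothetical names, and defaulting a missing partition to 0 is an assumption (the examples above always give the partition explicitly).

```rust
/// Parsed form of a `topic[:partition[:offset]]` argument (hypothetical type).
#[derive(Debug, PartialEq)]
struct FetchSpec {
    topic: String,
    partition: i32,
    offset: i64,
}

/// Hypothetical parser. Missing fields fall back to 0; the default offset of 0
/// matches the README, the default partition of 0 is an assumption.
fn parse_fetch_spec(spec: &str) -> Result<FetchSpec, String> {
    let mut parts = spec.split(':');

    let topic = parts.next().unwrap_or("");
    if topic.is_empty() {
        return Err("topic name is empty".to_string());
    }

    let partition = match parts.next() {
        Some(p) => p
            .parse::<i32>()
            .map_err(|e| format!("bad partition '{p}': {e}"))?,
        None => 0,
    };
    let offset = match parts.next() {
        Some(o) => o
            .parse::<i64>()
            .map_err(|e| format!("bad offset '{o}': {e}"))?,
        None => 0,
    };

    // Reject trailing fields and negative numbers.
    if parts.next().is_some() {
        return Err(format!("too many ':' separated fields in '{spec}'"));
    }
    if partition < 0 || offset < 0 {
        return Err("partition and offset must be non-negative".to_string());
    }

    Ok(FetchSpec { topic: topic.to_string(), partition, offset })
}
```

With this sketch, `my-topic:2:50` parses to partition 2 and offset 50, and `my-topic:0` parses to partition 0 and offset 0.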

API Version Negotiation

The client automatically handles API version negotiation:

  1. On the first request, it fetches and caches all supported API versions from the broker
  2. For each subsequent API call, it negotiates the best compatible version
  3. The negotiated version is used for encoding requests and decoding responses

This ensures compatibility across different Kafka broker versions without hardcoding version numbers.
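
A common way to implement this kind of negotiation is to intersect the version range the client can encode with the range the broker advertises, then take the highest version in the overlap. The sketch below shows that technique under the assumption that kc works this way; `VersionRange` and `negotiate` are hypothetical names, not the tool's API.

```rust
/// Inclusive range of versions supported for one API (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
struct VersionRange {
    lowest: i16,
    highest: i16,
}

/// Returns the highest version both sides support, or `None` if the
/// two ranges do not overlap.
fn negotiate(client: VersionRange, broker: VersionRange) -> Option<i16> {
    let upper = std::cmp::min(client.highest, broker.highest);
    let lower = std::cmp::max(client.lowest, broker.lowest);
    if upper >= lower {
        Some(upper)
    } else {
        None
    }
}
```

For example, a client range of 0 to 7 against a broker range of 0 to 11 yields `Some(7)`, while a client range of 5 to 7 against a broker range of 0 to 3 yields `None`, which the caller would report as an unsupported API.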

Examples

Complete Workflow

# Set broker for convenience
BROKER=localhost:9092

# Check cluster metadata
kc $BROKER get-metadata

# Create a topic with 3 partitions
kc $BROKER create-topic orders:3

# Verify topic was created
kc $BROKER get-topic orders

# Publish some messages
kc $BROKER publish orders:0 order-1 '{"item":"laptop","price":999}'
kc $BROKER publish orders:1 order-2 '{"item":"mouse","price":25}'
kc $BROKER publish orders:2 order-3 '{"item":"keyboard","price":75}'

# Fetch messages
kc $BROKER fetch orders:0:0
kc $BROKER fetch orders:1:0
kc $BROKER fetch orders:2:0

# Delete the topic
kc $BROKER delete-topic orders

Debug Mode

Enable detailed logging with the RUST_LOG environment variable:

RUST_LOG=debug kc localhost:9092 create-topic my-topic:3

This will show:

  • API version negotiation details
  • Request/response sizes
  • Wire protocol information
  • Record batch encoding details
  • Full error responses

Architecture

KafkaClient

The core client implementation includes:

  • Connection Management: TCP socket connection to Kafka brokers
  • API Version Cache: Automatic caching of broker-supported API versions
  • Version Negotiation: Intelligent selection of compatible API versions
  • Protocol Handling: Request encoding and response decoding using the kafka-protocol crate

Request Flow

1. Client connects to broker
2. On first API call:
   - Fetch API versions from broker
   - Cache version information
3. For each request:
   - Negotiate compatible API version
   - Encode request with negotiated version
   - Send request over TCP
   - Receive and decode response
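
Step 2 of this flow is a lazily filled cache. The sketch below illustrates the idea with the standard library only; `VersionCache`, `get_or_fetch`, and `fake_broker_versions` are hypothetical names, and the closure stands in for the real ApiVersions round trip. The API keys (0 = Produce, 1 = Fetch, 3 = Metadata) are the Kafka protocol's, but the version ranges are made-up sample data.

```rust
use std::collections::HashMap;

/// Maps an API key to the (lowest, highest) version the broker supports.
type Versions = HashMap<i16, (i16, i16)>;

/// Hypothetical per-connection cache, filled on first use.
struct VersionCache {
    versions: Option<Versions>,
    /// Counts broker round trips, for demonstration only.
    fetches: u32,
}

impl VersionCache {
    fn new() -> Self {
        VersionCache { versions: None, fetches: 0 }
    }

    /// Looks up one API key, calling `fetch` only if the cache is still empty.
    fn get_or_fetch<F>(&mut self, api_key: i16, fetch: F) -> Option<(i16, i16)>
    where
        F: FnOnce() -> Versions,
    {
        if self.versions.is_none() {
            self.versions = Some(fetch());
            self.fetches += 1;
        }
        self.versions.as_ref().and_then(|m| m.get(&api_key).copied())
    }
}

/// Stand-in for an ApiVersions response; the ranges are invented sample values.
fn fake_broker_versions() -> Versions {
    HashMap::from([(0, (0, 9)), (1, (0, 13)), (3, (0, 12))])
}
```

Calling `get_or_fetch` twice on the same cache invokes the fetch closure once, which mirrors the "fetch on first API call, then reuse" behavior described above.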

Supported API Versions

For maximum compatibility, the tool caps the versions it requests at conservative values:

  • CreateTopics: Version 4
  • DeleteTopics: Version 4
  • Produce: Version 7
  • Fetch: Version 11
  • Metadata: Version 12

Dependencies

  • tokio - Async runtime for I/O operations
  • kafka-protocol - Kafka protocol serialization/deserialization
  • anyhow - Error handling
  • clap - Command-line argument parsing
  • bytes - Byte buffer utilities
  • tracing - Structured logging

Development

Running Tests

The project includes comprehensive unit tests covering:

  • Argument parsing and validation
  • Kafka error codes
  • Record batch encoding/decoding
  • API version negotiation logic
  • Topic name and partition validation
  • Broker address parsing

Run all tests:

cargo test

Run tests with output:

cargo test -- --nocapture

Run a specific test:

cargo test test_record_batch_encoding

Test Coverage

The current test suite includes 16+ tests covering:

  • Parsing: Topic specs, partition numbers, offsets, broker addresses
  • Validation: Partition counts, topic names, offset ranges
  • Protocol: Record encoding/decoding, error codes, API version negotiation
  • Logic: Correlation ID management, bytes conversion

Building for Release

cargo build --release

Code Structure

  • src/main.rs - Main entry point, KafkaClient implementation, and tests
  • Cargo.toml - Project dependencies and metadata

Protocol Details

This tool implements the Kafka wire protocol directly:

  • Frame Format: [4 bytes: size][request/response data], where size is a big-endian 32-bit integer that counts only the bytes following it
  • Request Structure: [RequestHeader][Request Body]
  • Response Structure: [ResponseHeader][Response Body]
  • API Version Support: Dynamically queried from broker
  • Record Format: Version 2 (record batches)
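
The frame format can be illustrated with a few lines of standard-library Rust. In the Kafka wire protocol the size prefix is a big-endian 32-bit integer that does not include its own four bytes. `encode_frame` and `decode_frame` are hypothetical helper names for this sketch; the tool itself relies on the kafka-protocol crate for the message bodies.

```rust
/// Prefixes `payload` with its length as a 4-byte big-endian integer.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(4 + payload.len());
    frame.extend_from_slice(&(payload.len() as i32).to_be_bytes());
    frame.extend_from_slice(payload);
    frame
}

/// Splits one complete frame off the front of `buf`.
/// Returns `(payload, rest)`, or `None` if the frame is incomplete or the
/// size field is negative.
fn decode_frame(buf: &[u8]) -> Option<(&[u8], &[u8])> {
    if buf.len() < 4 {
        return None;
    }
    let size = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
    if size < 0 {
        return None;
    }
    let end = 4usize.checked_add(size as usize)?;
    if buf.len() < end {
        return None; // more bytes must be read from the socket first
    }
    Some((&buf[4..end], &buf[end..]))
}
```

Returning the unread remainder lets a caller keep decoding when several responses arrive in one TCP read.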

Error Codes

Common Kafka error codes you might encounter:

  • 0: No error (success)
  • 1: OFFSET_OUT_OF_RANGE - Requested offset is invalid
  • 3: UNKNOWN_TOPIC_OR_PARTITION - Topic or partition doesn't exist
  • 5: LEADER_NOT_AVAILABLE - Partition leader is not available
  • 6: NOT_LEADER_FOR_PARTITION - Broker is not the leader (named NOT_LEADER_OR_FOLLOWER in newer Kafka releases)
  • 36: TOPIC_ALREADY_EXISTS - Topic creation failed (already exists)
  • 37: INVALID_PARTITIONS - Invalid partition count
  • 38: INVALID_REPLICATION_FACTOR - Invalid replication factor
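
When reading debug output it can help to turn these numeric codes back into names. The lookup below covers only the codes listed above; `error_name` is a hypothetical helper for illustration, not part of kc or the kafka-protocol crate.

```rust
/// Maps the error codes listed in this section to their names.
/// Returns `None` for any code not covered here.
fn error_name(code: i16) -> Option<&'static str> {
    match code {
        0 => Some("NONE"),
        1 => Some("OFFSET_OUT_OF_RANGE"),
        3 => Some("UNKNOWN_TOPIC_OR_PARTITION"),
        5 => Some("LEADER_NOT_AVAILABLE"),
        6 => Some("NOT_LEADER_FOR_PARTITION"),
        36 => Some("TOPIC_ALREADY_EXISTS"),
        37 => Some("INVALID_PARTITIONS"),
        38 => Some("INVALID_REPLICATION_FACTOR"),
        _ => None,
    }
}
```

Returning `Option` rather than a placeholder string keeps unlisted codes distinguishable, so a caller can print the raw number instead.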

Troubleshooting

Connection Issues

If you encounter connection problems:

# Verify broker is accessible
telnet localhost 9092

# Check with debug logging
RUST_LOG=debug kc localhost:9092 get-metadata
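
On systems without telnet, the same reachability check can be done with a short Rust program using only the standard library. This is a sketch under stated assumptions: `broker_reachable` is a hypothetical helper, and a successful TCP connect shows only that the port is open, not that a healthy Kafka broker is listening on it.

```rust
use std::net::{TcpStream, ToSocketAddrs};
use std::time::Duration;

/// Returns true if a TCP connection to `addr` ("host:port") succeeds
/// within `timeout` on at least one resolved address.
fn broker_reachable(addr: &str, timeout: Duration) -> bool {
    match addr.to_socket_addrs() {
        // Try every address the name resolves to (e.g. IPv4 and IPv6).
        Ok(mut addrs) => addrs.any(|a| TcpStream::connect_timeout(&a, timeout).is_ok()),
        // Malformed address or failed DNS lookup.
        Err(_) => false,
    }
}
```

A call such as `broker_reachable("localhost:9092", Duration::from_secs(2))` returns `false` both when the host cannot be resolved and when the connection is refused or times out.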

Topic Already Exists

When creating a topic that already exists:

  • If partition count matches: Tool reports the topic exists
  • If partition count differs: Tool asks if you want to recreate it
  • Recreating deletes the old topic, including all messages stored in it, and creates a new one

Publishing Not Working

If publish succeeds but fetch returns no messages:

  1. Verify the topic and partition exist: kc <broker> get-topic <topic>
  2. Check you're fetching from the correct partition
  3. Run with RUST_LOG=debug to see detailed error information
  4. Ensure the broker is properly configured and has sufficient resources

API Version Errors

If you see API version errors:

  1. Check that your Kafka broker is running and accessible
  2. Verify the broker version supports the requested APIs
  3. Enable debug logging to see version negotiation details

Performance

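The client caches API versions per connection, and each command opens a new connection (the tool is stateless). As a result:

  • Within one command, the first request makes two calls (API versions, then the actual request)
  • Later requests on the same connection make only the actual API call
  • The cache is not shared between invocations, so every kc command repeats the API versions call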

Contributing

Contributions are welcome! Please ensure:

  1. Code compiles without warnings
  2. All tests pass
  3. New features include appropriate documentation
  4. Code follows Rust best practices and idioms

License

Apache-2.0
