Skip to content

feat: replace gRPC transport with FIBP binary protocol#5

Merged
vieiralucas merged 3 commits intomainfrom
feat/fibp-transport
Mar 26, 2026
Merged

feat: replace gRPC transport with FIBP binary protocol#5
vieiralucas merged 3 commits intomainfrom
feat/fibp-transport

Conversation

@vieiralucas
Copy link
Copy Markdown
Member

@vieiralucas vieiralucas commented Mar 26, 2026

Summary

  • Replaces the gRPC/protobuf transport with FIBP (Fila Binary Protocol) over raw TCP/TLS
  • Removes all gRPC and protobuf dependencies from build.gradle (no runtime dependencies remain)
  • Adds FibpConnection — TCP socket with length-prefixed framing, handshake, correlation-ID multiplexing via ConcurrentHashMap + CompletableFuture, heartbeat scheduler, and optional TLS via SSLSocket
  • Adds FibpCodec — wire encoding/decoding matching fila-core/src/fibp/wire.rs exactly (enqueue, consume push batch, ack, nack, error frames)
  • Keeps the public FilaClient API identical: enqueue, enqueueMany, consume, ack, nack, close, Builder with all TLS/auth/batch options preserved
  • Batcher groups messages by queue before sending (one FIBP enqueue frame per queue name, since the frame-level field is per-queue)
  • RpcException.Code replaces io.grpc.Status.Code; error classification uses keyword matching on plain-text error payloads (matching the Rust SDK pattern)
  • AUTH uses raw key bytes in OP_AUTH frame (no length prefix, matching server protocol)
  • Consume push frames decoded as batches (msg_count:u16 | messages...)
  • FibpAdminClient added to test package with hand-rolled minimal protobuf for CreateQueueRequest (avoids a test protobuf dependency)
  • Integration tests (FilaClientTest, BatchClientTest, TlsAuthClientTest) guarded by @EnabledIf("serverAvailable") and skip cleanly when no binary is present; unit tests all pass

Test plan

  • All 21 unit tests pass (./gradlew test)
  • spotlessCheck passes (formatting enforced by Google Java Format)
  • CI integration tests pass against downloaded fila-server binary
  • TLS tests: verify withTls(), withTlsCaCert(), withTlsClientCert() (mTLS) all connect
  • Auth tests: verify withApiKey() sends AUTH frame and server accepts/rejects correctly

🤖 Generated with Claude Code


Summary by cubic

Replaced the gRPC/protobuf transport with FIBP (Fila Binary Protocol) over raw TCP/TLS while keeping the public FilaClient API the same. This removes all gRPC/protobuf deps, adds a single-socket multiplexed connection with heartbeat and AUTH, and includes stability/validation fixes.

  • New Features

    • Added FibpConnection: framing, handshake, correlation IDs, heartbeat scheduler, optional TLS, and AUTH via OP_AUTH.
    • Added FibpCodec: encode/decode for enqueue, batched consume push, ack/nack, and error frames; validates header count (<=255) and str16 length (<=65535).
    • Public FilaClient API unchanged; batcher groups messages by queue and sends one enqueue frame per queue; enqueueMany enforces a single target queue.
    • Replaced gRPC status with RpcException.Code; errors mapped from server error text.
    • Robustness: remove pending futures on consume timeout/interrupt; guard push handler exceptions to keep the reader thread alive; map batch timeouts to UNAVAILABLE; add a fallback catch in the batcher to resolve futures with INTERNAL on unexpected exceptions.
    • Tests use FibpAdminClient for queue creation (supports TLS/mTLS); README updated; version bumped to 0.3.0.
  • Migration

    • Update dependency to dev.faisca:fila-client:0.3.0.
    • Replace any uses of gRPC Status.Code with RpcException.Code.
    • build() now opens a connection; tests that constructed clients without a running server should guard or defer client creation.
    • Consume handlers run on the FIBP reader thread; avoid heavy or blocking work in the callback.
    • enqueueMany should target a single queue per call.

Written for commit 03bf792. Summary will update on new commits.

rewrite the java sdk transport layer to use fibp (fila binary protocol)
over raw tcp instead of grpc/protobuf.

- add FibpConnection: tcp socket with length-prefixed framing, handshake,
  correlation-id multiplexing via ConcurrentHashMap + CompletableFuture,
  heartbeat scheduler, and optional tls via SSLSocket
- add FibpCodec: wire encoding/decoding for enqueue, consume push batch,
  ack, nack, and error frames (exact format from fila-core/src/fibp/wire.rs)
- rewrite FilaClient to use FibpConnection; remove all grpc/protobuf deps
- rewrite Batcher to use FibpConnection; groups messages by queue into
  separate fibp frames (one queue name per enqueue frame)
- rewrite ConsumerHandle without grpc Context dependency
- update RpcException to define its own Code enum (no grpc Status.Code)
- remove ApiKeyInterceptor (auth now via fibp OP_AUTH frame)
- remove proto/ directory and grpc/protobuf gradle deps; bump to v0.3.0
- add FibpAdminClient in test package for createQueue (hand-rolled protobuf
  encoding of CreateQueueRequest, avoids adding a test protobuf dependency)
- update TestServer to use FibpAdminClient instead of grpc admin stub
- update BuilderTest: remove tests that require a server connection (fibp
  connects eagerly unlike grpc lazy channels); add address-parsing unit tests
- add @EnabledIf("serverAvailable") guard to FilaClientTest
- update TlsAuthClientTest to use RpcException.Code instead of grpc Status.Code
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7 issues found across 17 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="src/main/java/dev/faisca/fila/FibpConnection.java">

<violation number="1" location="src/main/java/dev/faisca/fila/FibpConnection.java:191">
P2: Remove the pending entry when the consume setup times out or is interrupted; otherwise the failed consume leaves a stale future in the pending map.</violation>

<violation number="2" location="src/main/java/dev/faisca/fila/FibpConnection.java:328">
P2: Guard push handlers so an exception doesn’t kill the reader thread and strand pending requests.</violation>
</file>

<file name="src/main/java/dev/faisca/fila/FibpCodec.java">

<violation number="1" location="src/main/java/dev/faisca/fila/FibpCodec.java:73">
P2: Guard the header count before writing the u8 field. Counts >255 wrap in writeByte and desynchronize the frame.</violation>

<violation number="2" location="src/main/java/dev/faisca/fila/FibpCodec.java:311">
P2: Validate str16 lengths before writing. Without a bounds check, strings longer than 65,535 bytes wrap in writeShort and corrupt the frame.</violation>
</file>

<file name="src/main/java/dev/faisca/fila/FilaClient.java">

<violation number="1" location="src/main/java/dev/faisca/fila/FilaClient.java:112">
P2: enqueueMany does not enforce that all messages target the same queue even though the FIBP frame encodes only the first queue name. Mixed-queue batches will silently send messages to the wrong queue.</violation>
</file>

<file name="src/test/java/dev/faisca/fila/TestServer.java">

<violation number="1" location="src/test/java/dev/faisca/fila/TestServer.java:88">
P2: createQueueWithApiKey now connects over a plain Socket via FibpAdminClient, but it is used immediately after startWithTls. This skips TLS/mTLS setup, so queue creation will fail against a TLS-enabled server.</violation>
</file>

<file name="src/main/java/dev/faisca/fila/Batcher.java">

<violation number="1" location="src/main/java/dev/faisca/fila/Batcher.java:239">
P2: respFuture.get can throw InterruptedException/TimeoutException, but the generic catch maps them to INTERNAL and clears the interrupt. Handle these explicitly (restore interrupt + map timeout to UNAVAILABLE) so batched enqueue errors match sendSync behavior.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

- FibpConnection: remove pending entry on consume timeout/interrupt to
  prevent stale future accumulation in the pending map
- FibpConnection: guard push handler invocations so exceptions from the
  user handler do not kill the reader thread and strand pending requests
- FibpCodec: validate header count <= 255 before writing u8 field
- FibpCodec: validate str16 string length <= 65535 before writing u16
- FilaClient.enqueueMany: validate all messages target the same queue —
  FIBP enqueue frames encode one queue name at the request level
- Batcher.flushQueueBatch: handle InterruptedException and TimeoutException
  explicitly; restore interrupt flag and map timeout to UNAVAILABLE
- FibpAdminClient: add TLS/mTLS support so createQueueWithApiKey works
  against TLS-enabled servers
- TestServer.createQueueImpl: use TLS connection when server has TLS enabled
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 6 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="src/main/java/dev/faisca/fila/Batcher.java">

<violation number="1" location="src/main/java/dev/faisca/fila/Batcher.java:244">
P2: The generic catch-all was removed, so unexpected runtime exceptions (e.g., decodeEnqueueResponse failures) will now escape this method and leave the per-item futures unresolved. Add a fallback catch to map and complete all futures on any other exception.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

without the fallback, unexpected runtime exceptions such as those from
decodeEnqueueResponse can escape flushQueueBatch and leave per-item
CompletableFuture instances unresolved, blocking callers indefinitely.
add a RuntimeException catch-all after the typed handlers that resolves
all futures with an INTERNAL error.
@vieiralucas vieiralucas merged commit 6694a8e into main Mar 26, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant