feat: replace gRPC transport with FIBP binary protocol#5
Merged
vieiralucas merged 3 commits intomainfrom Mar 26, 2026
Merged
Conversation
rewrite the java sdk transport layer to use fibp (fila binary protocol)
over raw tcp instead of grpc/protobuf.
- add FibpConnection: tcp socket with length-prefixed framing, handshake,
correlation-id multiplexing via ConcurrentHashMap + CompletableFuture,
heartbeat scheduler, and optional tls via SSLSocket
- add FibpCodec: wire encoding/decoding for enqueue, consume push batch,
ack, nack, and error frames (exact format from fila-core/src/fibp/wire.rs)
- rewrite FilaClient to use FibpConnection; remove all grpc/protobuf deps
- rewrite Batcher to use FibpConnection; groups messages by queue into
separate fibp frames (one queue name per enqueue frame)
- rewrite ConsumerHandle without grpc Context dependency
- update RpcException to define its own Code enum (no grpc Status.Code)
- remove ApiKeyInterceptor (auth now via fibp OP_AUTH frame)
- remove proto/ directory and grpc/protobuf gradle deps; bump to v0.3.0
- add FibpAdminClient in test package for createQueue (hand-rolled protobuf
encoding of CreateQueueRequest, avoids adding a test protobuf dependency)
- update TestServer to use FibpAdminClient instead of grpc admin stub
- update BuilderTest: remove tests that require a server connection (fibp
connects eagerly unlike grpc lazy channels); add address-parsing unit tests
- add @EnabledIf("serverAvailable") guard to FilaClientTest
- update TlsAuthClientTest to use RpcException.Code instead of grpc Status.Code
There was a problem hiding this comment.
7 issues found across 17 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="src/main/java/dev/faisca/fila/FibpConnection.java">
<violation number="1" location="src/main/java/dev/faisca/fila/FibpConnection.java:191">
P2: Remove the pending entry when the consume setup times out or is interrupted; otherwise the failed consume leaves a stale future in the pending map.</violation>
<violation number="2" location="src/main/java/dev/faisca/fila/FibpConnection.java:328">
P2: Guard push handlers so an exception doesn’t kill the reader thread and strand pending requests.</violation>
</file>
<file name="src/main/java/dev/faisca/fila/FibpCodec.java">
<violation number="1" location="src/main/java/dev/faisca/fila/FibpCodec.java:73">
P2: Guard the header count before writing the u8 field. Counts >255 wrap in writeByte and desynchronize the frame.</violation>
<violation number="2" location="src/main/java/dev/faisca/fila/FibpCodec.java:311">
P2: Validate str16 lengths before writing. Without a bounds check, strings longer than 65,535 bytes wrap in writeShort and corrupt the frame.</violation>
</file>
<file name="src/main/java/dev/faisca/fila/FilaClient.java">
<violation number="1" location="src/main/java/dev/faisca/fila/FilaClient.java:112">
P2: enqueueMany does not enforce that all messages target the same queue even though the FIBP frame encodes only the first queue name. Mixed-queue batches will silently send messages to the wrong queue.</violation>
</file>
<file name="src/test/java/dev/faisca/fila/TestServer.java">
<violation number="1" location="src/test/java/dev/faisca/fila/TestServer.java:88">
P2: createQueueWithApiKey now connects over a plain Socket via FibpAdminClient, but it is used immediately after startWithTls. This skips TLS/mTLS setup, so queue creation will fail against a TLS-enabled server.</violation>
</file>
<file name="src/main/java/dev/faisca/fila/Batcher.java">
<violation number="1" location="src/main/java/dev/faisca/fila/Batcher.java:239">
P2: respFuture.get can throw InterruptedException/TimeoutException, but the generic catch maps them to INTERNAL and clears the interrupt. Handle these explicitly (restore interrupt + map timeout to UNAVAILABLE) so batched enqueue errors match sendSync behavior.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
- FibpConnection: remove pending entry on consume timeout/interrupt to prevent stale future accumulation in the pending map - FibpConnection: guard push handler invocations so exceptions from the user handler do not kill the reader thread and strand pending requests - FibpCodec: validate header count <= 255 before writing u8 field - FibpCodec: validate str16 string length <= 65535 before writing u16 - FilaClient.enqueueMany: validate all messages target the same queue — FIBP enqueue frames encode one queue name at the request level - Batcher.flushQueueBatch: handle InterruptedException and TimeoutException explicitly; restore interrupt flag and map timeout to UNAVAILABLE - FibpAdminClient: add TLS/mTLS support so createQueueWithApiKey works against TLS-enabled servers - TestServer.createQueueImpl: use TLS connection when server has TLS enabled
There was a problem hiding this comment.
1 issue found across 6 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="src/main/java/dev/faisca/fila/Batcher.java">
<violation number="1" location="src/main/java/dev/faisca/fila/Batcher.java:244">
P2: The generic catch-all was removed, so unexpected runtime exceptions (e.g., decodeEnqueueResponse failures) will now escape this method and leave the per-item futures unresolved. Add a fallback catch to map and complete all futures on any other exception.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
without the fallback, unexpected runtime exceptions such as those from decodeEnqueueResponse can escape flushQueueBatch and leave per-item CompletableFuture instances unresolved, blocking callers indefinitely. add a RuntimeException catch-all after the typed handlers that resolves all futures with an INTERNAL error.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
build.gradle(no runtime dependencies remain)FibpConnection— TCP socket with length-prefixed framing, handshake, correlation-ID multiplexing viaConcurrentHashMap + CompletableFuture, heartbeat scheduler, and optional TLS viaSSLSocketFibpCodec— wire encoding/decoding matchingfila-core/src/fibp/wire.rsexactly (enqueue, consume push batch, ack, nack, error frames)FilaClientAPI identical:enqueue,enqueueMany,consume,ack,nack,close,Builderwith all TLS/auth/batch options preservedRpcException.Codereplacesio.grpc.Status.Code; error classification uses keyword matching on plain-text error payloads (matching the Rust SDK pattern)OP_AUTHframe (no length prefix, matching server protocol)msg_count:u16 | messages...)FibpAdminClientadded to test package with hand-rolled minimal protobuf forCreateQueueRequest(avoids a test protobuf dependency)FilaClientTest,BatchClientTest,TlsAuthClientTest) guarded by@EnabledIf("serverAvailable")and skip cleanly when no binary is present; unit tests all passTest plan
./gradlew test)spotlessCheckpasses (formatting enforced by Google Java Format)fila-serverbinarywithTls(),withTlsCaCert(),withTlsClientCert()(mTLS) all connectwithApiKey()sends AUTH frame and server accepts/rejects correctly🤖 Generated with Claude Code
Summary by cubic
Replaced the gRPC/protobuf transport with FIBP (Fila Binary Protocol) over raw TCP/TLS while keeping the public
FilaClientAPI the same. This removes all gRPC/protobuf deps, adds a single-socket multiplexed connection with heartbeat and AUTH, and includes stability/validation fixes.New Features
FibpConnection: framing, handshake, correlation IDs, heartbeat scheduler, optional TLS, and AUTH viaOP_AUTH.FibpCodec: encode/decode for enqueue, batched consume push, ack/nack, and error frames; validates header count (<=255) andstr16length (<=65535).FilaClientAPI unchanged; batcher groups messages by queue and sends one enqueue frame per queue;enqueueManyenforces a single target queue.RpcException.Code; errors mapped from server error text.UNAVAILABLE; add a fallback catch in the batcher to resolve futures withINTERNALon unexpected exceptions.FibpAdminClientfor queue creation (supports TLS/mTLS); README updated; version bumped to0.3.0.Migration
dev.faisca:fila-client:0.3.0.Status.CodewithRpcException.Code.build()now opens a connection; tests that constructed clients without a running server should guard or defer client creation.enqueueManyshould target a single queue per call.Written for commit 03bf792. Summary will update on new commits.