feat: replace grpc with fibp binary protocol#8
Open
vieiralucas wants to merge 11 commits intomainfrom
Open
Conversation
feat: transparent leader hint reconnect on consume
) add explicit batchEnqueue() method, transparent delivery batching in consume stream (unpacks repeated messages field with singular fallback), and three BatchMode options: AUTO (opportunistic, default), LINGER (timer-based), and DISABLED. enqueue() now routes through the batcher by default. single-item optimization uses regular Enqueue RPC for exact error semantics. close() drains pending messages before disconnecting.
- Copy new service.proto: BatchEnqueue RPC removed, Enqueue now takes repeated EnqueueMessage, Ack/Nack take repeated messages with per-item results, ConsumeResponse uses only repeated messages field - Rename BatchEnqueueResult to EnqueueResult (no "batch" prefix) - Replace batchEnqueue() with enqueueMany() on FilaClient - Update enqueue() to wrap single message in repeated EnqueueMessage - Update ack()/nack() to wrap in repeated, parse first result with typed error handling (AckError/NackError) - Update Batcher to use unified Enqueue RPC for all batch sizes - Update consumeStream() to use only getMessagesList() (no singular message fallback) - Update all tests to use new API names and types
ack() and nack() silently treated empty results and RESULT_NOT_SET as success. Now both methods validate exactly one result is returned and that it is an explicit success case, matching the pattern in enqueueDirect().
migrate the java sdk from grpc/protobuf to the fila binary protocol (fibp). adds fibp codec package with primitives, opcodes, frame encoding, and tcp connection manager. rewrites client to use fibp for all operations including enqueue, consume, ack, nack, and adds admin and auth operations. removes all grpc and protobuf dependencies. - new fibp codec: Primitives, Opcodes, Codec, FrameHeader, Connection - connection: tcp with optional tls, handshake, multiplexed req/resp, server-push delivery routing, ping/pong, continuation frames - client: admin ops (createQueue, deleteQueue, setConfig, getConfig, redrive), auth ops (createApiKey, revokeApiKey, setAcl) - error handling: fibp error codes replace grpc status codes - version bump to 0.3.0, zero external dependencies - 40 tests passing (27 unit + 13 integration including tls)
There was a problem hiding this comment.
11 issues found across 24 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="src/test/java/dev/faisca/fila/TlsAuthClientTest.java">
<violation number="1" location="src/test/java/dev/faisca/fila/TlsAuthClientTest.java:86">
P2: `assertNotNull(ex.getMessage())` is too weak here and can pass for unrelated connection failures, so the test no longer verifies API-key auth rejection specifically.</violation>
</file>
<file name="src/main/java/dev/faisca/fila/fibp/Codec.java">
<violation number="1" location="src/main/java/dev/faisca/fila/fibp/Codec.java:83">
P3: Javadoc opcode value is wrong: `ACK` is `0x16`, not `0x15` (0x15 is `CANCEL_CONSUME`).</violation>
<violation number="2" location="src/main/java/dev/faisca/fila/fibp/Codec.java:94">
P3: Javadoc opcode value is wrong: `NACK` is `0x18`, not `0x17` (0x17 is `ACK_RESULT`).</violation>
</file>
<file name="src/test/java/dev/faisca/fila/TestServer.java">
<violation number="1" location="src/test/java/dev/faisca/fila/TestServer.java:91">
P2: Restore the interrupt flag when handling `InterruptedException` instead of swallowing it in a combined catch block.</violation>
</file>
<file name="src/main/java/dev/faisca/fila/fibp/Primitives.java">
<violation number="1" location="src/main/java/dev/faisca/fila/fibp/Primitives.java:81">
P1: `writeString` can silently overflow its U16 length prefix, producing invalid frames for long strings.</violation>
<violation number="2" location="src/main/java/dev/faisca/fila/fibp/Primitives.java:93">
P1: `writeStringMap` can overflow the U16 element count and corrupt decoding for large maps.</violation>
<violation number="3" location="src/main/java/dev/faisca/fila/fibp/Primitives.java:184">
P1: `readBytes` should validate decoded length before allocating to avoid malformed-frame crashes or excessive allocation.</violation>
</file>
<file name="src/main/java/dev/faisca/fila/fibp/Connection.java">
<violation number="1" location="src/main/java/dev/faisca/fila/fibp/Connection.java:83">
P1: Socket and stream resources leak if `performHandshake()` throws. Wrap the handshake (and read-loop start) in a try-catch that closes the socket on failure.</violation>
<violation number="2" location="src/main/java/dev/faisca/fila/fibp/Connection.java:192">
P2: `readFrame()` returns `null` for continuation frames, but `performHandshake()` dereferences the result without a null check, risking an NPE. Loop until a non-null frame is received.</violation>
</file>
<file name="src/main/java/dev/faisca/fila/FilaClient.java">
<violation number="1" location="src/main/java/dev/faisca/fila/FilaClient.java:152">
P2: `InterruptedException` caught without restoring the interrupt flag. All `catch (IOException | InterruptedException e)` blocks in this class wrap the exception but never call `Thread.currentThread().interrupt()`, unlike the batched `enqueue()` path which handles it correctly. This drops the interrupt signal, breaking cooperative cancellation for callers.</violation>
<violation number="2" location="src/main/java/dev/faisca/fila/FilaClient.java:596">
P1: Connection and client leak in leader redirect. `leaderConn` is never closed — `AutoCloseable.close()` is not called by GC, and the `ConsumerHandle` returned by `leaderClient.consume()` is discarded, so the redirected consumer can never be cancelled or its connection released.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
There was a problem hiding this comment.
1 issue found across 7 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="src/main/java/dev/faisca/fila/FilaClient.java">
<violation number="1" location="src/main/java/dev/faisca/fila/FilaClient.java:643">
P1: Blocking on `handle.awaitDone()` in leader redirection can make redirected consumers non-cancellable and leave background consume threads running indefinitely.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
| ConsumerHandle handle = leaderClient.consume(queue, handler); | ||
| // Block until the consumer thread finishes. This method is called from | ||
| // within the original consumer thread, so blocking here is expected. | ||
| handle.awaitDone(); |
There was a problem hiding this comment.
P1: Blocking on handle.awaitDone() in leader redirection can make redirected consumers non-cancellable and leave background consume threads running indefinitely.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/main/java/dev/faisca/fila/FilaClient.java, line 643:
<comment>Blocking on `handle.awaitDone()` in leader redirection can make redirected consumers non-cancellable and leave background consume threads running indefinitely.</comment>
<file context>
@@ -598,11 +638,11 @@ private void retryConsumeOnLeader(
- // This is called from within the consumer thread, so we just let it run
+ // Block until the consumer thread finishes. This method is called from
+ // within the original consumer thread, so blocking here is expected.
+ handle.awaitDone();
} finally {
- // Connection will be closed when the temporary client is GC'd or when
</file context>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
dev.faisca.fila.fibpcodec package:Primitives(big-endian encoding),Opcodes,Codec(frame encoding for all 42 opcodes),FrameHeader, andConnection(TCP/TLS, handshake, multiplexed request/response, server-push delivery, ping/pong, continuation frame reassembly)FilaClientto use FIBP for all operations, adds admin ops (createQueue,deleteQueue,setConfig,getConfig,redrive) and auth ops (createApiKey,revokeApiKey,setAcl)RpcExceptionto use FIBP error codes instead of gRPC status codesTest plan
PrimitivesTest(11 round-trip tests for all encoding primitives)BatchModeTest,EnqueueResultTest,BuilderTest(configuration validation)FilaClientTest(enqueue/consume/ack lifecycle, nack redelivery, nonexistent queue error)BatchClientTest(explicit batch, auto batching, linger batching, disabled batching, cross-queue batch errors)TlsAuthClientTest(TLS + API key, rejection without API key)spotlessApplypassesSummary by cubic
Replaced gRPC/protobuf with the Fila binary protocol (FIBP) in the Java SDK to remove external deps and add admin/auth features. Adds smart enqueue batching and addresses review feedback with stricter TLS builder checks and safer stream cancellation.
New Features
dev.faisca.fila.fibpcodec: primitives, opcodes, frame encoding, and a TCP/TLSConnectionwith handshake, multiplexed req/resp, server-push delivery, ping/pong, and continuation reassembly.FilaClientto use FIBP for enqueue/consume/ack/nack; added admin ops (create/delete queue, set/get config, redrive) and auth ops (create/revoke API key, set ACL).BatchMode, backgroundBatcher,enqueueMany, per-itemEnqueueResult; default opportunistic batching with linger and disabled options.RpcExceptionto expose FIBP error codes; tightened builder validation (e.g., client cert/key require TLS) and improved consume cancellation handling.Migration
io.grpcandcom.google.protobufdependencies and the Gradle protobuf plugin; zero external runtime deps.RpcException.getErrorCode()instead of gRPC status codes.Written for commit af8e350. Summary will update on new commits.