feat: migrate from grpc to fila binary protocol (fibp) #10
Open
vieiralucas wants to merge 9 commits into main
feat: transparent leader hint reconnect on consume
add batchEnqueue() for explicit batch operations with per-message results. enqueue() now routes through an auto-batcher by default (setImmediate-based opportunistic batching). three batch modes: auto (default), linger (timer-based), and disabled. single-item optimization uses Enqueue RPC to preserve error types. consume stream transparently unpacks batched ConsumeResponse.messages field. close() drains pending messages before disconnecting.
- update service.proto: single Enqueue RPC with repeated messages, typed per-result errors for enqueue/ack/nack, consume uses only repeated messages field
- remove batchEnqueue(), add enqueueMany() (no "batch" prefix)
- enqueue() wraps single message in repeated, parses first result
- ack()/nack() wrap single item in repeated, parse per-result errors
- consume() uses only messages field (singular message field removed)
- batcher uses unified Enqueue RPC for all batch sizes
- fix drain race: track in-flight RPCs to prevent premature resolution
- regenerate proto types, remove stale BatchEnqueue* generated files
replace grpc transport with native fibp binary protocol over tcp. add fibp codec (encoder/decoder/frame reader), connection manager with tls/handshake/multiplexing, and rewrite all client operations. add admin ops (createQueue, deleteQueue, getStats, listQueues, setConfig, getConfig, listConfig, redrive) and auth ops (createApiKey, revokeApiKey, listApiKeys, setAcl, getAcl). remove @grpc/grpc-js, @grpc/proto-loader, proto files, and generated code. all 47 tests pass.
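The PR summary describes the codec as built from big-endian integers and length-prefixed byte strings. A minimal sketch of that idea follows. The class names `SketchEncoder` and `SketchDecoder` and the 32-bit length prefix are assumptions made for illustration; the actual FIBP field widths and frame layout are not shown in this conversation.

```typescript
// Hypothetical sketch: names and field widths are assumptions, not the FIBP spec.

/** Append-only writer that emits big-endian integers. */
class SketchEncoder {
  private chunks: Uint8Array[] = [];

  /** Write an unsigned 32-bit integer in big-endian byte order. */
  u32(value: number): this {
    const out = new Uint8Array(4);
    new DataView(out.buffer).setUint32(0, value, false); // false = big-endian
    this.chunks.push(out);
    return this;
  }

  /** Write a byte string as a u32 length followed by the raw bytes. */
  bytes(value: Uint8Array): this {
    this.u32(value.length);
    this.chunks.push(value);
    return this;
  }

  /** Concatenate everything written so far into one buffer. */
  finish(): Uint8Array {
    const total = this.chunks.reduce((n, c) => n + c.length, 0);
    const out = new Uint8Array(total);
    let offset = 0;
    for (const chunk of this.chunks) {
      out.set(chunk, offset);
      offset += chunk.length;
    }
    return out;
  }
}

/** Sequential reader that mirrors SketchEncoder and rejects truncated input. */
class SketchDecoder {
  private offset = 0;
  private buf: Uint8Array;
  private view: DataView;

  constructor(buf: Uint8Array) {
    this.buf = buf;
    this.view = new DataView(buf.buffer, buf.byteOffset, buf.byteLength);
  }

  u32(): number {
    if (this.offset + 4 > this.buf.length) throw new RangeError("truncated u32");
    const value = this.view.getUint32(this.offset, false);
    this.offset += 4;
    return value;
  }

  bytes(): Uint8Array {
    const len = this.u32();
    if (this.offset + len > this.buf.length) throw new RangeError("truncated bytes");
    const out = this.buf.subarray(this.offset, this.offset + len);
    this.offset += len;
    return out;
  }
}
```

Checking the remaining length before every read keeps a short or corrupt frame from being silently misparsed, which matters once frames arrive in arbitrary TCP chunks.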
13 issues found across 73 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="test/auth.test.ts">
<violation number="1" location="test/auth.test.ts:32">
P2: Wrap `client.connect()` in the same cleanup path as the rest of the test. A rejected handshake/TLS connect currently skips `client.close()` and can leak the underlying socket.</violation>
</file>
<file name="test/batch.test.ts">
<violation number="1" location="test/batch.test.ts:97">
P2: This assertion does not verify input ordering; it only checks that the two message IDs differ.</violation>
</file>
<file name="src/index.ts">
<violation number="1" location="src/index.ts:22">
P2: This drops the previously exported `RPCError` symbol from the package root, which is a breaking change for existing imports. Keep a compatibility alias when introducing `ProtocolError`.</violation>
</file>
<file name="test/batcher.unit.test.ts">
<violation number="1" location="test/batcher.unit.test.ts:9">
P2: Await `client.close()` in these tests; otherwise they can pass without actually verifying that the async close operation succeeds.</violation>
</file>
<file name="test/helpers.ts">
<violation number="1" location="test/helpers.ts:100">
P2: Close the readiness probe in a `finally` block so failed attempts do not leak client sockets.</violation>
<violation number="2" location="test/helpers.ts:127">
P2: Handle `adminClient.connect()` failures by cleaning up the spawned server and temp directory before rethrowing.</violation>
</file>
<file name="src/fibp/codec.ts">
<violation number="1" location="src/fibp/codec.ts:29">
P2: Infinite loop in `grow()` when buffer length is 0. If `initialCapacity` is 0 (or the buffer somehow becomes length 0), `newSize` starts at 0 and `0 * 2` stays 0 forever. Guard the doubling with a minimum size.</violation>
</file>
<file name="src/connection.ts">
<violation number="1" location="src/connection.ts:148">
P2: `close()` sets `closed` before sending `OP_DISCONNECT`, so the graceful disconnect frame is never transmitted.</violation>
<violation number="2" location="src/connection.ts:267">
P2: Bitwise wrapping turns request IDs negative after `2^31 - 1` requests, which breaks frame encoding.</violation>
</file>
<file name="src/client.ts">
<violation number="1" location="src/client.ts:93">
P2: `parseAddr()` breaks IPv6 addresses by treating the last colon as the host/port separator.</violation>
<violation number="2" location="src/client.ts:382">
P1: Leader-redirected consumers outlive `client.close()` because the extra `leaderConn` is never tracked or closed by the client.</violation>
<violation number="3" location="src/client.ts:412">
P2: The redirected consume path should validate `OP_CONSUME_OK` before entering the delivery loop.</violation>
</file>
<file name="src/batcher.ts">
<violation number="1" location="src/batcher.ts:47">
P2: Validate batch sizes before storing them; `0`, negative, or `NaN` values make `flushAll()` spin forever.</violation>
</file>
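The `grow()` finding in `src/fibp/codec.ts` comes down to doubling from zero, which never makes progress. A minimal sketch of a guarded capacity calculation is shown here; `nextCapacity` and the `MIN_CAPACITY` floor of 64 are hypothetical and are not taken from the PR's code.

```typescript
// Hypothetical stand-in for the capacity logic inside the codec's grow().
const MIN_CAPACITY = 64; // assumed floor, chosen only for illustration

/** Return a capacity of at least `needed`, doubling from `current`. */
function nextCapacity(current: number, needed: number): number {
  // Without this floor, current === 0 would never grow because 0 * 2 === 0.
  let size = Math.max(current, MIN_CAPACITY);
  while (size < needed) {
    size *= 2;
  }
  return size;
}
```

Starting from a non-zero floor guarantees that every iteration strictly increases `size`, so the loop terminates for any finite `needed`.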
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
- test/auth: close client in finally on connect rejection (finding 1)
- test/batch: verify input ordering matches output (finding 2)
- src/index: re-export ProtocolError as RPCError for backward compat (finding 3)
- test/batcher.unit: await client.close() (finding 4)
- test/helpers: close readiness probe in finally block (finding 5)
- test/helpers: clean up server on adminClient.connect() failure (finding 6)
- src/fibp/codec: prevent grow() infinite loop on zero-length buffer (finding 7)
- src/connection: send disconnect frame before setting closed flag (finding 8)
- src/connection: use >>> 0 to keep request IDs unsigned (finding 9)
- src/client: track and close leader connections on client.close() (finding 10)
- src/client: handle IPv6 [host]:port in parseAddr (finding 11)
- src/client: validate ConsumeOk opcode on leader redirect (finding 12)
- src/batcher: validate batch size > 0 (finding 13)
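The `>>> 0` fix for finding 9 relies on the difference between signed and unsigned 32-bit coercion in JavaScript. The sketch below assumes the original code wrapped the counter with a signed bitwise operation such as `| 0`; the function names are hypothetical.

```typescript
/** Signed coercion: overflows into negative numbers after 2^31 - 1. */
function nextIdSigned(id: number): number {
  return (id + 1) | 0;
}

/** Unsigned coercion: stays within [0, 2^32 - 1] and wraps back to 0. */
function nextIdUnsigned(id: number): number {
  return (id + 1) >>> 0;
}
```

A negative ID cannot be written into an unsigned 32-bit frame field, which is why the unsigned shift is the safer way to wrap the counter.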
1 issue found across 9 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="src/batcher.ts">
<violation number="1" location="src/batcher.ts:55">
P1: Batch size validation misses `NaN` (and other non-integer numbers), which can cause an infinite loop in `flushAll()` when `splice` removes zero items repeatedly.</violation>
</file>
P1: Batch size validation misses NaN (and other non-integer numbers), which can cause an infinite loop in flushAll() when splice removes zero items repeatedly.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/batcher.ts, line 55:
<comment>Batch size validation misses `NaN` (and other non-integer numbers), which can cause an infinite loop in `flushAll()` when `splice` removes zero items repeatedly.</comment>
<file context>
@@ -51,6 +51,10 @@ export class Batcher {
this.maxBatchSize = 1;
}
+
+ if (this.maxBatchSize <= 0) {
+ throw new Error(`batch size must be greater than 0, got ${this.maxBatchSize}`);
+ }
</file context>
Suggested change
- if (this.maxBatchSize <= 0) {
+ if (!Number.isInteger(this.maxBatchSize) || this.maxBatchSize <= 0) {
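To see why the `<= 0` check alone is not enough: `NaN <= 0` evaluates to `false`, and `Array.prototype.splice(0, NaN)` removes zero elements, so a flush loop that splices by batch size never empties the queue. The sketch below applies the suggested check in a simplified flush loop. `validateBatchSize` and `flushAllSketch` are hypothetical names, not the PR's actual `Batcher` methods.

```typescript
/** Reject anything that is not a positive integer (NaN, 0, -1, 0.5, Infinity). */
function validateBatchSize(size: number): number {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError(`batch size must be a positive integer, got ${size}`);
  }
  return size;
}

/** Simplified flush loop: drains `pending` into batches of at most `maxBatchSize`. */
function flushAllSketch<T>(pending: T[], maxBatchSize: number): T[][] {
  const size = validateBatchSize(maxBatchSize);
  const batches: T[][] = [];
  while (pending.length > 0) {
    // With size NaN or 0.5, splice would remove nothing and this loop would spin forever.
    batches.push(pending.splice(0, size));
  }
  return batches;
}
```

Validating once, before the loop, means the loop body can assume every `splice` call removes at least one element.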
Summary
- `src/fibp/codec` module: Encoder/Decoder/FrameReader with all encoding primitives (big-endian integers, length-prefixed strings/bytes/maps, optional types) and continuation frame reassembly
- `src/connection.ts` connection manager: TCP + TLS wrapping, FIBP handshake with API key auth, request/response multiplexing via request IDs, server-push Delivery frame routing, ping/pong keepalive
- `connect()` method -- required before any operations (was implicit with gRPC)

Test plan
Summary by cubic
Switches the SDK transport from gRPC to the Fila binary protocol (FIBP) over TCP for a leaner, faster client. Keeps the core API, adds admin/auth APIs, and removes gRPC/proto dependencies; also adds leader-hint reconnect on consume and several reliability fixes.
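One of the reliability fixes concerns `parseAddr()`, which the review flagged for splitting IPv6 addresses at the last colon. A minimal sketch of bracket-aware parsing is shown here. `parseAddrSketch` and `HostPort` are hypothetical names, and rejecting unbracketed IPv6 literals is an assumption about the desired behavior, not something the PR states.

```typescript
interface HostPort {
  host: string;
  port: number;
}

/** Parse "host:port" or "[ipv6]:port" into its parts. */
function parseAddrSketch(addr: string): HostPort {
  let host: string;
  let portText: string;

  if (addr.startsWith("[")) {
    // Bracketed IPv6 literal, e.g. "[::1]:5555".
    const end = addr.indexOf("]");
    if (end === -1 || addr[end + 1] !== ":") {
      throw new Error(`invalid address: ${addr}`);
    }
    host = addr.slice(1, end);
    portText = addr.slice(end + 2);
  } else {
    const idx = addr.lastIndexOf(":");
    if (idx === -1) throw new Error(`missing port: ${addr}`);
    host = addr.slice(0, idx);
    portText = addr.slice(idx + 1);
    // An unbracketed host containing ":" is ambiguous, so reject it (assumed policy).
    if (host.includes(":")) throw new Error(`IPv6 addresses must be bracketed: ${addr}`);
  }

  const port = Number(portText);
  if (host === "" || !Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`invalid address: ${addr}`);
  }
  return { host, port };
}
```

For example, `parseAddrSketch("[::1]:5555")` yields host `::1` and port `5555`, while `parseAddrSketch("::1:5555")` throws instead of guessing where the host ends.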
New Features
- `enqueueMany`, auto/linger/disabled modes, and drain-on-close.
- `[host]:port` parsing, send Disconnect before close, keep request IDs unsigned, validate leader redirects, validate batch size > 0, and a codec `grow()` fix to avoid infinite loops.

Migration
- `fila-client` (was `@fila/client`): update package and imports.
- … `@grpc/grpc-js`, `@grpc/proto-loader`).
- `RPCError` now aliases `ProtocolError`.

Written for commit c26d1d6. Summary will update on new commits.