feat: migrate from grpc to fila binary protocol (fibp) #10

Open
vieiralucas wants to merge 9 commits into main from feat/21.3-binary-protocol

@vieiralucas (Member) commented Apr 4, 2026

Summary

  • Replace gRPC transport (@grpc/grpc-js, @grpc/proto-loader) with native FIBP binary protocol over TCP
  • Add src/fibp/ codec module: Encoder/Decoder/FrameReader with all encoding primitives (big-endian integers, length-prefixed strings/bytes/maps, optional types) and continuation frame reassembly
  • Add src/connection.ts connection manager: TCP + TLS wrapping, FIBP handshake with API key auth, request/response multiplexing via request IDs, server-push Delivery frame routing, ping/pong keepalive
  • Rewrite client with same public API surface (enqueue, enqueueMany, consume, ack, nack) plus new admin ops (createQueue, deleteQueue, getStats, listQueues, setConfig, getConfig, listConfig, redrive) and auth ops (createApiKey, revokeApiKey, listApiKeys, setAcl, getAcl)
  • Add connect() method -- required before any operations (was implicit with gRPC)
  • Rewrite batcher to use FIBP connection instead of gRPC client
  • Expand error types: 18 FIBP error codes mapped to specific error classes (QueueNotFoundError, UnauthorizedError, ForbiddenError, NotLeaderError, etc.)
  • Delete all proto files, generated TypeScript types, and gRPC dependencies
  • Net: -441 lines, zero runtime dependencies
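
The summary names the codec's primitives (big-endian integers, length-prefixed strings) without giving the wire layout. As a rough illustration of that style of encoding, here is a minimal sketch that assumes a 2-byte big-endian length prefix followed by UTF-8 bytes. The prefix width and the names `encodeString` and `decodeString` are assumptions made for this example; they are not taken from the FIBP specification or from `src/fibp/`.

```typescript
// Hypothetical helpers. The u16 prefix width is an assumption, not the FIBP layout.
const utf8Encoder = new TextEncoder();
const utf8Decoder = new TextDecoder();

// Encodes a string as [u16 big-endian byte length][UTF-8 bytes].
function encodeString(value: string): Uint8Array {
  const payload = utf8Encoder.encode(value);
  if (payload.length > 0xffff) {
    throw new RangeError(`string too long for u16 prefix: ${payload.length} bytes`);
  }
  const out = new Uint8Array(2 + payload.length);
  // Passing littleEndian=false makes the write big-endian.
  new DataView(out.buffer).setUint16(0, payload.length, false);
  out.set(payload, 2);
  return out;
}

// Decodes one length-prefixed string starting at `offset` and returns the
// offset of the first byte after it, so callers can keep reading fields.
function decodeString(buf: Uint8Array, offset = 0): { value: string; next: number } {
  if (offset + 2 > buf.byteLength) {
    throw new RangeError("truncated length prefix");
  }
  const view = new DataView(buf.buffer, buf.byteOffset, buf.byteLength);
  const length = view.getUint16(offset, false);
  const start = offset + 2;
  const end = start + length;
  if (end > buf.byteLength) {
    throw new RangeError("truncated string payload");
  }
  return { value: utf8Decoder.decode(buf.subarray(start, end)), next: end };
}
```

Returning `next` from the decoder lets a frame parser read several fields in sequence without tracking offsets separately.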

Test plan

  • 17 codec unit tests (encoder/decoder round-trips, frame reader, continuation frames)
  • 7 batcher unit tests (constructor validation, batch modes)
  • 3 client integration tests (enqueue+consume+ack, nack redeliver, queue-not-found error)
  • 12 batch integration tests (enqueueMany, auto/disabled/linger modes, drain on close)
  • 8 auth integration tests (API key auth, server-TLS, mTLS + API key)
  • All 47 tests pass, lint clean, typecheck clean

Summary by cubic

Switches the SDK transport from gRPC to the Fila binary protocol (FIBP) over TCP for a leaner, faster client. Keeps the core API, adds admin/auth APIs, and removes gRPC/proto dependencies; also adds leader-hint reconnect on consume and several reliability fixes.

  • New Features

    • Native FIBP codec and TCP/TLS connection manager with API key handshake and optional mTLS.
    • Multiplexed requests, server-push delivery routing, ping/pong keepalive, and transparent leader-hint reconnect on consume streams.
    • Core APIs kept (enqueue, enqueueMany, consume, ack, nack); new admin/auth ops (create/delete queue, stats, config, redrive, API key and ACL management).
    • Smarter batching: enqueueMany, auto/linger/disabled modes, and drain-on-close.
    • Typed errors mapped from 18 FIBP error codes (e.g., QueueNotFoundError, UnauthorizedError, NotLeaderError).
    • Reliability: IPv6 [host]:port parsing, send Disconnect before close, keep request IDs unsigned, validate leader redirects, validate batch size > 0, and a codec grow() fix to avoid infinite loops.
    • No runtime deps; proto files and generated types removed.
  • Migration

    • Install/import now uses fila-client (was @fila/client): update package and imports.
    • Call client.connect() before any operation, and await client.close() when done.
    • Remove gRPC/proto build steps and deps (@grpc/grpc-js, @grpc/proto-loader).
    • TLS: enabling TLS without a CA uses the system trust store; use caCert/clientCert/clientKey in Client options for custom CA or mTLS.
    • Backward compatibility: RPCError now aliases ProtocolError.
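
Since `connect()` is now explicit, callers own the connection lifecycle. One way to keep `connect()` and `close()` paired is sketched below. It assumes only that both methods return promises, as the migration notes describe, and that `close()` is safe to call on a client whose `connect()` rejected. `ClientLike` and `withClient` are hypothetical names for this example and are not part of `fila-client`.

```typescript
// Hypothetical shape: only the two lifecycle methods named in the migration notes.
interface ClientLike {
  connect(): Promise<void>;
  close(): Promise<void>;
}

// Runs `work` against a connected client and always closes it afterwards,
// including when connect() itself rejects (for example a failed TLS handshake).
// Assumption: close() tolerates being called on a never-connected client.
async function withClient<C extends ClientLike, T>(
  client: C,
  work: (client: C) => Promise<T>,
): Promise<T> {
  try {
    await client.connect();
    return await work(client);
  } finally {
    await client.close();
  }
}
```

Putting `connect()` inside the `try` is the same cleanup shape the review asks for in `test/auth.test.ts`, where a rejected connect would otherwise skip `close()`.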

Written for commit c26d1d6. Summary will update on new commits.

feat: transparent leader hint reconnect on consume

add batchEnqueue() for explicit batch operations with per-message
results. enqueue() now routes through an auto-batcher by default
(setImmediate-based opportunistic batching). three batch modes:
auto (default), linger (timer-based), and disabled. single-item
optimization uses Enqueue RPC to preserve error types. consume
stream transparently unpacks batched ConsumeResponse.messages
field. close() drains pending messages before disconnecting.

- update service.proto: single Enqueue RPC with repeated messages,
  typed per-result errors for enqueue/ack/nack, consume uses only
  repeated messages field
- remove batchEnqueue(), add enqueueMany() (no "batch" prefix)
- enqueue() wraps single message in repeated, parses first result
- ack()/nack() wrap single item in repeated, parse per-result errors
- consume() uses only messages field (singular message field removed)
- batcher uses unified Enqueue RPC for all batch sizes
- fix drain race: track in-flight RPCs to prevent premature resolution
- regenerate proto types, remove stale BatchEnqueue* generated files

replace grpc transport with native fibp binary protocol over tcp.
add fibp codec (encoder/decoder/frame reader), connection manager
with tls/handshake/multiplexing, and rewrite all client operations.
add admin ops (createQueue, deleteQueue, getStats, listQueues,
setConfig, getConfig, listConfig, redrive) and auth ops (createApiKey,
revokeApiKey, listApiKeys, setAcl, getAcl). remove @grpc/grpc-js,
@grpc/proto-loader, proto files, and generated code. all 47 tests pass.

@cubic-dev-ai cubic-dev-ai bot left a comment

13 issues found across 73 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="test/auth.test.ts">

<violation number="1" location="test/auth.test.ts:32">
P2: Wrap `client.connect()` in the same cleanup path as the rest of the test. A rejected handshake/TLS connect currently skips `client.close()` and can leak the underlying socket.</violation>
</file>

<file name="test/batch.test.ts">

<violation number="1" location="test/batch.test.ts:97">
P2: This assertion does not verify input ordering; it only checks that the two message IDs differ.</violation>
</file>

<file name="src/index.ts">

<violation number="1" location="src/index.ts:22">
P2: This drops the previously exported `RPCError` symbol from the package root, which is a breaking change for existing imports. Keep a compatibility alias when introducing `ProtocolError`.</violation>
</file>

<file name="test/batcher.unit.test.ts">

<violation number="1" location="test/batcher.unit.test.ts:9">
P2: Await `client.close()` in these tests; otherwise they can pass without actually verifying that the async close operation succeeds.</violation>
</file>

<file name="test/helpers.ts">

<violation number="1" location="test/helpers.ts:100">
P2: Close the readiness probe in a `finally` block so failed attempts do not leak client sockets.</violation>

<violation number="2" location="test/helpers.ts:127">
P2: Handle `adminClient.connect()` failures by cleaning up the spawned server and temp directory before rethrowing.</violation>
</file>

<file name="src/fibp/codec.ts">

<violation number="1" location="src/fibp/codec.ts:29">
P2: Infinite loop in `grow()` when buffer length is 0. If `initialCapacity` is 0 (or the buffer somehow becomes length 0), `newSize` starts at 0 and `0 * 2` stays 0 forever. Guard the doubling with a minimum size.</violation>
</file>

<file name="src/connection.ts">

<violation number="1" location="src/connection.ts:148">
P2: `close()` sets `closed` before sending `OP_DISCONNECT`, so the graceful disconnect frame is never transmitted.</violation>

<violation number="2" location="src/connection.ts:267">
P2: Bitwise wrapping turns request IDs negative after `2^31 - 1` requests, which breaks frame encoding.</violation>
</file>

<file name="src/client.ts">

<violation number="1" location="src/client.ts:93">
P2: `parseAddr()` breaks IPv6 addresses by treating the last colon as the host/port separator.</violation>

<violation number="2" location="src/client.ts:382">
P1: Leader-redirected consumers outlive `client.close()` because the extra `leaderConn` is never tracked or closed by the client.</violation>

<violation number="3" location="src/client.ts:412">
P2: The redirected consume path should validate `OP_CONSUME_OK` before entering the delivery loop.</violation>
</file>

<file name="src/batcher.ts">

<violation number="1" location="src/batcher.ts:47">
P2: Validate batch sizes before storing them; `0`, negative, or `NaN` values make `flushAll()` spin forever.</violation>
</file>
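
To make the `grow()` finding in `src/fibp/codec.ts` concrete: doubling a capacity of 0 never makes progress, so the loop needs a floor. A minimal sketch of a guarded version follows. The function signature and the `MIN_CAPACITY` value are assumptions for illustration and do not reproduce the encoder in this PR.

```typescript
// Hypothetical stand-in for the encoder's buffer growth. Names are illustrative.
const MIN_CAPACITY = 64;

// Returns a buffer with room for at least `needed` bytes, copying existing data.
function grow(buffer: Uint8Array, needed: number): Uint8Array {
  if (needed <= buffer.length) {
    return buffer;
  }
  // Start from at least MIN_CAPACITY so doubling always makes progress.
  // Starting from 0 would loop forever because 0 * 2 === 0.
  let newSize = Math.max(buffer.length, MIN_CAPACITY);
  while (newSize < needed) {
    newSize *= 2;
  }
  const next = new Uint8Array(newSize);
  next.set(buffer);
  return next;
}
```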

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

- test/auth: close client in finally on connect rejection (finding 1)
- test/batch: verify input ordering matches output (finding 2)
- src/index: re-export ProtocolError as RPCError for backward compat (finding 3)
- test/batcher.unit: await client.close() (finding 4)
- test/helpers: close readiness probe in finally block (finding 5)
- test/helpers: clean up server on adminClient.connect() failure (finding 6)
- src/fibp/codec: prevent grow() infinite loop on zero-length buffer (finding 7)
- src/connection: send disconnect frame before setting closed flag (finding 8)
- src/connection: use >>> 0 to keep request IDs unsigned (finding 9)
- src/client: track and close leader connections on client.close() (finding 10)
- src/client: handle IPv6 [host]:port in parseAddr (finding 11)
- src/client: validate ConsumeOk opcode on leader redirect (finding 12)
- src/batcher: validate batch size > 0 (finding 13)
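
Two of the fixes listed above are easy to show in isolation: keeping request IDs unsigned with `>>> 0`, and accepting bracketed IPv6 addresses. The sketch below is illustrative only. `nextRequestId` is a hypothetical helper, and this `parseAddr` is a simplified reimplementation written for this example, not the code in `src/client.ts`.

```typescript
// `| 0` coerces to a signed 32-bit integer, so a counter goes negative after
// 2^31 - 1. `>>> 0` coerces to unsigned and wraps to 0 after 2^32 - 1.
function nextRequestId(current: number): number {
  return (current + 1) >>> 0;
}

// Accepts "host:port" and bracketed IPv6 "[host]:port".
// An unbracketed IPv6 address is ambiguous and is split at the last colon.
function parseAddr(addr: string): { host: string; port: number } {
  let host: string;
  let portText: string;
  if (addr.startsWith("[")) {
    const close = addr.indexOf("]");
    if (close === -1 || addr[close + 1] !== ":") {
      throw new Error(`invalid address: ${addr}`);
    }
    host = addr.slice(1, close);
    portText = addr.slice(close + 2);
  } else {
    const colon = addr.lastIndexOf(":");
    if (colon === -1) {
      throw new Error(`invalid address: ${addr}`);
    }
    host = addr.slice(0, colon);
    portText = addr.slice(colon + 1);
  }
  // Require plain decimal digits so "", " 80", and "0x50" are rejected.
  if (host === "" || !/^\d+$/.test(portText)) {
    throw new Error(`invalid address: ${addr}`);
  }
  const port = Number(portText);
  if (port > 65535) {
    throw new Error(`invalid address: ${addr}`);
  }
  return { host, port };
}
```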

@cubic-dev-ai cubic-dev-ai bot left a comment

1 issue found across 9 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="src/batcher.ts">

<violation number="1" location="src/batcher.ts:55">
P1: Batch size validation misses `NaN` (and other non-integer numbers), which can cause an infinite loop in `flushAll()` when `splice` removes zero items repeatedly.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

this.maxBatchSize = 1;
}

if (this.maxBatchSize <= 0) {

@cubic-dev-ai cubic-dev-ai bot Apr 4, 2026

P1: Batch size validation misses NaN (and other non-integer numbers), which can cause an infinite loop in flushAll() when splice removes zero items repeatedly.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/batcher.ts, line 55:

<comment>Batch size validation misses `NaN` (and other non-integer numbers), which can cause an infinite loop in `flushAll()` when `splice` removes zero items repeatedly.</comment>

<file context>
@@ -51,6 +51,10 @@ export class Batcher {
       this.maxBatchSize = 1;
     }
+
+    if (this.maxBatchSize <= 0) {
+      throw new Error(`batch size must be greater than 0, got ${this.maxBatchSize}`);
+    }
</file context>
Suggested change
- if (this.maxBatchSize <= 0) {
+ if (!Number.isInteger(this.maxBatchSize) || this.maxBatchSize <= 0) {
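
The suggestion rests on a JavaScript comparison rule: every ordered comparison with `NaN` is false, so `NaN <= 0` does not trigger the original guard. A minimal standalone sketch of the stricter check is shown here. `validateBatchSize` and its error message are hypothetical and do not come from `src/batcher.ts`.

```typescript
// Hypothetical validator. The error message format is illustrative.
function validateBatchSize(size: number): number {
  // `NaN <= 0` is false, so a bare `size <= 0` check lets NaN through.
  // Number.isInteger rejects NaN, Infinity, and fractional values.
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError(`batch size must be a positive integer, got ${size}`);
  }
  return size;
}
```

This also accounts for the loop described in the finding: `Array.prototype.splice` converts a `NaN` delete count to 0, so each call removes nothing and the pending queue never shrinks.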
