Go client SDK for the Fila message broker.
go get github.com/faisca/fila-go
package main
import (
"context"
"fmt"
"log"
fila "github.com/faisca/fila-go"
)
func main() {
client, err := fila.Dial("localhost:5555")
if err != nil {
log.Fatal(err)
}
defer client.Close()
ctx := context.Background()
// Enqueue a message.
msgID, err := client.Enqueue(ctx, "my-queue", map[string]string{
"tenant": "acme",
}, []byte("hello world"))
if err != nil {
log.Fatal(err)
}
fmt.Println("Enqueued:", msgID)
// Consume messages.
ch, err := client.Consume(ctx, "my-queue")
if err != nil {
log.Fatal(err)
}
for msg := range ch {
fmt.Printf("Received: %s (attempt %d)\n", msg.ID, msg.AttemptCount)
// Acknowledge successful processing.
if err := client.Ack(ctx, "my-queue", msg.ID); err != nil {
// Negative acknowledge on failure.
client.Nack(ctx, "my-queue", msg.ID, err.Error())
}
}
}If the broker's certificate is issued by a public CA (e.g., Let's Encrypt) or a CA already in the operating system's trust store, enable TLS with no arguments:
client, err := fila.Dial("broker.example.com:5555",
fila.WithTLS(),
)

For self-signed certificates or private CAs not in the system trust store, provide the CA certificate explicitly:
caCert, err := os.ReadFile("ca.crt")
if err != nil {
log.Fatal(err)
}
client, err := fila.Dial("localhost:5555",
fila.WithTLSCACert(caCert),
)

For mutual TLS (mTLS), also provide the client certificate and key:
caCert, err := os.ReadFile("ca.crt")
if err != nil {
log.Fatal(err)
}
clientCert, err := os.ReadFile("client.crt")
if err != nil {
log.Fatal(err)
}
clientKey, err := os.ReadFile("client.key")
if err != nil {
log.Fatal(err)
}
client, err := fila.Dial("localhost:5555",
fila.WithTLSCACert(caCert),
fila.WithTLSClientCert(clientCert, clientKey),
)

Connect to an auth-enabled broker by providing an API key:
client, err := fila.Dial("localhost:5555",
fila.WithAPIKey("your-api-key"),
)

TLS and API key auth can be combined:
client, err := fila.Dial("localhost:5555",
fila.WithTLSCACert(caCert),
fila.WithTLSClientCert(clientCert, clientKey),
fila.WithAPIKey("your-api-key"),
)

Connect to a Fila broker. Connection is established lazily on the first RPC call.
- fila.WithTLS() — Enable TLS using the system's default root CA pool
- fila.WithTLSCACert(caCertPEM []byte) — Enable TLS with a custom CA certificate for verifying the server
- fila.WithTLSClientCert(certPEM, keyPEM []byte) — Client certificate and key for mTLS (requires WithTLS or WithTLSCACert)
- fila.WithAPIKey(key string) — API key sent as Bearer token on every RPC
- fila.WithGRPCDialOption(opt grpc.DialOption) — Raw gRPC dial option for advanced configuration
client.Enqueue — Enqueue a message. Returns the broker-assigned message ID.
client.Consume — Open a streaming consumer. Returns a channel that delivers messages as they become available. The channel is closed when the stream ends or the context is cancelled.
client.Ack — Acknowledge a successfully processed message. The message is permanently removed.
client.Nack — Negatively acknowledge a failed message. The message is requeued or routed to the dead-letter queue based on the queue's configuration.
Per-operation sentinel errors are checkable via errors.Is:
_, err := client.Enqueue(ctx, "missing-queue", nil, []byte("test"))
if errors.Is(err, fila.ErrQueueNotFound) {
// handle queue not found
}
err = client.Ack(ctx, "my-queue", "missing-id")
if errors.Is(err, fila.ErrMessageNotFound) {
// handle message not found
}

AGPLv3