Documentation
¶
Index ¶
- Constants
- Variables
- func GenerateTLSConfig(caCertFile, clientKeyFile, clientCertFile string, tlsSkipVerify bool) (*tls.Config, error)
- func MustNewUUID() string
- type AsLeaderConfig
- type Config
- type ConsumerConfig
- type INatty
- type KeyValueMap
- type Logger
- type Mode
- type Natty
- func (n *Natty) AsLeader(ctx context.Context, cfg AsLeaderConfig, f func() error) error
- func (n *Natty) Consume(ctx context.Context, cfg *ConsumerConfig, ...) error
- func (n *Natty) Create(ctx context.Context, bucket string, key string, data []byte, ...) error
- func (n *Natty) CreateBucket(_ context.Context, name string, ttl time.Duration, replicaCount int, ...) error
- func (n *Natty) CreateConsumer(ctx context.Context, streamName, consumerName string, filterSubject ...string) error
- func (n *Natty) CreateStream(ctx context.Context, name string, subjects []string) error
- func (n *Natty) Delete(ctx context.Context, bucket string, key string) error
- func (n *Natty) DeleteBucket(_ context.Context, bucket string) error
- func (n *Natty) DeleteConsumer(ctx context.Context, consumerName, streamName string) error
- func (n *Natty) DeletePublisher(ctx context.Context, topic string) bool
- func (n *Natty) DeleteStream(ctx context.Context, name string) error
- func (n *Natty) Get(ctx context.Context, bucket string, key string) ([]byte, error)
- func (n *Natty) GetLeader(ctx context.Context, bucketName, keyName string) (string, error)
- func (n *Natty) HaveLeader(ctx context.Context, nodeName, bucketName, keyName string) bool
- func (n *Natty) Keys(ctx context.Context, bucket string) ([]string, error)
- func (n *Natty) Publish(ctx context.Context, subject string, value []byte)
- func (n *Natty) Put(ctx context.Context, bucket string, key string, data []byte, ...) error
- func (n *Natty) Refresh(_ context.Context, bucket string, key string) error
- func (n *Natty) Status(ctx context.Context, bucket string) (nats.KeyValueStatus, error)
- func (n *Natty) WatchBucket(ctx context.Context, bucket string) (nats.KeyWatcher, error)
- func (n *Natty) WatchKey(ctx context.Context, bucket, key string) (nats.KeyWatcher, error)
- type NoOpLogger
- func (l *NoOpLogger) Debug(args ...interface{})
- func (l *NoOpLogger) Debugf(format string, args ...interface{})
- func (l *NoOpLogger) Error(args ...interface{})
- func (l *NoOpLogger) Errorf(format string, args ...interface{})
- func (l *NoOpLogger) Info(args ...interface{})
- func (l *NoOpLogger) Infof(format string, args ...interface{})
- func (l *NoOpLogger) Warn(args ...interface{})
- func (l *NoOpLogger) Warnf(format string, args ...interface{})
- type PublishError
- type Publisher
Constants ¶
const ( DefaultAsLeaderBucketTTL = time.Second * 10 DefaultAsLeaderElectionLooperInterval = time.Second DefaultAsLeaderReplicaCount = 1 )
const ( DefaultMaxMsgs = 10_000 DefaultFetchSize = 100 DefaultFetchTimeout = time.Second * 1 DefaultDeliverPolicy = nats.DeliverLastPolicy DefaultSubBatchSize = 256 DefaultWorkerIdleTimeout = time.Minute DefaultPublishTimeout = time.Second * 5 // TODO: figure out a good value for this )
Variables ¶
var ( ErrEmptyStreamName = errors.New("StreamName cannot be empty") ErrEmptyConsumerName = errors.New("ConsumerName cannot be empty") ErrEmptySubject = errors.New("Subject cannot be empty") )
var (
ErrBucketTTLMismatch = errors.New("bucket ttl mismatch")
)
Functions ¶
func GenerateTLSConfig ¶
func MustNewUUID ¶
func MustNewUUID() string
Types ¶
type AsLeaderConfig ¶
type AsLeaderConfig struct {
// Looper is the loop construct that will be used to execute Func (required)
Looper director.Looper
// Bucket specifies what K/V bucket will be used for leader election (required)
Bucket string
// Key specifies the keyname that the leader election will occur on (required)
Key string
// NodeName is the name used for this node; should be unique in cluster (required)
NodeName string
// Description will set the bucket description (optional)
Description string
// ElectionLooper allows you to override the used election looper (optional)
ElectionLooper director.Looper
// BucketTTL specifies the TTL policy the bucket should use (optional)
BucketTTL time.Duration
// ReplicaCount specifies the number of replicas the bucket should use (optional, default 1)
ReplicaCount int
}
type Config ¶
type Config struct {
// NatsURL defines the NATS urls the library will attempt to connect to. Iff
// first URL fails, we will try to connect to the next one. Only fail if all
// URLs fail.
NatsURL []string
// MaxMsgs defines the maximum number of messages a stream will contain.
MaxMsgs int64
// FetchSize defines the number of messages to fetch from the stream during
// a single Fetch() call.
FetchSize int
// FetchTimeout defines how long a Fetch() call will wait to attempt to reach
// defined FetchSize before continuing.
FetchTimeout time.Duration
// DeliverPolicy defines the policy the library will use to deliver messages.
// Default: DeliverLastPolicy which will deliver from the last message that
// the consumer has seen.
DeliverPolicy nats.DeliverPolicy
// Logger allows you to inject a logger into the library. Optional.
Logger Logger
// Whether to use TLS
UseTLS bool
// TLS CA certificate file
TLSCACertFile string
// TLS client certificate file
TLSClientCertFile string
// TLS client key file
TLSClientKeyFile string
// Do not perform server certificate checks
TLSSkipVerify bool
// PublishBatchSize is how many messages to async publish at once
// Default: 256
PublishBatchSize int
// ServiceShutdownContext is used by main() to shutdown services before application termination
ServiceShutdownContext context.Context
// MainShutdownFunc is triggered by watchForShutdown() after all publisher queues are exhausted
// and is used to trigger shutdown of APIs and then main()
MainShutdownFunc context.CancelFunc
// WorkerIdleTimeout determines how long to keep a publish worker alive if no activity
WorkerIdleTimeout time.Duration
// PublishTimeout is how long to wait for a batch of async publish calls to be ACK'd
PublishTimeout time.Duration
// PublishErrorCh will receive any
PublishErrorCh chan *PublishError
}
type ConsumerConfig ¶
type ConsumerConfig struct {
// Subject is the subject to consume off of a stream
Subject string
// StreamName is the name of JS stream to consume from.
// This should first be created with CreateStream()
StreamName string
// ConsumerName is the consumer that was made with CreateConsumer()
ConsumerName string
// Looper is optional, if none is provided, one will be created
Looper director.Looper
// ErrorCh is used to retrieve any errors returned during asynchronous publishing
// If nil, errors will only be logged
ErrorCh chan error
}
ConsumerConfig is used to pass configuration options to Consume()
type INatty ¶
type INatty interface {
// Consume subscribes to given subject and executes callback every time a
// message is received. Consumed messages must be explicitly ACK'd or NAK'd.
//
// This is a blocking call; cancellation should be performed via the context.
Consume(ctx context.Context, cfg *ConsumerConfig, cb func(ctx context.Context, msg *nats.Msg) error) error
// Publish publishes a single message with the given subject; this method
// will perform automatic batching as configured during `natty.New(..)`
Publish(ctx context.Context, subject string, data []byte)
// DeletePublisher shuts down a publisher and deletes it from the internal publisherMap
DeletePublisher(ctx context.Context, id string) bool
// CreateStream creates a new stream if it does not exist
CreateStream(ctx context.Context, name string, subjects []string) error
// DeleteStream deletes an existing stream
DeleteStream(ctx context.Context, name string) error
// CreateConsumer creates a new consumer if it does not exist
CreateConsumer(ctx context.Context, streamName, consumerName string, filterSubject ...string) error
// DeleteConsumer deletes an existing consumer
DeleteConsumer(ctx context.Context, consumerName, streamName string) error
// Get will fetch the value for a given bucket and key. Will NOT auto-create
// bucket if it does not exist.
Get(ctx context.Context, bucket string, key string) ([]byte, error)
// Create will attempt to create a key in KV. It will return an error if
// the key already exists. Will auto-create the bucket if it does not
// already exist.
Create(ctx context.Context, bucket string, key string, data []byte, keyTTL ...time.Duration) error
// Put will put a new value for a given bucket and key. Will auto-create
// the bucket if it does not already exist.
Put(ctx context.Context, bucket string, key string, data []byte, ttl ...time.Duration) error
// Delete will delete a key from a given bucket. Will no-op if the bucket
// or key does not exist.
Delete(ctx context.Context, bucket string, key string) error
// CreateBucket will attempt to create a new bucket. Will return an error if
// bucket already exists.
CreateBucket(ctx context.Context, bucket string, ttl time.Duration, replicas int, description ...string) error
// DeleteBucket will delete the specified bucket
DeleteBucket(ctx context.Context, bucket string) error
// WatchBucket returns an instance of nats.KeyWatcher for the given bucket
WatchBucket(ctx context.Context, bucket string) (nats.KeyWatcher, error)
// WatchKey returns an instance of nats.KeyWatcher for the given bucket and key
WatchKey(ctx context.Context, bucket, key string) (nats.KeyWatcher, error)
// Keys will return all of the keys in a bucket (empty slice if none found)
Keys(ctx context.Context, bucket string) ([]string, error)
// Refresh will attempt to perform a "safe" refresh of a key that has a TTL.
// Natty will first attempt to fetch the key so that it can get its revision
// and then perform an Update() referencing the revision. If the revision
// does not match, it will return an error.
Refresh(ctx context.Context, bucket, key string) error
// Status queries the status of the KV bucket
Status(ctx context.Context, bucket string) (nats.KeyValueStatus, error)
// AsLeader enables simple leader election by using NATS k/v functionality.
//
// AsLeader will execute opts.Func if and only if the node executing AsLeader
// acquires leader role. It will continue executing opts.Func until it loses
// leadership and another node becomes leader.
AsLeader(ctx context.Context, opts AsLeaderConfig, f func() error) error
// HaveLeader returns bool indicating whether node-name in given cfg is the
// leader for the cfg.Bucket and cfg.Key
HaveLeader(ctx context.Context, nodeName, bucketName, keyName string) bool
// GetLeader returns the current leader for a given bucket and key
GetLeader(ctx context.Context, bucketName, keyName string) (string, error)
}
type KeyValueMap ¶
type KeyValueMap struct {
// contains filtered or unexported fields
}
func (*KeyValueMap) Delete ¶
func (k *KeyValueMap) Delete(key string)
Delete functionality is not used because there is no way to list buckets in NATS
type Logger ¶
type Logger interface {
// Debug sends out a debug message with the given arguments to the logger.
Debug(args ...interface{})
// Debugf formats a debug message using the given arguments and sends it to the logger.
Debugf(format string, args ...interface{})
// Info sends out an informational message with the given arguments to the logger.
Info(args ...interface{})
// Infof formats an informational message using the given arguments and sends it to the logger.
Infof(format string, args ...interface{})
// Warn sends out a warning message with the given arguments to the logger.
Warn(args ...interface{})
// Warnf formats a warning message using the given arguments and sends it to the logger.
Warnf(format string, args ...interface{})
// Error sends out an error message with the given arguments to the logger.
Error(args ...interface{})
// Errorf formats an error message using the given arguments and sends it to the logger.
Errorf(format string, args ...interface{})
}
Logger is the common interface for user-provided loggers.
type Natty ¶
type Natty struct {
*Config
// contains filtered or unexported fields
}
func (*Natty) Consume ¶
func (n *Natty) Consume(ctx context.Context, cfg *ConsumerConfig, f func(ctx context.Context, msg *nats.Msg) error) error
Consume will create a durable consumer and consume messages from the configured stream
func (*Natty) Create ¶
func (n *Natty) Create(ctx context.Context, bucket string, key string, data []byte, keyTTL ...time.Duration) error
Create will add the key/value pair iff it does not exist; it will create the bucket if it does not already exist. TTL is optional - it will only be used if the bucket does not exist & only the first TTL will be used.
func (*Natty) CreateBucket ¶
func (n *Natty) CreateBucket(_ context.Context, name string, ttl time.Duration, replicaCount int, description ...string) error
CreateBucket creates a bucket; returns an error if it already exists. Context usage not supported by NATS kv (yet).
func (*Natty) CreateConsumer ¶
func (*Natty) CreateStream ¶
func (*Natty) DeleteConsumer ¶
func (*Natty) DeletePublisher ¶
DeletePublisher will stop the batch publisher goroutine and remove the publisher from the shared publisher map.
It is safe to call this if a publisher for the topic does not exist.
Returns bool which indicate if publisher exists.
func (*Natty) HaveLeader ¶
func (*Natty) Put ¶
func (n *Natty) Put(ctx context.Context, bucket string, key string, data []byte, keyTTL ...time.Duration) error
Put puts a key/val into a bucket and will create bucket if it doesn't already exit. TTL is optional - it will only be used if the bucket does not exist & only the first TTL will be used.
func (*Natty) Refresh ¶ added in v0.0.36
Refresh will refresh the TTL of a key in a bucket. Since there is no built-in way to perform a refresh, we will first get the key and then attempt to update it referencing the revision ID we got.
func (*Natty) WatchBucket ¶
WatchBucket returns an instance of nats.KeyWatcher for the given bucket
type NoOpLogger ¶
type NoOpLogger struct {
}
NoOpLogger is a do-nothing logger; it is used internally as the default Logger when none is provided in the Options.
func (*NoOpLogger) Debug ¶
func (l *NoOpLogger) Debug(args ...interface{})
Debug is no-op implementation of Logger's Debug.
func (*NoOpLogger) Debugf ¶
func (l *NoOpLogger) Debugf(format string, args ...interface{})
Debugf is no-op implementation of Logger's Debugf.
func (*NoOpLogger) Error ¶
func (l *NoOpLogger) Error(args ...interface{})
Error is no-op implementation of Logger's Error.
func (*NoOpLogger) Errorf ¶
func (l *NoOpLogger) Errorf(format string, args ...interface{})
Errorf is no-op implementation of Logger's Errorf.
func (*NoOpLogger) Info ¶
func (l *NoOpLogger) Info(args ...interface{})
Info is no-op implementation of Logger's Info.
func (*NoOpLogger) Infof ¶
func (l *NoOpLogger) Infof(format string, args ...interface{})
Infof is no-op implementation of Logger's Infof.
func (*NoOpLogger) Warn ¶
func (l *NoOpLogger) Warn(args ...interface{})
Warn is no-op implementation of Logger's Warn.
func (*NoOpLogger) Warnf ¶
func (l *NoOpLogger) Warnf(format string, args ...interface{})
Warnf is no-op implementation of Logger's Warnf.
type PublishError ¶
PublishError is a wrapper struct used to return errors to code that occur during async batch publishes
type Publisher ¶
type Publisher struct {
Subject string
QueueMutex *sync.RWMutex
Queue []*message
Natty *Natty
IdleTimeout time.Duration
// ErrorCh is optional. It will receive async publish errors if specified
// Otherwise errors will only be logged
ErrorCh chan *PublishError
// PublisherContext is used to close a specific publisher
PublisherContext context.Context
// PublisherCancel is used to cancel a specific publisher's context
PublisherCancel context.CancelFunc
// ServiceShutdownContext is used by main() to shutdown services before application termination
ServiceShutdownContext context.Context
// contains filtered or unexported fields
}