Documentation ¶
Index ¶
- Constants
- Variables
- func GenerateTLSConfig(caCertFile, clientKeyFile, clientCertFile string, tlsSkipVerify bool) (*tls.Config, error)
- func MustNewUUID() string
- type AsLeaderConfig
- type Config
- type ConsumerConfig
- type INatty
- type KeyValueMap
- type Logger
- type Mode
- type Natty
- func (n *Natty) AsLeader(ctx context.Context, cfg AsLeaderConfig, f func() error) error
- func (n *Natty) Consume(ctx context.Context, cfg *ConsumerConfig, ...) error
- func (n *Natty) Create(ctx context.Context, bucket string, key string, data []byte, ...) error
- func (n *Natty) CreateBucket(_ context.Context, name string, ttl time.Duration, replicaCount int, ...) error
- func (n *Natty) CreateConsumer(ctx context.Context, streamName, consumerName string, filterSubject ...string) error
- func (n *Natty) CreateStream(ctx context.Context, name string, subjects []string) error
- func (n *Natty) Delete(ctx context.Context, bucket string, key string) error
- func (n *Natty) DeleteBucket(_ context.Context, bucket string) error
- func (n *Natty) DeleteConsumer(ctx context.Context, consumerName, streamName string) error
- func (n *Natty) DeletePublisher(ctx context.Context, topic string) bool
- func (n *Natty) DeleteStream(ctx context.Context, name string) error
- func (n *Natty) Get(ctx context.Context, bucket string, key string) ([]byte, error)
- func (n *Natty) GetLeader(ctx context.Context, bucketName, keyName string) (string, error)
- func (n *Natty) HaveLeader(ctx context.Context, nodeName, bucketName, keyName string) bool
- func (n *Natty) Keys(ctx context.Context, bucket string) ([]string, error)
- func (n *Natty) Publish(ctx context.Context, subject string, value []byte)
- func (n *Natty) Put(ctx context.Context, bucket string, key string, data []byte, ...) error
- func (n *Natty) Refresh(_ context.Context, bucket string, key string) error
- func (n *Natty) Status(ctx context.Context, bucket string) (nats.KeyValueStatus, error)
- func (n *Natty) WatchBucket(ctx context.Context, bucket string) (nats.KeyWatcher, error)
- func (n *Natty) WatchKey(ctx context.Context, bucket, key string) (nats.KeyWatcher, error)
- type NoOpLogger
- func (l *NoOpLogger) Debug(args ...interface{})
- func (l *NoOpLogger) Debugf(format string, args ...interface{})
- func (l *NoOpLogger) Error(args ...interface{})
- func (l *NoOpLogger) Errorf(format string, args ...interface{})
- func (l *NoOpLogger) Info(args ...interface{})
- func (l *NoOpLogger) Infof(format string, args ...interface{})
- func (l *NoOpLogger) Warn(args ...interface{})
- func (l *NoOpLogger) Warnf(format string, args ...interface{})
- type PublishError
- type Publisher
Constants ¶
const ( DefaultAsLeaderBucketTTL = time.Second * 10 DefaultAsLeaderElectionLooperInterval = time.Second DefaultAsLeaderReplicaCount = 1 )
Variables ¶
var ( ErrEmptyStreamName = errors.New("StreamName cannot be empty") ErrEmptyConsumerName = errors.New("ConsumerName cannot be empty") ErrEmptySubject = errors.New("Subject cannot be empty") )
var (
ErrBucketTTLMismatch = errors.New("bucket ttl mismatch")
)
Functions ¶
func GenerateTLSConfig ¶
func MustNewUUID ¶
func MustNewUUID() string
Types ¶
type AsLeaderConfig ¶
type AsLeaderConfig struct { // Looper is the loop construct that will be used to execute the function passed to AsLeader (required) Looper director.Looper // Bucket specifies what K/V bucket will be used for leader election (required) Bucket string // Key specifies the keyname that the leader election will occur on (required) Key string // NodeName is the name used for this node; should be unique in cluster (required) NodeName string // Description will set the bucket description (optional) Description string // ElectionLooper allows you to override the used election looper (optional) ElectionLooper director.Looper // BucketTTL specifies the TTL policy the bucket should use (optional) BucketTTL time.Duration // ReplicaCount specifies the number of replicas the bucket should use (optional, default 1) ReplicaCount int }
type Config ¶
type Config struct { // NatsURL defines the NATS urls the library will attempt to connect to. Iff // first URL fails, we will try to connect to the next one. Only fail if all // URLs fail. NatsURL []string // MaxMsgs defines the maximum number of messages a stream will contain. MaxMsgs int64 // FetchSize defines the number of messages to fetch from the stream during // a single Fetch() call. FetchSize int // FetchTimeout defines how long a Fetch() call will wait to attempt to reach // defined FetchSize before continuing. FetchTimeout time.Duration // DeliverPolicy defines the policy the library will use to deliver messages. // Default: DeliverLastPolicy which will deliver from the last message that // the consumer has seen. DeliverPolicy nats.DeliverPolicy // Logger allows you to inject a logger into the library. Optional. Logger Logger // Whether to use TLS UseTLS bool // TLS CA certificate file TLSCACertFile string // TLS client certificate file TLSClientCertFile string // TLS client key file TLSClientKeyFile string // Do not perform server certificate checks TLSSkipVerify bool // PublishBatchSize is how many messages to async publish at once // Default: 256 PublishBatchSize int // ServiceShutdownContext is used by main() to shutdown services before application termination ServiceShutdownContext context.Context // MainShutdownFunc is triggered by watchForShutdown() after all publisher queues are exhausted // and is used to trigger shutdown of APIs and then main() MainShutdownFunc context.CancelFunc // WorkerIdleTimeout determines how long to keep a publish worker alive if no activity WorkerIdleTimeout time.Duration // PublishTimeout is how long to wait for a batch of async publish calls to be ACK'd PublishTimeout time.Duration // PublishErrorCh will receive any errors that occur during async batch publishes PublishErrorCh chan *PublishError }
type ConsumerConfig ¶
type ConsumerConfig struct { // Subject is the subject to consume off of a stream Subject string // StreamName is the name of JS stream to consume from. // This should first be created with CreateStream() StreamName string // ConsumerName is the consumer that was made with CreateConsumer() ConsumerName string // Looper is optional, if none is provided, one will be created Looper director.Looper // ErrorCh is used to retrieve any errors returned during asynchronous publishing // If nil, errors will only be logged ErrorCh chan error }
ConsumerConfig is used to pass configuration options to Consume()
type INatty ¶
type INatty interface { // Consume subscribes to given subject and executes callback every time a // message is received. Consumed messages must be explicitly ACK'd or NAK'd. // // This is a blocking call; cancellation should be performed via the context. Consume(ctx context.Context, cfg *ConsumerConfig, cb func(ctx context.Context, msg *nats.Msg) error) error // Publish publishes a single message with the given subject; this method // will perform automatic batching as configured during `natty.New(..)` Publish(ctx context.Context, subject string, data []byte) // DeletePublisher shuts down a publisher and deletes it from the internal publisherMap DeletePublisher(ctx context.Context, id string) bool // CreateStream creates a new stream if it does not exist CreateStream(ctx context.Context, name string, subjects []string) error // DeleteStream deletes an existing stream DeleteStream(ctx context.Context, name string) error // CreateConsumer creates a new consumer if it does not exist CreateConsumer(ctx context.Context, streamName, consumerName string, filterSubject ...string) error // DeleteConsumer deletes an existing consumer DeleteConsumer(ctx context.Context, consumerName, streamName string) error // Get will fetch the value for a given bucket and key. Will NOT auto-create // bucket if it does not exist. Get(ctx context.Context, bucket string, key string) ([]byte, error) // Create will attempt to create a key in KV. It will return an error if // the key already exists. Will auto-create the bucket if it does not // already exist. Create(ctx context.Context, bucket string, key string, data []byte, keyTTL ...time.Duration) error // Put will put a new value for a given bucket and key. Will auto-create // the bucket if it does not already exist. Put(ctx context.Context, bucket string, key string, data []byte, ttl ...time.Duration) error // Delete will delete a key from a given bucket. Will no-op if the bucket // or key does not exist. 
Delete(ctx context.Context, bucket string, key string) error // CreateBucket will attempt to create a new bucket. Will return an error if // bucket already exists. CreateBucket(ctx context.Context, bucket string, ttl time.Duration, replicas int, description ...string) error // DeleteBucket will delete the specified bucket DeleteBucket(ctx context.Context, bucket string) error // WatchBucket returns an instance of nats.KeyWatcher for the given bucket WatchBucket(ctx context.Context, bucket string) (nats.KeyWatcher, error) // WatchKey returns an instance of nats.KeyWatcher for the given bucket and key WatchKey(ctx context.Context, bucket, key string) (nats.KeyWatcher, error) // Keys will return all of the keys in a bucket (empty slice if none found) Keys(ctx context.Context, bucket string) ([]string, error) // Refresh will attempt to perform a "safe" refresh of a key that has a TTL. // Natty will first attempt to fetch the key so that it can get its revision // and then perform an Update() referencing the revision. If the revision // does not match, it will return an error. Refresh(ctx context.Context, bucket, key string) error // Status queries the status of the KV bucket Status(ctx context.Context, bucket string) (nats.KeyValueStatus, error) // AsLeader enables simple leader election by using NATS k/v functionality. // // AsLeader will execute f if and only if the node executing AsLeader // acquires leader role. It will continue executing f until it loses // leadership and another node becomes leader. AsLeader(ctx context.Context, opts AsLeaderConfig, f func() error) error // HaveLeader returns bool indicating whether nodeName is the leader for // the given bucketName and keyName HaveLeader(ctx context.Context, nodeName, bucketName, keyName string) bool // GetLeader returns the current leader for a given bucket and key GetLeader(ctx context.Context, bucketName, keyName string) (string, error) }
type KeyValueMap ¶
type KeyValueMap struct {
// contains filtered or unexported fields
}
func (*KeyValueMap) Delete ¶
func (k *KeyValueMap) Delete(key string)
Delete functionality is not used because there is no way to list buckets in NATS
func (*KeyValueMap) Get ¶
func (k *KeyValueMap) Get(key string) (nats.KeyValue, bool)
func (*KeyValueMap) Put ¶
func (k *KeyValueMap) Put(key string, value nats.KeyValue)
type Logger ¶
type Logger interface { // Debug sends out a debug message with the given arguments to the logger. Debug(args ...interface{}) // Debugf formats a debug message using the given arguments and sends it to the logger. Debugf(format string, args ...interface{}) // Info sends out an informational message with the given arguments to the logger. Info(args ...interface{}) // Infof formats an informational message using the given arguments and sends it to the logger. Infof(format string, args ...interface{}) // Warn sends out a warning message with the given arguments to the logger. Warn(args ...interface{}) // Warnf formats a warning message using the given arguments and sends it to the logger. Warnf(format string, args ...interface{}) // Error sends out an error message with the given arguments to the logger. Error(args ...interface{}) // Errorf formats an error message using the given arguments and sends it to the logger. Errorf(format string, args ...interface{}) }
Logger is the common interface for user-provided loggers.
type Natty ¶
type Natty struct { *Config // contains filtered or unexported fields }
func (*Natty) Consume ¶
func (n *Natty) Consume(ctx context.Context, cfg *ConsumerConfig, f func(ctx context.Context, msg *nats.Msg) error) error
Consume will create a durable consumer and consume messages from the configured stream
func (*Natty) Create ¶
func (n *Natty) Create(ctx context.Context, bucket string, key string, data []byte, keyTTL ...time.Duration) error
Create will add the key/value pair iff it does not exist; it will create the bucket if it does not already exist. TTL is optional - it will only be used if the bucket does not exist & only the first TTL will be used.
func (*Natty) CreateBucket ¶
func (n *Natty) CreateBucket(_ context.Context, name string, ttl time.Duration, replicaCount int, description ...string) error
CreateBucket creates a bucket; returns an error if it already exists. Context usage not supported by NATS kv (yet).
func (*Natty) CreateConsumer ¶
func (*Natty) CreateStream ¶
func (*Natty) DeleteConsumer ¶
func (*Natty) DeletePublisher ¶
DeletePublisher will stop the batch publisher goroutine and remove the publisher from the shared publisher map.
It is safe to call this if a publisher for the topic does not exist.
Returns a bool which indicates whether the publisher exists.
func (*Natty) HaveLeader ¶
func (*Natty) Put ¶
func (n *Natty) Put(ctx context.Context, bucket string, key string, data []byte, keyTTL ...time.Duration) error
Put puts a key/val into a bucket and will create the bucket if it doesn't already exist. TTL is optional - it will only be used if the bucket does not exist & only the first TTL will be used.
func (*Natty) Refresh ¶ added in v0.0.36
Refresh will refresh the TTL of a key in a bucket. Since there is no built-in way to perform a refresh, we will first get the key and then attempt to update it referencing the revision ID we got.
func (*Natty) WatchBucket ¶
WatchBucket returns an instance of nats.KeyWatcher for the given bucket
type NoOpLogger ¶
type NoOpLogger struct { }
NoOpLogger is a do-nothing logger; it is used internally as the default Logger when none is provided in the Options.
func (*NoOpLogger) Debug ¶
func (l *NoOpLogger) Debug(args ...interface{})
Debug is no-op implementation of Logger's Debug.
func (*NoOpLogger) Debugf ¶
func (l *NoOpLogger) Debugf(format string, args ...interface{})
Debugf is no-op implementation of Logger's Debugf.
func (*NoOpLogger) Error ¶
func (l *NoOpLogger) Error(args ...interface{})
Error is no-op implementation of Logger's Error.
func (*NoOpLogger) Errorf ¶
func (l *NoOpLogger) Errorf(format string, args ...interface{})
Errorf is no-op implementation of Logger's Errorf.
func (*NoOpLogger) Info ¶
func (l *NoOpLogger) Info(args ...interface{})
Info is no-op implementation of Logger's Info.
func (*NoOpLogger) Infof ¶
func (l *NoOpLogger) Infof(format string, args ...interface{})
Infof is no-op implementation of Logger's Infof.
func (*NoOpLogger) Warn ¶
func (l *NoOpLogger) Warn(args ...interface{})
Warn is no-op implementation of Logger's Warn.
func (*NoOpLogger) Warnf ¶
func (l *NoOpLogger) Warnf(format string, args ...interface{})
Warnf is no-op implementation of Logger's Warnf.
type PublishError ¶
PublishError is a wrapper struct used to return errors that occur during async batch publishes to the calling code
type Publisher ¶
type Publisher struct { Subject string QueueMutex *sync.RWMutex Queue []*message Natty *Natty IdleTimeout time.Duration // ErrorCh is optional. It will receive async publish errors if specified // Otherwise errors will only be logged ErrorCh chan *PublishError // PublisherContext is used to close a specific publisher PublisherContext context.Context // PublisherCancel is used to cancel a specific publisher's context PublisherCancel context.CancelFunc // ServiceShutdownContext is used by main() to shutdown services before application termination ServiceShutdownContext context.Context // contains filtered or unexported fields }