Documentation ¶
Overview ¶
Package liftbridge implements a client for the Liftbridge messaging system. Liftbridge provides lightweight, fault-tolerant message streams by implementing a durable stream augmentation NATS. In particular, it offers a publish-subscribe log API that is highly available and horizontally scalable.
This package provides APIs for creating and consuming Liftbridge streams and some utility APIs for using Liftbridge in combination with NATS.
Index ¶
- Constants
- Variables
- func NewMessage(value []byte, options ...MessageOption) []byte
- type Ack
- type AckHandler
- type AckPolicy
- type BrokerInfo
- type Client
- type ClientOption
- type ClientOptions
- type Handler
- type Message
- func (m *Message) Headers() map[string][]byte
- func (m *Message) Key() []byte
- func (m *Message) Offset() int64
- func (m *Message) Partition() int32
- func (m *Message) ReplySubject() string
- func (m *Message) Stream() string
- func (m *Message) Subject() string
- func (m *Message) Timestamp() time.Time
- func (m *Message) Value() []byte
- type MessageOption
- func AckInbox(ackInbox string) MessageOption
- func AckPolicyAll() MessageOption
- func AckPolicyLeader() MessageOption
- func AckPolicyNone() MessageOption
- func CorrelationID(correlationID string) MessageOption
- func Header(name string, value []byte) MessageOption
- func Headers(headers map[string][]byte) MessageOption
- func Key(key []byte) MessageOption
- func PartitionBy(partitioner Partitioner) MessageOption
- func PartitionByKey() MessageOption
- func PartitionByRoundRobin() MessageOption
- func ToPartition(partition int32) MessageOption
- type MessageOptions
- type Metadata
- type PartitionInfo
- type Partitioner
- type PauseOption
- type PauseOptions
- type ReadonlyOption
- type ReadonlyOptions
- type StartPosition
- type StreamInfo
- type StreamOption
- func CleanerInterval(val time.Duration) StreamOption
- func CompactEnabled(val bool) StreamOption
- func CompactMaxGoroutines(val int32) StreamOption
- func Group(group string) StreamOption
- func MaxReplication() StreamOption
- func Partitions(partitions int32) StreamOption
- func ReplicationFactor(replicationFactor int32) StreamOption
- func RetentionMaxAge(val time.Duration) StreamOption
- func RetentionMaxBytes(val int64) StreamOption
- func RetentionMaxMessages(val int64) StreamOption
- func SegmentMaxAge(val time.Duration) StreamOption
- func SegmentMaxBytes(val int64) StreamOption
- type StreamOptions
- type SubscriptionOption
- func Partition(partition int32) SubscriptionOption
- func ReadISRReplica() SubscriptionOption
- func StartAtEarliestReceived() SubscriptionOption
- func StartAtLatestReceived() SubscriptionOption
- func StartAtOffset(offset int64) SubscriptionOption
- func StartAtTime(start time.Time) SubscriptionOption
- func StartAtTimeDelta(ago time.Duration) SubscriptionOption
- type SubscriptionOptions
Examples ¶
Constants ¶
const MaxReplicationFactor int32 = -1
MaxReplicationFactor can be used to tell the server to set the replication factor equal to the current number of servers in the cluster when creating a stream.
Variables ¶
var ( // ErrStreamExists is returned by CreateStream if the specified stream // already exists in the Liftbridge cluster. ErrStreamExists = errors.New("stream already exists") // ErrNoSuchStream is returned by DeleteStream if the specified stream does // not exist in the Liftbridge cluster. ErrNoSuchStream = errors.New("stream does not exist") // ErrNoSuchPartition is returned by Subscribe if the specified stream // partition does not exist in the Liftbridge cluster. ErrNoSuchPartition = errors.New("stream partition does not exist") // ErrStreamDeleted is sent to subscribers when the stream they are // subscribed to has been deleted. ErrStreamDeleted = errors.New("stream has been deleted") // ErrPartitionPaused is sent to subscribers when the stream partition they // are subscribed to has been paused. ErrPartitionPaused = errors.New("stream partition has been paused") // ErrAckTimeout indicates a publish ack was not received in time. ErrAckTimeout = errors.New("publish ack timeout") // ErrEndOfReadonlyPartition is sent to subscribers when the stream // partition they are subscribed to has either been set to readonly or is // already readonly and all messages have been read. ErrEndOfReadonlyPartition = errors.New("end of readonly partition reached") )
Functions ¶
func NewMessage ¶
func NewMessage(value []byte, options ...MessageOption) []byte
NewMessage returns a serialized message for the given payload and options.
Example ¶
// Create NATS connection. conn, err := nats.GetDefaultOptions().Connect() if err != nil { panic(err) } defer conn.Flush() defer conn.Close() // Publish simple message. msg := NewMessage([]byte("value")) if err := conn.Publish("foo", msg); err != nil { panic(err) } // Publish message with options. msg = NewMessage([]byte("value"), Key([]byte("key")), AckPolicyAll(), AckInbox("ack"), CorrelationID("123"), ) if err := conn.Publish("foo", msg); err != nil { panic(err) }
Output:
Types ¶
type Ack ¶
type Ack struct {
// contains filtered or unexported fields
}
Ack represents an acknowledgement that a message was committed to a stream partition.
func UnmarshalAck ¶
UnmarshalAck deserializes an Ack from the given byte slice. It returns an error if the given data is not actually an Ack.
Example ¶
// Create NATS connection. conn, err := nats.GetDefaultOptions().Connect() if err != nil { panic(err) } defer conn.Close() // Setup ack inbox. ackInbox := "acks" acked := make(chan struct{}) _, err = conn.Subscribe(ackInbox, func(m *nats.Msg) { ack, err := UnmarshalAck(m.Data) if err != nil { panic(err) } fmt.Println("ack:", ack.Stream(), ack.Offset(), ack.MessageSubject()) close(acked) }) if err != nil { panic(err) } // Publish message. msg := NewMessage([]byte("value"), Key([]byte("key")), AckInbox(ackInbox)) if err := conn.Publish("foo", msg); err != nil { panic(err) } <-acked
Output:
func (*Ack) CorrelationID ¶
CorrelationID is the user-supplied value from the message.
func (*Ack) MessageSubject ¶
MessageSubject is the NATS subject the message was received on.
func (*Ack) PartitionSubject ¶
PartitionSubject is the NATS subject the partition is attached to.
type AckHandler ¶
AckHandler is used to handle the results of asynchronous publishes to a stream. If the AckPolicy on the published message is not NONE, the handler will receive the ack once it's received from the cluster or an error if the message was not received successfully.
type BrokerInfo ¶
type BrokerInfo struct {
// contains filtered or unexported fields
}
BrokerInfo contains information for a Liftbridge cluster node.
func (*BrokerInfo) Addr ¶
func (b *BrokerInfo) Addr() string
Addr returns <host>:<port> for the broker server.
type Client ¶
type Client interface { // Close the client connection. Close() error // CreateStream creates a new stream attached to a NATS subject. Subject is // the NATS subject the stream is attached to, and name is the stream // identifier, unique per subject. It returns ErrStreamExists if a stream // with the given subject and name already exists. CreateStream(ctx context.Context, subject, name string, opts ...StreamOption) error // DeleteStream deletes a stream and all of its partitions. Name is the // stream identifier, globally unique. DeleteStream(ctx context.Context, name string) error // PauseStream pauses a stream and some or all of its partitions. Name is // the stream identifier, globally unique. It returns an ErrNoSuchPartition // if the given stream or partition does not exist. By default, this will // pause all partitions. A partition is resumed when it is published to via // the Liftbridge Publish API or ResumeAll is enabled and another partition // in the stream is published to. PauseStream(ctx context.Context, name string, opts ...PauseOption) error // SetStreamReadonly sets the readonly flag on a stream and some or all of // its partitions. Name is the stream identifier, globally unique. It // returns an ErrNoSuchPartition if the given stream or partition does not // exist. By default, this will set the readonly flag on all partitions. // Subscribers to a readonly partition will see their subscription ended // with a ErrEndOfReadonlyPartition error once all messages currently in // the partition have been read. SetStreamReadonly(ctx context.Context, name string, opts ...ReadonlyOption) error // Subscribe creates an ephemeral subscription for the given stream. It // begins receiving messages starting at the configured position and waits // for new messages when it reaches the end of the stream. The default // start position is the end of the stream. It returns an // ErrNoSuchPartition if the given stream or partition does not exist. Use // a cancelable Context to close a subscription. Subscribe(ctx context.Context, stream string, handler Handler, opts ...SubscriptionOption) error // Publish publishes a new message to the Liftbridge stream. The partition // that gets published to is determined by the provided partition or // Partitioner passed through MessageOptions, if any. If a partition or // Partitioner is not provided, this defaults to the base partition. This // partition determines the underlying NATS subject that gets published to. // To publish directly to a specific NATS subject, use the low-level // PublishToSubject API. // // If the AckPolicy is not NONE, this will synchronously block until the // ack is received. If the ack is not received in time, ErrAckTimeout is // returned. If AckPolicy is NONE, this returns nil on success. Publish(ctx context.Context, stream string, value []byte, opts ...MessageOption) (*Ack, error) // PublishAsync publishes a new message to the Liftbridge stream and // asynchronously processes the ack or error for the message. PublishAsync(ctx context.Context, stream string, value []byte, ackHandler AckHandler, opts ...MessageOption) error // PublishToSubject publishes a new message to the NATS subject. Note that // because this publishes directly to a subject, there may be multiple (or // no) streams that receive the message. As a result, MessageOptions // related to partitioning will be ignored. To publish at the // stream/partition level, use the high-level Publish API. // // If the AckPolicy is not NONE and a deadline is provided, this will // synchronously block until the first ack is received. If an ack is not // received in time, ErrAckTimeout is returned. If an AckPolicy and // deadline are configured, this returns the first Ack on success, // otherwise it returns nil. PublishToSubject(ctx context.Context, subject string, value []byte, opts ...MessageOption) (*Ack, error) // FetchMetadata returns cluster metadata including broker and stream // information. FetchMetadata(ctx context.Context) (*Metadata, error) }
Client is the main API used to communicate with a Liftbridge cluster. Call Connect to get a Client instance.
Example (CreateStream) ¶
// Connect to Liftbridge. addr := "localhost:9292" client, err := Connect([]string{addr}) if err != nil { panic(err) } defer client.Close() // Create stream with a single partition. if err := client.CreateStream(context.Background(), "foo", "foo-stream"); err != nil { panic(err) } // Create stream with three partitions. if err := client.CreateStream(context.Background(), "bar", "bar-stream", Partitions(3)); err != nil { panic(err) }
Output:
Example (Publish) ¶
// Connect to Liftbridge. addr := "localhost:9292" client, err := Connect([]string{addr}) if err != nil { panic(err) } defer client.Close() // Publish message to base stream partition. if _, err := client.Publish(context.Background(), "foo-stream", []byte("hello")); err != nil { panic(err) } // Publish message to stream partition based on key. if _, err := client.Publish(context.Background(), "bar-stream", []byte("hello"), Key([]byte("key")), PartitionByKey(), ); err != nil { panic(err) }
Output:
Example (PublishToSubject) ¶
// Connect to Liftbridge. addr := "localhost:9292" client, err := Connect([]string{addr}) if err != nil { panic(err) } defer client.Close() // Publish message directly to NATS subject. if _, err := client.PublishToSubject(context.Background(), "foo.bar", []byte("hello")); err != nil { panic(err) }
Output:
Example (Subscribe) ¶
// Connect to Liftbridge. addr := "localhost:9292" client, err := Connect([]string{addr}) if err != nil { panic(err) } defer client.Close() // Subscribe to base stream partition. if err := client.Subscribe(context.Background(), "foo-stream", func(msg *Message, err error) { if err != nil { panic(err) } fmt.Println(msg.Offset(), string(msg.Value())) }); err != nil { panic(err) } // Subscribe to a specific stream partition. ctx := context.Background() if err := client.Subscribe(ctx, "bar-stream", func(msg *Message, err error) { if err != nil { panic(err) } fmt.Println(msg.Offset(), string(msg.Value())) }, Partition(1)); err != nil { panic(err) } <-ctx.Done()
Output:
func Connect ¶
func Connect(addrs []string, options ...ClientOption) (Client, error)
Connect creates a Client connection for the given Liftbridge cluster. Multiple addresses can be provided. Connect will use whichever it connects successfully to first in random order. The Client will use the pool of addresses for failover purposes. Note that only one seed address needs to be provided as the Client will discover the other brokers when fetching metadata for the cluster.
Example ¶
addr := "localhost:9292" client, err := Connect([]string{addr}) if err != nil { panic(err) } defer client.Close()
Output:
type ClientOption ¶
type ClientOption func(*ClientOptions) error
ClientOption is a function on the ClientOptions for a connection. These are used to configure particular client options.
func AckWaitTime ¶
func AckWaitTime(wait time.Duration) ClientOption
AckWaitTime is a ClientOption to set the default amount of time to wait for an ack to be received for a published message before ErrAckTimeout is returned. This can be overridden on individual requests by setting a timeout on the Context. This defaults to 5 seconds if not set.
func KeepAliveTime ¶
func KeepAliveTime(keepAlive time.Duration) ClientOption
KeepAliveTime is a ClientOption to set the amount of time a pooled connection can be idle before it is closed and removed from the pool. The default is 30 seconds.
func MaxConnsPerBroker ¶
func MaxConnsPerBroker(max int) ClientOption
MaxConnsPerBroker is a ClientOption to set the maximum number of connections to pool for a given broker in the cluster. The default is 2.
func ResubscribeWaitTime ¶
func ResubscribeWaitTime(wait time.Duration) ClientOption
ResubscribeWaitTime is a ClientOption to set the amount of time to attempt to re-establish a stream subscription after being disconnected. For example, if the server serving a subscription dies and the stream is replicated, the client will attempt to re-establish the subscription once the stream leader has failed over. This failover can take several moments, so this option gives the client time to retry. The default is 30 seconds.
func TLSCert ¶
func TLSCert(cert string) ClientOption
TLSCert is a ClientOption to set the TLS certificate for the client.
func TLSConfig ¶
func TLSConfig(config *tls.Config) ClientOption
TLSConfig is a ClientOption to set the TLS configuration for the client. Overrides TLSCert.
type ClientOptions ¶
type ClientOptions struct { // Brokers it the set of hosts the client will use when attempting to // connect. Brokers []string // MaxConnsPerBroker is the maximum number of connections to pool for a // given broker in the cluster. The default is 2. MaxConnsPerBroker int // KeepAliveTime is the amount of time a pooled connection can be idle // before it is closed and removed from the pool. The default is 30 // seconds. KeepAliveTime time.Duration // TLSCert is the TLS certificate file to use. The client does not use a // TLS connection if this is not set. TLSCert string // TLSConfig is the TLS configuration to use. The client does not use a // TLS connection if this is not set. Overrides TLSCert if set. TLSConfig *tls.Config // ResubscribeWaitTime is the amount of time to attempt to re-establish a // stream subscription after being disconnected. For example, if the server // serving a subscription dies and the stream is replicated, the client // will attempt to re-establish the subscription once the stream leader has // failed over. This failover can take several moments, so this option // gives the client time to retry. The default is 30 seconds. ResubscribeWaitTime time.Duration // AckWaitTime is the default amount of time to wait for an ack to be // received for a published message before ErrAckTimeout is returned. This // can be overridden on individual requests by setting a timeout on the // Context. This defaults to 5 seconds if not set. AckWaitTime time.Duration }
ClientOptions are used to control the Client configuration.
func DefaultClientOptions ¶
func DefaultClientOptions() ClientOptions
DefaultClientOptions returns the default configuration options for the client.
func (ClientOptions) Connect ¶
func (o ClientOptions) Connect() (Client, error)
Connect will attempt to connect to a Liftbridge server with multiple options.
type Handler ¶
Handler is the callback invoked by Subscribe when a message is received on the specified stream. If err is not nil, the subscription will be terminated and no more messages will be received.
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message received from a Liftbridge stream.
func UnmarshalMessage ¶
UnmarshalMessage deserializes a message from the given byte slice. It returns an error if the given data is not actually a Message.
func (*Message) Key ¶
Key is an optional label set on a Message, useful for partitioning and stream compaction.
func (*Message) ReplySubject ¶
ReplySubject is the NATS reply subject on the Message, if any.
type MessageOption ¶
type MessageOption func(*MessageOptions)
MessageOption is a function on the MessageOptions for a Message. These are used to configure particular optional Message fields.
func AckInbox ¶
func AckInbox(ackInbox string) MessageOption
AckInbox is a MessageOption to set the NATS subject Liftbridge should publish the Message ack to. If this is not set, the server will generate a random inbox.
func AckPolicyAll ¶
func AckPolicyAll() MessageOption
AckPolicyAll is a MessageOption that sets the AckPolicy of the Message to ALL. This means the Message ack will be sent when the message has been written to all replicas.
func AckPolicyLeader ¶
func AckPolicyLeader() MessageOption
AckPolicyLeader is a MessageOption that sets the AckPolicy of the Message to LEADER. This means the Message ack will be sent when the stream leader has written it to its write-ahead log.
func AckPolicyNone ¶
func AckPolicyNone() MessageOption
AckPolicyNone is a MessageOption that sets the AckPolicy of the Message to NONE. This means no ack will be sent.
func CorrelationID ¶
func CorrelationID(correlationID string) MessageOption
CorrelationID is a MessageOption to set the identifier used to correlate an ack with the published Message. If this is not set, the ack will not have a correlation id.
func Header ¶
func Header(name string, value []byte) MessageOption
Header is a MessageOption that adds a single header to the Message. This may overwrite previously set headers.
func Headers ¶
func Headers(headers map[string][]byte) MessageOption
Headers is a MessageOption that adds a set of headers to the Message. This may overwrite previously set headers.
func Key ¶
func Key(key []byte) MessageOption
Key is a MessageOption to set the key on a Message. If Liftbridge has stream compaction enabled, the stream will retain only the last value for each key.
func PartitionBy ¶
func PartitionBy(partitioner Partitioner) MessageOption
PartitionBy is a MessageOption that specifies a Partitioner used to map Messages to stream partitions.
func PartitionByKey ¶
func PartitionByKey() MessageOption
PartitionByKey is a MessageOption that maps Messages to stream partitions based on a hash of the Message key. This computes the partition number for a given message by hashing the key and modding by the number of partitions for the first stream found with the subject of the published message. This does not work with streams containing wildcards in their subjects, e.g. "foo.*", since this matches on the subject literal of the published message. This also has undefined behavior if there are multiple streams for the given subject.
func PartitionByRoundRobin ¶
func PartitionByRoundRobin() MessageOption
PartitionByRoundRobin is a MessageOption that maps Messages to stream partitions in a round-robin fashion. This computes the partition number for a given message by atomically incrementing a counter for the message subject and modding by the number of partitions for the first stream found with the subject. This does not work with streams containing wildcards in their subjects, e.g. "foo.*", since this matches on the subject literal of the published message. This also has undefined behavior if there are multiple streams for the given subject.
func ToPartition ¶
func ToPartition(partition int32) MessageOption
ToPartition is a MessageOption that specifies the stream partition to publish the Message to. If this is set, any Partitioner will not be used.
type MessageOptions ¶
type MessageOptions struct { // Key to set on the Message. If Liftbridge has stream compaction enabled, // the stream will retain only the last value for each key. Key []byte // AckInbox sets the NATS subject Liftbridge should publish the Message ack // to. If this is not set, the server will generate a random inbox. AckInbox string // CorrelationID sets the identifier used to correlate an ack with the // published Message. If this is not set, the ack will not have a // correlation id. CorrelationID string // AckPolicy controls the behavior of Message acks sent by the server. By // default, Liftbridge will send an ack when the stream leader has written // the Message to its write-ahead log. AckPolicy AckPolicy // Headers are key-value pairs to set on the Message. Headers map[string][]byte // Partitioner specifies the strategy for mapping a Message to a stream // partition. Partitioner Partitioner // Partition specifies the stream partition to publish the Message to. If // this is set, any Partitioner will not be used. This is a pointer to // allow distinguishing between unset and 0. Partition *int32 }
MessageOptions are used to configure optional settings for a Message.
type Metadata ¶
type Metadata struct {
// contains filtered or unexported fields
}
Metadata contains an immutable snapshot of information for a cluster and subset of streams.
func (*Metadata) Brokers ¶
func (m *Metadata) Brokers() []*BrokerInfo
Brokers returns a list of the cluster nodes.
func (*Metadata) GetStream ¶
func (m *Metadata) GetStream(name string) *StreamInfo
GetStream returns the given stream or nil if unknown.
func (*Metadata) GetStreams ¶
func (m *Metadata) GetStreams() map[string]*StreamInfo
GetStreams returns a map of stream names to streams
func (*Metadata) LastUpdated ¶
LastUpdated returns the time when this metadata was last updated from the server.
func (*Metadata) PartitionCountForStream ¶
PartitionCountForStream returns the number of partitions for the given stream.
type PartitionInfo ¶
type PartitionInfo struct {
// contains filtered or unexported fields
}
PartitionInfo contains information for a Liftbridge stream partition.
func (*PartitionInfo) ISR ¶
func (p *PartitionInfo) ISR() []*BrokerInfo
ISR returns the list of replicas currently in the in-sync replica set.
func (*PartitionInfo) Leader ¶
func (p *PartitionInfo) Leader() *BrokerInfo
Leader returns the broker acting as leader for this partition or nil if there is no leader.
func (*PartitionInfo) Replicas ¶
func (p *PartitionInfo) Replicas() []*BrokerInfo
Replicas returns the list of brokers replicating the partition.
type Partitioner ¶
type Partitioner interface { // Partition computes the partition number for a given message. Partition(stream string, key, value []byte, metadata *Metadata) int32 }
Partitioner is used to map a message to a stream partition.
type PauseOption ¶
type PauseOption func(*PauseOptions) error
PauseOption is a function on the PauseOptions for a pause call. These are used to configure particular pausing options.
func PausePartitions ¶
func PausePartitions(partitions ...int32) PauseOption
PausePartitions sets the list of partition to pause or all of them if nil/empty.
func ResumeAll ¶
func ResumeAll() PauseOption
ResumeAll will resume all partitions in the stream if one of them is published to instead of resuming only that partition.
type PauseOptions ¶
type PauseOptions struct { // Partitions sets the list of partitions to pause or all of them if // nil/empty. Partitions []int32 // ResumeAll will resume all partitions in the stream if one of them is // published to instead of resuming only that partition. ResumeAll bool }
PauseOptions are used to setup stream pausing.
type ReadonlyOption ¶
type ReadonlyOption func(*ReadonlyOptions) error
ReadonlyOption is a function on the ReadonlyOptions for a set readonly call. These are used to configure particular set readonly options.
func Readonly ¶
func Readonly(readonly bool) ReadonlyOption
Readonly defines if the partitions should be set to readonly or to readwrite.
func ReadonlyPartitions ¶
func ReadonlyPartitions(partitions ...int32) ReadonlyOption
ReadonlyPartitions sets the list of partition on which to set the readonly flag or all of them if nil/empty.
type ReadonlyOptions ¶
type ReadonlyOptions struct { // Partitions sets the list of partitions on which to set the readonly flag // or all of them if nil/empty. Partitions []int32 // Readwrite defines if the partitions should be set to readonly (false) or // to readwrite (true). This field is called readwrite and not readonly so // that the default value corresponds to "enable readonly". Readwrite bool }
ReadonlyOptions are used to setup stream readonly operations.
type StartPosition ¶
type StartPosition int32
StartPosition controls where to begin consuming in a stream.
type StreamInfo ¶
type StreamInfo struct {
// contains filtered or unexported fields
}
StreamInfo contains information for a Liftbridge stream.
func (*StreamInfo) GetPartition ¶
func (s *StreamInfo) GetPartition(id int32) *PartitionInfo
GetPartition returns the partition info for the given partition id or nil if no such partition exists.
func (*StreamInfo) Partitions ¶
func (s *StreamInfo) Partitions() map[int32]*PartitionInfo
Partitions returns a map containing partition IDs and partitions for the stream.
type StreamOption ¶
type StreamOption func(*StreamOptions) error
StreamOption is a function on the StreamOptions for a stream. These are used to configure particular stream options.
func CleanerInterval ¶
func CleanerInterval(val time.Duration) StreamOption
CleanerInterval sets the value of the cleaner.interval configuration for the stream. This controls the frequency to check if a new stream log segment file should be rolled and whether any segments are eligible for deletion based on the retention policy or compaction if enabled. If this is not set, it uses the server default value.
func CompactEnabled ¶
func CompactEnabled(val bool) StreamOption
CompactEnabled sets the value of the compact.enabled configuration for the stream. This controls the activation of stream log compaction. If this is not set, it uses the server default value.
func CompactMaxGoroutines ¶
func CompactMaxGoroutines(val int32) StreamOption
CompactMaxGoroutines sets the value of the compact.max.goroutines configuration for the stream. This controls the maximum number of concurrent goroutines to use for compaction on a stream log (only applicable if compact.enabled is true). If this is not set, it uses the server default value.
func Group ¶
func Group(group string) StreamOption
Group is a StreamOption to set the load-balance group for a stream. When there are multiple streams in the same group, messages will be balanced among them.
func MaxReplication ¶
func MaxReplication() StreamOption
MaxReplication is a StreamOption to set the stream replication factor equal to the current number of servers in the cluster.
func Partitions ¶
func Partitions(partitions int32) StreamOption
Partitions is a StreamOption to set the number of partitions for a stream. Partitions are ordered, replicated, and durably stored on disk and serve as the unit of storage and parallelism for a stream. A partitioned stream for NATS subject "foo.bar" with three partitions internally maps to the NATS subjects "foo.bar", "foo.bar.1", and "foo.bar.2". A single partition would map to "foo.bar" to match behavior of an "un-partitioned" stream. If this is not set, it defaults to 1.
func ReplicationFactor ¶
func ReplicationFactor(replicationFactor int32) StreamOption
ReplicationFactor is a StreamOption to set the replication factor for a stream. The replication factor controls the number of servers to replicate a stream to. E.g. a value of 1 would mean only 1 server would have the data, and a value of 3 would be 3 servers would have it. If this is not set, it defaults to 1. A value of -1 will signal to the server to set the replication factor equal to the current number of servers in the cluster.
func RetentionMaxAge ¶
func RetentionMaxAge(val time.Duration) StreamOption
RetentionMaxAge sets the value of the retention.max.age configuration for the stream. This controls the TTL for stream log segment files, after which they are deleted. A value of 0 indicates no TTL. If this is not set, it uses the server default value.
func RetentionMaxBytes ¶
func RetentionMaxBytes(val int64) StreamOption
RetentionMaxBytes sets the value of the retention.max.bytes configuration for the stream. This controls the maximum size a stream's log can grow to, in bytes, before we will discard old log segments to free up space. A value of 0 indicates no limit. If this is not set, it uses the server default value.
func RetentionMaxMessages ¶
func RetentionMaxMessages(val int64) StreamOption
RetentionMaxMessages sets the value of the retention.max.messages configuration for the stream. This controls the maximum size a stream's log can grow to, in number of messages, before we will discard old log segments to free up space. A value of 0 indicates no limit. If this is not set, it uses the server default value.
func SegmentMaxAge ¶
func SegmentMaxAge(val time.Duration) StreamOption
SegmentMaxAge sets the value of the segment.max.age configuration for the stream. Thia controls the maximum time before a new stream log segment is rolled out. A value of 0 means new segments will only be rolled when segment.max.bytes is reached. Retention is always done a file at a time, so a larger value means fewer files but less granular control over retention. If this is not set, it uses the server default value.
func SegmentMaxBytes ¶
func SegmentMaxBytes(val int64) StreamOption
SegmentMaxBytes sets the value of the segment.max.bytes configuration for the stream. This controls the maximum size of a single stream log segment file in bytes. Retention is always done a file at a time, so a larger segment size means fewer files but less granular control over retention. If this is not set, it uses the server default value.
type StreamOptions ¶
type StreamOptions struct { // Group is the name of a load-balance group. When there are multiple // streams in the same group, messages will be balanced among them. Group string // ReplicationFactor controls the number of servers to replicate a stream // to. E.g. a value of 1 would mean only 1 server would have the data, and // a value of 3 would be 3 servers would have it. If this is not set, it // defaults to 1. A value of -1 will signal to the server to set the // replication factor equal to the current number of servers in the // cluster. ReplicationFactor int32 // Partitions determines how many partitions to create for a stream. If 0, // this will behave as a stream with a single partition. If this is not // set, it defaults to 1. Partitions int32 // The maximum size a stream's log can grow to, in bytes, before we will // discard old log segments to free up space. A value of 0 indicates no // limit. If this is not set, it uses the server default value. RetentionMaxBytes *int64 // The maximum size a stream's log can grow to, in number of messages, // before we will discard old log segments to free up space. A value of 0 // indicates no limit. If this is not set, it uses the server default // value. RetentionMaxMessages *int64 // The TTL for stream log segment files, after which they are deleted. A // value of 0 indicates no TTL. If this is not set, it uses the server // default value. RetentionMaxAge *time.Duration // The frequency to check if a new stream log segment file should be rolled // and whether any segments are eligible for deletion based on the // retention policy or compaction if enabled. If this is not set, it uses // the server default value. CleanerInterval *time.Duration // The maximum size of a single stream log segment file in bytes. Retention // is always done a file at a time, so a larger segment size means fewer // files but less granular control over retention. If this is not set, it // uses the server default value. SegmentMaxBytes *int64 // The maximum time before a new stream log segment is rolled out. A value // of 0 means new segments will only be rolled when segment.max.bytes is // reached. Retention is always done a file at a time, so a larger value // means fewer files but less granular control over retention. If this is // not set, it uses the server default value. SegmentMaxAge *time.Duration // The maximum number of concurrent goroutines to use for compaction on a // stream log (only applicable if compact.enabled is true). If this is not // set, it uses the server default value. CompactMaxGoroutines *int32 // CompactEnabled controls the activation of stream log compaction. If this // is not set, it uses the server default value. CompactEnabled *bool }
StreamOptions are used to configure new streams.
type SubscriptionOption ¶
type SubscriptionOption func(*SubscriptionOptions) error
SubscriptionOption is a function on the SubscriptionOptions for a subscription. These are used to configure particular subscription options.
func Partition ¶
func Partition(partition int32) SubscriptionOption
Partition specifies the stream partition to consume. If not set, this defaults to 0.
func ReadISRReplica ¶
func ReadISRReplica() SubscriptionOption
ReadISRReplica sets read replica option. If true, the client will request subscription from an random ISR replica instead of subscribing explicitly to partition's leader. As a random ISR replica is given, it may well be the partition's leader itself.
func StartAtEarliestReceived ¶
func StartAtEarliestReceived() SubscriptionOption
StartAtEarliestReceived sets the subscription start position to the earliest message received in the stream.
func StartAtLatestReceived ¶
func StartAtLatestReceived() SubscriptionOption
StartAtLatestReceived sets the subscription start position to the last message received in the stream.
func StartAtOffset ¶
func StartAtOffset(offset int64) SubscriptionOption
StartAtOffset sets the desired start offset to begin consuming from in the stream.
func StartAtTime ¶
func StartAtTime(start time.Time) SubscriptionOption
StartAtTime sets the desired timestamp to begin consuming from in the stream.
func StartAtTimeDelta ¶
func StartAtTimeDelta(ago time.Duration) SubscriptionOption
StartAtTimeDelta sets the desired timestamp to begin consuming from in the stream using a time delta in the past.
type SubscriptionOptions ¶
type SubscriptionOptions struct { // StartPosition controls where to begin consuming from in the stream. StartPosition StartPosition // StartOffset sets the stream offset to begin consuming from. StartOffset int64 // StartTimestamp sets the stream start position to the given timestamp. StartTimestamp time.Time // Partition sets the stream partition to consume. Partition int32 // ReadISRReplica sets client's ability to subscribe from a random ISR ReadISRReplica bool }
SubscriptionOptions are used to control a subscription's behavior.