Documentation ¶
Index ¶
- Constants
- func DecodeObjectDigest(data string) ([]byte, error)
- func GetObjectDigestValue(data hash.Hash) string
- type APIError
- type APIStats
- type AccountInfo
- type AccountLimits
- type AckPolicy
- type ClientTrace
- type ClusterInfo
- type ConsumeContext
- type ConsumeErrHandlerFunc
- type Consumer
- type ConsumerConfig
- type ConsumerInfo
- type ConsumerInfoLister
- type ConsumerManager
- type ConsumerNameLister
- type DeleteMarkersOlderThan
- type DeliverPolicy
- type DiscardPolicy
- type ErrorCode
- type ExternalStream
- type FetchOpt
- type GetMsgOpt
- type GetObjectInfoOpt
- type GetObjectOpt
- type JetStream
- type JetStreamError
- type JetStreamOpt
- type KVDeleteOpt
- type KVPurgeOpt
- type KeyLister
- type KeyValue
- type KeyValueBucketStatus
- func (s *KeyValueBucketStatus) BackingStore() string
- func (s *KeyValueBucketStatus) Bucket() string
- func (s *KeyValueBucketStatus) Bytes() uint64
- func (s *KeyValueBucketStatus) History() int64
- func (s *KeyValueBucketStatus) IsCompressed() bool
- func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo
- func (s *KeyValueBucketStatus) TTL() time.Duration
- func (s *KeyValueBucketStatus) Values() uint64
- type KeyValueConfig
- type KeyValueEntry
- type KeyValueLister
- type KeyValueManager
- type KeyValueNamesLister
- type KeyValueOp
- type KeyValueStatus
- type KeyWatcher
- type ListObjectsOpt
- type MessageBatch
- type MessageHandler
- type MessagesContext
- type Msg
- type MsgErrHandler
- type MsgMetadata
- type ObjectBucketStatus
- func (s *ObjectBucketStatus) BackingStore() string
- func (s *ObjectBucketStatus) Bucket() string
- func (s *ObjectBucketStatus) Description() string
- func (s *ObjectBucketStatus) IsCompressed() bool
- func (s *ObjectBucketStatus) Metadata() map[string]string
- func (s *ObjectBucketStatus) Replicas() int
- func (s *ObjectBucketStatus) Sealed() bool
- func (s *ObjectBucketStatus) Size() uint64
- func (s *ObjectBucketStatus) Storage() StorageType
- func (s *ObjectBucketStatus) StreamInfo() *StreamInfo
- func (s *ObjectBucketStatus) TTL() time.Duration
- type ObjectInfo
- type ObjectLink
- type ObjectMeta
- type ObjectMetaOptions
- type ObjectResult
- type ObjectStore
- type ObjectStoreConfig
- type ObjectStoreManager
- type ObjectStoreNamesLister
- type ObjectStoreStatus
- type ObjectStoresLister
- type ObjectWatcher
- type OrderedConsumerConfig
- type PeerInfo
- type Placement
- type PubAck
- type PubAckFuture
- type PublishOpt
- func WithExpectLastMsgID(id string) PublishOpt
- func WithExpectLastSequence(seq uint64) PublishOpt
- func WithExpectLastSequencePerSubject(seq uint64) PublishOpt
- func WithExpectStream(stream string) PublishOpt
- func WithMsgID(id string) PublishOpt
- func WithRetryAttempts(num int) PublishOpt
- func WithRetryWait(dur time.Duration) PublishOpt
- func WithStallWait(ttl time.Duration) PublishOpt
- type Publisher
- type PullConsumeOpt
- type PullExpiry
- type PullHeartbeat
- type PullMaxBytes
- type PullMaxMessages
- type PullMessagesOpt
- type PullThresholdBytes
- type PullThresholdMessages
- type RawStreamMsg
- type RePublish
- type ReplayPolicy
- type RetentionPolicy
- type SequenceInfo
- type SequencePair
- type StopAfter
- type StorageType
- type StoreCompression
- type Stream
- type StreamConfig
- type StreamConsumerLimits
- type StreamConsumerManager
- type StreamInfo
- type StreamInfoLister
- type StreamInfoOpt
- type StreamListOpt
- type StreamManager
- type StreamNameLister
- type StreamPurgeOpt
- type StreamPurgeRequest
- type StreamSource
- type StreamSourceInfo
- type StreamState
- type SubjectTransformConfig
- type Tier
- type WatchOpt
Constants ¶
const ( KeyValueMaxHistory = 64 AllKeys = ">" )
const ( // MsgIDHeader is used to specify a user-defined message ID. It can be used // e.g. for deduplication in conjunction with the Duplicates duration on // ConsumerConfig or to provide optimistic concurrency safety together with // [ExpectedLastMsgIDHeader]. // // This can be set when publishing messages using [WithMsgID] option. MsgIDHeader = "Nats-Msg-Id" // ExpectedStreamHeader contains stream name and is used to assure that the // published message is received by expected stream. Server will reject the // message if it is not the case. // // This can be set when publishing messages using [WithExpectStream] option. ExpectedStreamHeader = "Nats-Expected-Stream" // ExpectedLastSeqHeader contains the expected last sequence number of the // stream and can be used to apply optimistic concurrency control at stream // level. Server will reject the message if it is not the case. // // This can be set when publishing messages using [WithExpectLastSequence] // option. ExpectedLastSeqHeader = "Nats-Expected-Last-Sequence" // ExpectedLastSubjSeqHeader contains the expected last sequence number on // the subject and can be used to apply optimistic concurrency control at // subject level. Server will reject the message if it is not the case. // // This can be set when publishing messages using // [WithExpectLastSequencePerSubject] option. ExpectedLastSubjSeqHeader = "Nats-Expected-Last-Subject-Sequence" // ExpectedLastMsgIDHeader contains the expected last message ID on the // subject and can be used to apply optimistic concurrency control at // stream level. Server will reject the message if it is not the case. // // This can be set when publishing messages using [WithExpectLastMsgID] // option. ExpectedLastMsgIDHeader = "Nats-Expected-Last-Msg-Id" // MsgRollup is used to apply a purge of all prior messages in the stream // ("all") or at the subject ("sub") before this message. MsgRollup = "Nats-Rollup" )
Headers used when publishing messages.
const ( // StreamHeader contains the stream name the message was republished from or // the stream name the message was retrieved from using direct get. StreamHeader = "Nats-Stream" // SequenceHeader contains the original sequence number of the message. SequenceHeader = "Nats-Sequence" // TimeStampHeader contains the original timestamp of the message. TimeStampHeaer = "Nats-Time-Stamp" // SubjectHeader contains the original subject the message was published to. SubjectHeader = "Nats-Subject" // LastSequenceHeader contains the last sequence of the message having the // same subject, otherwise zero if this is the first message for the // subject. LastSequenceHeader = "Nats-Last-Sequence" )
Headers for republished messages and direct gets. Those headers are set by the server and should not be set by the client.
const ( // MsgRollupSubject is used to purge all messages before this message on the // message subject. MsgRollupSubject = "sub" // MsgRollupAll is used to purge all messages before this message on the // stream. MsgRollupAll = "all" )
Rollups, can be subject only or all messages.
const ( // Default time wait between retries on Publish if err is ErrNoResponders. DefaultPubRetryWait = 250 * time.Millisecond // Default number of retries DefaultPubRetryAttempts = 2 )
const ( DefaultMaxMessages = 500 DefaultExpires = 30 * time.Second )
const (
// DefaultAPIPrefix is the default prefix for the JetStream API.
DefaultAPIPrefix = "$JS.API."
)
Request API subjects for JetStream.
Variables ¶
This section is empty.
Functions ¶
func DecodeObjectDigest ¶
DecodeObjectDigest decodes base64 hash
func GetObjectDigestValue ¶
GetObjectDigestValue calculates the base64 value of hashed data
Types ¶
type APIError ¶
type APIError struct { Code int `json:"code"` ErrorCode ErrorCode `json:"err_code"` Description string `json:"description,omitempty"` }
APIError is included in all API responses if there was an error.
type APIStats ¶
type APIStats struct { // Total is the total number of API calls. Total uint64 `json:"total"` // Errors is the total number of API errors. Errors uint64 `json:"errors"` }
APIStats reports on API calls to JetStream for this account.
type AccountInfo ¶
type AccountInfo struct { // Tier is the current account usage tier. Tier // Domain is the domain name associated with this account. Domain string `json:"domain"` // API is the API usage statistics for this account. API APIStats `json:"api"` // Tiers is the list of available tiers for this account. Tiers map[string]Tier `json:"tiers"` }
AccountInfo contains information about the JetStream usage from the current account.
type AccountLimits ¶
type AccountLimits struct { // MaxMemory is the maximum amount of memory available for this account. MaxMemory int64 `json:"max_memory"` // MaxStore is the maximum amount of disk storage available for this // account. MaxStore int64 `json:"max_storage"` // MaxStreams is the maximum number of streams allowed for this account. MaxStreams int `json:"max_streams"` // MaxConsumers is the maximum number of consumers allowed for this // account. MaxConsumers int `json:"max_consumers"` }
AccountLimits includes the JetStream limits of the current account.
type AckPolicy ¶
type AckPolicy int
AckPolicy determines how the consumer should acknowledge delivered messages.
func (AckPolicy) MarshalJSON ¶
func (*AckPolicy) UnmarshalJSON ¶
type ClientTrace ¶
type ClientTrace struct { // RequestSent is called when an API request is sent to the server. RequestSent func(subj string, payload []byte) // ResponseReceived is called when a response is received from the // server. ResponseReceived func(subj string, payload []byte, hdr nats.Header) }
ClientTrace can be used to trace API interactions for JetStream.
type ClusterInfo ¶
type ClusterInfo struct { // Name is the name of the cluster. Name string `json:"name,omitempty"` // Leader is the server name of the RAFT leader. Leader string `json:"leader,omitempty"` // Replicas is the list of members of the RAFT cluster Replicas []*PeerInfo `json:"replicas,omitempty"` }
ClusterInfo shows information about the underlying set of servers that make up the stream or consumer.
type ConsumeContext ¶
type ConsumeContext interface { // Stop unsubscribes from the stream and cancels subscription. // No more messages will be received after calling this method. // All messages that are already in the buffer are discarded. Stop() // Drain unsubscribes from the stream and cancels subscription. // All messages that are already in the buffer will be processed in callback function. Drain() // Closed returns a channel that is closed when the consuming is // fully stopped/drained. When the channel is closed, no more messages // will be received and processing is complete. Closed() <-chan struct{} }
ConsumeContext supports processing incoming messages from a stream. It is returned by [Consumer.Consume] method.
type ConsumeErrHandlerFunc ¶
type ConsumeErrHandlerFunc func(consumeCtx ConsumeContext, err error)
type Consumer ¶
type Consumer interface { // Fetch is used to retrieve up to a provided number of messages from a // stream. This method will send a single request and deliver all // requested messages unless the timeout is reached earlier. Fetch timeout // defaults to 30 seconds and can be configured using FetchMaxWait // option. // // By default, Fetch uses a 5s idle heartbeat for requests longer than // 10 seconds. For shorter requests, the idle heartbeat is disabled. // This can be configured using FetchHeartbeat option. If a client does // not receive a heartbeat message from a stream for more than 2 times // the idle heartbeat setting, Fetch will return [ErrNoHeartbeat]. // // Fetch is non-blocking and returns MessageBatch, exposing a channel // for delivered messages. // // Messages channel is always closed, thus it is safe to range over it // without additional checks. Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) // FetchBytes is used to retrieve up to a provided number of bytes from the // stream. This method will send a single request and deliver the // provided number of bytes unless the timeout is reached earlier. FetchBytes // timeout defaults to 30 seconds and can be configured using // FetchMaxWait option. // // By default, FetchBytes uses a 5s idle heartbeat for requests longer than // 10 seconds. For shorter requests, the idle heartbeat is disabled. // This can be configured using FetchHeartbeat option. If a client does // not receive a heartbeat message from a stream for more than 2 times // the idle heartbeat setting, FetchBytes will return [ErrNoHeartbeat]. // // FetchBytes is non-blocking and returns MessageBatch, exposing a channel // for delivered messages. // // Messages channel is always closed, thus it is safe to range over it // without additional checks. FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error) // FetchNoWait is used to retrieve up to a provided number of messages // from a stream. 
Unlike Fetch, FetchNoWait will only deliver messages // that are currently available in the stream and will not wait for new // messages to arrive, even if batch size is not met. // // FetchNoWait is non-blocking and returns MessageBatch, exposing a // channel for delivered messages. // // Messages channel is always closed, thus it is safe to range over it // without additional checks. FetchNoWait(batch int) (MessageBatch, error) // Consume will continuously receive messages and handle them // with the provided callback function. Consume can be configured using // PullConsumeOpt options: // // - Error handling and monitoring can be configured using ConsumeErrHandler // option, which provides information about errors encountered during // consumption (both transient and terminal) // - Consume can be configured to stop after a certain number of // messages is received using StopAfter option. // - Consume can be optimized for throughput or memory usage using // PullExpiry, PullMaxMessages, PullMaxBytes and PullHeartbeat options. // Unless there is a specific use case, these options should not be used. // // Consume returns a ConsumeContext, which can be used to stop or drain // the consumer. Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error) // Messages returns MessagesContext, allowing continuously iterating // over messages on a stream. Messages can be configured using // PullMessagesOpt options: // // - Messages can be optimized for throughput or memory usage using // PullExpiry, PullMaxMessages, PullMaxBytes and PullHeartbeat options. // Unless there is a specific use case, these options should not be used. // - WithMessagesErrOnMissingHeartbeat can be used to enable/disable // erroring out on MessagesContext.Next when a heartbeat is missing. // This option is enabled by default. Messages(opts ...PullMessagesOpt) (MessagesContext, error) // Next is used to retrieve the next message from the consumer. 
This // method will block until the message is retrieved or timeout is // reached. Next(opts ...FetchOpt) (Msg, error) // Info fetches current ConsumerInfo from the server. Info(context.Context) (*ConsumerInfo, error) // CachedInfo returns ConsumerInfo currently cached on this consumer. // This method does not perform any network requests. The cached // ConsumerInfo is updated on every call to Info and Update. CachedInfo() *ConsumerInfo }
Consumer contains methods for fetching/processing messages from a stream, as well as fetching consumer info.
This package provides two implementations of Consumer interface:
- Standard named/ephemeral pull consumers. These consumers are created using CreateConsumer method on Stream or JetStream interface. They can be explicitly configured (using ConsumerConfig) and managed by the user, either from this package or externally.
- Ordered consumers. These consumers are created using OrderedConsumer method on Stream or JetStream interface. They are managed by the library and provide a simple way to consume messages from a stream. Ordered consumers are ephemeral in-memory pull consumers and are resilient to deletes and restarts. They provide limited configuration options using OrderedConsumerConfig.
Consumer provides methods for optimized continuous consumption of messages using Consume and Messages methods, as well as simple one-off message retrieval using Fetch and Next methods.
type ConsumerConfig ¶
type ConsumerConfig struct { // Name is an optional name for the consumer. If not set, one is // generated automatically. // // Name cannot contain whitespace, ., *, >, path separators (forward or // backwards slash), and non-printable characters. Name string `json:"name,omitempty"` // Durable is an optional durable name for the consumer. If both Durable // and Name are set, they have to be equal. Unless InactiveThreshold is set, a // durable consumer will not be cleaned up automatically. // // Durable cannot contain whitespace, ., *, >, path separators (forward or // backwards slash), and non-printable characters. Durable string `json:"durable_name,omitempty"` // Description provides an optional description of the consumer. Description string `json:"description,omitempty"` // DeliverPolicy defines from which point to start delivering messages // from the stream. Defaults to DeliverAllPolicy. DeliverPolicy DeliverPolicy `json:"deliver_policy"` // OptStartSeq is an optional sequence number from which to start // message delivery. Only applicable when DeliverPolicy is set to // DeliverByStartSequencePolicy. OptStartSeq uint64 `json:"opt_start_seq,omitempty"` // OptStartTime is an optional time from which to start message // delivery. Only applicable when DeliverPolicy is set to // DeliverByStartTimePolicy. OptStartTime *time.Time `json:"opt_start_time,omitempty"` // AckPolicy defines the acknowledgement policy for the consumer. // Defaults to AckExplicitPolicy. AckPolicy AckPolicy `json:"ack_policy"` // AckWait defines how long the server will wait for an acknowledgement // before resending a message. If not set, server default is 30 seconds. AckWait time.Duration `json:"ack_wait,omitempty"` // MaxDeliver defines the maximum number of delivery attempts for a // message. Applies to any message that is re-sent due to ack policy. // If not set, server default is -1 (unlimited). 
MaxDeliver int `json:"max_deliver,omitempty"` // BackOff specifies the optional back-off intervals for retrying // message delivery after a failed acknowledgement. It overrides // AckWait. // // BackOff only applies to messages not acknowledged in specified time, // not messages that were nack'ed. // // The number of intervals specified must be lower or equal to // MaxDeliver. If the number of intervals is lower, the last interval is // used for all remaining attempts. BackOff []time.Duration `json:"backoff,omitempty"` // FilterSubject can be used to filter messages delivered from the // stream. FilterSubject is exclusive with FilterSubjects. FilterSubject string `json:"filter_subject,omitempty"` // ReplayPolicy defines the rate at which messages are sent to the // consumer. If ReplayOriginalPolicy is set, messages are sent in the // same intervals in which they were stored on stream. This can be used // e.g. to simulate production traffic in development environments. If // ReplayInstantPolicy is set, messages are sent as fast as possible. // Defaults to ReplayInstantPolicy. ReplayPolicy ReplayPolicy `json:"replay_policy"` // RateLimit specifies an optional maximum rate of message delivery in // bits per second. RateLimit uint64 `json:"rate_limit_bps,omitempty"` // SampleFrequency is an optional frequency for sampling how often // acknowledgements are sampled for observability. See // https://docs.nats.io/running-a-nats-service/nats_admin/monitoring/monitoring_jetstream SampleFrequency string `json:"sample_freq,omitempty"` // MaxWaiting is a maximum number of pull requests waiting to be // fulfilled. If not set, this will inherit settings from stream's // ConsumerLimits or (if those are not set) from account settings. If // neither are set, server default is 512. MaxWaiting int `json:"max_waiting,omitempty"` // MaxAckPending is a maximum number of outstanding unacknowledged // messages. 
Once this limit is reached, the server will suspend sending // messages to the consumer. If not set, server default is 1000. // Set to -1 for unlimited. MaxAckPending int `json:"max_ack_pending,omitempty"` // HeadersOnly indicates whether only headers of messages should be sent // (and no payload). Defaults to false. HeadersOnly bool `json:"headers_only,omitempty"` // MaxRequestBatch is the optional maximum batch size a single pull // request can make. When set with MaxRequestMaxBytes, the batch size // will be constrained by whichever limit is hit first. MaxRequestBatch int `json:"max_batch,omitempty"` // MaxRequestExpires is the maximum duration a single pull request will // wait for messages to be available to pull. MaxRequestExpires time.Duration `json:"max_expires,omitempty"` // MaxRequestMaxBytes is the optional maximum total bytes that can be // requested in a given batch. When set with MaxRequestBatch, the batch // size will be constrained by whichever limit is hit first. MaxRequestMaxBytes int `json:"max_bytes,omitempty"` // InactiveThreshold is a duration which instructs the server to clean // up the consumer if it has been inactive for the specified duration. // Durable consumers will not be cleaned up by default, but if // InactiveThreshold is set, they will be. If not set, this will inherit // settings from stream's ConsumerLimits. If neither are set, server // default is 5 seconds. // // A consumer is considered inactive if there are no pull requests // received by the server (for pull consumers), or no interest detected // on deliver subject (for push consumers), not if there are no // messages to be delivered. InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` // Replicas is the number of replicas for the consumer's state. By default, // consumers inherit the number of replicas from the stream. 
Replicas int `json:"num_replicas"` // MemoryStorage is a flag to force the consumer to use memory storage // rather than inherit the storage type from the stream. MemoryStorage bool `json:"mem_storage,omitempty"` // FilterSubjects allows filtering messages from a stream by subject. // This field is exclusive with FilterSubject. Requires nats-server // v2.10.0 or later. FilterSubjects []string `json:"filter_subjects,omitempty"` // Metadata is a set of application-defined key-value pairs for // associating metadata on the consumer. This feature requires // nats-server v2.10.0 or later. Metadata map[string]string `json:"metadata,omitempty"` }
ConsumerConfig is the configuration of a JetStream consumer.
type ConsumerInfo ¶
type ConsumerInfo struct { // Stream specifies the name of the stream that the consumer is bound // to. Stream string `json:"stream_name"` // Name represents the unique identifier for the consumer. This can be // either set explicitly by the client or generated automatically if not // set. Name string `json:"name"` // Created is the timestamp when the consumer was created. Created time.Time `json:"created"` // Config contains the configuration settings of the consumer, set when // creating or updating the consumer. Config ConsumerConfig `json:"config"` // Delivered holds information about the most recently delivered // message, including its sequence numbers and timestamp. Delivered SequenceInfo `json:"delivered"` // AckFloor indicates the message before the first unacknowledged // message. AckFloor SequenceInfo `json:"ack_floor"` // NumAckPending is the number of messages that have been delivered but // not yet acknowledged. NumAckPending int `json:"num_ack_pending"` // NumRedelivered counts the number of messages that have been // redelivered and not yet acknowledged. Each message is counted only // once, even if it has been redelivered multiple times. This count is // reset when the message is eventually acknowledged. NumRedelivered int `json:"num_redelivered"` // NumWaiting is the count of active pull requests. It is only relevant // for pull-based consumers. NumWaiting int `json:"num_waiting"` // NumPending is the number of messages that match the consumer's // filter, but have not been delivered yet. NumPending uint64 `json:"num_pending"` // Cluster contains information about the cluster to which this consumer // belongs (if applicable). Cluster *ClusterInfo `json:"cluster,omitempty"` // PushBound indicates whether at least one subscription exists for the // delivery subject of this consumer. This is only applicable to // push-based consumers. PushBound bool `json:"push_bound,omitempty"` // TimeStamp indicates when the info was gathered by the server. 
TimeStamp time.Time `json:"ts"` }
ConsumerInfo is the detailed information about a JetStream consumer.
type ConsumerInfoLister ¶
type ConsumerInfoLister interface { Info() <-chan *ConsumerInfo Err() error }
ConsumerInfoLister is used to iterate over a channel of consumer infos. Err method can be used to check for errors encountered during iteration. Info channel is always closed and therefore can be used in a range loop.
type ConsumerManager ¶
type ConsumerManager interface { // CreateOrUpdateConsumer creates a consumer on a given stream with // given config. If consumer already exists, it will be updated (if // possible). Consumer interface is returned, allowing to operate on a // consumer (e.g. fetch messages). CreateOrUpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) // CreateConsumer creates a consumer on a given stream with given // config. If consumer already exists and the provided configuration // differs from its configuration, ErrConsumerExists is returned. If the // provided configuration is the same as the existing consumer, the // existing consumer is returned. Consumer interface is returned, // allowing to operate on a consumer (e.g. fetch messages). CreateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) // UpdateConsumer updates an existing consumer. If consumer does not // exist, ErrConsumerDoesNotExist is returned. Consumer interface is // returned, allowing to operate on a consumer (e.g. fetch messages). UpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) // OrderedConsumer returns an OrderedConsumer instance. OrderedConsumer // are managed by the library and provide a simple way to consume // messages from a stream. Ordered consumers are ephemeral in-memory // pull consumers and are resilient to deletes and restarts. OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig) (Consumer, error) // Consumer returns an interface to an existing consumer, allowing processing // of messages. If consumer does not exist, ErrConsumerNotFound is // returned. Consumer(ctx context.Context, consumer string) (Consumer, error) // DeleteConsumer removes a consumer with given name from a stream. // If consumer does not exist, ErrConsumerNotFound is returned. DeleteConsumer(ctx context.Context, consumer string) error // ListConsumers returns ConsumerInfoLister enabling iterating over a // channel of consumer infos. 
ListConsumers(context.Context) ConsumerInfoLister // ConsumerNames returns a ConsumerNameLister enabling iterating over a // channel of consumer names. ConsumerNames(context.Context) ConsumerNameLister }
ConsumerManager provides CRUD API for managing consumers. It is available as a part of Stream interface. CreateConsumer, UpdateConsumer, CreateOrUpdateConsumer and Consumer methods return a Consumer interface, allowing to operate on a consumer (e.g. consume messages).
type ConsumerNameLister ¶
ConsumerNameLister is used to iterate over a channel of consumer names. Err method can be used to check for errors encountered during iteration. Name channel is always closed and therefore can be used in a range loop.
type DeleteMarkersOlderThan ¶
DeleteMarkersOlderThan indicates that delete or purge markers older than that will be deleted as part of [KeyValue.PurgeDeletes] operation, otherwise, only the data will be removed but markers that are recent will be kept. Note that if no option is specified, the default is 30 minutes. You can set this option to a negative value to instruct to always remove the markers, regardless of their age.
type DeliverPolicy ¶
type DeliverPolicy int
DeliverPolicy determines from which point to start delivering messages.
const ( // DeliverAllPolicy starts delivering messages from the very beginning of a // stream. This is the default. DeliverAllPolicy DeliverPolicy = iota // DeliverLastPolicy will start the consumer with the last sequence // received. DeliverLastPolicy // DeliverNewPolicy will only deliver new messages that are sent after the // consumer is created. DeliverNewPolicy // DeliverByStartSequencePolicy will deliver messages starting from a given // sequence configured with OptStartSeq in ConsumerConfig. DeliverByStartSequencePolicy // DeliverByStartTimePolicy will deliver messages starting from a given time // configured with OptStartTime in ConsumerConfig. DeliverByStartTimePolicy // DeliverLastPerSubjectPolicy will start the consumer with the last message // for all subjects received. DeliverLastPerSubjectPolicy )
func (DeliverPolicy) MarshalJSON ¶
func (p DeliverPolicy) MarshalJSON() ([]byte, error)
func (DeliverPolicy) String ¶
func (p DeliverPolicy) String() string
func (*DeliverPolicy) UnmarshalJSON ¶
func (p *DeliverPolicy) UnmarshalJSON(data []byte) error
type DiscardPolicy ¶
type DiscardPolicy int
DiscardPolicy determines how to proceed when limits of messages or bytes are reached.
const ( // DiscardOld will remove older messages to return to the limits. This is // the default. DiscardOld DiscardPolicy = iota // DiscardNew will fail to store new messages once the limits are reached. DiscardNew )
func (DiscardPolicy) MarshalJSON ¶
func (dp DiscardPolicy) MarshalJSON() ([]byte, error)
func (DiscardPolicy) String ¶
func (dp DiscardPolicy) String() string
func (*DiscardPolicy) UnmarshalJSON ¶
func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error
type ErrorCode ¶
type ErrorCode uint16
ErrorCode represents error_code returned in response from JetStream API.
const ( JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039 JSErrCodeJetStreamNotEnabled ErrorCode = 10076 JSErrCodeStreamNotFound ErrorCode = 10059 JSErrCodeStreamNameInUse ErrorCode = 10058 JSErrCodeConsumerCreate ErrorCode = 10012 JSErrCodeConsumerNotFound ErrorCode = 10014 JSErrCodeConsumerNameExists ErrorCode = 10013 JSErrCodeConsumerAlreadyExists ErrorCode = 10105 JSErrCodeConsumerExists ErrorCode = 10148 JSErrCodeDuplicateFilterSubjects ErrorCode = 10136 JSErrCodeOverlappingFilterSubjects ErrorCode = 10138 JSErrCodeConsumerEmptyFilter ErrorCode = 10139 JSErrCodeConsumerDoesNotExist ErrorCode = 10149 JSErrCodeMessageNotFound ErrorCode = 10037 JSErrCodeBadRequest ErrorCode = 10003 JSErrCodeStreamWrongLastSequence ErrorCode = 10071 )
type ExternalStream ¶
type ExternalStream struct { // APIPrefix is the subject prefix that imports the other account/domain // $JS.API.CONSUMER.> subjects. APIPrefix string `json:"api"` // DeliverPrefix is the delivery subject to use for the push consumer. DeliverPrefix string `json:"deliver"` }
ExternalStream allows you to qualify access to a stream source in another account.
type FetchOpt ¶
type FetchOpt func(*pullRequest) error
func FetchHeartbeat ¶
FetchHeartbeat sets custom heartbeat for individual fetch request. If a client does not receive a heartbeat message from a stream for more than 2 times the idle heartbeat setting, Fetch will return ErrNoHeartbeat.
Heartbeat value has to be lower than FetchMaxWait / 2.
If not provided, heartbeat is set to 5s for requests with FetchMaxWait > 10s and disabled otherwise.
func FetchMaxWait ¶
FetchMaxWait sets custom timeout for fetching predefined batch of messages.
If not provided, a default of 30 seconds will be used.
type GetMsgOpt ¶
type GetMsgOpt func(*apiMsgGetRequest) error
GetMsgOpt is a function setting options for [Stream.GetMsg]
func WithGetMsgSubject ¶
WithGetMsgSubject sets the stream subject from which the message should be retrieved. Server will return the first message with a seq >= the input seq that has the specified subject.
type GetObjectInfoOpt ¶
// GetObjectInfoOpt is used to set additional options when getting object
// info.
type GetObjectInfoOpt func(opts *getObjectInfoOpts) error
GetObjectInfoOpt is used to set additional options when getting object info.
func GetObjectInfoShowDeleted ¶
func GetObjectInfoShowDeleted() GetObjectInfoOpt
GetObjectInfoShowDeleted makes [ObjectStore.GetInfo] return object info even if it was marked as deleted.
type GetObjectOpt ¶
// GetObjectOpt is used to set additional options when getting an object.
type GetObjectOpt func(opts *getObjectOpts) error
GetObjectOpt is used to set additional options when getting an object.
func GetObjectShowDeleted ¶
func GetObjectShowDeleted() GetObjectOpt
GetObjectShowDeleted makes [ObjectStore.Get] return object even if it was marked as deleted.
type JetStream ¶
// JetStream is the top-level interface for interacting with JetStream. It
// combines account lookup with the publishing, stream, consumer, KeyValue and
// Object Store management interfaces embedded below.
type JetStream interface {
	// AccountInfo fetches account information from the server, containing details
	// about the account associated with this JetStream connection. If account is
	// not enabled for JetStream, ErrJetStreamNotEnabledForAccount is returned. If
	// the server does not have JetStream enabled, ErrJetStreamNotEnabled is
	// returned.
	AccountInfo(ctx context.Context) (*AccountInfo, error)

	// StreamConsumerManager manages consumers. Those are the same methods as
	// on Stream, but are available as a shortcut to a consumer bypassing
	// stream lookup.
	StreamConsumerManager

	// StreamManager manages streams.
	StreamManager

	// Publisher publishes messages to a stream.
	Publisher

	// KeyValueManager manages KeyValue stores.
	KeyValueManager

	// ObjectStoreManager manages Object Stores.
	ObjectStoreManager
}
JetStream is the top-level interface for interacting with JetStream. The capabilities of JetStream include:
- Publishing messages to a stream using Publisher.
- Managing streams using StreamManager.
- Managing consumers using StreamConsumerManager. Those are the same methods as on Stream, but are available as a shortcut to a consumer bypassing stream lookup.
- Managing KeyValue stores using KeyValueManager.
- Managing Object Stores using ObjectStoreManager.
JetStream can be created using New, NewWithAPIPrefix or NewWithDomain methods.
func New ¶
func New(nc *nats.Conn, opts ...JetStreamOpt) (JetStream, error)
New returns a new JetStream instance. It uses default API prefix ($JS.API) for JetStream API requests. If a custom API prefix is required, use NewWithAPIPrefix or NewWithDomain.
Available options:
- WithClientTrace - enables request/response tracing.
- WithPublishAsyncErrHandler - sets error handler for async message publish.
- WithPublishAsyncMaxPending - sets the maximum outstanding async publishes that can be inflight at one time.
func NewWithAPIPrefix ¶
func NewWithAPIPrefix(nc *nats.Conn, apiPrefix string, opts ...JetStreamOpt) (JetStream, error)
NewWithAPIPrefix returns a new JetStream instance and sets the API prefix to be used in requests to JetStream API. The API prefix will be used in API requests to JetStream, e.g. <prefix>.STREAM.INFO.<stream>.
Available options:
- WithClientTrace - enables request/response tracing.
- WithPublishAsyncErrHandler - sets error handler for async message publish.
- WithPublishAsyncMaxPending - sets the maximum outstanding async publishes that can be inflight at one time.
func NewWithDomain ¶
func NewWithDomain(nc *nats.Conn, domain string, opts ...JetStreamOpt) (JetStream, error)
NewWithDomain returns a new JetStream instance and sets the domain name token used when sending JetStream requests. The domain name token will be used in API requests to JetStream, e.g. $JS.<domain>.API.STREAM.INFO.<stream>.
Available options:
- WithClientTrace - enables request/response tracing.
- WithPublishAsyncErrHandler - sets error handler for async message publish.
- WithPublishAsyncMaxPending - sets the maximum outstanding async publishes that can be inflight at one time.
type JetStreamError ¶
JetStreamError is an error result that happens when using JetStream. In case of client-side error, APIError returns nil.
var ( // ErrJetStreamNotEnabled is an error returned when JetStream is not // enabled. // // Note: This error will not be returned in clustered mode, even if each // server in the cluster does not have JetStream enabled. In clustered mode, // requests will time out instead. ErrJetStreamNotEnabled JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabled, Description: "jetstream not enabled", Code: 503}} // ErrJetStreamNotEnabledForAccount is an error returned when JetStream is // not enabled for an account. ErrJetStreamNotEnabledForAccount JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabledForAccount, Description: "jetstream not enabled for account", Code: 503}} // ErrStreamNotFound is an error returned when stream with given name does // not exist. ErrStreamNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNotFound, Description: "stream not found", Code: 404}} // ErrStreamNameAlreadyInUse is returned when a stream with given name // already exists and has a different configuration. ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}} // ErrStreamSubjectTransformNotSupported is returned when the connected // nats-server version does not support setting the stream subject // transform. If this error is returned when executing CreateStream(), the // stream with invalid configuration was already created in the server. ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"} // ErrStreamSourceSubjectTransformNotSupported is returned when the // connected nats-server version does not support setting the stream source // subject transform. If this error is returned when executing // CreateStream(), the stream with invalid configuration was already created // in the server. 
ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"} // ErrStreamSourceNotSupported is returned when the connected nats-server // version does not support setting the stream sources. If this error is // returned when executing CreateStream(), the stream with invalid // configuration was already created in the server. ErrStreamSourceNotSupported JetStreamError = &jsError{message: "stream sourcing is not supported by nats-server"} // ErrStreamSourceMultipleFilterSubjectsNotSupported is returned when the // connected nats-server version does not support setting the stream // sources. If this error is returned when executing CreateStream(), the // stream with invalid configuration was already created in the server. ErrStreamSourceMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "stream sourcing with multiple subject filters not supported by nats-server"} // ErrConsumerNotFound is an error returned when consumer with given name // does not exist. ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}} // ErrConsumerExists is returned when attempting to create a consumer with // CreateConsumer but a consumer with given name already exists. ErrConsumerExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerExists, Description: "consumer already exists", Code: 400}} // ErrConsumerNameExists is returned when attempting to update a consumer // with UpdateConsumer but a consumer with given name does not exist. ErrConsumerDoesNotExist JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerDoesNotExist, Description: "consumer does not exist", Code: 400}} // ErrMsgNotFound is returned when message with provided sequence number // does not exist. 
ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}} // ErrBadRequest is returned when invalid request is sent to JetStream API. ErrBadRequest JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeBadRequest, Description: "bad request", Code: 400}} // ErrConsumerCreate is returned when nats-server reports error when // creating consumer (e.g. illegal update). ErrConsumerCreate JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerCreate, Description: "could not create consumer", Code: 500}} // ErrDuplicateFilterSubjects is returned when both FilterSubject and // FilterSubjects are specified when creating consumer. ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}} // ErrDuplicateFilterSubjects is returned when filter subjects overlap when // creating consumer. ErrOverlappingFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeOverlappingFilterSubjects, Description: "consumer subject filters cannot overlap", Code: 500}} // ErrEmptyFilter is returned when a filter in FilterSubjects is empty. ErrEmptyFilter JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerEmptyFilter, Description: "consumer filter in FilterSubjects cannot be empty", Code: 500}} // ErrConsumerMultipleFilterSubjectsNotSupported is returned when the // connected nats-server version does not support setting multiple filter // subjects with filter_subjects field. If this error is returned when // executing AddConsumer(), the consumer with invalid configuration was // already created in the server. 
ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"} // ErrConsumerNotFound is an error returned when consumer with given name // does not exist. ErrConsumerNameAlreadyInUse JetStreamError = &jsError{message: "consumer name already in use"} // ErrInvalidJSAck is returned when JetStream ack from message publish is // invalid. ErrInvalidJSAck JetStreamError = &jsError{message: "invalid jetstream publish response"} // ErrStreamNameRequired is returned when the provided stream name is empty. ErrStreamNameRequired JetStreamError = &jsError{message: "stream name is required"} // ErrMsgAlreadyAckd is returned when attempting to acknowledge message more // than once. ErrMsgAlreadyAckd JetStreamError = &jsError{message: "message was already acknowledged"} // ErrNoStreamResponse is returned when there is no response from stream // (e.g. no responders error). ErrNoStreamResponse JetStreamError = &jsError{message: "no response from stream"} // ErrNotJSMessage is returned when attempting to get metadata from non // JetStream message. ErrNotJSMessage JetStreamError = &jsError{message: "not a jetstream message"} // ErrInvalidStreamName is returned when the provided stream name is invalid // (contains '.'). ErrInvalidStreamName JetStreamError = &jsError{message: "invalid stream name"} // ErrInvalidSubject is returned when the provided subject name is invalid. ErrInvalidSubject JetStreamError = &jsError{message: "invalid subject name"} // ErrInvalidConsumerName is returned when the provided consumer name is // invalid (contains '.'). ErrInvalidConsumerName JetStreamError = &jsError{message: "invalid consumer name"} // ErrNoMessages is returned when no messages are currently available for a // consumer. ErrNoMessages JetStreamError = &jsError{message: "no messages"} // ErrMaxBytesExceeded is returned when a message would exceed MaxBytes set // on a pull request. 
ErrMaxBytesExceeded JetStreamError = &jsError{message: "message size exceeds max bytes"} // ErrConsumerDeleted is returned when attempting to send pull request to a // consumer which does not exist. ErrConsumerDeleted JetStreamError = &jsError{message: "consumer deleted"} // ErrConsumerLeadershipChanged is returned when pending requests are no // longer valid after leadership has changed. ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "leadership change"} // ErrHandlerRequired is returned when no handler func is provided in // Stream(). ErrHandlerRequired JetStreamError = &jsError{message: "handler cannot be empty"} // ErrEndOfData is returned when iterating over paged API from JetStream // reaches end of data. ErrEndOfData JetStreamError = &jsError{message: "end of data reached"} // ErrNoHeartbeat is received when no message is received in IdleHeartbeat // time (if set). ErrNoHeartbeat JetStreamError = &jsError{message: "no heartbeat received"} // ErrConsumerHasActiveSubscription is returned when a consumer is already // subscribed to a stream. ErrConsumerHasActiveSubscription JetStreamError = &jsError{message: "consumer has active subscription"} // ErrMsgNotBound is returned when given message is not bound to any // subscription. ErrMsgNotBound JetStreamError = &jsError{message: "message is not bound to subscription/connection"} // ErrMsgNoReply is returned when attempting to reply to a message without a // reply subject. ErrMsgNoReply JetStreamError = &jsError{message: "message does not have a reply"} // ErrMsgDeleteUnsuccessful is returned when an attempt to delete a message // is unsuccessful. ErrMsgDeleteUnsuccessful JetStreamError = &jsError{message: "message deletion unsuccessful"} // ErrAsyncPublishReplySubjectSet is returned when reply subject is set on // async message publish. 
ErrAsyncPublishReplySubjectSet JetStreamError = &jsError{message: "reply subject should be empty"} // ErrTooManyStalledMsgs is returned when too many outstanding async // messages are waiting for ack. ErrTooManyStalledMsgs JetStreamError = &jsError{message: "stalled with too many outstanding async published messages"} // ErrInvalidOption is returned when there is a collision between options. ErrInvalidOption JetStreamError = &jsError{message: "invalid jetstream option"} // ErrMsgIteratorClosed is returned when attempting to get message from a // closed iterator. ErrMsgIteratorClosed JetStreamError = &jsError{message: "messages iterator closed"} // ErrOrderedConsumerReset is returned when resetting ordered consumer fails // due to too many attempts. ErrOrderedConsumerReset JetStreamError = &jsError{message: "recreating ordered consumer"} // ErrOrderConsumerUsedAsFetch is returned when ordered consumer was already // used to process messages using Fetch (or FetchBytes). ErrOrderConsumerUsedAsFetch JetStreamError = &jsError{message: "ordered consumer initialized as fetch"} // ErrOrderConsumerUsedAsConsume is returned when ordered consumer was // already used to process messages using Consume or Messages. ErrOrderConsumerUsedAsConsume JetStreamError = &jsError{message: "ordered consumer initialized as consume"} // ErrOrderedConsumerConcurrentRequests is returned when attempting to run // concurrent operations on ordered consumers. ErrOrderedConsumerConcurrentRequests JetStreamError = &jsError{message: "cannot run concurrent processing using ordered consumer"} // ErrOrderedConsumerNotCreated is returned when trying to get consumer info // of an ordered consumer which was not yet created. ErrOrderedConsumerNotCreated JetStreamError = &jsError{message: "consumer instance not yet created"} // ErrJetStreamPublisherClosed is returned for each unfinished ack future when JetStream.Cleanup is called. 
ErrJetStreamPublisherClosed JetStreamError = &jsError{message: "jetstream context closed"} // ErrKeyExists is returned when attempting to create a key that already // exists. ErrKeyExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamWrongLastSequence, Code: 400}, message: "key exists"} // ErrKeyValueConfigRequired is returned when attempting to create a bucket // without a config. ErrKeyValueConfigRequired JetStreamError = &jsError{message: "config required"} // ErrInvalidBucketName is returned when attempting to create a bucket with // an invalid name. ErrInvalidBucketName JetStreamError = &jsError{message: "invalid bucket name"} // ErrInvalidKey is returned when attempting to create a key with an invalid // name. ErrInvalidKey JetStreamError = &jsError{message: "invalid key"} // ErrBucketExists is returned when attempting to create a bucket that // already exists and has a different configuration. ErrBucketExists JetStreamError = &jsError{message: "bucket name already in use"} // ErrBucketNotFound is returned when attempting to access a bucket that // does not exist. ErrBucketNotFound JetStreamError = &jsError{message: "bucket not found"} // ErrBadBucket is returned when attempting to access a bucket that is not a // key-value store. ErrBadBucket JetStreamError = &jsError{message: "bucket not valid key-value store"} // ErrKeyNotFound is returned when attempting to access a key that does not // exist. ErrKeyNotFound JetStreamError = &jsError{message: "key not found"} // ErrKeyDeleted is returned when attempting to access a key that was // deleted. ErrKeyDeleted JetStreamError = &jsError{message: "key was deleted"} // ErrHistoryToLarge is returned when provided history limit is larger than // 64. ErrHistoryTooLarge JetStreamError = &jsError{message: "history limited to a max of 64"} // ErrNoKeysFound is returned when no keys are found. 
ErrNoKeysFound JetStreamError = &jsError{message: "no keys found"} // ErrObjectConfigRequired is returned when attempting to create an object // without a config. ErrObjectConfigRequired JetStreamError = &jsError{message: "object-store config required"} // ErrBadObjectMeta is returned when the meta information of an object is // invalid. ErrBadObjectMeta JetStreamError = &jsError{message: "object-store meta information invalid"} // ErrObjectNotFound is returned when an object is not found. ErrObjectNotFound JetStreamError = &jsError{message: "object not found"} // ErrInvalidStoreName is returned when the name of an object-store is // invalid. ErrInvalidStoreName JetStreamError = &jsError{message: "invalid object-store name"} // ErrDigestMismatch is returned when the digests of an object do not match. ErrDigestMismatch JetStreamError = &jsError{message: "received a corrupt object, digests do not match"} // ErrInvalidDigestFormat is returned when the digest hash of an object has // an invalid format. ErrInvalidDigestFormat JetStreamError = &jsError{message: "object digest hash has invalid format"} // ErrNoObjectsFound is returned when no objects are found. ErrNoObjectsFound JetStreamError = &jsError{message: "no objects found"} // ErrObjectAlreadyExists is returned when an object with the same name // already exists. ErrObjectAlreadyExists JetStreamError = &jsError{message: "an object already exists with that name"} // ErrNameRequired is returned when a name is required. ErrNameRequired JetStreamError = &jsError{message: "name is required"} // ErrLinkNotAllowed is returned when a link cannot be set when putting the // object in a bucket. ErrLinkNotAllowed JetStreamError = &jsError{message: "link cannot be set when putting the object in bucket"} // ErrObjectRequired is returned when an object is required. ErrObjectRequired = &jsError{message: "object required"} // ErrNoLinkToDeleted is returned when it is not allowed to link to a // deleted object. 
ErrNoLinkToDeleted JetStreamError = &jsError{message: "not allowed to link to a deleted object"} // ErrNoLinkToLink is returned when it is not allowed to link to another // link. ErrNoLinkToLink JetStreamError = &jsError{message: "not allowed to link to another link"} // ErrCantGetBucket is returned when an invalid Get is attempted on an // object that is a link to a bucket. ErrCantGetBucket JetStreamError = &jsError{message: "invalid Get, object is a link to a bucket"} // ErrBucketRequired is returned when a bucket is required. ErrBucketRequired JetStreamError = &jsError{message: "bucket required"} // ErrBucketMalformed is returned when a bucket is malformed. ErrBucketMalformed JetStreamError = &jsError{message: "bucket malformed"} // ErrUpdateMetaDeleted is returned when the meta information of a deleted // object cannot be updated. ErrUpdateMetaDeleted JetStreamError = &jsError{message: "cannot update meta for a deleted object"} )
type JetStreamOpt ¶
// JetStreamOpt is a functional option for New, NewWithAPIPrefix and
// NewWithDomain methods.
type JetStreamOpt func(*jsOpts) error
JetStreamOpt is a functional option for New, NewWithAPIPrefix and NewWithDomain methods.
func WithClientTrace ¶
func WithClientTrace(ct *ClientTrace) JetStreamOpt
WithClientTrace enables request/response API calls tracing.
func WithPublishAsyncErrHandler ¶
func WithPublishAsyncErrHandler(cb MsgErrHandler) JetStreamOpt
WithPublishAsyncErrHandler sets error handler for async message publish.
func WithPublishAsyncMaxPending ¶
func WithPublishAsyncMaxPending(max int) JetStreamOpt
WithPublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.
type KVDeleteOpt ¶
// KVDeleteOpt is used to configure delete and purge operations.
type KVDeleteOpt interface {
	// contains filtered or unexported methods
}
KVDeleteOpt is used to configure delete and purge operations.
func LastRevision ¶
func LastRevision(revision uint64) KVDeleteOpt
LastRevision deletes if the latest revision matches the provided one. If the provided revision is not the latest, the delete will return an error.
type KVPurgeOpt ¶
// KVPurgeOpt is used to configure PurgeDeletes.
type KVPurgeOpt interface {
	// contains filtered or unexported methods
}
KVPurgeOpt is used to configure PurgeDeletes.
type KeyLister ¶
KeyLister is used to retrieve a list of key value store keys. It returns a channel to read the keys from. The lister will always close the channel when done (either all keys have been read or an error occurred) and therefore can be used in range loops. Stop can be used to stop the lister when not all keys have been read.
type KeyValue ¶
// KeyValue contains methods to operate on a KeyValue store.
type KeyValue interface {
	// Get returns the latest value for the key. If the key does not exist,
	// ErrKeyNotFound will be returned.
	Get(ctx context.Context, key string) (KeyValueEntry, error)

	// GetRevision returns a specific revision value for the key. If the key
	// does not exist or the provided revision does not exist,
	// ErrKeyNotFound will be returned.
	GetRevision(ctx context.Context, key string, revision uint64) (KeyValueEntry, error)

	// Put will place the new value for the key into the store. If the key
	// does not exist, it will be created. If the key exists, the value will
	// be updated.
	//
	// A key has to consist of alphanumeric characters, dashes, underscores,
	// equal signs, and dots.
	Put(ctx context.Context, key string, value []byte) (uint64, error)

	// PutString will place the string for the key into the store. If the
	// key does not exist, it will be created. If the key exists, the value
	// will be updated.
	//
	// A key has to consist of alphanumeric characters, dashes, underscores,
	// equal signs, and dots.
	PutString(ctx context.Context, key string, value string) (uint64, error)

	// Create will add the key/value pair if it does not exist. If the key
	// already exists, ErrKeyExists will be returned.
	//
	// A key has to consist of alphanumeric characters, dashes, underscores,
	// equal signs, and dots.
	Create(ctx context.Context, key string, value []byte) (uint64, error)

	// Update will update the value if the latest revision matches.
	// If the provided revision is not the latest, Update will return an error.
	// Update also resets the TTL associated with the key (if any).
	Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error)

	// Delete will place a delete marker and leave all revisions. A history
	// of a deleted key can still be retrieved by using the History method
	// or a watch on the key. [Delete] is a non-destructive operation and
	// will not remove any previous revisions from the underlying stream.
	//
	// [LastRevision] option can be specified to only perform delete if the
	// latest revision matches the provided one.
	Delete(ctx context.Context, key string, opts ...KVDeleteOpt) error

	// Purge will place a delete marker and remove all previous revisions.
	// Only the latest revision will be preserved (with a delete marker).
	// Unlike [Delete], Purge is a destructive operation and will remove all
	// previous revisions from the underlying streams.
	//
	// [LastRevision] option can be specified to only perform purge if the
	// latest revision matches the provided one.
	Purge(ctx context.Context, key string, opts ...KVDeleteOpt) error

	// Watch for any updates to keys that match the keys argument which
	// could include wildcards. By default, the watcher will send the latest
	// value for each key and all future updates. Watch will send a nil
	// entry when it has received all initial values. There are a few ways
	// to configure the watcher:
	//
	// - IncludeHistory will have the key watcher send all historical values
	// for each key (up to KeyValueMaxHistory).
	// - IgnoreDeletes will have the key watcher not pass any keys with
	// delete markers.
	// - UpdatesOnly will have the key watcher only pass updates on values
	// (without latest values when started).
	// - MetaOnly will have the key watcher retrieve only the entry meta
	// data, not the entry value.
	// - ResumeFromRevision instructs the key watcher to resume from a
	// specific revision number.
	Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWatcher, error)

	// WatchAll will watch for any updates to all keys. It can be configured
	// with the same options as Watch.
	WatchAll(ctx context.Context, opts ...WatchOpt) (KeyWatcher, error)

	// Keys will return all keys.
	//
	// Deprecated: Use ListKeys instead to avoid memory issues.
	Keys(ctx context.Context, opts ...WatchOpt) ([]string, error)

	// ListKeys will return KeyLister, allowing to retrieve all keys from
	// the key value store in a streaming fashion (on a channel).
	ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error)

	// History will return all historical values for the key (up to
	// KeyValueMaxHistory).
	History(ctx context.Context, key string, opts ...WatchOpt) ([]KeyValueEntry, error)

	// Bucket returns the KV store name.
	Bucket() string

	// PurgeDeletes will remove all current delete markers. It can be
	// configured using DeleteMarkersOlderThan option to only remove delete
	// markers older than a certain duration.
	//
	// [PurgeDeletes] is a destructive operation and will remove all entries
	// with delete markers from the underlying stream.
	PurgeDeletes(ctx context.Context, opts ...KVPurgeOpt) error

	// Status retrieves the status and configuration of a bucket.
	Status(ctx context.Context) (KeyValueStatus, error)
}
KeyValue contains methods to operate on a KeyValue store. Using the KeyValue interface, it is possible to:
- Get, Put, Create, Update, Delete and Purge a key
- Watch for updates to keys
- List all keys
- Retrieve historical values for a key
- Retrieve status and configuration of a key value bucket
- Purge all delete markers
- Close the KeyValue store
type KeyValueBucketStatus ¶
// KeyValueBucketStatus represents status of a Bucket, implements
// KeyValueStatus.
type KeyValueBucketStatus struct {
	// contains filtered or unexported fields
}
KeyValueBucketStatus represents status of a Bucket, implements KeyValueStatus
func (*KeyValueBucketStatus) BackingStore ¶
func (s *KeyValueBucketStatus) BackingStore() string
BackingStore indicates what technology is used for storage of the bucket
func (*KeyValueBucketStatus) Bucket ¶
func (s *KeyValueBucketStatus) Bucket() string
Bucket is the name of the bucket
func (*KeyValueBucketStatus) Bytes ¶
func (s *KeyValueBucketStatus) Bytes() uint64
Bytes is the size of the stream
func (*KeyValueBucketStatus) History ¶
func (s *KeyValueBucketStatus) History() int64
History returns the configured history kept per key
func (*KeyValueBucketStatus) IsCompressed ¶
func (s *KeyValueBucketStatus) IsCompressed() bool
IsCompressed indicates if the data is compressed on disk
func (*KeyValueBucketStatus) StreamInfo ¶
func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo
StreamInfo is the stream info retrieved to create the status
func (*KeyValueBucketStatus) TTL ¶
func (s *KeyValueBucketStatus) TTL() time.Duration
TTL is how long the bucket keeps values for
func (*KeyValueBucketStatus) Values ¶
func (s *KeyValueBucketStatus) Values() uint64
Values is how many messages are in the bucket, including historical values
type KeyValueConfig ¶
// KeyValueConfig is the configuration for a KeyValue store.
type KeyValueConfig struct {
	// Bucket is the name of the KeyValue store. Bucket name has to be
	// unique and can only contain alphanumeric characters, dashes, and
	// underscores.
	Bucket string `json:"bucket"`

	// Description is an optional description for the KeyValue store.
	Description string `json:"description,omitempty"`

	// MaxValueSize is the maximum size of a value in bytes. If not
	// specified, the default is -1 (unlimited).
	MaxValueSize int32 `json:"max_value_size,omitempty"`

	// History is the number of historical values to keep per key. If not
	// specified, the default is 1. Max is 64.
	History uint8 `json:"history,omitempty"`

	// TTL is the expiry time for keys. By default, keys do not expire.
	TTL time.Duration `json:"ttl,omitempty"`

	// MaxBytes is the maximum size in bytes of the KeyValue store. If not
	// specified, the default is -1 (unlimited).
	MaxBytes int64 `json:"max_bytes,omitempty"`

	// Storage is the type of storage to use for the KeyValue store. If not
	// specified, the default is FileStorage.
	Storage StorageType `json:"storage,omitempty"`

	// Replicas is the number of replicas to keep for the KeyValue store in
	// clustered jetstream. Defaults to 1, maximum is 5.
	Replicas int `json:"num_replicas,omitempty"`

	// Placement is used to declare where the stream should be placed via
	// tags and/or an explicit cluster name.
	Placement *Placement `json:"placement,omitempty"`

	// RePublish allows immediate republishing a message to the configured
	// subject after it's stored.
	RePublish *RePublish `json:"republish,omitempty"`

	// Mirror defines the configuration for mirroring another KeyValue
	// store.
	Mirror *StreamSource `json:"mirror,omitempty"`

	// Sources defines the configuration for sources of a KeyValue store.
	Sources []*StreamSource `json:"sources,omitempty"`

	// Compression sets the underlying stream compression.
	// NOTE: Compression is supported for nats-server 2.10.0+
	Compression bool `json:"compression,omitempty"`
}
KeyValueConfig is the configuration for a KeyValue store.
type KeyValueEntry ¶
// KeyValueEntry is a retrieved entry for Get, List or Watch.
type KeyValueEntry interface {
	// Bucket is the bucket the data was loaded from.
	Bucket() string

	// Key is the name of the key that was retrieved.
	Key() string

	// Value is the retrieved value.
	Value() []byte

	// Revision is a unique sequence for this value.
	Revision() uint64

	// Created is the time the data was put in the bucket.
	Created() time.Time

	// Delta is distance from the latest value (how far the current sequence
	// is from the latest).
	Delta() uint64

	// Operation returns Put or Delete or Purge, depending on the manner in
	// which the current revision was created.
	Operation() KeyValueOp
}
KeyValueEntry is a retrieved entry for Get, List or Watch.
type KeyValueLister ¶
// KeyValueLister is used to retrieve a list of key value stores. It exposes a
// channel to read the KV store statuses from.
type KeyValueLister interface {
	// Status returns the receive-only channel on which KeyValueStatus values
	// are delivered.
	Status() <-chan KeyValueStatus

	// Error returns an error value.
	// NOTE(review): presumably the error that ended the listing early, if
	// any; confirm against the implementation.
	Error() error
}
KeyValueLister is used to retrieve a list of key value stores. It returns a channel to read the KV store statuses from. The lister will always close the channel when done (either all stores have been retrieved or an error occurred) and therefore can be used in range loops. Stop can be used to stop the lister when not all KeyValue stores have been read.
type KeyValueManager ¶
type KeyValueManager interface { // KeyValue will lookup and bind to an existing KeyValue store. // // If the KeyValue store with given name does not exist, // ErrBucketNotFound will be returned. KeyValue(ctx context.Context, bucket string) (KeyValue, error) // CreateKeyValue will create a KeyValue store with the given // configuration. // // If a KeyValue store with the same name already exists and the // configuration is different, ErrBucketExists will be returned. CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error) // UpdateKeyValue will update an existing KeyValue store with the given // configuration. // // If a KeyValue store with the given name does not exist, ErrBucketNotFound // will be returned. UpdateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error) // CreateOrUpdateKeyValue will create a KeyValue store if it does not // exist or update an existing KeyValue store with the given // configuration (if possible). CreateOrUpdateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error) // DeleteKeyValue will delete this KeyValue store. // // If the KeyValue store with given name does not exist, // ErrBucketNotFound will be returned. DeleteKeyValue(ctx context.Context, bucket string) error // KeyValueStoreNames is used to retrieve a list of key value store // names. It returns a KeyValueNamesLister exposing a channel to read // the names from. The lister will always close the channel when done // (either all names have been read or an error occurred) and therefore // can be used in range loops. KeyValueStoreNames(ctx context.Context) KeyValueNamesLister // KeyValueStores is used to retrieve a list of key value store // statuses. It returns a KeyValueLister exposing a channel to read the // statuses from. The lister will always close the channel when done // (either all statuses have been read or an error occurred) and // therefore can be used in range loops. KeyValueStores(ctx context.Context) KeyValueLister }
KeyValueManager is used to manage KeyValue stores. It provides methods to create, delete, and retrieve KeyValue stores.
type KeyValueNamesLister ¶
KeyValueNamesLister is used to retrieve a list of key value store names. It returns a channel to read the KV bucket names from. The lister will always close the channel when done (either all stores have been retrieved or an error occurred) and therefore can be used in range loops. Stop can be used to stop the lister when not all bucket names have been read.
type KeyValueOp ¶
type KeyValueOp uint8
KeyValueOp represents the type of KV operation (Put, Delete, Purge). It is a part of KeyValueEntry.
const ( // KeyValuePut is a set on a revision which creates or updates a value for a // key. KeyValuePut KeyValueOp = iota // KeyValueDelete is a set on a revision which adds a delete marker for a // key. KeyValueDelete // KeyValuePurge is a set on a revision which removes all previous revisions // for a key. KeyValuePurge )
Available KeyValueOp values.
func (KeyValueOp) String ¶
func (op KeyValueOp) String() string
type KeyValueStatus ¶
type KeyValueStatus interface { // Bucket returns the name of the KeyValue store. Bucket() string // Values is how many messages are in the bucket, including historical values. Values() uint64 // History returns the configured history kept per key. History() int64 // TTL returns the duration for which keys are kept in the bucket. TTL() time.Duration // BackingStore indicates what technology is used for storage of the bucket. // Currently only JetStream is supported. BackingStore() string // Bytes returns the size of the bucket in bytes. Bytes() uint64 // IsCompressed indicates if the data is compressed on disk. IsCompressed() bool }
KeyValueStatus is run-time status about a Key-Value bucket.
type KeyWatcher ¶
type KeyWatcher interface { Updates() <-chan KeyValueEntry Stop() error }
KeyWatcher is what is returned when doing a watch. It can be used to retrieve updates to keys. If not using UpdatesOnly option, it will also send the latest value for each key. After all initial values have been sent, a nil entry will be sent. Stop can be used to stop the watcher and close the underlying channel. Watcher will not close the channel until Stop is called or connection is closed.
type ListObjectsOpt ¶
type ListObjectsOpt func(opts *listObjectOpts) error
ListObjectsOpt is used to set additional options when listing objects.
func ListObjectsShowDeleted ¶
func ListObjectsShowDeleted() ListObjectsOpt
ListObjectsShowDeleted makes [ObjectStore.ListObjects] also return deleted objects.
type MessageBatch ¶
type MessageHandler ¶
type MessageHandler func(msg Msg)
MessageHandler is a handler function used as callback in [Consume].
type MessagesContext ¶
type MessagesContext interface { // Next retrieves next message on a stream. It will block until the next // message is available. If the context is canceled, Next will return // ErrMsgIteratorClosed error. Next() (Msg, error) // Stop unsubscribes from the stream and cancels subscription. Calling // Next after calling Stop will return ErrMsgIteratorClosed error. // All messages that are already in the buffer are discarded. Stop() // Drain unsubscribes from the stream and cancels subscription. All // messages that are already in the buffer will be available on // subsequent calls to Next. After the buffer is drained, Next will // return ErrMsgIteratorClosed error. Drain() }
MessagesContext supports iterating over messages on a stream. It is returned by the [Consumer.Messages] method.
type Msg ¶
type Msg interface { // Metadata returns [MsgMetadata] for a JetStream message. Metadata() (*MsgMetadata, error) // Data returns the message body. Data() []byte // Headers returns a map of headers for a message. Headers() nats.Header // Subject returns a subject on which a message was published/received. Subject() string // Reply returns a reply subject for a message. Reply() string // Ack acknowledges a message. This tells the server that the message was // successfully processed and it can move on to the next message. Ack() error // DoubleAck acknowledges a message and waits for ack reply from the server. // While it impacts performance, it is useful for scenarios where // message loss is not acceptable. DoubleAck(context.Context) error // Nak negatively acknowledges a message. This tells the server to // redeliver the message. // // Nak does not adhere to AckWait or Backoff configured on the consumer // and triggers instant redelivery. For a delayed redelivery, use // NakWithDelay. Nak() error // NakWithDelay negatively acknowledges a message. This tells the server // to redeliver the message after the given delay. NakWithDelay(delay time.Duration) error // InProgress tells the server that this message is being worked on. It // resets the redelivery timer on the server. InProgress() error // Term tells the server to not redeliver this message, regardless of // the value of MaxDeliver. Term() error // TermWithReason tells the server to not redeliver this message, regardless of // the value of MaxDeliver. The provided reason will be included in JetStream // advisory event sent by the server. // // Note: This will only work with JetStream servers >= 2.10.4. // For older servers, TermWithReason will be ignored by the server and the message // will not be terminated. TermWithReason(reason string) error }
Msg contains methods to operate on a JetStream message. Metadata, Data, Headers, Subject and Reply can be used to retrieve the specific parts of the underlying message. Ack, DoubleAck, Nak, NakWithDelay, InProgress, Term and TermWithReason are various flavors of ack requests.
type MsgErrHandler ¶
MsgErrHandler is used to process asynchronous errors from JetStream PublishAsync. It will return the original message sent to the server for possible retransmitting and the error encountered.
type MsgMetadata ¶
type MsgMetadata struct { // Sequence is the sequence information for the message. Sequence SequencePair // NumDelivered is the number of times this message was delivered to the // consumer. NumDelivered uint64 // NumPending is the number of messages that match the consumer's // filter, but have not been delivered yet. NumPending uint64 // Timestamp is the time the message was originally stored on a stream. Timestamp time.Time // Stream is the stream name this message is stored on. Stream string // Consumer is the consumer name this message was delivered to. Consumer string // Domain is the domain this message was received on. Domain string }
MsgMetadata is the JetStream metadata associated with received messages.
type ObjectBucketStatus ¶
type ObjectBucketStatus struct {
// contains filtered or unexported fields
}
ObjectBucketStatus represents the status of a bucket and implements ObjectStoreStatus.
func (*ObjectBucketStatus) BackingStore ¶
func (s *ObjectBucketStatus) BackingStore() string
BackingStore indicates what technology is used for storage of the bucket
func (*ObjectBucketStatus) Bucket ¶
func (s *ObjectBucketStatus) Bucket() string
Bucket is the name of the bucket
func (*ObjectBucketStatus) Description ¶
func (s *ObjectBucketStatus) Description() string
Description is the description supplied when creating the bucket
func (*ObjectBucketStatus) IsCompressed ¶
func (s *ObjectBucketStatus) IsCompressed() bool
IsCompressed indicates if the data is compressed on disk
func (*ObjectBucketStatus) Metadata ¶
func (s *ObjectBucketStatus) Metadata() map[string]string
Metadata is the metadata supplied when creating the bucket
func (*ObjectBucketStatus) Replicas ¶
func (s *ObjectBucketStatus) Replicas() int
Replicas indicates how many storage replicas are kept for the data in the bucket
func (*ObjectBucketStatus) Sealed ¶
func (s *ObjectBucketStatus) Sealed() bool
Sealed indicates the stream is sealed and cannot be modified in any way
func (*ObjectBucketStatus) Size ¶
func (s *ObjectBucketStatus) Size() uint64
Size is the combined size of all data in the bucket including metadata, in bytes
func (*ObjectBucketStatus) Storage ¶
func (s *ObjectBucketStatus) Storage() StorageType
Storage indicates the underlying JetStream storage technology used to store data
func (*ObjectBucketStatus) StreamInfo ¶
func (s *ObjectBucketStatus) StreamInfo() *StreamInfo
StreamInfo is the stream info retrieved to create the status
func (*ObjectBucketStatus) TTL ¶
func (s *ObjectBucketStatus) TTL() time.Duration
TTL indicates how long objects are kept in the bucket
type ObjectInfo ¶
type ObjectInfo struct { // ObjectMeta contains high level information about the object. ObjectMeta // Bucket is the name of the object store. Bucket string `json:"bucket"` // NUID is the unique identifier for the object set when putting the // object into the store. NUID string `json:"nuid"` // Size is the size of the object in bytes. It only includes the size of // the object itself, not the metadata. Size uint64 `json:"size"` // ModTime is the last modification time of the object. ModTime time.Time `json:"mtime"` // Chunks is the number of chunks the object is split into. Maximum size // of each chunk can be specified in ObjectMetaOptions. Chunks uint32 `json:"chunks"` // Digest is the SHA-256 digest of the object. It is used to verify the // integrity of the object. Digest string `json:"digest,omitempty"` // Deleted indicates if the object is marked as deleted. Deleted bool `json:"deleted,omitempty"` }
ObjectInfo contains ObjectMeta and additional information about an object.
type ObjectLink ¶
type ObjectLink struct { // Bucket is the name of the object store the link is pointing to. Bucket string `json:"bucket"` // Name can be used to link to a single object. // If empty means this is a link to the whole store, like a directory. Name string `json:"name,omitempty"` }
ObjectLink is used to embed links to other buckets and objects.
type ObjectMeta ¶
type ObjectMeta struct { // Name is the name of the object. The name is required when adding an // object and has to be unique within the object store. Name string `json:"name"` // Description is an optional description for the object. Description string `json:"description,omitempty"` // Headers is an optional set of user-defined headers for the object. Headers nats.Header `json:"headers,omitempty"` // Metadata is the user supplied metadata for the object. Metadata map[string]string `json:"metadata,omitempty"` // Additional options for the object. Opts *ObjectMetaOptions `json:"options,omitempty"` }
ObjectMeta is high level information about an object.
type ObjectMetaOptions ¶
type ObjectMetaOptions struct { // Link contains information about a link to another object or object store. // It should not be set manually, but rather by using the AddLink or // AddBucketLink methods. Link *ObjectLink `json:"link,omitempty"` // ChunkSize is the maximum size of each chunk in bytes. If not specified, // the default is 128k. ChunkSize uint32 `json:"max_chunk_size,omitempty"` }
ObjectMetaOptions is used to set additional options when creating an object.
type ObjectResult ¶
type ObjectResult interface { io.ReadCloser Info() (*ObjectInfo, error) Error() error }
ObjectResult will return the object info and a reader to read the object's contents. The reader will be closed when all data has been read or an error occurs.
type ObjectStore ¶
type ObjectStore interface { // Put will place the contents from the reader into a new object. If the // object already exists, it will be overwritten. The object name is // required and is taken from the ObjectMeta.Name field. // // The reader will be read until EOF. ObjectInfo will be returned, containing // the object's metadata, digest and instance information. Put(ctx context.Context, obj ObjectMeta, reader io.Reader) (*ObjectInfo, error) // PutBytes is convenience function to put a byte slice into this object // store under the given name. // // ObjectInfo will be returned, containing the object's metadata, digest // and instance information. PutBytes(ctx context.Context, name string, data []byte) (*ObjectInfo, error) // PutString is convenience function to put a string into this object // store under the given name. // // ObjectInfo will be returned, containing the object's metadata, digest // and instance information. PutString(ctx context.Context, name string, data string) (*ObjectInfo, error) // PutFile is convenience function to put a file contents into this // object store. The name of the object will be the path of the file. // // ObjectInfo will be returned, containing the object's metadata, digest // and instance information. PutFile(ctx context.Context, file string) (*ObjectInfo, error) // Get will pull the named object from the object store. If the object // does not exist, ErrObjectNotFound will be returned. // // The returned ObjectResult will contain the object's metadata and a // reader to read the object's contents. The reader will be closed when // all data has been read or an error occurs. // // A GetObjectShowDeleted option can be supplied to return an object // even if it was marked as deleted. Get(ctx context.Context, name string, opts ...GetObjectOpt) (ObjectResult, error) // GetBytes is a convenience function to pull an object from this object // store and return it as a byte slice. 
// // If the object does not exist, ErrObjectNotFound will be returned. // // A GetObjectShowDeleted option can be supplied to return an object // even if it was marked as deleted. GetBytes(ctx context.Context, name string, opts ...GetObjectOpt) ([]byte, error) // GetString is a convenience function to pull an object from this // object store and return it as a string. // // If the object does not exist, ErrObjectNotFound will be returned. // // A GetObjectShowDeleted option can be supplied to return an object // even if it was marked as deleted. GetString(ctx context.Context, name string, opts ...GetObjectOpt) (string, error) // GetFile is a convenience function to pull an object from this object // store and place it in a file. If the file already exists, it will be // overwritten, otherwise it will be created. // // If the object does not exist, ErrObjectNotFound will be returned. // A GetObjectShowDeleted option can be supplied to return an object // even if it was marked as deleted. GetFile(ctx context.Context, name, file string, opts ...GetObjectOpt) error // GetInfo will retrieve the current information for the object, containing // the object's metadata and instance information. // // If the object does not exist, ErrObjectNotFound will be returned. // // A GetObjectInfoShowDeleted option can be supplied to return an object // even if it was marked as deleted. GetInfo(ctx context.Context, name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error) // UpdateMeta will update the metadata for the object. // // If the object does not exist, ErrUpdateMetaDeleted will be returned. // If the new name is different from the old name, and an object with the // new name already exists, ErrObjectAlreadyExists will be returned. UpdateMeta(ctx context.Context, name string, meta ObjectMeta) error // Delete will delete the named object from the object store. If the object // does not exist, ErrObjectNotFound will be returned. 
If the object is // already deleted, no error will be returned. // // All chunks for the object will be purged, and the object will be marked // as deleted. Delete(ctx context.Context, name string) error // AddLink will add a link to another object. A link is a reference to // another object. The provided name is the name of the link object. // The provided ObjectInfo is the info of the object being linked to. // // If an object with given name already exists, ErrObjectAlreadyExists // will be returned. // If object being linked to is deleted, ErrNoLinkToDeleted will be // returned. // If the provided object is a link, ErrNoLinkToLink will be returned. // If the provided object is nil or the name is empty, ErrObjectRequired // will be returned. AddLink(ctx context.Context, name string, obj *ObjectInfo) (*ObjectInfo, error) // AddBucketLink will add a link to another object store. A link is a // reference to another object store. The provided name is the name of // the link object. // The provided ObjectStore is the object store being linked to. // // If an object with given name already exists, ErrObjectAlreadyExists // will be returned. // If the provided object store is nil ErrBucketRequired will be returned. AddBucketLink(ctx context.Context, name string, bucket ObjectStore) (*ObjectInfo, error) // Seal will seal the object store, no further modifications will be allowed. Seal(ctx context.Context) error // Watch for any updates to objects in the store. By default, the watcher will send the latest // info for each object and all future updates. Watch will send a nil // entry when it has received all initial values. There are a few ways // to configure the watcher: // // - IncludeHistory will have the watcher send all historical information // for each object. // - IgnoreDeletes will have the watcher not pass any objects with // delete markers. // - UpdatesOnly will have the watcher only pass updates on objects // (without latest info when started). 
Watch(ctx context.Context, opts ...WatchOpt) (ObjectWatcher, error) // List will list information about objects in the store. // // If the object store is empty, ErrNoObjectsFound will be returned. List(ctx context.Context, opts ...ListObjectsOpt) ([]*ObjectInfo, error) // Status retrieves the status and configuration of the bucket. Status(ctx context.Context) (ObjectStoreStatus, error) }
ObjectStore contains methods to operate on an object store. Using the ObjectStore interface, it is possible to:
- Perform CRUD operations on objects (Get, Put, Delete). Get and put expose convenience methods to work with byte slices, strings and files, in addition to streaming io.Reader
- Get information about an object without retrieving it.
- Update the metadata of an object.
- Add links to other objects or object stores.
- Watch for updates to a store
- List information about objects in a store
- Retrieve status and configuration of an object store.
type ObjectStoreConfig ¶
type ObjectStoreConfig struct { // Bucket is the name of the object store. Bucket name has to be // unique and can only contain alphanumeric characters, dashes, and // underscores. Bucket string `json:"bucket"` // Description is an optional description for the object store. Description string `json:"description,omitempty"` // TTL is the maximum age of objects in the store. If an object is not // updated within this time, it will be removed from the store. // By default, objects do not expire. TTL time.Duration `json:"max_age,omitempty"` // MaxBytes is the maximum size of the object store. If not specified, // the default is -1 (unlimited). MaxBytes int64 `json:"max_bytes,omitempty"` // Storage is the type of storage to use for the object store. If not // specified, the default is FileStorage. Storage StorageType `json:"storage,omitempty"` // Replicas is the number of replicas to keep for the object store in // clustered jetstream. Defaults to 1, maximum is 5. Replicas int `json:"num_replicas,omitempty"` // Placement is used to declare where the object store should be placed via // tags and/or an explicit cluster name. Placement *Placement `json:"placement,omitempty"` // Compression enables the underlying stream compression. // NOTE: Compression is supported for nats-server 2.10.0+ Compression bool `json:"compression,omitempty"` // Bucket-specific metadata // NOTE: Metadata requires nats-server v2.10.0+ Metadata map[string]string `json:"metadata,omitempty"` }
ObjectStoreConfig is the configuration for the object store.
type ObjectStoreManager ¶
type ObjectStoreManager interface { // ObjectStore will look up and bind to an existing object store // instance. // // If the object store with given name does not exist, ErrBucketNotFound // will be returned. ObjectStore(ctx context.Context, bucket string) (ObjectStore, error) // CreateObjectStore will create a new object store with the given // configuration. // // If the object store with given name already exists, ErrBucketExists // will be returned. CreateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error) // UpdateObjectStore will update an existing object store with the given // configuration. // // If the object store with given name does not exist, ErrBucketNotFound // will be returned. UpdateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error) // CreateOrUpdateObjectStore will create a new object store with the given // configuration if it does not exist, or update an existing object store // with the given configuration. CreateOrUpdateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error) // DeleteObjectStore will delete the provided object store. // // If the object store with given name does not exist, ErrBucketNotFound // will be returned. DeleteObjectStore(ctx context.Context, bucket string) error // ObjectStoreNames is used to retrieve a list of bucket names. // It returns an ObjectStoreNamesLister exposing a channel to receive // the names of the object stores. // // The lister will always close the channel when done (either all names // have been read or an error occurred) and therefore can be used in a // for-range loop. ObjectStoreNames(ctx context.Context) ObjectStoreNamesLister // ObjectStores is used to retrieve a list of bucket statuses. // It returns an ObjectStoresLister exposing a channel to receive // the statuses of the object stores. 
// // The lister will always close the channel when done (either all statuses // have been read or an error occurred) and therefore can be used in a // for-range loop. ObjectStores(ctx context.Context) ObjectStoresLister }
ObjectStoreManager is used to manage object stores. It provides methods for CRUD operations on object stores.
type ObjectStoreNamesLister ¶
ObjectStoreNamesLister is used to retrieve a list of object store names. It returns a channel to read the bucket names from. The lister will always close the channel when done (either all stores have been retrieved or an error occurred) and therefore can be used in range loops. Stop can be used to stop the lister when not all bucket names have been read.
type ObjectStoreStatus ¶
type ObjectStoreStatus interface { // Bucket returns the name of the object store. Bucket() string // Description is the description supplied when creating the bucket. Description() string // TTL indicates how long objects are kept in the bucket. TTL() time.Duration // Storage indicates the underlying JetStream storage technology used to // store data. Storage() StorageType // Replicas indicates how many storage replicas are kept for the data in // the bucket. Replicas() int // Sealed indicates the stream is sealed and cannot be modified in any // way. Sealed() bool // Size is the combined size of all data in the bucket including // metadata, in bytes. Size() uint64 // BackingStore indicates what technology is used for storage of the // bucket. Currently only JetStream is supported. BackingStore() string // Metadata is the user supplied metadata for the bucket. Metadata() map[string]string // IsCompressed indicates if the data is compressed on disk. IsCompressed() bool }
ObjectStoreStatus is run-time status about a bucket.
type ObjectStoresLister ¶
type ObjectStoresLister interface { Status() <-chan ObjectStoreStatus Error() error }
ObjectStoresLister is used to retrieve a list of object stores. It returns a channel to read the bucket store statuses from. The lister will always close the channel when done (either all stores have been retrieved or an error occurred) and therefore can be used in range loops. Stop can be used to stop the lister when not all object stores have been read.
type ObjectWatcher ¶
type ObjectWatcher interface { Updates() <-chan *ObjectInfo Stop() error }
ObjectWatcher is what is returned when doing a watch. It can be used to retrieve updates to objects in a bucket. If not using UpdatesOnly option, it will also send the latest value for each key. After all initial values have been sent, a nil entry will be sent. Stop can be used to stop the watcher and close the underlying channel. Watcher will not close the channel until Stop is called or connection is closed.
type OrderedConsumerConfig ¶
type OrderedConsumerConfig struct { // FilterSubjects allows filtering messages from a stream by subject. // This field is exclusive with FilterSubject. Requires nats-server // v2.10.0 or later. FilterSubjects []string `json:"filter_subjects,omitempty"` // DeliverPolicy defines from which point to start delivering messages // from the stream. Defaults to DeliverAllPolicy. DeliverPolicy DeliverPolicy `json:"deliver_policy"` // OptStartSeq is an optional sequence number from which to start // message delivery. Only applicable when DeliverPolicy is set to // DeliverByStartSequencePolicy. OptStartSeq uint64 `json:"opt_start_seq,omitempty"` // OptStartTime is an optional time from which to start message // delivery. Only applicable when DeliverPolicy is set to // DeliverByStartTimePolicy. OptStartTime *time.Time `json:"opt_start_time,omitempty"` // ReplayPolicy defines the rate at which messages are sent to the // consumer. If ReplayOriginalPolicy is set, messages are sent in the // same intervals in which they were stored on stream. This can be used // e.g. to simulate production traffic in development environments. If // ReplayInstantPolicy is set, messages are sent as fast as possible. // Defaults to ReplayInstantPolicy. ReplayPolicy ReplayPolicy `json:"replay_policy"` // InactiveThreshold is a duration which instructs the server to clean // up the consumer if it has been inactive for the specified duration. // Defaults to 5s. InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` // HeadersOnly indicates whether only headers of messages should be sent // (and no payload). Defaults to false. HeadersOnly bool `json:"headers_only,omitempty"` // Maximum number of attempts for the consumer to be recreated in a // single recreation cycle. Defaults to unlimited. MaxResetAttempts int }
OrderedConsumerConfig is the configuration of an ordered JetStream consumer. For more information, see Ordered Consumers in README
type PeerInfo ¶
type PeerInfo struct { // Name is the server name of the peer. Name string `json:"name"` // Current indicates if the peer is up to date and synchronized with the // leader. Current bool `json:"current"` // Offline indicates if the peer is considered offline by the group. Offline bool `json:"offline,omitempty"` // Active is the duration since this peer was last seen. Active time.Duration `json:"active"` // Lag is the number of uncommitted operations this peer is behind the // leader. Lag uint64 `json:"lag,omitempty"` }
PeerInfo shows information about the peers in the cluster that are supporting the stream or consumer.
type Placement ¶
type Placement struct { // Cluster is the name of the cluster to which the stream should be // assigned. Cluster string `json:"cluster"` // Tags are used to match streams to servers in the cluster. A stream // will be assigned to a server with a matching tag. Tags []string `json:"tags,omitempty"` }
Placement is used to guide placement of streams in clustered JetStream.
type PubAck ¶
type PubAck struct { // Stream is the stream name the message was published to. Stream string `json:"stream"` // Sequence is the stream sequence number of the message. Sequence uint64 `json:"seq"` // Duplicate indicates whether the message was a duplicate. // Duplicate can be detected using the [MsgIDHeader] and [StreamConfig.Duplicates]. Duplicate bool `json:"duplicate,omitempty"` // Domain is the domain the message was published to. Domain string `json:"domain,omitempty"` }
PubAck is an ack received after successfully publishing a message.
type PubAckFuture ¶
type PubAckFuture interface { // Ok returns a receive only channel that can be used to get a PubAck. Ok() <-chan *PubAck // Err returns a receive only channel that can be used to get the error from an async publish. Err() <-chan error // Msg returns the message that was sent to the server. Msg() *nats.Msg }
PubAckFuture is a future for a PubAck. It can be used to wait for a PubAck or an error after an async publish.
type PublishOpt ¶
type PublishOpt func(*pubOpts) error
PublishOpt are the options that can be passed to Publish methods.
func WithExpectLastMsgID ¶
func WithExpectLastMsgID(id string) PublishOpt
WithExpectLastMsgID sets the expected message ID the last message on a stream should have. If the last message has a different message ID server will reject the message and publish will fail.
func WithExpectLastSequence ¶
func WithExpectLastSequence(seq uint64) PublishOpt
WithExpectLastSequence sets the expected sequence number the last message on a stream should have. If the last message has a different sequence number server will reject the message and publish will fail.
func WithExpectLastSequencePerSubject ¶
func WithExpectLastSequencePerSubject(seq uint64) PublishOpt
WithExpectLastSequencePerSubject sets the expected sequence number the last message on the subject the message is published to should have. If the last message on that subject has a different sequence number, the server will reject the message and publish will fail.
func WithExpectStream ¶
func WithExpectStream(stream string) PublishOpt
WithExpectStream sets the expected stream the message should be published to. If the message is published to a different stream server will reject the message and publish will fail.
func WithMsgID ¶
func WithMsgID(id string) PublishOpt
WithMsgID sets the message ID used for deduplication.
func WithRetryAttempts ¶
func WithRetryAttempts(num int) PublishOpt
WithRetryAttempts sets the number of retry attempts when ErrNoResponders is encountered. Defaults to 2.
func WithRetryWait ¶
func WithRetryWait(dur time.Duration) PublishOpt
WithRetryWait sets the retry wait time when ErrNoResponders is encountered. Defaults to 250ms.
func WithStallWait ¶
func WithStallWait(ttl time.Duration) PublishOpt
WithStallWait sets the max wait when the producer becomes stalled while producing messages. If a publish call is blocked for this long, ErrTooManyStalledMsgs is returned.
type Publisher ¶
type Publisher interface { // Publish performs a synchronous publish to a stream and waits for ack // from server. It accepts subject name (which must be bound to a stream) // and message payload. Publish(ctx context.Context, subject string, payload []byte, opts ...PublishOpt) (*PubAck, error) // PublishMsg performs a synchronous publish to a stream and waits for // ack from server. It accepts subject name (which must be bound to a // stream) and nats.Message. PublishMsg(ctx context.Context, msg *nats.Msg, opts ...PublishOpt) (*PubAck, error) // PublishAsync performs a publish to a stream and returns // [PubAckFuture] interface, not blocking while waiting for an // acknowledgement. It accepts subject name (which must be bound to a // stream) and message payload. // // PublishAsync does not guarantee that the message has been // received by the server. It only guarantees that the message has been // sent to the server and thus messages can be stored in the stream // out of order in case of retries. PublishAsync(subject string, payload []byte, opts ...PublishOpt) (PubAckFuture, error) // PublishMsgAsync performs a publish to a stream and returns // [PubAckFuture] interface, not blocking while waiting for an // acknowledgement. It accepts subject name (which must // be bound to a stream) and nats.Message. // // PublishMsgAsync does not guarantee that the message has been // received by the server. It only guarantees that the message has been // sent to the server and thus messages can be stored in the stream // out of order in case of retries. PublishMsgAsync(msg *nats.Msg, opts ...PublishOpt) (PubAckFuture, error) // PublishAsyncPending returns the number of async publishes outstanding // for this context. An outstanding publish is one that has been // sent by the publisher but has not yet received an ack. 
PublishAsyncPending() int // PublishAsyncComplete returns a channel that will be closed when all // outstanding asynchronously published messages are acknowledged by the // server. PublishAsyncComplete() <-chan struct{} // CleanupPublisher will cleanup the publishing side of JetStreamContext. // // This will unsubscribe from the internal reply subject if needed. // All pending async publishes will fail with ErrJetStreamContextClosed. // // If an error handler was provided, it will be called for each pending async // publish and PublishAsyncComplete will be closed. // // After completing JetStreamContext is still usable - internal subscription // will be recreated on next publish, but the acks from previous publishes will // be lost. CleanupPublisher() }
Publisher provides methods for publishing messages to a stream. It is available as a part of JetStream interface. The behavior of Publisher can be customized using PublishOpt options.
type PullConsumeOpt ¶
type PullConsumeOpt interface {
// contains filtered or unexported methods
}
PullConsumeOpt represent additional options used in [Consume] for pull consumers.
func ConsumeErrHandler ¶
func ConsumeErrHandler(cb ConsumeErrHandlerFunc) PullConsumeOpt
ConsumeErrHandler sets custom error handler invoked when an error was encountered while consuming messages. It will be invoked for both terminal (Consumer Deleted, invalid request body) and non-terminal (e.g. missing heartbeats) errors.
type PullExpiry ¶
PullExpiry sets timeout on a single pull request, waiting until at least one message is available. If not provided, a default of 30 seconds will be used.
type PullHeartbeat ¶
PullHeartbeat sets the idle heartbeat duration for a pull subscription. If a client does not receive a heartbeat message from a stream for more than the idle heartbeat setting, the subscription will be removed and error will be passed to the message handler. If not provided, a default PullExpiry / 2 will be used (capped at 30 seconds).
type PullMaxBytes ¶
type PullMaxBytes int
PullMaxBytes limits the number of bytes to be buffered in the client. If not provided, the limit is not set (max messages will be used instead). This option is exclusive with PullMaxMessages.
type PullMaxMessages ¶
type PullMaxMessages int
PullMaxMessages limits the number of messages to be buffered in the client. If not provided, a default of 500 messages will be used. This option is exclusive with PullMaxBytes.
type PullMessagesOpt ¶
type PullMessagesOpt interface {
// contains filtered or unexported methods
}
PullMessagesOpt represent additional options used in [Messages] for pull consumers.
func WithMessagesErrOnMissingHeartbeat ¶
func WithMessagesErrOnMissingHeartbeat(hbErr bool) PullMessagesOpt
WithMessagesErrOnMissingHeartbeat sets whether a missing heartbeat error should be reported when calling [MessagesContext.Next] (Default: true).
type PullThresholdBytes ¶
type PullThresholdBytes int
PullThresholdBytes sets the byte count on which Consume will trigger new pull request to the server. Defaults to 50% of MaxBytes (if set).
type PullThresholdMessages ¶
type PullThresholdMessages int
PullThresholdMessages sets the message count on which Consume will trigger new pull request to the server. Defaults to 50% of MaxMessages.
type RawStreamMsg ¶
type RePublish ¶
type RePublish struct { // Source is the subject pattern to match incoming messages against. Source string `json:"src,omitempty"` // Destination is the subject pattern to republish the subject to. Destination string `json:"dest"` // HeadersOnly is a flag to indicate that only the headers should be // republished. HeadersOnly bool `json:"headers_only,omitempty"` }
RePublish is for republishing messages once committed to a stream. The original subject is remapped from the subject pattern to the destination pattern.
type ReplayPolicy ¶
type ReplayPolicy int
ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.
const ( // ReplayInstantPolicy will replay messages as fast as possible. ReplayInstantPolicy ReplayPolicy = iota // ReplayOriginalPolicy will maintain the same timing as the messages were // received. ReplayOriginalPolicy )
func (ReplayPolicy) MarshalJSON ¶
func (p ReplayPolicy) MarshalJSON() ([]byte, error)
func (ReplayPolicy) String ¶
func (p ReplayPolicy) String() string
func (*ReplayPolicy) UnmarshalJSON ¶
func (p *ReplayPolicy) UnmarshalJSON(data []byte) error
type RetentionPolicy ¶
type RetentionPolicy int
RetentionPolicy determines how messages in a stream are retained.
const ( // LimitsPolicy (default) means that messages are retained until any given // limit is reached. This could be one of MaxMsgs, MaxBytes, or MaxAge. LimitsPolicy RetentionPolicy = iota // InterestPolicy specifies that when all known observables have // acknowledged a message it can be removed. InterestPolicy // WorkQueuePolicy specifies that when the first worker or subscriber // acknowledges the message it can be removed. WorkQueuePolicy )
func (RetentionPolicy) MarshalJSON ¶
func (rp RetentionPolicy) MarshalJSON() ([]byte, error)
func (RetentionPolicy) String ¶
func (rp RetentionPolicy) String() string
func (*RetentionPolicy) UnmarshalJSON ¶
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error
type SequenceInfo ¶
type SequenceInfo struct { Consumer uint64 `json:"consumer_seq"` Stream uint64 `json:"stream_seq"` Last *time.Time `json:"last_active,omitempty"` }
SequenceInfo has both the consumer and the stream sequence and last activity.
type SequencePair ¶
type SequencePair struct { // Consumer is the consumer sequence number for message deliveries. This // is the total number of messages the consumer has seen (including // redeliveries). Consumer uint64 `json:"consumer_seq"` // Stream is the stream sequence number for a message. Stream uint64 `json:"stream_seq"` }
SequencePair includes the consumer and stream sequence numbers for a message.
type StopAfter ¶
type StopAfter int
StopAfter sets the number of messages after which the consumer is automatically stopped and no more messages are pulled from the server.
type StorageType ¶
type StorageType int
StorageType determines how messages are stored for retention.
const ( // FileStorage specifies on disk storage. It's the default. FileStorage StorageType = iota // MemoryStorage specifies in memory only. MemoryStorage )
func (StorageType) MarshalJSON ¶
func (st StorageType) MarshalJSON() ([]byte, error)
func (StorageType) String ¶
func (st StorageType) String() string
func (*StorageType) UnmarshalJSON ¶
func (st *StorageType) UnmarshalJSON(data []byte) error
type StoreCompression ¶
type StoreCompression uint8
StoreCompression determines how messages are compressed.
const ( // NoCompression disables compression on the stream. This is the default. NoCompression StoreCompression = iota // S2Compression enables S2 compression on the stream. S2Compression )
func (StoreCompression) MarshalJSON ¶
func (alg StoreCompression) MarshalJSON() ([]byte, error)
func (StoreCompression) String ¶
func (alg StoreCompression) String() string
func (*StoreCompression) UnmarshalJSON ¶
func (alg *StoreCompression) UnmarshalJSON(b []byte) error
type Stream ¶
type Stream interface { ConsumerManager // Info returns StreamInfo from the server. Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, error) // CachedInfo returns StreamInfo currently cached on this stream. // This method does not perform any network requests. The cached // StreamInfo is updated on every call to Info and Update. CachedInfo() *StreamInfo // Purge removes messages from a stream. It is a destructive operation. // Use with caution. See StreamPurgeOpt for available options. Purge(ctx context.Context, opts ...StreamPurgeOpt) error // GetMsg retrieves a raw stream message stored in JetStream by sequence number. GetMsg(ctx context.Context, seq uint64, opts ...GetMsgOpt) (*RawStreamMsg, error) // GetLastMsgForSubject retrieves the last raw stream message stored in // JetStream on a given subject. GetLastMsgForSubject(ctx context.Context, subject string) (*RawStreamMsg, error) // DeleteMsg deletes a message from a stream. // On the server, the message is marked as erased, but not overwritten. DeleteMsg(ctx context.Context, seq uint64) error // SecureDeleteMsg deletes a message from a stream. The deleted message // is overwritten with random data. As a result, this operation is slower // than DeleteMsg. SecureDeleteMsg(ctx context.Context, seq uint64) error }
Stream contains CRUD methods on a consumer via ConsumerManager, as well as operations on an existing stream. It allows fetching and removing messages from a stream, as well as purging a stream.
type StreamConfig ¶
type StreamConfig struct { // Name is the name of the stream. It is required and must be unique // across the JetStream account. // // Names cannot contain whitespace, ., *, >, path separators // (forward or backwards slash), and non-printable characters. Name string `json:"name"` // Description is an optional description of the stream. Description string `json:"description,omitempty"` // Subjects is a list of subjects that the stream is listening on. // Wildcards are supported. Subjects cannot be set if the stream is // created as a mirror. Subjects []string `json:"subjects,omitempty"` // Retention defines the message retention policy for the stream. // Defaults to LimitsPolicy. Retention RetentionPolicy `json:"retention"` // MaxConsumers specifies the maximum number of consumers allowed for // the stream. MaxConsumers int `json:"max_consumers"` // MaxMsgs is the maximum number of messages the stream will store. // After reaching the limit, stream adheres to the discard policy. // If not set, server default is -1 (unlimited). MaxMsgs int64 `json:"max_msgs"` // MaxBytes is the maximum total size of messages the stream will store. // After reaching the limit, stream adheres to the discard policy. // If not set, server default is -1 (unlimited). MaxBytes int64 `json:"max_bytes"` // Discard defines the policy for handling messages when the stream // reaches its limits in terms of number of messages or total bytes. Discard DiscardPolicy `json:"discard"` // DiscardNewPerSubject is a flag to enable discarding new messages per // subject when limits are reached. Requires DiscardPolicy to be // DiscardNew and the MaxMsgsPerSubject to be set. DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"` // MaxAge is the maximum age of messages that the stream will retain. MaxAge time.Duration `json:"max_age"` // MaxMsgsPerSubject is the maximum number of messages per subject that // the stream will retain. 
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"` // MaxMsgSize is the maximum size of any single message in the stream. MaxMsgSize int32 `json:"max_msg_size,omitempty"` // Storage specifies the type of storage backend used for the stream // (file or memory). Storage StorageType `json:"storage"` // Replicas is the number of stream replicas in clustered JetStream. // Defaults to 1, maximum is 5. Replicas int `json:"num_replicas"` // NoAck is a flag to disable acknowledging messages received by this // stream. // // If set to true, publish methods from the JetStream client will not // work as expected, since they rely on acknowledgements. Core NATS // publish methods should be used instead. Note that this will make // message delivery less reliable. NoAck bool `json:"no_ack,omitempty"` // Duplicates is the window within which to track duplicate messages. // If not set, server default is 2 minutes. Duplicates time.Duration `json:"duplicate_window,omitempty"` // Placement is used to declare where the stream should be placed via // tags and/or an explicit cluster name. Placement *Placement `json:"placement,omitempty"` // Mirror defines the configuration for mirroring another stream. Mirror *StreamSource `json:"mirror,omitempty"` // Sources is a list of other streams this stream sources messages from. Sources []*StreamSource `json:"sources,omitempty"` // Sealed streams do not allow messages to be published or deleted via limits or API, // sealed streams can not be unsealed via configuration update. Can only // be set on already created streams via the Update API. Sealed bool `json:"sealed,omitempty"` // DenyDelete restricts the ability to delete messages from a stream via // the API. Defaults to false. DenyDelete bool `json:"deny_delete,omitempty"` // DenyPurge restricts the ability to purge messages from a stream via // the API. Defaults to false. 
DenyPurge bool `json:"deny_purge,omitempty"` // AllowRollup allows the use of the Nats-Rollup header to replace all // contents of a stream, or subject in a stream, with a single new // message. AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` // Compression specifies the message storage compression algorithm. // Defaults to NoCompression. Compression StoreCompression `json:"compression"` // FirstSeq is the initial sequence number of the first message in the // stream. FirstSeq uint64 `json:"first_seq,omitempty"` // SubjectTransform allows applying a transformation to matching // messages' subjects. SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"` // RePublish allows immediate republishing a message to the configured // subject after it's stored. RePublish *RePublish `json:"republish,omitempty"` // AllowDirect enables direct access to individual messages using direct // get API. Defaults to false. AllowDirect bool `json:"allow_direct"` // MirrorDirect enables direct access to individual messages from the // origin stream using direct get API. Defaults to false. MirrorDirect bool `json:"mirror_direct"` // ConsumerLimits defines limits of certain values that consumers can // set, defaults for those who don't set these settings ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"` // Metadata is a set of application-defined key-value pairs for // associating metadata on the stream. This feature requires nats-server // v2.10.0 or later. Metadata map[string]string `json:"metadata,omitempty"` // Template identifies the template that manages the Stream. // Deprecated: This feature is no longer supported. Template string `json:"template_owner,omitempty"` }
StreamConfig is the configuration of a JetStream stream.
type StreamConsumerLimits ¶
type StreamConsumerLimits struct { // InactiveThreshold is a duration which instructs the server to clean // up the consumer if it has been inactive for the specified duration. InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` // MaxAckPending is a maximum number of outstanding unacknowledged // messages for a consumer. MaxAckPending int `json:"max_ack_pending,omitempty"` }
StreamConsumerLimits are the limits for a consumer on a stream. These can be overridden on a per consumer basis.
type StreamConsumerManager ¶
type StreamConsumerManager interface { // CreateOrUpdateConsumer creates a consumer on a given stream with // given config. If consumer already exists, it will be updated (if // possible). Consumer interface is returned, allowing to operate on a // consumer (e.g. fetch messages). CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) // CreateConsumer creates a consumer on a given stream with given // config. If consumer already exists and the provided configuration // differs from its configuration, ErrConsumerExists is returned. If the // provided configuration is the same as the existing consumer, the // existing consumer is returned. Consumer interface is returned, // allowing to operate on a consumer (e.g. fetch messages). CreateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) // UpdateConsumer updates an existing consumer. If consumer does not // exist, ErrConsumerDoesNotExist is returned. Consumer interface is // returned, allowing to operate on a consumer (e.g. fetch messages). UpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) // OrderedConsumer returns an OrderedConsumer instance. OrderedConsumer // are managed by the library and provide a simple way to consume // messages from a stream. Ordered consumers are ephemeral in-memory // pull consumers and are resilient to deletes and restarts. OrderedConsumer(ctx context.Context, stream string, cfg OrderedConsumerConfig) (Consumer, error) // Consumer returns an interface to an existing consumer, allowing processing // of messages. If consumer does not exist, ErrConsumerNotFound is // returned. Consumer(ctx context.Context, stream string, consumer string) (Consumer, error) // DeleteConsumer removes a consumer with given name from a stream. // If consumer does not exist, ErrConsumerNotFound is returned. DeleteConsumer(ctx context.Context, stream string, consumer string) error }
StreamConsumerManager provides CRUD API for managing consumers. It is available as a part of JetStream interface. This is an alternative to Stream interface, allowing to bypass stream lookup. CreateConsumer, UpdateConsumer, CreateOrUpdateConsumer and Consumer methods return a Consumer interface, allowing to operate on a consumer (e.g. consume messages).
type StreamInfo ¶
type StreamInfo struct { // Config contains the configuration settings of the stream, set when // creating or updating the stream. Config StreamConfig `json:"config"` // Created is the timestamp when the stream was created. Created time.Time `json:"created"` // State provides the state of the stream at the time of request, // including metrics like the number of messages in the stream, total // bytes, etc. State StreamState `json:"state"` // Cluster contains information about the cluster to which this stream // belongs (if applicable). Cluster *ClusterInfo `json:"cluster,omitempty"` // Mirror contains information about another stream this one is // mirroring. Mirroring is used to create replicas of another stream's // data. This field is omitted if the stream is not mirroring another // stream. Mirror *StreamSourceInfo `json:"mirror,omitempty"` // Sources is a list of source streams from which this stream collects // data. Sources []*StreamSourceInfo `json:"sources,omitempty"` // TimeStamp indicates when the info was gathered by the server. TimeStamp time.Time `json:"ts"` }
StreamInfo shows config and current state for this stream.
type StreamInfoLister ¶
type StreamInfoLister interface { Info() <-chan *StreamInfo Err() error }
StreamInfoLister is used to iterate over a channel of stream infos. Err method can be used to check for errors encountered during iteration. Info channel is always closed and therefore can be used in a range loop.
type StreamInfoOpt ¶
type StreamInfoOpt func(*streamInfoRequest) error
StreamInfoOpt is a function setting options for [Stream.Info]
func WithDeletedDetails ¶
func WithDeletedDetails(deletedDetails bool) StreamInfoOpt
WithDeletedDetails can be used to display the information about messages deleted from a stream on a stream info request
func WithSubjectFilter ¶
func WithSubjectFilter(subject string) StreamInfoOpt
WithSubjectFilter can be used to display the information about messages stored on given subjects. NOTE: if the subject filter matches over 100k subjects, this will result in multiple requests to the server to retrieve all the information, and all of the returned subjects will be kept in memory.
type StreamListOpt ¶
type StreamListOpt func(*streamsRequest) error
StreamListOpt is a functional option for [StreamManager.ListStreams] and [StreamManager.StreamNames] methods.
func WithStreamListSubject ¶
func WithStreamListSubject(subject string) StreamListOpt
WithStreamListSubject can be used to filter results of ListStreams and StreamNames requests to only streams that have given subject in their configuration.
type StreamManager ¶
type StreamManager interface { // CreateStream creates a new stream with given config and returns an // interface to operate on it. If stream with given name already exists // and its configuration differs from the provided one, // ErrStreamNameAlreadyInUse is returned. CreateStream(ctx context.Context, cfg StreamConfig) (Stream, error) // UpdateStream updates an existing stream. If stream does not exist, // ErrStreamNotFound is returned. UpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error) // CreateOrUpdateStream creates a stream with given config. If stream // already exists, it will be updated (if possible). CreateOrUpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error) // Stream fetches [StreamInfo] and returns a [Stream] interface for a given stream name. // If stream does not exist, ErrStreamNotFound is returned. Stream(ctx context.Context, stream string) (Stream, error) // StreamNameBySubject returns the name of a stream listening on given // subject. If no stream is bound to given subject, ErrStreamNotFound // is returned. StreamNameBySubject(ctx context.Context, subject string) (string, error) // DeleteStream removes a stream with given name. If stream does not // exist, ErrStreamNotFound is returned. DeleteStream(ctx context.Context, stream string) error // ListStreams returns StreamInfoLister, enabling iterating over a // channel of stream infos. ListStreams(context.Context, ...StreamListOpt) StreamInfoLister // StreamNames returns a StreamNameLister, enabling iterating over a // channel of stream names. StreamNames(context.Context, ...StreamListOpt) StreamNameLister }
StreamManager provides CRUD API for managing streams. It is available as a part of JetStream interface. CreateStream, UpdateStream, CreateOrUpdateStream and Stream methods return a Stream interface, allowing to operate on a stream.
type StreamNameLister ¶
StreamNameLister is used to iterate over a channel of stream names. Err method can be used to check for errors encountered during iteration. Name channel is always closed and therefore can be used in a range loop.
type StreamPurgeOpt ¶
type StreamPurgeOpt func(*StreamPurgeRequest) error
StreamPurgeOpt is a function setting options for [Stream.Purge]
func WithPurgeKeep ¶
func WithPurgeKeep(keep uint64) StreamPurgeOpt
WithPurgeKeep sets the number of messages to be kept in the stream after purge. Can be combined with WithPurgeSubject option, but not with WithPurgeSequence.
func WithPurgeSequence ¶
func WithPurgeSequence(sequence uint64) StreamPurgeOpt
WithPurgeSequence is used to set a specific sequence number up to which (but not including) messages will be purged from a stream. Can be combined with WithPurgeSubject option, but not with WithPurgeKeep.
func WithPurgeSubject ¶
func WithPurgeSubject(subject string) StreamPurgeOpt
WithPurgeSubject sets a specific subject for which messages on a stream will be purged
type StreamPurgeRequest ¶
type StreamSource ¶
type StreamSource struct { // Name is the name of the stream to source from. Name string `json:"name"` // OptStartSeq is the sequence number to start sourcing from. OptStartSeq uint64 `json:"opt_start_seq,omitempty"` // OptStartTime is the timestamp of messages to start sourcing from. OptStartTime *time.Time `json:"opt_start_time,omitempty"` // FilterSubject is the subject filter used to only replicate messages // with matching subjects. FilterSubject string `json:"filter_subject,omitempty"` // SubjectTransforms is a list of subject transforms to apply to // matching messages. // // Subject transforms on sources and mirrors are also used as subject // filters with optional transformations. SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` // External is a configuration referencing a stream source in another // account or JetStream domain. External *ExternalStream `json:"external,omitempty"` // Domain is used to configure a stream source in another JetStream // domain. This setting will set the External field with the appropriate // APIPrefix. Domain string `json:"-"` }
StreamSource dictates how streams can source from other streams.
type StreamSourceInfo ¶
type StreamSourceInfo struct { // Name is the name of the stream that is being replicated. Name string `json:"name"` // Lag informs how many messages behind the source/mirror operation is. // This will only show correctly if there is active communication // with stream/mirror. Lag uint64 `json:"lag"` // Active informs when last the mirror or sourced stream had activity. // Value will be -1 when there has been no activity. Active time.Duration `json:"active"` // FilterSubject is the subject filter defined for this source/mirror. FilterSubject string `json:"filter_subject,omitempty"` // SubjectTransforms is a list of subject transforms defined for this // source/mirror. SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` }
StreamSourceInfo shows information about an upstream stream source/mirror.
type StreamState ¶
type StreamState struct { // Msgs is the number of messages stored in the stream. Msgs uint64 `json:"messages"` // Bytes is the number of bytes stored in the stream. Bytes uint64 `json:"bytes"` // FirstSeq is the sequence number of the first message in the stream. FirstSeq uint64 `json:"first_seq"` // FirstTime is the timestamp of the first message in the stream. FirstTime time.Time `json:"first_ts"` // LastSeq is the sequence number of the last message in the stream. LastSeq uint64 `json:"last_seq"` // LastTime is the timestamp of the last message in the stream. LastTime time.Time `json:"last_ts"` // Consumers is the number of consumers on the stream. Consumers int `json:"consumer_count"` // Deleted is a list of sequence numbers that have been removed from the // stream. This field will only be returned if the stream has been // fetched with the DeletedDetails option. Deleted []uint64 `json:"deleted"` // NumDeleted is the number of messages that have been removed from the // stream. Only deleted messages causing a gap in stream sequence numbers // are counted. Messages deleted at the beginning or end of the stream // are not counted. NumDeleted int `json:"num_deleted"` // NumSubjects is the number of unique subjects the stream has received // messages on. NumSubjects uint64 `json:"num_subjects"` // Subjects is a map of subjects the stream has received messages on // with message count per subject. This field will only be returned if // the stream has been fetched with the SubjectFilter option. Subjects map[string]uint64 `json:"subjects"` }
StreamState is the state of a JetStream stream at the time of request.
type SubjectTransformConfig ¶
type SubjectTransformConfig struct { // Source is the subject pattern to match incoming messages against. Source string `json:"src"` // Destination is the subject pattern to remap the subject to. Destination string `json:"dest"` }
SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received.
type Tier ¶
type Tier struct { // Memory is the memory storage being used for Stream Message storage. Memory uint64 `json:"memory"` // Store is the disk storage being used for Stream Message storage. Store uint64 `json:"storage"` // ReservedMemory is the number of bytes reserved for memory usage by // this account on the server ReservedMemory uint64 `json:"reserved_memory"` // ReservedStore is the number of bytes reserved for disk usage by this // account on the server ReservedStore uint64 `json:"reserved_storage"` // Streams is the number of streams currently defined for this account. Streams int `json:"streams"` // Consumers is the number of consumers currently defined for this // account. Consumers int `json:"consumers"` // Limits are the JetStream limits for this account. Limits AccountLimits `json:"limits"` }
Tier represents a JetStream account usage tier.
type WatchOpt ¶
type WatchOpt interface {
// contains filtered or unexported methods
}
func IgnoreDeletes ¶
func IgnoreDeletes() WatchOpt
IgnoreDeletes will have the key watcher not pass any deleted keys.
func IncludeHistory ¶
func IncludeHistory() WatchOpt
IncludeHistory instructs the key watcher to include historical values as well (up to KeyValueMaxHistory).
func MetaOnly ¶
func MetaOnly() WatchOpt
MetaOnly instructs the key watcher to retrieve only the entry meta data, not the entry value.
func ResumeFromRevision ¶
ResumeFromRevision instructs the key watcher to resume from a specific revision number.
func UpdatesOnly ¶
func UpdatesOnly() WatchOpt
UpdatesOnly instructs the key watcher to only include updates on values (without latest values when started).