Documentation
¶
Index ¶
- Constants
- Variables
- func DecodeObjectDigest(data string) ([]byte, error)
- func GetObjectDigestValue(data hash.Hash) string
- type APIError
- type APIStats
- type AccountInfo
- type AccountLimits
- type AckPolicy
- type ClientTrace
- type ClusterInfo
- type ConsumeContext
- type ConsumeErrHandlerFunc
- type Consumer
- type ConsumerConfig
- type ConsumerInfo
- type ConsumerInfoLister
- type ConsumerNameLister
- type DeleteMarkersOlderThan
- type DeliverPolicy
- type DiscardPolicy
- type ErrorCode
- type ExternalStream
- type FetchOpt
- type GetMsgOpt
- type GetObjectInfoOpt
- type GetObjectOpt
- type JetStream
- type JetStreamError
- type JetStreamOpt
- type KVDeleteOpt
- type KVPurgeOpt
- type KeyLister
- type KeyValue
- type KeyValueBucketStatus
- func (s *KeyValueBucketStatus) BackingStore() string
- func (s *KeyValueBucketStatus) Bucket() string
- func (s *KeyValueBucketStatus) Bytes() uint64
- func (s *KeyValueBucketStatus) History() int64
- func (s *KeyValueBucketStatus) IsCompressed() bool
- func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo
- func (s *KeyValueBucketStatus) TTL() time.Duration
- func (s *KeyValueBucketStatus) Values() uint64
- type KeyValueConfig
- type KeyValueEntry
- type KeyValueLister
- type KeyValueManager
- type KeyValueNamesLister
- type KeyValueOp
- type KeyValueStatus
- type KeyWatcher
- type ListObjectsOpt
- type MessageBatch
- type MessageHandler
- type MessagesContext
- type Msg
- type MsgErrHandler
- type MsgMetadata
- type ObjectBucketStatus
- func (s *ObjectBucketStatus) BackingStore() string
- func (s *ObjectBucketStatus) Bucket() string
- func (s *ObjectBucketStatus) Description() string
- func (s *ObjectBucketStatus) Metadata() map[string]string
- func (s *ObjectBucketStatus) Replicas() int
- func (s *ObjectBucketStatus) Sealed() bool
- func (s *ObjectBucketStatus) Size() uint64
- func (s *ObjectBucketStatus) Storage() StorageType
- func (s *ObjectBucketStatus) StreamInfo() *StreamInfo
- func (s *ObjectBucketStatus) TTL() time.Duration
- type ObjectInfo
- type ObjectLink
- type ObjectMeta
- type ObjectMetaOptions
- type ObjectResult
- type ObjectStore
- type ObjectStoreConfig
- type ObjectStoreManager
- type ObjectStoreNamesLister
- type ObjectStoreStatus
- type ObjectStoresLister
- type ObjectWatcher
- type OrderedConsumerConfig
- type PeerInfo
- type Placement
- type PubAck
- type PubAckFuture
- type PublishOpt
- func WithExpectLastMsgID(id string) PublishOpt
- func WithExpectLastSequence(seq uint64) PublishOpt
- func WithExpectLastSequencePerSubject(seq uint64) PublishOpt
- func WithExpectStream(stream string) PublishOpt
- func WithMsgID(id string) PublishOpt
- func WithRetryAttempts(num int) PublishOpt
- func WithRetryWait(dur time.Duration) PublishOpt
- func WithStallWait(ttl time.Duration) PublishOpt
- type Publisher
- type PullConsumeOpt
- type PullExpiry
- type PullHeartbeat
- type PullMaxBytes
- type PullMaxMessages
- type PullMessagesOpt
- type PullThresholdBytes
- type PullThresholdMessages
- type RawStreamMsg
- type RePublish
- type ReplayPolicy
- type RetentionPolicy
- type SequenceInfo
- type SequencePair
- type StopAfter
- type StorageType
- type StoreCompression
- type Stream
- type StreamConfig
- type StreamConsumerLimits
- type StreamConsumerManager
- type StreamInfo
- type StreamInfoLister
- type StreamInfoOpt
- type StreamListOpt
- type StreamManager
- type StreamNameLister
- type StreamPurgeOpt
- type StreamPurgeRequest
- type StreamSource
- type StreamSourceInfo
- type StreamState
- type SubjectTransformConfig
- type Tier
- type WatchOpt
Constants ¶
const ( KeyValueMaxHistory = 64 AllKeys = ">" )
const ( MsgIDHeader = "Nats-Msg-Id" ExpectedStreamHeader = "Nats-Expected-Stream" ExpectedLastSeqHeader = "Nats-Expected-Last-Sequence" ExpectedLastSubjSeqHeader = "Nats-Expected-Last-Subject-Sequence" ExpectedLastMsgIDHeader = "Nats-Expected-Last-Msg-Id" MsgRollup = "Nats-Rollup" )
const ( StreamHeader = "Nats-Stream" SequenceHeader = "Nats-Sequence" TimeStampHeaer = "Nats-Time-Stamp" SubjectHeader = "Nats-Subject" LastSequenceHeader = "Nats-Last-Sequence" )
Headers for republished messages and direct gets.
const ( MsgRollupSubject = "sub" MsgRollupAll = "all" )
Rollups, can be subject only or all messages.
const ( // Default time wait between retries on Publish if err is NoResponders. DefaultPubRetryWait = 250 * time.Millisecond // Default number of retries DefaultPubRetryAttempts = 2 )
const ( DefaultMaxMessages = 500 DefaultExpires = 30 * time.Second )
const (
// DefaultAPIPrefix is the default prefix for the JetStream API.
DefaultAPIPrefix = "$JS.API."
)
Request API subjects for JetStream.
Variables ¶
var ( // ErrJetStreamNotEnabled is an error returned when JetStream is not enabled. ErrJetStreamNotEnabled JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabled, Description: "jetstream not enabled", Code: 503}} // ErrJetStreamNotEnabledForAccount is an error returned when JetStream is not enabled for an account. ErrJetStreamNotEnabledForAccount JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabledForAccount, Description: "jetstream not enabled for account", Code: 503}} // ErrStreamNotFound is an error returned when stream with given name does not exist. ErrStreamNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNotFound, Description: "stream not found", Code: 404}} // ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration. ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}} // ErrStreamSubjectTransformNotSupported is returned when the connected nats-server version does not support setting // the stream subject transform. If this error is returned when executing CreateStream(), the stream with invalid // configuration was already created in the server. ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"} // ErrStreamSourceSubjectTransformNotSupported is returned when the connected nats-server version does not support setting // the stream source subject transform. If this error is returned when executing CreateStream(), the stream with invalid // configuration was already created in the server. ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"} // ErrStreamSourceNotSupported is returned when the connected nats-server version does not support setting // the stream sources. If this error is returned when executing CreateStream(), the stream with invalid // configuration was already created in the server. ErrStreamSourceNotSupported JetStreamError = &jsError{message: "stream sourcing is not supported by nats-server"} // ErrStreamSourceMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting // the stream sources. If this error is returned when executing CreateStream(), the stream with invalid // configuration was already created in the server. ErrStreamSourceMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "stream sourcing with multiple subject filters not supported by nats-server"} // ErrConsumerNotFound is an error returned when consumer with given name does not exist. ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}} // ErrConsumerExists is returned when attempting to create a consumer with CreateConsumer but a consumer with given name already exists. ErrConsumerExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerExists, Description: "consumer already exists", Code: 400}} // ErrConsumerNameExists is returned when attempting to update a consumer with UpdateConsumer but a consumer with given name does not exist. ErrConsumerDoesNotExist JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerDoesNotExist, Description: "consumer does not exist", Code: 400}} // ErrMsgNotFound is returned when message with provided sequence number does not exist. ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}} // ErrBadRequest is returned when invalid request is sent to JetStream API. ErrBadRequest JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeBadRequest, Description: "bad request", Code: 400}} // ErrConsumerCreate is returned when nats-server reports error when creating consumer (e.g. illegal update). ErrConsumerCreate JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerCreate, Description: "could not create consumer", Code: 500}} // ErrDuplicateFilterSubjects is returned when both FilterSubject and FilterSubjects are specified when creating consumer. ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}} // ErrDuplicateFilterSubjects is returned when filter subjects overlap when creating consumer. ErrOverlappingFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeOverlappingFilterSubjects, Description: "consumer subject filters cannot overlap", Code: 500}} // ErrEmptyFilter is returned when a filter in FilterSubjects is empty. ErrEmptyFilter JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerEmptyFilter, Description: "consumer filter in FilterSubjects cannot be empty", Code: 500}} // ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting // multiple filter subjects with filter_subjects field. If this error is returned when executing AddConsumer(), the consumer with invalid // configuration was already created in the server. ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"} // ErrConsumerNotFound is an error returned when consumer with given name does not exist. ErrConsumerNameAlreadyInUse JetStreamError = &jsError{message: "consumer name already in use"} // ErrInvalidJSAck is returned when JetStream ack from message publish is invalid. ErrInvalidJSAck JetStreamError = &jsError{message: "invalid jetstream publish response"} // ErrStreamNameRequired is returned when the provided stream name is empty. ErrStreamNameRequired JetStreamError = &jsError{message: "stream name is required"} // ErrMsgAlreadyAckd is returned when attempting to acknowledge message more than once. ErrMsgAlreadyAckd JetStreamError = &jsError{message: "message was already acknowledged"} // ErrNoStreamResponse is returned when there is no response from stream (e.g. no responders error). ErrNoStreamResponse JetStreamError = &jsError{message: "no response from stream"} // ErrNotJSMessage is returned when attempting to get metadata from non JetStream message. ErrNotJSMessage JetStreamError = &jsError{message: "not a jetstream message"} // ErrInvalidStreamName is returned when the provided stream name is invalid (contains '.'). ErrInvalidStreamName JetStreamError = &jsError{message: "invalid stream name"} // ErrInvalidSubject is returned when the provided subject name is invalid. ErrInvalidSubject JetStreamError = &jsError{message: "invalid subject name"} // ErrInvalidConsumerName is returned when the provided consumer name is invalid (contains '.'). ErrInvalidConsumerName JetStreamError = &jsError{message: "invalid consumer name"} // ErrNoMessages is returned when no messages are currently available for a consumer. ErrNoMessages = &jsError{message: "no messages"} // ErrMaxBytesExceeded is returned when a message would exceed MaxBytes set on a pull request. ErrMaxBytesExceeded = &jsError{message: "message size exceeds max bytes"} // ErrConsumerDeleted is returned when attempting to send pull request to a consumer which does not exist. ErrConsumerDeleted JetStreamError = &jsError{message: "consumer deleted"} // ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed. ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "leadership change"} // ErrHandlerRequired is returned when no handler func is provided in Stream(). ErrHandlerRequired = &jsError{message: "handler cannot be empty"} // ErrEndOfData is returned when iterating over paged API from JetStream reaches end of data. ErrEndOfData = &jsError{message: "end of data reached"} // ErrNoHeartbeat is received when no message is received in IdleHeartbeat time (if set). ErrNoHeartbeat = &jsError{message: "no heartbeat received"} // ErrConsumerHasActiveSubscription is returned when a consumer is already subscribed to a stream. ErrConsumerHasActiveSubscription = &jsError{message: "consumer has active subscription"} // ErrMsgNotBound is returned when given message is not bound to any subscription. ErrMsgNotBound = &jsError{message: "message is not bound to subscription/connection"} // ErrMsgNoReply is returned when attempting to reply to a message without a reply subject. ErrMsgNoReply = &jsError{message: "message does not have a reply"} // ErrMsgDeleteUnsuccessful is returned when an attempt to delete a message is unsuccessful. ErrMsgDeleteUnsuccessful = &jsError{message: "message deletion unsuccessful"} // ErrAsyncPublishReplySubjectSet is returned when reply subject is set on async message publish. ErrAsyncPublishReplySubjectSet = &jsError{message: "reply subject should be empty"} // ErrTooManyStalledMsgs is returned when too many outstanding async messages are waiting for ack. ErrTooManyStalledMsgs = &jsError{message: "stalled with too many outstanding async published messages"} // ErrInvalidOption is returned when there is a collision between options. ErrInvalidOption = &jsError{message: "invalid jetstream option"} // ErrMsgIteratorClosed is returned when attempting to get message from a closed iterator. ErrMsgIteratorClosed = &jsError{message: "messages iterator closed"} // ErrOrderedConsumerReset is returned when resetting ordered consumer fails due to too many attempts. ErrOrderedConsumerReset = &jsError{message: "recreating ordered consumer"} // ErrOrderConsumerUsedAsFetch is returned when ordered consumer was already used to process // messages using Fetch (or FetchBytes). ErrOrderConsumerUsedAsFetch = &jsError{message: "ordered consumer initialized as fetch"} // ErrOrderConsumerUsedAsConsume is returned when ordered consumer was already used to process // messages using Consume or Messages. ErrOrderConsumerUsedAsConsume = &jsError{message: "ordered consumer initialized as consume"} // ErrOrderedConsumerConcurrentRequests is returned when attempting to run concurrent operations // on ordered consumers. ErrOrderedConsumerConcurrentRequests = &jsError{message: "cannot run concurrent processing using ordered consumer"} // ErrOrderedConsumerNotCreated is returned when trying to get consumer info of an // ordered consumer which was not yet created. ErrOrderedConsumerNotCreated = &jsError{message: "consumer instance not yet created"} // ErrKeyExists is returned when attempting to create a key that already exists. ErrKeyExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamWrongLastSequence, Code: 400}, message: "key exists"} // ErrKeyValueConfigRequired is returned when attempting to create a bucket without a config. ErrKeyValueConfigRequired = &jsError{message: "config required"} // ErrInvalidBucketName is returned when attempting to create a bucket with an invalid name. ErrInvalidBucketName = &jsError{message: "invalid bucket name"} // ErrInvalidKey is returned when attempting to create a key with an invalid name. ErrInvalidKey = &jsError{message: "invalid key"} // ErrBucketNotFound is returned when attempting to access a bucket that does not exist. ErrBucketNotFound = &jsError{message: "bucket not found"} // ErrBadBucket is returned when attempting to access a bucket that is not a key-value store. ErrBadBucket = &jsError{message: "bucket not valid key-value store"} // ErrKeyNotFound is returned when attempting to access a key that does not exist. ErrKeyNotFound = &jsError{message: "key not found"} // ErrKeyDeleted is returned when attempting to access a key that was deleted. ErrKeyDeleted = &jsError{message: "key was deleted"} // ErrHistoryToLarge is returned when provided history limit is larger than 64. ErrHistoryTooLarge = &jsError{message: "history limited to a max of 64"} // ErrNoKeysFound is returned when no keys are found. ErrNoKeysFound = &jsError{message: "no keys found"} // ErrObjectConfigRequired is returned when attempting to create an object without a config. ErrObjectConfigRequired = &jsError{message: "object-store config required"} // ErrBadObjectMeta is returned when the meta information of an object is invalid. ErrBadObjectMeta = &jsError{message: "object-store meta information invalid"} // ErrObjectNotFound is returned when an object is not found. ErrObjectNotFound = &jsError{message: "object not found"} // ErrInvalidStoreName is returned when the name of an object-store is invalid. ErrInvalidStoreName = &jsError{message: "invalid object-store name"} // ErrDigestMismatch is returned when the digests of an object do not match. ErrDigestMismatch = &jsError{message: "received a corrupt object, digests do not match"} // ErrInvalidDigestFormat is returned when the digest hash of an object has an invalid format. ErrInvalidDigestFormat = &jsError{message: "object digest hash has invalid format"} // ErrNoObjectsFound is returned when no objects are found. ErrNoObjectsFound = &jsError{message: "no objects found"} // ErrObjectAlreadyExists is returned when an object with the same name already exists. ErrObjectAlreadyExists = &jsError{message: "an object already exists with that name"} // ErrNameRequired is returned when a name is required. ErrNameRequired = &jsError{message: "name is required"} // ErrLinkNotAllowed is returned when a link cannot be set when putting the object in a bucket. ErrLinkNotAllowed = &jsError{message: "link cannot be set when putting the object in bucket"} // ErrObjectRequired is returned when an object is required. ErrObjectRequired = &jsError{message: "object required"} // ErrNoLinkToDeleted is returned when it is not allowed to link to a deleted object. ErrNoLinkToDeleted = &jsError{message: "not allowed to link to a deleted object"} // ErrNoLinkToLink is returned when it is not allowed to link to another link. ErrNoLinkToLink = &jsError{message: "not allowed to link to another link"} // ErrCantGetBucket is returned when an invalid Get is attempted on an object that is a link to a bucket. ErrCantGetBucket = &jsError{message: "invalid Get, object is a link to a bucket"} // ErrBucketRequired is returned when a bucket is required. ErrBucketRequired = &jsError{message: "bucket required"} // ErrBucketMalformed is returned when a bucket is malformed. ErrBucketMalformed = &jsError{message: "bucket malformed"} // ErrUpdateMetaDeleted is returned when the meta information of a deleted object cannot be updated. ErrUpdateMetaDeleted = &jsError{message: "cannot update meta for a deleted object"} )
Functions ¶
func DecodeObjectDigest ¶
DecodeObjectDigest decodes base64 hash
func GetObjectDigestValue ¶
GetObjectDigestValue calculates the base64 value of hashed data
Types ¶
type APIError ¶
type APIError struct { Code int `json:"code"` ErrorCode ErrorCode `json:"err_code"` Description string `json:"description,omitempty"` }
APIError is included in all API responses if there was an error.
type AccountInfo ¶
type AccountInfo struct { Tier Domain string `json:"domain"` API APIStats `json:"api"` Tiers map[string]Tier `json:"tiers"` }
AccountInfo contains info about the JetStream usage from the current account.
type AccountLimits ¶
type AccountLimits struct { MaxMemory int64 `json:"max_memory"` MaxStore int64 `json:"max_storage"` MaxStreams int `json:"max_streams"` MaxConsumers int `json:"max_consumers"` }
AccountLimits includes the JetStream limits of the current account.
type AckPolicy ¶
type AckPolicy int
AckPolicy determines how the consumer should acknowledge delivered messages.
func (AckPolicy) MarshalJSON ¶
func (*AckPolicy) UnmarshalJSON ¶
type ClientTrace ¶
type ClientTrace struct { RequestSent func(subj string, payload []byte) ResponseReceived func(subj string, payload []byte, hdr nats.Header) }
ClientTrace can be used to trace API interactions for the JetStream Context.
type ClusterInfo ¶
type ClusterInfo struct { Name string `json:"name,omitempty"` Leader string `json:"leader,omitempty"` Replicas []*PeerInfo `json:"replicas,omitempty"` }
ClusterInfo shows information about the underlying set of servers that make up the stream or consumer.
type ConsumeContext ¶
type ConsumeContext interface { // Stop unsubscribes from the stream and cancels subscription. // No more messages will be received after calling this method. // All messages that are already in the buffer are discarded. Stop() // Drain unsubscribes from the stream and cancels subscription. // All messages that are already in the buffer will be processed in callback function. Drain() }
type ConsumeErrHandlerFunc ¶
type ConsumeErrHandlerFunc func(consumeCtx ConsumeContext, err error)
type Consumer ¶
type Consumer interface { // Fetch is used to retrieve up to a provided number of messages from a stream. // This method will always send a single request and wait until either all messages are retrieved // or request times out. Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) // FetchBytes is used to retrieve up to a provided bytes from the stream. // This method will always send a single request and wait until provided number of bytes is // exceeded or request times out. FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error) // FetchNoWait is used to retrieve up to a provided number of messages from a stream. // This method will always send a single request and immediately return up to a provided number of messages. FetchNoWait(batch int) (MessageBatch, error) // Consume can be used to continuously receive messages and handle them with the provided callback function Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error) // Messages returns [MessagesContext], allowing continuously iterating over messages on a stream. Messages(opts ...PullMessagesOpt) (MessagesContext, error) // Next is used to retrieve the next message from the stream. // This method will block until the message is retrieved or timeout is reached. Next(opts ...FetchOpt) (Msg, error) // Info returns Consumer details Info(context.Context) (*ConsumerInfo, error) // CachedInfo returns [*ConsumerInfo] cached on a consumer struct CachedInfo() *ConsumerInfo }
Consumer contains methods for fetching/processing messages from a stream, as well as fetching consumer info
type ConsumerConfig ¶
type ConsumerConfig struct { Name string `json:"name,omitempty"` Durable string `json:"durable_name,omitempty"` Description string `json:"description,omitempty"` DeliverPolicy DeliverPolicy `json:"deliver_policy"` OptStartSeq uint64 `json:"opt_start_seq,omitempty"` OptStartTime *time.Time `json:"opt_start_time,omitempty"` AckPolicy AckPolicy `json:"ack_policy"` AckWait time.Duration `json:"ack_wait,omitempty"` MaxDeliver int `json:"max_deliver,omitempty"` BackOff []time.Duration `json:"backoff,omitempty"` FilterSubject string `json:"filter_subject,omitempty"` ReplayPolicy ReplayPolicy `json:"replay_policy"` RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec SampleFrequency string `json:"sample_freq,omitempty"` MaxWaiting int `json:"max_waiting,omitempty"` MaxAckPending int `json:"max_ack_pending,omitempty"` HeadersOnly bool `json:"headers_only,omitempty"` MaxRequestBatch int `json:"max_batch,omitempty"` MaxRequestExpires time.Duration `json:"max_expires,omitempty"` MaxRequestMaxBytes int `json:"max_bytes,omitempty"` // Inactivity threshold. InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` // Generally inherited by parent stream and other markers, now can be configured directly. Replicas int `json:"num_replicas"` // Force memory storage. MemoryStorage bool `json:"mem_storage,omitempty"` // NOTE: FilterSubjects requires nats-server v2.10.0+ FilterSubjects []string `json:"filter_subjects,omitempty"` // Metadata is additional metadata for the Consumer. // Keys starting with `_nats` are reserved. // NOTE: Metadata requires nats-server v2.10.0+ Metadata map[string]string `json:"metadata,omitempty"` }
ConsumerConfig is the configuration of a JetStream consumer.
type ConsumerInfo ¶
type ConsumerInfo struct { Stream string `json:"stream_name"` Name string `json:"name"` Created time.Time `json:"created"` Config ConsumerConfig `json:"config"` Delivered SequenceInfo `json:"delivered"` AckFloor SequenceInfo `json:"ack_floor"` NumAckPending int `json:"num_ack_pending"` NumRedelivered int `json:"num_redelivered"` NumWaiting int `json:"num_waiting"` NumPending uint64 `json:"num_pending"` Cluster *ClusterInfo `json:"cluster,omitempty"` PushBound bool `json:"push_bound,omitempty"` }
ConsumerInfo is the info from a JetStream consumer.
type ConsumerInfoLister ¶
type ConsumerInfoLister interface { Info() <-chan *ConsumerInfo Err() error }
type ConsumerNameLister ¶
type DeleteMarkersOlderThan ¶
DeleteMarkersOlderThan indicates that delete or purge markers older than that will be deleted as part of PurgeDeletes() operation, otherwise, only the data will be removed but markers that are recent will be kept. Note that if no option is specified, the default is 30 minutes. You can set this option to a negative value to instruct to always remove the markers, regardless of their age.
type DeliverPolicy ¶
type DeliverPolicy int
const ( // DeliverAllPolicy starts delivering messages from the very beginning of a // stream. This is the default. DeliverAllPolicy DeliverPolicy = iota // DeliverLastPolicy will start the consumer with the last sequence // received. DeliverLastPolicy // DeliverNewPolicy will only deliver new messages that are sent after the // consumer is created. DeliverNewPolicy // DeliverByStartSequencePolicy will deliver messages starting from a given // sequence. DeliverByStartSequencePolicy // DeliverByStartTimePolicy will deliver messages starting from a given // time. DeliverByStartTimePolicy // DeliverLastPerSubjectPolicy will start the consumer with the last message // for all subjects received. DeliverLastPerSubjectPolicy )
func (DeliverPolicy) MarshalJSON ¶
func (p DeliverPolicy) MarshalJSON() ([]byte, error)
func (DeliverPolicy) String ¶
func (p DeliverPolicy) String() string
func (*DeliverPolicy) UnmarshalJSON ¶
func (p *DeliverPolicy) UnmarshalJSON(data []byte) error
type DiscardPolicy ¶
type DiscardPolicy int
DiscardPolicy determines how to proceed when limits of messages or bytes are reached.
const ( // DiscardOld will remove older messages to return to the limits. This is // the default. DiscardOld DiscardPolicy = iota // DiscardNew will fail to store new messages. DiscardNew )
func (DiscardPolicy) MarshalJSON ¶
func (dp DiscardPolicy) MarshalJSON() ([]byte, error)
func (DiscardPolicy) String ¶
func (dp DiscardPolicy) String() string
func (*DiscardPolicy) UnmarshalJSON ¶
func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error
type ErrorCode ¶
type ErrorCode uint16
ErrorCode represents error_code returned in response from JetStream API
const ( JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039 JSErrCodeJetStreamNotEnabled ErrorCode = 10076 JSErrCodeStreamNotFound ErrorCode = 10059 JSErrCodeStreamNameInUse ErrorCode = 10058 JSErrCodeConsumerCreate ErrorCode = 10012 JSErrCodeConsumerNotFound ErrorCode = 10014 JSErrCodeConsumerNameExists ErrorCode = 10013 JSErrCodeConsumerAlreadyExists ErrorCode = 10105 JSErrCodeConsumerExists ErrorCode = 10148 JSErrCodeDuplicateFilterSubjects ErrorCode = 10136 JSErrCodeOverlappingFilterSubjects ErrorCode = 10138 JSErrCodeConsumerEmptyFilter ErrorCode = 10139 JSErrCodeConsumerDoesNotExist ErrorCode = 10149 JSErrCodeMessageNotFound ErrorCode = 10037 JSErrCodeBadRequest ErrorCode = 10003 JSErrCodeStreamWrongLastSequence ErrorCode = 10071 )
type ExternalStream ¶
ExternalStream allows you to qualify access to a stream source in another account.
type FetchOpt ¶
type FetchOpt func(*pullRequest) error
func FetchMaxWait ¶
FetchMaxWait sets custom timeout for fetching predefined batch of messages
type GetMsgOpt ¶
type GetMsgOpt func(*apiMsgGetRequest) error
func WithGetMsgSubject ¶
WithGetMsgSubject sets the stream subject from which the message should be retrieved. Server will return a first message with a seq >= to the input seq that has the specified subject.
type GetObjectInfoOpt ¶
type GetObjectInfoOpt func(opts *getObjectInfoOpts) error
func GetObjectInfoShowDeleted ¶
func GetObjectInfoShowDeleted() GetObjectInfoOpt
GetObjectInfoShowDeleted makes GetInfo() return object if it was marked as deleted.
type GetObjectOpt ¶
type GetObjectOpt func(opts *getObjectOpts) error
func GetObjectShowDeleted ¶
func GetObjectShowDeleted() GetObjectOpt
GetObjectShowDeleted makes Get() return object if it was marked as deleted.
type JetStream ¶
type JetStream interface { // Returns *AccountInfo, containing details about the account associated with this JetStream connection AccountInfo(ctx context.Context) (*AccountInfo, error) StreamConsumerManager StreamManager Publisher KeyValueManager ObjectStoreManager }
JetStream contains CRUD methods to operate on a stream Create, update and get operations return 'Stream' interface, allowing operations on consumers
CreateOrUpdateConsumer, Consumer and DeleteConsumer are helper methods used to create/fetch/remove consumer without fetching stream (bypassing stream API)
Client returns a JetStremClient, used to publish messages on a stream or fetch messages by sequence number
func New ¶
func New(nc *nats.Conn, opts ...JetStreamOpt) (JetStream, error)
New returns a new JetStream instance.
Available options: WithClientTrace - enables request/response tracing. WithPublishAsyncErrHandler - sets error handler for async message publish. WithPublishAsyncMaxPending - sets the maximum outstanding async publishes that can be inflight at one time. [WithDirectGet] - specifies whether client should use direct get requests.
func NewWithAPIPrefix ¶
func NewWithAPIPrefix(nc *nats.Conn, apiPrefix string, opts ...JetStreamOpt) (JetStream, error)
NewWithAPIPrefix returns a new JetStream instance and sets the API prefix to be used in requests to JetStream API
Available options: WithClientTrace - enables request/response tracing WithPublishAsyncErrHandler - sets error handler for async message publish WithPublishAsyncMaxPending - sets the maximum outstanding async publishes that can be inflight at one time. [WithDirectGet] - specifies whether client should use direct get requests.
func NewWithDomain ¶
func NewWithDomain(nc *nats.Conn, domain string, opts ...JetStreamOpt) (JetStream, error)
NewWithDomain returns a new JetStream instance and sets the domain name token used when sending JetStream requests
Available options: WithClientTrace - enables request/response tracing WithPublishAsyncErrHandler - sets error handler for async message publish WithPublishAsyncMaxPending - sets the maximum outstanding async publishes that can be inflight at one time. [WithDirectGet] - specifies whether client should use direct get requests.
type JetStreamError ¶
JetStreamError is an error result that happens when using JetStream. In case of client-side error, APIError returns nil
type JetStreamOpt ¶
type JetStreamOpt func(*jsOpts) error
func WithClientTrace ¶
func WithClientTrace(ct *ClientTrace) JetStreamOpt
WithClientTrace enables request/response API calls tracing ClientTrace is used to provide handlers for each event
func WithPublishAsyncErrHandler ¶
func WithPublishAsyncErrHandler(cb MsgErrHandler) JetStreamOpt
WithPublishAsyncErrHandler sets error handler for async message publish
func WithPublishAsyncMaxPending ¶
func WithPublishAsyncMaxPending(max int) JetStreamOpt
WithPublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.
type KVDeleteOpt ¶
type KVDeleteOpt interface {
// contains filtered or unexported methods
}
func LastRevision ¶
func LastRevision(revision uint64) KVDeleteOpt
LastRevision deletes if the latest revision matches.
type KVPurgeOpt ¶
type KVPurgeOpt interface {
// contains filtered or unexported methods
}
type KeyValue ¶
type KeyValue interface { // Get returns the latest value for the key. Get(ctx context.Context, key string) (KeyValueEntry, error) // GetRevision returns a specific revision value for the key. GetRevision(ctx context.Context, key string, revision uint64) (KeyValueEntry, error) // Put will place the new value for the key into the store. Put(ctx context.Context, key string, value []byte) (uint64, error) // PutString will place the string for the key into the store. PutString(ctx context.Context, key string, value string) (uint64, error) // Create will add the key/value pair if it does not exist. Create(ctx context.Context, key string, value []byte) (uint64, error) // Update will update the value if the latest revision matches. Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error) // Delete will place a delete marker and leave all revisions. Delete(ctx context.Context, key string, opts ...KVDeleteOpt) error // Purge will place a delete marker and remove all previous revisions. Purge(ctx context.Context, key string, opts ...KVDeleteOpt) error // Watch for any updates to keys that match the keys argument which could include wildcards. // Watch will send a nil entry when it has received all initial values. Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWatcher, error) // WatchAll will invoke the callback for all updates. WatchAll(ctx context.Context, opts ...WatchOpt) (KeyWatcher, error) // Keys will return all keys. // DEPRECATED: Use ListKeys instead to avoid memory issues. Keys(ctx context.Context, opts ...WatchOpt) ([]string, error) // ListKeys will return all keys in a channel. ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error) // History will return all historical values for the key. History(ctx context.Context, key string, opts ...WatchOpt) ([]KeyValueEntry, error) // Bucket returns the current bucket name. Bucket() string // PurgeDeletes will remove all current delete markers. PurgeDeletes(ctx context.Context, opts ...KVPurgeOpt) error // Status retrieves the status and configuration of a bucket Status(ctx context.Context) (KeyValueStatus, error) }
KeyValue contains methods to operate on a KeyValue store.
type KeyValueBucketStatus ¶
type KeyValueBucketStatus struct {
// contains filtered or unexported fields
}
KeyValueBucketStatus represents status of a Bucket, implements KeyValueStatus
func (*KeyValueBucketStatus) BackingStore ¶
func (s *KeyValueBucketStatus) BackingStore() string
BackingStore indicates what technology is used for storage of the bucket
func (*KeyValueBucketStatus) Bucket ¶
func (s *KeyValueBucketStatus) Bucket() string
Bucket the name of the bucket
func (*KeyValueBucketStatus) Bytes ¶
func (s *KeyValueBucketStatus) Bytes() uint64
Bytes is the size of the stream
func (*KeyValueBucketStatus) History ¶
func (s *KeyValueBucketStatus) History() int64
History returns the configured history kept per key
func (*KeyValueBucketStatus) IsCompressed ¶
func (s *KeyValueBucketStatus) IsCompressed() bool
IsCompressed indicates if the data is compressed on disk
func (*KeyValueBucketStatus) StreamInfo ¶
func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo
StreamInfo is the stream info retrieved to create the status
func (*KeyValueBucketStatus) TTL ¶
func (s *KeyValueBucketStatus) TTL() time.Duration
TTL is how long the bucket keeps values for
func (*KeyValueBucketStatus) Values ¶
func (s *KeyValueBucketStatus) Values() uint64
Values is how many messages are in the bucket, including historical values
type KeyValueConfig ¶
type KeyValueConfig struct { Bucket string Description string MaxValueSize int32 History uint8 TTL time.Duration MaxBytes int64 Storage StorageType Replicas int Placement *Placement RePublish *RePublish Mirror *StreamSource Sources []*StreamSource // Enable underlying stream compression. // NOTE: Compression is supported for nats-server 2.10.0+ Compression bool }
KeyValueConfig is for configuring a KeyValue store.
type KeyValueEntry ¶
type KeyValueEntry interface { // Bucket is the bucket the data was loaded from. Bucket() string // Key is the key that was retrieved. Key() string // Value is the retrieved value. Value() []byte // Revision is a unique sequence for this value. Revision() uint64 // Created is the time the data was put in the bucket. Created() time.Time // Delta is distance from the latest value. Delta() uint64 // Operation returns Put or Delete or Purge. Operation() KeyValueOp }
KeyValueEntry is a retrieved entry for Get or List or Watch.
type KeyValueLister ¶
type KeyValueLister interface { Status() <-chan KeyValueStatus Error() error }
Public interfaces and structs
type KeyValueManager ¶
type KeyValueManager interface { // KeyValue will lookup and bind to an existing KeyValue store. KeyValue(ctx context.Context, bucket string) (KeyValue, error) // CreateKeyValue will create a KeyValue store with the following configuration. CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error) // DeleteKeyValue will delete this KeyValue store (JetStream stream). DeleteKeyValue(ctx context.Context, bucket string) error // KeyValueStoreNames is used to retrieve a list of key value store names KeyValueStoreNames(ctx context.Context) KeyValueNamesLister // KeyValueStores is used to retrieve a list of key value store statuses KeyValueStores(ctx context.Context) KeyValueLister }
KeyValueManager is used to manage KeyValue stores.
type KeyValueNamesLister ¶
Public interfaces and structs
type KeyValueOp ¶
type KeyValueOp uint8
KeyValueOp represents the type of KV operation (Put, Delete, Purge) Returned as part of watcher entry.
const ( KeyValuePut KeyValueOp = iota KeyValueDelete KeyValuePurge )
func (KeyValueOp) String ¶
func (op KeyValueOp) String() string
type KeyValueStatus ¶
type KeyValueStatus interface { // Bucket the name of the bucket Bucket() string // Values is how many messages are in the bucket, including historical values Values() uint64 // History returns the configured history kept per key History() int64 // TTL is how long the bucket keeps values for TTL() time.Duration // BackingStore indicates what technology is used for storage of the bucket BackingStore() string // Bytes returns the size in bytes of the bucket Bytes() uint64 // IsCompressed indicates if the data is compressed on disk IsCompressed() bool }
KeyValueStatus is run-time status about a Key-Value bucket
type KeyWatcher ¶
type KeyWatcher interface { // Updates returns a channel to read any updates to entries. Updates() <-chan KeyValueEntry // Stop will stop this watcher. Stop() error }
KeyWatcher is what is returned when doing a watch.
type ListObjectsOpt ¶
type ListObjectsOpt func(opts *listObjectOpts) error
func ListObjectsShowDeleted ¶
func ListObjectsShowDeleted() ListObjectsOpt
ListObjectsShowDeleted makes ListObjects() return deleted objects.
type MessageBatch ¶
type MessageHandler ¶
type MessageHandler func(msg Msg)
MessageHandler is a handler function used as callback in [Consume]
type MessagesContext ¶
type MessagesContext interface { // Next retreives next message on a stream. It will block until the next // message is available. Next() (Msg, error) // Stop unsubscribes from the stream and cancels subscription. Calling // Next after calling Stop will return ErrMsgIteratorClosed error. // All messages that are already in the buffer are discarded. Stop() // Drain unsubscribes from the stream and cancels subscription. All // messages that are already in the buffer will be available on // subsequent calls to Next. After the buffer is drained, Next will // return ErrMsgIteratorClosed error. Drain() }
MessagesContext supports iterating over a messages on a stream.
type Msg ¶
type Msg interface { // Metadata returns [MsgMetadata] for a JetStream message Metadata() (*MsgMetadata, error) // Data returns the message body Data() []byte // Headers returns a map of headers for a message Headers() nats.Header // Subject returns a subject on which a message is published Subject() string // Reply returns a reply subject for a message Reply() string // Ack acknowledges a message // This tells the server that the message was successfully processed and it can move on to the next message Ack() error // DoubleAck acknowledges a message and waits for ack from server DoubleAck(context.Context) error // Nak negatively acknowledges a message // This tells the server to redeliver the message Nak() error // NakWithDelay negatively acknowledges a message // This tells the server to redeliver the message // after the given `delay` duration NakWithDelay(delay time.Duration) error // InProgress tells the server that this message is being worked on // It resets the redelivery timer on the server InProgress() error // Term tells the server to not redeliver this message, regardless of the value of nats.MaxDeliver Term() error }
Msg contains methods to operate on a JetStream message Metadata, Data, Headers, Subject and Reply can be used to retrieve the specific parts of the underlying message Ack, DoubleAck, Nak, InProgress and Term are various flavors of ack requests
type MsgErrHandler ¶
MsgErrHandler is used to process asynchronous errors from JetStream PublishAsync. It will return the original message sent to the server for possible retransmitting and the error encountered.
type MsgMetadata ¶
type MsgMetadata struct { Sequence SequencePair NumDelivered uint64 NumPending uint64 Timestamp time.Time Stream string Consumer string Domain string }
MsgMetadata is the JetStream metadata associated with received messages.
type ObjectBucketStatus ¶
type ObjectBucketStatus struct {
// contains filtered or unexported fields
}
ObjectBucketStatus represents status of a Bucket, implements ObjectStoreStatus
func (*ObjectBucketStatus) BackingStore ¶
func (s *ObjectBucketStatus) BackingStore() string
BackingStore indicates what technology is used for storage of the bucket
func (*ObjectBucketStatus) Bucket ¶
func (s *ObjectBucketStatus) Bucket() string
Bucket is the name of the bucket
func (*ObjectBucketStatus) Description ¶
func (s *ObjectBucketStatus) Description() string
Description is the description supplied when creating the bucket
func (*ObjectBucketStatus) Metadata ¶
func (s *ObjectBucketStatus) Metadata() map[string]string
Metadata is the metadata supplied when creating the bucket
func (*ObjectBucketStatus) Replicas ¶
func (s *ObjectBucketStatus) Replicas() int
Replicas indicates how many storage replicas are kept for the data in the bucket
func (*ObjectBucketStatus) Sealed ¶
func (s *ObjectBucketStatus) Sealed() bool
Sealed indicates the stream is sealed and cannot be modified in any way
func (*ObjectBucketStatus) Size ¶
func (s *ObjectBucketStatus) Size() uint64
Size is the combined size of all data in the bucket including metadata, in bytes
func (*ObjectBucketStatus) Storage ¶
func (s *ObjectBucketStatus) Storage() StorageType
Storage indicates the underlying JetStream storage technology used to store data
func (*ObjectBucketStatus) StreamInfo ¶
func (s *ObjectBucketStatus) StreamInfo() *StreamInfo
StreamInfo is the stream info retrieved to create the status
func (*ObjectBucketStatus) TTL ¶
func (s *ObjectBucketStatus) TTL() time.Duration
TTL indicates how long objects are kept in the bucket
type ObjectInfo ¶
type ObjectInfo struct { ObjectMeta Bucket string `json:"bucket"` NUID string `json:"nuid"` Size uint64 `json:"size"` ModTime time.Time `json:"mtime"` Chunks uint32 `json:"chunks"` Digest string `json:"digest,omitempty"` Deleted bool `json:"deleted,omitempty"` }
ObjectInfo is meta plus instance information.
type ObjectLink ¶
type ObjectLink struct { // Bucket is the name of the other object store. Bucket string `json:"bucket"` // Name can be used to link to a single object. // If empty means this is a link to the whole store, like a directory. Name string `json:"name,omitempty"` }
ObjectLink is used to embed links to other buckets and objects.
type ObjectMeta ¶
type ObjectMeta struct { Name string `json:"name"` Description string `json:"description,omitempty"` Headers nats.Header `json:"headers,omitempty"` Metadata map[string]string `json:"metadata,omitempty"` // Additional options. Opts *ObjectMetaOptions `json:"options,omitempty"` }
ObjectMeta is high level information about an object.
type ObjectMetaOptions ¶
type ObjectMetaOptions struct { Link *ObjectLink `json:"link,omitempty"` ChunkSize uint32 `json:"max_chunk_size,omitempty"` }
ObjectMetaOptions
type ObjectResult ¶
type ObjectResult interface { io.ReadCloser Info() (*ObjectInfo, error) Error() error }
ObjectResult will return the underlying stream info and also be an io.ReadCloser.
type ObjectStore ¶
type ObjectStore interface { // Put will place the contents from the reader into a new object. Put(ctx context.Context, obj ObjectMeta, reader io.Reader) (*ObjectInfo, error) // Get will pull the named object from the object store. Get(ctx context.Context, name string, opts ...GetObjectOpt) (ObjectResult, error) // PutBytes is convenience function to put a byte slice into this object store. PutBytes(ctx context.Context, name string, data []byte) (*ObjectInfo, error) // GetBytes is a convenience function to pull an object from this object store and return it as a byte slice. GetBytes(ctx context.Context, name string, opts ...GetObjectOpt) ([]byte, error) // PutString is convenience function to put a string into this object store. PutString(ctx context.Context, name string, data string) (*ObjectInfo, error) // GetString is a convenience function to pull an object from this object store and return it as a string. GetString(ctx context.Context, name string, opts ...GetObjectOpt) (string, error) // PutFile is convenience function to put a file into this object store. PutFile(ctx context.Context, file string) (*ObjectInfo, error) // GetFile is a convenience function to pull an object from this object store and place it in a file. GetFile(ctx context.Context, name, file string, opts ...GetObjectOpt) error // GetInfo will retrieve the current information for the object. GetInfo(ctx context.Context, name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error) // UpdateMeta will update the metadata for the object. UpdateMeta(ctx context.Context, name string, meta ObjectMeta) error // Delete will delete the named object. Delete(ctx context.Context, name string) error // AddLink will add a link to another object. AddLink(ctx context.Context, name string, obj *ObjectInfo) (*ObjectInfo, error) // AddBucketLink will add a link to another object store. AddBucketLink(ctx context.Context, name string, bucket ObjectStore) (*ObjectInfo, error) // Seal will seal the object store, no further modifications will be allowed. Seal(ctx context.Context) error // Watch for changes in the underlying store and receive meta information updates. Watch(ctx context.Context, opts ...WatchOpt) (ObjectWatcher, error) // List will list all the objects in this store. List(ctx context.Context, opts ...ListObjectsOpt) ([]*ObjectInfo, error) // Status retrieves run-time status about the backing store of the bucket. Status(ctx context.Context) (ObjectStoreStatus, error) }
ObjectStore is a blob store capable of storing large objects efficiently in JetStream streams
type ObjectStoreConfig ¶
type ObjectStoreConfig struct { Bucket string `json:"bucket"` Description string `json:"description,omitempty"` TTL time.Duration `json:"max_age,omitempty"` MaxBytes int64 `json:"max_bytes,omitempty"` Storage StorageType `json:"storage,omitempty"` Replicas int `json:"num_replicas,omitempty"` Placement *Placement `json:"placement,omitempty"` // Bucket-specific metadata // NOTE: Metadata requires nats-server v2.10.0+ Metadata map[string]string `json:"metadata,omitempty"` }
ObjectStoreConfig is the config for the object store.
type ObjectStoreManager ¶
type ObjectStoreManager interface { // ObjectStore will look up and bind to an existing object store instance. ObjectStore(ctx context.Context, bucket string) (ObjectStore, error) // CreateObjectStore will create an object store. CreateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error) // DeleteObjectStore will delete the underlying stream for the named object. DeleteObjectStore(ctx context.Context, bucket string) error // ObjectStoreNames is used to retrieve a list of bucket names ObjectStoreNames(ctx context.Context) ObjectStoreNamesLister // ObjectStores is used to retrieve a list of bucket statuses ObjectStores(ctx context.Context) ObjectStoresLister }
ObjectStoreManager creates, loads and deletes Object Stores
type ObjectStoreNamesLister ¶
type ObjectStoreStatus ¶
type ObjectStoreStatus interface { // Bucket is the name of the bucket Bucket() string // Description is the description supplied when creating the bucket Description() string // TTL indicates how long objects are kept in the bucket TTL() time.Duration // Storage indicates the underlying JetStream storage technology used to store data Storage() StorageType // Replicas indicates how many storage replicas are kept for the data in the bucket Replicas() int // Sealed indicates the stream is sealed and cannot be modified in any way Sealed() bool // Size is the combined size of all data in the bucket including metadata, in bytes Size() uint64 // BackingStore provides details about the underlying storage BackingStore() string // Metadata is the user supplied metadata for the bucket Metadata() map[string]string }
type ObjectStoresLister ¶
type ObjectStoresLister interface { Status() <-chan ObjectStoreStatus Error() error }
type ObjectWatcher ¶
type ObjectWatcher interface { // Updates returns a channel to read any updates to entries. Updates() <-chan *ObjectInfo // Stop will stop this watcher. Stop() error }
ObjectWatcher is what is returned when doing a watch.
type OrderedConsumerConfig ¶
type OrderedConsumerConfig struct { FilterSubjects []string `json:"filter_subjects,omitempty"` DeliverPolicy DeliverPolicy `json:"deliver_policy"` OptStartSeq uint64 `json:"opt_start_seq,omitempty"` OptStartTime *time.Time `json:"opt_start_time,omitempty"` ReplayPolicy ReplayPolicy `json:"replay_policy"` InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` HeadersOnly bool `json:"headers_only,omitempty"` // Maximum number of attempts for the consumer to be recreated // Defaults to unlimited MaxResetAttempts int }
type PeerInfo ¶
type PeerInfo struct { Name string `json:"name"` Current bool `json:"current"` Offline bool `json:"offline,omitempty"` Active time.Duration `json:"active"` Lag uint64 `json:"lag,omitempty"` }
PeerInfo shows information about all the peers in the cluster that are supporting the stream or consumer.
type PubAck ¶
type PubAck struct { Stream string `json:"stream"` Sequence uint64 `json:"seq"` Duplicate bool `json:"duplicate,omitempty"` Domain string `json:"domain,omitempty"` }
PubAck is an ack received after successfully publishing a message.
type PubAckFuture ¶
type PubAckFuture interface { // Ok returns a receive only channel that can be used to get a PubAck. Ok() <-chan *PubAck // Err returns a receive only channel that can be used to get the error from an async publish. Err() <-chan error // Msg returns the message that was sent to the server. Msg() *nats.Msg }
PubAckFuture is a future for a PubAck.
type PublishOpt ¶
type PublishOpt func(*pubOpts) error
func WithExpectLastMsgID ¶
func WithExpectLastMsgID(id string) PublishOpt
WithExpectLastMsgID sets the expected last msgId in the response from the publish.
func WithExpectLastSequence ¶
func WithExpectLastSequence(seq uint64) PublishOpt
WithExpectLastSequence sets the expected sequence in the response from the publish.
func WithExpectLastSequencePerSubject ¶
func WithExpectLastSequencePerSubject(seq uint64) PublishOpt
WithExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.
func WithExpectStream ¶
func WithExpectStream(stream string) PublishOpt
WithExpectStream sets the expected stream to respond from the publish.
func WithMsgID ¶
func WithMsgID(id string) PublishOpt
WithMsgID sets the message ID used for deduplication.
func WithRetryAttempts ¶
func WithRetryAttempts(num int) PublishOpt
WithRetryAttempts sets the retry number of attempts when ErrNoResponders is encountered. Defaults to 2
func WithRetryWait ¶
func WithRetryWait(dur time.Duration) PublishOpt
WithRetryWait sets the retry wait time when ErrNoResponders is encountered. Defaults to 250ms
func WithStallWait ¶
func WithStallWait(ttl time.Duration) PublishOpt
WithStallWait sets the max wait when the producer becomes stall producing messages.
type Publisher ¶
type Publisher interface { // Publish performs a synchronous publish to a stream and waits for ack from server // It accepts subject name (which must be bound to a stream) and message data Publish(ctx context.Context, subject string, payload []byte, opts ...PublishOpt) (*PubAck, error) // PublishMsg performs a synchronous publish to a stream and waits for ack from server // It accepts subject name (which must be bound to a stream) and nats.Message PublishMsg(ctx context.Context, msg *nats.Msg, opts ...PublishOpt) (*PubAck, error) // PublishAsync performs an asynchronous publish to a stream and returns [PubAckFuture] interface // It accepts subject name (which must be bound to a stream) and message data PublishAsync(subject string, payload []byte, opts ...PublishOpt) (PubAckFuture, error) // PublishMsgAsync performs an asynchronous publish to a stream and returns [PubAckFuture] interface // It accepts subject name (which must be bound to a stream) and nats.Message PublishMsgAsync(msg *nats.Msg, opts ...PublishOpt) (PubAckFuture, error) // PublishAsyncPending returns the number of async publishes outstanding for this context PublishAsyncPending() int // PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd PublishAsyncComplete() <-chan struct{} }
type PullConsumeOpt ¶
type PullConsumeOpt interface {
// contains filtered or unexported methods
}
PullConsumeOpt represent additional options used in [Consume] for pull consumers
func ConsumeErrHandler ¶
func ConsumeErrHandler(cb ConsumeErrHandlerFunc) PullConsumeOpt
ConsumeErrHandler sets custom error handler invoked when an error was encountered while consuming messages It will be invoked for both terminal (Consumer Deleted, invalid request body) and non-terminal (e.g. missing heartbeats) errors
type PullExpiry ¶
PullExpiry sets timeout on a single batch request, waiting until at least one message is available
type PullHeartbeat ¶
PullHeartbeat sets the idle heartbeat duration for a pull subscription If a client does not receive a heartbeat message from a stream for more than the idle heartbeat setting, the subscription will be removed and error will be passed to the message handler
type PullMaxMessages ¶
type PullMaxMessages int
PullMaxMessages limits the number of messages to be fetched from the stream in one request If not provided, a default of 100 messages will be used
type PullMessagesOpt ¶
type PullMessagesOpt interface {
// contains filtered or unexported methods
}
PullMessagesOpt represent additional options used in [Messages] for pull consumers
func WithMessagesErrOnMissingHeartbeat ¶
func WithMessagesErrOnMissingHeartbeat(hbErr bool) PullMessagesOpt
WithMessagesErrOnMissingHeartbeat sets whether a missing heartbeat error should be reported when calling Next (Default: true).
type PullThresholdBytes ¶
type PullThresholdBytes int
PullThresholdBytes sets the byte count on which Consume will trigger new pull request to the server. Defaults to 50% of MaxBytes (if set).
type PullThresholdMessages ¶
type PullThresholdMessages int
PullThresholdMessages sets the message count on which Consume will trigger new pull request to the server. Defaults to 50% of MaxMessages.
type RawStreamMsg ¶
type RePublish ¶
type RePublish struct { Source string `json:"src,omitempty"` Destination string `json:"dest"` HeadersOnly bool `json:"headers_only,omitempty"` }
RePublish is for republishing messages once committed to a stream. The original subject is remapped from the subject pattern to the destination pattern.
type ReplayPolicy ¶
type ReplayPolicy int
ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.
const ( // ReplayInstantPolicy will replay messages as fast as possible. ReplayInstantPolicy ReplayPolicy = iota // ReplayOriginalPolicy will maintain the same timing as the messages were received. ReplayOriginalPolicy )
func (ReplayPolicy) MarshalJSON ¶
func (p ReplayPolicy) MarshalJSON() ([]byte, error)
func (ReplayPolicy) String ¶
func (p ReplayPolicy) String() string
func (*ReplayPolicy) UnmarshalJSON ¶
func (p *ReplayPolicy) UnmarshalJSON(data []byte) error
type RetentionPolicy ¶
type RetentionPolicy int
RetentionPolicy determines how messages in a set are retained.
const ( // LimitsPolicy (default) means that messages are retained until any given limit is reached. // This could be one of MaxMsgs, MaxBytes, or MaxAge. LimitsPolicy RetentionPolicy = iota // InterestPolicy specifies that when all known observables have acknowledged a message it can be removed. InterestPolicy // WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed. WorkQueuePolicy )
func (RetentionPolicy) MarshalJSON ¶
func (rp RetentionPolicy) MarshalJSON() ([]byte, error)
func (RetentionPolicy) String ¶
func (rp RetentionPolicy) String() string
func (*RetentionPolicy) UnmarshalJSON ¶
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error
type SequenceInfo ¶
type SequenceInfo struct { Consumer uint64 `json:"consumer_seq"` Stream uint64 `json:"stream_seq"` Last *time.Time `json:"last_active,omitempty"` }
SequenceInfo has both the consumer and the stream sequence and last activity.
type SequencePair ¶
type SequencePair struct { Consumer uint64 `json:"consumer_seq"` Stream uint64 `json:"stream_seq"` }
SequencePair includes the consumer and stream sequence info from a JetStream consumer.
type StopAfter ¶
type StopAfter int
StopAfter sets the number of messages after which the consumer is automatically stopped
type StorageType ¶
type StorageType int
StorageType determines how messages are stored for retention.
const ( // FileStorage specifies on disk storage. It's the default. FileStorage StorageType = iota // MemoryStorage specifies in memory only. MemoryStorage )
func (StorageType) MarshalJSON ¶
func (st StorageType) MarshalJSON() ([]byte, error)
func (StorageType) String ¶
func (st StorageType) String() string
func (*StorageType) UnmarshalJSON ¶
func (st *StorageType) UnmarshalJSON(data []byte) error
type StoreCompression ¶
type StoreCompression uint8
StoreCompression determines how messages are compressed.
const ( NoCompression StoreCompression = iota S2Compression )
func (StoreCompression) MarshalJSON ¶
func (alg StoreCompression) MarshalJSON() ([]byte, error)
func (StoreCompression) String ¶
func (alg StoreCompression) String() string
func (*StoreCompression) UnmarshalJSON ¶
func (alg *StoreCompression) UnmarshalJSON(b []byte) error
type Stream ¶
type Stream interface { // Info returns stream details Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, error) // CachedInfo returns *StreamInfo cached on a consumer struct CachedInfo() *StreamInfo // Purge removes messages from a stream Purge(ctx context.Context, opts ...StreamPurgeOpt) error // GetMsg retrieves a raw stream message stored in JetStream by sequence number GetMsg(ctx context.Context, seq uint64, opts ...GetMsgOpt) (*RawStreamMsg, error) // GetLastMsgForSubject retrieves the last raw stream message stored in JetStream by subject GetLastMsgForSubject(ctx context.Context, subject string) (*RawStreamMsg, error) // DeleteMsg deletes a message from a stream. // The message is marked as erased, but not overwritten DeleteMsg(ctx context.Context, seq uint64) error // SecureDeleteMsg deletes a message from a stream. The deleted message is overwritten with random data // As a result, this operation is slower than DeleteMsg() SecureDeleteMsg(ctx context.Context, seq uint64) error // contains filtered or unexported methods }
Stream contains CRUD methods on a consumer, as well as operations on an existing stream
type StreamConfig ¶
type StreamConfig struct { Name string `json:"name"` Description string `json:"description,omitempty"` Subjects []string `json:"subjects,omitempty"` Retention RetentionPolicy `json:"retention"` MaxConsumers int `json:"max_consumers"` MaxMsgs int64 `json:"max_msgs"` MaxBytes int64 `json:"max_bytes"` Discard DiscardPolicy `json:"discard"` DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"` MaxAge time.Duration `json:"max_age"` MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"` MaxMsgSize int32 `json:"max_msg_size,omitempty"` Storage StorageType `json:"storage"` Replicas int `json:"num_replicas"` NoAck bool `json:"no_ack,omitempty"` Template string `json:"template_owner,omitempty"` Duplicates time.Duration `json:"duplicate_window,omitempty"` Placement *Placement `json:"placement,omitempty"` Mirror *StreamSource `json:"mirror,omitempty"` Sources []*StreamSource `json:"sources,omitempty"` Sealed bool `json:"sealed,omitempty"` DenyDelete bool `json:"deny_delete,omitempty"` DenyPurge bool `json:"deny_purge,omitempty"` AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` Compression StoreCompression `json:"compression"` FirstSeq uint64 `json:"first_seq,omitempty"` // Allow applying a subject transform to incoming messages before doing anything else SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"` // Allow republish of the message after being sequenced and stored. RePublish *RePublish `json:"republish,omitempty"` // Allow higher performance, direct access to get individual messages. E.g. KeyValue AllowDirect bool `json:"allow_direct"` // Allow higher performance and unified direct access for mirrors as well. MirrorDirect bool `json:"mirror_direct"` // Limits for consumers on this stream. ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"` // Metadata is additional metadata for the Stream. // Keys starting with `_nats` are reserved. // NOTE: Metadata requires nats-server v2.10.0+ Metadata map[string]string `json:"metadata,omitempty"` }
type StreamConsumerLimits ¶
type StreamConsumerLimits struct { InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` MaxAckPending int `json:"max_ack_pending,omitempty"` }
StreamConsumerLimits are the limits for a consumer on a stream. These can be overridden on a per consumer basis.
type StreamConsumerManager ¶
type StreamConsumerManager interface { // CreateOrUpdateConsumer creates a consumer on a given stream with given config. // If consumer already exists, it will be updated (if possible). // Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages) CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) // CreateConsumer creates a consumer on a given stream with given config. // If consumer already exists, ErrConsumerExists is returned. // Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages) CreateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) // UpdateConsumer updates an existing consumer. // If consumer does not exist, ErrConsumerDoesNotExist is returned. // Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages) UpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) // OrderedConsumer returns an OrderedConsumer instance. // OrderedConsumer allows fetching messages from a stream (just like standard consumer), // for in order delivery of messages. Underlying consumer is re-created when necessary, // without additional client code. OrderedConsumer(ctx context.Context, stream string, cfg OrderedConsumerConfig) (Consumer, error) // Consumer returns a hook to an existing consumer, allowing processing of messages Consumer(ctx context.Context, stream string, consumer string) (Consumer, error) // DeleteConsumer removes a consumer with given name from a stream DeleteConsumer(ctx context.Context, stream string, consumer string) error }
type StreamInfo ¶
type StreamInfo struct { Config StreamConfig `json:"config"` Created time.Time `json:"created"` State StreamState `json:"state"` Cluster *ClusterInfo `json:"cluster,omitempty"` Mirror *StreamSourceInfo `json:"mirror,omitempty"` Sources []*StreamSourceInfo `json:"sources,omitempty"` }
StreamInfo shows config and current state for this stream.
type StreamInfoLister ¶
type StreamInfoLister interface { Info() <-chan *StreamInfo Err() error }
type StreamInfoOpt ¶
type StreamInfoOpt func(*streamInfoRequest) error
func WithDeletedDetails ¶
func WithDeletedDetails(deletedDetails bool) StreamInfoOpt
WithDeletedDetails can be used to display the information about messages deleted from a stream on a stream info request
func WithSubjectFilter ¶
func WithSubjectFilter(subject string) StreamInfoOpt
WithSubjectFilter can be used to display the information about messages stored on given subjects. NOTE: if the subject filter matches over 100k subjects, this will result in multiple requests to the server to retrieve all the information, and all of the returned subjects will be kept in memory.
type StreamListOpt ¶
type StreamListOpt func(*streamsRequest) error
func WithStreamListSubject ¶
func WithStreamListSubject(subject string) StreamListOpt
WithStreamListSubject can be used to filter results of ListStreams and StreamNames requests to only streams that have given subject in their configuration
type StreamManager ¶
type StreamManager interface { // CreateStream creates a new stream with given config and returns a hook to operate on it CreateStream(ctx context.Context, cfg StreamConfig) (Stream, error) // UpdateStream updates an existing stream UpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error) // CreateOrUpdateStream creates a stream with given config. If stream already exists, it will be updated (if possible). CreateOrUpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error) // Stream returns a [Stream] hook for a given stream name Stream(ctx context.Context, stream string) (Stream, error) // StreamNameBySubject returns a stream name stream listening on given subject StreamNameBySubject(ctx context.Context, subject string) (string, error) // DeleteStream removes a stream with given name DeleteStream(ctx context.Context, stream string) error // ListStreams returns StreamInfoLister enabling iterating over a channel of stream infos ListStreams(context.Context, ...StreamListOpt) StreamInfoLister // StreamNames returns a StreamNameLister enabling iterating over a channel of stream names StreamNames(context.Context, ...StreamListOpt) StreamNameLister }
type StreamNameLister ¶
type StreamPurgeOpt ¶
type StreamPurgeOpt func(*StreamPurgeRequest) error
func WithPurgeKeep ¶
func WithPurgeKeep(keep uint64) StreamPurgeOpt
WithPurgeKeep sets the number of messages to be kept in the stream after purge. Can be combined with WithPurgeSubject option, but not with WithPurgeSequence
func WithPurgeSequence ¶
func WithPurgeSequence(sequence uint64) StreamPurgeOpt
WithPurgeSequence is used to set a specific sequence number up to which (but not including) messages will be purged from a stream Can be combined with WithPurgeSubject option, but not with WithPurgeKeep
func WithPurgeSubject ¶
func WithPurgeSubject(subject string) StreamPurgeOpt
WithPurgeSubject sets a specific subject for which messages on a stream will be purged
type StreamPurgeRequest ¶
type StreamSource ¶
type StreamSource struct { Name string `json:"name"` OptStartSeq uint64 `json:"opt_start_seq,omitempty"` OptStartTime *time.Time `json:"opt_start_time,omitempty"` FilterSubject string `json:"filter_subject,omitempty"` SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` External *ExternalStream `json:"external,omitempty"` Domain string `json:"-"` }
StreamSource dictates how streams can source from other streams.
type StreamSourceInfo ¶
type StreamSourceInfo struct { Name string `json:"name"` Lag uint64 `json:"lag"` Active time.Duration `json:"active"` FilterSubject string `json:"filter_subject,omitempty"` SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` }
StreamSourceInfo shows information about an upstream stream source.
type StreamState ¶
type StreamState struct { Msgs uint64 `json:"messages"` Bytes uint64 `json:"bytes"` FirstSeq uint64 `json:"first_seq"` FirstTime time.Time `json:"first_ts"` LastSeq uint64 `json:"last_seq"` LastTime time.Time `json:"last_ts"` Consumers int `json:"consumer_count"` Deleted []uint64 `json:"deleted"` NumDeleted int `json:"num_deleted"` NumSubjects uint64 `json:"num_subjects"` Subjects map[string]uint64 `json:"subjects"` }
StreamState is information about the given stream.
type SubjectTransformConfig ¶
SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received
type WatchOpt ¶
type WatchOpt interface {
// contains filtered or unexported methods
}
func IgnoreDeletes ¶
func IgnoreDeletes() WatchOpt
IgnoreDeletes will have the key watcher not pass any deleted keys.
func IncludeHistory ¶
func IncludeHistory() WatchOpt
IncludeHistory instructs the key watcher to include historical values as well.
func MetaOnly ¶
func MetaOnly() WatchOpt
MetaOnly instructs the key watcher to retrieve only the entry meta data, not the entry value.
func ResumeFromRevision ¶
ResumeFromRevision instructs the key watcher to resume from a specific revision number.
func UpdatesOnly ¶
func UpdatesOnly() WatchOpt
UpdatesOnly instructs the key watcher to only include updates on values (without latest values when started).