Documentation ¶
Overview ¶
Package jsm provides client helpers for managing and interacting with NATS JetStream
Index ¶
- Variables
- func APISubject(subject string, prefix string, domain string) string
- func EventSubject(subject string, prefix string) string
- func IsErrorResponse(m *nats.Msg) bool
- func IsInternalStream(s string) bool
- func IsKVBucketStream(s string) bool
- func IsMQTTStateStream(s string) bool
- func IsNatsError(err error, code uint16) bool
- func IsOKResponse(m *nats.Msg) bool
- func IsObjectBucketStream(s string) bool
- func IsValidName(n string) bool
- func NewConsumerConfiguration(dflt api.ConsumerConfig, opts ...ConsumerOption) (*api.ConsumerConfig, error)
- func NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)
- func NextSubject(stream string, consumer string) (string, error)
- func ParseErrorResponse(m *nats.Msg) error
- func ParseEvent(e []byte) (schema string, event interface{}, err error)
- func ParsePubAck(m *nats.Msg) (*api.PubAck, error)
- type BackupData
- type Consumer
- func (c *Consumer) AckPolicy() api.AckPolicy
- func (c *Consumer) AckSampleSubject() string
- func (c *Consumer) AckWait() time.Duration
- func (c *Consumer) AcknowledgedFloor() (api.SequenceInfo, error)
- func (c *Consumer) AdvisorySubject() string
- func (c *Consumer) Configuration() (config api.ConsumerConfig)
- func (c *Consumer) Delete() (err error)
- func (c *Consumer) DeliverGroup() string
- func (c *Consumer) DeliverPolicy() api.DeliverPolicy
- func (c *Consumer) DeliveredState() (api.SequenceInfo, error)
- func (c *Consumer) DeliverySubject() string
- func (c *Consumer) Description() string
- func (c *Consumer) DurableName() string
- func (c *Consumer) FilterSubject() string
- func (c *Consumer) FlowControl() bool
- func (c *Consumer) Heartbeat() time.Duration
- func (c *Consumer) IsDurable() bool
- func (c *Consumer) IsEphemeral() bool
- func (c *Consumer) IsHeadersOnly() bool
- func (c *Consumer) IsPullMode() bool
- func (c *Consumer) IsPushMode() bool
- func (c *Consumer) IsSampled() bool
- func (c *Consumer) LatestState() (api.ConsumerInfo, error)
- func (c *Consumer) LeaderStepDown() error
- func (c *Consumer) MaxAckPending() int
- func (c *Consumer) MaxDeliver() int
- func (c *Consumer) MaxWaiting() int
- func (c *Consumer) MetricSubject() string
- func (c *Consumer) Name() string
- func (c *Consumer) NextMsg() (*nats.Msg, error)
- func (c *Consumer) NextMsgContext(ctx context.Context) (*nats.Msg, error)
- func (c *Consumer) NextMsgRequest(inbox string, req *api.JSApiConsumerGetNextRequest) error
- func (c *Consumer) NextSubject() string
- func (c *Consumer) PendingAcknowledgement() (int, error)
- func (c *Consumer) PendingMessages() (uint64, error)
- func (c *Consumer) RateLimit() uint64
- func (c *Consumer) RedeliveryCount() (int, error)
- func (c *Consumer) ReplayPolicy() api.ReplayPolicy
- func (c *Consumer) Reset() error
- func (c *Consumer) SampleFrequency() string
- func (c *Consumer) StartSequence() uint64
- func (c *Consumer) StartTime() time.Time
- func (c *Consumer) State() (api.ConsumerInfo, error)
- func (c *Consumer) StreamName() string
- func (c *Consumer) UpdateConfiguration(opts ...ConsumerOption) error
- func (c *Consumer) WaitingClientPulls() (int, error)
- type ConsumerBackup
- type ConsumerOption
- func AckWait(t time.Duration) ConsumerOption
- func AcknowledgeAll() ConsumerOption
- func AcknowledgeExplicit() ConsumerOption
- func AcknowledgeNone() ConsumerOption
- func ConsumerDescription(d string) ConsumerOption
- func DeliverAllAvailable() ConsumerOption
- func DeliverGroup(g string) ConsumerOption
- func DeliverHeadersOnly() ConsumerOption
- func DeliverLastPerSubject() ConsumerOption
- func DeliverySubject(s string) ConsumerOption
- func DurableName(s string) ConsumerOption
- func FilterStreamBySubject(s string) ConsumerOption
- func IdleHeartbeat(hb time.Duration) ConsumerOption
- func MaxAckPending(pending uint) ConsumerOption
- func MaxDeliveryAttempts(n int) ConsumerOption
- func MaxWaiting(pulls uint) ConsumerOption
- func PushFlowControl() ConsumerOption
- func RateLimitBitsPerSecond(bps uint64) ConsumerOption
- func ReplayAsReceived() ConsumerOption
- func ReplayInstantly() ConsumerOption
- func SamplePercent(i int) ConsumerOption
- func StartAtSequence(s uint64) ConsumerOption
- func StartAtTime(t time.Time) ConsumerOption
- func StartAtTimeDelta(d time.Duration) ConsumerOption
- func StartWithLastReceived() ConsumerOption
- func StartWithNextReceived() ConsumerOption
- type Manager
- func (m *Manager) BackupJetStreamConfiguration(backupDir string, data bool) error
- func (m *Manager) ConsumerNames(stream string) (names []string, err error)
- func (m *Manager) Consumers(stream string) (consumers []*Consumer, err error)
- func (m *Manager) DeleteStreamMessage(stream string, seq uint64, noErase bool) error
- func (m *Manager) EachStream(cb func(*Stream)) (err error)
- func (m *Manager) EachStreamTemplate(cb func(*StreamTemplate)) (err error)
- func (m *Manager) IsJetStreamEnabled() bool
- func (m *Manager) IsKnownConsumer(stream string, consumer string) (bool, error)
- func (m *Manager) IsKnownStream(stream string) (bool, error)
- func (m *Manager) IsKnownStreamTemplate(template string) (bool, error)
- func (m *Manager) JetStreamAccountInfo() (info *api.JetStreamAccountStats, err error)
- func (m *Manager) LoadConsumer(stream string, name string) (consumer *Consumer, err error)
- func (m *Manager) LoadOrNewConsumer(stream string, name string, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (m *Manager) LoadOrNewConsumerFromDefault(stream string, name string, template api.ConsumerConfig, ...) (consumer *Consumer, err error)
- func (m *Manager) LoadOrNewStream(name string, opts ...StreamOption) (stream *Stream, err error)
- func (m *Manager) LoadOrNewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)
- func (m *Manager) LoadOrNewStreamTemplate(name string, maxStreams uint32, config api.StreamConfig, opts ...StreamOption) (template *StreamTemplate, err error)
- func (m *Manager) LoadStream(name string) (stream *Stream, err error)
- func (m *Manager) LoadStreamTemplate(name string) (template *StreamTemplate, err error)
- func (m *Manager) MetaLeaderStandDown(placement *api.Placement) error
- func (m *Manager) MetaPeerRemove(name string) error
- func (m *Manager) NatsConn() *nats.Conn
- func (m *Manager) NewConsumer(stream string, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (m *Manager) NewConsumerFromDefault(stream string, dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (m *Manager) NewStream(name string, opts ...StreamOption) (stream *Stream, err error)
- func (m *Manager) NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)
- func (m *Manager) NewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)
- func (m *Manager) NewStreamTemplate(name string, maxStreams uint32, config api.StreamConfig, opts ...StreamOption) (template *StreamTemplate, err error)
- func (m *Manager) NextMsg(stream string, consumer string) (*nats.Msg, error)
- func (m *Manager) NextMsgContext(ctx context.Context, stream string, consumer string) (*nats.Msg, error)
- func (m *Manager) NextMsgRequest(stream string, consumer string, inbox string, ...) error
- func (m *Manager) NextSubject(stream string, consumer string) (string, error)
- func (m *Manager) ReadLastMessageForSubject(stream string, sub string) (msg *api.StoredMsg, err error)
- func (m *Manager) RestoreJetStreamConfiguration(backupDir string, update bool) error
- func (m *Manager) RestoreJetStreamConfigurationFile(path string, update bool) error
- func (m *Manager) RestoreSnapshotFromDirectory(ctx context.Context, stream string, dir string, opts ...SnapshotOption) (RestoreProgress, *api.StreamState, error)
- func (m *Manager) StreamNames(filter *StreamNamesFilter) (names []string, err error)
- func (m *Manager) StreamTemplateNames() (templates []string, err error)
- func (m *Manager) Streams() (streams []*Stream, err error)
- type MsgInfo
- type Option
- type PagerOption
- type RestoreProgress
- type SnapshotOption
- func RestoreConfiguration(cfg api.StreamConfig) SnapshotOption
- func RestoreNotify(cb func(RestoreProgress)) SnapshotOption
- func SnapshotConsumers() SnapshotOption
- func SnapshotDebug() SnapshotOption
- func SnapshotHealthCheck() SnapshotOption
- func SnapshotNotify(cb func(SnapshotProgress)) SnapshotOption
- type SnapshotProgress
- type Stream
- func (s *Stream) AdvisorySubject() string
- func (s *Stream) Configuration() api.StreamConfig
- func (s *Stream) ConsumerNames() (names []string, err error)
- func (s *Stream) Delete() error
- func (s *Stream) DeleteAllow() bool
- func (s *Stream) DeleteMessage(seq uint64) (err error)
- func (s *Stream) Description() string
- func (s *Stream) DuplicateWindow() time.Duration
- func (s *Stream) EachConsumer(cb func(consumer *Consumer)) error
- func (s *Stream) FastDeleteMessage(seq uint64) error
- func (s *Stream) Information() (info *api.StreamInfo, err error)
- func (s *Stream) IsInternal() bool
- func (s *Stream) IsKVBucket() bool
- func (s *Stream) IsMQTTState() bool
- func (s *Stream) IsMirror() bool
- func (s *Stream) IsObjectBucket() bool
- func (s *Stream) IsSourced() bool
- func (s *Stream) IsTemplateManaged() bool
- func (s *Stream) LatestInformation() (info *api.StreamInfo, err error)
- func (s *Stream) LatestState() (state api.StreamState, err error)
- func (s *Stream) LeaderStepDown() error
- func (s *Stream) LoadConsumer(name string) (*Consumer, error)
- func (s *Stream) LoadOrNewConsumer(name string, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (s *Stream) LoadOrNewConsumerFromDefault(name string, deflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (s *Stream) MaxAge() time.Duration
- func (s *Stream) MaxBytes() int64
- func (s *Stream) MaxConsumers() int
- func (s *Stream) MaxMsgSize() int32
- func (s *Stream) MaxMsgs() int64
- func (s *Stream) MaxMsgsPerSubject() int64
- func (s *Stream) MetricSubject() string
- func (s *Stream) Mirror() *api.StreamSource
- func (s *Stream) Name() string
- func (s *Stream) NewConsumer(opts ...ConsumerOption) (consumer *Consumer, err error)
- func (s *Stream) NewConsumerFromDefault(dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (s *Stream) NoAck() bool
- func (s *Stream) PageContents(opts ...PagerOption) (*StreamPager, error)
- func (s *Stream) Purge(opts ...*api.JSApiStreamPurgeRequest) error
- func (s *Stream) PurgeAllowed() bool
- func (s *Stream) ReadLastMessageForSubject(subj string) (*api.StoredMsg, error)
- func (s *Stream) ReadMessage(seq uint64) (msg *api.StoredMsg, err error)
- func (s *Stream) RemoveRAFTPeer(peer string) error
- func (s *Stream) Replicas() int
- func (s *Stream) Reset() error
- func (s *Stream) Retention() api.RetentionPolicy
- func (s *Stream) RollupAllowed() bool
- func (s *Stream) Seal() error
- func (s *Stream) Sealed() bool
- func (s *Stream) SnapshotToDirectory(ctx context.Context, dir string, opts ...SnapshotOption) (SnapshotProgress, error)
- func (s *Stream) Sources() []*api.StreamSource
- func (s *Stream) State() (stats api.StreamState, err error)
- func (s *Stream) Storage() api.StorageType
- func (s *Stream) Subjects() []string
- func (s *Stream) Template() string
- func (s *Stream) UpdateConfiguration(cfg api.StreamConfig, opts ...StreamOption) error
- type StreamNamesFilter
- type StreamOption
- func AllowRollup() StreamOption
- func AppendSource(source *api.StreamSource) StreamOption
- func DenyDelete() StreamOption
- func DenyPurge() StreamOption
- func DiscardNew() StreamOption
- func DiscardOld() StreamOption
- func DuplicateWindow(d time.Duration) StreamOption
- func FileStorage() StreamOption
- func InterestRetention() StreamOption
- func LimitsRetention() StreamOption
- func MaxAge(m time.Duration) StreamOption
- func MaxBytes(m int64) StreamOption
- func MaxConsumers(m int) StreamOption
- func MaxMessageSize(m int32) StreamOption
- func MaxMessages(m int64) StreamOption
- func MaxMessagesPerSubject(m int64) StreamOption
- func MemoryStorage() StreamOption
- func Mirror(stream *api.StreamSource) StreamOption
- func NoAck() StreamOption
- func PlacementCluster(cluster string) StreamOption
- func PlacementTags(tags ...string) StreamOption
- func Replicas(r int) StreamOption
- func Sources(streams ...*api.StreamSource) StreamOption
- func StreamDescription(d string) StreamOption
- func Subjects(s ...string) StreamOption
- func WorkQueueRetention() StreamOption
- type StreamPager
- type StreamTemplate
- func (t *StreamTemplate) Configuration() api.StreamTemplateConfig
- func (t *StreamTemplate) Delete() error
- func (t *StreamTemplate) MaxStreams() uint32
- func (t *StreamTemplate) Name() string
- func (t *StreamTemplate) Reset() error
- func (t *StreamTemplate) StreamConfiguration() api.StreamConfig
- func (t *StreamTemplate) Streams() []string
Constants ¶
This section is empty.
Variables ¶
var DefaultConsumer = api.ConsumerConfig{ DeliverPolicy: api.DeliverAll, AckPolicy: api.AckExplicit, AckWait: 30 * time.Second, ReplayPolicy: api.ReplayInstant, }
DefaultConsumer is the configuration that will be used to create new Consumers in NewConsumer
var DefaultStream = api.StreamConfig{ Retention: api.LimitsPolicy, Discard: api.DiscardOld, MaxConsumers: -1, MaxMsgs: -1, MaxMsgsPer: -1, MaxBytes: -1, MaxAge: 24 * 365 * time.Hour, MaxMsgSize: -1, Replicas: 1, NoAck: false, }
DefaultStream is a template configuration with LimitsPolicy retention and 1 year maximum age. No storage type or subjects are set
var DefaultStreamConfiguration = DefaultStream
DefaultStreamConfiguration is the configuration that will be used to create new Streams in NewStream
var DefaultWorkQueue = api.StreamConfig{ Retention: api.WorkQueuePolicy, Discard: api.DiscardOld, MaxConsumers: -1, MaxMsgs: -1, MaxMsgsPer: -1, MaxBytes: -1, MaxAge: 24 * 365 * time.Hour, MaxMsgSize: -1, Replicas: api.StreamDefaultReplicas, NoAck: false, }
DefaultWorkQueue is a template configuration with WorkQueuePolicy retention and 1 year maximum age. No storage type or subjects are set
var SampledDefaultConsumer = api.ConsumerConfig{ DeliverPolicy: api.DeliverAll, AckPolicy: api.AckExplicit, AckWait: 30 * time.Second, ReplayPolicy: api.ReplayInstant, SampleFrequency: "100%", }
SampledDefaultConsumer is a template configuration with the same settings as DefaultConsumer plus SampleFrequency set to "100%"
Functions ¶
func APISubject ¶ added in v0.0.21
APISubject returns API subject with prefix applied
func EventSubject ¶ added in v0.0.21
EventSubject returns Event subject with prefix applied
func IsErrorResponse ¶
func IsErrorResponse(m *nats.Msg) bool
IsErrorResponse checks if the message holds a standard JetStream error
func IsInternalStream ¶ added in v0.0.27
IsInternalStream indicates if a stream is considered 'internal' by the NATS team, that is, it's a backing stream for KV, Object or MQTT state
func IsKVBucketStream ¶ added in v0.0.27
IsKVBucketStream determines if a stream is a KV bucket
func IsMQTTStateStream ¶ added in v0.0.27
IsMQTTStateStream determines if a stream holds internal MQTT state
func IsNatsError ¶ added in v0.0.25
IsNatsError checks if err is an ApiErr matching code
func IsOKResponse ¶
func IsOKResponse(m *nats.Msg) bool
IsOKResponse checks if the message holds a standard JetStream OK response
func IsObjectBucketStream ¶ added in v0.0.27
IsObjectBucketStream determines if a stream is an Object bucket
func IsValidName ¶ added in v0.0.18
IsValidName verifies if n is a valid stream, template or consumer name
func NewConsumerConfiguration ¶
func NewConsumerConfiguration(dflt api.ConsumerConfig, opts ...ConsumerOption) (*api.ConsumerConfig, error)
NewConsumerConfiguration generates a new configuration based on template modified by opts
func NewStreamConfiguration ¶
func NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)
NewStreamConfiguration generates a new configuration based on template modified by opts
func NextSubject ¶
NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-based consumer
func ParseErrorResponse ¶
func ParseErrorResponse(m *nats.Msg) error
ParseErrorResponse parses the JetStream response, if it's an error returns an error instance holding the message else nil
func ParseEvent ¶
ParseEvent parses event e and returns event as for example *api.ConsumerAckMetric, all unknown event schemas will be of type *UnknownMessage
func ParsePubAck ¶ added in v0.0.25
ParsePubAck parses a stream publish response and returns an error if the publish failed or parsing failed
Types ¶
type BackupData ¶
type Consumer ¶
Consumer represents a JetStream consumer
func (*Consumer) AckSampleSubject ¶
AckSampleSubject is the subject used to publish ack samples to
func (*Consumer) AcknowledgedFloor ¶
func (c *Consumer) AcknowledgedFloor() (api.SequenceInfo, error)
AcknowledgedFloor reports the highest contiguous message sequences that were acknowledged
func (*Consumer) AdvisorySubject ¶
AdvisorySubject is a wildcard subscription subject that subscribes to all advisories for this consumer
func (*Consumer) Configuration ¶
func (c *Consumer) Configuration() (config api.ConsumerConfig)
Configuration is the Consumer configuration
func (*Consumer) Delete ¶
Delete deletes the Consumer, after this the Consumer object should be disposed
func (*Consumer) DeliverGroup ¶ added in v0.0.26
func (*Consumer) DeliverPolicy ¶
func (c *Consumer) DeliverPolicy() api.DeliverPolicy
func (*Consumer) DeliveredState ¶
func (c *Consumer) DeliveredState() (api.SequenceInfo, error)
DeliveredState reports the message sequences that were successfully delivered
func (*Consumer) DeliverySubject ¶
func (*Consumer) Description ¶ added in v0.0.26
func (*Consumer) DurableName ¶
func (*Consumer) FilterSubject ¶
func (*Consumer) FlowControl ¶ added in v0.0.21
func (*Consumer) IsEphemeral ¶
func (*Consumer) IsHeadersOnly ¶ added in v0.0.27
func (*Consumer) IsPullMode ¶
func (*Consumer) IsPushMode ¶
func (*Consumer) LatestState ¶ added in v0.0.23
func (c *Consumer) LatestState() (api.ConsumerInfo, error)
LatestState returns the most recently loaded state
func (*Consumer) LeaderStepDown ¶ added in v0.0.21
LeaderStepDown requests the current RAFT group leader in a clustered JetStream to stand down forcing a new election
func (*Consumer) MaxAckPending ¶ added in v0.0.20
func (*Consumer) MaxDeliver ¶
func (*Consumer) MaxWaiting ¶ added in v0.0.24
func (*Consumer) MetricSubject ¶
MetricSubject is a wildcard subscription subject that subscribes to all metrics for this consumer
func (*Consumer) NextMsg ¶
NextMsg retrieves the next message, waiting up to manager timeout for a response
func (*Consumer) NextMsgContext ¶ added in v0.0.19
NextMsgContext retrieves the next message, interrupted by the cancel context ctx
func (*Consumer) NextMsgRequest ¶ added in v0.0.20
func (c *Consumer) NextMsgRequest(inbox string, req *api.JSApiConsumerGetNextRequest) error
NextMsgRequest creates a request for a batch of messages, data or control flow messages will be sent to inbox
func (*Consumer) NextSubject ¶
NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-based consumer
func (*Consumer) PendingAcknowledgement ¶ added in v0.0.20
PendingAcknowledgement reports the number of messages sent but not yet acknowledged
func (*Consumer) PendingMessages ¶ added in v0.0.20
PendingMessages is the number of unprocessed messages for this consumer
func (*Consumer) RedeliveryCount ¶
RedeliveryCount reports the number of redelivers that were done
func (*Consumer) ReplayPolicy ¶
func (c *Consumer) ReplayPolicy() api.ReplayPolicy
func (*Consumer) SampleFrequency ¶
func (*Consumer) StartSequence ¶
func (*Consumer) State ¶
func (c *Consumer) State() (api.ConsumerInfo, error)
State loads a snapshot of consumer state including delivery counts, retries and more
func (*Consumer) StreamName ¶
func (*Consumer) UpdateConfiguration ¶ added in v0.0.27
func (c *Consumer) UpdateConfiguration(opts ...ConsumerOption) error
UpdateConfiguration updates the consumer configuration. At present the description, ack wait, max deliver, sample frequency, max ack pending, max waiting and headers only settings can be changed
func (*Consumer) WaitingClientPulls ¶ added in v0.0.20
WaitingClientPulls is the number of clients that have outstanding pull requests against this consumer
type ConsumerBackup ¶
type ConsumerBackup struct { Name string `json:"name"` Stream string `json:"stream"` Config api.ConsumerConfig `json:"config"` }
type ConsumerOption ¶
type ConsumerOption func(o *api.ConsumerConfig) error
ConsumerOption configures consumers
func AckWait ¶
func AckWait(t time.Duration) ConsumerOption
AckWait sets the time a delivered message might remain unacknowledged before redelivery is attempted
func AcknowledgeAll ¶
func AcknowledgeAll() ConsumerOption
AcknowledgeAll enables an acknowledgement mode where acknowledging message 100 will also ack the preceding messages
func AcknowledgeExplicit ¶
func AcknowledgeExplicit() ConsumerOption
AcknowledgeExplicit requires that every message received be acknowledged
func AcknowledgeNone ¶
func AcknowledgeNone() ConsumerOption
AcknowledgeNone disables message acknowledgement
func ConsumerDescription ¶ added in v0.0.26
func ConsumerDescription(d string) ConsumerOption
ConsumerDescription is a textual description of this consumer to provide additional context
func DeliverAllAvailable ¶
func DeliverAllAvailable() ConsumerOption
DeliverAllAvailable delivers messages starting with the first available in the stream
func DeliverGroup ¶ added in v0.0.26
func DeliverGroup(g string) ConsumerOption
DeliverGroup when set will only deliver messages to subscriptions matching that group
func DeliverHeadersOnly ¶ added in v0.0.27
func DeliverHeadersOnly() ConsumerOption
DeliverHeadersOnly configures the consumer to only deliver existing headers and the `Nats-Msg-Size` header, no bodies
func DeliverLastPerSubject ¶ added in v0.0.26
func DeliverLastPerSubject() ConsumerOption
DeliverLastPerSubject delivers the last message for each subject in a wildcard stream based on the filter subjects of the consumer
func DeliverySubject ¶
func DeliverySubject(s string) ConsumerOption
DeliverySubject is the subject where a Push consumer will deliver its messages
func DurableName ¶
func DurableName(s string) ConsumerOption
DurableName is the name given to the consumer, when not set an ephemeral consumer is created
func FilterStreamBySubject ¶
func FilterStreamBySubject(s string) ConsumerOption
FilterStreamBySubject filters the messages in a wildcard stream to those matching a specific subject
func IdleHeartbeat ¶ added in v0.0.21
func IdleHeartbeat(hb time.Duration) ConsumerOption
IdleHeartbeat sets the time before an idle consumer will send an empty message with Status header 100 indicating the consumer is still alive
func MaxAckPending ¶ added in v0.0.20
func MaxAckPending(pending uint) ConsumerOption
MaxAckPending sets the maximum number of messages without acknowledgement that can be outstanding, once this limit is reached message delivery will be suspended
func MaxDeliveryAttempts ¶
func MaxDeliveryAttempts(n int) ConsumerOption
MaxDeliveryAttempts is the number of times a message will be attempted to be delivered
func MaxWaiting ¶ added in v0.0.24
func MaxWaiting(pulls uint) ConsumerOption
MaxWaiting is the number of outstanding pulls that are allowed on any one consumer. Pulls that exceed this limit are discarded.
func PushFlowControl ¶ added in v0.0.21
func PushFlowControl() ConsumerOption
PushFlowControl enables flow control for push based consumers
func RateLimitBitsPerSecond ¶ added in v0.0.18
func RateLimitBitsPerSecond(bps uint64) ConsumerOption
RateLimitBitsPerSecond limits message delivery to a rate in bits per second
func ReplayAsReceived ¶
func ReplayAsReceived() ConsumerOption
ReplayAsReceived delivers messages at the rate they were received at
func ReplayInstantly ¶
func ReplayInstantly() ConsumerOption
ReplayInstantly delivers messages to the consumer as fast as possible
func SamplePercent ¶
func SamplePercent(i int) ConsumerOption
SamplePercent configures sampling of a subset of messages expressed as a percentage
func StartAtSequence ¶
func StartAtSequence(s uint64) ConsumerOption
StartAtSequence starts consuming messages at a specific sequence in the stream
func StartAtTime ¶
func StartAtTime(t time.Time) ConsumerOption
StartAtTime starts consuming messages at a specific point in time in the stream
func StartAtTimeDelta ¶
func StartAtTimeDelta(d time.Duration) ConsumerOption
StartAtTimeDelta starts delivering messages at a past point in time
func StartWithLastReceived ¶
func StartWithLastReceived() ConsumerOption
StartWithLastReceived starts delivery at the last message received in the stream
func StartWithNextReceived ¶
func StartWithNextReceived() ConsumerOption
StartWithNextReceived starts delivery at the next message received in the stream
type Manager ¶ added in v0.0.19
func (*Manager) BackupJetStreamConfiguration ¶ added in v0.0.19
BackupJetStreamConfiguration creates a backup of all configuration for Streams, Consumers and Stream Templates.
Stream data can optionally be backed up
func (*Manager) ConsumerNames ¶ added in v0.0.19
ConsumerNames is a sorted list of all known consumers within a stream
func (*Manager) Consumers ¶ added in v0.0.19
Consumers is a sorted list of all known Consumers within a Stream
func (*Manager) DeleteStreamMessage ¶ added in v0.0.25
DeleteStreamMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete
func (*Manager) EachStream ¶ added in v0.0.19
EachStream iterates over all known Streams
func (*Manager) EachStreamTemplate ¶ added in v0.0.19
func (m *Manager) EachStreamTemplate(cb func(*StreamTemplate)) (err error)
EachStreamTemplate iterates over all known Stream Templates
func (*Manager) IsJetStreamEnabled ¶ added in v0.0.19
IsJetStreamEnabled determines if JetStream is enabled for the current account
func (*Manager) IsKnownConsumer ¶ added in v0.0.19
IsKnownConsumer determines if a Consumer is known for a specific Stream
func (*Manager) IsKnownStream ¶ added in v0.0.19
IsKnownStream determines if a Stream is known
func (*Manager) IsKnownStreamTemplate ¶ added in v0.0.19
IsKnownStreamTemplate determines if a StreamTemplate is known
func (*Manager) JetStreamAccountInfo ¶ added in v0.0.19
func (m *Manager) JetStreamAccountInfo() (info *api.JetStreamAccountStats, err error)
JetStreamAccountInfo retrieves information about the current account limits and more
func (*Manager) LoadConsumer ¶ added in v0.0.19
LoadConsumer loads a consumer by name
func (*Manager) LoadOrNewConsumer ¶ added in v0.0.19
func (m *Manager) LoadOrNewConsumer(stream string, name string, opts ...ConsumerOption) (consumer *Consumer, err error)
LoadOrNewConsumer loads a consumer by name if known else creates a new one with these properties
func (*Manager) LoadOrNewConsumerFromDefault ¶ added in v0.0.19
func (m *Manager) LoadOrNewConsumerFromDefault(stream string, name string, template api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
LoadOrNewConsumerFromDefault loads a consumer by name if known else creates a new one with these properties based on template
func (*Manager) LoadOrNewStream ¶ added in v0.0.19
func (m *Manager) LoadOrNewStream(name string, opts ...StreamOption) (stream *Stream, err error)
LoadOrNewStream loads an existing stream or creates a new one matching opts
func (*Manager) LoadOrNewStreamFromDefault ¶ added in v0.0.19
func (m *Manager) LoadOrNewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)
LoadOrNewStreamFromDefault loads an existing stream or creates a new one matching opts and template
func (*Manager) LoadOrNewStreamTemplate ¶ added in v0.0.19
func (m *Manager) LoadOrNewStreamTemplate(name string, maxStreams uint32, config api.StreamConfig, opts ...StreamOption) (template *StreamTemplate, err error)
LoadOrNewStreamTemplate loads an existing template, else creates a new one based on config
func (*Manager) LoadStream ¶ added in v0.0.19
LoadStream loads a stream by name
func (*Manager) LoadStreamTemplate ¶ added in v0.0.19
func (m *Manager) LoadStreamTemplate(name string) (template *StreamTemplate, err error)
LoadStreamTemplate loads a given stream template from JetStream
func (*Manager) MetaLeaderStandDown ¶ added in v0.0.21
MetaLeaderStandDown requests the meta group leader to stand down, must be initiated by a system user
func (*Manager) MetaPeerRemove ¶ added in v0.0.21
MetaPeerRemove removes a peer from the JetStream meta cluster, evicting all streams, consumers etc. Use with extreme caution.
func (*Manager) NatsConn ¶ added in v0.0.25
func (m *Manager) NatsConn() *nats.Conn
NatsConn gives access to the underlying NATS Connection
func (*Manager) NewConsumer ¶ added in v0.0.19
func (m *Manager) NewConsumer(stream string, opts ...ConsumerOption) (consumer *Consumer, err error)
NewConsumer creates a consumer based on DefaultConsumer modified by opts
func (*Manager) NewConsumerFromDefault ¶ added in v0.0.19
func (m *Manager) NewConsumerFromDefault(stream string, dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
NewConsumerFromDefault creates a new consumer based on a template config that gets modified by opts
func (*Manager) NewStream ¶ added in v0.0.19
func (m *Manager) NewStream(name string, opts ...StreamOption) (stream *Stream, err error)
NewStream creates a new stream using DefaultStream as a starting template allowing adjustments to be made using options
func (*Manager) NewStreamConfiguration ¶ added in v0.0.19
func (m *Manager) NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)
NewStreamConfiguration generates a new configuration based on template modified by opts
func (*Manager) NewStreamFromDefault ¶ added in v0.0.19
func (m *Manager) NewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)
NewStreamFromDefault creates a new stream based on a supplied template and options
func (*Manager) NewStreamTemplate ¶ added in v0.0.19
func (m *Manager) NewStreamTemplate(name string, maxStreams uint32, config api.StreamConfig, opts ...StreamOption) (template *StreamTemplate, err error)
NewStreamTemplate creates a new template
func (*Manager) NextMsg ¶ added in v0.0.19
NextMsg requests the next message from the server with the manager timeout
func (*Manager) NextMsgContext ¶ added in v0.0.19
func (m *Manager) NextMsgContext(ctx context.Context, stream string, consumer string) (*nats.Msg, error)
NextMsgContext requests the next message from the server. This request will wait for as long as the context is active. If repeated pulls will be made it's better to use NextMsgRequest()
func (*Manager) NextMsgRequest ¶ added in v0.0.20
func (m *Manager) NextMsgRequest(stream string, consumer string, inbox string, req *api.JSApiConsumerGetNextRequest) error
NextMsgRequest creates a request for a batch of messages on a consumer, data or control flow messages will be sent to inbox
func (*Manager) NextSubject ¶ added in v0.0.21
NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-based consumer
func (*Manager) ReadLastMessageForSubject ¶ added in v0.0.25
func (m *Manager) ReadLastMessageForSubject(stream string, sub string) (msg *api.StoredMsg, err error)
ReadLastMessageForSubject reads the last message stored in the stream for a specific subject
func (*Manager) RestoreJetStreamConfiguration ¶ added in v0.0.19
RestoreJetStreamConfiguration restores the configuration from a backup made by BackupJetStreamConfiguration
func (*Manager) RestoreJetStreamConfigurationFile ¶ added in v0.0.19
RestoreJetStreamConfigurationFile restores a single file from a backup made by BackupJetStreamConfiguration
func (*Manager) RestoreSnapshotFromDirectory ¶ added in v0.0.21
func (m *Manager) RestoreSnapshotFromDirectory(ctx context.Context, stream string, dir string, opts ...SnapshotOption) (RestoreProgress, *api.StreamState, error)
func (*Manager) StreamNames ¶ added in v0.0.19
func (m *Manager) StreamNames(filter *StreamNamesFilter) (names []string, err error)
StreamNames is a sorted list of all known Streams filtered by filter
func (*Manager) StreamTemplateNames ¶ added in v0.0.19
StreamTemplateNames is a sorted list of all known StreamTemplates
type MsgInfo ¶
type MsgInfo struct {
// contains filtered or unexported fields
}
MsgInfo holds metadata about a message that was received from JetStream
func ParseJSMsgMetadata ¶
ParseJSMsgMetadata parses the reply subject metadata to determine message metadata
func ParseJSMsgMetadataReply ¶ added in v0.0.20
ParseJSMsgMetadataReply parses the reply subject of a JetStream originated message
func (*MsgInfo) ConsumerSequence ¶
ConsumerSequence is the sequence of this message in the consumer
func (*MsgInfo) Delivered ¶
Delivered is the number of times this message had delivery attempts including this one
func (*MsgInfo) Pending ¶ added in v0.0.20
Pending is the number of messages left to consume, -1 when the number is not reported
func (*MsgInfo) StreamSequence ¶
StreamSequence is the sequence of this message in the stream
type Option ¶ added in v0.0.19
type Option func(o *Manager)
Option is an option to configure the JetStream Manager
func WithAPIPrefix ¶ added in v0.0.21
WithAPIPrefix replaces API endpoints like $JS.API.STREAM.NAMES with prefix.STREAM.NAMES
func WithAPIValidation ¶
func WithAPIValidation(v api.StructValidator) Option
WithAPIValidation validates responses sent from the NATS server using a validator
func WithDomain ¶ added in v0.0.24
WithDomain sets a JetStream domain, incompatible with WithAPIPrefix()
func WithEventPrefix ¶ added in v0.0.21
WithEventPrefix replaces event subjects like $JS.EVENT.ADVISORY.API with prefix.ADVISORY
func WithTimeout ¶
WithTimeout sets a timeout for the requests
type PagerOption ¶ added in v0.0.19
type PagerOption func(p *StreamPager)
PagerOption configures the stream pager
func PagerFilterSubject ¶ added in v0.0.23
func PagerFilterSubject(s string) PagerOption
PagerFilterSubject sets a filter subject for the pager
func PagerSize ¶ added in v0.0.19
func PagerSize(sz int) PagerOption
PagerSize is the size of pages to walk
func PagerStartDelta ¶ added in v0.0.19
func PagerStartDelta(d time.Duration) PagerOption
PagerStartDelta sets a starting time delta for the pager
func PagerStartId ¶ added in v0.0.19
func PagerStartId(id int) PagerOption
PagerStartId sets a starting stream sequence for the pager
func PagerTimeout ¶ added in v0.0.19
func PagerTimeout(d time.Duration) PagerOption
PagerTimeout is the time to wait for messages before it's assumed the end of the stream was reached
type RestoreProgress ¶
type RestoreProgress interface { // StartTime is when the process started StartTime() time.Time // EndTime is when the process ended - zero when not completed EndTime() time.Time // ChunkSize is the size of the data packets sent over NATS ChunkSize() int // ChunksSent is the number of chunks of size ChunkSize that was sent ChunksSent() uint32 // ChunksToSend number of chunks of ChunkSize expected to be sent ChunksToSend() int // BytesSent is the number of bytes sent so far BytesSent() uint64 // BytesPerSecond is the number of bytes received in the last second, 0 during the first second BytesPerSecond() uint64 }
type SnapshotOption ¶
type SnapshotOption func(o *snapshotOptions)
func RestoreConfiguration ¶ added in v0.0.22
func RestoreConfiguration(cfg api.StreamConfig) SnapshotOption
RestoreConfiguration overrides the configuration used to restore
func RestoreNotify ¶
func RestoreNotify(cb func(RestoreProgress)) SnapshotOption
RestoreNotify notifies cb about progress of the restore operation
func SnapshotConsumers ¶
func SnapshotConsumers() SnapshotOption
SnapshotConsumers includes consumer configuration and state in backups
func SnapshotDebug ¶
func SnapshotDebug() SnapshotOption
SnapshotDebug enables logging using the standard go logging library
func SnapshotHealthCheck ¶
func SnapshotHealthCheck() SnapshotOption
SnapshotHealthCheck performs a health check prior to starting the snapshot
func SnapshotNotify ¶
func SnapshotNotify(cb func(SnapshotProgress)) SnapshotOption
SnapshotNotify notifies cb about progress of the snapshot operation
type SnapshotProgress ¶
type SnapshotProgress interface { // StartTime is when the process started StartTime() time.Time // EndTime is when the process ended - zero when not completed EndTime() time.Time // ChunkSize is the size of the data packets sent over NATS ChunkSize() int // ChunksReceived is how many chunks of ChunkSize were received ChunksReceived() uint32 // BytesExpected is how many Bytes we should be receiving BytesExpected() uint64 // BytesReceived is how many Bytes have been received BytesReceived() uint64 // UncompressedBytesReceived is the number of bytes received uncompressed UncompressedBytesReceived() uint64 // BytesPerSecond is the number of bytes received in the last second, 0 during the first second BytesPerSecond() uint64 // HealthCheck indicates if health checking was requested HealthCheck() bool // Finished will be true after all data have been written Finished() bool }
type Stream ¶
Stream represents a JetStream Stream
func (*Stream) AdvisorySubject ¶
AdvisorySubject is a wildcard subscription subject that subscribes to all advisories for this stream
func (*Stream) Configuration ¶
func (s *Stream) Configuration() api.StreamConfig
func (*Stream) ConsumerNames ¶
ConsumerNames is a list of all known consumers for this Stream
func (*Stream) DeleteAllow ¶ added in v0.0.27
func (*Stream) DeleteMessage ¶
DeleteMessage deletes a specific message from the Stream by overwriting it with random data, see FastDeleteMessage() to remove the message without overwriting data
func (*Stream) Description ¶ added in v0.0.26
func (*Stream) DuplicateWindow ¶ added in v0.0.18
func (*Stream) EachConsumer ¶
EachConsumer calls cb with each known consumer for this stream, returns an error on any failure to load consumers
func (*Stream) FastDeleteMessage ¶ added in v0.0.25
FastDeleteMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete
func (*Stream) Information ¶
func (s *Stream) Information() (info *api.StreamInfo, err error)
Information loads the current stream information
func (*Stream) IsInternal ¶ added in v0.0.27
IsInternal indicates if a stream is considered 'internal' by the NATS team, that is, it's a backing stream for KV, Object or MQTT state
func (*Stream) IsKVBucket ¶ added in v0.0.27
IsKVBucket determines if a stream is a KV bucket
func (*Stream) IsMQTTState ¶ added in v0.0.27
IsMQTTState determines if a stream holds internal MQTT state
func (*Stream) IsMirror ¶ added in v0.0.21
IsMirror determines if this stream is a mirror of another
func (*Stream) IsObjectBucket ¶ added in v0.0.27
IsObjectBucket determines if a stream is an Object bucket
func (*Stream) IsSourced ¶ added in v0.0.21
IsSourced determines if this stream is sourcing data from another stream. Other streams could be synced to this stream and it would not be reported by this property
func (*Stream) IsTemplateManaged ¶
IsTemplateManaged determines if this stream is managed by a template
func (*Stream) LatestInformation ¶
func (s *Stream) LatestInformation() (info *api.StreamInfo, err error)
LatestInformation returns the most recently fetched stream information
func (*Stream) LatestState ¶
func (s *Stream) LatestState() (state api.StreamState, err error)
LatestState returns the most recently fetched stream state
func (*Stream) LeaderStepDown ¶ added in v0.0.21
LeaderStepDown requests the current RAFT group leader in a clustered JetStream to stand down forcing a new election
func (*Stream) LoadConsumer ¶
LoadConsumer loads a named consumer related to this Stream
func (*Stream) LoadOrNewConsumer ¶
func (s *Stream) LoadOrNewConsumer(name string, opts ...ConsumerOption) (consumer *Consumer, err error)
LoadOrNewConsumer loads or creates a consumer based on these options
func (*Stream) LoadOrNewConsumerFromDefault ¶
func (s *Stream) LoadOrNewConsumerFromDefault(name string, deflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
LoadOrNewConsumerFromDefault loads or creates a consumer based on these options that adjust supplied template
func (*Stream) MaxConsumers ¶
func (*Stream) MaxMsgSize ¶
func (*Stream) MaxMsgsPerSubject ¶ added in v0.0.24
func (*Stream) MetricSubject ¶
MetricSubject is a wildcard subscription subject that subscribes to all metrics for this stream
func (*Stream) Mirror ¶ added in v0.0.21
func (s *Stream) Mirror() *api.StreamSource
func (*Stream) NewConsumer ¶
func (s *Stream) NewConsumer(opts ...ConsumerOption) (consumer *Consumer, err error)
NewConsumer creates a new consumer in this Stream based on DefaultConsumer
func (*Stream) NewConsumerFromDefault ¶
func (s *Stream) NewConsumerFromDefault(dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
NewConsumerFromDefault creates a new consumer in this Stream based on a supplied template config
func (*Stream) PageContents ¶ added in v0.0.19
func (s *Stream) PageContents(opts ...PagerOption) (*StreamPager, error)
PageContents creates a StreamPager used to traverse the contents of the stream, Close() should be called to dispose of the background consumer and resources
func (*Stream) Purge ¶
func (s *Stream) Purge(opts ...*api.JSApiStreamPurgeRequest) error
Purge deletes messages from the Stream, an optional JSApiStreamPurgeRequest can be supplied to limit the purge to a subset of messages
func (*Stream) PurgeAllowed ¶ added in v0.0.27
func (*Stream) ReadLastMessageForSubject ¶ added in v0.0.25
ReadLastMessageForSubject reads the last message stored in the stream for a specific subject
func (*Stream) ReadMessage ¶
ReadMessage loads a message from the stream by its sequence number
func (*Stream) RemoveRAFTPeer ¶ added in v0.0.21
RemoveRAFTPeer removes a peer from the group indicating it will not return
func (*Stream) Retention ¶
func (s *Stream) Retention() api.RetentionPolicy
func (*Stream) RollupAllowed ¶ added in v0.0.27
func (*Stream) Seal ¶ added in v0.0.27
Seal updates a stream so that messages can not be added or removed using the API and limits will not be processed - messages will never age out. A sealed stream can not be unsealed.
func (*Stream) SnapshotToDirectory ¶ added in v0.0.21
func (s *Stream) SnapshotToDirectory(ctx context.Context, dir string, opts ...SnapshotOption) (SnapshotProgress, error)
SnapshotToDirectory creates a backup into an s2 compressed tar file
func (*Stream) Sources ¶ added in v0.0.21
func (s *Stream) Sources() []*api.StreamSource
func (*Stream) State ¶
func (s *Stream) State() (stats api.StreamState, err error)
State retrieves the Stream State
func (*Stream) Storage ¶
func (s *Stream) Storage() api.StorageType
func (*Stream) UpdateConfiguration ¶
func (s *Stream) UpdateConfiguration(cfg api.StreamConfig, opts ...StreamOption) error
UpdateConfiguration updates the stream using cfg modified by opts, reloads configuration from the server post update
type StreamNamesFilter ¶ added in v0.0.20
type StreamNamesFilter struct { // Subject filter the names to those consuming messages matching this subject or wildcard Subject string `json:"subject,omitempty"` }
StreamNamesFilter limits the names being returned by the names API
type StreamOption ¶
type StreamOption func(o *api.StreamConfig) error
StreamOption configures a stream
func AllowRollup ¶ added in v0.0.27
func AllowRollup() StreamOption
func AppendSource ¶ added in v0.0.21
func AppendSource(source *api.StreamSource) StreamOption
func DenyDelete ¶ added in v0.0.27
func DenyDelete() StreamOption
func DenyPurge ¶ added in v0.0.27
func DenyPurge() StreamOption
func DiscardNew ¶
func DiscardNew() StreamOption
func DiscardOld ¶
func DiscardOld() StreamOption
func DuplicateWindow ¶ added in v0.0.18
func DuplicateWindow(d time.Duration) StreamOption
func FileStorage ¶
func FileStorage() StreamOption
func InterestRetention ¶
func InterestRetention() StreamOption
func LimitsRetention ¶
func LimitsRetention() StreamOption
func MaxAge ¶
func MaxAge(m time.Duration) StreamOption
func MaxBytes ¶
func MaxBytes(m int64) StreamOption
func MaxConsumers ¶
func MaxConsumers(m int) StreamOption
func MaxMessageSize ¶
func MaxMessageSize(m int32) StreamOption
func MaxMessages ¶
func MaxMessages(m int64) StreamOption
func MaxMessagesPerSubject ¶ added in v0.0.24
func MaxMessagesPerSubject(m int64) StreamOption
func MemoryStorage ¶
func MemoryStorage() StreamOption
func Mirror ¶ added in v0.0.21
func Mirror(stream *api.StreamSource) StreamOption
func NoAck ¶
func NoAck() StreamOption
func PlacementCluster ¶ added in v0.0.21
func PlacementCluster(cluster string) StreamOption
func PlacementTags ¶ added in v0.0.21
func PlacementTags(tags ...string) StreamOption
func Replicas ¶
func Replicas(r int) StreamOption
func Sources ¶ added in v0.0.21
func Sources(streams ...*api.StreamSource) StreamOption
func StreamDescription ¶ added in v0.0.26
func StreamDescription(d string) StreamOption
StreamDescription is a textual description of this stream to provide additional context
func Subjects ¶
func Subjects(s ...string) StreamOption
func WorkQueueRetention ¶
func WorkQueueRetention() StreamOption
type StreamPager ¶ added in v0.0.19
type StreamPager struct {
// contains filtered or unexported fields
}
func (*StreamPager) Close ¶ added in v0.0.19
func (p *StreamPager) Close() error
Close disposes of the resources used by the pager and should be called when done
func (*StreamPager) NextMsg ¶ added in v0.0.19
func (p *StreamPager) NextMsg(ctx context.Context) (msg *nats.Msg, last bool, err error)
NextMsg retrieves the next message from the pager interrupted by ctx.
last indicates if the message is the last in the current page, the next call to NextMsg will first request the next page, if the client is prompting users to continue to the next page it should be done when last is true
When the end of the stream is reached err will be non nil and last will be true; otherwise err being non nil while last is false indicates a failed state. The end is indicated by no new messages arriving before ctx times out or before the time set using PagerTimeout() is reached
type StreamTemplate ¶
type StreamTemplate struct {
// contains filtered or unexported fields
}
func (*StreamTemplate) Configuration ¶
func (t *StreamTemplate) Configuration() api.StreamTemplateConfig
func (*StreamTemplate) Delete ¶
func (t *StreamTemplate) Delete() error
Delete deletes the StreamTemplate, after this the StreamTemplate object should be disposed
func (*StreamTemplate) MaxStreams ¶
func (t *StreamTemplate) MaxStreams() uint32
func (*StreamTemplate) Name ¶
func (t *StreamTemplate) Name() string
func (*StreamTemplate) Reset ¶
func (t *StreamTemplate) Reset() error
Reset reloads the Stream Template configuration and state from the JetStream server
func (*StreamTemplate) StreamConfiguration ¶
func (t *StreamTemplate) StreamConfiguration() api.StreamConfig
func (*StreamTemplate) Streams ¶
func (t *StreamTemplate) Streams() []string
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| election | Package election is a JetStream backed leader election system. |
| governor | Package governor controls the concurrency of a network wide process. Using this one can, for example, create CRON jobs that can trigger 100s or 1000s concurrently but where most will wait for a set limit to complete. |
| natscontext | Package natscontext provides a way for sets of configuration options to be stored in named files and later retrieved either by name or if no name is supplied by access a chosen default context. |