jsm

package module
v0.0.27 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2021 License: Apache-2.0 Imports: 19 Imported by: 55

README

Overview

This is a Go based library to manage and interact with JetStream.

This package is the underlying library for the nats CLI, our Terraform provider, GitHub Actions and Kubernetes CRDs.

NOTE: This package is under development, while JetStream is in Preview we make no promises about the API stability of this package.

Go Doc

Features

  • Manage and interact with Streams
  • Manage and interact with Consumers
  • Perform Backup and Restore operations of configuration and data
  • Schema registry of many standard NATS events and APIs that supports validation using JSON Schema
  • Process, validate and render NATS server events and advisories

Initialization

This package is modeled as a Manager instance that receives a NATS Connection and sets default timeouts and validation for all interaction with JetStream.

Multiple Managers can be used in your application each with own timeouts and connection.

mgr, _ := jsm.New(nc, jsm.WithTimeout(10*time.Second))

This creates a Manager with a 10 second timeout when accessing the JetStream API. All examples below assume a manager was created as above.

Managing Streams

A Stream stores data, this package allow you to Read messages from , Create, Delete, List and Update Streams.

Creating Streams

Before anything you have to create a stream, the basic pattern is:

mgr, _ := jsm.New(nc)
stream, _ := mgr.NewStream("ORDERS", jsm.Subjects("ORDERS.*"), jsm.MaxAge(24*365*time.Hour), jsm.FileStorage())

The mgr.NewStream uses jsm.DefaultStream as starting defaults.

This can get quite verbose, so you might have a template configuration of your own choosing to create many similar Streams.

template, _ := jsm.NewStreamConfiguration(jsm.DefaultStream, jsm.MaxAge(24 * 365 * time.Hour), jsm.FileStorage())

orders, _ := mgr.NewStreamFromDefault("ORDERS", template,  jsm.Subjects("ORDERS.*"))
archive, _ := mgr.NewStreamFromDefault("ARCHIVE", template, jsm.Subjects("ARCHIVE"), jsm.MaxAge(5*template.MaxAge))

We have 2 pre-defined configurations that you might use instead of your own template - jsm.DefaultStream and jsm.DefaultWorkQueue.

You can even copy Stream configurations this way (not content, just configuration), this creates STAGING using ORDERS config with a different set of subjects:

orders, err := mgr.NewStream("ORDERS", jsm.Subjects("ORDERS.*"), jsm.MaxAge(24*365*time.Hour), jsm.FileStorage())
staging, err := mgr.NewStreamFromDefault("STAGING", orders.Configuration(), jsm.Subjects("STAGINGORDERS.*"))

Loading references to existing streams

Once a Stream exist you can load it later:

orders, err := mgr.LoadStream("ORDERS")

This will fail if the stream does not exist, create and load can be combined:

orders, err := mgr.LoadOrNewFromDefault("ORDERS", template, jsm.Subjects("ORDERS.*"))

This will create the Stream if it doesn't exist, else load the existing one - though no effort is made to ensure the loaded one matches the desired configuration in that case.

Associated Consumers

With a stream handle you can get lists of known Consumers using stream.ConsumerNames(), or create new Consumers within the stream using stream.NewConsumer and stream.NewConsumerFromDefault. Consumers can also be loaded using stream.LoadConsumer and you can combine load and create using stream.LoadOrNewConsumer and stream.LoadOrNewConsumerFromDefault.

These methods just proxy to the Consumer specific ones which will be discussed below. When creating new Consumer instances this way the connection information from the Stream is passed into the Consumer.

Other actions

There are a number of other functions allowing you to purge messages, read individual messages, get statistics and access the configuration. Review the godoc for details.

Consumers

Creating

Above you saw that once you have a handle to a stream you can create and load consumers, you can access the consumer directly though, lets create one:

consumer, err := mgr.NewConsumer("ORDERS", "NEW", jsm.FilterSubject("ORDERS.received"), jsm.SampleFrequency("100"))

Like with Streams we have NewConsumerFromDefault, LoadOrNewConsumer and LoadOrNewConsumerFromDefault and we supply 2 default configurations to help you DefaultConsumer and SampledDefaultConsumer.

When using LoadOrNewConsumer and LoadOrNewConsumerFromDefault if a durable name is given then that has to match the name supplied.

Many options exist to set starting points, durability and more - everything that you will find in the jsm utility, review the godoc for full details.

Consuming

Push-based Consumers are accessed using the normal NATS subscribe approach:

ib := nats.NewInbox()

sub, _ := nc.Subscribe(ib, func(m *nats.Msg){ // process messages })

consumer, _ := mgr.NewConsumer("ORDERS", "NEW", jsm.FilterSubject("ORDERS.received"), jsm.SampleFrequency("100"), jsm.DeliverySubject(ib))

For Pull-based Consumers we have a helper to fetch the next message:

// 1 message
msg, err := consumer.NextMsg()

When consuming these messages they have metadata attached that you can parse:

msg, _ := consumer.NextMsg(jsm.WithTimeout(60*time.Second))
meta, _ := jsm.ParseJSMsgMetadata(msg)

At this point you have access to meta.Stream, meta.Consumer for the names and meta.StreamSequence, meta.ConsumerSequence to determine which exact message and meta.Delivered for how many times it was redelivered.

If using the latest nats.go branch the nats.Msg instance will have a JetStreamMetaData() function that performs the same parsing and it also have helpers to acknowledge messages and more.

sub, _ := nc.Subscribe(ib, func(m *nats.Msg){
  meta, _ := m.JetStreamMetaData()
  fmt.Printf("Received message from %s > %s\n", meta.Stream, meta.Consumer)
  m.Ack()
})

Other Actions

There are a number of other functions to help you determine if its Pull or Push, is it Durable, Sampled and to access the full configuration.

Schema Registry

All the JetStream API messages and some events and advisories produced by the NATS Server have JSON Schemas associated with them, the api package has a Schema Registry and helpers to discover and interact with these.

The Schema Registry can be accessed on the cli in the nats schemas command where you can list, search and view schemas and validate data based on schemas.

Example Message

To retrieve the Stream State for a specific Stream one accesses the $JS.API.STREAM.INFO.<stream> API, this will respond with data like below:

{
  "type": "io.nats.jetstream.api.v1.stream_info_response",
  "config": {
    "name": "TESTING",
    "subjects": [
      "js.in.testing"
    ],
    "retention": "limits",
    "max_consumers": -1,
    "max_msgs": -1,
    "max_bytes": -1,
    "discard": "old",
    "max_age": 0,
    "max_msg_size": -1,
    "storage": "file",
    "num_replicas": 1,
    "duplicate_window": 120000000000
  },
  "created": "2020-10-09T12:40:07.648216464Z",
  "state": {
    "messages": 1,
    "bytes": 81,
    "first_seq": 1017,
    "first_ts": "2020-10-09T19:43:40.867729419Z",
    "last_seq": 1017,
    "last_ts": "2020-10-09T19:43:40.867729419Z",
    "consumer_count": 1
  }
}

Here the type of the message is io.nats.jetstream.api.v1.stream_info_response, the API package can help parse this into the correct format.

Message Schemas

Given a message kind one can retrieve the full JSON Schema as bytes:

schema, _ := api.Schema("io.nats.jetstream.api.v1.stream_info_response")

Once can also retrieve it based on a specific message content:

schemaType, _ := api.SchemaTypeForMessage(m.Data)
schema, _ := api.Schema(schemaType)

Several other Schema related helpers exist to search Schemas, fine URLs and more. See the api Reference.

Parsing Message Content

JetStream will produce metrics about message Acknowledgments, API audits and more, here we subscribe to the metric subject and print a specific received message type.

nc.Subscribe("$JS.EVENT.ADVISORY.>", func(m *nats.Msg){
    kind, msg, _ := api.ParseMessage(m.Data)
    log.Printf("Received message of type %s", kind) // io.nats.jetstream.advisory.v1.api_audit
    
    switch e := event.(type){
    case advisory.JetStreamAPIAuditV1:
        fmt.Printf("Audit event on subject %s from %s\n", e.Subject, e.Client.Name)                
    }
})

Above we gain full access to all contents of the message in it's native format, but we need to know in advance what we will get, we can render the messages as text in a generic way though:

nc.Subscribe("$JS.EVENT.ADVISORY.>", func(m *nats.Msg){
    kind, msg, _ := api.ParseMessage(m.Data)

    if kind == "io.nats.unknown_message" {
        return // a message without metadata or of a unknown format was received
    }

    ne, ok := event.(api.Event)
    if !ok {
        return fmt.Errorf("event %q does not implement the Event interface", kind)
    }

    err = api.RenderEvent(os.Stdout, ne, api.TextCompactFormat)
    if err != nil {
        return fmt.Errorf("display failed: %s", err)
    }
})

This will produce output like:

11:25:49 [JS API] $JS.API.STREAM.INFO.TESTING $G
11:25:52 [JS API] $JS.API.STREAM.NAMES $G
11:25:52 [JS API] $JS.API.STREAM.NAMES $G
11:25:53 [JS API] $JS.API.STREAM.INFO.TESTING $G

The api.TextCompactFormat is one of a few we support, also api.TextExtendedFormat for a full multi line format, api.ApplicationCloudEventV1Format for CloudEvents v1 format and api.ApplicationJSONFormat for JSON.

API Validation

The data structures sent to JetStream can be validated before submission to NATS which can speed up user feedback and provide better errors.

type SchemaValidator struct{}

func (v SchemaValidator) ValidateStruct(data interface{}, schemaType string) (ok bool, errs []string) {
	s, err := api.Schema(schemaType)
	if err != nil {
		return false, []string{"unknown schema type %s", schemaType}
	}

	ls := gojsonschema.NewBytesLoader(s)
	ld := gojsonschema.NewGoLoader(data)
	result, err := gojsonschema.Validate(ls, ld)
	if err != nil {
		return false, []string{fmt.Sprintf("validation failed: %s", err)}
	}

	if result.Valid() {
		return true, nil
	}

	errors := make([]string, len(result.Errors()))
	for i, verr := range result.Errors() {
		errors[i] = verr.String()
	}

	return false, errors
}

This is a api.StructValidator implementation that uses JSON Schema to do deep validation of the structures sent to JetStream.

This can be used by the Manager to validate all API access.

mgr, _ := jsm.New(nc, jsm.WithAPIValidation(new(SchemaValidator)))

Documentation

Overview

Package jsm provides client helpers for managing and interacting with NATS JetStream

Index

Constants

This section is empty.

Variables

View Source
var DefaultConsumer = api.ConsumerConfig{
	DeliverPolicy: api.DeliverAll,
	AckPolicy:     api.AckExplicit,
	AckWait:       30 * time.Second,
	ReplayPolicy:  api.ReplayInstant,
}

DefaultConsumer is the configuration that will be used to create new Consumers in NewConsumer

View Source
var DefaultStream = api.StreamConfig{
	Retention:    api.LimitsPolicy,
	Discard:      api.DiscardOld,
	MaxConsumers: -1,
	MaxMsgs:      -1,
	MaxMsgsPer:   -1,
	MaxBytes:     -1,
	MaxAge:       24 * 365 * time.Hour,
	MaxMsgSize:   -1,
	Replicas:     1,
	NoAck:        false,
}

DefaultStream is a template configuration with StreamPolicy retention and 1 years maximum age. No storage type or subjects are set

View Source
var DefaultStreamConfiguration = DefaultStream

DefaultStreamConfiguration is the configuration that will be used to create new Streams in NewStream

View Source
var DefaultWorkQueue = api.StreamConfig{
	Retention:    api.WorkQueuePolicy,
	Discard:      api.DiscardOld,
	MaxConsumers: -1,
	MaxMsgs:      -1,
	MaxMsgsPer:   -1,
	MaxBytes:     -1,
	MaxAge:       24 * 365 * time.Hour,
	MaxMsgSize:   -1,
	Replicas:     api.StreamDefaultReplicas,
	NoAck:        false,
}

DefaultWorkQueue is a template configuration with WorkQueuePolicy retention and 1 years maximum age. No storage type or subjects are set

View Source
var SampledDefaultConsumer = api.ConsumerConfig{
	DeliverPolicy:   api.DeliverAll,
	AckPolicy:       api.AckExplicit,
	AckWait:         30 * time.Second,
	ReplayPolicy:    api.ReplayInstant,
	SampleFrequency: "100%",
}

SampledDefaultConsumer is the configuration that will be used to create new Consumers in NewConsumer

Functions

func APISubject added in v0.0.21

func APISubject(subject string, prefix string, domain string) string

APISubject returns API subject with prefix applied

func EventSubject added in v0.0.21

func EventSubject(subject string, prefix string) string

EventSubject returns Event subject with prefix applied

func IsErrorResponse

func IsErrorResponse(m *nats.Msg) bool

IsErrorResponse checks if the message holds a standard JetStream error

func IsInternalStream added in v0.0.27

func IsInternalStream(s string) bool

IsInternalStream indicates if a stream is considered 'internal' by the NATS team, that is, it's a backing stream for KV, Object or MQTT state

func IsKVBucketStream added in v0.0.27

func IsKVBucketStream(s string) bool

IsKVBucketStream determines if a stream is a KV bucket

func IsMQTTStateStream added in v0.0.27

func IsMQTTStateStream(s string) bool

IsMQTTStateStream determines if a stream holds internal MQTT state

func IsNatsError added in v0.0.25

func IsNatsError(err error, code uint16) bool

IsNatsError checks if err is a ApiErr matching code

func IsOKResponse

func IsOKResponse(m *nats.Msg) bool

IsOKResponse checks if the message holds a standard JetStream error

func IsObjectBucketStream added in v0.0.27

func IsObjectBucketStream(s string) bool

IsObjectBucketStream determines if a stream is a Object bucket

func IsValidName added in v0.0.18

func IsValidName(n string) bool

IsValidName verifies if n is a valid stream, template or consumer name

func NewConsumerConfiguration

func NewConsumerConfiguration(dflt api.ConsumerConfig, opts ...ConsumerOption) (*api.ConsumerConfig, error)

NewConsumerConfiguration generates a new configuration based on template modified by opts

func NewStreamConfiguration

func NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)

NewStreamConfiguration generates a new configuration based on template modified by opts

func NextSubject

func NextSubject(stream string, consumer string) (string, error)

NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-base consumer

func ParseErrorResponse

func ParseErrorResponse(m *nats.Msg) error

ParseErrorResponse parses the JetStream response, if it's an error returns an error instance holding the message else nil

func ParseEvent

func ParseEvent(e []byte) (schema string, event interface{}, err error)

ParseEvent parses event e and returns event as for example *api.ConsumerAckMetric, all unknown event schemas will be of type *UnknownMessage

func ParsePubAck added in v0.0.25

func ParsePubAck(m *nats.Msg) (*api.PubAck, error)

ParsePubAck parses a stream publish response and returns an error if the publish failed or parsing failed

Types

type BackupData

type BackupData struct {
	Type          string `json:"type"`
	Time          string `json:"time"`
	Configuration []byte `json:"configuration"`
	Checksum      string `json:"checksum"`
}

type Consumer

type Consumer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Consumer represents a JetStream consumer

func (*Consumer) AckPolicy

func (c *Consumer) AckPolicy() api.AckPolicy

func (*Consumer) AckSampleSubject

func (c *Consumer) AckSampleSubject() string

AckSampleSubject is the subject used to publish ack samples to

func (*Consumer) AckWait

func (c *Consumer) AckWait() time.Duration

func (*Consumer) AcknowledgedFloor

func (c *Consumer) AcknowledgedFloor() (api.SequenceInfo, error)

AcknowledgedFloor reports the highest contiguous message sequences that were acknowledged

func (*Consumer) AdvisorySubject

func (c *Consumer) AdvisorySubject() string

AdvisorySubject is a wildcard subscription subject that subscribes to all advisories for this consumer

func (*Consumer) Configuration

func (c *Consumer) Configuration() (config api.ConsumerConfig)

Configuration is the Consumer configuration

func (*Consumer) Delete

func (c *Consumer) Delete() (err error)

Delete deletes the Consumer, after this the Consumer object should be disposed

func (*Consumer) DeliverGroup added in v0.0.26

func (c *Consumer) DeliverGroup() string

func (*Consumer) DeliverPolicy

func (c *Consumer) DeliverPolicy() api.DeliverPolicy

func (*Consumer) DeliveredState

func (c *Consumer) DeliveredState() (api.SequenceInfo, error)

DeliveredState reports the messages sequences that were successfully delivered

func (*Consumer) DeliverySubject

func (c *Consumer) DeliverySubject() string

func (*Consumer) Description added in v0.0.26

func (c *Consumer) Description() string

func (*Consumer) DurableName

func (c *Consumer) DurableName() string

func (*Consumer) FilterSubject

func (c *Consumer) FilterSubject() string

func (*Consumer) FlowControl added in v0.0.21

func (c *Consumer) FlowControl() bool

func (*Consumer) Heartbeat added in v0.0.21

func (c *Consumer) Heartbeat() time.Duration

func (*Consumer) IsDurable

func (c *Consumer) IsDurable() bool

func (*Consumer) IsEphemeral

func (c *Consumer) IsEphemeral() bool

func (*Consumer) IsHeadersOnly added in v0.0.27

func (c *Consumer) IsHeadersOnly() bool

func (*Consumer) IsPullMode

func (c *Consumer) IsPullMode() bool

func (*Consumer) IsPushMode

func (c *Consumer) IsPushMode() bool

func (*Consumer) IsSampled

func (c *Consumer) IsSampled() bool

func (*Consumer) LatestState added in v0.0.23

func (c *Consumer) LatestState() (api.ConsumerInfo, error)

LatestState returns the most recently loaded state

func (*Consumer) LeaderStepDown added in v0.0.21

func (c *Consumer) LeaderStepDown() error

LeaderStepDown requests the current RAFT group leader in a clustered JetStream to stand down forcing a new election

func (*Consumer) MaxAckPending added in v0.0.20

func (c *Consumer) MaxAckPending() int

func (*Consumer) MaxDeliver

func (c *Consumer) MaxDeliver() int

func (*Consumer) MaxWaiting added in v0.0.24

func (c *Consumer) MaxWaiting() int

func (*Consumer) MetricSubject

func (c *Consumer) MetricSubject() string

MetricSubject is a wildcard subscription subject that subscribes to all metrics for this consumer

func (*Consumer) Name

func (c *Consumer) Name() string

func (*Consumer) NextMsg

func (c *Consumer) NextMsg() (*nats.Msg, error)

NextMsg retrieves the next message, waiting up to manager timeout for a response

func (*Consumer) NextMsgContext added in v0.0.19

func (c *Consumer) NextMsgContext(ctx context.Context) (*nats.Msg, error)

NextMsgContext retrieves the next message, interrupted by the cancel context ctx

func (*Consumer) NextMsgRequest added in v0.0.20

func (c *Consumer) NextMsgRequest(inbox string, req *api.JSApiConsumerGetNextRequest) error

NextMsgRequest creates a request for a batch of messages, data or control flow messages will be sent to inbox

func (*Consumer) NextSubject

func (c *Consumer) NextSubject() string

NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-base consumer

func (*Consumer) PendingAcknowledgement added in v0.0.20

func (c *Consumer) PendingAcknowledgement() (int, error)

PendingAcknowledgement reports the number of messages sent but not yet acknowledged

func (*Consumer) PendingMessages added in v0.0.20

func (c *Consumer) PendingMessages() (uint64, error)

PendingMessages is the number of unprocessed messages for this consumer

func (*Consumer) RateLimit added in v0.0.18

func (c *Consumer) RateLimit() uint64

func (*Consumer) RedeliveryCount

func (c *Consumer) RedeliveryCount() (int, error)

RedeliveryCount reports the number of redelivers that were done

func (*Consumer) ReplayPolicy

func (c *Consumer) ReplayPolicy() api.ReplayPolicy

func (*Consumer) Reset

func (c *Consumer) Reset() error

Reset reloads the Consumer configuration from the JetStream server

func (*Consumer) SampleFrequency

func (c *Consumer) SampleFrequency() string

func (*Consumer) StartSequence

func (c *Consumer) StartSequence() uint64

func (*Consumer) StartTime

func (c *Consumer) StartTime() time.Time

func (*Consumer) State

func (c *Consumer) State() (api.ConsumerInfo, error)

State loads a snapshot of consumer state including delivery counts, retries and more

func (*Consumer) StreamName

func (c *Consumer) StreamName() string

func (*Consumer) UpdateConfiguration added in v0.0.27

func (c *Consumer) UpdateConfiguration(opts ...ConsumerOption) error

UpdateConfiguration updates the consumer configuration At present the description, ack wait, max deliver, sample frequency, max ack pending, max waiting and header only settings can be changed

func (*Consumer) WaitingClientPulls added in v0.0.20

func (c *Consumer) WaitingClientPulls() (int, error)

WaitingClientPulls is the number of clients that have outstanding pull requests against this consumer

type ConsumerBackup

type ConsumerBackup struct {
	Name   string             `json:"name"`
	Stream string             `json:"stream"`
	Config api.ConsumerConfig `json:"config"`
}

type ConsumerOption

type ConsumerOption func(o *api.ConsumerConfig) error

ConsumerOption configures consumers

func AckWait

func AckWait(t time.Duration) ConsumerOption

AckWait sets the time a delivered message might remain unacknowledged before redelivery is attempted

func AcknowledgeAll

func AcknowledgeAll() ConsumerOption

AcknowledgeAll enables an acknowledgement mode where acknowledging message 100 will also ack the preceding messages

func AcknowledgeExplicit

func AcknowledgeExplicit() ConsumerOption

AcknowledgeExplicit requires that every message received be acknowledged

func AcknowledgeNone

func AcknowledgeNone() ConsumerOption

AcknowledgeNone disables message acknowledgement

func ConsumerDescription added in v0.0.26

func ConsumerDescription(d string) ConsumerOption

ConsumerDescription is a textual description of this consumer to provide additional context

func DeliverAllAvailable

func DeliverAllAvailable() ConsumerOption

DeliverAllAvailable delivers messages starting with the first available in the stream

func DeliverGroup added in v0.0.26

func DeliverGroup(g string) ConsumerOption

DeliverGroup when set will only deliver messages to subscriptions matching that group

func DeliverHeadersOnly added in v0.0.27

func DeliverHeadersOnly() ConsumerOption

DeliverHeadersOnly configures the consumer to only deliver existing header and the `Nats-Msg-Size` header, no bodies

func DeliverLastPerSubject added in v0.0.26

func DeliverLastPerSubject() ConsumerOption

DeliverLastPerSubject delivers the last message for each subject in a wildcard stream based on the filter subjects of the consumer

func DeliverySubject

func DeliverySubject(s string) ConsumerOption

DeliverySubject is the subject where a Push consumer will deliver its messages

func DurableName

func DurableName(s string) ConsumerOption

DurableName is the name given to the consumer, when not set an ephemeral consumer is created

func FilterStreamBySubject

func FilterStreamBySubject(s string) ConsumerOption

FilterStreamBySubject filters the messages in a wildcard stream to those matching a specific subject

func IdleHeartbeat added in v0.0.21

func IdleHeartbeat(hb time.Duration) ConsumerOption

IdleHeartbeat sets the time before an idle consumer will send a empty message with Status header 100 indicating the consumer is still alive

func MaxAckPending added in v0.0.20

func MaxAckPending(pending uint) ConsumerOption

MaxAckPending maximum number of messages without acknowledgement that can be outstanding, once this limit is reached message delivery will be suspended

func MaxDeliveryAttempts

func MaxDeliveryAttempts(n int) ConsumerOption

MaxDeliveryAttempts is the number of times a message will be attempted to be delivered

func MaxWaiting added in v0.0.24

func MaxWaiting(pulls uint) ConsumerOption

MaxWaiting is the number of outstanding pulls that are allowed on any one consumer. Pulls made that exceeds this limit are discarded.

func PushFlowControl added in v0.0.21

func PushFlowControl() ConsumerOption

PushFlowControl enables flow control for push based consumers

func RateLimitBitsPerSecond added in v0.0.18

func RateLimitBitsPerSecond(bps uint64) ConsumerOption

RateLimitBitsPerSecond limits message delivery to a rate in bits per second

func ReplayAsReceived

func ReplayAsReceived() ConsumerOption

ReplayAsReceived delivers messages at the rate they were received at

func ReplayInstantly

func ReplayInstantly() ConsumerOption

ReplayInstantly delivers messages to the consumer as fast as possible

func SamplePercent

func SamplePercent(i int) ConsumerOption

SamplePercent configures sampling of a subset of messages expressed as a percentage

func StartAtSequence

func StartAtSequence(s uint64) ConsumerOption

StartAtSequence starts consuming messages at a specific sequence in the stream

func StartAtTime

func StartAtTime(t time.Time) ConsumerOption

StartAtTime starts consuming messages at a specific point in time in the stream

func StartAtTimeDelta

func StartAtTimeDelta(d time.Duration) ConsumerOption

StartAtTimeDelta starts delivering messages at a past point in time

func StartWithLastReceived

func StartWithLastReceived() ConsumerOption

StartWithLastReceived starts delivery at the last messages received in the stream

func StartWithNextReceived

func StartWithNextReceived() ConsumerOption

StartWithNextReceived starts delivery at the next messages received in the stream

type Manager added in v0.0.19

type Manager struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func New added in v0.0.19

func New(nc *nats.Conn, opts ...Option) (*Manager, error)

func (*Manager) BackupJetStreamConfiguration added in v0.0.19

func (m *Manager) BackupJetStreamConfiguration(backupDir string, data bool) error

BackupJetStreamConfiguration creates a backup of all configuration for Streams, Consumers and Stream Templates.

Stream data can optionally be backed up

func (*Manager) ConsumerNames added in v0.0.19

func (m *Manager) ConsumerNames(stream string) (names []string, err error)

ConsumerNames is a sorted list of all known consumers within a stream

func (*Manager) Consumers added in v0.0.19

func (m *Manager) Consumers(stream string) (consumers []*Consumer, err error)

Consumers is a sorted list of all known Consumers within a Stream

func (*Manager) DeleteStreamMessage added in v0.0.25

func (m *Manager) DeleteStreamMessage(stream string, seq uint64, noErase bool) error

DeleteStreamMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete

func (*Manager) EachStream added in v0.0.19

func (m *Manager) EachStream(cb func(*Stream)) (err error)

EachStream iterates over all known Streams

func (*Manager) EachStreamTemplate added in v0.0.19

func (m *Manager) EachStreamTemplate(cb func(*StreamTemplate)) (err error)

EachStreamTemplate iterates over all known Stream Templates

func (*Manager) IsJetStreamEnabled added in v0.0.19

func (m *Manager) IsJetStreamEnabled() bool

IsJetStreamEnabled determines if JetStream is enabled for the current account

func (*Manager) IsKnownConsumer added in v0.0.19

func (m *Manager) IsKnownConsumer(stream string, consumer string) (bool, error)

IsKnownConsumer determines if a Consumer is known for a specific Stream

func (*Manager) IsKnownStream added in v0.0.19

func (m *Manager) IsKnownStream(stream string) (bool, error)

IsKnownStream determines if a Stream is known

func (*Manager) IsKnownStreamTemplate added in v0.0.19

func (m *Manager) IsKnownStreamTemplate(template string) (bool, error)

IsKnownStreamTemplate determines if a StreamTemplate is known

func (*Manager) JetStreamAccountInfo added in v0.0.19

func (m *Manager) JetStreamAccountInfo() (info *api.JetStreamAccountStats, err error)

JetStreamAccountInfo retrieves information about the current account limits and more

func (*Manager) LoadConsumer added in v0.0.19

func (m *Manager) LoadConsumer(stream string, name string) (consumer *Consumer, err error)

LoadConsumer loads a consumer by name

func (*Manager) LoadOrNewConsumer added in v0.0.19

func (m *Manager) LoadOrNewConsumer(stream string, name string, opts ...ConsumerOption) (consumer *Consumer, err error)

LoadOrNewConsumer loads a consumer by name if known else creates a new one with these properties

func (*Manager) LoadOrNewConsumerFromDefault added in v0.0.19

func (m *Manager) LoadOrNewConsumerFromDefault(stream string, name string, template api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)

LoadOrNewConsumerFromDefault loads a consumer by name if known else creates a new one with these properties based on template

func (*Manager) LoadOrNewStream added in v0.0.19

func (m *Manager) LoadOrNewStream(name string, opts ...StreamOption) (stream *Stream, err error)

LoadOrNewStream loads an existing stream or creates a new one matching opts

func (*Manager) LoadOrNewStreamFromDefault added in v0.0.19

func (m *Manager) LoadOrNewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)

LoadOrNewStreamFromDefault loads an existing stream or creates a new one matching opts and template

func (*Manager) LoadOrNewStreamTemplate added in v0.0.19

func (m *Manager) LoadOrNewStreamTemplate(name string, maxStreams uint32, config api.StreamConfig, opts ...StreamOption) (template *StreamTemplate, err error)

LoadOrNewStreamTemplate loads an existing template, else creates a new one based on config

func (*Manager) LoadStream added in v0.0.19

func (m *Manager) LoadStream(name string) (stream *Stream, err error)

LoadStream loads a stream by name

func (*Manager) LoadStreamTemplate added in v0.0.19

func (m *Manager) LoadStreamTemplate(name string) (template *StreamTemplate, err error)

LoadStreamTemplate loads a given stream template from JetStream

func (*Manager) MetaLeaderStandDown added in v0.0.21

func (m *Manager) MetaLeaderStandDown(placement *api.Placement) error

MetaLeaderStandDown requests the meta group leader to stand down, must be initiated by a system user

func (*Manager) MetaPeerRemove added in v0.0.21

func (m *Manager) MetaPeerRemove(name string) error

MetaPeerRemove removes a peer from the JetStream meta cluster, evicting all streams, consumer etc. Use with extreme caution.

func (*Manager) NatsConn added in v0.0.25

func (m *Manager) NatsConn() *nats.Conn

NatsConn gives access to the underlying NATS Connection

func (*Manager) NewConsumer added in v0.0.19

func (m *Manager) NewConsumer(stream string, opts ...ConsumerOption) (consumer *Consumer, err error)

NewConsumer creates a consumer based on DefaultConsumer modified by opts

func (*Manager) NewConsumerFromDefault added in v0.0.19

func (m *Manager) NewConsumerFromDefault(stream string, dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)

NewConsumerFromDefault creates a new consumer based on a template config that gets modified by opts

func (*Manager) NewStream added in v0.0.19

func (m *Manager) NewStream(name string, opts ...StreamOption) (stream *Stream, err error)

NewStream creates a new stream using DefaultStream as a starting template allowing adjustments to be made using options

func (*Manager) NewStreamConfiguration added in v0.0.19

func (m *Manager) NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)

NewStreamConfiguration generates a new configuration based on template modified by opts

func (*Manager) NewStreamFromDefault added in v0.0.19

func (m *Manager) NewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)

NewStreamFromDefault creates a new stream based on a supplied template and options

func (*Manager) NewStreamTemplate added in v0.0.19

func (m *Manager) NewStreamTemplate(name string, maxStreams uint32, config api.StreamConfig, opts ...StreamOption) (template *StreamTemplate, err error)

NewStreamTemplate creates a new template

func (*Manager) NextMsg added in v0.0.19

func (m *Manager) NextMsg(stream string, consumer string) (*nats.Msg, error)

NextMsg requests the next message from the server with the manager timeout

func (*Manager) NextMsgContext added in v0.0.19

func (m *Manager) NextMsgContext(ctx context.Context, stream string, consumer string) (*nats.Msg, error)

NextMsgContext requests the next message from the server. This request will wait for as long as the context is active. If repeated pulls will be made it's better to use NextMsgRequest()

func (*Manager) NextMsgRequest added in v0.0.20

func (m *Manager) NextMsgRequest(stream string, consumer string, inbox string, req *api.JSApiConsumerGetNextRequest) error

NextMsgRequest creates a request for a batch of messages on a consumer, data or control flow messages will be sent to inbox

func (*Manager) NextSubject added in v0.0.21

func (m *Manager) NextSubject(stream string, consumer string) (string, error)

NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-base consumer

func (*Manager) ReadLastMessageForSubject added in v0.0.25

func (m *Manager) ReadLastMessageForSubject(stream string, sub string) (msg *api.StoredMsg, err error)

ReadLastMessageForSubject reads the last message stored in the stream for a specific subject

func (*Manager) RestoreJetStreamConfiguration added in v0.0.19

func (m *Manager) RestoreJetStreamConfiguration(backupDir string, update bool) error

RestoreJetStreamConfiguration restores the configuration from a backup made by BackupJetStreamConfiguration

func (*Manager) RestoreJetStreamConfigurationFile added in v0.0.19

func (m *Manager) RestoreJetStreamConfigurationFile(path string, update bool) error

RestoreJetStreamConfigurationFile restores a single file from a backup made by BackupJetStreamConfiguration

func (*Manager) RestoreSnapshotFromDirectory added in v0.0.21

func (m *Manager) RestoreSnapshotFromDirectory(ctx context.Context, stream string, dir string, opts ...SnapshotOption) (RestoreProgress, *api.StreamState, error)

func (*Manager) StreamNames added in v0.0.19

func (m *Manager) StreamNames(filter *StreamNamesFilter) (names []string, err error)

StreamNames is a sorted list of all known Streams filtered by filter

func (*Manager) StreamTemplateNames added in v0.0.19

func (m *Manager) StreamTemplateNames() (templates []string, err error)

StreamTemplateNames is a sorted list of all known StreamTemplates

func (*Manager) Streams added in v0.0.19

func (m *Manager) Streams() (streams []*Stream, err error)

Streams is a sorted list of all known Streams

type MsgInfo

type MsgInfo struct {
	// contains filtered or unexported fields
}

MsgInfo holds metadata about a message that was received from JetStream

func ParseJSMsgMetadata

func ParseJSMsgMetadata(m *nats.Msg) (info *MsgInfo, err error)

ParseJSMsgMetadata parse the reply subject metadata to determine message metadata

func ParseJSMsgMetadataReply added in v0.0.20

func ParseJSMsgMetadataReply(reply string) (info *MsgInfo, err error)

ParseJSMsgMetadataReply parses the reply subject of a JetStream originated message

func (*MsgInfo) Consumer

func (i *MsgInfo) Consumer() string

Consumer is the name of the consumer that produced this message

func (*MsgInfo) ConsumerSequence

func (i *MsgInfo) ConsumerSequence() uint64

ConsumerSequence is the sequence of this message in the consumer

func (*MsgInfo) Delivered

func (i *MsgInfo) Delivered() int

Delivered is the number of times this message had delivery attempts including this one

func (*MsgInfo) Domain added in v0.0.26

func (i *MsgInfo) Domain() string

Domain is the domain the message came from, can be empty

func (*MsgInfo) Pending added in v0.0.20

func (i *MsgInfo) Pending() uint64

Pending is the number of messages left to consume, -1 when the number is not reported

func (*MsgInfo) Stream

func (i *MsgInfo) Stream() string

Stream is the stream this message is stored in

func (*MsgInfo) StreamSequence

func (i *MsgInfo) StreamSequence() uint64

StreamSequence is the sequence of this message in the stream

func (*MsgInfo) TimeStamp

func (i *MsgInfo) TimeStamp() time.Time

TimeStamp is the time the message was received by JetStream

type Option added in v0.0.19

type Option func(o *Manager)

Option is a option to configure the JetStream Manager

func WithAPIPrefix added in v0.0.21

func WithAPIPrefix(s string) Option

WithAPIPrefix replace API endpoints like $JS.API.STREAM.NAMES with prefix.STREAM.NAMES

func WithAPIValidation

func WithAPIValidation(v api.StructValidator) Option

WithAPIValidation validates responses sent from the NATS server using a validator

func WithDomain added in v0.0.24

func WithDomain(d string) Option

WithDomain sets a JetStream domain, incompatible with WithApiPrefix()

func WithEventPrefix added in v0.0.21

func WithEventPrefix(s string) Option

WithEventPrefix replace event subjects like $JS.EVENT.ADVISORY.API with prefix.ADVISORY

func WithTimeout

func WithTimeout(t time.Duration) Option

WithTimeout sets a timeout for the requests

func WithTrace

func WithTrace() Option

WithTrace enables logging of JSON API requests and responses

type PagerOption added in v0.0.19

type PagerOption func(p *StreamPager)

PagerOption configures the stream pager

func PagerFilterSubject added in v0.0.23

func PagerFilterSubject(s string) PagerOption

PagerFilterSubject sets a filter subject for the pager

func PagerSize added in v0.0.19

func PagerSize(sz int) PagerOption

PagerSize is the size of pages to walk

func PagerStartDelta added in v0.0.19

func PagerStartDelta(d time.Duration) PagerOption

PagerStartDelta sets a starting time delta for the pager

func PagerStartId added in v0.0.19

func PagerStartId(id int) PagerOption

PagerStartId sets a starting stream sequence for the pager

func PagerTimeout added in v0.0.19

func PagerTimeout(d time.Duration) PagerOption

PagerTimeout is the time to wait for messages before it's assumed the end of the stream was reached

type RestoreProgress

type RestoreProgress interface {
	// StartTime is when the process started
	StartTime() time.Time
	// EndTime is when the process ended - zero when not completed
	EndTime() time.Time
	// ChunkSize is the size of the data packets sent over NATS
	ChunkSize() int
	// ChunksSent is the number of chunks of size ChunkSize that was sent
	ChunksSent() uint32
	// ChunksToSend number of chunks of ChunkSize expected to be sent
	ChunksToSend() int
	// BytesSent is the number of bytes sent so far
	BytesSent() uint64
	// BytesPerSecond is the number of bytes received in the last second, 0 during the first second
	BytesPerSecond() uint64
}

type SnapshotOption

type SnapshotOption func(o *snapshotOptions)

func RestoreConfiguration added in v0.0.22

func RestoreConfiguration(cfg api.StreamConfig) SnapshotOption

RestoreConfiguration overrides the configuration used to restore

func RestoreNotify

func RestoreNotify(cb func(RestoreProgress)) SnapshotOption

RestoreNotify notifies cb about progress of the restore operation

func SnapshotConsumers

func SnapshotConsumers() SnapshotOption

SnapshotConsumers includes consumer configuration and state in backups

func SnapshotDebug

func SnapshotDebug() SnapshotOption

SnapshotDebug enables logging using the standard go logging library

func SnapshotHealthCheck

func SnapshotHealthCheck() SnapshotOption

SnapshotHealthCheck performs a health check prior to starting the snapshot

func SnapshotNotify

func SnapshotNotify(cb func(SnapshotProgress)) SnapshotOption

SnapshotNotify notifies cb about progress of the snapshot operation

type SnapshotProgress

type SnapshotProgress interface {
	// StartTime is when the process started
	StartTime() time.Time
	// EndTime is when the process ended - zero when not completed
	EndTime() time.Time
	// ChunkSize is the size of the data packets sent over NATS
	ChunkSize() int
	// ChunksReceived is how many chunks of ChunkSize were received
	ChunksReceived() uint32
	// BytesExpected is how many Bytes we should be receiving
	BytesExpected() uint64
	// BytesReceived is how many Bytes have been received
	BytesReceived() uint64
	// UncompressedBytesReceived is the number of bytes received uncompressed
	UncompressedBytesReceived() uint64
	// BytesPerSecond is the number of bytes received in the last second, 0 during the first second
	BytesPerSecond() uint64
	// HealthCheck indicates if health checking was requested
	HealthCheck() bool
	// Finished will be true after all data have been written
	Finished() bool
}

type Stream

type Stream struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Stream represents a JetStream Stream

func (*Stream) AdvisorySubject

func (s *Stream) AdvisorySubject() string

AdvisorySubject is a wildcard subscription subject that subscribes to all advisories for this stream

func (*Stream) Configuration

func (s *Stream) Configuration() api.StreamConfig

func (*Stream) ConsumerNames

func (s *Stream) ConsumerNames() (names []string, err error)

ConsumerNames is a list of all known consumers for this Stream

func (*Stream) Delete

func (s *Stream) Delete() error

Delete deletes the Stream, after this the Stream object should be disposed

func (*Stream) DeleteAllow added in v0.0.27

func (s *Stream) DeleteAllow() bool

func (*Stream) DeleteMessage

func (s *Stream) DeleteMessage(seq uint64) (err error)

DeleteMessage deletes a specific message from the Stream by overwriting it with random data, see FastDeleteMessage() to remove the message without over writing data

func (*Stream) Description added in v0.0.26

func (s *Stream) Description() string

func (*Stream) DuplicateWindow added in v0.0.18

func (s *Stream) DuplicateWindow() time.Duration

func (*Stream) EachConsumer

func (s *Stream) EachConsumer(cb func(consumer *Consumer)) error

EachConsumer calls cb with each known consumer for this stream, error on any error to load consumers

func (*Stream) FastDeleteMessage added in v0.0.25

func (s *Stream) FastDeleteMessage(seq uint64) error

FastDeleteMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete

func (*Stream) Information

func (s *Stream) Information() (info *api.StreamInfo, err error)

Information loads the current stream information

func (*Stream) IsInternal added in v0.0.27

func (s *Stream) IsInternal() bool

IsInternal indicates if a stream is considered 'internal' by the NATS team, that is, it's a backing stream for KV, Object or MQTT state

func (*Stream) IsKVBucket added in v0.0.27

func (s *Stream) IsKVBucket() bool

IsKVBucket determines if a stream is a KV bucket

func (*Stream) IsMQTTState added in v0.0.27

func (s *Stream) IsMQTTState() bool

IsMQTTState determines if a stream holds internal MQTT state

func (*Stream) IsMirror added in v0.0.21

func (s *Stream) IsMirror() bool

IsMirror determines if this stream is a mirror of another

func (*Stream) IsObjectBucket added in v0.0.27

func (s *Stream) IsObjectBucket() bool

IsObjectBucket determines if a stream is a Object bucket

func (*Stream) IsSourced added in v0.0.21

func (s *Stream) IsSourced() bool

IsSourced determines if this stream is sourcing data from another stream. Other streams could be synced to this stream and it would not be reported by this property

func (*Stream) IsTemplateManaged

func (s *Stream) IsTemplateManaged() bool

IsTemplateManaged determines if this stream is managed by a template

func (*Stream) LatestInformation

func (s *Stream) LatestInformation() (info *api.StreamInfo, err error)

LatestInformation returns the most recently fetched stream information

func (*Stream) LatestState

func (s *Stream) LatestState() (state api.StreamState, err error)

LatestState returns the most recently fetched stream state

func (*Stream) LeaderStepDown added in v0.0.21

func (s *Stream) LeaderStepDown() error

LeaderStepDown requests the current RAFT group leader in a clustered JetStream to stand down forcing a new election

func (*Stream) LoadConsumer

func (s *Stream) LoadConsumer(name string) (*Consumer, error)

LoadConsumer loads a named consumer related to this Stream

func (*Stream) LoadOrNewConsumer

func (s *Stream) LoadOrNewConsumer(name string, opts ...ConsumerOption) (consumer *Consumer, err error)

LoadOrNewConsumer loads or creates a consumer based on these options

func (*Stream) LoadOrNewConsumerFromDefault

func (s *Stream) LoadOrNewConsumerFromDefault(name string, deflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)

LoadOrNewConsumerFromDefault loads or creates a consumer based on these options that adjust supplied template

func (*Stream) MaxAge

func (s *Stream) MaxAge() time.Duration

func (*Stream) MaxBytes

func (s *Stream) MaxBytes() int64

func (*Stream) MaxConsumers

func (s *Stream) MaxConsumers() int

func (*Stream) MaxMsgSize

func (s *Stream) MaxMsgSize() int32

func (*Stream) MaxMsgs

func (s *Stream) MaxMsgs() int64

func (*Stream) MaxMsgsPerSubject added in v0.0.24

func (s *Stream) MaxMsgsPerSubject() int64

func (*Stream) MetricSubject

func (s *Stream) MetricSubject() string

MetricSubject is a wildcard subscription subject that subscribes to all advisories for this stream

func (*Stream) Mirror added in v0.0.21

func (s *Stream) Mirror() *api.StreamSource

func (*Stream) Name

func (s *Stream) Name() string

func (*Stream) NewConsumer

func (s *Stream) NewConsumer(opts ...ConsumerOption) (consumer *Consumer, err error)

NewConsumer creates a new consumer in this Stream based on DefaultConsumer

func (*Stream) NewConsumerFromDefault

func (s *Stream) NewConsumerFromDefault(dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)

NewConsumerFromDefault creates a new consumer in this Stream based on a supplied template config

func (*Stream) NoAck

func (s *Stream) NoAck() bool

func (*Stream) PageContents added in v0.0.19

func (s *Stream) PageContents(opts ...PagerOption) (*StreamPager, error)

PageContents creates a StreamPager used to traverse the contents of the stream, Close() should be called to dispose of the background consumer and resources

func (*Stream) Purge

func (s *Stream) Purge(opts ...*api.JSApiStreamPurgeRequest) error

Purge deletes messages from the Stream, an optional JSApiStreamPurgeRequest can be supplied to limit the purge to a subset of messages

func (*Stream) PurgeAllowed added in v0.0.27

func (s *Stream) PurgeAllowed() bool

func (*Stream) ReadLastMessageForSubject added in v0.0.25

func (s *Stream) ReadLastMessageForSubject(subj string) (*api.StoredMsg, error)

ReadLastMessageForSubject reads the last message stored in the stream for a specific subject

func (*Stream) ReadMessage

func (s *Stream) ReadMessage(seq uint64) (msg *api.StoredMsg, err error)

ReadMessage loads a message from the stream by its sequence number

func (*Stream) RemoveRAFTPeer added in v0.0.21

func (s *Stream) RemoveRAFTPeer(peer string) error

RemoveRAFTPeer removes a peer from the group indicating it will not return

func (*Stream) Replicas

func (s *Stream) Replicas() int

func (*Stream) Reset

func (s *Stream) Reset() error

Reset reloads the Stream configuration from the JetStream server

func (*Stream) Retention

func (s *Stream) Retention() api.RetentionPolicy

func (*Stream) RollupAllowed added in v0.0.27

func (s *Stream) RollupAllowed() bool

func (*Stream) Seal added in v0.0.27

func (s *Stream) Seal() error

Seal updates a stream so that messages can not be added or removed using the API and limits will not be processed - messages will never age out. A sealed stream can not be unsealed.

func (*Stream) Sealed added in v0.0.27

func (s *Stream) Sealed() bool

func (*Stream) SnapshotToDirectory added in v0.0.21

func (s *Stream) SnapshotToDirectory(ctx context.Context, dir string, opts ...SnapshotOption) (SnapshotProgress, error)

SnapshotToDirectory creates a backup into s2 compressed tar file

func (*Stream) Sources added in v0.0.21

func (s *Stream) Sources() []*api.StreamSource

func (*Stream) State

func (s *Stream) State() (stats api.StreamState, err error)

State retrieves the Stream State

func (*Stream) Storage

func (s *Stream) Storage() api.StorageType

func (*Stream) Subjects

func (s *Stream) Subjects() []string

func (*Stream) Template

func (s *Stream) Template() string

func (*Stream) UpdateConfiguration

func (s *Stream) UpdateConfiguration(cfg api.StreamConfig, opts ...StreamOption) error

UpdateConfiguration updates the stream using cfg modified by opts, reloads configuration from the server post update

type StreamNamesFilter added in v0.0.20

type StreamNamesFilter struct {
	// Subject filter the names to those consuming messages matching this subject or wildcard
	Subject string `json:"subject,omitempty"`
}

StreamNamesFilter limits the names being returned by the names API

type StreamOption

type StreamOption func(o *api.StreamConfig) error

StreamOption configures a stream

func AllowRollup added in v0.0.27

func AllowRollup() StreamOption

func AppendSource added in v0.0.21

func AppendSource(source *api.StreamSource) StreamOption

func DenyDelete added in v0.0.27

func DenyDelete() StreamOption

func DenyPurge added in v0.0.27

func DenyPurge() StreamOption

func DiscardNew

func DiscardNew() StreamOption

func DiscardOld

func DiscardOld() StreamOption

func DuplicateWindow added in v0.0.18

func DuplicateWindow(d time.Duration) StreamOption

func FileStorage

func FileStorage() StreamOption

func InterestRetention

func InterestRetention() StreamOption

func LimitsRetention

func LimitsRetention() StreamOption

func MaxAge

func MaxAge(m time.Duration) StreamOption

func MaxBytes

func MaxBytes(m int64) StreamOption

func MaxConsumers

func MaxConsumers(m int) StreamOption

func MaxMessageSize

func MaxMessageSize(m int32) StreamOption

func MaxMessages

func MaxMessages(m int64) StreamOption

func MaxMessagesPerSubject added in v0.0.24

func MaxMessagesPerSubject(m int64) StreamOption

func MemoryStorage

func MemoryStorage() StreamOption

func Mirror added in v0.0.21

func Mirror(stream *api.StreamSource) StreamOption

func NoAck

func NoAck() StreamOption

func PlacementCluster added in v0.0.21

func PlacementCluster(cluster string) StreamOption

func PlacementTags added in v0.0.21

func PlacementTags(tags ...string) StreamOption

func Replicas

func Replicas(r int) StreamOption

func Sources added in v0.0.21

func Sources(streams ...*api.StreamSource) StreamOption

func StreamDescription added in v0.0.26

func StreamDescription(d string) StreamOption

StreamDescription is a textual description of this stream to provide additional context

func Subjects

func Subjects(s ...string) StreamOption

func WorkQueueRetention

func WorkQueueRetention() StreamOption

type StreamPager added in v0.0.19

type StreamPager struct {
	// contains filtered or unexported fields
}

func (*StreamPager) Close added in v0.0.19

func (p *StreamPager) Close() error

Close dispose of the resources used by the pager and should be called when done

func (*StreamPager) NextMsg added in v0.0.19

func (p *StreamPager) NextMsg(ctx context.Context) (msg *nats.Msg, last bool, err error)

NextMsg retrieves the next message from the pager interrupted by ctx.

last indicates if the message is the last in the current page, the next call to NextMsg will first request the next page, if the client is prompting users to continue to the next page it should be done when last is true

When the end of the stream is reached err will be non nil and last will be true otherwise err being non nil while last is false indicate a failed state. End is indicated by no new messages arriving after ctx timeout or the time set using PagerTimes() is reached

type StreamTemplate

type StreamTemplate struct {
	// contains filtered or unexported fields
}

func (*StreamTemplate) Configuration

func (t *StreamTemplate) Configuration() api.StreamTemplateConfig

func (*StreamTemplate) Delete

func (t *StreamTemplate) Delete() error

Delete deletes the StreamTemplate, after this the StreamTemplate object should be disposed

func (*StreamTemplate) MaxStreams

func (t *StreamTemplate) MaxStreams() uint32

func (*StreamTemplate) Name

func (t *StreamTemplate) Name() string

func (*StreamTemplate) Reset

func (t *StreamTemplate) Reset() error

Reset reloads the Stream Template configuration and state from the JetStream server

func (*StreamTemplate) StreamConfiguration

func (t *StreamTemplate) StreamConfiguration() api.StreamConfig

func (*StreamTemplate) Streams

func (t *StreamTemplate) Streams() []string

Directories

Path Synopsis
api
Package election is a JetStream backed leader election system.
Package election is a JetStream backed leader election system.
Package governor controls the concurrency of a network wide process Using this one can, for example, create CRON jobs that can trigger 100s or 1000s concurrently but where most will wait for a set limit to complete.
Package governor controls the concurrency of a network wide process Using this one can, for example, create CRON jobs that can trigger 100s or 1000s concurrently but where most will wait for a set limit to complete.
Package natscontext provides a way for sets of configuration options to be stored in named files and later retrieved either by name or if no name is supplied by access a chosen default context.
Package natscontext provides a way for sets of configuration options to be stored in named files and later retrieved either by name or if no name is supplied by access a chosen default context.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL