jetstream

package
v1.37.0
Published: Aug 13, 2024 License: Apache-2.0 Imports: 26 Imported by: 319

README

JetStream Simplified Client

This doc covers the basic usage of the jetstream package in nats.go client.

Overview

The jetstream package is a new client API for interacting with NATS JetStream, intended to replace the JetStream client implementation in the nats package. Its main goal is to provide a simple and clear way to interact with the JetStream API. Key differences between the jetstream and nats packages include:

  • Using smaller, simpler interfaces to manage streams and consumers
  • Using a more granular and predictable approach to consuming messages from a stream, instead of relying on the often complicated and unpredictable Subscribe() method (and all of its flavors)
  • Allowing the usage of pull consumers to continuously receive incoming messages (including ordered consumer functionality)
  • Separating JetStream context from core NATS

jetstream package provides several ways of interacting with the API:

  • JetStream - top-level interface, used to create and manage streams, consumers and publishing messages
  • Stream - used to manage consumers for a specific stream, as well as performing stream-specific operations (purging, fetching and deleting messages by sequence number, fetching stream info)
  • Consumer - used to get information about a consumer as well as consuming messages
  • Msg - used for message-specific operations - reading data, headers and metadata, as well as performing various types of acknowledgements

Additionally, jetstream exposes KeyValue Store and ObjectStore capabilities. KV and Object stores are abstraction layers on top of JetStream Streams, simplifying key value and large data storage on Streams.

NOTE: jetstream requires nats-server >= 2.9.0 to work correctly.

Basic usage

package main

import (
    "context"
    "fmt"
    "strconv"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

func main() {
    // In the `jetstream` package, almost all API calls rely on `context.Context` for timeout/cancellation handling
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    nc, _ := nats.Connect(nats.DefaultURL)

    // Create a JetStream management interface
    js, _ := jetstream.New(nc)

    // Create a stream
    s, _ := js.CreateStream(ctx, jetstream.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"ORDERS.*"},
    })

    // Publish some messages
    for i := 0; i < 100; i++ {
        js.Publish(ctx, "ORDERS.new", []byte("hello message "+strconv.Itoa(i)))
        fmt.Printf("Published hello message %d\n", i)
    }

    // Create durable consumer
    c, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
        Durable:   "CONS",
        AckPolicy: jetstream.AckExplicitPolicy,
    })

    // Get 10 messages from the consumer
    messageCounter := 0
    msgs, err := c.Fetch(10)
    if err != nil {
        // handle error
    }
	
    for msg := range msgs.Messages() {
        msg.Ack()
        fmt.Printf("Received a JetStream message via fetch: %s\n", string(msg.Data()))
        messageCounter++
    }
	
    fmt.Printf("received %d messages\n", messageCounter)
	
    if msgs.Error() != nil {
        fmt.Println("Error during Fetch(): ", msgs.Error())
    }

    // Receive messages continuously in a callback
    cons, _ := c.Consume(func(msg jetstream.Msg) {
        msg.Ack()
        fmt.Printf("Received a JetStream message via callback: %s\n", string(msg.Data()))
        messageCounter++
    })
    defer cons.Stop()

    // Iterate over messages continuously
    it, _ := c.Messages()
    for i := 0; i < 10; i++ {
        msg, _ := it.Next()
        msg.Ack()
        fmt.Printf("Received a JetStream message via iterator: %s\n", string(msg.Data()))
        messageCounter++
    }
    it.Stop()

    // block until all 100 published messages have been processed
    for messageCounter < 100 {
        time.Sleep(10 * time.Millisecond)
    }
}

Streams

jetstream provides methods to manage and list streams, as well as to perform stream-specific operations (purging, fetching/deleting messages by sequence number).

Stream management (CRUD)
js, _ := jetstream.New(nc)

// create a stream (this is an idempotent operation)
s, _ := js.CreateStream(ctx, jetstream.StreamConfig{
    Name:     "ORDERS",
    Subjects: []string{"ORDERS.*"},
})

// update a stream
s, _ = js.UpdateStream(ctx, jetstream.StreamConfig{
    Name:        "ORDERS",
    Subjects:    []string{"ORDERS.*"},
    Description: "updated stream",
})

// get stream handle
s, _ = js.Stream(ctx, "ORDERS")

// delete a stream
js.DeleteStream(ctx, "ORDERS")
Listing streams and stream names
// list streams
streams := js.ListStreams(ctx)
for s := range streams.Info() {
    fmt.Println(s.Config.Name)
}
if streams.Err() != nil {
    fmt.Println("Unexpected error occurred")
}

// list stream names
names := js.StreamNames(ctx)
for name := range names.Name() {
    fmt.Println(name)
}
if names.Err() != nil {
    fmt.Println("Unexpected error occurred")
}
Stream-specific operations

Using Stream interface, it is also possible to:

  • Purge a stream
// remove all messages from a stream
_ = s.Purge(ctx)

// remove all messages from a stream that are stored on a specific subject
_ = s.Purge(ctx, jetstream.WithPurgeSubject("ORDERS.new"))

// remove all messages up to specified sequence number
_ = s.Purge(ctx, jetstream.WithPurgeSequence(100))

// remove messages, but keep 10 newest
_ = s.Purge(ctx, jetstream.WithPurgeKeep(10))
  • Get and delete messages from stream
// get message from stream with sequence number == 100
msg, _ := s.GetMsg(ctx, 100)

// get last message from "ORDERS.new" subject
msg, _ = s.GetLastMsgForSubject(ctx, "ORDERS.new")

// delete a message with sequence number == 100
_ = s.DeleteMsg(ctx, 100)
  • Get information about a stream
// Fetches latest stream info from server
info, _ := s.Info(ctx)
fmt.Println(info.Config.Name)

// Returns the most recently fetched StreamInfo, without making an API call to the server
cachedInfo := s.CachedInfo()
fmt.Println(cachedInfo.Config.Name)

Consumers

Only pull consumers are supported in jetstream package. However, unlike the JetStream API in nats package, pull consumers allow for continuous message retrieval (similarly to how nats.Subscribe() works). Because of that, push consumers can be easily replaced by pull consumers for most of the use cases.

Consumers management

CRUD operations on consumers can be achieved on 2 levels:

  • on JetStream interface
js, _ := jetstream.New(nc)

// create a consumer (this is an idempotent operation)
// an error will be returned if consumer already exists and has different configuration.
cons, _ := js.CreateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
    Durable: "foo",
    AckPolicy: jetstream.AckExplicitPolicy,
})

// create an ephemeral pull consumer by not providing `Durable`
ephemeral, _ := js.CreateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
    AckPolicy: jetstream.AckExplicitPolicy,
})


// consumer can also be created using CreateOrUpdateConsumer
// this method will either create a consumer if it does not exist
// or update existing consumer (if possible)
cons2, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
    Name: "bar",
})

// consumers can be updated
// an error will be returned if consumer with given name does not exist
// or an illegal property is to be updated (e.g. AckPolicy)
updated, _ := js.UpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
    // name of the existing consumer to update
    Durable:     "foo",
    AckPolicy:   jetstream.AckExplicitPolicy,
    Description: "updated consumer",
})

// get consumer handle
cons, _ = js.Consumer(ctx, "ORDERS", "foo")

// delete a consumer
js.DeleteConsumer(ctx, "ORDERS", "foo")
  • on Stream interface
// Create a JetStream management interface
js, _ := jetstream.New(nc)

// get stream handle
stream, _ := js.Stream(ctx, "ORDERS")

// create consumer
cons, _ := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{
    Durable:   "foo",
    AckPolicy: jetstream.AckExplicitPolicy,
})

// get consumer handle
cons, _ = stream.Consumer(ctx, "foo")

// delete a consumer
stream.DeleteConsumer(ctx, "foo")

Consumer interface, returned when creating/fetching consumers, allows fetching ConsumerInfo:

// Fetches latest consumer info from server
info, _ := cons.Info(ctx)
fmt.Println(info.Config.Durable)

// Returns the most recently fetched ConsumerInfo, without making an API call to the server
cachedInfo := cons.CachedInfo()
fmt.Println(cachedInfo.Config.Durable)
Listing consumers and consumer names
// list consumers
consumers := s.ListConsumers(ctx)
for cons := range consumers.Info() {
    fmt.Println(cons.Name)
}
if consumers.Err() != nil {
    fmt.Println("Unexpected error occurred")
}

// list consumer names
names := s.ConsumerNames(ctx)
for name := range names.Name() {
    fmt.Println(name)
}
if names.Err() != nil {
    fmt.Println("Unexpected error occurred")
}
Ordered consumers

In addition to basic named/ephemeral consumers, jetstream supports ordered consumer functionality. An ordered consumer strictly processes messages in the order in which they were stored on the stream, providing consistent and deterministic message ordering. It is also resilient to consumer deletion.

Ordered consumers present the same set of message consumption methods as standard pull consumers.

js, _ := jetstream.New(nc)

// create a consumer (this is an idempotent operation)
cons, _ := js.OrderedConsumer(ctx, "ORDERS", jetstream.OrderedConsumerConfig{
    // Filter results from "ORDERS" stream by specific subject
    FilterSubjects: []string{"ORDERS.A"},
})
Receiving messages from the consumer

The Consumer interface allows fetching messages on demand, with a pre-defined batch size or byte limit, or receiving messages continuously in a push-like manner.

Single fetch

This pattern allows fetching a defined number of messages in a single RPC.

  • Using Fetch or FetchBytes, consumer will return up to the provided number of messages/bytes. By default, Fetch() will wait 30 seconds before timing out (this behavior can be configured using FetchMaxWait() option):
// receive up to 10 messages from the stream
msgs, err := c.Fetch(10)
if err != nil {
    // handle error
}

for msg := range msgs.Messages() {
    fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
}

if msgs.Error() != nil {
    // handle error
}

// receive up to 1024 B of data
msgs, err = c.FetchBytes(1024)
if err != nil {
    // handle error
}

for msg := range msgs.Messages() {
    fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
}

if msgs.Error() != nil {
    // handle error
}

Similarly, FetchNoWait() can be used in order to only return messages from the stream available at the time of sending request:

// FetchNoWait will not wait for new messages if the whole batch is not available at the time of sending request.
msgs, err := c.FetchNoWait(10)
if err != nil {
    // handle error
}

for msg := range msgs.Messages() {
    fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
}

if msgs.Error() != nil {
    // handle error
}

Warning: Both Fetch() and FetchNoWait() have worse performance when used to continuously retrieve messages in comparison to Messages() or Consume() methods, as they do not perform any optimizations (pre-buffering) and new subscription is created for each execution.
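
The difference between waiting for a batch and returning only what is already available can be pictured with a small standard-library model. This is a sketch only: the channel stands in for the stream, and fetch, fetchNoWait and demoWait are hypothetical names for illustration, not the nats.go implementation.

```go
package main

import (
	"fmt"
	"time"
)

// demoWait is an arbitrary, short wait used only for this illustration.
const demoWait = 50 * time.Millisecond

// fetch returns up to batch messages, waiting at most maxWait for the
// batch to fill (loosely modelling Fetch with a max wait).
func fetch(src <-chan string, batch int, maxWait time.Duration) []string {
	timer := time.NewTimer(maxWait)
	defer timer.Stop()
	var out []string
	for len(out) < batch {
		select {
		case m := <-src:
			out = append(out, m)
		case <-timer.C:
			return out // timed out with a partial batch
		}
	}
	return out
}

// fetchNoWait returns only the messages that are available right now
// (loosely modelling FetchNoWait).
func fetchNoWait(src <-chan string, batch int) []string {
	var out []string
	for len(out) < batch {
		select {
		case m := <-src:
			out = append(out, m)
		default:
			return out // nothing more available, do not wait
		}
	}
	return out
}

func main() {
	src := make(chan string, 10)
	for _, m := range []string{"a", "b", "c"} {
		src <- m
	}
	fmt.Println(len(fetchNoWait(src, 10))) // 3

	src <- "d"
	fmt.Println(len(fetch(src, 10, demoWait))) // 1, after waiting demoWait
}
```

In the model, as in the warning above, every call starts from scratch, which is why repeated fetching is a poor fit for continuous consumption.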

Continuous polling

There are 2 ways to achieve push-like behavior using pull consumers in jetstream package. Both Messages() and Consume() methods perform similar optimizations and for most cases can be used interchangeably.

There is an advantage of using Messages() instead of Consume() for work-queue scenarios, where messages should be fetched one by one, as it allows for finer control over fetching single messages on demand.

Subject filtering is achieved by configuring a consumer with a FilterSubject value.
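
As a rough illustration of how a subject filter selects messages, the sketch below implements NATS-style wildcard matching, where "*" matches exactly one token and ">" matches one or more trailing tokens. subjectMatches is a hypothetical helper written for this example; the actual matching is performed by the server.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches pattern.
// "*" matches exactly one token; ">" matches one or more trailing tokens.
// Hypothetical helper for illustration only, not part of nats.go.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last token and needs at least one token to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(subjectMatches("ORDERS.*", "ORDERS.new"))    // true
	fmt.Println(subjectMatches("ORDERS.*", "ORDERS.new.eu")) // false
	fmt.Println(subjectMatches("ORDERS.>", "ORDERS.new.eu")) // true
	fmt.Println(subjectMatches("ORDERS.A", "ORDERS.B"))      // false
}
```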

Using Consume() to receive messages in a callback
cons, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
    AckPolicy: jetstream.AckExplicitPolicy,
    // receive messages from ORDERS.A subject only
    FilterSubject: "ORDERS.A",
})

consContext, _ := cons.Consume(func(msg jetstream.Msg) {
    fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
})
defer consContext.Stop()

Similarly to Messages(), Consume() can be supplied with options to modify the behavior of a single pull request:

  • PullMaxMessages(int) - up to provided number of messages will be buffered
  • PullMaxBytes(int) - up to provided number of bytes will be buffered. This setting and PullMaxMessages are mutually exclusive
  • PullExpiry(time.Duration) - timeout on a single pull request to the server
  • PullThresholdMessages(int) - amount of messages which triggers refilling the buffer
  • PullThresholdBytes(int) - amount of bytes which triggers refilling the buffer
  • PullHeartbeat(time.Duration) - idle heartbeat duration for a single pull request. An error will be triggered if at least 2 heartbeats are missed
  • ConsumeErrHandler(func (ConsumeContext, error)) - when used, sets a custom error handler on Consume(), allowing e.g. tracking missing heartbeats.

NOTE: Stop() should always be called on ConsumeContext to avoid leaking goroutines.
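
The interplay between the buffer size and the refill threshold can be pictured with a toy model. This is only a sketch under simplifying assumptions (a refill tops the buffer up instantly, and byte limits are ignored); pullBuffer and simulate are hypothetical names, and the real client logic is more involved.

```go
package main

import "fmt"

// pullBuffer is a toy model of a client-side prefetch buffer.
type pullBuffer struct {
	maxMessages int // plays the role of PullMaxMessages
	threshold   int // plays the role of PullThresholdMessages
	pending     int // messages requested but not yet consumed
	pulls       int // number of pull requests issued so far
}

// refill issues a pull request that tops pending back up to maxMessages.
func (b *pullBuffer) refill() {
	if b.pending < b.maxMessages {
		b.pending = b.maxMessages
		b.pulls++
	}
}

// next consumes one message and refills once pending drops to the threshold.
func (b *pullBuffer) next() {
	if b.pending == 0 {
		b.refill()
	}
	b.pending--
	if b.pending <= b.threshold {
		b.refill()
	}
}

// simulate consumes n messages and returns how many pulls were issued.
func simulate(maxMessages, threshold, n int) int {
	b := &pullBuffer{maxMessages: maxMessages, threshold: threshold}
	for i := 0; i < n; i++ {
		b.next()
	}
	return b.pulls
}

func main() {
	fmt.Println(simulate(10, 5, 20)) // 5
	fmt.Println(simulate(10, 0, 20)) // 3
}
```

In this model a higher threshold means more frequent, smaller refills, so the buffer is less likely to run dry while a pull request is in flight.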

Using Messages() to iterate over incoming messages
iter, _ := cons.Messages()
for {
    msg, err := iter.Next()
    // Next can return error, e.g. when iterator is closed or no heartbeats were received
    if err != nil {
        // handle error; msg is not valid here, so stop iterating
        break
    }
    fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
    msg.Ack()
}
iter.Stop()

It can also be configured to only store up to a defined number of messages or bytes in the buffer.

// a maximum of 10 messages will be stored in memory
// (use jetstream.PullMaxBytes(1024) instead to limit the buffer by size;
// the two options are mutually exclusive)
iter, _ := cons.Messages(jetstream.PullMaxMessages(10))

Messages() exposes the following options:

  • PullMaxMessages(int) - up to provided number of messages will be buffered
  • PullMaxBytes(int) - up to provided number of bytes will be buffered. This setting and PullMaxMessages are mutually exclusive
  • PullExpiry(time.Duration) - timeout on a single pull request to the server
  • PullThresholdMessages(int) - amount of messages which triggers refilling the buffer
  • PullThresholdBytes(int) - amount of bytes which triggers refilling the buffer
  • PullHeartbeat(time.Duration) - idle heartbeat duration for a single pull request. An error will be triggered if at least 2 heartbeats are missed (unless WithMessagesErrOnMissingHeartbeat(false) is used)
Using Messages() to fetch single messages one by one

When implementing a work queue, it is possible to use Messages() to fetch messages from the server one by one, without optimizations and pre-buffering (to avoid redeliveries when processing messages at a slow rate).

// PullMaxMessages determines how many messages will be sent to the client in a single pull request
iter, _ := cons.Messages(jetstream.PullMaxMessages(1))
numWorkers := 5
sem := make(chan struct{}, numWorkers)
for {
    sem <- struct{}{}
    go func() {
        defer func() {
            <-sem
        }()
        msg, err := iter.Next()
        if err != nil {
            // handle err; msg is not valid here
            return
        }
        fmt.Printf("Processing msg: %s\n", string(msg.Data()))
        doWork()
        msg.Ack()
    }()
}
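
The semaphore pattern used above can be tried out without a server. The sketch below is a minimal standard-library version, with a slice of strings standing in for the message iterator; runBounded is a hypothetical helper, and it also records the peak number of handlers running at once to show that the limit holds.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runBounded calls handle once per job with at most numWorkers calls in
// flight at any time. It returns the number of handled jobs and the peak
// concurrency that was observed.
func runBounded(jobs []string, numWorkers int, handle func(string)) (int, int) {
	sem := make(chan struct{}, numWorkers)
	var wg sync.WaitGroup
	var handled, running, peak int64

	for _, job := range jobs {
		sem <- struct{}{} // blocks while numWorkers handlers are busy
		wg.Add(1)
		go func(j string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			n := atomic.AddInt64(&running, 1)
			for {
				// record the highest concurrency seen so far
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			handle(j)
			atomic.AddInt64(&handled, 1)
			atomic.AddInt64(&running, -1)
		}(job)
	}
	wg.Wait()
	return int(handled), int(peak)
}

func main() {
	jobs := make([]string, 20)
	for i := range jobs {
		jobs[i] = fmt.Sprintf("msg-%d", i)
	}
	handled, peak := runBounded(jobs, 5, func(string) {})
	fmt.Println(handled, peak <= 5) // 20 true
}
```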

Publishing on stream

JetStream interface allows publishing messages on stream in 2 ways:

Synchronous publish
js, _ := jetstream.New(nc)

// Publish message on subject ORDERS.new
// Given subject has to belong to a stream
ack, err := js.PublishMsg(ctx, &nats.Msg{
    Data:    []byte("hello"),
    Subject: "ORDERS.new",
})
fmt.Printf("Published msg with sequence number %d on stream %q", ack.Sequence, ack.Stream)

// A helper method accepting subject and data as parameters
ack, err = js.Publish(ctx, "ORDERS.new", []byte("hello"))

Both Publish() and PublishMsg() can be supplied with options allowing setting various headers. Additionally, for PublishMsg() headers can be set directly on nats.Msg.

// All 3 implementations work identically
ack, err := js.PublishMsg(ctx, &nats.Msg{
    Data:    []byte("hello"),
    Subject: "ORDERS.new",
    Header: nats.Header{
        "Nats-Msg-Id": []string{"id"},
    },
})

ack, err = js.PublishMsg(ctx, &nats.Msg{
    Data:    []byte("hello"),
    Subject: "ORDERS.new",
}, jetstream.WithMsgID("id"))

ack, err = js.Publish(ctx, "ORDERS.new", []byte("hello"), jetstream.WithMsgID("id"))
Async publish
js, _ := jetstream.New(nc)

// publish message and do not wait for ack
ackF, err := js.PublishMsgAsync(&nats.Msg{
    Data:    []byte("hello"),
    Subject: "ORDERS.new",
})

// block and wait for ack
select {
case ack := <-ackF.Ok():
    fmt.Printf("Published msg with sequence number %d on stream %q", ack.Sequence, ack.Stream)
case err := <-ackF.Err():
    fmt.Println(err)
}

// similarly to synchronous publish, there is a helper method accepting subject and data
ackF, err = js.PublishAsync("ORDERS.new", []byte("hello"))

Just as for synchronous publish, PublishAsync() and PublishMsgAsync() accept options for setting headers.

KeyValue Store

JetStream KeyValue Stores offer a straightforward method for storing key-value pairs within JetStream. These stores are supported by a specially configured stream, designed to efficiently and compactly store these pairs. This structure ensures rapid and convenient access to the data.

The KV Store, also known as a bucket, enables the execution of various operations:

  • create/update a value for a given key
  • get a value for a given key
  • delete a value for a given key
  • purge all values from a bucket
  • list all keys in a bucket
  • watch for changes on given key set or the whole bucket
  • retrieve history of changes for a given key
Basic usage of KV bucket

The most basic usage of KV bucket is to create or retrieve a bucket and perform basic CRUD operations on keys.

js, _ := jetstream.New(nc)
ctx := context.Background()

// Create a new bucket. Bucket name is required and has to be unique within a JetStream account.
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

// Set a value for a given key
// Put will either create or update a value for a given key
kv.Put(ctx, "sue.color", []byte("blue"))

// Get an entry for a given key
// Entry contains key/value, but also metadata (revision, timestamp, etc.)
entry, _ := kv.Get(ctx, "sue.color")

// Prints `sue.color @ 1 -> "blue"`
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

// Update a value for a given key
// Update will fail if the key does not exist or the revision has changed
kv.Update(ctx, "sue.color", []byte("red"), 1)

// Create will fail if the key already exists
_, err := kv.Create(ctx, "sue.color", []byte("purple"))
fmt.Println(err) // prints `nats: key exists`

// Delete a value for a given key.
// Delete is not destructive, it will add a delete marker for a given key
// and all previous revisions will still be available
kv.Delete(ctx, "sue.color")

// getting a deleted key will return an error
_, err = kv.Get(ctx, "sue.color")
fmt.Println(err) // prints `nats: key not found`

// A bucket can be deleted once it is no longer needed
js.DeleteKeyValue(ctx, "profiles")
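
The revision checks performed by Create and Update can be modelled in memory. The following is a sketch, assuming revisions are a bucket-wide counter that grows with every write (consistent with the revision numbers shown in this document); bucket, put, create and update are hypothetical names and not the library's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errKeyExists     = errors.New("key exists")
	errWrongRevision = errors.New("wrong last revision")
)

// entry is a stored value together with the revision that wrote it.
type entry struct {
	value    string
	revision uint64
}

// bucket is a toy in-memory stand-in for a KV bucket.
type bucket struct {
	seq  uint64 // bucket-wide revision counter
	data map[string]entry
}

func newBucket() *bucket { return &bucket{data: make(map[string]entry)} }

// put writes unconditionally and returns the new revision.
func (b *bucket) put(key, value string) uint64 {
	b.seq++
	b.data[key] = entry{value: value, revision: b.seq}
	return b.seq
}

// create writes only if the key does not exist yet.
func (b *bucket) create(key, value string) (uint64, error) {
	if _, ok := b.data[key]; ok {
		return 0, errKeyExists
	}
	return b.put(key, value), nil
}

// update writes only if the key exists and its revision equals last.
func (b *bucket) update(key, value string, last uint64) (uint64, error) {
	e, ok := b.data[key]
	if !ok || e.revision != last {
		return 0, errWrongRevision
	}
	return b.put(key, value), nil
}

func main() {
	b := newBucket()
	fmt.Println(b.put("sue.color", "blue"))       // 1
	fmt.Println(b.update("sue.color", "red", 1))  // 2 <nil>
	fmt.Println(b.update("sue.color", "pink", 1)) // 0 wrong last revision
	fmt.Println(b.create("sue.color", "purple"))  // 0 key exists
}
```

The second update fails because another write has happened since revision 1 was read, which is the optimistic concurrency check that Update provides.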
Watching for changes on a bucket

KV buckets support Watchers, which can be used to watch for changes on a given key or the whole bucket. Watcher will receive a notification on a channel when a change occurs. By default, watcher will return initial values for all matching keys. After sending all initial values, watcher will send nil on the channel to signal that all initial values have been sent and it will start sending updates when changes occur.

Watcher supports several configuration options:

  • IncludeHistory will have the key watcher send all historical values for each key (up to KeyValueMaxHistory).
  • IgnoreDeletes will have the key watcher not pass any keys with delete markers.
  • UpdatesOnly will have the key watcher only pass updates on values (without values already present when starting).
  • MetaOnly will have the key watcher retrieve only the entry metadata, not the entry value.
  • ResumeFromRevision instructs the key watcher to resume from a specific revision number.
js, _ := jetstream.New(nc)
ctx := context.Background()
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

kv.Put(ctx, "sue.color", []byte("blue"))

// A watcher can be created to watch for changes on a given key or the whole bucket
// By default, watcher will return most recent values for all matching keys.
// Watcher can be configured to only return updates by using jetstream.UpdatesOnly() option.
watcher, _ := kv.Watch(ctx, "sue.*")
defer watcher.Stop()

kv.Put(ctx, "sue.age", []byte("43"))
kv.Put(ctx, "sue.color", []byte("red"))

// First, the watcher sends most recent values for all matching keys.
// In this case, it will send a single entry for `sue.color`.
entry := <-watcher.Updates()
// Prints `sue.color @ 1 -> "blue"`
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

// After all current values have been sent, watcher will send nil on the channel.
entry = <-watcher.Updates()
if entry != nil {
    fmt.Println("Unexpected entry received")
}

// After that, watcher will send updates when changes occur
// In this case, it will send an entry for `sue.color` and `sue.age`.

entry = <-watcher.Updates()
// Prints `sue.age @ 2 -> "43"`
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

entry = <-watcher.Updates()
// Prints `sue.color @ 3 -> "red"`
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
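
A common way to consume a watcher is to treat everything before the nil marker as the initial snapshot and everything after it as live updates. The sketch below shows that split using a plain channel; watchEntry, split and replay are hypothetical names standing in for the watcher's update channel and entries.

```go
package main

import "fmt"

// watchEntry is a simplified stand-in for a KV entry delivered by a watcher.
type watchEntry struct {
	Key      string
	Value    string
	Revision uint64
}

// split reads a watcher-style channel until it is closed. Entries before the
// first nil form the initial snapshot; entries after it are live updates.
func split(updates <-chan *watchEntry) (initial, live []watchEntry) {
	seenMarker := false
	for e := range updates {
		switch {
		case e == nil:
			seenMarker = true // all initial values have been delivered
		case seenMarker:
			live = append(live, *e)
		default:
			initial = append(initial, *e)
		}
	}
	return initial, live
}

// replay feeds events into a channel, closes it and splits the result.
func replay(events []*watchEntry) (initial, live []watchEntry) {
	ch := make(chan *watchEntry, len(events))
	for _, e := range events {
		ch <- e
	}
	close(ch)
	return split(ch)
}

func main() {
	initial, live := replay([]*watchEntry{
		{Key: "sue.color", Value: "blue", Revision: 1},
		nil, // marker: initial values are complete
		{Key: "sue.age", Value: "43", Revision: 2},
		{Key: "sue.color", Value: "red", Revision: 3},
	})
	fmt.Println(len(initial), len(live)) // 1 2
}
```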
Additional operations on a bucket

In addition to basic CRUD operations and watching for changes, KV buckets support several additional operations:

  • ListKeys will return all keys in a bucket
js, _ := jetstream.New(nc)
ctx := context.Background()
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

kv.Put(ctx, "sue.color", []byte("blue"))
kv.Put(ctx, "sue.age", []byte("43"))
kv.Put(ctx, "bucket", []byte("profiles"))

keys, _ := kv.ListKeys(ctx)

// Prints all 3 keys
for key := range keys.Keys() {
    fmt.Println(key)
}
  • Purge for removing a key together with its history, and PurgeDeletes for cleaning up keys with delete markers
js, _ := jetstream.New(nc)
ctx := context.Background()
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

kv.Put(ctx, "sue.color", []byte("blue"))
kv.Put(ctx, "sue.age", []byte("43"))
kv.Put(ctx, "bucket", []byte("profiles"))

// Purge operates on a single key.
// The key will be kept with a delete marker,
// all previous revisions of the key will be removed
// permanently.
kv.Purge(ctx, "sue.color")

// PurgeDeletes will clean up keys in a bucket
// that carry a delete marker.
kv.PurgeDeletes(ctx)
  • Status will return the current status of a bucket
js, _ := jetstream.New(nc)
ctx := context.Background()
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

kv.Put(ctx, "sue.color", []byte("blue"))
kv.Put(ctx, "sue.age", []byte("43"))
kv.Put(ctx, "bucket", []byte("profiles"))

status, _ := kv.Status(ctx)

fmt.Println(status.Bucket()) // prints `profiles`
fmt.Println(status.Values()) // prints `3`
fmt.Println(status.Bytes()) // prints the size of all values in bytes

Object Store

JetStream Object Stores offer a straightforward method for storing large objects within JetStream. These stores are backed by specially configured streams, designed to efficiently and compactly store these objects.

The Object Store, also known as a bucket, enables the execution of various operations:

  • create/update an object
  • get an object
  • delete an object
  • list all objects in a bucket
  • watch for changes on objects in a bucket
  • create links to other objects or other buckets
Basic usage of Object Store

The most basic usage of Object bucket is to create or retrieve a bucket and perform basic CRUD operations on objects.

js, _ := jetstream.New(nc)
ctx := context.Background()

// Create a new bucket. Bucket name is required and has to be unique within a JetStream account.
os, _ := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "configs"})

config1 := bytes.NewBufferString("first config")
// Put an object in a bucket. Put expects an object metadata and a reader
// to read the object data from.
os.Put(ctx, jetstream.ObjectMeta{Name: "config-1"}, config1)

// Objects can also be created using various helper methods

// 1. As raw strings
os.PutString(ctx, "config-2", "second config")

// 2. As raw bytes
os.PutBytes(ctx, "config-3", []byte("third config"))

// 3. As a file
os.PutFile(ctx, "config-4.txt")

// Get an object
// Get returns a reader and object info
// Similar to Put, Get can also be used with helper methods
// to retrieve object data as a string, bytes or to save it to a file
object, _ := os.Get(ctx, "config-1")
data, _ := io.ReadAll(object)
info, _ := object.Info()

// Prints `configs.config-1 -> "first config"`
fmt.Printf("%s.%s -> %q\n", info.Bucket, info.Name, string(data))

// Delete an object.
// Delete will remove object data from stream, but object metadata will be kept
// with a delete marker.
os.Delete(ctx, "config-1")

// getting a deleted object will return an error
_, err := os.Get(ctx, "config-1")
fmt.Println(err) // prints `nats: object not found`

// A bucket can be deleted once it is no longer needed
js.DeleteObjectStore(ctx, "configs")
Watching for changes on a store

Object Stores support Watchers, which can be used to watch for changes on objects in a given bucket. Watcher will receive a notification on a channel when a change occurs. By default, watcher will return latest information for all objects in a bucket. After sending all initial values, watcher will send nil on the channel to signal that all initial values have been sent and it will start sending updates when changes occur.

NOTE: Watchers do not retrieve values for objects, only metadata (containing information such as object name, bucket name, object size etc.). If object data is required, Get method should be used.

Watcher supports several configuration options:

  • IncludeHistory will have the watcher send historical updates for each object.
  • IgnoreDeletes will have the watcher not pass any objects with delete markers.
  • UpdatesOnly will have the watcher only pass updates on objects (without objects already present when starting).
js, _ := jetstream.New(nc)
ctx := context.Background()
os, _ := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "configs"})

os.PutString(ctx, "config-1", "first config")

// By default, watcher will return most recent values for all objects in a bucket.
// Watcher can be configured to only return updates by using jetstream.UpdatesOnly() option.
watcher, _ := os.Watch(ctx)
defer watcher.Stop()

// create a second object
os.PutString(ctx, "config-2", "second config")

// update metadata of the first object
os.UpdateMeta(ctx, "config-1", jetstream.ObjectMeta{Name: "config-1", Description: "updated config"})

// First, the watcher sends most recent values for all matching objects.
// In this case, it will send a single entry for `config-1`.
object := <-watcher.Updates()
// Prints `configs.config-1 -> ""`
fmt.Printf("%s.%s -> %q\n", object.Bucket, object.Name, object.Description)

// After all current values have been sent, watcher will send nil on the channel.
object = <-watcher.Updates()
if object != nil {
    fmt.Println("Unexpected object received")
}

// After that, watcher will send updates when changes occur
// In this case, it will send an entry for `config-2` and `config-1`.
object = <-watcher.Updates()
// Prints `configs.config-2 -> ""`
fmt.Printf("%s.%s -> %q\n", object.Bucket, object.Name, object.Description)

object = <-watcher.Updates()
// Prints `configs.config-1 -> "updated config"`
fmt.Printf("%s.%s -> %q\n", object.Bucket, object.Name, object.Description)
Additional operations on a store

In addition to basic CRUD operations and watching for changes, Object Stores support several additional operations:

  • UpdateMeta for updating object metadata, such as name, description, etc.
js, _ := jetstream.New(nc)
ctx := context.Background()
os, _ := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "configs"})

os.PutString(ctx, "config", "data")

// update metadata of the object to e.g. add a description
os.UpdateMeta(ctx, "config", jetstream.ObjectMeta{Name: "config", Description: "this is a config"})

// object can be moved under a new name (unless it already exists)
os.UpdateMeta(ctx, "config", jetstream.ObjectMeta{Name: "config-1", Description: "updated config"})
  • List for listing information about all objects in a bucket:
js, _ := jetstream.New(nc)
ctx := context.Background()
os, _ := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "configs"})

os.PutString(ctx, "config-1", "cfg1")
os.PutString(ctx, "config-2", "cfg1")
os.PutString(ctx, "config-3", "cfg1")

// List will return information about all objects in a bucket
objects, _ := os.List(ctx)

// Prints all 3 objects
for _, object := range objects {
    fmt.Println(object.Name)
}
  • Status will return the current status of a bucket
js, _ := jetstream.New(nc)
ctx := context.Background()
os, _ := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "configs"})

os.PutString(ctx, "config-1", "cfg1")
os.PutString(ctx, "config-2", "cfg1")
os.PutString(ctx, "config-3", "cfg1")

status, _ := os.Status(ctx)

fmt.Println(status.Bucket()) // prints `configs`
fmt.Println(status.Size()) // prints the size of the bucket in bytes

Examples

You can find more examples of jetstream usage here.

Documentation

Index

Constants

const (
	KeyValueMaxHistory = 64
	AllKeys            = ">"
)
const (
	// MsgIDHeader is used to specify a user-defined message ID. It can be used
	// e.g. for deduplication in conjunction with the Duplicates duration on
	// StreamConfig or to provide optimistic concurrency safety together with
	// [ExpectedLastMsgIDHeader].
	//
	// This can be set when publishing messages using [WithMsgID] option.
	MsgIDHeader = "Nats-Msg-Id"

	// ExpectedStreamHeader contains stream name and is used to assure that the
	// published message is received by expected stream. Server will reject the
	// message if it is not the case.
	//
	// This can be set when publishing messages using [WithExpectStream] option.
	ExpectedStreamHeader = "Nats-Expected-Stream"

	// ExpectedLastSeqHeader contains the expected last sequence number of the
	// stream and can be used to apply optimistic concurrency control at stream
	// level. Server will reject the message if it is not the case.
	//
	// This can be set when publishing messages using [WithExpectLastSequence]
	// option.
	ExpectedLastSeqHeader = "Nats-Expected-Last-Sequence"

	// ExpectedLastSubjSeqHeader contains the expected last sequence number on
	// the subject and can be used to apply optimistic concurrency control at
	// subject level. Server will reject the message if it is not the case.
	//
	// This can be set when publishing messages using
	// [WithExpectLastSequencePerSubject] option.
	ExpectedLastSubjSeqHeader = "Nats-Expected-Last-Subject-Sequence"

	// ExpectedLastMsgIDHeader contains the expected last message ID on the
	// subject and can be used to apply optimistic concurrency control at
	// stream level. Server will reject the message if it is not the case.
	//
	// This can be set when publishing messages using [WithExpectLastMsgID]
	// option.
	ExpectedLastMsgIDHeader = "Nats-Expected-Last-Msg-Id"

	// MsgRollup is used to apply a purge of all prior messages in the stream
	// ("all") or at the subject ("sub") before this message.
	MsgRollup = "Nats-Rollup"
)

Headers used when publishing messages.
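As a rough illustration of what these headers look like on a message, the sketch below builds a plain header map by hand. The helper buildPublishHeaders and the lower-case constant names are hypothetical and exist only for this example; in the package itself the headers are normally set through the publish options named in the comments above (for example [WithMsgID] and [WithExpectLastSequencePerSubject]).

```go
package main

import (
	"fmt"
	"strconv"
)

// Header names copied from the constants listed above.
const (
	msgIDHeader               = "Nats-Msg-Id"
	expectedLastSubjSeqHeader = "Nats-Expected-Last-Subject-Sequence"
)

// buildPublishHeaders is a hypothetical helper (not part of the package).
// It returns the headers a publisher would attach to request deduplication
// by message ID plus an optimistic concurrency check at subject level.
func buildPublishHeaders(msgID string, expectedLastSubjSeq uint64) map[string][]string {
	return map[string][]string{
		msgIDHeader:               {msgID},
		expectedLastSubjSeqHeader: {strconv.FormatUint(expectedLastSubjSeq, 10)},
	}
}

func main() {
	h := buildPublishHeaders("order-42", 7)
	fmt.Println(h[msgIDHeader][0], h[expectedLastSubjSeqHeader][0])
}
```

Header values travel as strings, which is why the sequence number is formatted in base 10 before being stored.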

const (
	// StreamHeader contains the stream name the message was republished from or
	// the stream name the message was retrieved from using direct get.
	StreamHeader = "Nats-Stream"

	// SequenceHeader contains the original sequence number of the message.
	SequenceHeader = "Nats-Sequence"

	// TimeStampHeader contains the original timestamp of the message.
	TimeStampHeaer = "Nats-Time-Stamp"

	// SubjectHeader contains the original subject the message was published to.
	SubjectHeader = "Nats-Subject"

	// LastSequenceHeader contains the last sequence of the message having the
	// same subject, otherwise zero if this is the first message for the
	// subject.
	LastSequenceHeader = "Nats-Last-Sequence"
)

Headers for republished messages and direct gets. Those headers are set by the server and should not be set by the client.

const (
	// MsgRollupSubject is used to purge all messages before this message on the
	// message subject.
	MsgRollupSubject = "sub"

	// MsgRollupAll is used to purge all messages before this message on the
	// stream.
	MsgRollupAll = "all"
)

Rollups, can be subject only or all messages.
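The effect of the two rollup values can be pictured with a small in-memory model. This is only a sketch of the documented semantics, not the server's implementation; storedMsg and applyRollup are names invented for the example.

```go
package main

import "fmt"

// Rollup values copied from the constants above.
const (
	msgRollupSubject = "sub"
	msgRollupAll     = "all"
)

// storedMsg is a simplified stand-in for a message kept in a stream.
type storedMsg struct {
	Subject string
	Data    string
}

// applyRollup models storing incoming with the given rollup header value:
// "all" purges every prior message, "sub" purges prior messages on the same
// subject, and any other value simply appends.
func applyRollup(stream []storedMsg, incoming storedMsg, rollup string) []storedMsg {
	kept := make([]storedMsg, 0, len(stream)+1)
	switch rollup {
	case msgRollupAll:
		// Nothing from the existing stream is kept.
	case msgRollupSubject:
		for _, m := range stream {
			if m.Subject != incoming.Subject {
				kept = append(kept, m)
			}
		}
	default:
		kept = append(kept, stream...)
	}
	return append(kept, incoming)
}

func main() {
	stream := []storedMsg{{"a", "1"}, {"b", "2"}, {"a", "3"}}
	fmt.Println(len(applyRollup(stream, storedMsg{"a", "4"}, msgRollupSubject)))
	fmt.Println(len(applyRollup(stream, storedMsg{"a", "4"}, msgRollupAll)))
}
```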

const (
	// Default time wait between retries on Publish if err is ErrNoResponders.
	DefaultPubRetryWait = 250 * time.Millisecond

	// Default number of retries
	DefaultPubRetryAttempts = 2
)
const (
	DefaultMaxMessages = 500
	DefaultExpires     = 30 * time.Second
)
const (
	// DefaultAPIPrefix is the default prefix for the JetStream API.
	DefaultAPIPrefix = "$JS.API."
)

Request API subjects for JetStream.

Variables

This section is empty.

Functions

func DecodeObjectDigest added in v1.32.0

func DecodeObjectDigest(data string) ([]byte, error)

DecodeObjectDigest decodes base64 hash

func GetObjectDigestValue added in v1.32.0

func GetObjectDigestValue(data hash.Hash) string

GetObjectDigestValue calculates the base64 value of hashed data

Types

type APIError

type APIError struct {
	Code        int       `json:"code"`
	ErrorCode   ErrorCode `json:"err_code"`
	Description string    `json:"description,omitempty"`
}

APIError is included in all API responses if there was an error.

func (*APIError) APIError

func (e *APIError) APIError() *APIError

APIError implements the JetStreamError interface.

func (*APIError) Error

func (e *APIError) Error() string

Error prints the JetStream API error code and description.

func (*APIError) Is

func (e *APIError) Is(err error) bool

Is matches against an APIError.
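An Is method like this lets callers compare a wrapped API error against a sentinel value with errors.Is. The sketch below uses local stand-in types (apiError, errStreamNotFound, isStreamNotFound) rather than the package's own, and it assumes that two API errors match when their error codes are equal; the real matching rule may differ.

```go
package main

import (
	"errors"
	"fmt"
)

type errorCode uint16

// 10059 is the value listed for JSErrCodeStreamNotFound.
const codeStreamNotFound errorCode = 10059

// apiError is a local stand-in for the APIError struct shown above.
type apiError struct {
	Code        int
	ErrorCode   errorCode
	Description string
}

func (e *apiError) Error() string {
	return fmt.Sprintf("API error: code=%d err_code=%d description=%s",
		e.Code, e.ErrorCode, e.Description)
}

// Is reports a match when target is also an *apiError carrying the same
// ErrorCode. Matching on ErrorCode only is an assumption of this sketch.
func (e *apiError) Is(target error) bool {
	var t *apiError
	if !errors.As(target, &t) {
		return false
	}
	return e.ErrorCode == t.ErrorCode
}

// errStreamNotFound plays the role of a package-level sentinel error.
var errStreamNotFound = &apiError{Code: 404, ErrorCode: codeStreamNotFound, Description: "stream not found"}

// isStreamNotFound shows the caller side: errors.Is walks the wrap chain
// and consults the Is method of each error it finds.
func isStreamNotFound(err error) bool {
	return errors.Is(err, errStreamNotFound)
}

func main() {
	// A response decoded into a fresh value, then wrapped by the caller.
	resp := &apiError{Code: 404, ErrorCode: codeStreamNotFound, Description: "stream not found"}
	err := fmt.Errorf("loading stream ORDERS: %w", resp)
	fmt.Println(isStreamNotFound(err))
}
```

Because the comparison goes through Is rather than pointer equality, a freshly decoded error value still matches the sentinel.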

type APIStats

type APIStats struct {
	// Total is the total number of API calls.
	Total uint64 `json:"total"`

	// Errors is the total number of API errors.
	Errors uint64 `json:"errors"`
}

APIStats reports on API calls to JetStream for this account.

type AccountInfo

type AccountInfo struct {
	// Tier is the current account usage tier.
	Tier

	// Domain is the domain name associated with this account.
	Domain string `json:"domain"`

	// API is the API usage statistics for this account.
	API APIStats `json:"api"`

	// Tiers is the list of available tiers for this account.
	Tiers map[string]Tier `json:"tiers"`
}

AccountInfo contains information about the JetStream usage from the current account.

type AccountLimits

type AccountLimits struct {
	// MaxMemory is the maximum amount of memory available for this account.
	MaxMemory int64 `json:"max_memory"`

	// MaxStore is the maximum amount of disk storage available for this
	// account.
	MaxStore int64 `json:"max_storage"`

	// MaxStreams is the maximum number of streams allowed for this account.
	MaxStreams int `json:"max_streams"`

	// MaxConsumers is the maximum number of consumers allowed for this
	// account.
	MaxConsumers int `json:"max_consumers"`
}

AccountLimits includes the JetStream limits of the current account.

type AckPolicy

type AckPolicy int

AckPolicy determines how the consumer should acknowledge delivered messages.

const (
	// AckExplicitPolicy requires ack or nack for all messages.
	AckExplicitPolicy AckPolicy = iota

	// AckAllPolicy when acking a sequence number, this implicitly acks all
	// sequences below this one as well.
	AckAllPolicy

	// AckNonePolicy requires no acks for delivered messages.
	AckNonePolicy
)

func (AckPolicy) MarshalJSON

func (p AckPolicy) MarshalJSON() ([]byte, error)

func (AckPolicy) String

func (p AckPolicy) String() string

func (*AckPolicy) UnmarshalJSON

func (p *AckPolicy) UnmarshalJSON(data []byte) error
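The MarshalJSON and UnmarshalJSON pair exists because the policy is an integer in Go but is exchanged as a string in JSON. The following is a minimal sketch of that pattern using a local ackPolicy type. The strings "explicit", "all" and "none" are assumed here to be the wire values, and wireName, encode and decode are helper names invented for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ackPolicy is a local stand-in mirroring the iota order shown above.
type ackPolicy int

const (
	ackExplicit ackPolicy = iota
	ackAll
	ackNone
)

// wireName returns the JSON string assumed for each policy.
func (p ackPolicy) wireName() (string, bool) {
	switch p {
	case ackExplicit:
		return "explicit", true
	case ackAll:
		return "all", true
	case ackNone:
		return "none", true
	}
	return "", false
}

func (p ackPolicy) MarshalJSON() ([]byte, error) {
	name, ok := p.wireName()
	if !ok {
		return nil, fmt.Errorf("unknown ack policy %d", int(p))
	}
	return json.Marshal(name)
}

func (p *ackPolicy) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	for _, candidate := range []ackPolicy{ackExplicit, ackAll, ackNone} {
		if name, _ := candidate.wireName(); name == s {
			*p = candidate
			return nil
		}
	}
	return fmt.Errorf("unknown ack policy %q", s)
}

// consumerConfig keeps only the field relevant to this sketch.
type consumerConfig struct {
	AckPolicy ackPolicy `json:"ack_policy"`
}

func encode(cfg consumerConfig) string {
	b, err := json.Marshal(cfg)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func decode(data string) (ackPolicy, error) {
	var cfg consumerConfig
	err := json.Unmarshal([]byte(data), &cfg)
	return cfg.AckPolicy, err
}

func main() {
	fmt.Println(encode(consumerConfig{AckPolicy: ackAll}))
}
```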

type ClientTrace

type ClientTrace struct {
	// RequestSent is called when an API request is sent to the server.
	RequestSent func(subj string, payload []byte)

	// ResponseReceived is called when a response is received from the
	// server.
	ResponseReceived func(subj string, payload []byte, hdr nats.Header)
}

ClientTrace can be used to trace API interactions for JetStream.

type ClusterInfo

type ClusterInfo struct {
	// Name is the name of the cluster.
	Name string `json:"name,omitempty"`

	// Leader is the server name of the RAFT leader.
	Leader string `json:"leader,omitempty"`

	// Replicas is the list of members of the RAFT cluster
	Replicas []*PeerInfo `json:"replicas,omitempty"`
}

ClusterInfo shows information about the underlying set of servers that make up the stream or consumer.

type ConsumeContext

type ConsumeContext interface {
	// Stop unsubscribes from the stream and cancels subscription.
	// No more messages will be received after calling this method.
	// All messages that are already in the buffer are discarded.
	Stop()

	// Drain unsubscribes from the stream and cancels subscription.
	// All messages that are already in the buffer will be processed in callback function.
	Drain()

	// Closed returns a channel that is closed when the consuming is
	// fully stopped/drained. When the channel is closed, no more messages
	// will be received and processing is complete.
	Closed() <-chan struct{}
}

ConsumeContext supports processing incoming messages from a stream. It is returned by [Consumer.Consume] method.

type ConsumeErrHandlerFunc

type ConsumeErrHandlerFunc func(consumeCtx ConsumeContext, err error)

type Consumer

type Consumer interface {
	// Fetch is used to retrieve up to a provided number of messages from a
	// stream. This method will send a single request and deliver all
	// requested messages unless time out is met earlier. Fetch timeout
	// defaults to 30 seconds and can be configured using FetchMaxWait
	// option.
	//
	// By default, Fetch uses a 5s idle heartbeat for requests longer than
	// 10 seconds. For shorter requests, the idle heartbeat is disabled.
	// This can be configured using FetchHeartbeat option. If a client does
	// not receive a heartbeat message from a stream for more than 2 times
	// the idle heartbeat setting, Fetch will return [ErrNoHeartbeat].
	//
	// Fetch is non-blocking and returns MessageBatch, exposing a channel
	// for delivered messages.
	//
	// Messages channel is always closed, thus it is safe to range over it
	// without additional checks.
	Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)

	// FetchBytes is used to retrieve up to a provided number of bytes from
	// the stream. This method will send a single request and deliver the
	// provided number of bytes unless time out is met earlier. FetchBytes
	// timeout defaults to 30 seconds and can be configured using
	// FetchMaxWait option.
	//
	// By default, FetchBytes uses a 5s idle heartbeat for requests longer than
	// 10 seconds. For shorter requests, the idle heartbeat is disabled.
	// This can be configured using FetchHeartbeat option. If a client does
	// not receive a heartbeat message from a stream for more than 2 times
	// the idle heartbeat setting, FetchBytes will return ErrNoHeartbeat.
	//
	// FetchBytes is non-blocking and returns MessageBatch, exposing a channel
	// for delivered messages.
	//
	// Messages channel is always closed, thus it is safe to range over it
	// without additional checks.
	FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error)

	// FetchNoWait is used to retrieve up to a provided number of messages
	// from a stream. Unlike Fetch, FetchNoWait will only deliver messages
	// that are currently available in the stream and will not wait for new
	// messages to arrive, even if batch size is not met.
	//
	// FetchNoWait is non-blocking and returns MessageBatch, exposing a
	// channel for delivered messages.
	//
	// Messages channel is always closed, thus it is safe to range over it
	// without additional checks.
	FetchNoWait(batch int) (MessageBatch, error)

	// Consume will continuously receive messages and handle them
	// with the provided callback function. Consume can be configured using
	// PullConsumeOpt options:
	//
	// - Error handling and monitoring can be configured using ConsumeErrHandler
	//   option, which provides information about errors encountered during
	//   consumption (both transient and terminal)
	// - Consume can be configured to stop after a certain number of
	//   messages is received using StopAfter option.
	// - Consume can be optimized for throughput or memory usage using
	//   PullExpiry, PullMaxMessages, PullMaxBytes and PullHeartbeat options.
	//   Unless there is a specific use case, these options should not be used.
	//
	// Consume returns a ConsumeContext, which can be used to stop or drain
	// the consumer.
	Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error)

	// Messages returns MessagesContext, allowing continuously iterating
	// over messages on a stream. Messages can be configured using
	// PullMessagesOpt options:
	//
	// - Messages can be optimized for throughput or memory usage using
	//   PullExpiry, PullMaxMessages, PullMaxBytes and PullHeartbeat options.
	//   Unless there is a specific use case, these options should not be used.
	// - WithMessagesErrOnMissingHeartbeat can be used to enable/disable
	//   erroring out on MessagesContext.Next when a heartbeat is missing.
	//   This option is enabled by default.
	Messages(opts ...PullMessagesOpt) (MessagesContext, error)

	// Next is used to retrieve the next message from the consumer. This
	// method will block until the message is retrieved or timeout is
	// reached.
	Next(opts ...FetchOpt) (Msg, error)

	// Info fetches current ConsumerInfo from the server.
	Info(context.Context) (*ConsumerInfo, error)

	// CachedInfo returns ConsumerInfo currently cached on this consumer.
	// This method does not perform any network requests. The cached
	// ConsumerInfo is updated on every call to Info and Update.
	CachedInfo() *ConsumerInfo
}

Consumer contains methods for fetching/processing messages from a stream, as well as fetching consumer info.

This package provides two implementations of Consumer interface:

  • Standard named/ephemeral pull consumers. These consumers are created using CreateConsumer method on Stream or JetStream interface. They can be explicitly configured (using ConsumerConfig) and managed by the user, either from this package or externally.
  • Ordered consumers. These consumers are created using OrderedConsumer method on Stream or JetStream interface. They are managed by the library and provide a simple way to consume messages from a stream. Ordered consumers are ephemeral in-memory pull consumers and are resilient to deletes and restarts. They provide limited configuration options using OrderedConsumerConfig.

Consumer provides methods for optimized continuous consumption of messages using Consume and Messages methods, as well as simple one-off message retrieval using Fetch and Next methods.

type ConsumerConfig

type ConsumerConfig struct {
	// Name is an optional name for the consumer. If not set, one is
	// generated automatically.
	//
	// Name cannot contain whitespace, ., *, >, path separators (forward or
	// backwards slash), and non-printable characters.
	Name string `json:"name,omitempty"`

	// Durable is an optional durable name for the consumer. If both Durable
	// and Name are set, they have to be equal. Unless InactiveThreshold is set, a
	// durable consumer will not be cleaned up automatically.
	//
	// Durable cannot contain whitespace, ., *, >, path separators (forward or
	// backwards slash), and non-printable characters.
	Durable string `json:"durable_name,omitempty"`

	// Description provides an optional description of the consumer.
	Description string `json:"description,omitempty"`

	// DeliverPolicy defines from which point to start delivering messages
	// from the stream. Defaults to DeliverAllPolicy.
	DeliverPolicy DeliverPolicy `json:"deliver_policy"`

	// OptStartSeq is an optional sequence number from which to start
	// message delivery. Only applicable when DeliverPolicy is set to
	// DeliverByStartSequencePolicy.
	OptStartSeq uint64 `json:"opt_start_seq,omitempty"`

	// OptStartTime is an optional time from which to start message
	// delivery. Only applicable when DeliverPolicy is set to
	// DeliverByStartTimePolicy.
	OptStartTime *time.Time `json:"opt_start_time,omitempty"`

	// AckPolicy defines the acknowledgement policy for the consumer.
	// Defaults to AckExplicitPolicy.
	AckPolicy AckPolicy `json:"ack_policy"`

	// AckWait defines how long the server will wait for an acknowledgement
	// before resending a message. If not set, server default is 30 seconds.
	AckWait time.Duration `json:"ack_wait,omitempty"`

	// MaxDeliver defines the maximum number of delivery attempts for a
	// message. Applies to any message that is re-sent due to ack policy.
	// If not set, server default is -1 (unlimited).
	MaxDeliver int `json:"max_deliver,omitempty"`

	// BackOff specifies the optional back-off intervals for retrying
	// message delivery after a failed acknowledgement. It overrides
	// AckWait.
	//
	// BackOff only applies to messages not acknowledged in specified time,
	// not messages that were nack'ed.
	//
	// The number of intervals specified must be lower than or equal to
	// MaxDeliver. If the number of intervals is lower, the last interval is
	// used for all remaining attempts.
	BackOff []time.Duration `json:"backoff,omitempty"`

	// FilterSubject can be used to filter messages delivered from the
	// stream. FilterSubject is exclusive with FilterSubjects.
	FilterSubject string `json:"filter_subject,omitempty"`

	// ReplayPolicy defines the rate at which messages are sent to the
	// consumer. If ReplayOriginalPolicy is set, messages are sent in the
	// same intervals in which they were stored on stream. This can be used
	// e.g. to simulate production traffic in development environments. If
	// ReplayInstantPolicy is set, messages are sent as fast as possible.
	// Defaults to ReplayInstantPolicy.
	ReplayPolicy ReplayPolicy `json:"replay_policy"`

	// RateLimit specifies an optional maximum rate of message delivery in
	// bits per second.
	RateLimit uint64 `json:"rate_limit_bps,omitempty"`

	// SampleFrequency is an optional setting defining how often
	// acknowledgements are sampled for observability. See
	// https://docs.nats.io/running-a-nats-service/nats_admin/monitoring/monitoring_jetstream
	SampleFrequency string `json:"sample_freq,omitempty"`

	// MaxWaiting is a maximum number of pull requests waiting to be
	// fulfilled. If not set, this will inherit settings from stream's
	// ConsumerLimits or (if those are not set) from account settings.  If
	// neither are set, server default is 512.
	MaxWaiting int `json:"max_waiting,omitempty"`

	// MaxAckPending is a maximum number of outstanding unacknowledged
	// messages. Once this limit is reached, the server will suspend sending
	// messages to the consumer. If not set, server default is 1000.
	// Set to -1 for unlimited.
	MaxAckPending int `json:"max_ack_pending,omitempty"`

	// HeadersOnly indicates whether only headers of messages should be sent
	// (and no payload). Defaults to false.
	HeadersOnly bool `json:"headers_only,omitempty"`

	// MaxRequestBatch is the optional maximum batch size a single pull
	// request can make. When set with MaxRequestMaxBytes, the batch size
	// will be constrained by whichever limit is hit first.
	MaxRequestBatch int `json:"max_batch,omitempty"`

	// MaxRequestExpires is the maximum duration a single pull request will
	// wait for messages to be available to pull.
	MaxRequestExpires time.Duration `json:"max_expires,omitempty"`

	// MaxRequestMaxBytes is the optional maximum total bytes that can be
	// requested in a given batch. When set with MaxRequestBatch, the batch
	// size will be constrained by whichever limit is hit first.
	MaxRequestMaxBytes int `json:"max_bytes,omitempty"`

	// InactiveThreshold is a duration which instructs the server to clean
	// up the consumer if it has been inactive for the specified duration.
	// Durable consumers will not be cleaned up by default, but if
	// InactiveThreshold is set, they will be. If not set, this will inherit
	// settings from stream's ConsumerLimits. If neither are set, server
	// default is 5 seconds.
	//
	// A consumer is considered inactive if there are no pull requests
	// received by the server (for pull consumers), or no interest detected
	// on deliver subject (for push consumers), not if there are no
	// messages to be delivered.
	InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`

	// Replicas is the number of replicas for the consumer's state. By default,
	// consumers inherit the number of replicas from the stream.
	Replicas int `json:"num_replicas"`

	// MemoryStorage is a flag to force the consumer to use memory storage
	// rather than inherit the storage type from the stream.
	MemoryStorage bool `json:"mem_storage,omitempty"`

	// FilterSubjects allows filtering messages from a stream by subject.
	// This field is exclusive with FilterSubject. Requires nats-server
	// v2.10.0 or later.
	FilterSubjects []string `json:"filter_subjects,omitempty"`

	// Metadata is a set of application-defined key-value pairs for
	// associating metadata on the consumer. This feature requires
	// nats-server v2.10.0 or later.
	Metadata map[string]string `json:"metadata,omitempty"`
}

ConsumerConfig is the configuration of a JetStream consumer.
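The BackOff rule described in the struct ("the last interval is used for all remaining attempts") can be sketched as a small lookup. redeliveryDelay and exampleBackOff are hypothetical names used only here, and the function models the documented behaviour rather than the server's code.

```go
package main

import (
	"fmt"
	"time"
)

// exampleBackOff is sample data for this sketch.
var exampleBackOff = []time.Duration{time.Second, 5 * time.Second, 30 * time.Second}

// redeliveryDelay returns how long to wait for an acknowledgement before the
// given redelivery attempt (1-based). With no BackOff configured, AckWait
// applies. Otherwise BackOff overrides AckWait, and the last interval is
// reused once the attempts outnumber the intervals.
func redeliveryDelay(backOff []time.Duration, ackWait time.Duration, attempt int) time.Duration {
	if len(backOff) == 0 {
		return ackWait
	}
	if attempt < 1 {
		attempt = 1
	}
	if attempt > len(backOff) {
		return backOff[len(backOff)-1]
	}
	return backOff[attempt-1]
}

func main() {
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Println(attempt, redeliveryDelay(exampleBackOff, 30*time.Second, attempt))
	}
}
```

As the field comment notes, these intervals apply to messages that were not acknowledged in time, not to messages that were nack'ed.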

type ConsumerInfo

type ConsumerInfo struct {
	// Stream specifies the name of the stream that the consumer is bound
	// to.
	Stream string `json:"stream_name"`

	// Name represents the unique identifier for the consumer. This can be
	// either set explicitly by the client or generated automatically if not
	// set.
	Name string `json:"name"`

	// Created is the timestamp when the consumer was created.
	Created time.Time `json:"created"`

	// Config contains the configuration settings of the consumer, set when
	// creating or updating the consumer.
	Config ConsumerConfig `json:"config"`

	// Delivered holds information about the most recently delivered
	// message, including its sequence numbers and timestamp.
	Delivered SequenceInfo `json:"delivered"`

	// AckFloor indicates the message before the first unacknowledged
	// message.
	AckFloor SequenceInfo `json:"ack_floor"`

	// NumAckPending is the number of messages that have been delivered but
	// not yet acknowledged.
	NumAckPending int `json:"num_ack_pending"`

	// NumRedelivered counts the number of messages that have been
	// redelivered and not yet acknowledged. Each message is counted only
	// once, even if it has been redelivered multiple times. This count is
	// reset when the message is eventually acknowledged.
	NumRedelivered int `json:"num_redelivered"`

	// NumWaiting is the count of active pull requests. It is only relevant
	// for pull-based consumers.
	NumWaiting int `json:"num_waiting"`

	// NumPending is the number of messages that match the consumer's
	// filter, but have not been delivered yet.
	NumPending uint64 `json:"num_pending"`

	// Cluster contains information about the cluster to which this consumer
	// belongs (if applicable).
	Cluster *ClusterInfo `json:"cluster,omitempty"`

	// PushBound indicates whether at least one subscription exists for the
	// delivery subject of this consumer. This is only applicable to
	// push-based consumers.
	PushBound bool `json:"push_bound,omitempty"`

	// TimeStamp indicates when the info was gathered by the server.
	TimeStamp time.Time `json:"ts"`
}

ConsumerInfo is the detailed information about a JetStream consumer.

type ConsumerInfoLister

type ConsumerInfoLister interface {
	Info() <-chan *ConsumerInfo
	Err() error
}

ConsumerInfoLister is used to iterate over a channel of consumer infos. Err method can be used to check for errors encountered during iteration. Info channel is always closed and therefore can be used in a range loop.
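The usual way to work with a lister of this shape is to range over the channel until it is closed and only then check Err. The sketch below shows that pattern against a fake, in-memory lister; consumerInfo, fakeLister, newFakeLister and collectNames are stand-ins written for this example and are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerInfo is a reduced stand-in for ConsumerInfo.
type consumerInfo struct{ Name string }

// consumerInfoLister mirrors the shape of the interface above.
type consumerInfoLister interface {
	Info() <-chan *consumerInfo
	Err() error
}

type fakeLister struct {
	ch  chan *consumerInfo
	err error
}

func (l *fakeLister) Info() <-chan *consumerInfo { return l.ch }
func (l *fakeLister) Err() error                 { return l.err }

// newFakeLister emits one info per name. If failAfter >= 0, it stops before
// the item at that index and records an error. The channel is always closed.
func newFakeLister(names []string, failAfter int) *fakeLister {
	l := &fakeLister{ch: make(chan *consumerInfo)}
	go func() {
		defer close(l.ch)
		for i, n := range names {
			if failAfter >= 0 && i == failAfter {
				// Set before the deferred close, so readers see it afterwards.
				l.err = errors.New("listing interrupted")
				return
			}
			l.ch <- &consumerInfo{Name: n}
		}
	}()
	return l
}

// collectNames drains the channel first and checks Err once it is closed.
func collectNames(l consumerInfoLister) ([]string, error) {
	var names []string
	for info := range l.Info() {
		names = append(names, info.Name)
	}
	return names, l.Err()
}

func main() {
	names, err := collectNames(newFakeLister([]string{"orders", "billing"}, -1))
	fmt.Println(names, err)
}
```

Checking Err only after the loop ends matters: a closed channel alone does not say whether iteration finished normally or stopped early.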

type ConsumerManager added in v1.33.0

type ConsumerManager interface {
	// CreateOrUpdateConsumer creates a consumer on a given stream with
	// given config. If consumer already exists, it will be updated (if
	// possible). Consumer interface is returned, allowing to operate on a
	// consumer (e.g. fetch messages).
	CreateOrUpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error)

	// CreateConsumer creates a consumer on a given stream with given
	// config. If consumer already exists and the provided configuration
	// differs from its configuration, ErrConsumerExists is returned. If the
	// provided configuration is the same as the existing consumer, the
	// existing consumer is returned. Consumer interface is returned,
	// allowing to operate on a consumer (e.g. fetch messages).
	CreateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error)

	// UpdateConsumer updates an existing consumer. If consumer does not
	// exist, ErrConsumerDoesNotExist is returned. Consumer interface is
	// returned, allowing to operate on a consumer (e.g. fetch messages).
	UpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error)

	// OrderedConsumer returns an OrderedConsumer instance. Ordered consumers
	// are managed by the library and provide a simple way to consume
	// messages from a stream. Ordered consumers are ephemeral in-memory
	// pull consumers and are resilient to deletes and restarts.
	OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig) (Consumer, error)

	// Consumer returns an interface to an existing consumer, allowing processing
	// of messages. If consumer does not exist, ErrConsumerNotFound is
	// returned.
	Consumer(ctx context.Context, consumer string) (Consumer, error)

	// DeleteConsumer removes a consumer with given name from a stream.
	// If consumer does not exist, ErrConsumerNotFound is returned.
	DeleteConsumer(ctx context.Context, consumer string) error

	// ListConsumers returns ConsumerInfoLister enabling iterating over a
	// channel of consumer infos.
	ListConsumers(context.Context) ConsumerInfoLister

	// ConsumerNames returns a ConsumerNameLister enabling iterating over a
	// channel of consumer names.
	ConsumerNames(context.Context) ConsumerNameLister
}

ConsumerManager provides CRUD API for managing consumers. It is available as a part of Stream interface. CreateConsumer, UpdateConsumer, CreateOrUpdateConsumer and Consumer methods return a Consumer interface, allowing to operate on a consumer (e.g. consume messages).

type ConsumerNameLister

type ConsumerNameLister interface {
	Name() <-chan string
	Err() error
}

ConsumerNameLister is used to iterate over a channel of consumer names. Err method can be used to check for errors encountered during iteration. Name channel is always closed and therefore can be used in a range loop.

type DeleteMarkersOlderThan added in v1.29.0

type DeleteMarkersOlderThan time.Duration

DeleteMarkersOlderThan indicates that delete or purge markers older than the given duration will be deleted as part of [KeyValue.PurgeDeletes] operation; otherwise, only the data will be removed and recent markers will be kept. If no option is specified, the default is 30 minutes. Set this option to a negative value to always remove the markers, regardless of their age.

type DeliverPolicy

type DeliverPolicy int

DeliverPolicy determines from which point to start delivering messages.

const (
	// DeliverAllPolicy starts delivering messages from the very beginning of a
	// stream. This is the default.
	DeliverAllPolicy DeliverPolicy = iota

	// DeliverLastPolicy will start the consumer with the last sequence
	// received.
	DeliverLastPolicy

	// DeliverNewPolicy will only deliver new messages that are sent after the
	// consumer is created.
	DeliverNewPolicy

	// DeliverByStartSequencePolicy will deliver messages starting from a given
	// sequence configured with OptStartSeq in ConsumerConfig.
	DeliverByStartSequencePolicy

	// DeliverByStartTimePolicy will deliver messages starting from a given time
	// configured with OptStartTime in ConsumerConfig.
	DeliverByStartTimePolicy

	// DeliverLastPerSubjectPolicy will start the consumer with the last message
	// for all subjects received.
	DeliverLastPerSubjectPolicy
)

func (DeliverPolicy) MarshalJSON

func (p DeliverPolicy) MarshalJSON() ([]byte, error)

func (DeliverPolicy) String

func (p DeliverPolicy) String() string

func (*DeliverPolicy) UnmarshalJSON

func (p *DeliverPolicy) UnmarshalJSON(data []byte) error
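To make the difference between the delivery policies concrete, here is a simplified model that computes the first stream sequence a new consumer would receive. It is a sketch only: it covers four of the policies, ignores subject filters and deleted messages, and the clamping of a too-small start sequence is an assumption. The names deliverPolicy and startSequence are local to the example.

```go
package main

import "fmt"

// deliverPolicy is a local stand-in covering four of the policies above.
type deliverPolicy int

const (
	deliverAll deliverPolicy = iota
	deliverLast
	deliverNew
	deliverByStartSequence
)

// startSequence models where delivery begins for a stream that currently
// holds sequences firstSeq..lastSeq. optStartSeq is only consulted for
// deliverByStartSequence.
func startSequence(policy deliverPolicy, firstSeq, lastSeq, optStartSeq uint64) uint64 {
	switch policy {
	case deliverLast:
		// Start with the last message already stored.
		return lastSeq
	case deliverNew:
		// Only messages stored after the consumer is created.
		return lastSeq + 1
	case deliverByStartSequence:
		// Assumption: a start below the first stored sequence is clamped.
		if optStartSeq < firstSeq {
			return firstSeq
		}
		return optStartSeq
	default:
		// deliverAll: from the very beginning of the stream.
		return firstSeq
	}
}

func main() {
	fmt.Println(startSequence(deliverAll, 1, 100, 0))
	fmt.Println(startSequence(deliverLast, 1, 100, 0))
	fmt.Println(startSequence(deliverNew, 1, 100, 0))
	fmt.Println(startSequence(deliverByStartSequence, 1, 100, 42))
}
```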

type DiscardPolicy

type DiscardPolicy int

DiscardPolicy determines how to proceed when limits of messages or bytes are reached.

const (
	// DiscardOld will remove older messages to return to the limits. This is
	// the default.
	DiscardOld DiscardPolicy = iota

	// DiscardNew will fail to store new messages once the limits are reached.
	DiscardNew
)

func (DiscardPolicy) MarshalJSON

func (dp DiscardPolicy) MarshalJSON() ([]byte, error)

func (DiscardPolicy) String

func (dp DiscardPolicy) String() string

func (*DiscardPolicy) UnmarshalJSON

func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error
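The two discard policies behave like two flavours of a bounded buffer. The sketch below models a message-count limit only; store, discardPolicy and errMaxMsgs are names made up for the illustration and do not reflect the server's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// discardPolicy is a local stand-in mirroring the constants above.
type discardPolicy int

const (
	discardOld discardPolicy = iota
	discardNew
)

var errMaxMsgs = errors.New("maximum messages exceeded")

// store appends msg to msgs while honouring a limit of maxMsgs messages.
// With discardOld the oldest messages are dropped to make room; with
// discardNew the new message is rejected once the limit is reached.
// A maxMsgs of zero or less means no limit in this model.
func store(msgs []string, msg string, maxMsgs int, policy discardPolicy) ([]string, error) {
	if maxMsgs <= 0 || len(msgs) < maxMsgs {
		return append(msgs, msg), nil
	}
	if policy == discardNew {
		return msgs, errMaxMsgs
	}
	// Copy the newest maxMsgs-1 messages so the input slice is not modified.
	kept := append([]string{}, msgs[len(msgs)-maxMsgs+1:]...)
	return append(kept, msg), nil
}

func main() {
	full := []string{"m1", "m2"}
	fmt.Println(store(full, "m3", 2, discardOld))
	fmt.Println(store(full, "m3", 2, discardNew))
}
```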

type ErrorCode

type ErrorCode uint16

ErrorCode represents error_code returned in response from JetStream API.

const (
	JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
	JSErrCodeJetStreamNotEnabled           ErrorCode = 10076

	JSErrCodeStreamNotFound  ErrorCode = 10059
	JSErrCodeStreamNameInUse ErrorCode = 10058

	JSErrCodeConsumerCreate            ErrorCode = 10012
	JSErrCodeConsumerNotFound          ErrorCode = 10014
	JSErrCodeConsumerNameExists        ErrorCode = 10013
	JSErrCodeConsumerAlreadyExists     ErrorCode = 10105
	JSErrCodeConsumerExists            ErrorCode = 10148
	JSErrCodeDuplicateFilterSubjects   ErrorCode = 10136
	JSErrCodeOverlappingFilterSubjects ErrorCode = 10138
	JSErrCodeConsumerEmptyFilter       ErrorCode = 10139
	JSErrCodeConsumerDoesNotExist      ErrorCode = 10149

	JSErrCodeMessageNotFound ErrorCode = 10037

	JSErrCodeBadRequest ErrorCode = 10003

	JSErrCodeStreamWrongLastSequence ErrorCode = 10071
)

type ExternalStream

type ExternalStream struct {
	// APIPrefix is the subject prefix that imports the other account/domain
	// $JS.API.CONSUMER.> subjects.
	APIPrefix string `json:"api"`

	// DeliverPrefix is the delivery subject to use for the push consumer.
	DeliverPrefix string `json:"deliver"`
}

ExternalStream allows you to qualify access to a stream source in another account.

type FetchOpt

type FetchOpt func(*pullRequest) error

func FetchHeartbeat added in v1.33.0

func FetchHeartbeat(hb time.Duration) FetchOpt

FetchHeartbeat sets custom heartbeat for individual fetch request. If a client does not receive a heartbeat message from a stream for more than 2 times the idle heartbeat setting, Fetch will return ErrNoHeartbeat.

Heartbeat value has to be lower than FetchMaxWait / 2.

If not provided, heartbeat is set to 5s for requests with FetchMaxWait > 10s and disabled otherwise.
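The heartbeat rules above can be summarised in a few lines of arithmetic. This is a sketch that follows the wording of this documentation; the helper names are hypothetical, and the strict "lower than" comparison is taken from the text rather than from the implementation.

```go
package main

import (
	"fmt"
	"time"
)

// defaultHeartbeat returns the idle heartbeat used when none is configured:
// 5s for requests with a max wait above 10s, otherwise 0 (disabled).
func defaultHeartbeat(maxWait time.Duration) time.Duration {
	if maxWait > 10*time.Second {
		return 5 * time.Second
	}
	return 0
}

// validateHeartbeat applies the documented constraint that the heartbeat
// has to be lower than FetchMaxWait / 2.
func validateHeartbeat(hb, maxWait time.Duration) error {
	if hb >= maxWait/2 {
		return fmt.Errorf("heartbeat %v must be lower than %v", hb, maxWait/2)
	}
	return nil
}

// missedHeartbeatTimeout is the silence after which a fetch is expected to
// fail with a missing-heartbeat error: 2 times the idle heartbeat.
func missedHeartbeatTimeout(hb time.Duration) time.Duration {
	return 2 * hb
}

func main() {
	maxWait := 30 * time.Second
	hb := defaultHeartbeat(maxWait)
	fmt.Println(hb, validateHeartbeat(hb, maxWait), missedHeartbeatTimeout(hb))
}
```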

func FetchMaxWait

func FetchMaxWait(timeout time.Duration) FetchOpt

FetchMaxWait sets custom timeout for fetching predefined batch of messages.

If not provided, a default of 30 seconds will be used.

type GetMsgOpt

type GetMsgOpt func(*apiMsgGetRequest) error

GetMsgOpt is a function setting options for [Stream.GetMsg]

func WithGetMsgSubject

func WithGetMsgSubject(subject string) GetMsgOpt

WithGetMsgSubject sets the stream subject from which the message should be retrieved. Server will return the first message with a seq >= the input seq that has the specified subject.

type GetObjectInfoOpt added in v1.32.0

type GetObjectInfoOpt func(opts *getObjectInfoOpts) error

GetObjectInfoOpt is used to set additional options when getting object info.

func GetObjectInfoShowDeleted added in v1.32.0

func GetObjectInfoShowDeleted() GetObjectInfoOpt

GetObjectInfoShowDeleted makes [ObjectStore.GetInfo] return object info even if it was marked as deleted.

type GetObjectOpt added in v1.32.0

type GetObjectOpt func(opts *getObjectOpts) error

GetObjectOpt is used to set additional options when getting an object.

func GetObjectShowDeleted added in v1.32.0

func GetObjectShowDeleted() GetObjectOpt

GetObjectShowDeleted makes [ObjectStore.Get] return object even if it was marked as deleted.

type JetStream

type JetStream interface {
	// AccountInfo fetches account information from the server, containing details
	// about the account associated with this JetStream connection. If account is
	// not enabled for JetStream, ErrJetStreamNotEnabledForAccount is returned. If
	// the server does not have JetStream enabled, ErrJetStreamNotEnabled is
	// returned.
	AccountInfo(ctx context.Context) (*AccountInfo, error)

	StreamConsumerManager
	StreamManager
	Publisher
	KeyValueManager
	ObjectStoreManager
}

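JetStream is the top-level interface for interacting with JetStream. Besides AccountInfo, its capabilities come from the embedded StreamConsumerManager, StreamManager, Publisher, KeyValueManager and ObjectStoreManager interfaces.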

JetStream can be created using New, NewWithAPIPrefix or NewWithDomain methods.
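The three constructors differ mainly in the subject prefix used for API requests: the default "$JS.API.", a custom prefix, or a domain-qualified "$JS.<domain>.API." prefix. The sketch below only illustrates how a STREAM.INFO request subject would be assembled in each case. The helper functions are hypothetical, and appending a missing trailing dot to a custom prefix is an assumption of this example.

```go
package main

import (
	"fmt"
	"strings"
)

// defaultAPIPrefix is copied from the DefaultAPIPrefix constant.
const defaultAPIPrefix = "$JS.API."

// domainPrefix builds the prefix for a JetStream domain, following the
// $JS.<domain>.API pattern.
func domainPrefix(domain string) string {
	return "$JS." + domain + ".API."
}

// normalizePrefix makes sure a prefix ends with a dot (an assumption).
func normalizePrefix(prefix string) string {
	if !strings.HasSuffix(prefix, ".") {
		return prefix + "."
	}
	return prefix
}

// streamInfoSubject returns <prefix>.STREAM.INFO.<stream>.
func streamInfoSubject(prefix, stream string) string {
	return normalizePrefix(prefix) + "STREAM.INFO." + stream
}

func main() {
	fmt.Println(streamInfoSubject(defaultAPIPrefix, "ORDERS"))
	fmt.Println(streamInfoSubject("js.from.acct", "ORDERS"))
	fmt.Println(streamInfoSubject(domainPrefix("hub"), "ORDERS"))
}
```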

func New

func New(nc *nats.Conn, opts ...JetStreamOpt) (JetStream, error)

New returns a new JetStream instance. It uses default API prefix ($JS.API) for JetStream API requests. If a custom API prefix is required, use NewWithAPIPrefix or NewWithDomain.

Available options:

func NewWithAPIPrefix

func NewWithAPIPrefix(nc *nats.Conn, apiPrefix string, opts ...JetStreamOpt) (JetStream, error)

NewWithAPIPrefix returns a new JetStream instance and sets the API prefix to be used in requests to JetStream API. The API prefix will be used in API requests to JetStream, e.g. <prefix>.STREAM.INFO.<stream>.

Available options:

func NewWithDomain

func NewWithDomain(nc *nats.Conn, domain string, opts ...JetStreamOpt) (JetStream, error)

NewWithDomain returns a new JetStream instance and sets the domain name token used when sending JetStream requests. The domain name token will be used in API requests to JetStream, e.g. $JS.<domain>.API.STREAM.INFO.<stream>.

Available options:

type JetStreamError

type JetStreamError interface {
	APIError() *APIError
	error
}

JetStreamError is an error result that happens when using JetStream. In case of client-side error, APIError returns nil.

var (

	// ErrJetStreamNotEnabled is an error returned when JetStream is not
	// enabled.
	//
	// Note: This error will not be returned in clustered mode, even if each
	// server in the cluster does not have JetStream enabled. In clustered mode,
	// requests will time out instead.
	ErrJetStreamNotEnabled JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabled, Description: "jetstream not enabled", Code: 503}}

	// ErrJetStreamNotEnabledForAccount is an error returned when JetStream is
	// not enabled for an account.
	ErrJetStreamNotEnabledForAccount JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabledForAccount, Description: "jetstream not enabled for account", Code: 503}}

	// ErrStreamNotFound is an error returned when stream with given name does
	// not exist.
	ErrStreamNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNotFound, Description: "stream not found", Code: 404}}

	// ErrStreamNameAlreadyInUse is returned when a stream with given name
	// already exists and has a different configuration.
	ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}}

	// ErrStreamSubjectTransformNotSupported is returned when the connected
	// nats-server version does not support setting the stream subject
	// transform. If this error is returned when executing CreateStream(), the
	// stream with invalid configuration was already created in the server.
	ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"}

	// ErrStreamSourceSubjectTransformNotSupported is returned when the
	// connected nats-server version does not support setting the stream source
	// subject transform. If this error is returned when executing
	// CreateStream(), the stream with invalid configuration was already created
	// in the server.
	ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"}

	// ErrStreamSourceNotSupported is returned when the connected nats-server
	// version does not support setting the stream sources. If this error is
	// returned when executing CreateStream(), the stream with invalid
	// configuration was already created in the server.
	ErrStreamSourceNotSupported JetStreamError = &jsError{message: "stream sourcing is not supported by nats-server"}

	// ErrStreamSourceMultipleFilterSubjectsNotSupported is returned when the
	// connected nats-server version does not support setting multiple filter
	// subjects on stream sources. If this error is returned when executing
	// CreateStream(), the stream with invalid configuration was already
	// created in the server.
	ErrStreamSourceMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "stream sourcing with multiple subject filters not supported by nats-server"}

	// ErrConsumerNotFound is an error returned when consumer with given name
	// does not exist.
	ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}

	// ErrConsumerExists is returned when attempting to create a consumer with
	// CreateConsumer but a consumer with given name already exists.
	ErrConsumerExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerExists, Description: "consumer already exists", Code: 400}}

	// ErrConsumerDoesNotExist is returned when attempting to update a consumer
	// with UpdateConsumer but a consumer with given name does not exist.
	ErrConsumerDoesNotExist JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerDoesNotExist, Description: "consumer does not exist", Code: 400}}

	// ErrMsgNotFound is returned when message with provided sequence number
	// does not exist.
	ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}}

	// ErrBadRequest is returned when invalid request is sent to JetStream API.
	ErrBadRequest JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeBadRequest, Description: "bad request", Code: 400}}

	// ErrConsumerCreate is returned when nats-server reports error when
	// creating consumer (e.g. illegal update).
	ErrConsumerCreate JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerCreate, Description: "could not create consumer", Code: 500}}

	// ErrDuplicateFilterSubjects is returned when both FilterSubject and
	// FilterSubjects are specified when creating consumer.
	ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}}

	// ErrOverlappingFilterSubjects is returned when filter subjects overlap
	// when creating consumer.
	ErrOverlappingFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeOverlappingFilterSubjects, Description: "consumer subject filters cannot overlap", Code: 500}}

	// ErrEmptyFilter is returned when a filter in FilterSubjects is empty.
	ErrEmptyFilter JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerEmptyFilter, Description: "consumer filter in FilterSubjects cannot be empty", Code: 500}}

	// ErrConsumerMultipleFilterSubjectsNotSupported is returned when the
	// connected nats-server version does not support setting multiple filter
	// subjects with filter_subjects field. If this error is returned when
	// creating or updating a consumer, the consumer with invalid configuration
	// was already created in the server.
	ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"}

	// ErrConsumerNameAlreadyInUse is an error returned when a consumer with
	// given name already exists.
	ErrConsumerNameAlreadyInUse JetStreamError = &jsError{message: "consumer name already in use"}

	// ErrInvalidJSAck is returned when JetStream ack from message publish is
	// invalid.
	ErrInvalidJSAck JetStreamError = &jsError{message: "invalid jetstream publish response"}

	// ErrStreamNameRequired is returned when the provided stream name is empty.
	ErrStreamNameRequired JetStreamError = &jsError{message: "stream name is required"}

	// ErrMsgAlreadyAckd is returned when attempting to acknowledge message more
	// than once.
	ErrMsgAlreadyAckd JetStreamError = &jsError{message: "message was already acknowledged"}

	// ErrNoStreamResponse is returned when there is no response from stream
	// (e.g. no responders error).
	ErrNoStreamResponse JetStreamError = &jsError{message: "no response from stream"}

	// ErrNotJSMessage is returned when attempting to get metadata from non
	// JetStream message.
	ErrNotJSMessage JetStreamError = &jsError{message: "not a jetstream message"}

	// ErrInvalidStreamName is returned when the provided stream name is invalid
	// (contains '.').
	ErrInvalidStreamName JetStreamError = &jsError{message: "invalid stream name"}

	// ErrInvalidSubject is returned when the provided subject name is invalid.
	ErrInvalidSubject JetStreamError = &jsError{message: "invalid subject name"}

	// ErrInvalidConsumerName is returned when the provided consumer name is
	// invalid (contains '.').
	ErrInvalidConsumerName JetStreamError = &jsError{message: "invalid consumer name"}

	// ErrNoMessages is returned when no messages are currently available for a
	// consumer.
	ErrNoMessages JetStreamError = &jsError{message: "no messages"}

	// ErrMaxBytesExceeded is returned when a message would exceed MaxBytes set
	// on a pull request.
	ErrMaxBytesExceeded JetStreamError = &jsError{message: "message size exceeds max bytes"}

	// ErrConsumerDeleted is returned when attempting to send pull request to a
	// consumer which does not exist.
	ErrConsumerDeleted JetStreamError = &jsError{message: "consumer deleted"}

	// ErrConsumerLeadershipChanged is returned when pending requests are no
	// longer valid after leadership has changed.
	ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "leadership change"}

	// ErrHandlerRequired is returned when no handler func is provided in
	// Consume().
	ErrHandlerRequired JetStreamError = &jsError{message: "handler cannot be empty"}

	// ErrEndOfData is returned when iterating over paged API from JetStream
	// reaches end of data.
	ErrEndOfData JetStreamError = &jsError{message: "end of data reached"}

	// ErrNoHeartbeat is returned when no message is received in IdleHeartbeat
	// time (if set).
	ErrNoHeartbeat JetStreamError = &jsError{message: "no heartbeat received"}

	// ErrConsumerHasActiveSubscription is returned when a consumer is already
	// subscribed to a stream.
	ErrConsumerHasActiveSubscription JetStreamError = &jsError{message: "consumer has active subscription"}

	// ErrMsgNotBound is returned when given message is not bound to any
	// subscription.
	ErrMsgNotBound JetStreamError = &jsError{message: "message is not bound to subscription/connection"}

	// ErrMsgNoReply is returned when attempting to reply to a message without a
	// reply subject.
	ErrMsgNoReply JetStreamError = &jsError{message: "message does not have a reply"}

	// ErrMsgDeleteUnsuccessful is returned when an attempt to delete a message
	// is unsuccessful.
	ErrMsgDeleteUnsuccessful JetStreamError = &jsError{message: "message deletion unsuccessful"}

	// ErrAsyncPublishReplySubjectSet is returned when reply subject is set on
	// async message publish.
	ErrAsyncPublishReplySubjectSet JetStreamError = &jsError{message: "reply subject should be empty"}

	// ErrTooManyStalledMsgs is returned when too many outstanding async
	// messages are waiting for ack.
	ErrTooManyStalledMsgs JetStreamError = &jsError{message: "stalled with too many outstanding async published messages"}

	// ErrInvalidOption is returned when there is a collision between options.
	ErrInvalidOption JetStreamError = &jsError{message: "invalid jetstream option"}

	// ErrMsgIteratorClosed is returned when attempting to get message from a
	// closed iterator.
	ErrMsgIteratorClosed JetStreamError = &jsError{message: "messages iterator closed"}

	// ErrOrderedConsumerReset is returned when resetting ordered consumer fails
	// due to too many attempts.
	ErrOrderedConsumerReset JetStreamError = &jsError{message: "recreating ordered consumer"}

	// ErrOrderConsumerUsedAsFetch is returned when ordered consumer was already
	// used to process messages using Fetch (or FetchBytes).
	ErrOrderConsumerUsedAsFetch JetStreamError = &jsError{message: "ordered consumer initialized as fetch"}

	// ErrOrderConsumerUsedAsConsume is returned when ordered consumer was
	// already used to process messages using Consume or Messages.
	ErrOrderConsumerUsedAsConsume JetStreamError = &jsError{message: "ordered consumer initialized as consume"}

	// ErrOrderedConsumerConcurrentRequests is returned when attempting to run
	// concurrent operations on ordered consumers.
	ErrOrderedConsumerConcurrentRequests JetStreamError = &jsError{message: "cannot run concurrent processing using ordered consumer"}

	// ErrOrderedConsumerNotCreated is returned when trying to get consumer info
	// of an ordered consumer which was not yet created.
	ErrOrderedConsumerNotCreated JetStreamError = &jsError{message: "consumer instance not yet created"}

	// ErrJetStreamPublisherClosed is returned for each unfinished ack future when JetStream.Cleanup is called.
	ErrJetStreamPublisherClosed JetStreamError = &jsError{message: "jetstream context closed"}

	// ErrKeyExists is returned when attempting to create a key that already
	// exists.
	ErrKeyExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamWrongLastSequence, Code: 400}, message: "key exists"}

	// ErrKeyValueConfigRequired is returned when attempting to create a bucket
	// without a config.
	ErrKeyValueConfigRequired JetStreamError = &jsError{message: "config required"}

	// ErrInvalidBucketName is returned when attempting to create a bucket with
	// an invalid name.
	ErrInvalidBucketName JetStreamError = &jsError{message: "invalid bucket name"}

	// ErrInvalidKey is returned when attempting to create a key with an invalid
	// name.
	ErrInvalidKey JetStreamError = &jsError{message: "invalid key"}

	// ErrBucketExists is returned when attempting to create a bucket that
	// already exists and has a different configuration.
	ErrBucketExists JetStreamError = &jsError{message: "bucket name already in use"}

	// ErrBucketNotFound is returned when attempting to access a bucket that
	// does not exist.
	ErrBucketNotFound JetStreamError = &jsError{message: "bucket not found"}

	// ErrBadBucket is returned when attempting to access a bucket that is not a
	// key-value store.
	ErrBadBucket JetStreamError = &jsError{message: "bucket not valid key-value store"}

	// ErrKeyNotFound is returned when attempting to access a key that does not
	// exist.
	ErrKeyNotFound JetStreamError = &jsError{message: "key not found"}

	// ErrKeyDeleted is returned when attempting to access a key that was
	// deleted.
	ErrKeyDeleted JetStreamError = &jsError{message: "key was deleted"}

	// ErrHistoryTooLarge is returned when provided history limit is larger
	// than 64.
	ErrHistoryTooLarge JetStreamError = &jsError{message: "history limited to a max of 64"}

	// ErrNoKeysFound is returned when no keys are found.
	ErrNoKeysFound JetStreamError = &jsError{message: "no keys found"}

	// ErrObjectConfigRequired is returned when attempting to create an object
	// without a config.
	ErrObjectConfigRequired JetStreamError = &jsError{message: "object-store config required"}

	// ErrBadObjectMeta is returned when the meta information of an object is
	// invalid.
	ErrBadObjectMeta JetStreamError = &jsError{message: "object-store meta information invalid"}

	// ErrObjectNotFound is returned when an object is not found.
	ErrObjectNotFound JetStreamError = &jsError{message: "object not found"}

	// ErrInvalidStoreName is returned when the name of an object-store is
	// invalid.
	ErrInvalidStoreName JetStreamError = &jsError{message: "invalid object-store name"}

	// ErrDigestMismatch is returned when the digests of an object do not match.
	ErrDigestMismatch JetStreamError = &jsError{message: "received a corrupt object, digests do not match"}

	// ErrInvalidDigestFormat is returned when the digest hash of an object has
	// an invalid format.
	ErrInvalidDigestFormat JetStreamError = &jsError{message: "object digest hash has invalid format"}

	// ErrNoObjectsFound is returned when no objects are found.
	ErrNoObjectsFound JetStreamError = &jsError{message: "no objects found"}

	// ErrObjectAlreadyExists is returned when an object with the same name
	// already exists.
	ErrObjectAlreadyExists JetStreamError = &jsError{message: "an object already exists with that name"}

	// ErrNameRequired is returned when a name is required.
	ErrNameRequired JetStreamError = &jsError{message: "name is required"}

	// ErrLinkNotAllowed is returned when a link cannot be set when putting the
	// object in a bucket.
	ErrLinkNotAllowed JetStreamError = &jsError{message: "link cannot be set when putting the object in bucket"}

	// ErrObjectRequired is returned when an object is required.
	ErrObjectRequired = &jsError{message: "object required"}

	// ErrNoLinkToDeleted is returned when it is not allowed to link to a
	// deleted object.
	ErrNoLinkToDeleted JetStreamError = &jsError{message: "not allowed to link to a deleted object"}

	// ErrNoLinkToLink is returned when it is not allowed to link to another
	// link.
	ErrNoLinkToLink JetStreamError = &jsError{message: "not allowed to link to another link"}

	// ErrCantGetBucket is returned when an invalid Get is attempted on an
	// object that is a link to a bucket.
	ErrCantGetBucket JetStreamError = &jsError{message: "invalid Get, object is a link to a bucket"}

	// ErrBucketRequired is returned when a bucket is required.
	ErrBucketRequired JetStreamError = &jsError{message: "bucket required"}

	// ErrBucketMalformed is returned when a bucket is malformed.
	ErrBucketMalformed JetStreamError = &jsError{message: "bucket malformed"}

	// ErrUpdateMetaDeleted is returned when the meta information of a deleted
	// object cannot be updated.
	ErrUpdateMetaDeleted JetStreamError = &jsError{message: "cannot update meta for a deleted object"}
)
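A caller usually wants to know whether an error came back from the server (APIError is non-nil) or was raised on the client side (APIError is nil). The sketch below shows one way to branch on that with `errors.As` and `errors.Is`. To stay runnable without the module, `apiError`, `jsError` and `localErr` are local stand-ins for the package's types, and `describe` is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// apiError and jsError are local stand-ins for the package's APIError and
// JetStreamError; they exist only so this sketch compiles on its own.
type apiError struct {
	Code        int
	Description string
}

type jsError interface {
	APIError() *apiError
	error
}

// localErr is a stand-in for the package's unexported error type.
type localErr struct {
	api *apiError
	msg string
}

func (e *localErr) APIError() *apiError { return e.api }

func (e *localErr) Error() string {
	if e.api != nil {
		return e.api.Description
	}
	return e.msg
}

var (
	// Modeled on ErrStreamNotFound (server-side) and ErrNoMessages (client-side).
	errStreamNotFound error = &localErr{api: &apiError{Code: 404, Description: "stream not found"}}
	errNoMessages     error = &localErr{msg: "no messages"}
)

// describe reports whether err carries a server-side API error.
func describe(err error) string {
	var jerr jsError
	if !errors.As(err, &jerr) {
		return "not a JetStream error"
	}
	if api := jerr.APIError(); api != nil {
		return fmt.Sprintf("server error %d: %s", api.Code, api.Description)
	}
	return "client-side error: " + jerr.Error()
}

func main() {
	// Wrapping with %w keeps the sentinel reachable for errors.Is/As.
	wrapped := fmt.Errorf("lookup failed: %w", errStreamNotFound)
	fmt.Println(describe(wrapped))
	fmt.Println(describe(errNoMessages))
	fmt.Println(errors.Is(wrapped, errStreamNotFound))
}
```

With the real package the same shape should apply, using `jetstream.JetStreamError` as the `errors.As` target and the exported sentinels with `errors.Is`.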

type JetStreamOpt

type JetStreamOpt func(*jsOpts) error

JetStreamOpt is a functional option for the New, NewWithAPIPrefix and NewWithDomain functions.
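For readers unfamiliar with the functional-option pattern behind a type like `func(*jsOpts) error`, here is a minimal standalone sketch. Everything in it is an assumption made for illustration: `options`, `option`, `withMaxPending`, `newOptions`, the default value and the validation rule are local inventions, not the package's internals.

```go
package main

import (
	"errors"
	"fmt"
)

// options and option are local stand-ins for the unexported jsOpts and the
// exported JetStreamOpt; the field and its default are assumptions.
type options struct {
	maxPending int
}

type option func(*options) error

// withMaxPending is a hypothetical option modeled on
// WithPublishAsyncMaxPending. The validation rule is an assumption.
func withMaxPending(max int) option {
	return func(o *options) error {
		if max < 1 {
			return errors.New("max pending must be >= 1")
		}
		o.maxPending = max
		return nil
	}
}

// newOptions applies each option in order and stops at the first error,
// which is how a constructor taking ...option typically behaves.
func newOptions(opts ...option) (*options, error) {
	o := &options{maxPending: 4000} // arbitrary default for the sketch
	for _, opt := range opts {
		if err := opt(o); err != nil {
			return nil, err
		}
	}
	return o, nil
}

func main() {
	o, err := newOptions(withMaxPending(256))
	fmt.Println(o.maxPending, err)

	_, err = newOptions(withMaxPending(0))
	fmt.Println(err)
}
```

Because each option returns an error, an invalid setting can be rejected at construction time instead of surfacing later during publishing.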

func WithClientTrace

func WithClientTrace(ct *ClientTrace) JetStreamOpt

WithClientTrace enables request/response API calls tracing.

func WithPublishAsyncErrHandler

func WithPublishAsyncErrHandler(cb MsgErrHandler) JetStreamOpt

WithPublishAsyncErrHandler sets error handler for async message publish.

func WithPublishAsyncMaxPending

func WithPublishAsyncMaxPending(max int) JetStreamOpt

WithPublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.

type KVDeleteOpt added in v1.29.0

type KVDeleteOpt interface {
	// contains filtered or unexported methods
}

KVDeleteOpt is used to configure delete and purge operations.

func LastRevision added in v1.29.0

func LastRevision(revision uint64) KVDeleteOpt

LastRevision deletes if the latest revision matches the provided one. If the provided revision is not the latest, the delete will return an error.

type KVPurgeOpt added in v1.29.0

type KVPurgeOpt interface {
	// contains filtered or unexported methods
}

KVPurgeOpt is used to configure PurgeDeletes.

type KeyLister added in v1.32.0

type KeyLister interface {
	Keys() <-chan string
	Stop() error
}

KeyLister is used to retrieve a list of key value store keys. It returns a channel to read the keys from. The lister will always close the channel when done (either all keys have been read or an error occurred) and therefore can be used in range loops. Stop can be used to stop the lister when not all keys have been read.
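The contract described above (a channel that is always closed when listing ends, plus Stop for early exit) can be sketched as follows. The `lister` type is a local stand-in with the same method set as KeyLister, fed from an in-memory slice; `firstN` is a hypothetical helper showing how a consumer might stop early.

```go
package main

import (
	"fmt"
	"sync"
)

// lister is a local stand-in with the same method set as KeyLister.
type lister struct {
	keys chan string
	stop chan struct{}
	once sync.Once
}

// newLister streams the given keys on a channel and closes the channel when
// all keys were sent or Stop was called.
func newLister(all []string) *lister {
	l := &lister{keys: make(chan string), stop: make(chan struct{})}
	go func() {
		defer close(l.keys)
		for _, k := range all {
			select {
			case l.keys <- k:
			case <-l.stop:
				return
			}
		}
	}()
	return l
}

func (l *lister) Keys() <-chan string { return l.keys }

// Stop is safe to call more than once in this sketch.
func (l *lister) Stop() error {
	l.once.Do(func() { close(l.stop) })
	return nil
}

// firstN reads at most n keys and then stops the lister early.
func firstN(l *lister, n int) []string {
	var out []string
	for k := range l.Keys() {
		out = append(out, k)
		if len(out) == n {
			l.Stop()
			break
		}
	}
	return out
}

func main() {
	fmt.Println(firstN(newLister([]string{"a", "b", "c"}), 2))
	fmt.Println(firstN(newLister([]string{"a", "b", "c"}), 10))
}
```

Calling Stop before abandoning the loop lets the producing goroutine exit; without it, a producer blocked on an unread channel would linger.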

type KeyValue added in v1.29.0

type KeyValue interface {
	// Get returns the latest value for the key. If the key does not exist,
	// ErrKeyNotFound will be returned.
	Get(ctx context.Context, key string) (KeyValueEntry, error)

	// GetRevision returns a specific revision value for the key. If the key
	// does not exist or the provided revision does not exist,
	// ErrKeyNotFound will be returned.
	GetRevision(ctx context.Context, key string, revision uint64) (KeyValueEntry, error)

	// Put will place the new value for the key into the store. If the key
	// does not exist, it will be created. If the key exists, the value will
	// be updated.
	//
	// A key has to consist of alphanumeric characters, dashes, underscores,
	// equal signs, and dots.
	Put(ctx context.Context, key string, value []byte) (uint64, error)

	// PutString will place the string for the key into the store. If the
	// key does not exist, it will be created. If the key exists, the value
	// will be updated.
	//
	// A key has to consist of alphanumeric characters, dashes, underscores,
	// equal signs, and dots.
	PutString(ctx context.Context, key string, value string) (uint64, error)

	// Create will add the key/value pair if it does not exist. If the key
	// already exists, ErrKeyExists will be returned.
	//
	// A key has to consist of alphanumeric characters, dashes, underscores,
	// equal signs, and dots.
	Create(ctx context.Context, key string, value []byte) (uint64, error)

	// Update will update the value if the latest revision matches.
	// If the provided revision is not the latest, Update will return an error.
	Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error)

	// Delete will place a delete marker and leave all revisions. A history
	// of a deleted key can still be retrieved by using the History method
	// or a watch on the key. [Delete] is a non-destructive operation and
	// will not remove any previous revisions from the underlying stream.
	//
	// [LastRevision] option can be specified to only perform delete if the
	// latest revision matches the provided one.
	Delete(ctx context.Context, key string, opts ...KVDeleteOpt) error

	// Purge will place a delete marker and remove all previous revisions.
	// Only the latest revision will be preserved (with a delete marker).
	// Unlike [Delete], Purge is a destructive operation and will remove all
	// previous revisions from the underlying stream.
	//
	// [LastRevision] option can be specified to only perform purge if the
	// latest revision matches the provided one.
	Purge(ctx context.Context, key string, opts ...KVDeleteOpt) error

	// Watch for any updates to keys that match the keys argument which
	// could include wildcards. By default, the watcher will send the latest
	// value for each key and all future updates. Watch will send a nil
	// entry when it has received all initial values. There are a few ways
	// to configure the watcher:
	//
	// - IncludeHistory will have the key watcher send all historical values
	// for each key (up to KeyValueMaxHistory).
	// - IgnoreDeletes will have the key watcher not pass any keys with
	// delete markers.
	// - UpdatesOnly will have the key watcher only pass updates on values
	// (without latest values when started).
	// - MetaOnly will have the key watcher retrieve only the entry meta
	// data, not the entry value.
	// - ResumeFromRevision instructs the key watcher to resume from a
	// specific revision number.
	Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWatcher, error)

	// WatchAll will watch for any updates to all keys. It can be configured
	// with the same options as Watch.
	WatchAll(ctx context.Context, opts ...WatchOpt) (KeyWatcher, error)

	// Keys will return all keys.
	// Deprecated: Use ListKeys instead to avoid memory issues.
	Keys(ctx context.Context, opts ...WatchOpt) ([]string, error)

	// ListKeys will return KeyLister, allowing to retrieve all keys from
	// the key value store in a streaming fashion (on a channel).
	ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error)

	// History will return all historical values for the key (up to
	// KeyValueMaxHistory).
	History(ctx context.Context, key string, opts ...WatchOpt) ([]KeyValueEntry, error)

	// Bucket returns the KV store name.
	Bucket() string

	// PurgeDeletes will remove all current delete markers. It can be
	// configured using DeleteMarkersOlderThan option to only remove delete
	// markers older than a certain duration.
	//
	// [PurgeDeletes] is a destructive operation and will remove all entries
	// with delete markers from the underlying stream.
	PurgeDeletes(ctx context.Context, opts ...KVPurgeOpt) error

	// Status retrieves the status and configuration of a bucket.
	Status(ctx context.Context) (KeyValueStatus, error)
}

KeyValue contains methods to operate on a KeyValue store. Using the KeyValue interface, it is possible to:

- Get, Put, Create, Update, Delete and Purge a key
- Watch for updates to keys
- List all keys
- Retrieve historical values for a key
- Retrieve status and configuration of a key value bucket
- Purge all delete markers
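The revision rules documented on the interface (Create fails on an existing key, Update requires the latest revision, Delete keeps history, Purge discards it) are easier to follow in a toy model. The in-memory `store` below is only a sketch of those rules, not how the package stores data; all names are hypothetical, and treating a deleted key as absent in Create is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errKeyExists     = errors.New("key exists")
	errWrongRevision = errors.New("wrong last revision")
)

type entry struct {
	value    string
	revision uint64
	deleted  bool // true for a delete marker
}

// store is a toy in-memory model of the documented revision rules.
type store struct {
	seq     uint64
	history map[string][]entry
}

func newStore() *store { return &store{history: map[string][]entry{}} }

func (s *store) latest(key string) (entry, bool) {
	h := s.history[key]
	if len(h) == 0 {
		return entry{}, false
	}
	return h[len(h)-1], true
}

// write appends a new revision for key and returns its revision number.
func (s *store) write(key string, e entry) uint64 {
	s.seq++
	e.revision = s.seq
	s.history[key] = append(s.history[key], e)
	return e.revision
}

// Put always writes a new revision.
func (s *store) Put(key, value string) uint64 { return s.write(key, entry{value: value}) }

// Create writes only if the key has no live value (assumption: a delete
// marker counts as "does not exist").
func (s *store) Create(key, value string) (uint64, error) {
	if e, ok := s.latest(key); ok && !e.deleted {
		return 0, errKeyExists
	}
	return s.write(key, entry{value: value}), nil
}

// Update writes only if revision is the latest one for the key.
func (s *store) Update(key, value string, revision uint64) (uint64, error) {
	if e, ok := s.latest(key); !ok || e.revision != revision {
		return 0, errWrongRevision
	}
	return s.write(key, entry{value: value}), nil
}

// Delete appends a delete marker and keeps earlier revisions.
func (s *store) Delete(key string) { s.write(key, entry{deleted: true}) }

// Purge drops earlier revisions and keeps only a fresh delete marker.
func (s *store) Purge(key string) {
	delete(s.history, key)
	s.write(key, entry{deleted: true})
}

func main() {
	s := newStore()
	r1, _ := s.Create("color", "red")
	_, err := s.Create("color", "blue")
	fmt.Println(r1, err)
	_, err = s.Update("color", "blue", r1+10)
	fmt.Println(err)
	r2, _ := s.Update("color", "blue", r1)
	s.Delete("color")
	fmt.Println(r2, len(s.history["color"]))
	s.Purge("color")
	fmt.Println(len(s.history["color"]))
}
```

The revision returned by each write is what a caller would pass back to Update, or to the LastRevision option, to get compare-and-set behavior.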

type KeyValueBucketStatus added in v1.29.0

type KeyValueBucketStatus struct {
	// contains filtered or unexported fields
}

KeyValueBucketStatus represents status of a Bucket, implements KeyValueStatus

func (*KeyValueBucketStatus) BackingStore added in v1.29.0

func (s *KeyValueBucketStatus) BackingStore() string

BackingStore indicates what technology is used for storage of the bucket

func (*KeyValueBucketStatus) Bucket added in v1.29.0

func (s *KeyValueBucketStatus) Bucket() string

Bucket returns the name of the bucket

func (*KeyValueBucketStatus) Bytes added in v1.29.0

func (s *KeyValueBucketStatus) Bytes() uint64

Bytes is the size of the stream

func (*KeyValueBucketStatus) History added in v1.29.0

func (s *KeyValueBucketStatus) History() int64

History returns the configured history kept per key

func (*KeyValueBucketStatus) IsCompressed added in v1.32.0

func (s *KeyValueBucketStatus) IsCompressed() bool

IsCompressed indicates if the data is compressed on disk

func (*KeyValueBucketStatus) StreamInfo added in v1.29.0

func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo

StreamInfo is the stream info retrieved to create the status

func (*KeyValueBucketStatus) TTL added in v1.29.0

func (s *KeyValueBucketStatus) TTL() time.Duration

TTL is how long the bucket keeps values for

func (*KeyValueBucketStatus) Values added in v1.29.0

func (s *KeyValueBucketStatus) Values() uint64

Values is how many messages are in the bucket, including historical values

type KeyValueConfig added in v1.29.0

type KeyValueConfig struct {
	// Bucket is the name of the KeyValue store. Bucket name has to be
	// unique and can only contain alphanumeric characters, dashes, and
	// underscores.
	Bucket string `json:"bucket"`

	// Description is an optional description for the KeyValue store.
	Description string `json:"description,omitempty"`

	// MaxValueSize is the maximum size of a value in bytes. If not
	// specified, the default is -1 (unlimited).
	MaxValueSize int32 `json:"max_value_size,omitempty"`

	// History is the number of historical values to keep per key. If not
	// specified, the default is 1. Max is 64.
	History uint8 `json:"history,omitempty"`

	// TTL is the expiry time for keys. By default, keys do not expire.
	TTL time.Duration `json:"ttl,omitempty"`

	// MaxBytes is the maximum size in bytes of the KeyValue store. If not
	// specified, the default is -1 (unlimited).
	MaxBytes int64 `json:"max_bytes,omitempty"`

	// Storage is the type of storage to use for the KeyValue store. If not
	// specified, the default is FileStorage.
	Storage StorageType `json:"storage,omitempty"`

	// Replicas is the number of replicas to keep for the KeyValue store in
	// clustered jetstream. Defaults to 1, maximum is 5.
	Replicas int `json:"num_replicas,omitempty"`

	// Placement is used to declare where the stream should be placed via
	// tags and/or an explicit cluster name.
	Placement *Placement `json:"placement,omitempty"`

	// RePublish allows immediate republishing a message to the configured
	// subject after it's stored.
	RePublish *RePublish `json:"republish,omitempty"`

	// Mirror defines the configuration for mirroring another KeyValue
	// store.
	Mirror *StreamSource `json:"mirror,omitempty"`

	// Sources defines the configuration for sources of a KeyValue store.
	Sources []*StreamSource `json:"sources,omitempty"`

	// Compression sets the underlying stream compression.
	// NOTE: Compression is supported for nats-server 2.10.0+
	Compression bool `json:"compression,omitempty"`
}

KeyValueConfig is the configuration for a KeyValue store.
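The naming and history limits stated in the field comments can be checked before a config is sent to the server. The sketch below encodes only the rules written in this documentation (bucket names: alphanumerics, dashes, underscores; keys: the same plus equal signs and dots; history at most 64). The patterns and the helpers `checkConfig` and `validKey` are hypothetical, and the package's own validation may differ in detail.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// These patterns encode only the rules stated in the docs; the package's
// own validation may be stricter or more permissive.
var (
	bucketPattern = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`)
	keyPattern    = regexp.MustCompile(`^[a-zA-Z0-9_=.-]+$`)
)

// maxHistory is the documented upper bound for KeyValueConfig.History.
const maxHistory = 64

// checkConfig is a hypothetical pre-flight check for a bucket name and
// history depth.
func checkConfig(bucket string, history int) error {
	if !bucketPattern.MatchString(bucket) {
		return errors.New("invalid bucket name")
	}
	if history > maxHistory {
		return errors.New("history limited to a max of 64")
	}
	return nil
}

// validKey reports whether key uses only the documented character set.
func validKey(key string) bool { return keyPattern.MatchString(key) }

func main() {
	fmt.Println(checkConfig("profiles", 10))
	fmt.Println(checkConfig("bad.name", 1))
	fmt.Println(checkConfig("profiles", 65))
	fmt.Println(validKey("user.42"), validKey("has space"))
}
```

Note that dots are allowed in keys but not in bucket names, which is easy to overlook when deriving both from the same identifier.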

type KeyValueEntry added in v1.29.0

type KeyValueEntry interface {
	// Bucket is the bucket the data was loaded from.
	Bucket() string

	// Key is the name of the key that was retrieved.
	Key() string

	// Value is the retrieved value.
	Value() []byte

	// Revision is a unique sequence for this value.
	Revision() uint64

	// Created is the time the data was put in the bucket.
	Created() time.Time

	// Delta is distance from the latest value (how far the current sequence
	// is from the latest).
	Delta() uint64

	// Operation returns Put or Delete or Purge, depending on the manner in
	// which the current revision was created.
	Operation() KeyValueOp
}

KeyValueEntry is a retrieved entry for Get, List or Watch.

type KeyValueLister added in v1.29.0

type KeyValueLister interface {
	Status() <-chan KeyValueStatus
	Error() error
}

KeyValueLister is used to retrieve a list of key value stores. It returns a channel to read the KV store statuses from. The lister will always close the channel when done (either all stores have been retrieved or an error occurred) and therefore can be used in range loops. Error can be checked after the channel is closed to see whether listing ended with an error.

type KeyValueManager added in v1.29.0

type KeyValueManager interface {
	// KeyValue will lookup and bind to an existing KeyValue store.
	//
	// If the KeyValue store with given name does not exist,
	// ErrBucketNotFound will be returned.
	KeyValue(ctx context.Context, bucket string) (KeyValue, error)

	// CreateKeyValue will create a KeyValue store with the given
	// configuration.
	//
	// If a KeyValue store with the same name already exists and the
	// configuration is different, ErrBucketExists will be returned.
	CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error)

	// UpdateKeyValue will update an existing KeyValue store with the given
	// configuration.
	//
	// If a KeyValue store with the given name does not exist, ErrBucketNotFound
	// will be returned.
	UpdateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error)

	// CreateOrUpdateKeyValue will create a KeyValue store if it does not
	// exist or update an existing KeyValue store with the given
	// configuration (if possible).
	CreateOrUpdateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error)

	// DeleteKeyValue will delete this KeyValue store.
	//
	// If the KeyValue store with given name does not exist,
	// ErrBucketNotFound will be returned.
	DeleteKeyValue(ctx context.Context, bucket string) error

	// KeyValueStoreNames is used to retrieve a list of key value store
	// names. It returns a KeyValueNamesLister exposing a channel to read
	// the names from. The lister will always close the channel when done
	// (either all names have been read or an error occurred) and therefore
	// can be used in range loops.
	KeyValueStoreNames(ctx context.Context) KeyValueNamesLister

	// KeyValueStores is used to retrieve a list of key value store
	// statuses. It returns a KeyValueLister exposing a channel to read the
	// statuses from. The lister will always close the channel when done
	// (either all statuses have been read or an error occurred) and
	// therefore can be used in range loops.
	KeyValueStores(ctx context.Context) KeyValueLister
}

KeyValueManager is used to manage KeyValue stores. It provides methods to create, delete, and retrieve KeyValue stores.

type KeyValueNamesLister added in v1.29.0

type KeyValueNamesLister interface {
	Name() <-chan string
	Error() error
}

KeyValueNamesLister is used to retrieve a list of key value store names. It returns a channel to read the KV bucket names from. The lister will always close the channel when done (either all stores have been retrieved or an error occurred) and therefore can be used in range loops. Error can be checked after the channel is closed to see whether listing ended with an error.
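Since the channel is closed both on success and on failure, a consumer has to drain it first and only then ask whether something went wrong. A minimal sketch of that order of operations follows; `namesLister` is a local stand-in with the same method set as KeyValueNamesLister, and `listNames` with its `failAfter` parameter is a hypothetical producer used to simulate a failure.

```go
package main

import (
	"errors"
	"fmt"
)

// namesLister is a local stand-in with the same method set as
// KeyValueNamesLister.
type namesLister struct {
	names chan string
	err   error
}

// listNames sends names on the channel. If failAfter is >= 0, it stops
// after that many names and records an error before closing the channel.
func listNames(all []string, failAfter int) *namesLister {
	l := &namesLister{names: make(chan string)}
	go func() {
		defer close(l.names)
		for i, n := range all {
			if i == failAfter {
				l.err = errors.New("listing interrupted")
				return
			}
			l.names <- n
		}
	}()
	return l
}

func (l *namesLister) Name() <-chan string { return l.names }
func (l *namesLister) Error() error        { return l.err }

// collect drains the channel first and only then asks for the error.
func collect(l *namesLister) ([]string, error) {
	var out []string
	for n := range l.Name() {
		out = append(out, n)
	}
	return out, l.Error()
}

func main() {
	fmt.Println(collect(listNames([]string{"profiles", "sessions"}, -1)))
	fmt.Println(collect(listNames([]string{"profiles", "sessions"}, 1)))
}
```

In this sketch the error is written before the channel is closed, so reading it after the range loop ends is safe; checking it earlier could miss a failure.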

type KeyValueOp added in v1.29.0

type KeyValueOp uint8

KeyValueOp represents the type of KV operation (Put, Delete, Purge). It is a part of KeyValueEntry.

const (
	// KeyValuePut is a set on a revision which creates or updates a value for a
	// key.
	KeyValuePut KeyValueOp = iota

	// KeyValueDelete is a set on a revision which adds a delete marker for a
	// key.
	KeyValueDelete

	// KeyValuePurge is a set on a revision which removes all previous revisions
	// for a key.
	KeyValuePurge
)

Available KeyValueOp values.

func (KeyValueOp) String added in v1.29.0

func (op KeyValueOp) String() string

type KeyValueStatus added in v1.29.0

type KeyValueStatus interface {
	// Bucket returns the name of the KeyValue store.
	Bucket() string

	// Values is how many messages are in the bucket, including historical values.
	Values() uint64

	// History returns the configured history kept per key.
	History() int64

	// TTL returns the duration for which keys are kept in the bucket.
	TTL() time.Duration

	// BackingStore indicates what technology is used for storage of the bucket.
	// Currently only JetStream is supported.
	BackingStore() string

	// Bytes returns the size of the bucket in bytes.
	Bytes() uint64

	// IsCompressed indicates if the data is compressed on disk.
	IsCompressed() bool
}

KeyValueStatus is run-time status about a Key-Value bucket.

type KeyWatcher added in v1.29.0

type KeyWatcher interface {
	Updates() <-chan KeyValueEntry
	Stop() error
}

KeyWatcher is what is returned when doing a watch. It can be used to retrieve updates to keys. If not using UpdatesOnly option, it will also send the latest value for each key. After all initial values have been sent, a nil entry will be sent. Stop can be used to stop the watcher and close the underlying channel. Watcher will not close the channel until Stop is called or connection is closed.
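The nil entry acts as a marker between the initial snapshot and live updates. One way to consume such a stream is sketched below. The `update` type is a local stand-in for KeyValueEntry, and a pre-filled, closed channel stands in for a watcher so the example terminates; a real watcher keeps its channel open until Stop is called.

```go
package main

import "fmt"

// update is a local stand-in for KeyValueEntry; a nil *update plays the
// role of the nil entry that marks the end of the initial values.
type update struct {
	key, value string
}

// split reads updates until the channel closes and separates the initial
// snapshot from the changes delivered after the nil marker.
func split(updates <-chan *update) (initial, live []update) {
	seenMarker := false
	for u := range updates {
		switch {
		case u == nil:
			seenMarker = true
		case seenMarker:
			live = append(live, *u)
		default:
			initial = append(initial, *u)
		}
	}
	return initial, live
}

func main() {
	// A closed, pre-filled channel stands in for KeyWatcher.Updates().
	ch := make(chan *update, 4)
	ch <- &update{"a", "1"}
	ch <- &update{"b", "2"}
	ch <- nil // end of initial values
	ch <- &update{"a", "3"}
	close(ch)

	initial, live := split(ch)
	fmt.Println(len(initial), len(live))
}
```

Any code that dereferences entries from the updates channel needs the nil check, since the marker arrives on the same channel as regular entries.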

type ListObjectsOpt added in v1.32.0

type ListObjectsOpt func(opts *listObjectOpts) error

ListObjectsOpt is used to set additional options when listing objects.

func ListObjectsShowDeleted added in v1.32.0

func ListObjectsShowDeleted() ListObjectsOpt

ListObjectsShowDeleted makes [ObjectStore.ListObjects] also return deleted objects.

type MessageBatch

type MessageBatch interface {
	Messages() <-chan Msg
	Error() error
}

type MessageHandler

type MessageHandler func(msg Msg)

MessageHandler is a handler function used as callback in [Consume].

type MessagesContext

type MessagesContext interface {
	// Next retrieves next message on a stream. It will block until the next
	// message is available. If the context is canceled, Next will return
	// ErrMsgIteratorClosed error.
	Next() (Msg, error)

	// Stop unsubscribes from the stream and cancels subscription. Calling
	// Next after calling Stop will return ErrMsgIteratorClosed error.
	// All messages that are already in the buffer are discarded.
	Stop()

	// Drain unsubscribes from the stream and cancels subscription. All
	// messages that are already in the buffer will be available on
	// subsequent calls to Next. After the buffer is drained, Next will
	// return ErrMsgIteratorClosed error.
	Drain()
}

MessagesContext supports iterating over messages on a stream. It is returned by the [Consumer.Messages] method.
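The difference between Stop and Drain can be illustrated with a toy model. Everything below is an assumption made for the sketch: `iterator` mirrors the method set with `string` standing in for Msg, `errIteratorClosed` stands in for ErrMsgIteratorClosed, and `fakeIter` never blocks (a live iterator would block in Next while waiting for messages).

```go
package main

import (
	"errors"
	"fmt"
)

// errIteratorClosed is a local stand-in for ErrMsgIteratorClosed.
var errIteratorClosed = errors.New("iterator closed")

// iterator mirrors MessagesContext, with string standing in for Msg.
type iterator interface {
	Next() (string, error)
	Stop()
	Drain()
}

// drainRemaining calls Drain and then reads whatever is still buffered.
func drainRemaining(it iterator) ([]string, error) {
	it.Drain()
	var msgs []string
	for {
		m, err := it.Next()
		if errors.Is(err, errIteratorClosed) {
			return msgs, nil // buffer is empty and the iterator is closed
		}
		if err != nil {
			return msgs, err
		}
		msgs = append(msgs, m)
	}
}

// fakeIter is a toy model of an iterator that has already stopped
// receiving: Next never blocks and reports closed once the buffer is empty.
type fakeIter struct{ buf []string }

func (f *fakeIter) Next() (string, error) {
	if len(f.buf) == 0 {
		return "", errIteratorClosed
	}
	m := f.buf[0]
	f.buf = f.buf[1:]
	return m, nil
}

func (f *fakeIter) Stop()  { f.buf = nil } // Stop discards buffered messages
func (f *fakeIter) Drain() {}              // Drain keeps buffered messages readable

func main() {
	drained, _ := drainRemaining(&fakeIter{buf: []string{"m1", "m2"}})

	stopped := &fakeIter{buf: []string{"m1", "m2"}}
	stopped.Stop()
	_, err := stopped.Next()

	fmt.Println(len(drained), errors.Is(err, errIteratorClosed))
}
```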

type Msg

type Msg interface {
	// Metadata returns [MsgMetadata] for a JetStream message.
	Metadata() (*MsgMetadata, error)

	// Data returns the message body.
	Data() []byte

	// Headers returns a map of headers for a message.
	Headers() nats.Header

	// Subject returns a subject on which a message was published/received.
	Subject() string

	// Reply returns a reply subject for a message.
	Reply() string

	// Ack acknowledges a message. This tells the server that the message was
	// successfully processed and it can move on to the next message.
	Ack() error

	// DoubleAck acknowledges a message and waits for ack reply from the server.
	// While it impacts performance, it is useful for scenarios where
	// message loss is not acceptable.
	DoubleAck(context.Context) error

	// Nak negatively acknowledges a message. This tells the server to
	// redeliver the message.
	//
	// Nak does not adhere to AckWait or Backoff configured on the consumer
	// and triggers instant redelivery. For a delayed redelivery, use
	// NakWithDelay.
	Nak() error

	// NakWithDelay negatively acknowledges a message. This tells the server
	// to redeliver the message after the given delay.
	NakWithDelay(delay time.Duration) error

	// InProgress tells the server that this message is being worked on. It
	// resets the redelivery timer on the server.
	InProgress() error

	// Term tells the server to not redeliver this message, regardless of
	// the value of MaxDeliver.
	Term() error

	// TermWithReason tells the server to not redeliver this message, regardless of
	// the value of MaxDeliver. The provided reason will be included in JetStream
	// advisory event sent by the server.
	//
	// Note: This will only work with JetStream servers >= 2.10.4.
	// For older servers, TermWithReason will be ignored by the server and the message
	// will not be terminated.
	TermWithReason(reason string) error
}

Msg contains methods to operate on a JetStream message. Metadata, Data, Headers, Subject and Reply can be used to retrieve the specific parts of the underlying message. Ack, DoubleAck, Nak, NakWithDelay, InProgress, Term and TermWithReason are various flavors of ack requests.

type MsgErrHandler

type MsgErrHandler func(JetStream, *nats.Msg, error)

MsgErrHandler is used to process asynchronous errors from JetStream PublishAsync. It will return the original message sent to the server for possible retransmitting and the error encountered.

type MsgMetadata

type MsgMetadata struct {
	// Sequence is the sequence information for the message.
	Sequence SequencePair

	// NumDelivered is the number of times this message was delivered to the
	// consumer.
	NumDelivered uint64

	// NumPending is the number of messages that match the consumer's
	// filter, but have not been delivered yet.
	NumPending uint64

	// Timestamp is the time the message was originally stored on a stream.
	Timestamp time.Time

	// Stream is the stream name this message is stored on.
	Stream string

	// Consumer is the consumer name this message was delivered to.
	Consumer string

	// Domain is the domain this message was received on.
	Domain string
}

MsgMetadata is the JetStream metadata associated with received messages.

type ObjectBucketStatus added in v1.32.0

type ObjectBucketStatus struct {
	// contains filtered or unexported fields
}

ObjectBucketStatus represents status of a Bucket, implements ObjectStoreStatus

func (*ObjectBucketStatus) BackingStore added in v1.32.0

func (s *ObjectBucketStatus) BackingStore() string

BackingStore indicates what technology is used for storage of the bucket

func (*ObjectBucketStatus) Bucket added in v1.32.0

func (s *ObjectBucketStatus) Bucket() string

Bucket is the name of the bucket

func (*ObjectBucketStatus) Description added in v1.32.0

func (s *ObjectBucketStatus) Description() string

Description is the description supplied when creating the bucket

func (*ObjectBucketStatus) IsCompressed added in v1.33.0

func (s *ObjectBucketStatus) IsCompressed() bool

IsCompressed indicates if the data is compressed on disk

func (*ObjectBucketStatus) Metadata added in v1.32.0

func (s *ObjectBucketStatus) Metadata() map[string]string

Metadata is the metadata supplied when creating the bucket

func (*ObjectBucketStatus) Replicas added in v1.32.0

func (s *ObjectBucketStatus) Replicas() int

Replicas indicates how many storage replicas are kept for the data in the bucket

func (*ObjectBucketStatus) Sealed added in v1.32.0

func (s *ObjectBucketStatus) Sealed() bool

Sealed indicates the stream is sealed and cannot be modified in any way

func (*ObjectBucketStatus) Size added in v1.32.0

func (s *ObjectBucketStatus) Size() uint64

Size is the combined size of all data in the bucket including metadata, in bytes

func (*ObjectBucketStatus) Storage added in v1.32.0

func (s *ObjectBucketStatus) Storage() StorageType

Storage indicates the underlying JetStream storage technology used to store data

func (*ObjectBucketStatus) StreamInfo added in v1.32.0

func (s *ObjectBucketStatus) StreamInfo() *StreamInfo

StreamInfo is the stream info retrieved to create the status

func (*ObjectBucketStatus) TTL added in v1.32.0

func (s *ObjectBucketStatus) TTL() time.Duration

TTL indicates how long objects are kept in the bucket

type ObjectInfo added in v1.32.0

type ObjectInfo struct {
	// ObjectMeta contains high level information about the object.
	ObjectMeta

	// Bucket is the name of the object store.
	Bucket string `json:"bucket"`

	// NUID is the unique identifier for the object set when putting the
	// object into the store.
	NUID string `json:"nuid"`

	// Size is the size of the object in bytes. It only includes the size of
	// the object itself, not the metadata.
	Size uint64 `json:"size"`

	// ModTime is the last modification time of the object.
	ModTime time.Time `json:"mtime"`

	// Chunks is the number of chunks the object is split into. Maximum size
	// of each chunk can be specified in ObjectMetaOptions.
	Chunks uint32 `json:"chunks"`

	// Digest is the SHA-256 digest of the object. It is used to verify the
	// integrity of the object.
	Digest string `json:"digest,omitempty"`

	// Deleted indicates if the object is marked as deleted.
	Deleted bool `json:"deleted,omitempty"`
}

ObjectInfo contains ObjectMeta and additional information about an object.

type ObjectLink added in v1.32.0

type ObjectLink struct {
	// Bucket is the name of the object store the link is pointing to.
	Bucket string `json:"bucket"`

	// Name can be used to link to a single object.
	// If empty means this is a link to the whole store, like a directory.
	Name string `json:"name,omitempty"`
}

ObjectLink is used to embed links to other buckets and objects.

type ObjectMeta added in v1.32.0

type ObjectMeta struct {
	// Name is the name of the object. The name is required when adding an
	// object and has to be unique within the object store.
	Name string `json:"name"`

	// Description is an optional description for the object.
	Description string `json:"description,omitempty"`

	// Headers is an optional set of user-defined headers for the object.
	Headers nats.Header `json:"headers,omitempty"`

	// Metadata is the user supplied metadata for the object.
	Metadata map[string]string `json:"metadata,omitempty"`

	// Additional options for the object.
	Opts *ObjectMetaOptions `json:"options,omitempty"`
}

ObjectMeta is high level information about an object.

type ObjectMetaOptions added in v1.32.0

type ObjectMetaOptions struct {
	// Link contains information about a link to another object or object store.
	// It should not be set manually, but rather by using the AddLink or
	// AddBucketLink methods.
	Link *ObjectLink `json:"link,omitempty"`

	// ChunkSize is the maximum size of each chunk in bytes. If not specified,
	// the default is 128k.
	ChunkSize uint32 `json:"max_chunk_size,omitempty"`
}

ObjectMetaOptions is used to set additional options when creating an object.
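As a worked example of how ChunkSize relates to the Chunks count reported in ObjectInfo, the sketch below computes a ceiling division. It assumes that "128k" means 128 KiB (131072 bytes) and that an empty object needs zero chunks; `chunkCount` is a hypothetical helper, and the library's own accounting may differ in edge cases.

```go
package main

import "fmt"

// defaultChunkSize assumes the documented "128k" default means 128 KiB.
const defaultChunkSize = 128 * 1024

// chunkCount returns how many chunks of at most chunkSize bytes are needed
// to hold size bytes. A chunkSize of 0 selects the default.
func chunkCount(size uint64, chunkSize uint32) uint64 {
	if chunkSize == 0 {
		chunkSize = defaultChunkSize
	}
	cs := uint64(chunkSize)
	return (size + cs - 1) / cs // ceiling division
}

func main() {
	fmt.Println(chunkCount(300000, 0))    // default chunk size
	fmt.Println(chunkCount(300000, 1024)) // custom 1 KiB chunks
}
```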

type ObjectResult added in v1.32.0

type ObjectResult interface {
	io.ReadCloser
	Info() (*ObjectInfo, error)
	Error() error
}

ObjectResult will return the object info and a reader to read the object's contents. The reader will be closed when all data has been read or an error occurs.

type ObjectStore added in v1.32.0

type ObjectStore interface {
	// Put will place the contents from the reader into a new object. If the
	// object already exists, it will be overwritten. The object name is
	// required and is taken from the ObjectMeta.Name field.
	//
	// The reader will be read until EOF. ObjectInfo will be returned, containing
	// the object's metadata, digest and instance information.
	Put(ctx context.Context, obj ObjectMeta, reader io.Reader) (*ObjectInfo, error)

	// PutBytes is a convenience function to put a byte slice into this object
	// store under the given name.
	//
	// ObjectInfo will be returned, containing the object's metadata, digest
	// and instance information.
	PutBytes(ctx context.Context, name string, data []byte) (*ObjectInfo, error)

	// PutString is a convenience function to put a string into this object
	// store under the given name.
	//
	// ObjectInfo will be returned, containing the object's metadata, digest
	// and instance information.
	PutString(ctx context.Context, name string, data string) (*ObjectInfo, error)

	// PutFile is a convenience function to put a file's contents into this
	// object store. The name of the object will be the path of the file.
	//
	// ObjectInfo will be returned, containing the object's metadata, digest
	// and instance information.
	PutFile(ctx context.Context, file string) (*ObjectInfo, error)

	// Get will pull the named object from the object store. If the object
	// does not exist, ErrObjectNotFound will be returned.
	//
	// The returned ObjectResult will contain the object's metadata and a
	// reader to read the object's contents. The reader will be closed when
	// all data has been read or an error occurs.
	//
	// A GetObjectShowDeleted option can be supplied to return an object
	// even if it was marked as deleted.
	Get(ctx context.Context, name string, opts ...GetObjectOpt) (ObjectResult, error)

	// GetBytes is a convenience function to pull an object from this object
	// store and return it as a byte slice.
	//
	// If the object does not exist, ErrObjectNotFound will be returned.
	//
	// A GetObjectShowDeleted option can be supplied to return an object
	// even if it was marked as deleted.
	GetBytes(ctx context.Context, name string, opts ...GetObjectOpt) ([]byte, error)

	// GetString is a convenience function to pull an object from this
	// object store and return it as a string.
	//
	// If the object does not exist, ErrObjectNotFound will be returned.
	//
	// A GetObjectShowDeleted option can be supplied to return an object
	// even if it was marked as deleted.
	GetString(ctx context.Context, name string, opts ...GetObjectOpt) (string, error)

	// GetFile is a convenience function to pull an object from this object
	// store and place it in a file. If the file already exists, it will be
	// overwritten, otherwise it will be created.
	//
	// If the object does not exist, ErrObjectNotFound will be returned.
	// A GetObjectShowDeleted option can be supplied to return an object
	// even if it was marked as deleted.
	GetFile(ctx context.Context, name, file string, opts ...GetObjectOpt) error

	// GetInfo will retrieve the current information for the object, containing
	// the object's metadata and instance information.
	//
	// If the object does not exist, ErrObjectNotFound will be returned.
	//
	// A GetObjectInfoShowDeleted option can be supplied to return an object
	// even if it was marked as deleted.
	GetInfo(ctx context.Context, name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error)

	// UpdateMeta will update the metadata for the object.
	//
	// If the object does not exist, ErrUpdateMetaDeleted will be returned.
	// If the new name is different from the old name, and an object with the
	// new name already exists, ErrObjectAlreadyExists will be returned.
	UpdateMeta(ctx context.Context, name string, meta ObjectMeta) error

	// Delete will delete the named object from the object store. If the object
	// does not exist, ErrObjectNotFound will be returned. If the object is
	// already deleted, no error will be returned.
	//
	// All chunks for the object will be purged, and the object will be marked
	// as deleted.
	Delete(ctx context.Context, name string) error

	// AddLink will add a link to another object. A link is a reference to
	// another object. The provided name is the name of the link object.
	// The provided ObjectInfo is the info of the object being linked to.
	//
	// If an object with given name already exists, ErrObjectAlreadyExists
	// will be returned.
	// If object being linked to is deleted, ErrNoLinkToDeleted will be
	// returned.
	// If the provided object is a link, ErrNoLinkToLink will be returned.
	// If the provided object is nil or the name is empty, ErrObjectRequired
	// will be returned.
	AddLink(ctx context.Context, name string, obj *ObjectInfo) (*ObjectInfo, error)

	// AddBucketLink will add a link to another object store. A link is a
	// reference to another object store. The provided name is the name of
	// the link object.
	// The provided ObjectStore is the object store being linked to.
	//
	// If an object with given name already exists, ErrObjectAlreadyExists
	// will be returned.
	// If the provided object store is nil, ErrBucketRequired will be returned.
	AddBucketLink(ctx context.Context, name string, bucket ObjectStore) (*ObjectInfo, error)

	// Seal will seal the object store, no further modifications will be allowed.
	Seal(ctx context.Context) error

	// Watch for any updates to objects in the store. By default, the watcher will send the latest
	// info for each object and all future updates. Watch will send a nil
	// entry when it has received all initial values. There are a few ways
	// to configure the watcher:
	//
	// - IncludeHistory will have the watcher send all historical information
	// for each object.
	// - IgnoreDeletes will have the watcher not pass any objects with
	// delete markers.
	// - UpdatesOnly will have the watcher only pass updates on objects
	// (without latest info when started).
	Watch(ctx context.Context, opts ...WatchOpt) (ObjectWatcher, error)

	// List will list information about objects in the store.
	//
	// If the object store is empty, ErrNoObjectsFound will be returned.
	List(ctx context.Context, opts ...ListObjectsOpt) ([]*ObjectInfo, error)

	// Status retrieves the status and configuration of the bucket.
	Status(ctx context.Context) (ObjectStoreStatus, error)
}

ObjectStore contains methods to operate on an object store. Using the ObjectStore interface, it is possible to:

  • Perform CRUD operations on objects (Get, Put, Delete). Get and put expose convenience methods to work with byte slices, strings and files, in addition to streaming io.Reader
  • Get information about an object without retrieving it.
  • Update the metadata of an object.
  • Add links to other objects or object stores.
  • Watch for updates to a store
  • List information about objects in a store
  • Retrieve status and configuration of an object store.

type ObjectStoreConfig added in v1.32.0

type ObjectStoreConfig struct {
	// Bucket is the name of the object store. Bucket name has to be
	// unique and can only contain alphanumeric characters, dashes, and
	// underscores.
	Bucket string `json:"bucket"`

	// Description is an optional description for the object store.
	Description string `json:"description,omitempty"`

	// TTL is the maximum age of objects in the store. If an object is not
	// updated within this time, it will be removed from the store.
	// By default, objects do not expire.
	TTL time.Duration `json:"max_age,omitempty"`

	// MaxBytes is the maximum size of the object store. If not specified,
	// the default is -1 (unlimited).
	MaxBytes int64 `json:"max_bytes,omitempty"`

	// Storage is the type of storage to use for the object store. If not
	// specified, the default is FileStorage.
	Storage StorageType `json:"storage,omitempty"`

	// Replicas is the number of replicas to keep for the object store in
	// clustered jetstream. Defaults to 1, maximum is 5.
	Replicas int `json:"num_replicas,omitempty"`

	// Placement is used to declare where the object store should be placed via
	// tags and/or an explicit cluster name.
	Placement *Placement `json:"placement,omitempty"`

	// Compression enables the underlying stream compression.
	// NOTE: Compression is supported for nats-server 2.10.0+
	Compression bool `json:"compression,omitempty"`

	// Bucket-specific metadata
	// NOTE: Metadata requires nats-server v2.10.0+
	Metadata map[string]string `json:"metadata,omitempty"`
}

ObjectStoreConfig is the configuration for the object store.

type ObjectStoreManager added in v1.32.0

type ObjectStoreManager interface {
	// ObjectStore will look up and bind to an existing object store
	// instance.
	//
	// If the object store with given name does not exist, ErrBucketNotFound
	// will be returned.
	ObjectStore(ctx context.Context, bucket string) (ObjectStore, error)

	// CreateObjectStore will create a new object store with the given
	// configuration.
	//
	// If the object store with given name already exists, ErrBucketExists
	// will be returned.
	CreateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error)

	// UpdateObjectStore will update an existing object store with the given
	// configuration.
	//
	// If the object store with given name does not exist, ErrBucketNotFound
	// will be returned.
	UpdateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error)

	// CreateOrUpdateObjectStore will create a new object store with the given
	// configuration if it does not exist, or update an existing object store
	// with the given configuration.
	CreateOrUpdateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error)

	// DeleteObjectStore will delete the provided object store.
	//
	// If the object store with given name does not exist, ErrBucketNotFound
	// will be returned.
	DeleteObjectStore(ctx context.Context, bucket string) error

	// ObjectStoreNames is used to retrieve a list of bucket names.
	// It returns an ObjectStoreNamesLister exposing a channel to receive
	// the names of the object stores.
	//
	// The lister will always close the channel when done (either all names
	// have been read or an error occurred) and therefore can be used in a
	// for-range loop.
	ObjectStoreNames(ctx context.Context) ObjectStoreNamesLister

	// ObjectStores is used to retrieve a list of bucket statuses.
	// It returns an ObjectStoresLister exposing a channel to receive
	// the statuses of the object stores.
	//
	// The lister will always close the channel when done (either all statuses
	// have been read or an error occurred) and therefore can be used in a
	// for-range loop.
	ObjectStores(ctx context.Context) ObjectStoresLister
}

ObjectStoreManager is used to manage object stores. It provides methods for CRUD operations on object stores.

type ObjectStoreNamesLister added in v1.32.0

type ObjectStoreNamesLister interface {
	Name() <-chan string
	Error() error
}

ObjectStoreNamesLister is used to retrieve a list of object store names. It returns a channel to read the bucket names from. The lister will always close the channel when done (either all stores have been retrieved or an error occurred) and therefore can be used in range loops.
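The range-then-check pattern this implies can be sketched as follows. `namesLister` repeats the two methods shown above so the example compiles on its own, and `fakeLister` is a made-up implementation used only to drive it.

```go
package main

import "fmt"

// namesLister repeats the method set of ObjectStoreNamesLister.
type namesLister interface {
	Name() <-chan string
	Error() error
}

// collectNames ranges over the channel until the lister closes it, then
// checks Error to find out whether the listing ended early.
func collectNames(l namesLister) ([]string, error) {
	var names []string
	for n := range l.Name() {
		names = append(names, n)
	}
	if err := l.Error(); err != nil {
		return names, err
	}
	return names, nil
}

// fakeLister is a stand-in that serves a fixed list of names.
type fakeLister struct {
	ch  chan string
	err error
}

func (f *fakeLister) Name() <-chan string { return f.ch }
func (f *fakeLister) Error() error        { return f.err }

func newFakeLister(names ...string) *fakeLister {
	ch := make(chan string, len(names))
	for _, n := range names {
		ch <- n
	}
	close(ch) // the lister closes the channel when done
	return &fakeLister{ch: ch}
}

func main() {
	names, err := collectNames(newFakeLister("configs", "artifacts"))
	fmt.Println(names, err)
}
```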

type ObjectStoreStatus added in v1.32.0

type ObjectStoreStatus interface {
	// Bucket returns the name of the object store.
	Bucket() string

	// Description is the description supplied when creating the bucket.
	Description() string

	// TTL indicates how long objects are kept in the bucket.
	TTL() time.Duration

	// Storage indicates the underlying JetStream storage technology used to
	// store data.
	Storage() StorageType

	// Replicas indicates how many storage replicas are kept for the data in
	// the bucket.
	Replicas() int

	// Sealed indicates the stream is sealed and cannot be modified in any
	// way.
	Sealed() bool

	// Size is the combined size of all data in the bucket including
	// metadata, in bytes.
	Size() uint64

	// BackingStore indicates what technology is used for storage of the
	// bucket. Currently only JetStream is supported.
	BackingStore() string

	// Metadata is the user supplied metadata for the bucket.
	Metadata() map[string]string

	// IsCompressed indicates if the data is compressed on disk.
	IsCompressed() bool
}

ObjectStoreStatus is run-time status about a bucket.

type ObjectStoresLister added in v1.32.0

type ObjectStoresLister interface {
	Status() <-chan ObjectStoreStatus
	Error() error
}

ObjectStoresLister is used to retrieve a list of object stores. It returns a channel to read the object store statuses from. The lister will always close the channel when done (either all stores have been retrieved or an error occurred) and therefore can be used in range loops.

type ObjectWatcher added in v1.32.0

type ObjectWatcher interface {
	Updates() <-chan *ObjectInfo
	Stop() error
}

ObjectWatcher is what is returned when doing a watch. It can be used to retrieve updates to objects in a bucket. If not using UpdatesOnly option, it will also send the latest info for each object. After all initial values have been sent, a nil entry will be sent. Stop can be used to stop the watcher and close the underlying channel. Watcher will not close the channel until Stop is called or connection is closed.

type OrderedConsumerConfig

type OrderedConsumerConfig struct {
	// FilterSubjects allows filtering messages from a stream by subject.
	// This field is exclusive with FilterSubject. Requires nats-server
	// v2.10.0 or later.
	FilterSubjects []string `json:"filter_subjects,omitempty"`

	// DeliverPolicy defines from which point to start delivering messages
	// from the stream. Defaults to DeliverAllPolicy.
	DeliverPolicy DeliverPolicy `json:"deliver_policy"`

	// OptStartSeq is an optional sequence number from which to start
	// message delivery. Only applicable when DeliverPolicy is set to
	// DeliverByStartSequencePolicy.
	OptStartSeq uint64 `json:"opt_start_seq,omitempty"`

	// OptStartTime is an optional time from which to start message
	// delivery. Only applicable when DeliverPolicy is set to
	// DeliverByStartTimePolicy.
	OptStartTime *time.Time `json:"opt_start_time,omitempty"`

	// ReplayPolicy defines the rate at which messages are sent to the
	// consumer. If ReplayOriginalPolicy is set, messages are sent in the
	// same intervals in which they were stored on stream. This can be used
	// e.g. to simulate production traffic in development environments. If
	// ReplayInstantPolicy is set, messages are sent as fast as possible.
	// Defaults to ReplayInstantPolicy.
	ReplayPolicy ReplayPolicy `json:"replay_policy"`

	// InactiveThreshold is a duration which instructs the server to clean
	// up the consumer if it has been inactive for the specified duration.
	// Defaults to 5s.
	InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`

	// HeadersOnly indicates whether only headers of messages should be sent
	// (and no payload). Defaults to false.
	HeadersOnly bool `json:"headers_only,omitempty"`

	// Maximum number of attempts for the consumer to be recreated in a
	// single recreation cycle. Defaults to unlimited.
	MaxResetAttempts int
}

OrderedConsumerConfig is the configuration of an ordered JetStream consumer. For more information, see Ordered Consumers in README.

type PeerInfo

type PeerInfo struct {
	// Name is the server name of the peer.
	Name string `json:"name"`

	// Current indicates if the peer is up to date and synchronized with the
	// leader.
	Current bool `json:"current"`

	// Offline indicates if the peer is considered offline by the group.
	Offline bool `json:"offline,omitempty"`

	// Active is the duration since this peer was last seen.
	Active time.Duration `json:"active"`

	// Lag is the number of uncommitted operations this peer is behind the
	// leader.
	Lag uint64 `json:"lag,omitempty"`
}

PeerInfo shows information about the peers in the cluster that are supporting the stream or consumer.

type Placement

type Placement struct {
	// Cluster is the name of the cluster to which the stream should be
	// assigned.
	Cluster string `json:"cluster"`

	// Tags are used to match streams to servers in the cluster. A stream
	// will be assigned to a server with a matching tag.
	Tags []string `json:"tags,omitempty"`
}

Placement is used to guide placement of streams in clustered JetStream.

type PubAck

type PubAck struct {
	// Stream is the stream name the message was published to.
	Stream string `json:"stream"`

	// Sequence is the stream sequence number of the message.
	Sequence uint64 `json:"seq"`

	// Duplicate indicates whether the message was a duplicate.
	// Duplicate can be detected using the [MsgIDHeader] and [StreamConfig.Duplicates].
	Duplicate bool `json:"duplicate,omitempty"`

	// Domain is the domain the message was published to.
	Domain string `json:"domain,omitempty"`
}

PubAck is an ack received after successfully publishing a message.
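To show how the JSON tags above map onto a payload, the sketch below decodes a made-up acknowledgement. The `pubAck` type is a local copy of the fields and tags shown above, and the payload is invented for illustration rather than captured from a server.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pubAck copies the fields and JSON tags of PubAck shown above.
type pubAck struct {
	Stream    string `json:"stream"`
	Sequence  uint64 `json:"seq"`
	Duplicate bool   `json:"duplicate,omitempty"`
	Domain    string `json:"domain,omitempty"`
}

// decodePubAck unmarshals a JSON-encoded acknowledgement.
func decodePubAck(data []byte) (*pubAck, error) {
	var ack pubAck
	if err := json.Unmarshal(data, &ack); err != nil {
		return nil, err
	}
	return &ack, nil
}

func main() {
	// Made-up payload for illustration only.
	payload := []byte(`{"stream":"ORDERS","seq":42,"duplicate":true}`)
	ack, err := decodePubAck(payload)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(ack.Stream, ack.Sequence, ack.Duplicate, ack.Domain == "")
}
```

Fields tagged `omitempty` that are missing from the payload, such as `domain` here, are left at their zero values.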

type PubAckFuture

type PubAckFuture interface {
	// Ok returns a receive only channel that can be used to get a PubAck.
	Ok() <-chan *PubAck

	// Err returns a receive only channel that can be used to get the error from an async publish.
	Err() <-chan error

	// Msg returns the message that was sent to the server.
	Msg() *nats.Msg
}

PubAckFuture is a future for a PubAck. It can be used to wait for a PubAck or an error after an async publish.

type PublishOpt

type PublishOpt func(*pubOpts) error

PublishOpt are the options that can be passed to Publish methods.

func WithExpectLastMsgID

func WithExpectLastMsgID(id string) PublishOpt

WithExpectLastMsgID sets the expected message ID the last message on a stream should have. If the last message has a different message ID, the server will reject the message and publish will fail.

func WithExpectLastSequence

func WithExpectLastSequence(seq uint64) PublishOpt

WithExpectLastSequence sets the expected sequence number the last message on a stream should have. If the last message has a different sequence number, the server will reject the message and publish will fail.

func WithExpectLastSequencePerSubject

func WithExpectLastSequencePerSubject(seq uint64) PublishOpt

WithExpectLastSequencePerSubject sets the expected sequence number that the last message on the subject the message is published to should have. If the last message on that subject has a different sequence number, the server will reject the message and publish will fail.

func WithExpectStream

func WithExpectStream(stream string) PublishOpt

WithExpectStream sets the expected stream the message should be published to. If the message is published to a different stream, the server will reject the message and publish will fail.

func WithMsgID

func WithMsgID(id string) PublishOpt

WithMsgID sets the message ID used for deduplication.

func WithRetryAttempts

func WithRetryAttempts(num int) PublishOpt

WithRetryAttempts sets the number of retry attempts when ErrNoResponders is encountered. Defaults to 2.

func WithRetryWait

func WithRetryWait(dur time.Duration) PublishOpt

WithRetryWait sets the retry wait time when ErrNoResponders is encountered. Defaults to 250ms.

func WithStallWait

func WithStallWait(ttl time.Duration) PublishOpt

WithStallWait sets the maximum wait when the producer becomes stalled while producing messages. If a publish call is blocked for this long, ErrTooManyStalledMsgs is returned.

type Publisher

type Publisher interface {
	// Publish performs a synchronous publish to a stream and waits for ack
	// from server. It accepts subject name (which must be bound to a stream)
	// and message payload.
	Publish(ctx context.Context, subject string, payload []byte, opts ...PublishOpt) (*PubAck, error)

	// PublishMsg performs a synchronous publish to a stream and waits for
	// ack from server. It accepts a nats.Msg whose subject must be bound to
	// a stream.
	PublishMsg(ctx context.Context, msg *nats.Msg, opts ...PublishOpt) (*PubAck, error)

	// PublishAsync performs a publish to a stream and returns
	// [PubAckFuture] interface, not blocking while waiting for an
	// acknowledgement. It accepts subject name (which must be bound to a
	// stream) and message payload.
	//
	// PublishAsync does not guarantee that the message has been
	// received by the server. It only guarantees that the message has been
	// sent to the server and thus messages can be stored in the stream
	// out of order in case of retries.
	PublishAsync(subject string, payload []byte, opts ...PublishOpt) (PubAckFuture, error)

	// PublishMsgAsync performs a publish to a stream and returns
	// [PubAckFuture] interface, not blocking while waiting for an
	// acknowledgement. It accepts a nats.Msg whose subject must be bound
	// to a stream.
	//
	// PublishMsgAsync does not guarantee that the message has been
	// received by the server. It only guarantees that the message has been
	// sent to the server and thus messages can be stored in the stream
	// out of order in case of retries.
	PublishMsgAsync(msg *nats.Msg, opts ...PublishOpt) (PubAckFuture, error)

	// PublishAsyncPending returns the number of async publishes outstanding
	// for this context. An outstanding publish is one that has been
	// sent by the publisher but has not yet received an ack.
	PublishAsyncPending() int

	// PublishAsyncComplete returns a channel that will be closed when all
	// outstanding asynchronously published messages are acknowledged by the
	// server.
	PublishAsyncComplete() <-chan struct{}

	// CleanupPublisher will cleanup the publishing side of JetStreamContext.
	//
	// This will unsubscribe from the internal reply subject if needed.
	// All pending async publishes will fail with ErrJetStreamContextClosed.
	//
	// If an error handler was provided, it will be called for each pending async
	// publish and PublishAsyncComplete will be closed.
	//
	// After completing JetStreamContext is still usable - internal subscription
	// will be recreated on next publish, but the acks from previous publishes will
	// be lost.
	CleanupPublisher()
}

Publisher provides methods for publishing messages to a stream. It is available as a part of JetStream interface. The behavior of Publisher can be customized using PublishOpt options.

type PullConsumeOpt

type PullConsumeOpt interface {
	// contains filtered or unexported methods
}

PullConsumeOpt represents additional options used in [Consume] for pull consumers.

func ConsumeErrHandler

func ConsumeErrHandler(cb ConsumeErrHandlerFunc) PullConsumeOpt

ConsumeErrHandler sets a custom error handler invoked when an error is encountered while consuming messages. It will be invoked for both terminal (Consumer Deleted, invalid request body) and non-terminal (e.g. missing heartbeats) errors.

type PullExpiry

type PullExpiry time.Duration

PullExpiry sets timeout on a single pull request, waiting until at least one message is available. If not provided, a default of 30 seconds will be used.

type PullHeartbeat

type PullHeartbeat time.Duration

PullHeartbeat sets the idle heartbeat duration for a pull subscription. If a client does not receive a heartbeat message from a stream for more than the idle heartbeat setting, the subscription will be removed and an error will be passed to the message handler. If not provided, a default of PullExpiry / 2 will be used (capped at 30 seconds).
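The default described above (half of the pull expiry, capped at 30 seconds) can be sketched as a small function. This is an illustration of the documented rule only: defaultHeartbeat, defaultExpiry and maxHeartbeat are hypothetical names, and the library's internal computation may differ in detail.

```go
package main

import (
	"fmt"
	"time"
)

const (
	// defaultExpiry mirrors the documented PullExpiry default.
	defaultExpiry = 30 * time.Second
	// maxHeartbeat mirrors the documented cap on the default heartbeat.
	maxHeartbeat = 30 * time.Second
)

// defaultHeartbeat returns expiry / 2, capped at maxHeartbeat.
func defaultHeartbeat(expiry time.Duration) time.Duration {
	hb := expiry / 2
	if hb > maxHeartbeat {
		hb = maxHeartbeat
	}
	return hb
}

func main() {
	for _, expiry := range []time.Duration{
		10 * time.Second,
		defaultExpiry,
		2 * time.Minute,
	} {
		fmt.Printf("expiry=%v heartbeat=%v\n", expiry, defaultHeartbeat(expiry))
	}
}
```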

type PullMaxBytes

type PullMaxBytes int

PullMaxBytes limits the number of bytes to be buffered in the client. If not provided, the limit is not set (max messages will be used instead). This option is exclusive with PullMaxMessages.

type PullMaxMessages

type PullMaxMessages int

PullMaxMessages limits the number of messages to be buffered in the client. If not provided, a default of 500 messages will be used. This option is exclusive with PullMaxBytes.

type PullMessagesOpt

type PullMessagesOpt interface {
	// contains filtered or unexported methods
}

PullMessagesOpt represents additional options used in [Messages] for pull consumers.

func WithMessagesErrOnMissingHeartbeat

func WithMessagesErrOnMissingHeartbeat(hbErr bool) PullMessagesOpt

WithMessagesErrOnMissingHeartbeat sets whether a missing heartbeat error should be reported when calling [MessagesContext.Next] (Default: true).

type PullThresholdBytes added in v1.27.0

type PullThresholdBytes int

PullThresholdBytes sets the byte count at which Consume will trigger a new pull request to the server. Defaults to 50% of MaxBytes (if set).

type PullThresholdMessages

type PullThresholdMessages int

PullThresholdMessages sets the message count at which Consume will trigger a new pull request to the server. Defaults to 50% of MaxMessages.

type RawStreamMsg

type RawStreamMsg struct {
	Subject  string
	Sequence uint64
	Header   nats.Header
	Data     []byte
	Time     time.Time
}

type RePublish

type RePublish struct {
	// Source is the subject pattern to match incoming messages against.
	Source string `json:"src,omitempty"`

	// Destination is the subject pattern to republish the subject to.
	Destination string `json:"dest"`

	// HeadersOnly is a flag to indicate that only the headers should be
	// republished.
	HeadersOnly bool `json:"headers_only,omitempty"`
}

RePublish is for republishing messages once committed to a stream. The original subject is remapped from the subject pattern to the destination pattern.
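To see how the struct tags shape the JSON sent in a stream configuration, the sketch below copies the RePublish definition into a standalone program and encodes two values. The subjects are made up for illustration, and encode is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RePublish is copied from the definition above so that this sketch
// compiles without the nats.go module.
type RePublish struct {
	Source      string `json:"src,omitempty"`
	Destination string `json:"dest"`
	HeadersOnly bool   `json:"headers_only,omitempty"`
}

// encode returns the JSON form of r, or an error string.
func encode(r RePublish) string {
	b, err := json.Marshal(r)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	// All fields set: every key appears in the output.
	fmt.Println(encode(RePublish{
		Source:      "orders.created",
		Destination: "audit.orders.created",
		HeadersOnly: true,
	}))

	// Only Destination set: src and headers_only are dropped by omitempty.
	fmt.Println(encode(RePublish{Destination: "audit.all"}))
}
```

Note that dest has no omitempty tag, so it is always present in the encoded form, even when empty.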

type ReplayPolicy

type ReplayPolicy int

ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.

const (
	// ReplayInstantPolicy will replay messages as fast as possible.
	ReplayInstantPolicy ReplayPolicy = iota

	// ReplayOriginalPolicy will maintain the same timing as the messages were
	// received.
	ReplayOriginalPolicy
)

func (ReplayPolicy) MarshalJSON

func (p ReplayPolicy) MarshalJSON() ([]byte, error)

func (ReplayPolicy) String

func (p ReplayPolicy) String() string

func (*ReplayPolicy) UnmarshalJSON

func (p *ReplayPolicy) UnmarshalJSON(data []byte) error

type RetentionPolicy

type RetentionPolicy int

RetentionPolicy determines how messages in a stream are retained.

const (
	// LimitsPolicy (default) means that messages are retained until any given
	// limit is reached. This could be one of MaxMsgs, MaxBytes, or MaxAge.
	LimitsPolicy RetentionPolicy = iota

	// InterestPolicy specifies that when all known observables have
	// acknowledged a message it can be removed.
	InterestPolicy

	// WorkQueuePolicy specifies that when the first worker or subscriber
	// acknowledges the message it can be removed.
	WorkQueuePolicy
)

func (RetentionPolicy) MarshalJSON

func (rp RetentionPolicy) MarshalJSON() ([]byte, error)

func (RetentionPolicy) String

func (rp RetentionPolicy) String() string

func (*RetentionPolicy) UnmarshalJSON

func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error
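The String, MarshalJSON and UnmarshalJSON methods follow the usual Go pattern for an iota-based enum that travels as a JSON string. The sketch below reproduces that pattern with a local stand-in type. The wire strings "limits", "interest" and "workqueue" are an assumption on my part and are not stated in this reference; check them against the library before relying on them.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// retentionPolicy is a local stand-in for RetentionPolicy.
type retentionPolicy int

const (
	limitsPolicy retentionPolicy = iota
	interestPolicy
	workQueuePolicy
)

// names holds the assumed wire strings for each policy.
var names = map[retentionPolicy]string{
	limitsPolicy:    "limits",
	interestPolicy:  "interest",
	workQueuePolicy: "workqueue",
}

func (rp retentionPolicy) String() string {
	if s, ok := names[rp]; ok {
		return s
	}
	return "unknown"
}

// MarshalJSON encodes the policy as a JSON string.
func (rp retentionPolicy) MarshalJSON() ([]byte, error) {
	s, ok := names[rp]
	if !ok {
		return nil, fmt.Errorf("cannot marshal retention policy %d", int(rp))
	}
	return json.Marshal(s)
}

// UnmarshalJSON decodes a JSON string back into a policy value.
func (rp *retentionPolicy) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	for p, name := range names {
		if name == s {
			*rp = p
			return nil
		}
	}
	return fmt.Errorf("cannot unmarshal retention policy %q", s)
}

// roundTrip encodes rp and decodes the result again.
func roundTrip(rp retentionPolicy) (string, retentionPolicy, error) {
	b, err := json.Marshal(rp)
	if err != nil {
		return "", 0, err
	}
	var back retentionPolicy
	err = json.Unmarshal(b, &back)
	return string(b), back, err
}

func main() {
	enc, back, err := roundTrip(workQueuePolicy)
	fmt.Println(enc, back, err)
}
```

ReplayPolicy, StorageType and StoreCompression expose the same three methods, so the same pattern presumably applies to them.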

type SequenceInfo

type SequenceInfo struct {
	Consumer uint64     `json:"consumer_seq"`
	Stream   uint64     `json:"stream_seq"`
	Last     *time.Time `json:"last_active,omitempty"`
}

SequenceInfo has both the consumer and the stream sequence and last activity.

type SequencePair

type SequencePair struct {
	// Consumer is the consumer sequence number for message deliveries. This
	// is the total number of messages the consumer has seen (including
	// redeliveries).
	Consumer uint64 `json:"consumer_seq"`

	// Stream is the stream sequence number for a message.
	Stream uint64 `json:"stream_seq"`
}

SequencePair includes the consumer and stream sequence numbers for a message.

type StopAfter added in v1.30.0

type StopAfter int

StopAfter sets the number of messages after which the consumer is automatically stopped and no more messages are pulled from the server.

type StorageType

type StorageType int

StorageType determines how messages are stored for retention.

const (
	// FileStorage specifies on disk storage. It's the default.
	FileStorage StorageType = iota
	// MemoryStorage specifies in memory only.
	MemoryStorage
)

func (StorageType) MarshalJSON

func (st StorageType) MarshalJSON() ([]byte, error)

func (StorageType) String

func (st StorageType) String() string

func (*StorageType) UnmarshalJSON

func (st *StorageType) UnmarshalJSON(data []byte) error

type StoreCompression added in v1.30.0

type StoreCompression uint8

StoreCompression determines how messages are compressed.

const (
	// NoCompression disables compression on the stream. This is the default.
	NoCompression StoreCompression = iota

	// S2Compression enables S2 compression on the stream.
	S2Compression
)

func (StoreCompression) MarshalJSON added in v1.30.0

func (alg StoreCompression) MarshalJSON() ([]byte, error)

func (StoreCompression) String added in v1.30.0

func (alg StoreCompression) String() string

func (*StoreCompression) UnmarshalJSON added in v1.30.0

func (alg *StoreCompression) UnmarshalJSON(b []byte) error

type Stream

type Stream interface {
	ConsumerManager

	// Info returns StreamInfo from the server.
	Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, error)

	// CachedInfo returns StreamInfo currently cached on this stream.
	// This method does not perform any network requests. The cached
	// StreamInfo is updated on every call to Info and Update.
	CachedInfo() *StreamInfo

	// Purge removes messages from a stream. It is a destructive operation.
	// Use with caution. See StreamPurgeOpt for available options.
	Purge(ctx context.Context, opts ...StreamPurgeOpt) error

	// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
	GetMsg(ctx context.Context, seq uint64, opts ...GetMsgOpt) (*RawStreamMsg, error)

	// GetLastMsgForSubject retrieves the last raw stream message stored in
	// JetStream on a given subject.
	GetLastMsgForSubject(ctx context.Context, subject string) (*RawStreamMsg, error)

	// DeleteMsg deletes a message from a stream.
	// On the server, the message is marked as erased, but not overwritten.
	DeleteMsg(ctx context.Context, seq uint64) error

	// SecureDeleteMsg deletes a message from a stream. The deleted message
	// is overwritten with random data. As a result, this operation is slower
	// than DeleteMsg.
	SecureDeleteMsg(ctx context.Context, seq uint64) error
}

Stream contains CRUD methods on a consumer via ConsumerManager, as well as operations on an existing stream. It allows fetching and removing messages from a stream, as well as purging a stream.

type StreamConfig

type StreamConfig struct {
	// Name is the name of the stream. It is required and must be unique
	// across the JetStream account.
	//
	// Names cannot contain whitespace, ., *, >, path separators
	// (forward or backwards slash), and non-printable characters.
	Name string `json:"name"`

	// Description is an optional description of the stream.
	Description string `json:"description,omitempty"`

	// Subjects is a list of subjects that the stream is listening on.
	// Wildcards are supported. Subjects cannot be set if the stream is
	// created as a mirror.
	Subjects []string `json:"subjects,omitempty"`

	// Retention defines the message retention policy for the stream.
	// Defaults to LimitsPolicy.
	Retention RetentionPolicy `json:"retention"`

	// MaxConsumers specifies the maximum number of consumers allowed for
	// the stream.
	MaxConsumers int `json:"max_consumers"`

	// MaxMsgs is the maximum number of messages the stream will store.
	// After reaching the limit, stream adheres to the discard policy.
	// If not set, server default is -1 (unlimited).
	MaxMsgs int64 `json:"max_msgs"`

	// MaxBytes is the maximum total size of messages the stream will store.
	// After reaching the limit, stream adheres to the discard policy.
	// If not set, server default is -1 (unlimited).
	MaxBytes int64 `json:"max_bytes"`

	// Discard defines the policy for handling messages when the stream
	// reaches its limits in terms of number of messages or total bytes.
	Discard DiscardPolicy `json:"discard"`

	// DiscardNewPerSubject is a flag to enable discarding new messages per
	// subject when limits are reached. Requires DiscardPolicy to be
	// DiscardNew and the MaxMsgsPerSubject to be set.
	DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`

	// MaxAge is the maximum age of messages that the stream will retain.
	MaxAge time.Duration `json:"max_age"`

	// MaxMsgsPerSubject is the maximum number of messages per subject that
	// the stream will retain.
	MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`

	// MaxMsgSize is the maximum size of any single message in the stream.
	MaxMsgSize int32 `json:"max_msg_size,omitempty"`

	// Storage specifies the type of storage backend used for the stream
	// (file or memory).
	Storage StorageType `json:"storage"`

	// Replicas is the number of stream replicas in clustered JetStream.
	// Defaults to 1, maximum is 5.
	Replicas int `json:"num_replicas"`

	// NoAck is a flag to disable acknowledging messages received by this
	// stream.
	//
	// If set to true, publish methods from the JetStream client will not
	// work as expected, since they rely on acknowledgements. Core NATS
	// publish methods should be used instead. Note that this will make
	// message delivery less reliable.
	NoAck bool `json:"no_ack,omitempty"`

	// Duplicates is the window within which to track duplicate messages.
	// If not set, server default is 2 minutes.
	Duplicates time.Duration `json:"duplicate_window,omitempty"`

	// Placement is used to declare where the stream should be placed via
	// tags and/or an explicit cluster name.
	Placement *Placement `json:"placement,omitempty"`

	// Mirror defines the configuration for mirroring another stream.
	Mirror *StreamSource `json:"mirror,omitempty"`

	// Sources is a list of other streams this stream sources messages from.
	Sources []*StreamSource `json:"sources,omitempty"`

	// Sealed streams do not allow messages to be published or deleted via
	// limits or API. Sealed streams cannot be unsealed via configuration
	// update. Sealed can only be set on already created streams via the
	// Update API.
	Sealed bool `json:"sealed,omitempty"`

	// DenyDelete restricts the ability to delete messages from a stream via
	// the API. Defaults to false.
	DenyDelete bool `json:"deny_delete,omitempty"`

	// DenyPurge restricts the ability to purge messages from a stream via
	// the API. Defaults to false.
	DenyPurge bool `json:"deny_purge,omitempty"`

	// AllowRollup allows the use of the Nats-Rollup header to replace all
	// contents of a stream, or subject in a stream, with a single new
	// message.
	AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`

	// Compression specifies the message storage compression algorithm.
	// Defaults to NoCompression.
	Compression StoreCompression `json:"compression"`

	// FirstSeq is the initial sequence number of the first message in the
	// stream.
	FirstSeq uint64 `json:"first_seq,omitempty"`

	// SubjectTransform allows applying a transformation to matching
	// messages' subjects.
	SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`

	// RePublish allows immediate republishing a message to the configured
	// subject after it's stored.
	RePublish *RePublish `json:"republish,omitempty"`

	// AllowDirect enables direct access to individual messages using direct
	// get API. Defaults to false.
	AllowDirect bool `json:"allow_direct"`

	// MirrorDirect enables direct access to individual messages from the
	// origin stream using direct get API. Defaults to false.
	MirrorDirect bool `json:"mirror_direct"`

	// ConsumerLimits defines limits on certain values that consumers can
	// set, and defaults for consumers that do not set them.
	ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"`

	// Metadata is a set of application-defined key-value pairs for
	// associating metadata on the stream. This feature requires nats-server
	// v2.10.0 or later.
	Metadata map[string]string `json:"metadata,omitempty"`

	// Template identifies the template that manages the Stream.
	// Deprecated: This feature is no longer supported.
	Template string `json:"template_owner,omitempty"`
}

StreamConfig is the configuration of a JetStream stream.
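The naming rule from the Name field comment (no whitespace, `.`, `*`, `>`, path separators, or non-printable characters) can be checked before sending a config to the server. The function validStreamName below is a hypothetical helper written from that comment alone. The server's own validation remains authoritative and may apply further rules.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// validStreamName reports whether name satisfies the constraints listed
// in the StreamConfig.Name comment. An empty name is rejected because
// Name is required.
func validStreamName(name string) bool {
	if name == "" {
		return false
	}
	// Reject ., *, > and both path separators.
	if strings.ContainsAny(name, ".*>/\\") {
		return false
	}
	// Reject whitespace and non-printable characters.
	for _, r := range name {
		if unicode.IsSpace(r) || !unicode.IsPrint(r) {
			return false
		}
	}
	return true
}

func main() {
	for _, name := range []string{"ORDERS", "orders.new", "my stream", "logs/app"} {
		fmt.Printf("%q valid=%v\n", name, validStreamName(name))
	}
}
```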

type StreamConsumerLimits added in v1.30.0

type StreamConsumerLimits struct {
	// InactiveThreshold is a duration which instructs the server to clean
	// up the consumer if it has been inactive for the specified duration.
	InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`

	// MaxAckPending is a maximum number of outstanding unacknowledged
	// messages for a consumer.
	MaxAckPending int `json:"max_ack_pending,omitempty"`
}

StreamConsumerLimits are the limits for a consumer on a stream. These can be overridden on a per consumer basis.

type StreamConsumerManager

type StreamConsumerManager interface {
	// CreateOrUpdateConsumer creates a consumer on a given stream with
	// given config. If consumer already exists, it will be updated (if
	// possible). Consumer interface is returned, allowing to operate on a
	// consumer (e.g. fetch messages).
	CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)

	// CreateConsumer creates a consumer on a given stream with given
	// config. If consumer already exists and the provided configuration
	// differs from its configuration, ErrConsumerExists is returned. If the
	// provided configuration is the same as the existing consumer, the
	// existing consumer is returned. Consumer interface is returned,
	// allowing to operate on a consumer (e.g. fetch messages).
	CreateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)

	// UpdateConsumer updates an existing consumer. If consumer does not
	// exist, ErrConsumerDoesNotExist is returned. Consumer interface is
	// returned, allowing to operate on a consumer (e.g. fetch messages).
	UpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)

	// OrderedConsumer returns an OrderedConsumer instance. OrderedConsumer
	// are managed by the library and provide a simple way to consume
	// messages from a stream. Ordered consumers are ephemeral in-memory
	// pull consumers and are resilient to deletes and restarts.
	OrderedConsumer(ctx context.Context, stream string, cfg OrderedConsumerConfig) (Consumer, error)

	// Consumer returns an interface to an existing consumer, allowing processing
	// of messages. If consumer does not exist, ErrConsumerNotFound is
	// returned.
	Consumer(ctx context.Context, stream string, consumer string) (Consumer, error)

	// DeleteConsumer removes a consumer with given name from a stream.
	// If consumer does not exist, ErrConsumerNotFound is returned.
	DeleteConsumer(ctx context.Context, stream string, consumer string) error
}

StreamConsumerManager provides CRUD API for managing consumers. It is available as a part of JetStream interface. This is an alternative to Stream interface, allowing to bypass stream lookup. CreateConsumer, UpdateConsumer, CreateOrUpdateConsumer and Consumer methods return a Consumer interface, allowing to operate on a consumer (e.g. consume messages).
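The differences between CreateConsumer, UpdateConsumer and CreateOrUpdateConsumer are easiest to see side by side. The sketch below is a tiny in-memory model of the documented outcomes, not library code: registry, config, errExists and errNotExist are hypothetical stand-ins for the server state, ConsumerConfig, ErrConsumerExists and ErrConsumerDoesNotExist. It also ignores the "if possible" restriction on updates.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical local errors standing in for ErrConsumerExists and
// ErrConsumerDoesNotExist.
var (
	errExists   = errors.New("consumer exists with different config")
	errNotExist = errors.New("consumer does not exist")
)

// config is a trimmed, hypothetical stand-in for ConsumerConfig.
type config struct {
	Name          string
	FilterSubject string
}

// registry models the consumers of a single stream, keyed by name.
type registry map[string]config

// create succeeds for a new consumer or an identical existing one and
// fails when the existing configuration differs.
func (r registry) create(cfg config) error {
	if cur, ok := r[cfg.Name]; ok {
		if cur != cfg {
			return errExists
		}
		return nil
	}
	r[cfg.Name] = cfg
	return nil
}

// update fails when the consumer does not exist yet.
func (r registry) update(cfg config) error {
	if _, ok := r[cfg.Name]; !ok {
		return errNotExist
	}
	r[cfg.Name] = cfg
	return nil
}

// createOrUpdate stores the config regardless of prior state.
func (r registry) createOrUpdate(cfg config) error {
	r[cfg.Name] = cfg
	return nil
}

func main() {
	r := registry{}
	a := config{Name: "worker", FilterSubject: "orders.new"}
	b := config{Name: "worker", FilterSubject: "orders.paid"}

	fmt.Println("create a:", r.create(a))
	fmt.Println("create a again:", r.create(a))
	fmt.Println("create b:", r.create(b))
	fmt.Println("update b:", r.update(b))
	fmt.Println("update missing:", r.update(config{Name: "missing"}))
	fmt.Println("createOrUpdate a:", r.createOrUpdate(a))
}
```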

type StreamInfo

type StreamInfo struct {
	// Config contains the configuration settings of the stream, set when
	// creating or updating the stream.
	Config StreamConfig `json:"config"`

	// Created is the timestamp when the stream was created.
	Created time.Time `json:"created"`

	// State provides the state of the stream at the time of request,
	// including metrics like the number of messages in the stream, total
	// bytes, etc.
	State StreamState `json:"state"`

	// Cluster contains information about the cluster to which this stream
	// belongs (if applicable).
	Cluster *ClusterInfo `json:"cluster,omitempty"`

	// Mirror contains information about another stream this one is
	// mirroring. Mirroring is used to create replicas of another stream's
	// data. This field is omitted if the stream is not mirroring another
	// stream.
	Mirror *StreamSourceInfo `json:"mirror,omitempty"`

	// Sources is a list of source streams from which this stream collects
	// data.
	Sources []*StreamSourceInfo `json:"sources,omitempty"`

	// TimeStamp indicates when the info was gathered by the server.
	TimeStamp time.Time `json:"ts"`
}

StreamInfo shows config and current state for this stream.

type StreamInfoLister

type StreamInfoLister interface {
	Info() <-chan *StreamInfo
	Err() error
}

StreamInfoLister is used to iterate over a channel of stream infos. Err method can be used to check for errors encountered during iteration. Info channel is always closed and therefore can be used in a range loop.

type StreamInfoOpt

type StreamInfoOpt func(*streamInfoRequest) error

StreamInfoOpt is a function setting options for [Stream.Info].

func WithDeletedDetails

func WithDeletedDetails(deletedDetails bool) StreamInfoOpt

WithDeletedDetails can be used to include information about messages deleted from a stream in a stream info request.

func WithSubjectFilter

func WithSubjectFilter(subject string) StreamInfoOpt

WithSubjectFilter can be used to display the information about messages stored on given subjects. NOTE: if the subject filter matches over 100k subjects, this will result in multiple requests to the server to retrieve all the information, and all of the returned subjects will be kept in memory.

type StreamListOpt added in v1.28.0

type StreamListOpt func(*streamsRequest) error

StreamListOpt is a functional option for [StreamManager.ListStreams] and [StreamManager.StreamNames] methods.

func WithStreamListSubject added in v1.28.0

func WithStreamListSubject(subject string) StreamListOpt

WithStreamListSubject can be used to filter results of ListStreams and StreamNames requests to only streams that have given subject in their configuration.

type StreamManager

type StreamManager interface {
	// CreateStream creates a new stream with given config and returns an
	// interface to operate on it. If stream with given name already exists,
	// ErrStreamNameAlreadyInUse is returned.
	CreateStream(ctx context.Context, cfg StreamConfig) (Stream, error)

	// UpdateStream updates an existing stream. If stream does not exist,
	// ErrStreamNotFound is returned.
	UpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error)

	// CreateOrUpdateStream creates a stream with given config. If stream
	// already exists, it will be updated (if possible).
	CreateOrUpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error)

	// Stream fetches [StreamInfo] and returns a [Stream] interface for a given stream name.
	// If stream does not exist, ErrStreamNotFound is returned.
	Stream(ctx context.Context, stream string) (Stream, error)

	// StreamNameBySubject returns the name of the stream listening on the
	// given subject. If no stream is bound to the given subject,
	// ErrStreamNotFound is returned.
	StreamNameBySubject(ctx context.Context, subject string) (string, error)

	// DeleteStream removes a stream with given name. If stream does not
	// exist, ErrStreamNotFound is returned.
	DeleteStream(ctx context.Context, stream string) error

	// ListStreams returns StreamInfoLister, enabling iterating over a
	// channel of stream infos.
	ListStreams(context.Context, ...StreamListOpt) StreamInfoLister

	// StreamNames returns a StreamNameLister, enabling iterating over a
	// channel of stream names.
	StreamNames(context.Context, ...StreamListOpt) StreamNameLister
}

StreamManager provides CRUD API for managing streams. It is available as a part of JetStream interface. CreateStream, UpdateStream, CreateOrUpdateStream and Stream methods return a Stream interface, allowing to operate on a stream.

type StreamNameLister

type StreamNameLister interface {
	Name() <-chan string
	Err() error
}

StreamNameLister is used to iterate over a channel of stream names. Err method can be used to check for errors encountered during iteration. Name channel is always closed and therefore can be used in a range loop.
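Because the channel is always closed, the usual consumption pattern is to range over it first and check Err afterwards. The sketch below shows that pattern against a hypothetical in-memory implementation (fakeLister). The nameLister interface mirrors StreamNameLister, and collect and errListing are illustrative names. The same shape applies to StreamInfoLister with its Info channel.

```go
package main

import (
	"errors"
	"fmt"
)

// nameLister mirrors the StreamNameLister interface shown above.
type nameLister interface {
	Name() <-chan string
	Err() error
}

// errListing is a hypothetical error used to simulate a failed listing.
var errListing = errors.New("listing interrupted")

// fakeLister is an in-memory stand-in for the value returned by
// StreamNames.
type fakeLister struct {
	names chan string
	err   error
}

// newFakeLister preloads the channel with names and closes it.
func newFakeLister(names []string, err error) *fakeLister {
	l := &fakeLister{names: make(chan string, len(names)), err: err}
	for _, n := range names {
		l.names <- n
	}
	close(l.names)
	return l
}

func (l *fakeLister) Name() <-chan string { return l.names }
func (l *fakeLister) Err() error          { return l.err }

// collect drains the channel, then reports any error seen during iteration.
func collect(l nameLister) ([]string, error) {
	var out []string
	for name := range l.Name() {
		out = append(out, name)
	}
	return out, l.Err()
}

func main() {
	names, err := collect(newFakeLister([]string{"ORDERS", "EVENTS"}, nil))
	fmt.Println(names, err)

	names, err = collect(newFakeLister([]string{"ORDERS"}, errListing))
	fmt.Println(names, err)
}
```

Checking Err only after the loop matters: a closed channel alone does not tell you whether iteration finished normally or was cut short.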

type StreamPurgeOpt

type StreamPurgeOpt func(*StreamPurgeRequest) error

StreamPurgeOpt is a function setting options for [Stream.Purge].

func WithPurgeKeep

func WithPurgeKeep(keep uint64) StreamPurgeOpt

WithPurgeKeep sets the number of messages to be kept in the stream after purge. Can be combined with the WithPurgeSubject option, but not with WithPurgeSequence.

func WithPurgeSequence

func WithPurgeSequence(sequence uint64) StreamPurgeOpt

WithPurgeSequence is used to set a specific sequence number up to which (but not including) messages will be purged from a stream. Can be combined with the WithPurgeSubject option, but not with WithPurgeKeep.

func WithPurgeSubject

func WithPurgeSubject(subject string) StreamPurgeOpt

WithPurgeSubject sets a specific subject for which messages on a stream will be purged.

type StreamPurgeRequest

type StreamPurgeRequest struct {
	// Purge up to but not including sequence.
	Sequence uint64 `json:"seq,omitempty"`
	// Subject to match against messages for the purge command.
	Subject string `json:"filter,omitempty"`
	// Number of messages to keep.
	Keep uint64 `json:"keep,omitempty"`
}
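The purge options are functional options that fill in a StreamPurgeRequest, which is then encoded as JSON. The sketch below copies the struct and re-creates the options locally to show how they combine and why Keep and Sequence exclude each other. The names withKeep, withSequence, withSubject, buildPurge and errKeepAndSequence are hypothetical, and where exactly the library enforces the exclusivity is an assumption.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// StreamPurgeRequest is copied from the definition above.
type StreamPurgeRequest struct {
	Sequence uint64 `json:"seq,omitempty"`
	Subject  string `json:"filter,omitempty"`
	Keep     uint64 `json:"keep,omitempty"`
}

// purgeOpt has the same shape as StreamPurgeOpt.
type purgeOpt func(*StreamPurgeRequest) error

// errKeepAndSequence is a hypothetical error for this sketch.
var errKeepAndSequence = errors.New("keep and sequence cannot be combined")

func withKeep(keep uint64) purgeOpt {
	return func(r *StreamPurgeRequest) error { r.Keep = keep; return nil }
}

func withSequence(seq uint64) purgeOpt {
	return func(r *StreamPurgeRequest) error { r.Sequence = seq; return nil }
}

func withSubject(subject string) purgeOpt {
	return func(r *StreamPurgeRequest) error { r.Subject = subject; return nil }
}

// buildPurge applies the options, rejects the Keep+Sequence combination
// and returns the JSON body that would be sent.
func buildPurge(opts ...purgeOpt) (string, error) {
	var req StreamPurgeRequest
	for _, opt := range opts {
		if err := opt(&req); err != nil {
			return "", err
		}
	}
	if req.Keep != 0 && req.Sequence != 0 {
		return "", errKeepAndSequence
	}
	b, err := json.Marshal(req)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	fmt.Println(buildPurge())
	fmt.Println(buildPurge(withSubject("orders.eu"), withKeep(10)))
	fmt.Println(buildPurge(withKeep(10), withSequence(100)))
}
```

An empty request (no options) encodes to an empty JSON object, which corresponds to purging the whole stream.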

type StreamSource

type StreamSource struct {
	// Name is the name of the stream to source from.
	Name string `json:"name"`

	// OptStartSeq is the sequence number to start sourcing from.
	OptStartSeq uint64 `json:"opt_start_seq,omitempty"`

	// OptStartTime is the timestamp of messages to start sourcing from.
	OptStartTime *time.Time `json:"opt_start_time,omitempty"`

	// FilterSubject is the subject filter used to only replicate messages
	// with matching subjects.
	FilterSubject string `json:"filter_subject,omitempty"`

	// SubjectTransforms is a list of subject transforms to apply to
	// matching messages.
	//
	// Subject transforms on sources and mirrors are also used as subject
	// filters with optional transformations.
	SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`

	// External is a configuration referencing a stream source in another
	// account or JetStream domain.
	External *ExternalStream `json:"external,omitempty"`

	// Domain is used to configure a stream source in another JetStream
	// domain. This setting will set the External field with the appropriate
	// APIPrefix.
	Domain string `json:"-"`
}

StreamSource dictates how streams can source from other streams.

type StreamSourceInfo

type StreamSourceInfo struct {
	// Name is the name of the stream that is being replicated.
	Name string `json:"name"`

	// Lag informs how many messages behind the source/mirror operation is.
	// This will only show correctly if there is active communication
	// with stream/mirror.
	Lag uint64 `json:"lag"`

	// Active informs when last the mirror or sourced stream had activity.
	// Value will be -1 when there has been no activity.
	Active time.Duration `json:"active"`

	// FilterSubject is the subject filter defined for this source/mirror.
	FilterSubject string `json:"filter_subject,omitempty"`

	// SubjectTransforms is a list of subject transforms defined for this
	// source/mirror.
	SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
}

StreamSourceInfo shows information about an upstream stream source/mirror.

type StreamState

type StreamState struct {
	// Msgs is the number of messages stored in the stream.
	Msgs uint64 `json:"messages"`

	// Bytes is the number of bytes stored in the stream.
	Bytes uint64 `json:"bytes"`

	// FirstSeq is the sequence number of the first message in the stream.
	FirstSeq uint64 `json:"first_seq"`

	// FirstTime is the timestamp of the first message in the stream.
	FirstTime time.Time `json:"first_ts"`

	// LastSeq is the sequence number of the last message in the stream.
	LastSeq uint64 `json:"last_seq"`

	// LastTime is the timestamp of the last message in the stream.
	LastTime time.Time `json:"last_ts"`

	// Consumers is the number of consumers on the stream.
	Consumers int `json:"consumer_count"`

	// Deleted is a list of sequence numbers that have been removed from the
	// stream. This field will only be returned if the stream has been
	// fetched with the DeletedDetails option.
	Deleted []uint64 `json:"deleted"`

	// NumDeleted is the number of messages that have been removed from the
	// stream. Only deleted messages causing a gap in stream sequence numbers
	// are counted. Messages deleted at the beginning or end of the stream
	// are not counted.
	NumDeleted int `json:"num_deleted"`

	// NumSubjects is the number of unique subjects the stream has received
	// messages on.
	NumSubjects uint64 `json:"num_subjects"`

	// Subjects is a map of subjects the stream has received messages on
	// with message count per subject. This field will only be returned if
	// the stream has been fetched with the SubjectFilter option.
	Subjects map[string]uint64 `json:"subjects"`
}

StreamState is the state of a JetStream stream at the time of request.
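The NumDeleted comment says only interior gaps are counted. Under that reading, the count can be cross-checked from FirstSeq, LastSeq and Msgs: the sequence span minus the stored message count. This is my interpretation of the field comments rather than a documented formula, and state and interiorGaps are hypothetical names.

```go
package main

import "fmt"

// state holds the three StreamState fields needed for the calculation.
type state struct {
	Msgs     uint64
	FirstSeq uint64
	LastSeq  uint64
}

// interiorGaps returns the number of sequence numbers between FirstSeq
// and LastSeq (inclusive) that hold no message.
func interiorGaps(s state) uint64 {
	if s.Msgs == 0 || s.LastSeq < s.FirstSeq {
		return 0
	}
	span := s.LastSeq - s.FirstSeq + 1
	if span < s.Msgs {
		// Inconsistent input; avoid unsigned underflow.
		return 0
	}
	return span - s.Msgs
}

func main() {
	// Sequences 5..14 span ten slots; eight messages stored.
	fmt.Println(interiorGaps(state{Msgs: 8, FirstSeq: 5, LastSeq: 14}))

	// A dense stream has no gaps.
	fmt.Println(interiorGaps(state{Msgs: 3, FirstSeq: 1, LastSeq: 3}))
}
```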

type SubjectTransformConfig added in v1.30.0

type SubjectTransformConfig struct {
	// Source is the subject pattern to match incoming messages against.
	Source string `json:"src"`

	// Destination is the subject pattern to remap the subject to.
	Destination string `json:"dest"`
}

SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received.

type Tier added in v1.32.0

type Tier struct {
	// Memory is the memory storage being used for Stream Message storage.
	Memory uint64 `json:"memory"`

	// Store is the disk storage being used for Stream Message storage.
	Store uint64 `json:"storage"`

	// ReservedMemory is the number of bytes reserved for memory usage by
	// this account on the server
	ReservedMemory uint64 `json:"reserved_memory"`

	// ReservedStore is the number of bytes reserved for disk usage by this
	// account on the server
	ReservedStore uint64 `json:"reserved_storage"`

	// Streams is the number of streams currently defined for this account.
	Streams int `json:"streams"`

	// Consumers is the number of consumers currently defined for this
	// account.
	Consumers int `json:"consumers"`

	// Limits are the JetStream limits for this account.
	Limits AccountLimits `json:"limits"`
}

Tier represents a JetStream account usage tier.

type WatchOpt added in v1.29.0

type WatchOpt interface {
	// contains filtered or unexported methods
}

func IgnoreDeletes added in v1.29.0

func IgnoreDeletes() WatchOpt

IgnoreDeletes will have the key watcher not pass any deleted keys.

func IncludeHistory added in v1.29.0

func IncludeHistory() WatchOpt

IncludeHistory instructs the key watcher to include historical values as well (up to KeyValueMaxHistory).

func MetaOnly added in v1.29.0

func MetaOnly() WatchOpt

MetaOnly instructs the key watcher to retrieve only the entry meta data, not the entry value.

func ResumeFromRevision added in v1.32.0

func ResumeFromRevision(revision uint64) WatchOpt

ResumeFromRevision instructs the key watcher to resume from a specific revision number.

func UpdatesOnly added in v1.29.0

func UpdatesOnly() WatchOpt

UpdatesOnly instructs the key watcher to only include updates on values (without latest values when started).
