Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var DefaultJetStreamStreamerConfig = JetStreamStreamerConfig{
	AckWait: 30 * time.Second,
}
DefaultJetStreamStreamerConfig holds the default settings for the JetStream streamer.
var MetadataIndexStream = &nats.StreamConfig{
	Name: "MetadataIndexStream",
	Subjects: []string{
		"MetadataIndex.*",
	},
	MaxAge:   24 * time.Hour,
	Replicas: 5,
}
MetadataIndexStream is the stream config for MetadataIndex messages.
var V2CDurableStream = &nats.StreamConfig{
	Name: "V2CStream",
	Subjects: []string{
		"v2c.*.*.*",
	},
	MaxAge:   15 * time.Minute,
	Replicas: 5,
}
V2CDurableStream is the stream config for Durable v2c messages.
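Both stream configs select subjects with NATS wildcards, where `*` matches exactly one dot-separated token and `>` matches one or more trailing tokens. The following is a minimal sketch of that matching rule, not the NATS server's implementation; `subjectMatches` and the example subjects are hypothetical names used only for illustration.

```go
package main

import "strings"

// subjectMatches reports whether a concrete subject matches a NATS-style
// pattern. "*" matches exactly one token; ">" matches one or more trailing
// tokens. This is an illustrative sketch, not the server's matcher.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one token left to match.
			return i == len(p)-1 && len(s) > i
		}
		// The subject ran out of tokens, or contains an empty token.
		if i >= len(s) || s[i] == "" {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	// Without ">", the token counts must agree exactly.
	return len(p) == len(s)
}
```

Under this rule, a pattern such as "v2c.*.*.*" accepts only four-token subjects that begin with "v2c" (for example the hypothetical "v2c.a.b.c"), and "MetadataIndex.*" accepts only two-token subjects that begin with "MetadataIndex".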
Functions ¶
func MustConnectJetStream ¶
func MustConnectJetStream(nc *nats.Conn) nats.JetStreamContext
MustConnectJetStream creates a new JetStream connection.
func MustConnectNATS ¶
func MustConnectNATS() *nats.Conn
MustConnectNATS attempts to connect to the NATS message bus.
Types ¶
type JetStreamStreamerConfig ¶
type JetStreamStreamerConfig struct {
	// AckWait is the duration to wait before Ack() is considered failed and JetStream knows to resend the value.
	AckWait time.Duration
}
JetStreamStreamerConfig contains options that can be set for a JetStream Streamer.
type Msg ¶
type Msg interface {
	// Data returns the serialized data stored in the message.
	Data() []byte
	// Ack acknowledges the message.
	Ack() error
}
Msg is the interface for a message sent over the stream.
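Because an unacked message is resent once AckWait elapses, a handler would typically call Ack() only after its own processing has succeeded. The sketch below illustrates that pattern with a local copy of the Msg interface; `fakeMsg`, `process`, `handle`, and `errEmpty` are hypothetical names and are not part of this package.

```go
package main

import "errors"

// Msg mirrors the interface documented above so the sketch compiles on its own.
type Msg interface {
	Data() []byte
	Ack() error
}

// fakeMsg is a hypothetical in-memory Msg that records whether it was acked.
type fakeMsg struct {
	data  []byte
	acked bool
}

func (m *fakeMsg) Data() []byte { return m.data }
func (m *fakeMsg) Ack() error   { m.acked = true; return nil }

// errEmpty is a hypothetical processing failure.
var errEmpty = errors.New("empty payload")

// process stands in for application logic; it rejects empty payloads.
func process(data []byte) error {
	if len(data) == 0 {
		return errEmpty
	}
	return nil
}

// handle acks a message only after it has been processed successfully.
// On failure it returns without acking, leaving redelivery to the stream.
func handle(m Msg) error {
	if err := process(m.Data()); err != nil {
		return err
	}
	return m.Ack()
}
```

Acking before processing would trade this at-least-once behavior for at-most-once: a crash between Ack() and the end of processing would lose the message.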
type PersistentSub ¶
type PersistentSub interface {
	// Close the subscription, but allow future PersistentSubs to read from the sub starting after
	// the last acked message.
	Close() error
}
PersistentSub is the interface to an active persistent subscription.
type Streamer ¶
type Streamer interface {
	// PersistentSubscribe creates a persistent subscription on a subject, calling the message
	// handler callback on each message that arrives on the sub.
	//
	// Here persistence means that if the subscription closes or dies and later resumes,
	// the Subscription will continue from the earliest message that was not acked.
	//
	// This position in the stream will be tracked according to the (subject, persistentName) pair.
	// * If you need a new subscription to see all of the available stream messages, you can receive them
	//   by invoking PersistentSubscribe() on the same subject but with a new persistentName.
	// * If you call PersistentSubscribe() with a new subject but an existing persistentName, the implementation
	//   should treat it as a new persistent subscription and send all data available on the subscription.
	//
	// Parallel callers of PersistentSubscribe that use the same subject + persistentName pair will be added
	// to the same WorkQueue: messages published on that subject will be assigned to one of
	// the callers. If the assigned caller does not Ack() a message within an implementation's
	// timeout, then the message will be reassigned to another worker.
	PersistentSubscribe(subject, persistentName string, cb MsgHandler) (PersistentSub, error)

	// Publish publishes the data to the specified subject.
	Publish(subject string, data []byte) error

	// PeekLatestMessage returns the last message published on a subject. If no messages
	// exist for the subject, the method returns `nil`.
	//
	// PeekLatestMessage does not care about the state of any Sub. It strictly returns the last message sent from a Publish()
	// call.
	PeekLatestMessage(subject string) (Msg, error)
}
Streamer is an interface for any streaming handler.
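The position-tracking rules in the Streamer contract can be illustrated with a small in-memory stand-in. This is a sketch under stated assumptions, not the JetStream implementation: `memStreamer` and its helpers are hypothetical, `MsgHandler` is assumed to be `func(Msg)` because its definition is not shown here, delivery is synchronous, only one handler per (subject, persistentName) pair is kept (so the WorkQueue and timeout behavior is not modeled), and the type is not safe for concurrent use.

```go
package main

// Local copies of the documented interfaces so the sketch compiles on its own.
type Msg interface {
	Data() []byte
	Ack() error
}

// MsgHandler is an assumption: the documentation references it without showing its definition.
type MsgHandler func(Msg)

type PersistentSub interface{ Close() error }

type Streamer interface {
	PersistentSubscribe(subject, persistentName string, cb MsgHandler) (PersistentSub, error)
	Publish(subject string, data []byte) error
	PeekLatestMessage(subject string) (Msg, error)
}

// subKey identifies a persistent position: the (subject, persistentName) pair.
type subKey struct{ subject, name string }

// memStreamer is a hypothetical in-memory Streamer used only for illustration.
type memStreamer struct {
	log    map[string][][]byte   // published messages per subject, in order
	pos    map[subKey]int        // index of the earliest unacked message per key
	active map[subKey]MsgHandler // handler currently attached to each key
}

var _ Streamer = (*memStreamer)(nil) // compile-time interface check

func newMemStreamer() *memStreamer {
	return &memStreamer{log: map[string][][]byte{}, pos: map[subKey]int{}, active: map[subKey]MsgHandler{}}
}

type memMsg struct {
	s    *memStreamer
	key  subKey
	idx  int
	data []byte
}

func (m *memMsg) Data() []byte { return m.data }

// Ack advances the stored position only when this is the earliest unacked message.
func (m *memMsg) Ack() error {
	if m.idx == m.s.pos[m.key] {
		m.s.pos[m.key]++
	}
	return nil
}

// peekMsg is a read-only view; acking it does not move any subscription.
type peekMsg []byte

func (p peekMsg) Data() []byte { return p }
func (p peekMsg) Ack() error   { return nil }

type memSub struct {
	s   *memStreamer
	key subKey
}

// Close detaches the handler but keeps the position for a later resume.
func (b *memSub) Close() error {
	delete(b.s.active, b.key)
	return nil
}

func (s *memStreamer) PersistentSubscribe(subject, persistentName string, cb MsgHandler) (PersistentSub, error) {
	key := subKey{subject, persistentName}
	start := s.pos[key] // zero for a pair that has never been seen
	s.active[key] = cb
	// Replay everything from the earliest unacked message onward.
	for i, d := range s.log[subject][start:] {
		cb(&memMsg{s: s, key: key, idx: start + i, data: d})
	}
	return &memSub{s: s, key: key}, nil
}

func (s *memStreamer) Publish(subject string, data []byte) error {
	s.log[subject] = append(s.log[subject], data)
	idx := len(s.log[subject]) - 1
	for k, cb := range s.active {
		if k.subject == subject {
			cb(&memMsg{s: s, key: k, idx: idx, data: data})
		}
	}
	return nil
}

// PeekLatestMessage ignores subscription state and returns the last published message.
func (s *memStreamer) PeekLatestMessage(subject string) (Msg, error) {
	msgs := s.log[subject]
	if len(msgs) == 0 {
		return nil, nil
	}
	return peekMsg(msgs[len(msgs)-1]), nil
}
```

In this sketch, a subscriber that acks only its first message, closes, and later resubscribes under the same persistentName is replayed from the second message onward, while a subscriber using a new persistentName on the same subject is replayed from the start of the log.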
func NewJetStreamStreamer ¶
func NewJetStreamStreamer(nc *nats.Conn, js nats.JetStreamContext, sCfg *nats.StreamConfig) (Streamer, error)
NewJetStreamStreamer creates a new Streamer implemented using JetStream with the default configuration (see DefaultJetStreamStreamerConfig).
func NewJetStreamStreamerWithConfig ¶
func NewJetStreamStreamerWithConfig(nc *nats.Conn, js nats.JetStreamContext, sCfg *nats.StreamConfig, cfg JetStreamStreamerConfig) (Streamer, error)
NewJetStreamStreamerWithConfig creates a new Streamer implemented using JetStream with the configuration supplied in cfg.