msgbus

package
v0.0.0-...-7359d40 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 10, 2024 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var DefaultJetStreamStreamerConfig = JetStreamStreamerConfig{
	AckWait: 30 * time.Second,
}

DefaultJetStreamStreamerConfig holds the default settings for the JetStream streamer.

var MetadataIndexStream = &nats.StreamConfig{
	Name: "MetadataIndexStream",
	Subjects: []string{
		"MetadataIndex.*",
	},
	MaxAge:   24 * time.Hour,
	Replicas: 5,
}

MetadataIndexStream is the stream config for MetadataIndex messages.

var V2CDurableStream = &nats.StreamConfig{
	Name: "V2CStream",
	Subjects: []string{
		"v2c.*.*.*",
	},
	MaxAge:   15 * time.Minute,
	Replicas: 5,
}

V2CDurableStream is the stream config for Durable v2c messages.
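
Both stream configs filter subjects with the NATS `*` wildcard, which matches exactly one dot-separated token. The following is a minimal sketch of that matching rule, not code from this package or from nats.go: `matchSubject` is a hypothetical helper, the example subjects are made up, and the `>` wildcard is deliberately left out.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject is a hypothetical helper (not part of msgbus or nats.go).
// It reports whether subject matches pattern, where "*" in the pattern
// matches exactly one dot-separated token. The ">" wildcard is not handled.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	if len(p) != len(s) {
		return false // "*" never spans more or fewer than one token
	}
	for i := range p {
		if s[i] == "" {
			return false // empty tokens are not valid in a subject
		}
		if p[i] != "*" && p[i] != s[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(matchSubject("v2c.*.*.*", "v2c.a.b.c"))             // true
	fmt.Println(matchSubject("v2c.*.*.*", "v2c.a.b"))               // false
	fmt.Println(matchSubject("MetadataIndex.*", "MetadataIndex.x")) // true
}
```

Under this rule, a subject must have exactly four tokens beginning with `v2c` to land in V2CStream, and exactly two tokens beginning with `MetadataIndex` to land in MetadataIndexStream.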

Functions

func MustConnectJetStream

func MustConnectJetStream(nc *nats.Conn) nats.JetStreamContext

MustConnectJetStream creates a new JetStream context (nats.JetStreamContext) on the given NATS connection.

func MustConnectNATS

func MustConnectNATS() *nats.Conn

MustConnectNATS attempts to connect to the NATS message bus.

Types

type JetStreamStreamerConfig

type JetStreamStreamerConfig struct {
	// AckWait is how long JetStream waits for an Ack() before treating the message as unacknowledged and resending it.
	AckWait time.Duration
}

JetStreamStreamerConfig contains options that can be set for a JetStream Streamer.
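
One way to customise the configuration is to copy DefaultJetStreamStreamerConfig and override only the field of interest before passing the result to NewJetStreamStreamerWithConfig. The sketch below mirrors the documented type and default locally so that it compiles on its own; `withAckWait` is a hypothetical helper, and the two-minute value is only an illustration.

```go
package main

import (
	"fmt"
	"time"
)

// Local mirror of the documented type and default, so the sketch compiles
// without importing the package.
type JetStreamStreamerConfig struct {
	AckWait time.Duration
}

var DefaultJetStreamStreamerConfig = JetStreamStreamerConfig{
	AckWait: 30 * time.Second,
}

// withAckWait is a hypothetical helper: it copies the defaults and overrides
// only AckWait, leaving the package-level default untouched.
func withAckWait(d time.Duration) JetStreamStreamerConfig {
	cfg := DefaultJetStreamStreamerConfig // struct copy, not a reference
	cfg.AckWait = d
	return cfg
}

func main() {
	cfg := withAckWait(2 * time.Minute)
	fmt.Println(cfg.AckWait, DefaultJetStreamStreamerConfig.AckWait) // 2m0s 30s
}
```

Because the config is a plain struct passed by value, the copy can be changed freely without affecting other streamers that rely on the default.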

type Msg

type Msg interface {
	// Data returns the serialized data stored in the message.
	Data() []byte
	// Ack acknowledges the message.
	Ack() error
}

Msg is the interface for a message sent over the stream.

type MsgHandler

type MsgHandler func(msg Msg)

MsgHandler is a function that processes Msg.
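
Because Msg is an interface, a handler can be exercised without a running message bus. The sketch below copies the two documented types locally and pairs them with `fakeMsg` and `newCollector`, both hypothetical names introduced only for illustration. The handler acks after it has processed the payload, so a failure before that point would leave the message unacked.

```go
package main

import "fmt"

// Local copies of the documented types so the sketch compiles on its own.
type Msg interface {
	Data() []byte
	Ack() error
}

type MsgHandler func(msg Msg)

// fakeMsg is a hypothetical in-memory Msg that records whether it was acked.
type fakeMsg struct {
	data  []byte
	acked bool
}

func (m *fakeMsg) Data() []byte { return m.data }

func (m *fakeMsg) Ack() error {
	m.acked = true
	return nil
}

// newCollector returns a handler that appends each payload to out and then
// acknowledges the message.
func newCollector(out *[]string) MsgHandler {
	return func(msg Msg) {
		*out = append(*out, string(msg.Data()))
		if err := msg.Ack(); err != nil {
			fmt.Println("ack failed:", err)
		}
	}
}

func main() {
	var seen []string
	handler := newCollector(&seen)

	m := &fakeMsg{data: []byte("hello")}
	handler(m)

	fmt.Println(seen, m.acked) // [hello] true
}
```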

type PersistentSub

type PersistentSub interface {
	// Close the subscription, but allow future PersistentSubs to read from the sub starting after
	// the last acked message.
	Close() error
}

PersistentSub is the interface to an active persistent subscription.

type Streamer

type Streamer interface {
	// PersistentSubscribe creates a persistent subscription on a subject, calling the message
	// handler callback on each message that arrives on the sub.
	//
	// Here persistence means that if the subscription closes or dies and later resumes,
	// the Subscription will continue from the earliest message that was not acked.
	//
	// This position in the stream will be tracked according to the (subject, persistentName) pair.
	// * If you need a new subscription to see all of the available stream messages, you can receive them
	//   by invoking PersistentSubscribe() on the same subject but a new persistentName.
	// * If you call PersistentSubscribe() with a new subject but an existing persistentName, the implementation
	//   should treat it as a new persistent subscription and send all data available on the subscription.
	//
	// Parallel callers of PersistentSubscribe that use the same subject + persistentName pair will be added
	// to the same WorkQueue: messages published on that subject will be assigned to one of
	// the callers. If the assigned caller does not Ack() a message within an implementation's
	// timeout, then the message will be reassigned to another worker.
	PersistentSubscribe(subject, persistentName string, cb MsgHandler) (PersistentSub, error)

	// Publish publishes the data to the specific subject.
	Publish(subject string, data []byte) error

	// PeekLatestMessage returns the last message published on a subject. If no messages
	// exist for the subject, the method returns `nil`.
	//
	// PeekLatestMessage does not care about the state of any Sub. It strictly returns the last message sent from a Publish()
	// call.
	PeekLatestMessage(subject string) (Msg, error)
}

Streamer is an interface for any streaming handler.
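
The resume and replay rules described in the interface comments can be illustrated with a small in-memory stand-in. This is a sketch under heavy assumptions, not the JetStream implementation: `memStreamer` and its helpers are hypothetical, subjects are matched exactly (no wildcards), there is no work-queue sharing or ack timeout, and it is not safe for concurrent use. The subject and persistent names are made up.

```go
package main

import "fmt"

// Local copies of the documented interfaces so the sketch compiles on its own.
type Msg interface {
	Data() []byte
	Ack() error
}
type MsgHandler func(msg Msg)
type PersistentSub interface{ Close() error }
type Streamer interface {
	PersistentSubscribe(subject, persistentName string, cb MsgHandler) (PersistentSub, error)
	Publish(subject string, data []byte) error
	PeekLatestMessage(subject string) (Msg, error)
}

type subKey struct{ subject, name string }

// memStreamer is a hypothetical, single-goroutine, in-memory Streamer.
type memStreamer struct {
	log    map[string][][]byte   // payloads published per subject
	cursor map[subKey]int        // next index to deliver per (subject, name)
	active map[subKey]MsgHandler // open subscriptions
}

func newMemStreamer() *memStreamer {
	return &memStreamer{log: map[string][][]byte{}, cursor: map[subKey]int{}, active: map[subKey]MsgHandler{}}
}

type memMsg struct {
	s    *memStreamer // nil for peeked messages, which have no cursor to move
	key  subKey
	idx  int
	data []byte
}

func (m *memMsg) Data() []byte { return m.data }

// Ack moves the (subject, name) cursor past this message.
func (m *memMsg) Ack() error {
	if m.s != nil && m.idx >= m.s.cursor[m.key] {
		m.s.cursor[m.key] = m.idx + 1
	}
	return nil
}

type memSub struct {
	s   *memStreamer
	key subKey
}

// Close stops delivery but keeps the cursor so a later subscriber resumes.
func (p *memSub) Close() error {
	delete(p.s.active, p.key)
	return nil
}

func (s *memStreamer) Publish(subject string, data []byte) error {
	s.log[subject] = append(s.log[subject], data)
	idx := len(s.log[subject]) - 1
	for key, cb := range s.active {
		if key.subject == subject {
			cb(&memMsg{s: s, key: key, idx: idx, data: data})
		}
	}
	return nil
}

func (s *memStreamer) PersistentSubscribe(subject, name string, cb MsgHandler) (PersistentSub, error) {
	key := subKey{subject, name}
	start := s.cursor[key] // zero for a pair that has never acked anything
	for i, data := range s.log[subject][start:] {
		cb(&memMsg{s: s, key: key, idx: start + i, data: data})
	}
	s.active[key] = cb
	return &memSub{s: s, key: key}, nil
}

func (s *memStreamer) PeekLatestMessage(subject string) (Msg, error) {
	msgs := s.log[subject]
	if len(msgs) == 0 {
		return nil, nil
	}
	return &memMsg{data: msgs[len(msgs)-1]}, nil
}

// demo runs a resume-and-replay scenario and returns what the handlers saw.
func demo() ([]string, string) {
	var s Streamer = newMemStreamer()
	var got []string
	ackAll := func(m Msg) { got = append(got, string(m.Data())); m.Ack() }
	s.Publish("jobs", []byte("a"))
	s.Publish("jobs", []byte("b"))
	sub, _ := s.PersistentSubscribe("jobs", "worker", ackAll) // sees a, b
	sub.Close()
	s.Publish("jobs", []byte("c"))                   // nobody is subscribed
	s.PersistentSubscribe("jobs", "worker", ackAll)  // resumes at c
	s.PersistentSubscribe("jobs", "auditor", ackAll) // new name: a, b, c again
	latest, _ := s.PeekLatestMessage("jobs")
	return got, string(latest.Data())
}

func main() { fmt.Println(demo()) } // [a b c a b c] c
```

The existing ("jobs", "worker") pair resumes after its last acked message, while the new persistent name "auditor" replays everything still held for the subject. With a real NATS server, a Streamer returned by NewJetStreamStreamer would take the place of `newMemStreamer()`.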

func NewJetStreamStreamer

func NewJetStreamStreamer(nc *nats.Conn, js nats.JetStreamContext, sCfg *nats.StreamConfig) (Streamer, error)

NewJetStreamStreamer creates a new Streamer implemented using JetStream with default configuration.

func NewJetStreamStreamerWithConfig

func NewJetStreamStreamerWithConfig(nc *nats.Conn, js nats.JetStreamContext, sCfg *nats.StreamConfig, cfg JetStreamStreamerConfig) (Streamer, error)

NewJetStreamStreamerWithConfig creates a new Streamer implemented using JetStream with specific configuration.
