jetstream

package
v0.6.5
Published: Dec 7, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package jetstream provides an interface and two implementations for connecting to NATS JetStream.

Function NewDefaultJetStreamClient(url string, opts ...nats.Option) returns a client with the default implementation, which connects using the given URL and any additional nats connection options.

Function NewInClusterJetStreamClient() assumes the caller is running in a Kubernetes cluster where several environment variables are available; these are used to connect to the NATS JetStream server. The environment variables are:

NUMAFLOW_ISBSVC_JETSTREAM_URL, NUMAFLOW_ISBSVC_JETSTREAM_USER, NUMAFLOW_ISBSVC_JETSTREAM_PASSWORD, NUMAFLOW_ISBSVC_JETSTREAM_TLS_ENABLED (optional)
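
The following is a minimal sketch of how the variables listed above could be read and validated; it is not the package's actual code. The inClusterConfig type and loadInClusterConfig function are hypothetical names, and parsing the TLS flag with strconv.ParseBool is an assumption, since the accepted values are not documented here. The first three variables are treated as required because only the TLS flag is marked optional.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strconv"
)

// inClusterConfig is a hypothetical holder for the connection settings.
// The package's real internal representation is not shown in this documentation.
type inClusterConfig struct {
	URL        string
	User       string
	Password   string
	TLSEnabled bool
}

// loadInClusterConfig reads the four variables through getenv so that the
// lookup can be replaced in tests. URL, user and password must be non-empty;
// the TLS flag defaults to false when unset (assumed behavior).
func loadInClusterConfig(getenv func(string) string) (inClusterConfig, error) {
	cfg := inClusterConfig{
		URL:      getenv("NUMAFLOW_ISBSVC_JETSTREAM_URL"),
		User:     getenv("NUMAFLOW_ISBSVC_JETSTREAM_USER"),
		Password: getenv("NUMAFLOW_ISBSVC_JETSTREAM_PASSWORD"),
	}
	if cfg.URL == "" || cfg.User == "" || cfg.Password == "" {
		return cfg, errors.New("URL, USER and PASSWORD variables must all be set")
	}
	if v := getenv("NUMAFLOW_ISBSVC_JETSTREAM_TLS_ENABLED"); v != "" {
		enabled, err := strconv.ParseBool(v)
		if err != nil {
			return cfg, fmt.Errorf("invalid TLS flag %q: %w", v, err)
		}
		cfg.TLSEnabled = enabled
	}
	return cfg, nil
}

func main() {
	cfg, err := loadInClusterConfig(os.Getenv)
	if err != nil {
		fmt.Println("in-cluster settings unavailable:", err)
		return
	}
	fmt.Println("would connect to", cfg.URL, "with TLS enabled:", cfg.TLSEnabled)
}
```

Passing the lookup function as a parameter keeps the sketch testable without touching the process environment.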

When using InClusterJetStreamClient, the connection can reconnect automatically if the corresponding option is enabled in the call to Connect().

For example:

client.Connect(ctx, AutoReconnect())

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewDefaultJetStreamClient

func NewDefaultJetStreamClient(url string, opts ...nats.Option) *defaultJetStreamClient

NewDefaultJetStreamClient is used to get a default JetStream client instance

func NewInClusterJetStreamClient

func NewInClusterJetStreamClient() *inClusterJetStreamClient

NewInClusterJetStreamClient returns an instance of inClusterJetStreamClient.

Types

type JetStreamClient

type JetStreamClient interface {
	Connect(ctx context.Context, opts ...JetStreamClientOption) (*NatsConn, error)
}

JetStreamClient is used to provide a jetstream client

type JetStreamClientOption

type JetStreamClientOption func(*jsClientOptions)

func ConnectionCheckInterval

func ConnectionCheckInterval(d time.Duration) JetStreamClientOption

ConnectionCheckInterval is an Option to set connection check interval.

func DisconnectErrHandler

func DisconnectErrHandler(f func(*NatsConn, error)) JetStreamClientOption

DisconnectErrHandler is an Option to set the disconnect error handler.

func NoReconnect

func NoReconnect() JetStreamClientOption

NoReconnect is an Option to disable automatic reconnection.

func ReconnectHandler

func ReconnectHandler(f func(*NatsConn)) JetStreamClientOption

ReconnectHandler is an Option to set reconnect handler.
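
The option functions above follow Go's functional options pattern: each returns a function that mutates an unexported options struct. Below is a stdlib-only sketch of that pattern. The clientOptions struct, its fields, its default values, and the WithCheckInterval and WithoutReconnect names are all hypothetical stand-ins; the fields of the real jsClientOptions are not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// clientOptions is a hypothetical stand-in for the unexported jsClientOptions.
type clientOptions struct {
	reconnect     bool
	checkInterval time.Duration
}

// Option has the same shape as JetStreamClientOption: a function that
// mutates the options struct.
type Option func(*clientOptions)

// WithCheckInterval plays the role of ConnectionCheckInterval.
func WithCheckInterval(d time.Duration) Option {
	return func(o *clientOptions) { o.checkInterval = d }
}

// WithoutReconnect plays the role of NoReconnect.
func WithoutReconnect() Option {
	return func(o *clientOptions) { o.reconnect = false }
}

// buildOptions starts from defaults (the values here are assumptions) and
// applies each option in the order given, so later options win.
func buildOptions(opts ...Option) clientOptions {
	o := clientOptions{reconnect: true, checkInterval: 5 * time.Second}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := buildOptions(WithCheckInterval(2*time.Second), WithoutReconnect())
	fmt.Println(o.reconnect, o.checkInterval) // prints "false 2s"
}
```

A variadic options parameter lets Connect grow new settings without changing its signature, which is presumably why the interface takes opts ...JetStreamClientOption.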

type JetStreamContext

type JetStreamContext struct {
	// contains filtered or unexported fields
}

JetStreamContext is a proxy struct for nats.JetStreamContext. The proxy exists so that the underlying nats.JetStreamContext can be replaced with a new one after reconnection.
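
To illustrate the proxy idea described above, here is a stdlib-only sketch in which callers hold a stable proxy while the underlying handle is swapped after a simulated reconnect. The publisher interface, proxy type, replace method, and namedPublisher fake are hypothetical; guarding the swap with a sync.RWMutex is an assumption about one reasonable way to do it, not a description of the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// publisher is a hypothetical, minimal stand-in for the subset of
// nats.JetStreamContext methods that a proxy would forward.
type publisher interface {
	Publish(subject string, data []byte) error
}

// proxy forwards calls to the current underlying publisher and lets a
// reconnect routine swap that publisher without callers noticing.
type proxy struct {
	mu    sync.RWMutex
	inner publisher
}

// Publish takes a snapshot of the current publisher under a read lock and
// then forwards the call outside the lock.
func (p *proxy) Publish(subject string, data []byte) error {
	p.mu.RLock()
	inner := p.inner
	p.mu.RUnlock()
	if inner == nil {
		return errors.New("no underlying context")
	}
	return inner.Publish(subject, data)
}

// replace installs a new underlying publisher, e.g. after a reconnect.
func (p *proxy) replace(next publisher) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.inner = next
}

// namedPublisher is a fake backend that records which connection served a call.
type namedPublisher struct {
	name string
	log  *[]string
}

func (n namedPublisher) Publish(subject string, _ []byte) error {
	*n.log = append(*n.log, n.name+":"+subject)
	return nil
}

func main() {
	var log []string
	p := &proxy{inner: namedPublisher{name: "conn-1", log: &log}}
	_ = p.Publish("orders", nil)
	p.replace(namedPublisher{name: "conn-2", log: &log}) // simulated reconnect
	_ = p.Publish("orders", nil)
	fmt.Println(log) // prints "[conn-1:orders conn-2:orders]"
}
```

Because callers only ever hold the proxy, they do not need to re-fetch a context when the connection is re-established.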

func (*JetStreamContext) AddConsumer

func (jsc *JetStreamContext) AddConsumer(stream string, cfg *nats.ConsumerConfig, opts ...nats.JSOpt) (*nats.ConsumerInfo, error)

func (*JetStreamContext) AddStream

func (jsc *JetStreamContext) AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error)

func (*JetStreamContext) ConsumerInfo

func (jsc *JetStreamContext) ConsumerInfo(stream string, name string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error)

func (*JetStreamContext) CreateKeyValue

func (jsc *JetStreamContext) CreateKeyValue(cfg *nats.KeyValueConfig) (nats.KeyValue, error)

func (*JetStreamContext) DeleteConsumer

func (jsc *JetStreamContext) DeleteConsumer(stream string, consumer string, opts ...nats.JSOpt) error

func (*JetStreamContext) DeleteKeyValue

func (jsc *JetStreamContext) DeleteKeyValue(bucket string) error

func (*JetStreamContext) DeleteStream

func (jsc *JetStreamContext) DeleteStream(name string, opts ...nats.JSOpt) error

func (*JetStreamContext) KeyValue

func (jsc *JetStreamContext) KeyValue(bucket string) (nats.KeyValue, error)

func (*JetStreamContext) PublishMsg added in v0.6.4

func (jsc *JetStreamContext) PublishMsg(m *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error)

func (*JetStreamContext) PublishMsgAsync

func (jsc *JetStreamContext) PublishMsgAsync(m *nats.Msg, opts ...nats.PubOpt) (nats.PubAckFuture, error)

func (*JetStreamContext) PullSubscribe

func (jsc *JetStreamContext) PullSubscribe(subj string, durable string, opts ...nats.SubOpt) (*nats.Subscription, error)

func (*JetStreamContext) StreamInfo

func (jsc *JetStreamContext) StreamInfo(stream string, opts ...nats.JSOpt) (*nats.StreamInfo, error)

type NatsConn

type NatsConn struct {
	Conn *nats.Conn
	// contains filtered or unexported fields
}

NatsConn is a wrapper around nats.Conn that adds the package's own auto-reconnection logic.

func NewNatsConn

func NewNatsConn(conn *nats.Conn) *NatsConn

NewNatsConn returns a NatsConn instance

func (*NatsConn) Close

func (nc *NatsConn) Close()

Close closes the underlying NATS connection.

func (*NatsConn) IsClosed

func (nc *NatsConn) IsClosed() bool

IsClosed is a simple proxy for the IsClosed method of the underlying NATS connection.

func (*NatsConn) IsConnected

func (nc *NatsConn) IsConnected() bool

IsConnected checks whether the connection is healthy. It uses a dedicated JetStreamContext to call AccountInfo() and treats a successful call as proof that the connection works. To avoid reporting a disconnect on a one-off failure, it checks up to 3 times when a call fails.
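
The retry behavior described above can be sketched with the standard library alone. The isHealthy function, its parameters, and the pause between attempts are hypothetical; in the real method the probe would be the AccountInfo() call, and whether the package waits between attempts is not stated here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errProbe is a sentinel error used by the fake probe below.
var errProbe = errors.New("probe failed")

// isHealthy calls probe up to attempts times and reports true on the first
// success. It sleeps for pause between failed attempts, but not after the last.
func isHealthy(probe func() error, attempts int, pause time.Duration) bool {
	for i := 0; i < attempts; i++ {
		if err := probe(); err == nil {
			return true
		}
		if i < attempts-1 {
			time.Sleep(pause)
		}
	}
	return false
}

func main() {
	calls := 0
	// flaky fails twice and then succeeds, imitating a transient error.
	flaky := func() error {
		calls++
		if calls < 3 {
			return errProbe
		}
		return nil
	}
	fmt.Println(isHealthy(flaky, 3, 10*time.Millisecond), calls) // prints "true 3"
}
```

Retrying before declaring the connection down trades a slightly slower failure signal for fewer spurious reconnects.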

func (*NatsConn) JetStream

func (nc *NatsConn) JetStream(opts ...nats.JSOpt) (*JetStreamContext, error)

JetStream calls the same function on the underlying NATS connection and returns the result, while also storing the JetStreamContext so that it can be restored after reconnection.
