Documentation ¶
Index ¶
- func NewDefaultJetStreamClient(url string, opts ...nats.Option) *defaultJetStreamClient
- func NewInClusterJetStreamClient() *inClusterJetStreamClient
- type JetStreamClient
- type JetStreamClientOption
- type JetStreamContext
- func (jsc *JetStreamContext) AddConsumer(stream string, cfg *nats.ConsumerConfig, opts ...nats.JSOpt) (*nats.ConsumerInfo, error)
- func (jsc *JetStreamContext) AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error)
- func (jsc *JetStreamContext) ConsumerInfo(stream string, name string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error)
- func (jsc *JetStreamContext) CreateKeyValue(cfg *nats.KeyValueConfig) (nats.KeyValue, error)
- func (jsc *JetStreamContext) DeleteConsumer(stream string, consumer string, opts ...nats.JSOpt) error
- func (jsc *JetStreamContext) DeleteKeyValue(bucket string) error
- func (jsc *JetStreamContext) DeleteStream(name string, opts ...nats.JSOpt) error
- func (jsc *JetStreamContext) KeyValue(bucket string) (nats.KeyValue, error)
- func (jsc *JetStreamContext) PublishMsg(m *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error)
- func (jsc *JetStreamContext) PublishMsgAsync(m *nats.Msg, opts ...nats.PubOpt) (nats.PubAckFuture, error)
- func (jsc *JetStreamContext) PullSubscribe(subj string, durable string, opts ...nats.SubOpt) (*nats.Subscription, error)
- func (jsc *JetStreamContext) StreamInfo(stream string, opts ...nats.JSOpt) (*nats.StreamInfo, error)
- type NatsConn
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewDefaultJetStreamClient ¶
func NewDefaultJetStreamClient(url string, opts ...nats.Option) *defaultJetStreamClient
NewDefaultJetStreamClient is used to get a default JetStream client instance
func NewInClusterJetStreamClient ¶
func NewInClusterJetStreamClient() *inClusterJetStreamClient
NewInClusterJetStreamClient returns an instance of inClusterJetStreamClient.
Types ¶
type JetStreamClient ¶
type JetStreamClient interface {
Connect(ctx context.Context, opts ...JetStreamClientOption) (*NatsConn, error)
}
JetStreamClient is used to provide a JetStream client
type JetStreamClientOption ¶
type JetStreamClientOption func(*jsClientOptions)
func ConnectionCheckInterval ¶
func ConnectionCheckInterval(d time.Duration) JetStreamClientOption
ConnectionCheckInterval is an Option to set connection check interval.
func DisconnectErrHandler ¶
func DisconnectErrHandler(f func(*NatsConn, error)) JetStreamClientOption
DisconnectErrHandler is an option to set disconnect handler.
func NoReconnect ¶
func NoReconnect() JetStreamClientOption
NoReconnect is an Option to disable automatic reconnection.
func ReconnectHandler ¶
func ReconnectHandler(f func(*NatsConn)) JetStreamClientOption
ReconnectHandler is an Option to set reconnect handler.
type JetStreamContext ¶
type JetStreamContext struct {
// contains filtered or unexported fields
}
JetStreamContext is a proxy struct for nats.JetStreamContext. The proxy exists so that the underlying nats.JetStreamContext can be replaced with a new one after reconnection.
func (*JetStreamContext) AddConsumer ¶
func (jsc *JetStreamContext) AddConsumer(stream string, cfg *nats.ConsumerConfig, opts ...nats.JSOpt) (*nats.ConsumerInfo, error)
func (*JetStreamContext) AddStream ¶
func (jsc *JetStreamContext) AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error)
func (*JetStreamContext) ConsumerInfo ¶
func (jsc *JetStreamContext) ConsumerInfo(stream string, name string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error)
func (*JetStreamContext) CreateKeyValue ¶
func (jsc *JetStreamContext) CreateKeyValue(cfg *nats.KeyValueConfig) (nats.KeyValue, error)
func (*JetStreamContext) DeleteConsumer ¶
func (jsc *JetStreamContext) DeleteConsumer(stream string, consumer string, opts ...nats.JSOpt) error
func (*JetStreamContext) DeleteKeyValue ¶
func (jsc *JetStreamContext) DeleteKeyValue(bucket string) error
func (*JetStreamContext) DeleteStream ¶
func (jsc *JetStreamContext) DeleteStream(name string, opts ...nats.JSOpt) error
func (*JetStreamContext) KeyValue ¶
func (jsc *JetStreamContext) KeyValue(bucket string) (nats.KeyValue, error)
func (*JetStreamContext) PublishMsg ¶
func (jsc *JetStreamContext) PublishMsg(m *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error)
func (*JetStreamContext) PublishMsgAsync ¶
func (jsc *JetStreamContext) PublishMsgAsync(m *nats.Msg, opts ...nats.PubOpt) (nats.PubAckFuture, error)
func (*JetStreamContext) PullSubscribe ¶
func (jsc *JetStreamContext) PullSubscribe(subj string, durable string, opts ...nats.SubOpt) (*nats.Subscription, error)
func (*JetStreamContext) StreamInfo ¶
func (jsc *JetStreamContext) StreamInfo(stream string, opts ...nats.JSOpt) (*nats.StreamInfo, error)
type NatsConn ¶
type NatsConn struct {
Conn *nats.Conn
// contains filtered or unexported fields
}
NatsConn is a wrapper of nats.Conn, which implements our own magic for auto reconnection.
func NewNatsConn ¶
func NewNatsConn(conn *nats.Conn) *NatsConn
NewNatsConn returns a NatsConn instance
func (*NatsConn) Close ¶
func (nc *NatsConn) Close()
Close function closes the underlying Nats connection.
func (*NatsConn) IsConnected ¶
IsConnected implements the magic to check whether the connection is OK. It uses the dedicated JetStreamContext to call the AccountInfo() function, and determines the connection state by whether that call works. To avoid being misled by an occasional glitch, it checks 3 times if there is a failure.
func (*NatsConn) JetStream ¶
func (nc *NatsConn) JetStream(opts ...nats.JSOpt) (*JetStreamContext, error)
JetStream invokes the same function on the underlying NATS connection and returns its result; it also stores the JetStreamContext so that it can be restored after reconnection.