nats

package
v2.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2024 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrCreatingSubscriber = errors.New("error creating subscriber")
	ErrCreatingPublisher  = errors.New("error creating publisher")
)

NATS Errors

Functions

func DisconnectErrorCallback

func DisconnectErrorCallback(logger log.Logger) natn.ConnErrHandler

DisconnectErrorCallback returns a handler that is called when the connection to the NATS server is lost.

func EncodeJSONRequest

func EncodeJSONRequest(_ context.Context, msg *natn.Msg, request interface{}) error

EncodeJSONRequest is an Encoder that serializes the request as a JSON object to the Data of the Msg. Many JSON-over-NATS services can use it as a sensible default.

func NoOpErrorEncoder

func NoOpErrorEncoder(context.Context, error, string, *natn.Conn)

func NoOpResponseHandler

func NoOpResponseHandler(context.Context, string, *natn.Conn, interface{}) error

func ReconnectCallback

func ReconnectCallback(logger log.Logger) natn.ConnHandler

ReconnectCallback returns a handler that is called when the connection to the NATS server is re-established.

Types

type AfterFunc

type AfterFunc func(context.Context, *natn.Conn) context.Context

type AfterPublish

type AfterPublish func(context.Context, *natn.Msg, error)

AfterPublish is a function that is called after every message is sent to NATS.

type BeforeFunc

type BeforeFunc func(context.Context, *natn.Msg) context.Context

type BeforePublish

type BeforePublish func(context.Context, *natn.Msg) error

BeforePublish is a function that is called before every message is sent to NATS.

type ConnectionErrHandler

type ConnectionErrHandler func(t *Transport, e error)

type Decoder

type Decoder func(context.Context, *natn.Msg) (request interface{}, err error)

Decoder decodes a message received on NATS and converts it into a business entity.

type ErrorEncoder

type ErrorEncoder kitn.ErrorEncoder

type ErrorHandler

type ErrorHandler interface{ transport.ErrorHandler }

type PublishErrorHandler

type PublishErrorHandler func(context.Context, error) error

PublishErrorHandler is a function that is called when an error occurs

type PublishMessageEncoder

type PublishMessageEncoder func(cx context.Context, sub string, data interface{}) (*natn.Msg, error)

PublishMessageEncoder encodes the value passed to it and converts it to a NATS message.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher publishes messages on NATS.

func NewPublisher

func NewPublisher(connstr string, options ...PublisherOption) (*Publisher, error)

func (*Publisher) Endpoint

func (p *Publisher) Endpoint(sub string) endpoint.Endpoint

Endpoint returns a usable endpoint that invokes the remote endpoint.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, sub string, data interface{}) error

Publish publishes the message on NATS

type PublisherOption

type PublisherOption func(*Publisher)

PublisherOption lets you modify the properties of a Publisher.

func WithAfterPublish

func WithAfterPublish(afters ...AfterPublish) PublisherOption

func WithBeforePublish

func WithBeforePublish(befores ...BeforePublish) PublisherOption

func WithCustomPublisherMaxReconnect

func WithCustomPublisherMaxReconnect(maxReconnect int) PublisherOption

func WithCustomPublisherPingInterval

func WithCustomPublisherPingInterval(pingInterval time.Duration) PublisherOption

func WithCustomPublisherTimeout

func WithCustomPublisherTimeout(timeout time.Duration) PublisherOption

func WithErrorHandler

func WithErrorHandler(handler PublishErrorHandler) PublisherOption

func WithPublishHeader

func WithPublishHeader(headers natn.Header) PublisherOption

func WithPublishMessageEncoder

func WithPublishMessageEncoder(encoder PublishMessageEncoder) PublisherOption

func WithPublisherName

func WithPublisherName(name string) PublisherOption

func WithPublisherSubjectPrefix

func WithPublisherSubjectPrefix(prefix string) PublisherOption

type ResponseHandler

type ResponseHandler func(context.Context, string, *natn.Conn, interface{}) error

ResponseHandler handles the endpoint response

type Subscriber

type Subscriber interface {
	Id() string
	Topic() string
	Group() string
	IsValid() bool
}

type SubscriberOption

type SubscriberOption func(*subscriber)

SubscriberOption provides a set of options to modify a Subscriber.

func WithAfterFuncsSubscriberOption

func WithAfterFuncsSubscriberOption(fns ...AfterFunc) SubscriberOption

func WithBeforeFuncsSubscriberOption

func WithBeforeFuncsSubscriberOption(fns ...BeforeFunc) SubscriberOption

func WithDecoderSubscriberOption

func WithDecoderSubscriberOption(fn Decoder) SubscriberOption

func WithEndpointMiddleware

func WithEndpointMiddleware(m endpoint.Middleware) SubscriberOption

WithEndpointMiddleware provides the ability to add a middleware of the base type.

func WithEndpointSubscriberOption

func WithEndpointSubscriberOption(end endpoint.Endpoint) SubscriberOption

func WithErrorEncoderSubscriberOption

func WithErrorEncoderSubscriberOption(e ErrorEncoder) SubscriberOption

func WithErrorhandlerSubscriberOption

func WithErrorhandlerSubscriberOption(e ErrorHandler) SubscriberOption

func WithId

func WithId(id string) SubscriberOption

func WithQGroupSubscriberOption

func WithQGroupSubscriberOption(qGroup string) SubscriberOption

func WithSubjectSubscriberOption

func WithSubjectSubscriberOption(sub string) SubscriberOption

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is the transport server for a NATS.io connection.

func NewTransport

func NewTransport(
	closeCh chan struct{},
	options ...TransportOption,
) (*Transport, error)

NewTransport returns a new NATS transport

func (*Transport) Close

func (tr *Transport) Close() (err error)

Close shuts down Transport

func (*Transport) Open

func (tr *Transport) Open() error

Open starts the Transport

func (*Transport) Subscribe

func (tr *Transport) Subscribe(
	options ...SubscriberOption,
) (Subscriber, error)

func (*Transport) Subscribers

func (tr *Transport) Subscribers() []Subscriber

func (*Transport) Unsubscribe

func (tr *Transport) Unsubscribe(id string) error

type TransportOption

type TransportOption func(*Transport)

TransportOption is an optional parameter for the NATS Transport.

func WithConnectionErrorHandler

func WithConnectionErrorHandler(h ConnectionErrHandler) TransportOption

WithConnectionErrorHandler sets a handler for connection errors

func WithDisconnectCallback

func WithDisconnectCallback(fn func(nc *natn.Conn, err error)) TransportOption

func WithFlushTimeout

func WithFlushTimeout(t time.Duration) TransportOption

WithFlushTimeout sets how long to wait for the publisher to finish flushing its content to the NATS server before the connection is terminated.

func WithLogging

func WithLogging(logger log.Logger) TransportOption

WithLogging sets logging for Transport, subscribers & publishers

func WithName

func WithName(n string) TransportOption

func WithNoRandomize

func WithNoRandomize(noRandomize bool) TransportOption

func WithReconnectCallback

func WithReconnectCallback(fn func(nc *natn.Conn)) TransportOption

func WithServers

func WithServers(servers []string) TransportOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL