Documentation ¶
Index ¶
- Variables
- func DisconnectErrorCallback(logger log.Logger) natn.ConnErrHandler
- func EncodeJSONRequest(_ context.Context, msg *natn.Msg, request interface{}) error
- func NoOpErrorEncoder(context.Context, error, string, *natn.Conn)
- func NoOpResponseHandler(context.Context, string, *natn.Conn, interface{}) error
- func ReconnectCallback(logger log.Logger) natn.ConnHandler
- type AfterFunc
- type AfterPublish
- type BeforeFunc
- type BeforePublish
- type ConnectionErrHandler
- type Decoder
- type ErrorEncoder
- type ErrorHandler
- type PublishErrorHandler
- type PublishMessageEncoder
- type Publisher
- type PublisherOption
- func WithAfterPublish(afters ...AfterPublish) PublisherOption
- func WithBeforePublish(befores ...BeforePublish) PublisherOption
- func WithCustomPublisherMaxReconnect(maxReconnect int) PublisherOption
- func WithCustomPublisherPingInterval(pingInterval time.Duration) PublisherOption
- func WithCustomPublisherTimeout(timeout time.Duration) PublisherOption
- func WithErrorHandler(handler PublishErrorHandler) PublisherOption
- func WithPublishHeader(headers natn.Header) PublisherOption
- func WithPublishMessageEncoder(encoder PublishMessageEncoder) PublisherOption
- func WithPublisherName(name string) PublisherOption
- func WithPublisherSubjectPrefix(prefix string) PublisherOption
- type ResponseHandler
- type Subscriber
- type SubscriberOption
- func WithAfterFuncsSubscriberOption(fns ...AfterFunc) SubscriberOption
- func WithBeforeFuncsSubscriberOption(fns ...BeforeFunc) SubscriberOption
- func WithDecoderSubscriberOption(fn Decoder) SubscriberOption
- func WithEndpointMiddleware(m endpoint.Middleware) SubscriberOption
- func WithEndpointSubscriberOption(end endpoint.Endpoint) SubscriberOption
- func WithErrorEncoderSubscriberOption(e ErrorEncoder) SubscriberOption
- func WithErrorhandlerSubscriberOption(e ErrorHandler) SubscriberOption
- func WithId(id string) SubscriberOption
- func WithQGroupSubscriberOption(qGroup string) SubscriberOption
- func WithSubjectSubscriberOption(sub string) SubscriberOption
- type Transport
- type TransportOption
- func WithConnectionErrorHandler(h ConnectionErrHandler) TransportOption
- func WithDisconnectCallback(fn func(nc *natn.Conn, err error)) TransportOption
- func WithFlushTimeout(t time.Duration) TransportOption
- func WithLogging(logger log.Logger) TransportOption
- func WithName(n string) TransportOption
- func WithNoRandomize(noRandomize bool) TransportOption
- func WithReconnectCallback(fn func(nc *natn.Conn)) TransportOption
- func WithServers(servers []string) TransportOption
Constants ¶
This section is empty.
Variables ¶
var ( ErrCreatingSubscriber = errors.New("error creating subscriber") ErrCreatingPublisher = errors.New("error creating publisher") )
NATS Errors
Functions ¶
func DisconnectErrorCallback ¶
func DisconnectErrorCallback(logger log.Logger) natn.ConnErrHandler
DisconnectErrorCallback is called when the connection to nats server is lost
func EncodeJSONRequest ¶
EncodeJSONRequest is an Encoder that serializes the request as a JSON object to the Data of the Msg. Many JSON-over-NATS services can use it as a sensible default.
func NoOpResponseHandler ¶
func ReconnectCallback ¶
func ReconnectCallback(logger log.Logger) natn.ConnHandler
ReconnectCallback is called when the connection to nats server is re-established
Types ¶
type AfterPublish ¶
After is a function called after every message sent to NATS
type BeforePublish ¶
Before is a function that is called before every message sent to NATS
type ConnectionErrHandler ¶
type ErrorEncoder ¶
type ErrorEncoder kitn.ErrorEncoder
type ErrorHandler ¶
type ErrorHandler interface{ transport.ErrorHandler }
type PublishErrorHandler ¶
PublishErrorHandler is a function that is called when an error occurs
type PublishMessageEncoder ¶
type PublishMessageEncoder func(cx context.Context, sub string, data interface{}) (*natn.Msg, error)
PublishMessageEncoder encodes the value passed to it and converts to NATS message
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
publisher publishes message on NATS
func NewPublisher ¶
func NewPublisher(connstr string, options ...PublisherOption) (*Publisher, error)
type PublisherOption ¶
type PublisherOption func(*Publisher)
PublisherOption lets you modify properties for publisher
func WithAfterPublish ¶
func WithAfterPublish(afters ...AfterPublish) PublisherOption
func WithBeforePublish ¶
func WithBeforePublish(befores ...BeforePublish) PublisherOption
func WithCustomPublisherMaxReconnect ¶
func WithCustomPublisherMaxReconnect(maxReconnect int) PublisherOption
func WithCustomPublisherPingInterval ¶
func WithCustomPublisherPingInterval(pingInterval time.Duration) PublisherOption
func WithCustomPublisherTimeout ¶
func WithCustomPublisherTimeout(timeout time.Duration) PublisherOption
func WithErrorHandler ¶
func WithErrorHandler(handler PublishErrorHandler) PublisherOption
func WithPublishHeader ¶
func WithPublishHeader(headers natn.Header) PublisherOption
func WithPublishMessageEncoder ¶
func WithPublishMessageEncoder(encoder PublishMessageEncoder) PublisherOption
func WithPublisherName ¶
func WithPublisherName(name string) PublisherOption
func WithPublisherSubjectPrefix ¶
func WithPublisherSubjectPrefix(prefix string) PublisherOption
type ResponseHandler ¶
ResponseHandler handles the endpoint response
type Subscriber ¶
type SubscriberOption ¶
type SubscriberOption func(*subscriber)
SubscriberOption provides set of options to modify a Subscriber
func WithAfterFuncsSubscriberOption ¶
func WithAfterFuncsSubscriberOption(fns ...AfterFunc) SubscriberOption
func WithBeforeFuncsSubscriberOption ¶
func WithBeforeFuncsSubscriberOption(fns ...BeforeFunc) SubscriberOption
func WithDecoderSubscriberOption ¶
func WithDecoderSubscriberOption(fn Decoder) SubscriberOption
func WithEndpointMiddleware ¶
func WithEndpointMiddleware(m endpoint.Middleware) SubscriberOption
HandlerWithEndpointMiddleware provides an ability to add a middleware of the base type
func WithEndpointSubscriberOption ¶
func WithEndpointSubscriberOption(end endpoint.Endpoint) SubscriberOption
func WithErrorEncoderSubscriberOption ¶
func WithErrorEncoderSubscriberOption(e ErrorEncoder) SubscriberOption
func WithErrorhandlerSubscriberOption ¶
func WithErrorhandlerSubscriberOption(e ErrorHandler) SubscriberOption
func WithId ¶
func WithId(id string) SubscriberOption
func WithQGroupSubscriberOption ¶
func WithQGroupSubscriberOption(qGroup string) SubscriberOption
func WithSubjectSubscriberOption ¶
func WithSubjectSubscriberOption(sub string) SubscriberOption
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport is transport server for natn.IO connection
func NewTransport ¶
func NewTransport( closeCh chan struct{}, options ...TransportOption, ) (*Transport, error)
NewTransport returns a new NATS transport
func (*Transport) Subscribe ¶
func (tr *Transport) Subscribe( options ...SubscriberOption, ) (Subscriber, error)
func (*Transport) Subscribers ¶
func (tr *Transport) Subscribers() []Subscriber
func (*Transport) Unsubscribe ¶
type TransportOption ¶
type TransportOption func(*Transport)
TransportOption is optional parameters for NATS Transport
func WithConnectionErrorHandler ¶
func WithConnectionErrorHandler(h ConnectionErrHandler) TransportOption
WithConnectionErrorHandler sets a handler for connection errors
func WithDisconnectCallback ¶
func WithDisconnectCallback(fn func(nc *natn.Conn, err error)) TransportOption
func WithFlushTimeout ¶
func WithFlushTimeout(t time.Duration) TransportOption
WithFlushTimeout sets a timeout that we will wait for publisher to complete flushing the content on nats server before terminating connection
func WithLogging ¶
func WithLogging(logger log.Logger) TransportOption
WithLogging sets logging for Transport, subscribers & publishers
func WithName ¶
func WithName(n string) TransportOption
func WithNoRandomize ¶
func WithNoRandomize(noRandomize bool) TransportOption
func WithReconnectCallback ¶
func WithReconnectCallback(fn func(nc *natn.Conn)) TransportOption
func WithServers ¶
func WithServers(servers []string) TransportOption