Documentation
¶
Index ¶
- Constants
- Variables
- func Version() string
- type EncodedMiddleware
- type ErrorConfig
- type Handler
- type HandlerError
- type NatsCtxHandler
- type NatsMiddlewareFunc
- type NatsMsg
- func (n *NatsMsg) Context() context.Context
- func (n *NatsMsg) Respond(data []byte) error
- func (n *NatsMsg) RespondWithHeaders(data []byte, headers map[string]string) error
- func (n *NatsMsg) RespondWithOriginalHeaders(data []byte, headers ...string) error
- func (n *NatsMsg) WithContext(ctx context.Context) *NatsMsg
- type NatsRouter
- func (n *NatsRouter) ChanQueueSubscribe(subject, queue string, ch chan *NatsMsg) (*nats.Subscription, error)
- func (n *NatsRouter) ChanSubscribe(subject string, ch chan *NatsMsg) (*nats.Subscription, error)
- func (n *NatsRouter) Close()
- func (n *NatsRouter) Conn() *nats.Conn
- func (n *NatsRouter) Drain()
- func (n *NatsRouter) Publish(subject string, data []byte) error
- func (n *NatsRouter) Queue(queue string) *Queue
- func (n *NatsRouter) QueueSubscribe(subject, queue string, handler NatsCtxHandler) (*nats.Subscription, error)
- func (n *NatsRouter) Subject(subjects ...string) *Subject
- func (n *NatsRouter) Subscribe(subject string, handler NatsCtxHandler) (*nats.Subscription, error)
- func (n *NatsRouter) Use(fns ...NatsMiddlewareFunc) *NatsRouter
- func (n *NatsRouter) Wiretap() *Subject
- func (n *NatsRouter) WithMiddleware(fns ...NatsMiddlewareFunc) *NatsRouter
- type Queue
- func (q *Queue) ChanSubscribe(subject string, ch chan *NatsMsg) (*nats.Subscription, error)
- func (q *Queue) Subject(subjects ...string) *Subject
- func (q *Queue) Subscribe(subject string, handler NatsCtxHandler) (*nats.Subscription, error)
- func (q *Queue) Use(fns ...NatsMiddlewareFunc) *Queue
- func (q *Queue) WithMiddleware(fns ...NatsMiddlewareFunc) *Queue
- type RouterOption
- type RouterOptions
- type Subject
- func (s *Subject) All() *Subject
- func (s *Subject) Any() *Subject
- func (s *Subject) ChanSubscribe(ch chan *NatsMsg) (*nats.Subscription, error)
- func (s *Subject) Queue(queue string) *Subject
- func (s *Subject) Subject(subjects ...string) *Subject
- func (s *Subject) Subscribe(handler NatsCtxHandler) (*nats.Subscription, error)
Constants ¶
const ALL_SUBJECT = ">"
ALL_SUBJECT is a wildcard subject that can match one or more tokens in a subject string.
const ANY_SUBJECT = "*"
ANY_SUBJECT is a wildcard subject that can match any single token in a subject string.
Variables ¶
var ErrNonLastAllSubject = errors.New("'all' subject must be last in subject chain")
This error is returned when an "all" subject is not the last one in a subject chain.
var ErrUnsupportedEncoding = errors.New("unsupported encoding")
Functions ¶
Types ¶
type EncodedMiddleware ¶ added in v0.2.0
type EncodedMiddleware struct {
// contains filtered or unexported fields
}
Simple middleware that simulated EncodedConn-type subscriptions
func NewEncodedMiddleware ¶ added in v0.2.0
func NewEncodedMiddleware(cb Handler, encType string) (*EncodedMiddleware, error)
Create a new EncodedConn middleware with given encoder type
func (*EncodedMiddleware) EncodedMiddleware ¶ added in v0.2.0
func (e *EncodedMiddleware) EncodedMiddleware(NatsCtxHandler) NatsCtxHandler
Actual middleware function
type ErrorConfig ¶
Error configuration `Tag` is the header key and `Format` is the error encoding Defaults are "error" and "json"
type Handler ¶ added in v0.2.0
type Handler interface{}
handler := func(m *NatsMsg) handler := func(ctx context.Context, m *Msg) handler := func(ctx context.Context, p *person) handler := func(ctx context.Context, subject string, o *obj) handler := func(ctx context.Context, subject, reply string, o *obj)
type HandlerError ¶
func (*HandlerError) Error ¶
func (e *HandlerError) Error() string
type NatsCtxHandler ¶
Handler function that adds a `context.Context` to a `*nats.Msg`
type NatsMiddlewareFunc ¶
type NatsMiddlewareFunc func(NatsCtxHandler) NatsCtxHandler
Middleware function that takes a `NatsCtxHandler` and returns a new `NatsCtxHandler`
type NatsMsg ¶
type NatsMsg struct { *nats.Msg // contains filtered or unexported fields }
Extend nats.Msg struct to include context
func (*NatsMsg) RespondWithHeaders ¶
Send a response with given headers
func (*NatsMsg) RespondWithOriginalHeaders ¶
Send a response and copy given original headers, or all if nothing defined
type NatsRouter ¶
type NatsRouter struct {
// contains filtered or unexported fields
}
NatsRouter is a middleware-supported NATS router that provides a fluent API for subscribing to subjects and chaining middleware functions.
func Connect ¶ added in v0.1.0
func Connect(url string, options ...RouterOption) (*NatsRouter, error)
func NewRouter
deprecated
func NewRouter(nc *nats.Conn, options ...RouterOption) *NatsRouter
Create a new NatsRouter with a *nats.Conn and an optional list of RouterOptions functions. It sets the default RouterOptions to use a default ErrorConfig, and then iterates through each option function, calling it with the RouterOptions struct pointer to set any additional options.
Deprecated: Use Connect instead. This does not support properly draining publications and subscriptions.
func NewRouterWithAddress
deprecated
func NewRouterWithAddress(addr string, options ...RouterOption) (*NatsRouter, error)
Creates a new NatsRouter with a string address and an optional list of RouterOptions functions. It connects to the NATS server using the provided address, and then calls NewRouter to create a new NatsRouter with the resulting *nats.Conn and optional RouterOptions. If there was an error connecting to the server, it returns nil and the error.
Deprecated: Use Connect instead. This does not support properly draining publications and subscriptions.
func (*NatsRouter) ChanQueueSubscribe ¶ added in v0.0.5
func (n *NatsRouter) ChanQueueSubscribe(subject, queue string, ch chan *NatsMsg) (*nats.Subscription, error)
Same as QueueSubscribe, except uses channels. Note that error handling is available only for middleware, since the message is processed first by middleware and then inserted into the *NatsMsg channel.
func (*NatsRouter) ChanSubscribe ¶ added in v0.0.5
func (n *NatsRouter) ChanSubscribe(subject string, ch chan *NatsMsg) (*nats.Subscription, error)
Same as Subscribe, except uses channels. Note that error handling is available only for middleware, since the message is processed first by middleware and then inserted into the *NatsMsg channel.
func (*NatsRouter) Conn ¶
func (n *NatsRouter) Conn() *nats.Conn
Get current underlying NATS connection
func (*NatsRouter) Drain ¶ added in v0.1.0
func (n *NatsRouter) Drain()
Drain pubs/subs and close connection to NATS server
func (*NatsRouter) Publish ¶
func (n *NatsRouter) Publish(subject string, data []byte) error
Publish is a passthrough function for the `nats` Publish function
func (*NatsRouter) Queue ¶
func (n *NatsRouter) Queue(queue string) *Queue
Returns a new `Queue` object with the given queue name
func (*NatsRouter) QueueSubscribe ¶
func (n *NatsRouter) QueueSubscribe(subject, queue string, handler NatsCtxHandler) (*nats.Subscription, error)
QueueSubscribe registers a handler function for the specified subject and queue group and returns a *nats.Subscription. The handler function is wrapped with any registered middleware functions in reverse order.
func (*NatsRouter) Subject ¶
func (n *NatsRouter) Subject(subjects ...string) *Subject
Returns a new `Subject` object with the given subject name(s)
func (*NatsRouter) Subscribe ¶
func (n *NatsRouter) Subscribe(subject string, handler NatsCtxHandler) (*nats.Subscription, error)
Subscribe registers a handler function for the specified subject and returns a *nats.Subscription. The handler function is wrapped with any registered middleware functions in reverse order.
func (*NatsRouter) Use ¶
func (n *NatsRouter) Use(fns ...NatsMiddlewareFunc) *NatsRouter
Alias for `WithMiddleware`
func (*NatsRouter) Wiretap ¶
func (n *NatsRouter) Wiretap() *Subject
Returns a new `Subject` object with the wildcard ALL_SUBJECT. This is also knows as a "wiretap" mode, listening on all requests.
func (*NatsRouter) WithMiddleware ¶
func (n *NatsRouter) WithMiddleware(fns ...NatsMiddlewareFunc) *NatsRouter
Returns a new `NatsRouter` with additional middleware functions
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue group that can be subscribed to subjects
func (*Queue) ChanSubscribe ¶ added in v0.0.5
Same as Subscribe, with channel support
func (*Queue) Subscribe ¶
func (q *Queue) Subscribe(subject string, handler NatsCtxHandler) (*nats.Subscription, error)
Subscribe to a subject as a part of this queue group with the specified handler function
func (*Queue) Use ¶
func (q *Queue) Use(fns ...NatsMiddlewareFunc) *Queue
Alias for `WithMiddleware`
func (*Queue) WithMiddleware ¶
func (q *Queue) WithMiddleware(fns ...NatsMiddlewareFunc) *Queue
Returns a new `Queue` object with additional middleware functions
type RouterOption ¶
type RouterOption func(*RouterOptions)
Defines a function type that will be used to define options for the router.
func WithErrorConfig ¶
func WithErrorConfig(ec *ErrorConfig) RouterOption
Define error config in the router options.
func WithErrorConfigString ¶
func WithErrorConfigString(tag, format string) RouterOption
Define error config as strings in the router options.
func WithNatsOption ¶ added in v0.3.1
func WithNatsOption(options ...nats.Option) RouterOption
Apply one or more nats.Option to the config before connecting
func WithNatsOptions ¶ added in v0.1.0
func WithNatsOptions(nopts nats.Options) RouterOption
Set nats.Options for the connection before connecting
func WithRequestIdTag ¶
func WithRequestIdTag(tag string) RouterOption
Define new request id header tag
type RouterOptions ¶
type RouterOptions struct { ErrorConfig *ErrorConfig RequestIdTag string NatsOptions nats.Options }
Defines a struct for the router options, which currently contains error config, default request id tag (for error reporting) and optional list of NATS connection options.
func GetDefaultRouterOptions ¶ added in v0.1.0
func GetDefaultRouterOptions() RouterOptions
Get default RouterOptions
func (RouterOptions) Connect ¶ added in v0.1.0
func (r RouterOptions) Connect() (*NatsRouter, error)
Connect will attempt to connect to a NATS server with multiple options.
type Subject ¶
type Subject struct {
// contains filtered or unexported fields
}
This type defines a subject in NATS messaging.
func (*Subject) ChanSubscribe ¶ added in v0.0.5
Same as Subscribe, with channel support
func (*Subject) Queue ¶
This method returns a new subject with a queue group that is used to load balance messages across multiple subscribers.
func (*Subject) Subject ¶
This method returns a new subject that includes the specified subject strings. It appends the new subject strings to the existing subjects slice.
func (*Subject) Subscribe ¶
func (s *Subject) Subscribe(handler NatsCtxHandler) (*nats.Subscription, error)
This function subscribes a NATS context handler to a subject or a queue.