Documentation ¶
Index ¶
- type AcknowledgementHandler
- type Controller
- func (c *Controller) Close()
- func (c *Controller) ConsumeIfNeeded(ctx context.Context) error
- func (c *Controller) ConsumeMessage(ctx context.Context) jetstream.MessageHandler
- func (c *Controller) HandleMessage(ctx context.Context, msg jetstream.Msg, ...)
- func (c *Controller) Publish(ctx context.Context, channel string, bm extensions.BrokerMessage) error
- func (c *Controller) StopConsumeIfNeeded()
- func (c *Controller) Subscribe(ctx context.Context, channel string) (extensions.BrokerChannelSubscription, error)
- type ControllerOption
- func WithConnection(conn *nats.Conn) ControllerOption
- func WithConnectionOpts(opts ...nats.Option) ControllerOption
- func WithConsumer(name string) ControllerOption
- func WithConsumerConfig(config jetstream.ConsumerConfig) ControllerOption
- func WithLogger(logger extensions.Logger) ControllerOption
- func WithNakDelay(duration time.Duration) ControllerOption
- func WithStream(name string) ControllerOption
- func WithStreamConfig(config jetstream.StreamConfig) ControllerOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AcknowledgementHandler ¶ added in v0.35.0
type AcknowledgementHandler struct {
// contains filtered or unexported fields
}
AcknowledgementHandler for nats jetstream broker.
func (AcknowledgementHandler) AckMessage ¶ added in v0.35.0
func (k AcknowledgementHandler) AckMessage()
AckMessage acknowledges the message.
func (AcknowledgementHandler) NakMessage ¶ added in v0.35.0
func (k AcknowledgementHandler) NakMessage()
NakMessage negatively acknowledges the message.
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
Controller is the Controller implementation for asyncapi-codegen.
func NewController ¶
func NewController(url string, options ...ControllerOption) (*Controller, error)
NewController creates a new NATS JetStream controller.
func (*Controller) Close ¶
func (c *Controller) Close()
Close closes everything related to the broker.
func (*Controller) ConsumeIfNeeded ¶
func (c *Controller) ConsumeIfNeeded(ctx context.Context) error
ConsumeIfNeeded starts consuming messages if needed.
func (*Controller) ConsumeMessage ¶
func (c *Controller) ConsumeMessage(ctx context.Context) jetstream.MessageHandler
ConsumeMessage writes the message to the correct channel of the subject or in case there is no subscription the message will be acknowledged.
func (*Controller) HandleMessage ¶
func (c *Controller) HandleMessage(ctx context.Context, msg jetstream.Msg, sub extensions.BrokerChannelSubscription)
HandleMessage handles a message received from a stream.
func (*Controller) Publish ¶
func (c *Controller) Publish(ctx context.Context, channel string, bm extensions.BrokerMessage) error
Publish a message to the broker.
func (*Controller) StopConsumeIfNeeded ¶
func (c *Controller) StopConsumeIfNeeded()
StopConsumeIfNeeded stops consuming messages if needed (there is no other subscription).
func (*Controller) Subscribe ¶
func (c *Controller) Subscribe(ctx context.Context, channel string) (extensions.BrokerChannelSubscription, error)
Subscribe to messages from the broker.
type ControllerOption ¶
type ControllerOption func(controller *Controller) error
ControllerOption is a function that can be used to configure a NATS controller.
func WithConnection ¶ added in v0.42.0
func WithConnection(conn *nats.Conn) ControllerOption
WithConnection uses the existing nats connection.
func WithConnectionOpts ¶ added in v0.36.0
func WithConnectionOpts(opts ...nats.Option) ControllerOption
WithConnectionOpts set the nats.Options to connect to nats.
func WithConsumer ¶
func WithConsumer(name string) ControllerOption
WithConsumer uses the given consumer name (the consumer has to be created before initializing).
func WithConsumerConfig ¶
func WithConsumerConfig(config jetstream.ConsumerConfig) ControllerOption
WithConsumerConfig set consumer configuration for creating or updating a consumer based on the given config.
func WithLogger ¶
func WithLogger(logger extensions.Logger) ControllerOption
WithLogger set a custom logger that will log operations on broker controller.
func WithNakDelay ¶ added in v0.35.0
func WithNakDelay(duration time.Duration) ControllerOption
WithNakDelay set the delay when redeliver messages via nak.
func WithStream ¶
func WithStream(name string) ControllerOption
WithStream uses the given stream name (the stream has to be created before initializing).
func WithStreamConfig ¶
func WithStreamConfig(config jetstream.StreamConfig) ControllerOption
WithStreamConfig add the stream configuration for creating or updating a stream based on the given configuration.