Documentation ¶
Overview ¶
Package events provides methods to initialize and interface with an events broker.
Package events provides types and methods to interact with a messaging stream broker.
Index ¶
- Variables
- func AsNatsConnection(n *NatsJetstream) *nats.Conn
- func AsNatsJetStreamContext(n *NatsJetstream) nats.JetStreamContext
- func AsNatsMsg(m Message) (*nats.Msg, error)
- func MustNatsMsg(m Message) *nats.Msg
- type EventType
- type Message
- type MockMessage
- func (_m *MockMessage) Ack() error
- func (_m *MockMessage) Data() []byte
- func (_m *MockMessage) EXPECT() *MockMessage_Expecter
- func (_m *MockMessage) ExtractOtelTraceContext(ctx context.Context) context.Context
- func (_m *MockMessage) InProgress() error
- func (_m *MockMessage) Nak() error
- func (_m *MockMessage) Subject() string
- func (_m *MockMessage) Term() error
- type MockMessage_Ack_Call
- type MockMessage_Data_Call
- type MockMessage_Expecter
- func (_e *MockMessage_Expecter) Ack() *MockMessage_Ack_Call
- func (_e *MockMessage_Expecter) Data() *MockMessage_Data_Call
- func (_e *MockMessage_Expecter) ExtractOtelTraceContext(ctx interface{}) *MockMessage_ExtractOtelTraceContext_Call
- func (_e *MockMessage_Expecter) InProgress() *MockMessage_InProgress_Call
- func (_e *MockMessage_Expecter) Nak() *MockMessage_Nak_Call
- func (_e *MockMessage_Expecter) Subject() *MockMessage_Subject_Call
- func (_e *MockMessage_Expecter) Term() *MockMessage_Term_Call
- type MockMessage_ExtractOtelTraceContext_Call
- func (_c *MockMessage_ExtractOtelTraceContext_Call) Return(_a0 context.Context) *MockMessage_ExtractOtelTraceContext_Call
- func (_c *MockMessage_ExtractOtelTraceContext_Call) Run(run func(ctx context.Context)) *MockMessage_ExtractOtelTraceContext_Call
- func (_c *MockMessage_ExtractOtelTraceContext_Call) RunAndReturn(run func(context.Context) context.Context) *MockMessage_ExtractOtelTraceContext_Call
- type MockMessage_InProgress_Call
- type MockMessage_Nak_Call
- type MockMessage_Subject_Call
- type MockMessage_Term_Call
- type MockStream
- func (_m *MockStream) Close() error
- func (_m *MockStream) EXPECT() *MockStream_Expecter
- func (_m *MockStream) Open() error
- func (_m *MockStream) Publish(ctx context.Context, subject string, msg []byte) error
- func (_m *MockStream) PublishOverwrite(ctx context.Context, subject string, msg []byte) error
- func (_m *MockStream) PullMsg(ctx context.Context, batch int) ([]Message, error)
- func (_m *MockStream) Subscribe(ctx context.Context) (MsgCh, error)
- type MockStream_Close_Call
- type MockStream_Expecter
- func (_e *MockStream_Expecter) Close() *MockStream_Close_Call
- func (_e *MockStream_Expecter) Open() *MockStream_Open_Call
- func (_e *MockStream_Expecter) Publish(ctx interface{}, subject interface{}, msg interface{}) *MockStream_Publish_Call
- func (_e *MockStream_Expecter) PublishOverwrite(ctx interface{}, subject interface{}, msg interface{}) *MockStream_PublishOverwrite_Call
- func (_e *MockStream_Expecter) PullMsg(ctx interface{}, batch interface{}) *MockStream_PullMsg_Call
- func (_e *MockStream_Expecter) Subscribe(ctx interface{}) *MockStream_Subscribe_Call
- type MockStream_Open_Call
- type MockStream_PublishOverwrite_Call
- func (_c *MockStream_PublishOverwrite_Call) Return(_a0 error) *MockStream_PublishOverwrite_Call
- func (_c *MockStream_PublishOverwrite_Call) Run(run func(ctx context.Context, subject string, msg []byte)) *MockStream_PublishOverwrite_Call
- func (_c *MockStream_PublishOverwrite_Call) RunAndReturn(run func(context.Context, string, []byte) error) *MockStream_PublishOverwrite_Call
- type MockStream_Publish_Call
- func (_c *MockStream_Publish_Call) Return(_a0 error) *MockStream_Publish_Call
- func (_c *MockStream_Publish_Call) Run(run func(ctx context.Context, subject string, msg []byte)) *MockStream_Publish_Call
- func (_c *MockStream_Publish_Call) RunAndReturn(run func(context.Context, string, []byte) error) *MockStream_Publish_Call
- type MockStream_PullMsg_Call
- func (_c *MockStream_PullMsg_Call) Return(_a0 []Message, _a1 error) *MockStream_PullMsg_Call
- func (_c *MockStream_PullMsg_Call) Run(run func(ctx context.Context, batch int)) *MockStream_PullMsg_Call
- func (_c *MockStream_PullMsg_Call) RunAndReturn(run func(context.Context, int) ([]Message, error)) *MockStream_PullMsg_Call
- type MockStream_Subscribe_Call
- func (_c *MockStream_Subscribe_Call) Return(_a0 MsgCh, _a1 error) *MockStream_Subscribe_Call
- func (_c *MockStream_Subscribe_Call) Run(run func(ctx context.Context)) *MockStream_Subscribe_Call
- func (_c *MockStream_Subscribe_Call) RunAndReturn(run func(context.Context) (MsgCh, error)) *MockStream_Subscribe_Call
- type MsgCh
- type NatsConsumerOptions
- type NatsJetstream
- func (n *NatsJetstream) Close() error
- func (n *NatsJetstream) Open() error
- func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data []byte) error
- func (n *NatsJetstream) PublishOverwrite(ctx context.Context, subjectSuffix string, data []byte) error
- func (n *NatsJetstream) PullMsg(_ context.Context, batch int) ([]Message, error)
- func (n *NatsJetstream) Subscribe(ctx context.Context) (MsgCh, error)
- type NatsOptions
- type NatsStreamOptions
- type ResourceType
- type Stream
- type StreamParameters
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNatsConfig is returned when there is an error in the NATS Jetstream configuration. ErrNatsConfig = errors.New("error in NATs Jetstream configuration") // ErrNatsConn is returned when an error occurs in connecting to NATS. ErrNatsConn = errors.New("error opening nats connection") // ErrNatsJetstream is returned when an error occurs in setting up the NATS Jetstream context. ErrNatsJetstream = errors.New("error in NATS Jetstream") // ErrNatsJetstreamAddStream is returned when an attempt to add a NATS Jetstream fails. ErrNatsJetstreamAddStream = errors.New("error adding stream to NATS Jetstream") // ErrNatsJetstreamAddConsumer is returned when there's an error adding a consumer to the NATS Jetstream. ErrNatsJetstreamAddConsumer = errors.New("error adding consumer on NATS Jetstream") // ErrNatsJetstreamUpdateConsumer is returned when there's an error updating a consumer configuration on the NATS Jetstream. ErrNatsJetstreamUpdateConsumer = errors.New("error updating consumer configuration on NATS Jetstream") // ErrNatsMsgPull is returned when there's an error pulling a message from a NATS Jetstream. ErrNatsMsgPull = errors.New("error fetching message from NATS Jetstream") // ErrSubscription is returned when an error in the consumer subscription occurs. ErrSubscription = errors.New("error subscribing to stream") )
Functions ¶
func AsNatsConnection ¶
func AsNatsConnection(n *NatsJetstream) *nats.Conn
Add some conversions for functions/APIs that expect NATS primitive types. This allows consumers of NatsJetstream to convert easily to the types they need, without exporting the members or coercing any direct clients/holders of NatsJetstream to do this conversion. AsNatsConnection exposes the otherwise private NATS connection pointer.
func AsNatsJetStreamContext ¶
func AsNatsJetStreamContext(n *NatsJetstream) nats.JetStreamContext
AsNatsJetStreamContext exposes the otherwise private NATS JetStreamContext.
func MustNatsMsg ¶
func MustNatsMsg(m Message) *nats.Msg
MustNatsMsg will panic if the type assertion fails
Types ¶
type EventType ¶
type EventType string
EventType is a type identifying the Event kind that has occurred on an object.
type Message ¶
type Message interface { // Ack the message as processed on the stream. Ack() error // Nak the message as not processed on the stream. Nak() error // Term signals to the broker that the message processing has failed and the message // must not be redelivered. Term() error // InProgress resets the redelivery timer for the message on the stream // to indicate the message is being worked on. InProgress() error // Subject returns the message subject. Subject() string // Data returns the data contained in the message. Data() []byte // ExtractOtelTraceContext returns a context populated with the parent trace if any. ExtractOtelTraceContext(ctx context.Context) context.Context }
Message interface defines the methods available on the messages received on the stream.
These methods are to be implemented by the stream broker for its messages.
type MockMessage ¶
MockMessage is an autogenerated mock type for the Message type
func NewMockMessage ¶
func NewMockMessage(t interface { mock.TestingT Cleanup(func()) }) *MockMessage
NewMockMessage creates a new instance of MockMessage. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
func (*MockMessage) Ack ¶
func (_m *MockMessage) Ack() error
Ack provides a mock function with given fields:
func (*MockMessage) Data ¶
func (_m *MockMessage) Data() []byte
Data provides a mock function with given fields:
func (*MockMessage) EXPECT ¶
func (_m *MockMessage) EXPECT() *MockMessage_Expecter
func (*MockMessage) ExtractOtelTraceContext ¶
func (_m *MockMessage) ExtractOtelTraceContext(ctx context.Context) context.Context
ExtractOtelTraceContext provides a mock function with given fields: ctx
func (*MockMessage) InProgress ¶
func (_m *MockMessage) InProgress() error
InProgress provides a mock function with given fields:
func (*MockMessage) Nak ¶
func (_m *MockMessage) Nak() error
Nak provides a mock function with given fields:
func (*MockMessage) Subject ¶
func (_m *MockMessage) Subject() string
Subject provides a mock function with given fields:
func (*MockMessage) Term ¶
func (_m *MockMessage) Term() error
Term provides a mock function with given fields:
type MockMessage_Ack_Call ¶
MockMessage_Ack_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ack'
func (*MockMessage_Ack_Call) Return ¶
func (_c *MockMessage_Ack_Call) Return(_a0 error) *MockMessage_Ack_Call
func (*MockMessage_Ack_Call) Run ¶
func (_c *MockMessage_Ack_Call) Run(run func()) *MockMessage_Ack_Call
func (*MockMessage_Ack_Call) RunAndReturn ¶
func (_c *MockMessage_Ack_Call) RunAndReturn(run func() error) *MockMessage_Ack_Call
type MockMessage_Data_Call ¶
MockMessage_Data_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Data'
func (*MockMessage_Data_Call) Return ¶
func (_c *MockMessage_Data_Call) Return(_a0 []byte) *MockMessage_Data_Call
func (*MockMessage_Data_Call) Run ¶
func (_c *MockMessage_Data_Call) Run(run func()) *MockMessage_Data_Call
func (*MockMessage_Data_Call) RunAndReturn ¶
func (_c *MockMessage_Data_Call) RunAndReturn(run func() []byte) *MockMessage_Data_Call
type MockMessage_Expecter ¶
type MockMessage_Expecter struct {
// contains filtered or unexported fields
}
func (*MockMessage_Expecter) Ack ¶
func (_e *MockMessage_Expecter) Ack() *MockMessage_Ack_Call
Ack is a helper method to define mock.On call
func (*MockMessage_Expecter) Data ¶
func (_e *MockMessage_Expecter) Data() *MockMessage_Data_Call
Data is a helper method to define mock.On call
func (*MockMessage_Expecter) ExtractOtelTraceContext ¶
func (_e *MockMessage_Expecter) ExtractOtelTraceContext(ctx interface{}) *MockMessage_ExtractOtelTraceContext_Call
ExtractOtelTraceContext is a helper method to define mock.On call
- ctx context.Context
func (*MockMessage_Expecter) InProgress ¶
func (_e *MockMessage_Expecter) InProgress() *MockMessage_InProgress_Call
InProgress is a helper method to define mock.On call
func (*MockMessage_Expecter) Nak ¶
func (_e *MockMessage_Expecter) Nak() *MockMessage_Nak_Call
Nak is a helper method to define mock.On call
func (*MockMessage_Expecter) Subject ¶
func (_e *MockMessage_Expecter) Subject() *MockMessage_Subject_Call
Subject is a helper method to define mock.On call
func (*MockMessage_Expecter) Term ¶
func (_e *MockMessage_Expecter) Term() *MockMessage_Term_Call
Term is a helper method to define mock.On call
type MockMessage_ExtractOtelTraceContext_Call ¶
MockMessage_ExtractOtelTraceContext_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ExtractOtelTraceContext'
func (*MockMessage_ExtractOtelTraceContext_Call) Return ¶
func (_c *MockMessage_ExtractOtelTraceContext_Call) Return(_a0 context.Context) *MockMessage_ExtractOtelTraceContext_Call
func (*MockMessage_ExtractOtelTraceContext_Call) Run ¶
func (_c *MockMessage_ExtractOtelTraceContext_Call) Run(run func(ctx context.Context)) *MockMessage_ExtractOtelTraceContext_Call
func (*MockMessage_ExtractOtelTraceContext_Call) RunAndReturn ¶
func (_c *MockMessage_ExtractOtelTraceContext_Call) RunAndReturn(run func(context.Context) context.Context) *MockMessage_ExtractOtelTraceContext_Call
type MockMessage_InProgress_Call ¶
MockMessage_InProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InProgress'
func (*MockMessage_InProgress_Call) Return ¶
func (_c *MockMessage_InProgress_Call) Return(_a0 error) *MockMessage_InProgress_Call
func (*MockMessage_InProgress_Call) Run ¶
func (_c *MockMessage_InProgress_Call) Run(run func()) *MockMessage_InProgress_Call
func (*MockMessage_InProgress_Call) RunAndReturn ¶
func (_c *MockMessage_InProgress_Call) RunAndReturn(run func() error) *MockMessage_InProgress_Call
type MockMessage_Nak_Call ¶
MockMessage_Nak_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Nak'
func (*MockMessage_Nak_Call) Return ¶
func (_c *MockMessage_Nak_Call) Return(_a0 error) *MockMessage_Nak_Call
func (*MockMessage_Nak_Call) Run ¶
func (_c *MockMessage_Nak_Call) Run(run func()) *MockMessage_Nak_Call
func (*MockMessage_Nak_Call) RunAndReturn ¶
func (_c *MockMessage_Nak_Call) RunAndReturn(run func() error) *MockMessage_Nak_Call
type MockMessage_Subject_Call ¶
MockMessage_Subject_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Subject'
func (*MockMessage_Subject_Call) Return ¶
func (_c *MockMessage_Subject_Call) Return(_a0 string) *MockMessage_Subject_Call
func (*MockMessage_Subject_Call) Run ¶
func (_c *MockMessage_Subject_Call) Run(run func()) *MockMessage_Subject_Call
func (*MockMessage_Subject_Call) RunAndReturn ¶
func (_c *MockMessage_Subject_Call) RunAndReturn(run func() string) *MockMessage_Subject_Call
type MockMessage_Term_Call ¶
MockMessage_Term_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Term'
func (*MockMessage_Term_Call) Return ¶
func (_c *MockMessage_Term_Call) Return(_a0 error) *MockMessage_Term_Call
func (*MockMessage_Term_Call) Run ¶
func (_c *MockMessage_Term_Call) Run(run func()) *MockMessage_Term_Call
func (*MockMessage_Term_Call) RunAndReturn ¶
func (_c *MockMessage_Term_Call) RunAndReturn(run func() error) *MockMessage_Term_Call
type MockStream ¶
MockStream is an autogenerated mock type for the Stream type
func NewMockStream ¶
func NewMockStream(t interface { mock.TestingT Cleanup(func()) }) *MockStream
NewMockStream creates a new instance of MockStream. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
func (*MockStream) Close ¶
func (_m *MockStream) Close() error
Close provides a mock function with given fields:
func (*MockStream) EXPECT ¶
func (_m *MockStream) EXPECT() *MockStream_Expecter
func (*MockStream) Open ¶
func (_m *MockStream) Open() error
Open provides a mock function with given fields:
func (*MockStream) PublishOverwrite ¶
func (_m *MockStream) PublishOverwrite(ctx context.Context, subject string, msg []byte) error
PublishOverwrite provides a mock function with given fields: ctx, subject, msg
type MockStream_Close_Call ¶
MockStream_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
func (*MockStream_Close_Call) Return ¶
func (_c *MockStream_Close_Call) Return(_a0 error) *MockStream_Close_Call
func (*MockStream_Close_Call) Run ¶
func (_c *MockStream_Close_Call) Run(run func()) *MockStream_Close_Call
func (*MockStream_Close_Call) RunAndReturn ¶
func (_c *MockStream_Close_Call) RunAndReturn(run func() error) *MockStream_Close_Call
type MockStream_Expecter ¶
type MockStream_Expecter struct {
// contains filtered or unexported fields
}
func (*MockStream_Expecter) Close ¶
func (_e *MockStream_Expecter) Close() *MockStream_Close_Call
Close is a helper method to define mock.On call
func (*MockStream_Expecter) Open ¶
func (_e *MockStream_Expecter) Open() *MockStream_Open_Call
Open is a helper method to define mock.On call
func (*MockStream_Expecter) Publish ¶
func (_e *MockStream_Expecter) Publish(ctx interface{}, subject interface{}, msg interface{}) *MockStream_Publish_Call
Publish is a helper method to define mock.On call
- ctx context.Context
- subject string
- msg []byte
func (*MockStream_Expecter) PublishOverwrite ¶
func (_e *MockStream_Expecter) PublishOverwrite(ctx interface{}, subject interface{}, msg interface{}) *MockStream_PublishOverwrite_Call
PublishOverwrite is a helper method to define mock.On call
- ctx context.Context
- subject string
- msg []byte
func (*MockStream_Expecter) PullMsg ¶
func (_e *MockStream_Expecter) PullMsg(ctx interface{}, batch interface{}) *MockStream_PullMsg_Call
PullMsg is a helper method to define mock.On call
- ctx context.Context
- batch int
func (*MockStream_Expecter) Subscribe ¶
func (_e *MockStream_Expecter) Subscribe(ctx interface{}) *MockStream_Subscribe_Call
Subscribe is a helper method to define mock.On call
- ctx context.Context
type MockStream_Open_Call ¶
MockStream_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open'
func (*MockStream_Open_Call) Return ¶
func (_c *MockStream_Open_Call) Return(_a0 error) *MockStream_Open_Call
func (*MockStream_Open_Call) Run ¶
func (_c *MockStream_Open_Call) Run(run func()) *MockStream_Open_Call
func (*MockStream_Open_Call) RunAndReturn ¶
func (_c *MockStream_Open_Call) RunAndReturn(run func() error) *MockStream_Open_Call
type MockStream_PublishOverwrite_Call ¶
MockStream_PublishOverwrite_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PublishOverwrite'
func (*MockStream_PublishOverwrite_Call) Return ¶
func (_c *MockStream_PublishOverwrite_Call) Return(_a0 error) *MockStream_PublishOverwrite_Call
func (*MockStream_PublishOverwrite_Call) Run ¶
func (_c *MockStream_PublishOverwrite_Call) Run(run func(ctx context.Context, subject string, msg []byte)) *MockStream_PublishOverwrite_Call
func (*MockStream_PublishOverwrite_Call) RunAndReturn ¶
func (_c *MockStream_PublishOverwrite_Call) RunAndReturn(run func(context.Context, string, []byte) error) *MockStream_PublishOverwrite_Call
type MockStream_Publish_Call ¶
MockStream_Publish_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Publish'
func (*MockStream_Publish_Call) Return ¶
func (_c *MockStream_Publish_Call) Return(_a0 error) *MockStream_Publish_Call
func (*MockStream_Publish_Call) Run ¶
func (_c *MockStream_Publish_Call) Run(run func(ctx context.Context, subject string, msg []byte)) *MockStream_Publish_Call
func (*MockStream_Publish_Call) RunAndReturn ¶
func (_c *MockStream_Publish_Call) RunAndReturn(run func(context.Context, string, []byte) error) *MockStream_Publish_Call
type MockStream_PullMsg_Call ¶
MockStream_PullMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PullMsg'
func (*MockStream_PullMsg_Call) Return ¶
func (_c *MockStream_PullMsg_Call) Return(_a0 []Message, _a1 error) *MockStream_PullMsg_Call
func (*MockStream_PullMsg_Call) Run ¶
func (_c *MockStream_PullMsg_Call) Run(run func(ctx context.Context, batch int)) *MockStream_PullMsg_Call
func (*MockStream_PullMsg_Call) RunAndReturn ¶
func (_c *MockStream_PullMsg_Call) RunAndReturn(run func(context.Context, int) ([]Message, error)) *MockStream_PullMsg_Call
type MockStream_Subscribe_Call ¶
MockStream_Subscribe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Subscribe'
func (*MockStream_Subscribe_Call) Return ¶
func (_c *MockStream_Subscribe_Call) Return(_a0 MsgCh, _a1 error) *MockStream_Subscribe_Call
func (*MockStream_Subscribe_Call) Run ¶
func (_c *MockStream_Subscribe_Call) Run(run func(ctx context.Context)) *MockStream_Subscribe_Call
func (*MockStream_Subscribe_Call) RunAndReturn ¶
func (_c *MockStream_Subscribe_Call) RunAndReturn(run func(context.Context) (MsgCh, error)) *MockStream_Subscribe_Call
type NatsConsumerOptions ¶
type NatsConsumerOptions struct { // Pull indicates this is a pull based subscriber Pull bool `mapstructure:"pull"` // Sets the durable consumer name Name string `mapstructure:"name"` // Sets the queue group for this consumer QueueGroup string `mapstructure:"queue_group"` AckWait time.Duration `mapstructure:"ack_wait"` MaxAckPending int `mapstructure:"max_ack_pending"` // Setting the FilterSubject turns this consumer into a push based consumer, // With no filter subject, the consumer is a pull based consumer. // // Although if the stream is a WorkQueue stream, then this must be set // and should be unique between consumers on the stream. FilterSubject string `mapstructure:"filter_subject"` // Subscribe to these subjects through this consumer. SubscribeSubjects []string `mapstructure:"subscribe_subjects"` }
NatsConsumerOptions is the parameters for the NATS consumer configuration.
Note: Nats consumers are views into the stream, multiple subscribers may bind on a consumer.
type NatsJetstream ¶
type NatsJetstream struct {
// contains filtered or unexported fields
}
NatsJetstream wraps the NATS JetStream connector to implement the Stream interface.
func NewJetstreamFromConn ¶
func NewJetstreamFromConn(c *nats.Conn) *NatsJetstream
NewJetstreamFromConn takes an already established NATS connection pointer and returns a NatsJetstream pointer
func NewNatsBroker ¶
func NewNatsBroker(params StreamParameters) (*NatsJetstream, error)
NewNatsBroker validates the given stream broker parameters and returns a stream broker implementation.
func (*NatsJetstream) Close ¶
func (n *NatsJetstream) Close() error
Close drains any subscriptions and closes the NATS Jetstream connection.
func (*NatsJetstream) Open ¶
func (n *NatsJetstream) Open() error
Open connects to the NATS Jetstream.
func (*NatsJetstream) Publish ¶
func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data []byte) error
Publish publishes an event onto the NATS Jetstream. The caller is responsible for message addressing and data serialization.
func (*NatsJetstream) PublishOverwrite ¶
func (n *NatsJetstream) PublishOverwrite(ctx context.Context, subjectSuffix string, data []byte) error
PublishOverwrite publishes an event and will overwrite any existing message with that subject in the queue
type NatsOptions ¶
type NatsOptions struct { // URL is the NATS server URL URL string `mapstructure:"url"` // AppName is the name of the application connecting to the // NATS stream, this parameter is used to open the NATS connection // and bind as a durable consumer. AppName string `mapstructure:"app_name"` // NATS stream user, when no creds file is provided. StreamUser string `mapstructure:"stream_user"` // NATS stream pass, when no creds file is provided. StreamPass string `mapstructure:"stream_pass"` // NATS creds file CredsFile string `mapstructure:"creds_file"` // The subject prefix when publishing a message. PublisherSubjectPrefix string `mapstructure:"publisher_subject_prefix"` // URN Namespace to include in the published messages. StreamURNNamespace string `mapstructure:"stream_urn_ns"` // SubscribeSubjects when defined will result in the event broker subscribing to given streams. SubscribeSubjects []string `mapstructure:"subscribe_subjects"` // NATS connection timeout ConnectTimeout time.Duration `mapstructure:"connect_timeout"` // Setting Consumer parameters will cause a NATS consumer to be added. Consumer *NatsConsumerOptions `mapstructure:"consumer"` // Setting Stream parameters will cause a NATS stream to be added. Stream *NatsStreamOptions `mapstructure:"stream"` // KVReplicationFactor sets the number of copies for a bucket in a NATS clustered environment KVReplicationFactor int `mapstructure:"kv_replication"` }
NatsOptions holds the configuration parameters to setup NATS Jetstream.
type NatsStreamOptions ¶
type NatsStreamOptions struct { // Name for the stream Name string `mapstructure:"name"` // Subjects allowed to publish on the stream Subjects []string `mapstructure:"subjects"` // Acknowledgements required for each msg // // https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#acknowledgement-models Acknowledgements bool `mapstructure:"acknowledgements"` // DuplicateWindow, messages containing the same message ID will be // deduplicated in this time window. // // https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#message-deduplication DuplicateWindow time.Duration `mapstructure:"duplicate_window"` // Retention is the message eviction criteria // // https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#stream-limits-retention-and-policy Retention string `mapstructure:"retention"` }
NatsStreamOptions are parameters to setup a NATS stream.
type ResourceType ¶
type ResourceType string
ResourceType is the kind of the object included in the message.
type Stream ¶
type Stream interface { // Open sets up the stream connection. Open() error // Publish publishes the message to the message broker. // // To overwrite any previous messages with the same subject, use PublishOverwrite instead. Publish(ctx context.Context, subject string, msg []byte) error // PublishOverwrite publishes the message to the message broker overwriting any existing message with that subject PublishOverwrite(ctx context.Context, subject string, msg []byte) error // Subscribe subscribes to one or more subjects on the stream returning a message channel for subscribers to read from. Subscribe(ctx context.Context) (MsgCh, error) // PullMsg pulls up to batch count of messages from the stream through the pull based subscription. PullMsg(ctx context.Context, batch int) ([]Message, error) // Close closes the connection to the stream, along with unsubscribing any subscriptions. Close() error }
Stream provides methods to interact with the event stream.
func NewStream ¶
func NewStream(parameters StreamParameters) (Stream, error)
NewStream returns a Stream implementation.
type StreamParameters ¶
type StreamParameters interface{}
StreamParameters is the configuration for the Stream broker, the interface is type asserted by the stream broker implementation.