events

package
v2.0.0 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 12, 2024 License: Apache-2.0 Imports: 11 Imported by: 4

README

Events

Package events provides an interface and methods to interact with an events stream broker.

The package provides methods to serialize, deserialize data sent on the stream as a pubsubx.Message along with methods to parse the message URN through urnx.

Connect to a NATS Jetstream to publish messages.

The example below sets up a NATS stream broker with the given parameters. The stream, consumer, and subscription(s) are initialized when defined, based on the configuration.

	options := events.NatsOptions{
		AppName:                "foo",
		URL:                    "nats://nats:4222",
		StreamUser:             viper.GetString("nats.stream.user"),
		StreamPass:             viper.GetString("nats.stream.pass"),
		CredsFile:              viper.GetString("nats.creds.file"),
		...

		// Defining a stream will result in the stream being added if not present.
		Stream: &events.NatsStreamOptions{
			// Name of the stream to be created.
			Name:     viper.GetString("nats.stream.name"),

			// Subjects associated with the stream.
			Subjects: viper.GetStringSlice("nats.stream.subjects"),
		},

		// Defining a consumer will result in the consumer being added if not present.
		Consumer: &events.NatsConsumerOptions{
			// Pull indicates this is a pull-based stream; subscriptions to it will be pull-based.
			Pull: viper.GetBool("nats.stream.consumer.pull"),

			// Sets the durable consumer name. With a durable name set, the consumer
			// is not ephemeral and is not removed once there are no subscribers.
			Name: viper.GetString("nats.stream.consumer.name"),

			....
		},
	}

	// initialize broker - validates the configuration and returns a Stream
	stream, err := events.NewStream(options)
	if err != nil {
		panic(err)
	}

	// Open connection - sets up required streams, consumers.
	if err := stream.Open(); err != nil {
		panic(err)
	}


	// Publish to the stream; the caller handles message addressing and data serialization.
	// subject and payload ([]byte) are placeholders for the caller's own values.
	if err := stream.Publish(ctx, subject, payload); err != nil {
		panic(err)
	}


	// subscribe to the configured subjects; this returns a single channel.
	eventsCh, err := stream.Subscribe(ctx)
	if err != nil {
		panic(err)
	}

	for msg := range eventsCh {
		// retrieve the raw message payload; deserialization is left to the caller
		data := msg.Data()

		// retrieve the message subject
		subject := msg.Subject()

		// process data and subject here
		_, _ = data, subject

		// ack the message
		if err := msg.Ack(); err != nil {
			panic(err)
		}
	}

Implementations

TODO(joel) : Link to implementations of this library.

Documentation

Overview

Package events provides methods to initialize and interface with an events broker.

Package events provides types and methods to interact with a messaging stream broker.

Index

Constants

This section is empty.

Variables

var (
	// ErrNatsConfig is returned when there is an error in the NATS Jetstream configuration.
	ErrNatsConfig = errors.New("error in NATs Jetstream configuration")

	// ErrNatsConn is returned when an error occurs in connecting to NATS.
	ErrNatsConn = errors.New("error opening nats connection")

	// ErrNatsJetstream is returned when an error occurs in setting up the NATS Jetstream context.
	ErrNatsJetstream = errors.New("error in NATS Jetstream")

	// ErrNatsJetstreamAddStream is returned when an attempt to add a NATS Jetstream fails.
	ErrNatsJetstreamAddStream = errors.New("error adding stream to NATS Jetstream")

	// ErrNatsJetstreamAddConsumer is returned when there is an error adding a consumer to the NATS Jetstream.
	ErrNatsJetstreamAddConsumer = errors.New("error adding consumer on NATS Jetstream")

	// ErrNatsJetstreamUpdateConsumer is returned when there is an error updating a consumer configuration on the NATS Jetstream.
	ErrNatsJetstreamUpdateConsumer = errors.New("error updating consumer configuration on NATS Jetstream")

	// ErrNatsMsgPull is returned when there is an error pulling a message from a NATS Jetstream.
	ErrNatsMsgPull = errors.New("error fetching message from NATS Jetstream")

	// ErrSubscription is returned when an error in the consumer subscription occurs.
	ErrSubscription = errors.New("error subscribing to stream")
)
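Callers can branch on these sentinel values with errors.Is. The sketch below is self-contained: it redeclares one sentinel locally rather than importing the package, and `open` is a hypothetical stand-in for a failing call. It assumes the sentinels are wrapped with `%w`, which this documentation does not state.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNatsConn mirrors the sentinel declared above; it is redeclared here
// only so the sketch compiles without importing the events package.
var ErrNatsConn = errors.New("error opening nats connection")

// open is a hypothetical stand-in for a call that fails to connect.
// Assumption: the underlying cause is attached to the sentinel with %w.
func open(url string) error {
	return fmt.Errorf("%w: dial %s: connection refused", ErrNatsConn, url)
}

// isConnErr reports whether err wraps ErrNatsConn.
func isConnErr(err error) bool {
	return errors.Is(err, ErrNatsConn)
}

func main() {
	err := open("nats://nats:4222")
	if isConnErr(err) {
		fmt.Println("connection problem:", err)
	}
}
```

Matching with errors.Is rather than comparing error strings keeps the check valid even when extra context is added to the message.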

Functions

func AsNatsConnection

func AsNatsConnection(n *NatsJetstream) *nats.Conn

AsNatsConnection exposes the otherwise private NATS connection pointer. This and the related conversion functions serve functions and APIs that expect NATS primitive types: they let consumers of NatsJetstream convert to the types they need without exporting the struct members or forcing direct clients/holders of NatsJetstream to perform the conversion themselves.

func AsNatsJetStreamContext

func AsNatsJetStreamContext(n *NatsJetstream) nats.JetStreamContext

AsNatsJetStreamContext exposes the otherwise private NATS JetStreamContext.

func AsNatsMsg

func AsNatsMsg(m Message) (*nats.Msg, error)

AsNatsMsg exposes the underlying nats.Msg to a sophisticated consumer.

func MustNatsMsg

func MustNatsMsg(m Message) *nats.Msg

MustNatsMsg will panic if the type assertion fails
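The As/Must pairing above is a common Go pattern: one function returns an error when the type assertion fails, the other panics. The following sketch illustrates the pattern with local stand-in types; `rawMsg`, `wrapped`, `other`, `asRaw`, and `mustRaw` are hypothetical names, not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a reduced stand-in for the events.Message interface.
type Message interface {
	Subject() string
}

// rawMsg plays the role of *nats.Msg in this sketch.
type rawMsg struct{ subject string }

// wrapped is a hypothetical Message implementation backed by a rawMsg.
type wrapped struct{ msg *rawMsg }

func (w *wrapped) Subject() string { return w.msg.subject }

// other is a Message that is not backed by a rawMsg.
type other struct{}

func (other) Subject() string { return "other" }

var errNotRaw = errors.New("message is not backed by a rawMsg")

// asRaw returns the underlying rawMsg, or an error if the assertion fails.
func asRaw(m Message) (*rawMsg, error) {
	w, ok := m.(*wrapped)
	if !ok {
		return nil, errNotRaw
	}
	return w.msg, nil
}

// mustRaw panics when the assertion fails.
func mustRaw(m Message) *rawMsg {
	r, err := asRaw(m)
	if err != nil {
		panic(err)
	}
	return r
}

func main() {
	m := &wrapped{msg: &rawMsg{subject: "servers.create"}}
	fmt.Println(mustRaw(m).subject)

	if _, err := asRaw(other{}); err != nil {
		fmt.Println("assertion failed:", err)
	}
}
```

The error-returning form suits code that may receive mock or non-NATS messages; the Must form suits call sites where any other type is a programming error.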

Types

type EventType

type EventType string

EventType is a type identifying the Event kind that has occurred on an object.

const (
	// Create action kind identifies objects that were created.
	Create EventType = "create"

	// Update action kind identifies objects that were updated.
	Update EventType = "update"

	// Delete action kind identifies objects that were removed.
	Delete EventType = "delete"
)
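Because EventType is a plain string type, values read from configuration or from a message need validating before use. A minimal sketch, with the type and constants redeclared locally so it compiles on its own; `parseEventType` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// EventType and its constants mirror the declarations above.
type EventType string

const (
	Create EventType = "create"
	Update EventType = "update"
	Delete EventType = "delete"
)

// parseEventType converts s to an EventType, rejecting unknown kinds.
func parseEventType(s string) (EventType, error) {
	switch et := EventType(s); et {
	case Create, Update, Delete:
		return et, nil
	default:
		return "", fmt.Errorf("unknown event type %q", s)
	}
}

func main() {
	for _, s := range []string{"create", "purge"} {
		et, err := parseEventType(s)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Println("accepted:", et)
	}
}
```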

type Message

type Message interface {
	// Ack the message as processed on the stream.
	Ack() error

	// Nak the message as not processed on the stream.
	Nak() error

	// Term signals to the broker that the message processing has failed and the message
	// must not be redelivered.
	Term() error

	// InProgress resets the redelivery timer for the message on the stream
	// to indicate the message is being worked on.
	InProgress() error

	// Subject returns the message subject.
	Subject() string

	// Data returns the data contained in the message.
	Data() []byte

	// ExtractOtelTraceContext returns a context populated with the parent trace if any.
	ExtractOtelTraceContext(ctx context.Context) context.Context
}

Message interface defines the methods available on the messages received on the stream.

These methods are to be implemented by the stream broker for its messages.
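A handler typically maps its outcome onto one of the acknowledgement methods: Ack on success, Nak to have the message redelivered, Term when redelivery would not help. The sketch below uses a reduced local copy of the interface and an in-memory fake; `fakeMessage`, `handle`, `errRetryable`, and `errMalformed` are hypothetical, and the retry policy shown is an assumption about one reasonable choice.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is the subset of events.Message used by this sketch.
type Message interface {
	Ack() error
	Nak() error
	Term() error
	Data() []byte
}

// fakeMessage records which acknowledgement was sent.
type fakeMessage struct {
	data  []byte
	state string
}

func (m *fakeMessage) Ack() error   { m.state = "ack"; return nil }
func (m *fakeMessage) Nak() error   { m.state = "nak"; return nil }
func (m *fakeMessage) Term() error  { m.state = "term"; return nil }
func (m *fakeMessage) Data() []byte { return m.data }

var (
	// errRetryable marks failures worth redelivering (hypothetical).
	errRetryable = errors.New("temporary failure")
	// errMalformed marks payloads that can never be processed (hypothetical).
	errMalformed = errors.New("malformed payload")
)

// handle runs process and maps its outcome to Ack, Nak, or Term.
func handle(m Message, process func([]byte) error) error {
	err := process(m.Data())
	switch {
	case err == nil:
		return m.Ack()
	case errors.Is(err, errRetryable):
		return m.Nak() // not processed; the broker may redeliver
	default:
		return m.Term() // failed; must not be redelivered
	}
}

func main() {
	m := &fakeMessage{data: []byte("payload")}
	if err := handle(m, func([]byte) error { return errRetryable }); err != nil {
		panic(err)
	}
	fmt.Println(m.state)
}
```

For long-running work, InProgress can be called periodically before the final acknowledgement to reset the redelivery timer.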

type MockMessage

type MockMessage struct {
	mock.Mock
}

MockMessage is an autogenerated mock type for the Message type

func NewMockMessage

func NewMockMessage(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockMessage

NewMockMessage creates a new instance of MockMessage. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockMessage) Ack

func (_m *MockMessage) Ack() error

Ack provides a mock function with given fields:

func (*MockMessage) Data

func (_m *MockMessage) Data() []byte

Data provides a mock function with given fields:

func (*MockMessage) EXPECT

func (_m *MockMessage) EXPECT() *MockMessage_Expecter

func (*MockMessage) ExtractOtelTraceContext

func (_m *MockMessage) ExtractOtelTraceContext(ctx context.Context) context.Context

ExtractOtelTraceContext provides a mock function with given fields: ctx

func (*MockMessage) InProgress

func (_m *MockMessage) InProgress() error

InProgress provides a mock function with given fields:

func (*MockMessage) Nak

func (_m *MockMessage) Nak() error

Nak provides a mock function with given fields:

func (*MockMessage) Subject

func (_m *MockMessage) Subject() string

Subject provides a mock function with given fields:

func (*MockMessage) Term

func (_m *MockMessage) Term() error

Term provides a mock function with given fields:

type MockMessage_Ack_Call

type MockMessage_Ack_Call struct {
	*mock.Call
}

MockMessage_Ack_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ack'

func (*MockMessage_Ack_Call) Return

func (*MockMessage_Ack_Call) Run

func (_c *MockMessage_Ack_Call) Run(run func()) *MockMessage_Ack_Call

func (*MockMessage_Ack_Call) RunAndReturn

func (_c *MockMessage_Ack_Call) RunAndReturn(run func() error) *MockMessage_Ack_Call

type MockMessage_Data_Call

type MockMessage_Data_Call struct {
	*mock.Call
}

MockMessage_Data_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Data'

func (*MockMessage_Data_Call) Return

func (*MockMessage_Data_Call) Run

func (_c *MockMessage_Data_Call) Run(run func()) *MockMessage_Data_Call

func (*MockMessage_Data_Call) RunAndReturn

func (_c *MockMessage_Data_Call) RunAndReturn(run func() []byte) *MockMessage_Data_Call

type MockMessage_Expecter

type MockMessage_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockMessage_Expecter) Ack

Ack is a helper method to define mock.On call

func (*MockMessage_Expecter) Data

Data is a helper method to define mock.On call

func (*MockMessage_Expecter) ExtractOtelTraceContext

func (_e *MockMessage_Expecter) ExtractOtelTraceContext(ctx interface{}) *MockMessage_ExtractOtelTraceContext_Call

ExtractOtelTraceContext is a helper method to define mock.On call

  • ctx context.Context

func (*MockMessage_Expecter) InProgress

InProgress is a helper method to define mock.On call

func (*MockMessage_Expecter) Nak

Nak is a helper method to define mock.On call

func (*MockMessage_Expecter) Subject

Subject is a helper method to define mock.On call

func (*MockMessage_Expecter) Term

Term is a helper method to define mock.On call

type MockMessage_ExtractOtelTraceContext_Call

type MockMessage_ExtractOtelTraceContext_Call struct {
	*mock.Call
}

MockMessage_ExtractOtelTraceContext_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ExtractOtelTraceContext'

func (*MockMessage_ExtractOtelTraceContext_Call) Return

func (*MockMessage_ExtractOtelTraceContext_Call) Run

func (*MockMessage_ExtractOtelTraceContext_Call) RunAndReturn

type MockMessage_InProgress_Call

type MockMessage_InProgress_Call struct {
	*mock.Call
}

MockMessage_InProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InProgress'

func (*MockMessage_InProgress_Call) Return

func (*MockMessage_InProgress_Call) Run

func (*MockMessage_InProgress_Call) RunAndReturn

func (_c *MockMessage_InProgress_Call) RunAndReturn(run func() error) *MockMessage_InProgress_Call

type MockMessage_Nak_Call

type MockMessage_Nak_Call struct {
	*mock.Call
}

MockMessage_Nak_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Nak'

func (*MockMessage_Nak_Call) Return

func (*MockMessage_Nak_Call) Run

func (_c *MockMessage_Nak_Call) Run(run func()) *MockMessage_Nak_Call

func (*MockMessage_Nak_Call) RunAndReturn

func (_c *MockMessage_Nak_Call) RunAndReturn(run func() error) *MockMessage_Nak_Call

type MockMessage_Subject_Call

type MockMessage_Subject_Call struct {
	*mock.Call
}

MockMessage_Subject_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Subject'

func (*MockMessage_Subject_Call) Return

func (*MockMessage_Subject_Call) Run

func (*MockMessage_Subject_Call) RunAndReturn

func (_c *MockMessage_Subject_Call) RunAndReturn(run func() string) *MockMessage_Subject_Call

type MockMessage_Term_Call

type MockMessage_Term_Call struct {
	*mock.Call
}

MockMessage_Term_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Term'

func (*MockMessage_Term_Call) Return

func (*MockMessage_Term_Call) Run

func (_c *MockMessage_Term_Call) Run(run func()) *MockMessage_Term_Call

func (*MockMessage_Term_Call) RunAndReturn

func (_c *MockMessage_Term_Call) RunAndReturn(run func() error) *MockMessage_Term_Call

type MockStream

type MockStream struct {
	mock.Mock
}

MockStream is an autogenerated mock type for the Stream type

func NewMockStream

func NewMockStream(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockStream

NewMockStream creates a new instance of MockStream. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockStream) Close

func (_m *MockStream) Close() error

Close provides a mock function with given fields:

func (*MockStream) EXPECT

func (_m *MockStream) EXPECT() *MockStream_Expecter

func (*MockStream) Open

func (_m *MockStream) Open() error

Open provides a mock function with given fields:

func (*MockStream) Publish

func (_m *MockStream) Publish(ctx context.Context, subject string, msg []byte) error

Publish provides a mock function with given fields: ctx, subject, msg

func (*MockStream) PublishOverwrite

func (_m *MockStream) PublishOverwrite(ctx context.Context, subject string, msg []byte) error

PublishOverwrite provides a mock function with given fields: ctx, subject, msg

func (*MockStream) PullMsg

func (_m *MockStream) PullMsg(ctx context.Context, batch int) ([]Message, error)

PullMsg provides a mock function with given fields: ctx, batch

func (*MockStream) Subscribe

func (_m *MockStream) Subscribe(ctx context.Context) (MsgCh, error)

Subscribe provides a mock function with given fields: ctx

type MockStream_Close_Call

type MockStream_Close_Call struct {
	*mock.Call
}

MockStream_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'

func (*MockStream_Close_Call) Return

func (*MockStream_Close_Call) Run

func (_c *MockStream_Close_Call) Run(run func()) *MockStream_Close_Call

func (*MockStream_Close_Call) RunAndReturn

func (_c *MockStream_Close_Call) RunAndReturn(run func() error) *MockStream_Close_Call

type MockStream_Expecter

type MockStream_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockStream_Expecter) Close

Close is a helper method to define mock.On call

func (*MockStream_Expecter) Open

Open is a helper method to define mock.On call

func (*MockStream_Expecter) Publish

func (_e *MockStream_Expecter) Publish(ctx interface{}, subject interface{}, msg interface{}) *MockStream_Publish_Call

Publish is a helper method to define mock.On call

  • ctx context.Context
  • subject string
  • msg []byte

func (*MockStream_Expecter) PublishOverwrite

func (_e *MockStream_Expecter) PublishOverwrite(ctx interface{}, subject interface{}, msg interface{}) *MockStream_PublishOverwrite_Call

PublishOverwrite is a helper method to define mock.On call

  • ctx context.Context
  • subject string
  • msg []byte

func (*MockStream_Expecter) PullMsg

func (_e *MockStream_Expecter) PullMsg(ctx interface{}, batch interface{}) *MockStream_PullMsg_Call

PullMsg is a helper method to define mock.On call

  • ctx context.Context
  • batch int

func (*MockStream_Expecter) Subscribe

func (_e *MockStream_Expecter) Subscribe(ctx interface{}) *MockStream_Subscribe_Call

Subscribe is a helper method to define mock.On call

  • ctx context.Context

type MockStream_Open_Call

type MockStream_Open_Call struct {
	*mock.Call
}

MockStream_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open'

func (*MockStream_Open_Call) Return

func (*MockStream_Open_Call) Run

func (_c *MockStream_Open_Call) Run(run func()) *MockStream_Open_Call

func (*MockStream_Open_Call) RunAndReturn

func (_c *MockStream_Open_Call) RunAndReturn(run func() error) *MockStream_Open_Call

type MockStream_PublishOverwrite_Call

type MockStream_PublishOverwrite_Call struct {
	*mock.Call
}

MockStream_PublishOverwrite_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PublishOverwrite'

func (*MockStream_PublishOverwrite_Call) Return

func (*MockStream_PublishOverwrite_Call) Run

func (*MockStream_PublishOverwrite_Call) RunAndReturn

type MockStream_Publish_Call

type MockStream_Publish_Call struct {
	*mock.Call
}

MockStream_Publish_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Publish'

func (*MockStream_Publish_Call) Return

func (*MockStream_Publish_Call) Run

func (_c *MockStream_Publish_Call) Run(run func(ctx context.Context, subject string, msg []byte)) *MockStream_Publish_Call

func (*MockStream_Publish_Call) RunAndReturn

type MockStream_PullMsg_Call

type MockStream_PullMsg_Call struct {
	*mock.Call
}

MockStream_PullMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PullMsg'

func (*MockStream_PullMsg_Call) Return

func (*MockStream_PullMsg_Call) Run

func (_c *MockStream_PullMsg_Call) Run(run func(ctx context.Context, batch int)) *MockStream_PullMsg_Call

func (*MockStream_PullMsg_Call) RunAndReturn

func (_c *MockStream_PullMsg_Call) RunAndReturn(run func(context.Context, int) ([]Message, error)) *MockStream_PullMsg_Call

type MockStream_Subscribe_Call

type MockStream_Subscribe_Call struct {
	*mock.Call
}

MockStream_Subscribe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Subscribe'

func (*MockStream_Subscribe_Call) Return

func (*MockStream_Subscribe_Call) Run

func (*MockStream_Subscribe_Call) RunAndReturn

type MsgCh

type MsgCh chan Message

MsgCh is a channel over which messages arrive when subscribed.
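A consumer usually reads from the channel in a select loop so that it also stops when its context is cancelled. This sketch redeclares MsgCh and a reduced Message locally; `fakeMessage`, `consume`, and `demoConsume` are hypothetical. Whether the real implementation closes the channel on shutdown is not stated here, so the loop handles both a closed channel and a done context.

```go
package main

import (
	"context"
	"fmt"
)

// Message is the subset of events.Message used by this sketch.
type Message interface {
	Subject() string
	Ack() error
}

// MsgCh mirrors the channel type declared above.
type MsgCh chan Message

// fakeMessage is an in-memory stand-in for a broker message.
type fakeMessage struct{ subject string }

func (m fakeMessage) Subject() string { return m.subject }
func (m fakeMessage) Ack() error      { return nil }

// consume acks messages until ch is closed or ctx is done and
// returns the number of messages acknowledged.
func consume(ctx context.Context, ch MsgCh) int {
	handled := 0
	for {
		select {
		case <-ctx.Done():
			return handled
		case msg, ok := <-ch:
			if !ok {
				return handled // channel closed
			}
			if err := msg.Ack(); err == nil {
				handled++
			}
		}
	}
}

// demoConsume feeds two messages through a buffered channel.
func demoConsume() int {
	ch := make(MsgCh, 2)
	ch <- fakeMessage{subject: "servers.create"}
	ch <- fakeMessage{subject: "servers.delete"}
	close(ch)
	return consume(context.Background(), ch)
}

func main() {
	fmt.Println("handled:", demoConsume())
}
```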

type NatsConsumerOptions

type NatsConsumerOptions struct {
	// Pull indicates this is a pull based subscriber
	Pull bool `mapstructure:"pull"`

	// Sets the durable consumer name
	Name string `mapstructure:"name"`

	// Sets the queue group for this consumer
	QueueGroup string `mapstructure:"queue_group"`

	AckWait time.Duration `mapstructure:"ack_wait"`

	MaxAckPending int `mapstructure:"max_ack_pending"`

	// Setting the FilterSubject turns this consumer into a push based consumer.
	// With no filter subject, the consumer is a pull based consumer.
	//
	// However, if the stream is a WorkQueue stream, this must be set
	// and should be unique between consumers on the stream.
	FilterSubject string `mapstructure:"filter_subject"`

	// Subscribe to these subjects through this consumer.
	SubscribeSubjects []string `mapstructure:"subscribe_subjects"`
}

NatsConsumerOptions holds the parameters for the NATS consumer configuration.

Note: NATS consumers are views into the stream; multiple subscribers may bind to a consumer.

type NatsJetstream

type NatsJetstream struct {
	// contains filtered or unexported fields
}

NatsJetstream wraps the NATS JetStream connector to implement the Stream interface.

func NewJetstreamFromConn

func NewJetstreamFromConn(c *nats.Conn) *NatsJetstream

NewJetstreamFromConn takes an already established NATS connection pointer and returns a NatsJetstream pointer

func NewNatsBroker

func NewNatsBroker(params StreamParameters) (*NatsJetstream, error)

NewNatsBroker validates the given stream broker parameters and returns a stream broker implementation.

func (*NatsJetstream) Close

func (n *NatsJetstream) Close() error

Close drains any subscriptions and closes the NATS Jetstream connection.

func (*NatsJetstream) Open

func (n *NatsJetstream) Open() error

Open connects to the NATS Jetstream.

func (*NatsJetstream) Publish

func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data []byte) error

Publish publishes an event onto the NATS Jetstream. The caller is responsible for message addressing and data serialization.
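Since addressing and serialization are left to the caller, a publisher has to produce both the subject suffix and the []byte payload itself. A minimal sketch using JSON; the `serverEvent` type, the `encode` helper, and the `servers.<event>` subject layout are all assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// serverEvent is a hypothetical payload type.
type serverEvent struct {
	ID    string `json:"id"`
	Event string `json:"event"`
}

// encode returns a subject suffix and a JSON payload for ev.
// The "servers.<event>" layout is an assumption, not a package convention.
func encode(ev serverEvent) (string, []byte, error) {
	payload, err := json.Marshal(ev)
	if err != nil {
		return "", nil, err
	}
	return "servers." + ev.Event, payload, nil
}

func main() {
	subject, payload, err := encode(serverEvent{ID: "42", Event: "create"})
	if err != nil {
		panic(err)
	}
	// The two values would then be passed to Publish(ctx, subject, payload).
	fmt.Printf("%s %s\n", subject, payload)
}
```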

func (*NatsJetstream) PublishOverwrite

func (n *NatsJetstream) PublishOverwrite(ctx context.Context, subjectSuffix string, data []byte) error

PublishOverwrite publishes an event and will overwrite any existing message with that subject in the queue
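The difference between the two publish methods can be pictured with a toy in-memory model: Publish appends to the messages held for a subject, while PublishOverwrite leaves only the new message. The `memStream` type below is hypothetical and says nothing about how the NATS-backed implementation achieves this.

```go
package main

import "fmt"

// memStream is a toy model that keeps messages per subject in memory.
type memStream struct {
	msgs map[string][][]byte
}

func newMemStream() *memStream {
	return &memStream{msgs: map[string][][]byte{}}
}

// Publish appends data to the messages stored for subject.
func (s *memStream) Publish(subject string, data []byte) {
	s.msgs[subject] = append(s.msgs[subject], data)
}

// PublishOverwrite replaces every message stored for subject with data.
func (s *memStream) PublishOverwrite(subject string, data []byte) {
	s.msgs[subject] = [][]byte{data}
}

// Count returns the number of messages stored for subject.
func (s *memStream) Count(subject string) int {
	return len(s.msgs[subject])
}

func main() {
	s := newMemStream()
	s.Publish("servers.42", []byte("v1"))
	s.Publish("servers.42", []byte("v2"))
	fmt.Println("after two publishes:", s.Count("servers.42"))

	s.PublishOverwrite("servers.42", []byte("v3"))
	fmt.Println("after overwrite:", s.Count("servers.42"))
}
```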

func (*NatsJetstream) PullMsg

func (n *NatsJetstream) PullMsg(_ context.Context, batch int) ([]Message, error)

PullMsg pulls up to the batch count of messages from each pull-based subscription to subjects on the stream.
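A pull-based consumer usually calls PullMsg in a loop, acknowledging each batch before asking for the next. The sketch below runs against a fake with the same method signature; `fakeStream`, `fakeMsg`, `drain`, and `demoDrain` are hypothetical. It assumes an empty batch means nothing is pending, whereas the real implementation may instead return an error such as a timeout.

```go
package main

import (
	"context"
	"fmt"
)

// Message is the subset of events.Message used by this sketch.
type Message interface {
	Ack() error
}

// puller is the part of the Stream interface exercised here.
type puller interface {
	PullMsg(ctx context.Context, batch int) ([]Message, error)
}

type fakeMsg struct{}

func (fakeMsg) Ack() error { return nil }

// fakeStream hands out up to batch messages until pending reaches zero.
type fakeStream struct{ pending int }

func (f *fakeStream) PullMsg(_ context.Context, batch int) ([]Message, error) {
	n := batch
	if f.pending < n {
		n = f.pending
	}
	f.pending -= n
	msgs := make([]Message, n)
	for i := range msgs {
		msgs[i] = fakeMsg{}
	}
	return msgs, nil
}

// drain pulls batches until an empty batch is returned or ctx is done,
// acking every message, and returns the number of messages acked.
func drain(ctx context.Context, p puller, batch int) (int, error) {
	total := 0
	for {
		if err := ctx.Err(); err != nil {
			return total, err
		}
		msgs, err := p.PullMsg(ctx, batch)
		if err != nil {
			return total, err
		}
		if len(msgs) == 0 {
			return total, nil
		}
		for _, m := range msgs {
			if err := m.Ack(); err != nil {
				return total, err
			}
			total++
		}
	}
}

// demoDrain drains seven pending messages in batches of three.
func demoDrain() (int, error) {
	return drain(context.Background(), &fakeStream{pending: 7}, 3)
}

func main() {
	n, err := demoDrain()
	fmt.Println(n, err)
}
```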

func (*NatsJetstream) Subscribe

func (n *NatsJetstream) Subscribe(ctx context.Context) (MsgCh, error)

Subscribe subscribes to all configured SubscribeSubjects.

type NatsOptions

type NatsOptions struct {
	// URL is the NATS server URL
	URL string `mapstructure:"url"`

	// AppName is the name of the application connecting to the
	// NATS stream. This parameter is used to open the NATS connection
	// and bind as a durable consumer.
	AppName string `mapstructure:"app_name"`

	// NATS stream user, when no creds file is provided.
	StreamUser string `mapstructure:"stream_user"`

	// NATS stream pass, when no creds file is provided.
	StreamPass string `mapstructure:"stream_pass"`

	// NATS creds file
	CredsFile string `mapstructure:"creds_file"`

	// The subject prefix when publishing a message.
	PublisherSubjectPrefix string `mapstructure:"publisher_subject_prefix"`

	// URN Namespace to include in the published messages.
	StreamURNNamespace string `mapstructure:"stream_urn_ns"`

	// SubscribeSubjects when defined will result in the event broker subscribing to given streams.
	SubscribeSubjects []string `mapstructure:"subscribe_subjects"`

	// NATS connection timeout
	ConnectTimeout time.Duration `mapstructure:"connect_timeout"`

	// Setting Consumer parameters will cause a NATS consumer to be added.
	Consumer *NatsConsumerOptions `mapstructure:"consumer"`

	// Setting Stream parameters will cause a NATS stream to be added.
	Stream *NatsStreamOptions `mapstructure:"stream"`

	// KVReplicationFactor sets the number of copies for a bucket in a NATS clustered environment
	KVReplicationFactor int `mapstructure:"kv_replication"`
}

NatsOptions holds the configuration parameters to set up NATS Jetstream.

type NatsStreamOptions

type NatsStreamOptions struct {
	// Name for the stream
	Name string `mapstructure:"name"`

	// Subjects allowed to publish on the stream
	Subjects []string `mapstructure:"subjects"`

	// Acknowledgements required for each msg
	//
	// https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#acknowledgement-models
	Acknowledgements bool `mapstructure:"acknowledgements"`

	// DuplicateWindow, messages containing the same message ID will be
	// deduplicated in this time window.
	//
	// https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#message-deduplication
	DuplicateWindow time.Duration `mapstructure:"duplicate_window"`

	// Retention is the message eviction criteria
	//
	// https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#stream-limits-retention-and-policy
	Retention string `mapstructure:"retention"`
}

NatsStreamOptions are the parameters to set up a NATS stream.
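The DuplicateWindow behaviour can be pictured with a toy model: a message whose ID was already accepted inside the window is dropped, and the same ID is accepted again once the window has passed. The `deduper` type below is a hypothetical illustration of that rule only; the actual deduplication happens in the NATS server.

```go
package main

import (
	"fmt"
	"time"
)

// deduper is a toy model of time-windowed deduplication by message ID.
type deduper struct {
	window time.Duration
	seen   map[string]time.Time // ID -> time the ID was last accepted
}

func newDeduper(window time.Duration) *deduper {
	return &deduper{window: window, seen: map[string]time.Time{}}
}

// accept reports whether a message with the given ID arriving at now
// is stored (true) or dropped as a duplicate (false).
func (d *deduper) accept(id string, now time.Time) bool {
	if last, ok := d.seen[id]; ok && now.Sub(last) < d.window {
		return false
	}
	d.seen[id] = now
	return true
}

// demoDedup replays four arrivals against a two-minute window.
func demoDedup() []bool {
	d := newDeduper(2 * time.Minute)
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	return []bool{
		d.accept("msg-1", t0),                    // first sighting
		d.accept("msg-1", t0.Add(time.Minute)),   // inside the window
		d.accept("msg-1", t0.Add(3*time.Minute)), // window has passed
		d.accept("msg-2", t0.Add(3*time.Minute)), // different ID
	}
}

func main() {
	fmt.Println(demoDedup())
}
```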

type ResourceType

type ResourceType string

ResourceType is the kind of the object included in the message.

type Stream

type Stream interface {
	// Open sets up the stream connection.
	Open() error

	// Publish publishes the message to the message broker.
	//
	// To overwrite any previous messages with the same subject, use PublishOverwrite.
	Publish(ctx context.Context, subject string, msg []byte) error

	// PublishOverwrite publishes the message to the message broker overwriting any existing message with that subject
	PublishOverwrite(ctx context.Context, subject string, msg []byte) error

	// Subscribe subscribes to one or more subjects on the stream returning a message channel for subscribers to read from.
	Subscribe(ctx context.Context) (MsgCh, error)

	// PullMsg pulls up to batch count of messages from the stream through the pull based subscription.
	PullMsg(ctx context.Context, batch int) ([]Message, error)

	// Close closes the connection to the stream and unsubscribes any subscriptions.
	Close() error
}

Stream provides methods to interact with the event stream.

func NewStream

func NewStream(parameters StreamParameters) (Stream, error)

NewStream returns a Stream implementation.

type StreamParameters

type StreamParameters interface{}

StreamParameters is the configuration for the Stream broker; the interface is type asserted by the stream broker implementation.
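Because StreamParameters is an empty interface, the concrete options type is only checked at run time. The sketch below shows one way such a check could look, using a type switch over a local stand-in type; `brokerKind` and `errUnsupported` are hypothetical. Whether the real package expects a NatsOptions value or a pointer is not stated here; in this sketch only the value type matches, which shows how a pointer would be rejected.

```go
package main

import (
	"errors"
	"fmt"
)

// StreamParameters mirrors the empty interface declared above.
type StreamParameters interface{}

// NatsOptions is a reduced stand-in for events.NatsOptions.
type NatsOptions struct {
	URL string
}

var errUnsupported = errors.New("unsupported stream parameters")

// brokerKind inspects the dynamic type of p and names the broker it selects.
func brokerKind(p StreamParameters) (string, error) {
	switch p.(type) {
	case NatsOptions:
		return "nats", nil
	default:
		return "", fmt.Errorf("%w: %T", errUnsupported, p)
	}
}

func main() {
	kind, err := brokerKind(NatsOptions{URL: "nats://nats:4222"})
	fmt.Println(kind, err)

	// A pointer has a different dynamic type and does not match the case above.
	_, err = brokerKind(&NatsOptions{})
	fmt.Println(err)
}
```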

Directories

Path Synopsis
internal
pkg
kv
The registry package builds functionality for tracking live controller processes in a NATS KV store.
