messaging2

package
v0.0.0-...-015b2df Latest
Warning

This package is not in the latest version of its module.
Published: Jan 8, 2025 License: BSD-3-Clause Imports: 15 Imported by: 60

README

messaging2 package

This package contains the messaging abstraction (QueryConnection & EventConnection) that services and service clients (e.g. those in cacao-common/services) can use to publish and receive messages. It also contains implementations of the abstraction based on NATS (QueryConnection), NATS Streaming (EventConnection) and NATS JetStream (EventConnection).

Note that NATS Streaming is deprecated; its usage should be replaced by NATS JetStream.

Example

For an example of a service client that uses the messaging abstraction to publish query requests and events, see natsWorkspaceClient in cacao-common/service/workspace_nats_client.go.

For an example of a service that uses the messaging abstraction, see the workspace service in cacao/workspace-service or the user service (gitlab.com/cyverse/users-microservice).

Semantics

There are 2 types of messages: queries and events. The differences between them are:

  • A query generally corresponds to a read operation.
  • A query can withstand losses (it can safely be retried), and therefore does not need persistence.
  • An event is an operation that has side effects (it makes changes to the system).
  • An event should be persisted by the underlying messaging bus.

Pattern

request/reply (query)

A client sends a request to a service, and the service answers with a reply.

request/response (event)

A client sends a request event to a service, and the service answers with a response event.

The difference between this and request/reply for queries is that the response event is still an event, which means other parties can listen for it, whereas a reply cannot be consumed directly by other services.

e.g.

client publishes CredentialCreationRequested => service A receives CredentialCreationRequested => service A responds with CredentialCreated.
service B receives CredentialCreated => service B publishes CacheUpdated.
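The chain above can be sketched with a toy in-memory bus. This is only an illustration of the pattern, not the package's implementation: Event, Bus and RunChain are hypothetical names, and handlers run synchronously here, which a real message bus would not do.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for a cloudevent: a type plus a transaction ID.
type Event struct {
	Type          string
	TransactionID string
}

// Bus is a toy in-memory event bus. Every subscriber of a type sees every
// event of that type, which is what lets third parties consume responses.
type Bus struct {
	handlers map[string][]func(Event)
	Log      []string // event types, in the order they were published
}

func NewBus() *Bus { return &Bus{handlers: map[string][]func(Event){}} }

func (b *Bus) Subscribe(eventType string, h func(Event)) {
	b.handlers[eventType] = append(b.handlers[eventType], h)
}

func (b *Bus) Publish(e Event) {
	b.Log = append(b.Log, e.Type)
	for _, h := range b.handlers[e.Type] {
		h(e)
	}
}

// RunChain wires up service A and service B, publishes the request event,
// and returns the types of all events that were published.
func RunChain() []string {
	bus := NewBus()
	// Service A answers the request with a response event.
	bus.Subscribe("CredentialCreationRequested", func(e Event) {
		bus.Publish(Event{Type: "CredentialCreated", TransactionID: e.TransactionID})
	})
	// Service B is another party that reacts to the response event.
	bus.Subscribe("CredentialCreated", func(e Event) {
		bus.Publish(Event{Type: "CacheUpdated", TransactionID: e.TransactionID})
	})
	bus.Publish(Event{Type: "CredentialCreationRequested", TransactionID: "tid-example"})
	return bus.Log
}

func main() {
	fmt.Println(RunChain())
}
```

The transaction ID is copied from the request to each follow-up event so that the client can pair them later.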

source/triggered

e.g. UserLoggedIn => PopulateCache

Graceful termination

We want to be able to terminate a service without losing messages, especially events. When a service exits gracefully, there is a grace period for it to clean things up.

  • Stop all incoming messages.
  • Confirm that incoming messages have stopped.
    • The service needs a way to be notified that incoming messages have stopped.
    • This is necessary because stopping incoming messages is not immediate; it has to be coordinated with the message bus server.
  • Allow messages that are currently being processed to finish.
    • This requires some degree of separation between stopping incoming messages and the ongoing message handlers:
      • Stopping incoming messages (e.g. closing the subscription) should not stop ongoing message handlers.
    • A timeout is needed; the service cannot wait indefinitely.
    • The service needs to know when processing finishes, so that it can proceed to exit.
      • This suggests keeping track of the messages that are being processed.
  • Close the connection and other resources (e.g. the DB connection).
  • Exit.
    • The main thread needs a way to wait for all of the above to happen and finish.
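The shutdown sequence above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not how the package implements it: Drain and Simulate are hypothetical names, a goroutine reading from a channel stands in for a subscription, and a sleep stands in for real message handling.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Drain runs the shutdown steps and reports whether everything finished
// within the grace period.
func Drain(stopIncoming context.CancelFunc, incomingStopped <-chan struct{}, inFlight *sync.WaitGroup, grace time.Duration) bool {
	stopIncoming() // stop all incoming messages
	done := make(chan struct{})
	go func() {
		<-incomingStopped // be notified that intake has really stopped
		inFlight.Wait()   // let ongoing handlers finish
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(grace): // cannot wait indefinitely
		return false
	}
}

// Simulate feeds n messages through a toy intake loop, drains it, and returns
// how many messages were handled and whether draining completed in time.
func Simulate(n int, work, grace time.Duration) (int64, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	msgs := make(chan int)
	incomingStopped := make(chan struct{})
	var inFlight sync.WaitGroup
	var handled atomic.Int64

	go func() { // intake loop, standing in for a subscription
		defer close(incomingStopped)
		for {
			select {
			case <-ctx.Done():
				return
			case <-msgs:
				inFlight.Add(1) // keep track of messages being processed
				go func() {     // handler ignores ctx, so stopping intake does not stop it
					defer inFlight.Done()
					time.Sleep(work)
					handled.Add(1)
				}()
			}
		}
	}()
	for i := 0; i < n; i++ {
		msgs <- i
	}
	ok := Drain(cancel, incomingStopped, &inFlight, grace)
	return handled.Load(), ok
}

func main() {
	handled, ok := Simulate(3, 10*time.Millisecond, 2*time.Second)
	fmt.Println(handled, ok)
}
```

Waiting for incomingStopped before waiting on the WaitGroup is what keeps the two phases separate: no new handler can be registered once the intake loop has returned.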

Integration tests

Use docker-compose.yaml to run the integration tests locally.

docker-compose up -d

For integration tests that need NATS Streaming:

export NATS_URL=localhost:5222
go test -v --tags=stan_integration .

For integration tests that need NATS Core or NATS JetStream:

export NATS_URL=localhost:4222
go test -v --tags=nats_integration,natsjs_integration .

Documentation

Index

Constants

const (
	// DefaultNatsURL is a default nats URL
	DefaultNatsURL = "nats://nats:4222"
	// DefaultNatsClusterID is a default Cluster ID for cacao
	DefaultNatsClusterID = "cacao-cluster"
	// DefaultNatsMaxReconnect is default max reconnect trials
	DefaultNatsMaxReconnect = 6 // max times to reconnect within nats.connect()
	// DefaultNatsReconnectWait is a default delay for next reconnect
	DefaultNatsReconnectWait = 10 * time.Second // seconds to wait within nats.connect()
	// DefaultNatsRequestTimeout is default timeout for requests
	DefaultNatsRequestTimeout = 5 * time.Second // timeout for requests
	// MaxNatsRequestTimeout is the maximum timeout to query requests over nats
	MaxNatsRequestTimeout = 10 * time.Minute // timeout for requests
	// DefaultAckWait is the default duration to wait for ack
	DefaultAckWait = 10 * time.Second
)
const (
	// DefaultStanEventsTimeout is a default timeout
	DefaultStanEventsTimeout = 10 * time.Second // used to wait for event ops to complete, is very conservative
	// DefaultStanAckWaitTime is a default wait time for ack wait
	DefaultStanAckWaitTime = 60 * time.Second
)
const AutoPopulateCloudEventSource = "AUTO_POPULATE"

AutoPopulateCloudEventSource explicitly tells implementations of QueryConnection or EventConnection that the cloudevent source should be populated by the connection.

const CyVerseStreamName = "CYVERSE"

CyVerseStreamName is the name of the NATS JetStream stream. It is used to create the JetStream consumer and to bind durable subscriptions to.

Variables

This section is empty.

Functions

func ConvertNats

func ConvertNats(msg *nats.Msg) (cloudevents.Event, error)

ConvertNats converts NATS message to CloudEvents message

func ConvertStan

func ConvertStan(msg *stan.Msg) (cloudevents.Event, error)

ConvertStan converts STAN message to CloudEvents message

func CreateCloudEvent

func CreateCloudEvent[T common.EventType | common.QueryOp](data interface{}, eventType T, source string) (cloudevents.Event, error)

CreateCloudEvent takes any object, an eventType string, and a source string, and creates the resulting CloudEvent. This utility provides the following conveniences:

  • uniformly assigns a new id of the format "cloudevent-" + xid
  • sets the time to UTC
  • generically marshals the data to json and appropriately assigns the cloudevent type
  • creates or sets a transaction id, which can later be used to pair requests to responses or corresponding events
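To make the conveniences of CreateCloudEvent concrete, here is a rough standard-library sketch of the same steps. It does not use the real cloudevents or xid libraries: Envelope, NewEnvelope and randomSuffix are hypothetical names, and the random hex suffix is only a stand-in for an xid.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"time"
)

// Envelope is a hypothetical, simplified stand-in for cloudevents.Event.
type Envelope struct {
	ID            string
	Time          time.Time
	Type          string
	Source        string
	TransactionID string
	Data          json.RawMessage
}

// randomSuffix stands in for the xid used by the real package; it is NOT an xid.
func randomSuffix() (string, error) {
	b := make([]byte, 12)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// NewEnvelope assigns an ID, sets the time to UTC, marshals the data to JSON,
// and creates a transaction ID when the caller did not supply one.
func NewEnvelope(data any, eventType, source, transactionID string) (Envelope, error) {
	payload, err := json.Marshal(data)
	if err != nil {
		return Envelope{}, err
	}
	idSuffix, err := randomSuffix()
	if err != nil {
		return Envelope{}, err
	}
	if transactionID == "" {
		tidSuffix, err := randomSuffix()
		if err != nil {
			return Envelope{}, err
		}
		transactionID = "tid-" + tidSuffix
	}
	return Envelope{
		ID:            "cloudevent-" + idSuffix,
		Time:          time.Now().UTC(),
		Type:          eventType,
		Source:        source,
		TransactionID: transactionID,
		Data:          payload,
	}, nil
}

func main() {
	e, err := NewEnvelope(map[string]string{"name": "example"}, "ExampleRequested", "example-service", "")
	if err != nil {
		panic(err)
	}
	fmt.Println(e.ID, e.TransactionID, string(e.Data))
}
```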

func CreateCloudEventWithAutoSource

func CreateCloudEventWithAutoSource[T common.EventType | common.QueryOp](data interface{}, eventType T) (cloudevents.Event, error)

CreateCloudEventWithAutoSource creates a cloudevent just like CreateCloudEvent, but with AutoPopulateCloudEventSource as the cloudevent source.

func CreateCloudEventWithTransactionID

func CreateCloudEventWithTransactionID[T common.EventType | common.QueryOp](data interface{}, eventType T, source string, transactionID common.TransactionID) (cloudevents.Event, error)

CreateCloudEventWithTransactionID creates a CloudEvent with the given transactionID. The transactionID is optional.

func GetTransactionID

func GetTransactionID(ce *cloudevents.Event) common.TransactionID

GetTransactionID is a utility function for quickly reading the TransactionID from a CloudEvent.

func InitCyVerseStream

func InitCyVerseStream(conf NatsStanMsgConfig) error

InitCyVerseStream creates the stream that is used for event messaging in CACAO. This needs to be called outside of the normal CACAO microservices, since a normal service should not manage the stream itself.

func NewTransactionID

func NewTransactionID() common.TransactionID

NewTransactionID is a utility function that generates a CACAO-specific transaction ID of the form tid-<xid>. Replies and events generated from a specific request should include the transaction ID so that they are easier to match to the request.

func SetTransactionID

func SetTransactionID(ce *cloudevents.Event, transactionID common.TransactionID)

SetTransactionID is a utility function for quickly writing the TransactionID to a CloudEvent.

Types

type EventConnection

type EventConnection interface {
	// Listen listens for some events and calls the corresponding handler when
	// receiving those events. This should only be called once on the
	// EventConnection.
	Listen(context.Context, map[common.EventType]EventHandlerFunc, *sync.WaitGroup) error

	// ListenWithConcurrentWorkers will start listening for events, just like
	// Listen(), but this will spawn worker go-routines that consume from an internal
	// channel. This allows for bounded concurrency. Channel buffer and worker count
	// need to be passed when calling this function.
	ListenWithConcurrentWorkers(ctx context.Context, m map[common.EventType]EventHandlerFunc, wg *sync.WaitGroup, channelBufferLen uint, workerCount uint) error

	// Publish will publish a cloudevent. The cloudevent source will be automatically populated if it is empty or AutoPopulateCloudEventSource.
	Publish(ce cloudevents.Event) error
	Request(requestCe cloudevents.Event, responseEventTypes []common.EventType) (EventResponsePromise, error)
	WaitForResponse(transactionID common.TransactionID, responseEventTypes []common.EventType) (EventResponsePromise, error)
}

EventConnection ...
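The bounded concurrency described for ListenWithConcurrentWorkers can be pictured as a buffered channel feeding a fixed pool of worker goroutines. The sketch below only illustrates that general technique with the standard library; StartWorkers and CountHandled are hypothetical names, strings stand in for cloudevents, and nothing here is taken from the package's internals.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// StartWorkers spawns workerCount goroutines that consume from msgs until the
// channel is closed or ctx is cancelled. wg lets the caller wait for them.
func StartWorkers(ctx context.Context, wg *sync.WaitGroup, msgs <-chan string, workerCount uint, handle func(string)) {
	for i := uint(0); i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case m, ok := <-msgs:
					if !ok {
						return
					}
					handle(m)
				}
			}
		}()
	}
}

// CountHandled pushes n messages through a buffered channel and a worker pool
// and returns how many were handled. At most workerCount handlers run at once.
func CountHandled(n int, channelBufferLen, workerCount uint) int64 {
	msgs := make(chan string, channelBufferLen)
	var wg sync.WaitGroup
	var handled atomic.Int64
	StartWorkers(context.Background(), &wg, msgs, workerCount, func(string) {
		handled.Add(1)
	})
	for i := 0; i < n; i++ {
		msgs <- fmt.Sprintf("event-%d", i) // blocks when the buffer is full
	}
	close(msgs)
	wg.Wait()
	return handled.Load()
}

func main() {
	fmt.Println(CountHandled(100, 8, 4))
}
```

The channel buffer absorbs short bursts, while the worker count caps how many handlers run at the same time.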

type EventHandlerFunc

type EventHandlerFunc func(ctx context.Context, event cloudevents.Event, writer EventResponseWriter) error

EventHandlerFunc ...

type EventObservable

type EventObservable interface {
	AddListener(common.EventType, common.TransactionID, Listener) (ListenerID, error)
	// AddListenerMultiEventType registers a listener to a set of common.EventType
	AddListenerMultiEventType([]common.EventType, common.TransactionID, Listener) (ListenerID, error)
	RemoveListenerByID(id ListenerID)
}

EventObservable is an interface that allows event listeners to be added and removed dynamically.

type EventResponsePromise

type EventResponsePromise interface {
	// Response waits for the response
	Response(ctx context.Context) (cloudevents.Event, error)
	// ResponseChan returns a channel that will be receiving the responses
	ResponseChan(ctx context.Context) <-chan cloudevents.Event
	// Close cleans up any resources used by the promise, this is useful when you do
	// not want to receive the response, or when response never arrives.
	//
	// If response is successfully received, then there should be NO need to call Close()
	Close() error
}

EventResponsePromise ...
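As a rough mental model of the promise contract (wait for a response, or Close when it never arrives), here is a small in-memory analogue. Promise, Fulfill, ErrClosed and WaitWithTimeout are hypothetical names for this sketch, and a string stands in for the cloudevent.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrClosed is returned by Response once the promise has been closed.
var ErrClosed = errors.New("promise closed")

// Promise is a hypothetical in-memory analogue of EventResponsePromise.
type Promise struct {
	ch     chan string
	closed chan struct{}
	once   sync.Once
}

func NewPromise() *Promise {
	return &Promise{ch: make(chan string, 1), closed: make(chan struct{})}
}

// Fulfill delivers a response without blocking; extra responses are dropped.
func (p *Promise) Fulfill(v string) {
	select {
	case p.ch <- v:
	default:
	}
}

// Response waits for the response, for Close, or for ctx to end.
func (p *Promise) Response(ctx context.Context) (string, error) {
	select {
	case v := <-p.ch:
		return v, nil
	case <-p.closed:
		return "", ErrClosed
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// Close releases the promise; it is safe to call more than once.
func (p *Promise) Close() error {
	p.once.Do(func() { close(p.closed) })
	return nil
}

// WaitWithTimeout waits for the response and closes the promise if none arrives.
func WaitWithTimeout(p *Promise, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	v, err := p.Response(ctx)
	if err != nil {
		_ = p.Close() // the response never arrived, so clean up
		return "", err
	}
	return v, nil
}

func main() {
	p := NewPromise()
	p.Fulfill("CredentialCreated")
	fmt.Println(WaitWithTimeout(p, time.Second))
}
```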

type EventResponseWriter

type EventResponseWriter interface {
	// Write will publish the cloudevent, if the transaction ID is empty it will be
	// overridden with the transaction ID associated with the Incoming event that
	// triggered EventHandlerFunc. Write can be called multiple times on the same EventResponseWriter.
	Write(ce cloudevents.Event) error
	// NoResponse explicitly indicates that there is no response
	NoResponse()
}

EventResponseWriter ...
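The transaction ID rule described for Write (fill it in only when the outgoing event has none) can be sketched as follows. Event, ResponseWriter and NewResponseWriter are hypothetical names for this illustration, and the publish callback stands in for the real connection.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for a cloudevent.
type Event struct {
	Type          string
	TransactionID string
}

// ResponseWriter remembers the transaction ID of the incoming event that
// triggered the handler, and publishes responses through a callback.
type ResponseWriter struct {
	incomingTID string
	publish     func(Event) error
}

func NewResponseWriter(incomingTID string, publish func(Event) error) *ResponseWriter {
	return &ResponseWriter{incomingTID: incomingTID, publish: publish}
}

// Write publishes the event. The transaction ID is only set when the event
// does not already carry one. Write may be called multiple times.
func (w *ResponseWriter) Write(e Event) error {
	if e.TransactionID == "" {
		e.TransactionID = w.incomingTID
	}
	return w.publish(e)
}

func main() {
	var published []Event
	w := NewResponseWriter("tid-incoming", func(e Event) error {
		published = append(published, e)
		return nil
	})
	_ = w.Write(Event{Type: "CredentialCreated"})
	_ = w.Write(Event{Type: "CacheUpdated", TransactionID: "tid-other"})
	fmt.Println(published)
}
```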

type EventSource

type EventSource struct {
	// contains filtered or unexported fields
}

EventSource dispatches events to registered listeners. Listeners can be registered dynamically.

func NewEventSource

func NewEventSource(natsConf NatsConfig, stanConf StanConfig) *EventSource

NewEventSource creates a new EventSource

func NewEventSourceFromStanConnection

func NewEventSourceFromStanConnection(conn *StanConnection) (*EventSource, error)

NewEventSourceFromStanConnection creates an EventSource from a StanConnection (w/o subscription); the underlying STAN connection is re-used.

func (*EventSource) AddListener

func (es *EventSource) AddListener(eventType common.EventType, transactionID common.TransactionID, listener Listener) (ListenerID, error)

AddListener adds a listener for an event. Use "" for eventType or transactionID as a wildcard (they cannot both be wildcards). It returns a unique ID for the listener and any connection-related error. This will automatically start a STAN connection & subscription if one is not already started.

func (*EventSource) AddListenerMultiEventType

func (es *EventSource) AddListenerMultiEventType(eventTypes []common.EventType, transactionID common.TransactionID, listener Listener) (ListenerID, error)

AddListenerMultiEventType adds a listener for a set of event types. Use "" for eventType or transactionID as a wildcard (they cannot both be wildcards). It returns a unique ID for the listener and any connection-related error. If the listener has ListenOnce set, it will be removed after being called once. This will automatically start a STAN connection & subscription if one is not already started.

func (*EventSource) RemoveListenerByID

func (es *EventSource) RemoveListenerByID(id ListenerID)

RemoveListenerByID removes a listener. This will disconnect & unsubscribe if the last listener is removed.
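The listener rules described for EventSource ("" as a wildcard, ListenOnce removal) can be sketched as an in-memory registry. This is an illustration only: Registry, Add, Remove and Dispatch are hypothetical names, there is no STAN connection involved, and rejecting a double wildcard with an error is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Callback receives the event type and a payload (a string stands in for the cloudevent).
type Callback func(eventType string, payload string)

// Listener mirrors the shape of the package's Listener struct.
type Listener struct {
	Callback   Callback
	ListenOnce bool
}

type registration struct {
	eventType     string // "" matches any event type
	transactionID string // "" matches any transaction ID
	listener      Listener
}

// Registry is a hypothetical in-memory listener table.
type Registry struct {
	mu   sync.Mutex
	next int
	regs map[string]registration
}

func NewRegistry() *Registry { return &Registry{regs: map[string]registration{}} }

// Add registers a listener and returns its ID.
func (r *Registry) Add(eventType, transactionID string, l Listener) (string, error) {
	if eventType == "" && transactionID == "" {
		return "", errors.New("eventType and transactionID cannot both be wildcards")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.next++
	id := fmt.Sprintf("listener-%d", r.next)
	r.regs[id] = registration{eventType, transactionID, l}
	return id, nil
}

// Remove deletes a listener by ID.
func (r *Registry) Remove(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.regs, id)
}

// Dispatch calls every matching listener and returns how many matched.
// ListenOnce listeners are removed as soon as they match.
func (r *Registry) Dispatch(eventType, transactionID, payload string) int {
	r.mu.Lock()
	var matched []Callback
	for id, reg := range r.regs {
		typeOK := reg.eventType == "" || reg.eventType == eventType
		tidOK := reg.transactionID == "" || reg.transactionID == transactionID
		if typeOK && tidOK {
			matched = append(matched, reg.listener.Callback)
			if reg.listener.ListenOnce {
				delete(r.regs, id)
			}
		}
	}
	r.mu.Unlock()
	for _, cb := range matched { // run callbacks outside the lock
		cb(eventType, payload)
	}
	return len(matched)
}

func main() {
	r := NewRegistry()
	cb := func(t, p string) { fmt.Println("got", t, p) }
	_, _ = r.Add("CredentialCreated", "", Listener{Callback: cb})
	_, _ = r.Add("", "tid-1", Listener{Callback: cb, ListenOnce: true})
	fmt.Println(r.Dispatch("CredentialCreated", "tid-1", "first"))
	fmt.Println(r.Dispatch("CredentialCreated", "tid-1", "second"))
}
```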

type Listener

type Listener struct {
	Callback ListenerCallback
	// if enabled, the listener will be removed after being called once.
	// defaults to false.
	ListenOnce bool
}

Listener is a callback registration for some EventType and/or TransactionID

type ListenerCallback

type ListenerCallback func(ev common.EventType, ce cloudevents.Event)

ListenerCallback is the callback function for a Listener

type ListenerID

type ListenerID string

ListenerID is the ID of listener in EventSource

type NatsConfig

type NatsConfig struct {
	URL             string `envconfig:"NATS_URL" default:"nats://nats:4222"`
	QueueGroup      string `envconfig:"NATS_QUEUE_GROUP"`
	WildcardSubject string `envconfig:"NATS_WILDCARD_SUBJECT" default:"cyverse.>"` // WildcardSubject field is optional, only used for NATS Query
	// ClientID is used for Nats Client Name, STAN Client ID, cloudevent source.
	ClientID       string `envconfig:"NATS_CLIENT_ID"`
	MaxReconnects  int    `envconfig:"NATS_MAX_RECONNECTS" default:"-1"`  // number of attempts (not seconds), implementation should default to DefaultNatsMaxReconnect
	ReconnectWait  int    `envconfig:"NATS_RECONNECT_WAIT" default:"-1"`  // in seconds, implementation should default to DefaultNatsReconnectWait
	RequestTimeout int    `envconfig:"NATS_REQUEST_TIMEOUT" default:"-1"` // in seconds, implementation should default to DefaultNatsRequestTimeout

	// EventWildcardSubject field is optional, only used for NATS JetStream Events
	EventWildcardSubject string `envconfig:"NATS_JS_WILDCARD_SUBJECT"`
	// AckWaitSec is the seconds to wait for acknowledgement or time the messaging
	// bus wait before attempting redelivery, this should be set to the maximum
	// seconds it takes for service to process an operation.
	AckWaitSec int `envconfig:"NATS_JS_ACK_WAIT" default:"-1"` // in seconds, implementation should default to DefaultAckWait
}

NatsConfig ...
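Several NatsConfig fields default to -1 and rely on the implementation to substitute the package defaults. One plausible way to resolve such sentinel values is sketched below. The helper names are hypothetical, the default values are copied from the constants listed earlier, and treating any negative value as "unset" is an assumption rather than documented behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// Defaults copied from the package constants listed under Constants.
const (
	defaultMaxReconnect   = 6
	defaultReconnectWait  = 10 * time.Second
	defaultRequestTimeout = 5 * time.Second
	defaultAckWait        = 10 * time.Second
)

// secondsOrDefault converts a seconds field to a duration, falling back to
// def when the field is negative (assumed here to mean "not set").
func secondsOrDefault(sec int, def time.Duration) time.Duration {
	if sec < 0 {
		return def
	}
	return time.Duration(sec) * time.Second
}

// countOrDefault does the same for plain counters such as MaxReconnects.
func countOrDefault(n, def int) int {
	if n < 0 {
		return def
	}
	return n
}

func main() {
	// Values as they would look when no environment variables are set.
	maxReconnects, reconnectWait, requestTimeout, ackWaitSec := -1, -1, 30, -1

	fmt.Println(countOrDefault(maxReconnects, defaultMaxReconnect))
	fmt.Println(secondsOrDefault(reconnectWait, defaultReconnectWait))
	fmt.Println(secondsOrDefault(requestTimeout, defaultRequestTimeout))
	fmt.Println(secondsOrDefault(ackWaitSec, defaultAckWait))
}
```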

func (NatsConfig) ConnectNats

func (conf NatsConfig) ConnectNats() (NatsConnection, error)

ConnectNats ...

type NatsConnection

type NatsConnection struct {
	// contains filtered or unexported fields
}

NatsConnection implements QueryConnection

func NewNatsConnection

func NewNatsConnection(conf NatsStanMsgConfig, nc *nats.Conn) (NatsConnection, error)

NewNatsConnection creates a NatsConnection from an existing nats.Conn. This is useful if one uses different connection options to establish the connection.

func NewNatsConnectionFromConfig

func NewNatsConnectionFromConfig(conf NatsStanMsgConfig) (NatsConnection, error)

NewNatsConnectionFromConfig ...

func (*NatsConnection) Close

func (conn *NatsConnection) Close() error

Close ...

func (*NatsConnection) GetNatsConn

func (conn *NatsConnection) GetNatsConn() *nats.Conn

GetNatsConn returns the underlying nats.Conn (could be nil if not initialized). This is useful if one wants to implement custom NATS messaging.

func (*NatsConnection) Listen

Listen ...

func (*NatsConnection) ListenWithConcurrentWorkers

func (conn *NatsConnection) ListenWithConcurrentWorkers(ctx context.Context, m map[common.QueryOp]QueryHandlerFunc, wg *sync.WaitGroup, channelBufferLen uint, workerCount uint) error

ListenWithConcurrentWorkers will create an internal channel to subscribe to nats.Msg. Multiple concurrent worker go-routines will consume messages from the channel. This allows for bounded concurrency.

func (*NatsConnection) Request

func (conn *NatsConnection) Request(ctx context.Context, request cloudevents.Event) (reply cloudevents.Event, err error)

Request ...

func (*NatsConnection) RequestStream

func (conn *NatsConnection) RequestStream(parentCtx context.Context, request cloudevents.Event) (ReplyStream, error)

RequestStream ...

type NatsJetConnection

type NatsJetConnection struct {
	// contains filtered or unexported fields
}

NatsJetConnection is an EventConnection that builds on NATS JetStream

func NewNatsJetConnection

func NewNatsJetConnection(config NatsStanMsgConfig, nc *nats.Conn) (NatsJetConnection, error)

NewNatsJetConnection creates a new NATS JetStream connection from existing nats.Conn. This is useful when one creates nats.Conn with different connection options.

func NewNatsJetConnectionFromConfig

func NewNatsJetConnectionFromConfig(config NatsStanMsgConfig) (NatsJetConnection, error)

NewNatsJetConnectionFromConfig creates a new NATS JetStream connection.

func (*NatsJetConnection) Close

func (conn *NatsJetConnection) Close() error

Close ...

func (*NatsJetConnection) GetJetStreamContext

func (conn *NatsJetConnection) GetJetStreamContext() nats.JetStreamContext

GetJetStreamContext returns the underlying nats.JetStreamContext. This could be useful if one wants to implement some custom messaging not covered by this implementation.

func (*NatsJetConnection) Listen

Listen ...

func (*NatsJetConnection) ListenWithConcurrentWorkers

func (conn *NatsJetConnection) ListenWithConcurrentWorkers(ctx context.Context, m map[common.EventType]EventHandlerFunc, wg *sync.WaitGroup, channelBufferLen uint, workerCount uint) error

ListenWithConcurrentWorkers ...

func (*NatsJetConnection) ListenWithConcurrentWorkersWithPush

func (conn *NatsJetConnection) ListenWithConcurrentWorkersWithPush(ctx context.Context, m map[common.EventType]EventHandlerFunc, wg *sync.WaitGroup, channelBufferLen uint, workerCount uint) error

ListenWithConcurrentWorkersWithPush is ListenWithConcurrentWorkers, but with a push consumer.

func (*NatsJetConnection) Publish

func (conn *NatsJetConnection) Publish(ce cloudevents.Event) error

Publish ...

func (*NatsJetConnection) Request

func (conn *NatsJetConnection) Request(requestCe cloudevents.Event, responseEventTypes []common.EventType) (EventResponsePromise, error)

Request sends a request event, and returns a promise that waits for the response.

TODO compute subscription subject from responseEventTypes, e.g. use computeSubscribeSubjectFromEventTypes(), so that we receive fewer events that are not relevant.

func (*NatsJetConnection) RequestWithSyncSub

func (conn *NatsJetConnection) RequestWithSyncSub(ctx context.Context, requestCe cloudevents.Event, responseEventTypes []common.EventType) (cloudevents.Event, error)

RequestWithSyncSub does the same thing as Request but using a sync subscription.

func (*NatsJetConnection) WaitForResponse

func (conn *NatsJetConnection) WaitForResponse(transactionID common.TransactionID, responseEventTypes []common.EventType) (EventResponsePromise, error)

WaitForResponse ...

type NatsJetEventResponsePromise

type NatsJetEventResponsePromise struct {
	// contains filtered or unexported fields
}

NatsJetEventResponsePromise ...

func (*NatsJetEventResponsePromise) Close

func (rp *NatsJetEventResponsePromise) Close() error

Close ...

func (*NatsJetEventResponsePromise) Response

Response ...

func (*NatsJetEventResponsePromise) ResponseChan

func (rp *NatsJetEventResponsePromise) ResponseChan(ctx context.Context) <-chan cloudevents.Event

ResponseChan ... TODO: consider having the channel carry an error as well (make the channel type a struct with cloudevent and error)

type NatsJetResponseWriter

type NatsJetResponseWriter struct {
	// contains filtered or unexported fields
}

NatsJetResponseWriter implements EventResponseWriter. It is used by EventHandlerFunc to publish events.

func (NatsJetResponseWriter) NoResponse

func (rw NatsJetResponseWriter) NoResponse()

NoResponse ...

func (NatsJetResponseWriter) Write

Write publishes the cloudevent, and only sets the transaction ID when the cloudevent does not have one.

type NatsReplyWriter

type NatsReplyWriter struct {
	// contains filtered or unexported fields
}

NatsReplyWriter implements ReplyWriter

func (NatsReplyWriter) Write

func (n NatsReplyWriter) Write(event cloudevents.Event) error

Write writes a cloudevent as a reply to the request. This can be called multiple times.

type NatsStanMsgConfig

type NatsStanMsgConfig struct {
	NatsConfig
	StanConfig
}

NatsStanMsgConfig is NATS & STAN config combined

func (NatsStanMsgConfig) ConnectNats

func (conf NatsStanMsgConfig) ConnectNats() (NatsConnection, error)

ConnectNats ...

func (NatsStanMsgConfig) ConnectStan

func (conf NatsStanMsgConfig) ConnectStan() (StanConnection, error)

ConnectStan ...

type QueryConnection

type QueryConnection interface {
	Request(ctx context.Context, requestCe cloudevents.Event) (reply cloudevents.Event, err error)
	// RequestStream request for multiple replies.
	// An example of the implementation is Scatter-Gather pattern.
	// https://docs.nats.io/using-nats/developer/sending/request_reply#scatter-gather
	RequestStream(ctx context.Context, requestCe cloudevents.Event) (ReplyStream, error)
	// Listen will start listening for queries, and call corresponding handlers if
	// query type matches what is provided in the map.
	//
	// Caller can stop listening by canceling the context passed to Listen(). The
	// WaitGroup is for waiting for Listen to stop, since stopping is not immediate.
	//
	// Note that this is a non-blocking call: it does not block until listening
	// has stopped. Use the WaitGroup to wait for Listen to stop.
	Listen(context.Context, map[common.QueryOp]QueryHandlerFunc, *sync.WaitGroup) error
	// ListenWithConcurrentWorkers will start listening for queries, just like
	// Listen(), but this will spawn worker go-routines that consume from an internal
	// channel. This allows for bounded concurrency. Channel buffer and worker count
	// need to be passed when calling this function.
	ListenWithConcurrentWorkers(ctx context.Context, m map[common.QueryOp]QueryHandlerFunc, wg *sync.WaitGroup, channelBufferLen uint, workerCount uint) error
}

QueryConnection ...

type QueryHandlerFunc

type QueryHandlerFunc func(ctx context.Context, request cloudevents.Event, writer ReplyWriter)

QueryHandlerFunc is handler function for query, used by QueryConnection.Listen()

type ReplyStream

type ReplyStream interface {
	ExpectNext() bool
	Next() (cloudevents.Event, error)
}

ReplyStream is for getting back potentially multiple replies.
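A consumer would typically drain a ReplyStream in a loop. The sketch below assumes that ExpectNext reports whether another reply may still arrive and that Next returns that reply; neither is spelled out in the documentation above. sliceStream and Collect are hypothetical names, and strings stand in for cloudevents.

```go
package main

import (
	"errors"
	"fmt"
)

// ReplyStream mirrors the shape of the package interface, with string
// standing in for cloudevents.Event.
type ReplyStream interface {
	ExpectNext() bool
	Next() (string, error)
}

// sliceStream is a hypothetical in-memory stream backed by a slice.
type sliceStream struct {
	replies []string
	pos     int
}

func (s *sliceStream) ExpectNext() bool { return s.pos < len(s.replies) }

func (s *sliceStream) Next() (string, error) {
	if !s.ExpectNext() {
		return "", errors.New("no more replies")
	}
	r := s.replies[s.pos]
	s.pos++
	return r, nil
}

// Collect reads replies until the stream expects no more, or an error occurs.
func Collect(s ReplyStream) ([]string, error) {
	var out []string
	for s.ExpectNext() {
		r, err := s.Next()
		if err != nil {
			return out, err
		}
		out = append(out, r)
	}
	return out, nil
}

func main() {
	replies, err := Collect(&sliceStream{replies: []string{"reply-1", "reply-2"}})
	fmt.Println(replies, err)
}
```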

type ReplyStreamWriter

type ReplyStreamWriter struct {
	// contains filtered or unexported fields
}

ReplyStreamWriter ...

func NewReplyStreamWriter

func NewReplyStreamWriter(writer ReplyWriter) *ReplyStreamWriter

NewReplyStreamWriter ...

func (*ReplyStreamWriter) Flush

func (r *ReplyStreamWriter) Flush() error

Flush sends all cloudevents that have been written so far.

func (*ReplyStreamWriter) Write

func (r *ReplyStreamWriter) Write(event cloudevents.Event) error

Write implements ReplyWriter. Events are not sent until Flush() is called.
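The buffer-then-flush behaviour described for ReplyStreamWriter can be sketched as a thin wrapper around another writer. BufferedWriter and recorder are hypothetical names, strings stand in for cloudevents, and the error handling in Flush (keep the unsent events) is a design choice of this sketch rather than something the documentation states.

```go
package main

import "fmt"

// ReplyWriter mirrors the shape of the package interface, with string
// standing in for cloudevents.Event.
type ReplyWriter interface {
	Write(string) error
}

// BufferedWriter holds events in memory until Flush is called.
type BufferedWriter struct {
	inner   ReplyWriter
	pending []string
}

func NewBufferedWriter(inner ReplyWriter) *BufferedWriter {
	return &BufferedWriter{inner: inner}
}

// Write only buffers the event; nothing is sent yet.
func (b *BufferedWriter) Write(e string) error {
	b.pending = append(b.pending, e)
	return nil
}

// Flush sends the buffered events in order. On error, the events that were
// not sent stay buffered so that Flush can be retried.
func (b *BufferedWriter) Flush() error {
	for i, e := range b.pending {
		if err := b.inner.Write(e); err != nil {
			b.pending = b.pending[i:]
			return err
		}
	}
	b.pending = nil
	return nil
}

// recorder is a test double that remembers what was sent.
type recorder struct{ sent []string }

func (r *recorder) Write(e string) error {
	r.sent = append(r.sent, e)
	return nil
}

func main() {
	rec := &recorder{}
	w := NewBufferedWriter(rec)
	_ = w.Write("reply-1")
	_ = w.Write("reply-2")
	fmt.Println(len(rec.sent)) // nothing sent before Flush
	_ = w.Flush()
	fmt.Println(rec.sent)
}
```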

type ReplyWriter

type ReplyWriter interface {
	Write(cloudevents.Event) error
}

ReplyWriter ...

type StanConfig

type StanConfig struct {
	// ClusterID is STAN specific
	ClusterID string `envconfig:"NATS_CLUSTER_ID" default:"cacao-cluster"`
	// DurableName is STAN/JetStream specific
	DurableName   string `envconfig:"NATS_DURABLE_NAME"`
	EventsTimeout int    `envconfig:"NATS_EVENTS_TIMEOUT" default:"-1"` // in seconds, implementation should default to DefaultStanEventsTimeout
}

StanConfig ...

type StanConnection

type StanConnection struct {
	// contains filtered or unexported fields
}

StanConnection implements EventConnection

func NewStanConnectionFromConfig

func NewStanConnectionFromConfig(conf NatsStanMsgConfig) (StanConnection, error)

NewStanConnectionFromConfig ...

func NewStanConnectionFromConfigWithoutEventSource

func NewStanConnectionFromConfigWithoutEventSource(conf NatsStanMsgConfig) (StanConnection, error)

NewStanConnectionFromConfigWithoutEventSource creates a STAN connection that does not have an EventSource. A StanConnection created by this function should not use StanConnection.Request() or StanConnection.WaitForResponse().

func (*StanConnection) Close

func (conn *StanConnection) Close() error

Close ...

func (*StanConnection) GetStanConn

func (conn *StanConnection) GetStanConn() stan.Conn

GetStanConn ...

func (*StanConnection) Listen

Listen ...

func (*StanConnection) ListenWithConcurrentWorkers

func (conn *StanConnection) ListenWithConcurrentWorkers(ctx context.Context, m map[common.EventType]EventHandlerFunc, wg *sync.WaitGroup, channelBufferLen uint, workerCount uint) error

ListenWithConcurrentWorkers ...

func (*StanConnection) Publish

func (conn *StanConnection) Publish(ce cloudevents.Event) error

Publish ...

func (*StanConnection) Request

func (conn *StanConnection) Request(requestCe cloudevents.Event, responseEventTypes []common.EventType) (EventResponsePromise, error)

Request ...

func (*StanConnection) WaitForResponse

func (conn *StanConnection) WaitForResponse(transactionID common.TransactionID, responseEventTypes []common.EventType) (EventResponsePromise, error)

WaitForResponse ...

type StanEventResponsePromise

type StanEventResponsePromise struct {
	// contains filtered or unexported fields
}

StanEventResponsePromise ...

func (*StanEventResponsePromise) Close

func (rp *StanEventResponsePromise) Close() error

Close ...

func (*StanEventResponsePromise) Response

Response ...

func (*StanEventResponsePromise) ResponseChan

func (rp *StanEventResponsePromise) ResponseChan(context.Context) <-chan cloudevents.Event

ResponseChan ...

type StanResponseWriter

type StanResponseWriter struct {
	// contains filtered or unexported fields
}

StanResponseWriter ...

func (StanResponseWriter) NoResponse

func (rw StanResponseWriter) NoResponse()

NoResponse ...

func (StanResponseWriter) Write

Write publishes the cloudevent, and only sets the transaction ID when the cloudevent does not have one.
