Documentation ¶
Index ¶
- Constants
- func ConvertNats(msg *nats.Msg) (cloudevents.Event, error)
- func ConvertStan(msg *stan.Msg) (cloudevents.Event, error)
- func CreateCloudEvent[T common.EventType | common.QueryOp](data interface{}, eventType T, source string) (cloudevents.Event, error)
- func CreateCloudEventWithAutoSource[T common.EventType | common.QueryOp](data interface{}, eventType T) (cloudevents.Event, error)
- func CreateCloudEventWithTransactionID[T common.EventType | common.QueryOp](data interface{}, eventType T, source string, ...) (cloudevents.Event, error)
- func GetTransactionID(ce *cloudevents.Event) common.TransactionID
- func InitCyVerseStream(conf NatsStanMsgConfig) error
- func NewTransactionID() common.TransactionID
- func SetTransactionID(ce *cloudevents.Event, transactionID common.TransactionID)
- type EventConnection
- type EventHandlerFunc
- type EventObservable
- type EventResponsePromise
- type EventResponseWriter
- type EventSource
- func (es *EventSource) AddListener(eventType common.EventType, transactionID common.TransactionID, ...) (ListenerID, error)
- func (es *EventSource) AddListenerMultiEventType(eventTypes []common.EventType, transactionID common.TransactionID, ...) (ListenerID, error)
- func (es *EventSource) RemoveListenerByID(id ListenerID)
- type Listener
- type ListenerCallback
- type ListenerID
- type NatsConfig
- type NatsConnection
- func (conn *NatsConnection) Close() error
- func (conn *NatsConnection) GetNatsConn() *nats.Conn
- func (conn *NatsConnection) Listen(ctx context.Context, m map[common.QueryOp]QueryHandlerFunc, wg *sync.WaitGroup) error
- func (conn *NatsConnection) ListenWithConcurrentWorkers(ctx context.Context, m map[common.QueryOp]QueryHandlerFunc, wg *sync.WaitGroup, ...) error
- func (conn *NatsConnection) Request(ctx context.Context, request cloudevents.Event) (reply cloudevents.Event, err error)
- func (conn *NatsConnection) RequestStream(parentCtx context.Context, request cloudevents.Event) (ReplyStream, error)
- type NatsJetConnection
- func (conn *NatsJetConnection) Close() error
- func (conn *NatsJetConnection) GetJetStreamContext() nats.JetStreamContext
- func (conn *NatsJetConnection) Listen(ctx context.Context, m map[common.EventType]EventHandlerFunc, ...) error
- func (conn *NatsJetConnection) ListenWithConcurrentWorkers(ctx context.Context, m map[common.EventType]EventHandlerFunc, ...) error
- func (conn *NatsJetConnection) ListenWithConcurrentWorkersWithPush(ctx context.Context, m map[common.EventType]EventHandlerFunc, ...) error
- func (conn *NatsJetConnection) Publish(ce cloudevents.Event) error
- func (conn *NatsJetConnection) Request(requestCe cloudevents.Event, responseEventTypes []common.EventType) (EventResponsePromise, error)
- func (conn *NatsJetConnection) RequestWithSyncSub(ctx context.Context, requestCe cloudevents.Event, ...) (cloudevents.Event, error)
- func (conn *NatsJetConnection) WaitForResponse(transactionID common.TransactionID, responseEventTypes []common.EventType) (EventResponsePromise, error)
- type NatsJetEventResponsePromise
- type NatsJetResponseWriter
- type NatsReplyWriter
- type NatsStanMsgConfig
- type QueryConnection
- type QueryHandlerFunc
- type ReplyStream
- type ReplyStreamWriter
- type ReplyWriter
- type StanConfig
- type StanConnection
- func (conn *StanConnection) Close() error
- func (conn *StanConnection) GetStanConn() stan.Conn
- func (conn *StanConnection) Listen(ctx context.Context, m map[common.EventType]EventHandlerFunc, ...) error
- func (conn *StanConnection) ListenWithConcurrentWorkers(ctx context.Context, m map[common.EventType]EventHandlerFunc, ...) error
- func (conn *StanConnection) Publish(ce cloudevents.Event) error
- func (conn *StanConnection) Request(requestCe cloudevents.Event, responseEventTypes []common.EventType) (EventResponsePromise, error)
- func (conn *StanConnection) WaitForResponse(transactionID common.TransactionID, responseEventTypes []common.EventType) (EventResponsePromise, error)
- type StanEventResponsePromise
- type StanResponseWriter
Constants ¶
const ( // DefaultNatsURL is a default nats URL DefaultNatsURL = "nats://nats:4222" // DefaultNatsClusterID is a default Cluster ID for cacao DefaultNatsClusterID = "cacao-cluster" // DefaultNatsMaxReconnect is default max reconnect trials DefaultNatsMaxReconnect = 6 // max times to reconnect within nats.connect() // DefaultNatsReconnectWait is a default delay for next reconnect DefaultNatsReconnectWait = 10 * time.Second // seconds to wait within nats.connect() // DefaultNatsRequestTimeout is default timeout for requests DefaultNatsRequestTimeout = 5 * time.Second // timeout for requests // MaxNatsRequestTimeout is the maximum timeout to query requests over nats MaxNatsRequestTimeout = 10 * time.Minute // timeout for requests // DefaultAckWait is the default duration to wait for ack DefaultAckWait = 10 * time.Second )
const ( // DefaultStanEventsTimeout is a default timeout DefaultStanEventsTimeout = 10 * time.Second // used to wait for event ops to complete, is very conservation // DefaultStanAckWaitTime is a default wait time for ack wait DefaultStanAckWaitTime = 60 * time.Second )
const AutoPopulateCloudEventSource = "AUTO_POPULATE"
AutoPopulateCloudEventSource is used to explicitly tells implementations of QueryConnection or EventConnection that the cloudevent source should be populated by the connection.
const CyVerseStreamName = "CYVERSE"
CyVerseStreamName is the stream name of the NATS JetStream. This is used to create JetStream Consumer and bind durable Subscription to.
Variables ¶
This section is empty.
Functions ¶
func ConvertNats ¶
func ConvertNats(msg *nats.Msg) (cloudevents.Event, error)
ConvertNats converts NATS message to CloudEvents message
func ConvertStan ¶
func ConvertStan(msg *stan.Msg) (cloudevents.Event, error)
ConvertStan converts STAN message to CloudEvents message
func CreateCloudEvent ¶
func CreateCloudEvent[T common.EventType | common.QueryOp](data interface{}, eventType T, source string) (cloudevents.Event, error)
CreateCloudEvent takes any object, eventType string, source string, and creates a resulting CloudEvent This utility provides the following conveniences: * uniformly assigns a new id of the format "cloudevent-" + xid * sets the time to UTC * generically marshals the data to json and appropriately assigns the cloudevent type * creates or sets a transaction id, which can later be used to pair requests to responses or corresponding events
func CreateCloudEventWithAutoSource ¶
func CreateCloudEventWithAutoSource[T common.EventType | common.QueryOp](data interface{}, eventType T) (cloudevents.Event, error)
CreateCloudEventWithAutoSource create cloudevent just like CreateCloudEvent, but with AutoPopulateCloudEventSource as the cloudevent source.
func CreateCloudEventWithTransactionID ¶
func CreateCloudEventWithTransactionID[T common.EventType | common.QueryOp](data interface{}, eventType T, source string, transactionID common.TransactionID) (cloudevents.Event, error)
CreateCloudEventWithTransactionID creates a CloudEvent with given transactionID, transactionID is optional
func GetTransactionID ¶
func GetTransactionID(ce *cloudevents.Event) common.TransactionID
GetTransactionID is a utility function that reads TransactionID from CloudEvent quick
func InitCyVerseStream ¶
func InitCyVerseStream(conf NatsStanMsgConfig) error
InitCyVerseStream creates the Streams that is used for event messaging for CACAO. This needs to be called outside of normal cacao microservices, since normal service should not manage stream itself.
func NewTransactionID ¶
func NewTransactionID() common.TransactionID
NewTransactionID is a utility function that will generate a CACAO-specific transaction ID of the form tid-<xid> Replies and events generated from a specific request should include the transaction id for easier matching of requests
func SetTransactionID ¶
func SetTransactionID(ce *cloudevents.Event, transactionID common.TransactionID)
SetTransactionID is a utility function that writes TransactionID to CloudEvent quick
Types ¶
type EventConnection ¶
type EventConnection interface { // Listen listens for some events and call the corresponding handler when // receiving those events. This should only be called once on the // EventConnection. Listen(context.Context, map[common.EventType]EventHandlerFunc, *sync.WaitGroup) error // ListenWithConcurrentWorkers will start listening for events, just like // Listen(), but this will spawn worker go-routines that consume from an internal // channel. This allows for bounded concurrency. Channel buffer and worker count // need to be passed when calling this function. ListenWithConcurrentWorkers(ctx context.Context, m map[common.EventType]EventHandlerFunc, wg *sync.WaitGroup, channelBufferLen uint, workerCount uint) error // Publish will publish a cloudevent, the cloudevent source will be automatically populated if it is empty or AutoPopulateCloudEventSource. Publish(ce cloudevents.Event) error Request(requestCe cloudevents.Event, responseEventTypes []common.EventType) (EventResponsePromise, error) WaitForResponse(transactionID common.TransactionID, responseEventTypes []common.EventType) (EventResponsePromise, error) }
EventConnection ...
type EventHandlerFunc ¶
type EventHandlerFunc func(ctx context.Context, event cloudevents.Event, writer EventResponseWriter) error
EventHandlerFunc ...
type EventObservable ¶
type EventObservable interface { AddListener(common.EventType, common.TransactionID, Listener) (ListenerID, error) // AddListenerMultiEventType register a listener to a set of common.EventType AddListenerMultiEventType([]common.EventType, common.TransactionID, Listener) (ListenerID, error) RemoveListenerByID(id ListenerID) }
EventObservable is an interface that allow dynamically adding and removing event listener.
type EventResponsePromise ¶
type EventResponsePromise interface { // Response wait for response Response(ctx context.Context) (cloudevents.Event, error) // ResponseChan returns a channel that will be receiving the responses ResponseChan(ctx context.Context) <-chan cloudevents.Event // Close cleans up any resources used by the promise, this is useful when you do // not want to receive the response, or when response never arrives. // // If response is successfully received, then there should be NO need to call Close() Close() error }
EventResponsePromise ...
type EventResponseWriter ¶
type EventResponseWriter interface { // Write will publish the cloudevent, if the transaction ID is empty it will be // overridden with the transaction ID associated with the Incoming event that // triggered EventHandlerFunc. Write can be called multiple times on the same EventResponseWriter. Write(ce cloudevents.Event) error // NoResponse explicitly indicate that there is no response NoResponse() }
EventResponseWriter ...
type EventSource ¶
type EventSource struct {
// contains filtered or unexported fields
}
EventSource dispatch events to registered listeners. Listeners can be registered dynamically.
func NewEventSource ¶
func NewEventSource(natsConf NatsConfig, stanConf StanConfig) *EventSource
NewEventSource creates a new EventSource
func NewEventSourceFromStanConnection ¶
func NewEventSourceFromStanConnection(conn *StanConnection) (*EventSource, error)
NewEventSourceFromStanConnection creates a EventSource from StanConnection (w/o subscription), the underlying STAN connection is re-used.
func (*EventSource) AddListener ¶
func (es *EventSource) AddListener(eventType common.EventType, transactionID common.TransactionID, listener Listener) (ListenerID, error)
AddListener adds a listener for event. Use "" for eventType or transactionID as wildcard (cannot both be wildcard). Return a unique id of the listener, and connection-related error. This will automatically starts a STAN connection & subscription if not already.
func (*EventSource) AddListenerMultiEventType ¶
func (es *EventSource) AddListenerMultiEventType(eventTypes []common.EventType, transactionID common.TransactionID, listener Listener) (ListenerID, error)
AddListenerMultiEventType adds a listener for a set of event types. Use "" for eventType or transactionID as wildcard (cannot both be wildcard). Return a unique id of the listener, and connection-related error. If listener has set ListenOnce, it will be removed after being called once. This will automatically starts a STAN connection & subscription if not already.
func (*EventSource) RemoveListenerByID ¶
func (es *EventSource) RemoveListenerByID(id ListenerID)
RemoveListenerByID removes a listener. This will disconnect & unsubscribe if the last listener is removed.
type Listener ¶
type Listener struct { Callback ListenerCallback // if enabled, then listener will be removed after been called once. // default to false. ListenOnce bool }
Listener is callback registration for some EventType and/or TransactionID
type ListenerCallback ¶
type ListenerCallback func(ev common.EventType, ce cloudevents.Event)
ListenerCallback is the callback function for a Listener
type NatsConfig ¶
type NatsConfig struct { URL string `envconfig:"NATS_URL" default:"nats://nats:4222"` QueueGroup string `envconfig:"NATS_QUEUE_GROUP"` WildcardSubject string `envconfig:"NATS_WILDCARD_SUBJECT" default:"cyverse.>"` // WildcardSubject field is optional, only used for NATS Query // ClientID is used for Nats Client Name, STAN Client ID, cloudevent source. ClientID string `envconfig:"NATS_CLIENT_ID"` MaxReconnects int `envconfig:"NATS_MAX_RECONNECTS" default:"-1"` // in seconds, implementation should default to DefaultNatsMaxReconnect ReconnectWait int `envconfig:"NATS_RECONNECT_WAIT" default:"-1"` // in seconds, implementation should default to DefaultNatsReconnectWait RequestTimeout int `envconfig:"NATS_REQUEST_TIMEOUT" default:"-1"` // in seconds, implementation should default to DefaultNatsRequestTimeout // EventWildcardSubject field is optional, only used for NATS JetStream Events EventWildcardSubject string `envconfig:"NATS_JS_WILDCARD_SUBJECT"` // AckWaitSec is the seconds to wait for acknowledgement or time the messaging // bus wait before attempting redelivery, this should be set to the maximum // seconds it takes for service to process an operation. AckWaitSec int `envconfig:"NATS_JS_ACK_WAIT" default:"-1"` // in seconds, implementation should default to DefaultAckWait }
NatsConfig ...
func (NatsConfig) ConnectNats ¶
func (conf NatsConfig) ConnectNats() (NatsConnection, error)
ConnectNats ...
type NatsConnection ¶
type NatsConnection struct {
// contains filtered or unexported fields
}
NatsConnection implements QueryConnection
func NewNatsConnection ¶
func NewNatsConnection(conf NatsStanMsgConfig, nc *nats.Conn) (NatsConnection, error)
NewNatsConnection creates NatsConnection from an existing nats.Conn, this is useful if one uses different connection options to establish the connection.
func NewNatsConnectionFromConfig ¶
func NewNatsConnectionFromConfig(conf NatsStanMsgConfig) (NatsConnection, error)
NewNatsConnectionFromConfig ...
func (*NatsConnection) GetNatsConn ¶
func (conn *NatsConnection) GetNatsConn() *nats.Conn
GetNatsConn returns the underlying nats.Conn (could be nil if not initialized), this is useful if one wants to implement custom NATS messaging.
func (*NatsConnection) Listen ¶
func (conn *NatsConnection) Listen(ctx context.Context, m map[common.QueryOp]QueryHandlerFunc, wg *sync.WaitGroup) error
Listen ...
func (*NatsConnection) ListenWithConcurrentWorkers ¶
func (conn *NatsConnection) ListenWithConcurrentWorkers(ctx context.Context, m map[common.QueryOp]QueryHandlerFunc, wg *sync.WaitGroup, channelBufferLen uint, workerCount uint) error
ListenWithConcurrentWorkers will create an internal channels to subscribe to nats.Msg. Multiple concurrent worker go-routine will consume msg from the channels. This allows for bounded concurrency.
func (*NatsConnection) Request ¶
func (conn *NatsConnection) Request(ctx context.Context, request cloudevents.Event) (reply cloudevents.Event, err error)
Request ...
func (*NatsConnection) RequestStream ¶
func (conn *NatsConnection) RequestStream(parentCtx context.Context, request cloudevents.Event) (ReplyStream, error)
RequestStream ...
type NatsJetConnection ¶
type NatsJetConnection struct {
// contains filtered or unexported fields
}
NatsJetConnection is a EventConnection that builds on NATS JetStream
func NewNatsJetConnection ¶
func NewNatsJetConnection(config NatsStanMsgConfig, nc *nats.Conn) (NatsJetConnection, error)
NewNatsJetConnection creates a new NATS JetStream connection from existing nats.Conn. This is useful when one creates nats.Conn with different connection options.
func NewNatsJetConnectionFromConfig ¶
func NewNatsJetConnectionFromConfig(config NatsStanMsgConfig) (NatsJetConnection, error)
NewNatsJetConnectionFromConfig creates a new NATS JetStream connection.
func (*NatsJetConnection) GetJetStreamContext ¶
func (conn *NatsJetConnection) GetJetStreamContext() nats.JetStreamContext
GetJetStreamContext returns the underlying nats.JetStreamContext, this could be useful if one wants to implement some custom messaging not covered in this implementation.
func (*NatsJetConnection) Listen ¶
func (conn *NatsJetConnection) Listen(ctx context.Context, m map[common.EventType]EventHandlerFunc, wg *sync.WaitGroup) error
Listen ...
func (*NatsJetConnection) ListenWithConcurrentWorkers ¶
func (conn *NatsJetConnection) ListenWithConcurrentWorkers(ctx context.Context, m map[common.EventType]EventHandlerFunc, wg *sync.WaitGroup, channelBufferLen uint, workerCount uint) error
ListenWithConcurrentWorkers ...
func (*NatsJetConnection) ListenWithConcurrentWorkersWithPush ¶
func (conn *NatsJetConnection) ListenWithConcurrentWorkersWithPush(ctx context.Context, m map[common.EventType]EventHandlerFunc, wg *sync.WaitGroup, channelBufferLen uint, workerCount uint) error
ListenWithConcurrentWorkersWithPush is ListenWithConcurrentWorkers but with push consumer
func (*NatsJetConnection) Publish ¶
func (conn *NatsJetConnection) Publish(ce cloudevents.Event) error
Publish ...
func (*NatsJetConnection) Request ¶
func (conn *NatsJetConnection) Request(requestCe cloudevents.Event, responseEventTypes []common.EventType) (EventResponsePromise, error)
Request sends a request event, and returns a promise that wait for response.
TODO compute subscription subject from responseEventTypes, e.g. use computeSubscribeSubjectFromEventTypes(), so that we receive fewer events that are not relevant.
func (*NatsJetConnection) RequestWithSyncSub ¶
func (conn *NatsJetConnection) RequestWithSyncSub(ctx context.Context, requestCe cloudevents.Event, responseEventTypes []common.EventType) (cloudevents.Event, error)
RequestWithSyncSub does the same thing as Request but using a sync subscription.
func (*NatsJetConnection) WaitForResponse ¶
func (conn *NatsJetConnection) WaitForResponse(transactionID common.TransactionID, responseEventTypes []common.EventType) (EventResponsePromise, error)
WaitForResponse ...
type NatsJetEventResponsePromise ¶
type NatsJetEventResponsePromise struct {
// contains filtered or unexported fields
}
NatsJetEventResponsePromise ...
func (*NatsJetEventResponsePromise) Close ¶
func (rp *NatsJetEventResponsePromise) Close() error
Close ...
func (*NatsJetEventResponsePromise) Response ¶
func (rp *NatsJetEventResponsePromise) Response(ctx context.Context) (cloudevents.Event, error)
Response ...
func (*NatsJetEventResponsePromise) ResponseChan ¶
func (rp *NatsJetEventResponsePromise) ResponseChan(ctx context.Context) <-chan cloudevents.Event
ResponseChan ... TODO consider have channel carry error as well (make the channel type a struct with cloudevent and error)
type NatsJetResponseWriter ¶
type NatsJetResponseWriter struct {
// contains filtered or unexported fields
}
NatsJetResponseWriter implements EventResponseWriter, this is used for EventHandlerFunc to publish events.
func (NatsJetResponseWriter) NoResponse ¶
func (rw NatsJetResponseWriter) NoResponse()
NoResponse ...
func (NatsJetResponseWriter) Write ¶
func (rw NatsJetResponseWriter) Write(ce cloudevents.Event) error
Write publish cloudevent, only set transaction ID when cloudevent does not have one
type NatsReplyWriter ¶
type NatsReplyWriter struct {
// contains filtered or unexported fields
}
NatsReplyWriter implements ReplyWriter
func (NatsReplyWriter) Write ¶
func (n NatsReplyWriter) Write(event cloudevents.Event) error
Write a cloudevent as reply to the request. This can be called multiple time.
type NatsStanMsgConfig ¶
type NatsStanMsgConfig struct { NatsConfig StanConfig }
NatsStanMsgConfig is NATS & STAN config combined
func (NatsStanMsgConfig) ConnectNats ¶
func (conf NatsStanMsgConfig) ConnectNats() (NatsConnection, error)
ConnectNats ...
func (NatsStanMsgConfig) ConnectStan ¶
func (conf NatsStanMsgConfig) ConnectStan() (StanConnection, error)
ConnectStan ...
type QueryConnection ¶
type QueryConnection interface { Request(ctx context.Context, requestCe cloudevents.Event) (reply cloudevents.Event, err error) // RequestStream request for multiple replies. // An example of the implementation is Scatter-Gather pattern. // https://docs.nats.io/using-nats/developer/sending/request_reply#scatter-gather RequestStream(ctx context.Context, requestCe cloudevents.Event) (ReplyStream, error) // Listen will start listening for queries, and call corresponding handlers if // query type matches what is provided in the map. // // Caller can stop listening by cancel the context passed to Listen(). WaitGroup // is for waiting for the Listen to stop, since stop listening is not immediate. // // Note that this is a non-blocking call, as in it does not block until the // listen stopped, use the WaitGroup to wait for Listen to be stopped. Listen(context.Context, map[common.QueryOp]QueryHandlerFunc, *sync.WaitGroup) error // ListenWithConcurrentWorkers will start listening for queries, just like // Listen(), but this will spawn worker go-routines that consume from an internal // channel. This allows for bounded concurrency. Channel buffer and worker count // need to be passed when calling this function. ListenWithConcurrentWorkers(ctx context.Context, m map[common.QueryOp]QueryHandlerFunc, wg *sync.WaitGroup, channelBufferLen uint, workerCount uint) error }
QueryConnection ...
type QueryHandlerFunc ¶
type QueryHandlerFunc func(ctx context.Context, request cloudevents.Event, writer ReplyWriter)
QueryHandlerFunc is handler function for query, used by QueryConnection.Listen()
type ReplyStream ¶
type ReplyStream interface { ExpectNext() bool Next() (cloudevents.Event, error) }
ReplyStream is for getting back potentially multiple replies.
type ReplyStreamWriter ¶
type ReplyStreamWriter struct {
// contains filtered or unexported fields
}
ReplyStreamWriter ...
func NewReplyStreamWriter ¶
func NewReplyStreamWriter(writer ReplyWriter) *ReplyStreamWriter
NewReplyStreamWriter ...
func (*ReplyStreamWriter) Flush ¶
func (r *ReplyStreamWriter) Flush() error
Flush flushes all cloudevent
func (*ReplyStreamWriter) Write ¶
func (r *ReplyStreamWriter) Write(event cloudevents.Event) error
Write implements ReplyWriter, events are not sent until calling Flush().
type StanConfig ¶
type StanConfig struct { // ClusterID is STAN specific ClusterID string `envconfig:"NATS_CLUSTER_ID" default:"cacao-cluster"` // DurableName is STAN/JetStream specific DurableName string `envconfig:"NATS_DURABLE_NAME"` EventsTimeout int `envconfig:"NATS_EVENTS_TIMEOUT" default:"-1"` // in seconds, implementation should default to DefaultStanEventsTimeout }
StanConfig ...
type StanConnection ¶
type StanConnection struct {
// contains filtered or unexported fields
}
StanConnection implements EventConnection
func NewStanConnectionFromConfig ¶
func NewStanConnectionFromConfig(conf NatsStanMsgConfig) (StanConnection, error)
NewStanConnectionFromConfig ...
func NewStanConnectionFromConfigWithoutEventSource ¶
func NewStanConnectionFromConfigWithoutEventSource(conf NatsStanMsgConfig) (StanConnection, error)
NewStanConnectionFromConfigWithoutEventSource will create a STAN connection that does not have EventSource. StanConnection created by this function should not use StanConnection.Request() or StanConnection.WaitForResponse()
func (*StanConnection) GetStanConn ¶
func (conn *StanConnection) GetStanConn() stan.Conn
GetStanConn ...
func (*StanConnection) Listen ¶
func (conn *StanConnection) Listen(ctx context.Context, m map[common.EventType]EventHandlerFunc, wg *sync.WaitGroup) error
Listen ...
func (*StanConnection) ListenWithConcurrentWorkers ¶
func (conn *StanConnection) ListenWithConcurrentWorkers(ctx context.Context, m map[common.EventType]EventHandlerFunc, wg *sync.WaitGroup, channelBufferLen uint, workerCount uint) error
ListenWithConcurrentWorkers ...
func (*StanConnection) Publish ¶
func (conn *StanConnection) Publish(ce cloudevents.Event) error
Publish ...
func (*StanConnection) Request ¶
func (conn *StanConnection) Request(requestCe cloudevents.Event, responseEventTypes []common.EventType) (EventResponsePromise, error)
Request ...
func (*StanConnection) WaitForResponse ¶
func (conn *StanConnection) WaitForResponse(transactionID common.TransactionID, responseEventTypes []common.EventType) (EventResponsePromise, error)
WaitForResponse ...
type StanEventResponsePromise ¶
type StanEventResponsePromise struct {
// contains filtered or unexported fields
}
StanEventResponsePromise ...
func (*StanEventResponsePromise) Close ¶
func (rp *StanEventResponsePromise) Close() error
Close ...
func (*StanEventResponsePromise) Response ¶
func (rp *StanEventResponsePromise) Response(ctx context.Context) (cloudevents.Event, error)
Response ...
func (*StanEventResponsePromise) ResponseChan ¶
func (rp *StanEventResponsePromise) ResponseChan(context.Context) <-chan cloudevents.Event
ResponseChan ...
type StanResponseWriter ¶
type StanResponseWriter struct {
// contains filtered or unexported fields
}
StanResponseWriter ...
func (StanResponseWriter) Write ¶
func (rw StanResponseWriter) Write(ce cloudevents.Event) error
Write publish cloudevent, only set transaction ID when cloudevent does not have one