Documentation ¶
Overview ¶
Package events provides common utilities and formats for working with infratographer events
Index ¶
- Variables
- func MustViperFlags(v *viper.Viper, flags *pflag.FlagSet, appName string)
- func MustViperFlagsForNATS(v *viper.Viper, flags *pflag.FlagSet, appName string)
- func NATSConsumerDurableName(queueGroup, subject string) string
- type AuthRelationshipAction
- type AuthRelationshipPublisher
- type AuthRelationshipRelation
- type AuthRelationshipRequest
- type AuthRelationshipResponse
- type AuthRelationshipSubscriber
- type ChangeMessage
- type ChangeType
- type Config
- type Connection
- type EventMessage
- type FieldChange
- type Message
- type NATSAuthRelationshipRequest
- type NATSConfig
- type NATSConnection
- func (c *NATSConnection) PublishAuthRelationshipRequest(ctx context.Context, topic string, message AuthRelationshipRequest) (Message[AuthRelationshipResponse], error)
- func (c *NATSConnection) PublishChange(ctx context.Context, topic string, message ChangeMessage) (Message[ChangeMessage], error)
- func (c *NATSConnection) PublishEvent(ctx context.Context, topic string, message EventMessage) (Message[EventMessage], error)
- func (c *NATSConnection) Shutdown(ctx context.Context) error
- func (c *NATSConnection) Source() any
- func (c *NATSConnection) SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Request[AuthRelationshipRequest, AuthRelationshipResponse], error)
- func (c *NATSConnection) SubscribeChanges(ctx context.Context, topic string) (<-chan Message[ChangeMessage], error)
- func (c *NATSConnection) SubscribeEvents(ctx context.Context, topic string) (<-chan Message[EventMessage], error)
- type NATSMessage
- func (m *NATSMessage[T]) Ack() error
- func (m *NATSMessage[T]) Connection() Connection
- func (m *NATSMessage[T]) Deliveries() uint64
- func (m *NATSMessage[T]) Error() error
- func (m *NATSMessage[T]) ID() string
- func (m *NATSMessage[T]) Message() T
- func (m *NATSMessage[T]) Nak(delay time.Duration) error
- func (m *NATSMessage[T]) Source() any
- func (m *NATSMessage[T]) Term() error
- func (m *NATSMessage[T]) Timestamp() time.Time
- func (m *NATSMessage[T]) Topic() string
- type NATSOption
- type Option
- type Publisher
- type Request
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
var ( // ErrProviderNotConfigured is an error packages should return if no events provider is configured. ErrProviderNotConfigured = errors.New("events provider not configured") // ErrMissingChangeMessageEventType is returned when the event message has the incorrect field EventType value. ErrMissingChangeMessageEventType = errors.New("change message EventType field required") // ErrMissingChangeMessageSubjectID is returned when the event message has the incorrect field SubjectID value. ErrMissingChangeMessageSubjectID = errors.New("change message SubjectID field required") // ErrMissingEventMessageEventType is returned when the event message has the incorrect field EventType value. ErrMissingEventMessageEventType = errors.New("event message EventType field required") // ErrMissingEventMessageSubjectID is returned when the event message has the incorrect field SubjectID value. ErrMissingEventMessageSubjectID = errors.New("event message SubjectID field required") // ErrInvalidAuthRelationshipRequestAction is returned when the event message has the incorrect field Action value. ErrInvalidAuthRelationshipRequestAction = errors.New("auth relationship request message Action field must be write or delete") // ErrMissingAuthRelationshipRequestObjectID is returned when the event message has the incorrect field ObjectID value. ErrMissingAuthRelationshipRequestObjectID = errors.New("auth relationship request message ObjectID field required") // ErrMissingAuthRelationshipRequestRelation is returned when the event message has no relations defined. ErrMissingAuthRelationshipRequestRelation = errors.New("auth relationship request message Relations field required") // ErrMissingAuthRelationshipRequestRelationRelation is returned when the event message Relations has the incorrect field for Relation value. ErrMissingAuthRelationshipRequestRelationRelation = errors.New("auth relationship request message Relations Relation field required") // ErrMissingAuthRelationshipRequestRelationSubjectID is returned when the event message Relations has the incorrect field SubjectID value. ErrMissingAuthRelationshipRequestRelationSubjectID = errors.New("auth relationship request message Relations SubjectID field required") // ErrRequestNoResponders is returned when a request is attempted but no responder is listening. ErrRequestNoResponders = errors.New("no responders for request") )
var ( // CreateChangeType provides the event type for create events CreateChangeType ChangeType = "create" // UpdateChangeType provides the event type for update events UpdateChangeType ChangeType = "update" // DeleteChangeType provides the event type for delete events DeleteChangeType ChangeType = "delete" // WriteAuthRelationshipAction provides the auth relationship action for write requests WriteAuthRelationshipAction AuthRelationshipAction = "write" // DeleteAuthRelationshipAction provides the auth relationship action for delete requests DeleteAuthRelationshipAction AuthRelationshipAction = "delete" )
var ( // NATSDefaultConnectTimeout is the default connection timeout. NATSDefaultConnectTimeout = 5 * time.Second // NATSDefaultSubscriberFetchBatchSize is the default pull subscribe batch size. NATSDefaultSubscriberFetchBatchSize = 20 // NATSDefaultSubscriberFetchTimeout is the max time a fetch will block before releasing. NATSDefaultSubscriberFetchTimeout = 5 * time.Second // NATSDefaultSubscriberFetchBackoff is the delay between a batch attempts. NATSDefaultSubscriberFetchBackoff = 5 * time.Second // NATSDefaultShutdownTimeout is the timeout for a shutdown to complete. NATSDefaultShutdownTimeout = 5 * time.Second )
var ( // ErrNATSInvalidAuthConfiguration is returned when the config has both Tokena nd CredsFile specified. ErrNATSInvalidAuthConfiguration = errors.New("invalid nats confinguration, both token and creds file are specified") // ErrNATSInvalidDeliveryPolicy is returned when an incorrect delivery policy is provided. ErrNATSInvalidDeliveryPolicy = errors.New("invalid delivery policy, expected all|last|last-per-subject|new|start-sequence|start-time") // ErrNATSMessageNoReplySubject is returned when calling ReplyAuthRelationshipRequest when the request has no reply subject defined. ErrNATSMessageNoReplySubject = errors.New("unable to reply to auth relationship request, no reply subject specified") )
Functions ¶
func MustViperFlags ¶ added in v0.3.5
MustViperFlags returns the cobra flags and viper config for events.
func MustViperFlagsForNATS ¶ added in v0.3.5
MustViperFlagsForNATS returns the cobra flags and viper config for a nats handler.
func NATSConsumerDurableName ¶ added in v0.3.5
NATSConsumerDurableName is the generator function to create a new durable consumer name. If queueGroup is empty, an empty durable name is returned to support ephemeral consumers.
Types ¶
type AuthRelationshipAction ¶ added in v0.3.4
type AuthRelationshipAction string
AuthRelationshipAction represents the possible auth relationship request actions for an AuthRelationshipRequest
type AuthRelationshipPublisher ¶ added in v0.3.5
type AuthRelationshipPublisher interface { // PublishAuthRelationshipRequest publishes to the specified topic with the message given. PublishAuthRelationshipRequest(ctx context.Context, topic string, message AuthRelationshipRequest) (Message[AuthRelationshipResponse], error) }
AuthRelationshipPublisher specifies the auth relationship publisher methods.
type AuthRelationshipRelation ¶ added in v0.3.5
type AuthRelationshipRelation struct { // Relation is the name of the relation the object from AuthRelationshipRequest has to the subject. Relation string `json:"relation"` // The subject the relation is to. SubjectID gidx.PrefixedID `json:"subjectID"` }
AuthRelationshipRelation defines the relation an object from an AuthRelationshipRequest has to a subject.
func (AuthRelationshipRelation) Validate ¶ added in v0.3.5
func (r AuthRelationshipRelation) Validate() error
Validate ensures the message has all the required fields.
type AuthRelationshipRequest ¶ added in v0.3.3
type AuthRelationshipRequest struct { // Action describes the type of action being performed. Valid options are "write" and "delete". Action AuthRelationshipAction `json:"action"` // ObjectID is the PrefixedID of the object the permissions will be granted on ObjectID gidx.PrefixedID `json:"objectID"` // Relations defines all relations which should be written or deleted for this object. Relations []AuthRelationshipRelation `json:"relations"` // ConditionName represents the name of a conditional check that will be applied to this relationship. (Optional) // In SpiceDB this would be a caveat name ConditionName string `json:"conditionName"` // ConditionValues are the condition values to be used on the condition check. (Optional) ConditionValues map[string]interface{} `json:"conditionValue"` // TraceContext is a map of values used for OpenTelemetry context propagation. TraceContext map[string]string `json:"traceContext"` // TraceID is the ID of the trace for this event // Deprecated: Use TraceContext with OpenTelemetry context propagation instead. TraceID string `json:"traceID"` // SpanID is the ID of the span that additional traces should based off of // Deprecated: Use TraceContext with OpenTelemetry context propagation instead. SpanID string `json:"spanID"` }
AuthRelationshipRequest contains the data structure expected to be used to write or delete an auth relationship from PermissionsAPI
func UnmarshalAuthRelationshipRequest ¶ added in v0.3.3
func UnmarshalAuthRelationshipRequest(b []byte) (AuthRelationshipRequest, error)
UnmarshalAuthRelationshipRequest returns an AuthRelationshipRequest from a json []byte.
func (AuthRelationshipRequest) GetTraceContext ¶ added in v0.3.5
func (m AuthRelationshipRequest) GetTraceContext(ctx context.Context) context.Context
GetTraceContext creates a new OpenTelementry context for the message.
func (AuthRelationshipRequest) Validate ¶ added in v0.3.5
func (m AuthRelationshipRequest) Validate() error
Validate ensures the message has all the required fields.
type AuthRelationshipResponse ¶ added in v0.3.3
type AuthRelationshipResponse struct { // Errors contains any errors, if empty the request was successful Errors []error `json:"errors"` // TraceContext is a map of values used for OpenTelemetry context propagation. TraceContext map[string]string `json:"traceContext"` // TraceID is the ID of the trace for this event // Deprecated: Use TraceContext with OpenTelemetry context propagation instead. TraceID string `json:"traceID"` // SpanID is the ID of the span that additional traces should based off of // Deprecated: Use TraceContext with OpenTelemetry context propagation instead. SpanID string `json:"spanID"` }
AuthRelationshipResponse contains the data structure expected to be received from an AuthRelationshipRequest message to write or delete an auth relationship from PermissionsAPI
func UnmarshalAuthRelationshipResponse ¶ added in v0.3.3
func UnmarshalAuthRelationshipResponse(b []byte) (AuthRelationshipResponse, error)
UnmarshalAuthRelationshipResponse returns an AuthRelationshipRsponse from a json []byte.
func (AuthRelationshipResponse) GetTraceContext ¶ added in v0.3.5
func (m AuthRelationshipResponse) GetTraceContext(ctx context.Context) context.Context
GetTraceContext creates a new OpenTelementry context for the message.
func (AuthRelationshipResponse) Validate ¶ added in v0.3.5
func (m AuthRelationshipResponse) Validate() error
Validate ensures the message has all the required fields.
type AuthRelationshipSubscriber ¶ added in v0.3.5
type AuthRelationshipSubscriber interface { // SubscribeAuthRelationshipRequests subscribes to the provided topic responding with an AuthRelationshipRequest message. SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Request[AuthRelationshipRequest, AuthRelationshipResponse], error) }
AuthRelationshipSubscriber specifies the auth relationship subscriber methods.
type ChangeMessage ¶
type ChangeMessage struct { // SubjectID is the PrefixedID representing the node of the topic of this message SubjectID gidx.PrefixedID `json:"subjectID"` // EventType describes the type of event that has triggered this message EventType string `json:"eventType"` // AdditionalSubjectIDs is a group of PrefixedIDs representing additional nodes associated with this message AdditionalSubjectIDs []gidx.PrefixedID `json:"additionalSubjects"` // ActorID is the PrefixedID representing the identity of the actor that caused this message to be triggered ActorID gidx.PrefixedID `json:"actorID"` // Source is a string representing the identity of the source system that created the message Source string `json:"source"` // Timestamp is the time representing when the message was created Timestamp time.Time `json:"timestamp"` // TraceContext is a map of values used for OpenTelemetry context propagation. TraceContext map[string]string `json:"traceContext"` // TraceID is the ID of the trace for this event // Deprecated: Use TraceContext with OpenTelemetry context propagation instead. TraceID string `json:"traceID"` // SpanID is the ID of the span that additional traces should based off of // Deprecated: Use TraceContext with OpenTelemetry context propagation instead. SpanID string `json:"spanID"` // SubjectFields is a map of the fields on the subject SubjectFields map[string]string `json:"subjectFields"` // Changeset is an optional map of the fields that changed triggering this message, this should be provided if the source can provide a changeset FieldChanges []FieldChange `json:"fieldChanges"` // AdditionalData is a field to store any addition information that may be important to include with your message AdditionalData map[string]interface{} `json:"additionalData"` }
ChangeMessage contains the data structure expected to be received when picking an event from a changes message queue
func UnmarshalChangeMessage ¶
func UnmarshalChangeMessage(b []byte) (ChangeMessage, error)
UnmarshalChangeMessage returns a ChangeMessage from a json []byte.
func (ChangeMessage) GetAddSubjects ¶ added in v0.4.0
func (m ChangeMessage) GetAddSubjects() []gidx.PrefixedID
GetAddSubjects returns the additional subjects of the message
func (ChangeMessage) GetEventType ¶ added in v0.4.0
func (m ChangeMessage) GetEventType() string
GetEventType returns the event type of the message
func (ChangeMessage) GetSubject ¶ added in v0.4.0
func (m ChangeMessage) GetSubject() gidx.PrefixedID
GetSubject returns the subject of the message
func (ChangeMessage) GetTraceContext ¶ added in v0.3.5
func (m ChangeMessage) GetTraceContext(ctx context.Context) context.Context
GetTraceContext creates a new OpenTelementry context for the message.
func (ChangeMessage) Validate ¶ added in v0.3.5
func (m ChangeMessage) Validate() error
Validate ensures the message has all the required fields.
type ChangeType ¶
type ChangeType string
ChangeType represents the possible event types for a ChangeMessage
type Config ¶ added in v0.3.5
type Config struct {
NATS NATSConfig `mapstructure:"nats"`
}
Config contains event provider configs.
type Connection ¶ added in v0.3.5
type Connection interface { // Gracefully close the connection. Shutdown(ctx context.Context) error // Source gives you the raw underlying connection object. Source() any Subscriber Publisher AuthRelationshipSubscriber AuthRelationshipPublisher }
Connection defines a connection handler.
func NewConnection ¶ added in v0.3.5
func NewConnection(config Config, options ...Option) (Connection, error)
NewConnection creates a new Connection from the provided config.
type EventMessage ¶
type EventMessage struct { // SubjectID is the PrefixedID representing the node of the topic of this message SubjectID gidx.PrefixedID `json:"subjectID"` // EventType describes the type of event that has triggered this message EventType string `json:"eventType"` // AdditionalSubjectIDs is a group of PrefixedIDs representing additional nodes associated with this message AdditionalSubjectIDs []gidx.PrefixedID `json:"additionalSubjects"` // Source is a string representing the identity of the source system that created the message Source string `json:"source"` // Timestamp is the time representing when the message was created Timestamp time.Time `json:"timestamp"` // TraceContext is a map of values used for OpenTelemetry context propagation. TraceContext map[string]string `json:"traceContext"` // TraceID is the ID of the trace for this event // Deprecated: Use TraceContext with OpenTelemetry context propagation instead. TraceID string `json:"traceID"` // SpanID is the ID of the span that additional traces should based off of // Deprecated: Use TraceContext with OpenTelemetry context propagation instead. SpanID string `json:"spanID"` // Data is a field to store any information that may be important to include about the event Data map[string]interface{} `json:"data"` }
EventMessage contains the data structure expected to be received when picking an event from an events message queue
func UnmarshalEventMessage ¶
func UnmarshalEventMessage(b []byte) (EventMessage, error)
UnmarshalEventMessage returns a EventMessage from a json []byte.
func (EventMessage) GetAddSubjects ¶ added in v0.4.0
func (m EventMessage) GetAddSubjects() []gidx.PrefixedID
GetAddSubjects returns the additional subjects of the message
func (EventMessage) GetEventType ¶ added in v0.4.0
func (m EventMessage) GetEventType() string
GetEventType returns the event type of the message
func (EventMessage) GetSubject ¶ added in v0.4.0
func (m EventMessage) GetSubject() gidx.PrefixedID
GetSubject returns the subject of the message
func (EventMessage) GetTraceContext ¶ added in v0.3.5
func (m EventMessage) GetTraceContext(ctx context.Context) context.Context
GetTraceContext creates a new OpenTelementry context for the message.
func (EventMessage) Validate ¶ added in v0.3.5
func (m EventMessage) Validate() error
Validate ensures the message has all the required fields.
type FieldChange ¶
type FieldChange struct { // Field is the name of the field that changed Field string `json:"field"` // PreviousValue is the value the field had before the change PreviousValue string `json:"previousValue"` // CurrentValue is the new value of the field after the change CurrentValue string `json:"currentValue"` }
FieldChange represents a single field that was changed in a changeset and is used to map fields to the old and new values
type Message ¶ added in v0.3.5
type Message[T any] interface { // Connection returns the underlying connection the message was received on. Connection() Connection // ID returns the unique message id. ID() string // Topic returns the topic the message was sent to. Topic() string // Message returns the decoded message object. Message() T // Ack acks the message. Ack() error // Nak nacks the message. Nak(delay time.Duration) error // Term terminates the message. Term() error // Timestamp returns the time the message was submitted. Timestamp() time.Time // Deliveries returns the number of times the message was delivered. Deliveries() uint64 // Error returns any error encountered while decoding the message Error() error // Source returns the underlying message object. Source() any }
Message contains a message which has been published or received from a subscription.
type NATSAuthRelationshipRequest ¶ added in v0.3.5
type NATSAuthRelationshipRequest struct { *NATSMessage[AuthRelationshipRequest] }
NATSAuthRelationshipRequest implements Request for AuthRelationshipRequest / AuthRelationshipResponse
func (*NATSAuthRelationshipRequest) Reply ¶ added in v0.3.5
func (r *NATSAuthRelationshipRequest) Reply(ctx context.Context, message AuthRelationshipResponse) (Message[AuthRelationshipResponse], error)
Reply responds to an AuthRelationshipRequest with an AuthRelationshipResponse.
type NATSConfig ¶
type NATSConfig struct { URL string SubscribePrefix string PublishPrefix string QueueGroup string Token string CredsFile string Source string ConnectTimeout time.Duration ShutdownTimeout time.Duration SubscriberFetchBatchSize int SubscriberFetchTimeout time.Duration SubscriberFetchBackoff time.Duration SubscriberNoAckExplicit bool SubscriberNoManualAck bool SubscriberDeliveryPolicy string SubscriberStartSequence uint64 SubscriberStartTime time.Time // contains filtered or unexported fields }
NATSConfig defines the NATS connection configuration.
func (NATSConfig) Configured ¶ added in v0.3.5
func (c NATSConfig) Configured() bool
Configured checks whether the provider has been configured.
func (NATSConfig) Validate ¶ added in v0.3.5
func (c NATSConfig) Validate() error
Validate ensures the configuration is valid.
func (NATSConfig) WithDefaults ¶ added in v0.3.5
func (c NATSConfig) WithDefaults() NATSConfig
WithDefaults sets default values for the field unset.
type NATSConnection ¶ added in v0.3.5
type NATSConnection struct {
// contains filtered or unexported fields
}
NATSConnection implements Connection.
func NewNATSConnection ¶ added in v0.3.5
func NewNATSConnection(config NATSConfig, options ...NATSOption) (*NATSConnection, error)
NewNATSConnection creates a new nats jetstream connection.
func (*NATSConnection) PublishAuthRelationshipRequest ¶ added in v0.3.5
func (c *NATSConnection) PublishAuthRelationshipRequest(ctx context.Context, topic string, message AuthRelationshipRequest) (Message[AuthRelationshipResponse], error)
PublishAuthRelationshipRequest publishes an AuthRelationshipRequest message and blocks until an AuthRelationshipResponse is provided.
func (*NATSConnection) PublishChange ¶ added in v0.3.5
func (c *NATSConnection) PublishChange(ctx context.Context, topic string, message ChangeMessage) (Message[ChangeMessage], error)
PublishChange publishes a ChangeMessage.
func (*NATSConnection) PublishEvent ¶ added in v0.3.5
func (c *NATSConnection) PublishEvent(ctx context.Context, topic string, message EventMessage) (Message[EventMessage], error)
PublishEvent publishes an EventMessage.
func (*NATSConnection) Shutdown ¶ added in v0.3.5
func (c *NATSConnection) Shutdown(ctx context.Context) error
Shutdown gracefully drains the connection.
func (*NATSConnection) Source ¶ added in v0.3.5
func (c *NATSConnection) Source() any
Source returns the underlying NATS Connection.
func (*NATSConnection) SubscribeAuthRelationshipRequests ¶ added in v0.3.5
func (c *NATSConnection) SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Request[AuthRelationshipRequest, AuthRelationshipResponse], error)
SubscribeAuthRelationshipRequests creates a new pull subscription parsing incoming messages as AuthRelationshipRequest messages and returning a new Message channel.
func (*NATSConnection) SubscribeChanges ¶ added in v0.3.5
func (c *NATSConnection) SubscribeChanges(ctx context.Context, topic string) (<-chan Message[ChangeMessage], error)
SubscribeChanges creates a new pull subscription parsing incoming messages as ChangeMessage messages and returning a new Message channel.
func (*NATSConnection) SubscribeEvents ¶ added in v0.3.5
func (c *NATSConnection) SubscribeEvents(ctx context.Context, topic string) (<-chan Message[EventMessage], error)
SubscribeEvents creates a new pull subscription parsing incoming messages as EventMessage messages and returning a new Message channel.
type NATSMessage ¶ added in v0.3.5
type NATSMessage[T any] struct { // contains filtered or unexported fields }
NATSMessage implements Message
func (*NATSMessage[T]) Ack ¶ added in v0.3.5
func (m *NATSMessage[T]) Ack() error
Ack acks the message.
func (*NATSMessage[T]) Connection ¶ added in v0.3.5
func (m *NATSMessage[T]) Connection() Connection
Connection returns the underlying Connection.
func (*NATSMessage[T]) Deliveries ¶ added in v0.3.5
func (m *NATSMessage[T]) Deliveries() uint64
Deliveries returns the number of times the message was delivered.
func (*NATSMessage[T]) Error ¶ added in v0.3.5
func (m *NATSMessage[T]) Error() error
Error returns any error with the message.
func (*NATSMessage[T]) ID ¶ added in v0.3.5
func (m *NATSMessage[T]) ID() string
ID returns the nats message sequence number for the consumer.
func (*NATSMessage[T]) Message ¶ added in v0.3.5
func (m *NATSMessage[T]) Message() T
Message returns the decoded message object.
func (*NATSMessage[T]) Nak ¶ added in v0.3.5
func (m *NATSMessage[T]) Nak(delay time.Duration) error
Nak calls a Nak with the provided delay.
func (*NATSMessage[T]) Source ¶ added in v0.3.5
func (m *NATSMessage[T]) Source() any
Source returns the underlying nats message.
func (*NATSMessage[T]) Term ¶ added in v0.3.5
func (m *NATSMessage[T]) Term() error
Term terminates the message from being processed again.
func (*NATSMessage[T]) Timestamp ¶ added in v0.3.5
func (m *NATSMessage[T]) Timestamp() time.Time
Timestamp returns the timestamp of the message.
func (*NATSMessage[T]) Topic ¶ added in v0.3.5
func (m *NATSMessage[T]) Topic() string
Topic returns the nats subject.
type NATSOption ¶ added in v0.3.5
type NATSOption func(c *NATSConfig) error
NATSOption defines a nats configuration option.
func WithNATSConnectOptions ¶ added in v0.3.5
func WithNATSConnectOptions(options ...nats.Option) NATSOption
WithNATSConnectOptions configures the connection options for nats.
func WithNATSJetStreamOptions ¶ added in v0.3.5
func WithNATSJetStreamOptions(options ...nats.JSOpt) NATSOption
WithNATSJetStreamOptions configures the jetstream connection options.
func WithNATSLogger ¶ added in v0.3.5
func WithNATSLogger(logger *zap.SugaredLogger) NATSOption
WithNATSLogger sets the logger for the nats connection.
func WithNATSSubscribeOptions ¶ added in v0.3.5
func WithNATSSubscribeOptions(options ...nats.SubOpt) NATSOption
WithNATSSubscribeOptions configures the subscribe options for new subscriptions.
type Option ¶ added in v0.3.5
Option configures a connection option.
func WithLogger ¶ added in v0.3.5
func WithLogger(logger *zap.SugaredLogger) Option
WithLogger sets the logger for the connection.
func WithNATSOptions ¶ added in v0.3.5
func WithNATSOptions(options ...NATSOption) Option
WithNATSOptions configures nats options.
type Publisher ¶
type Publisher interface { // PublishChange publishes to the specified topic with the message given. PublishChange(ctx context.Context, topic string, message ChangeMessage) (Message[ChangeMessage], error) // PublishEvent publishes to the specified topic with the message given. PublishEvent(ctx context.Context, topic string, message EventMessage) (Message[EventMessage], error) }
Publisher specifies publisher methods.
type Request ¶ added in v0.3.5
type Request[TRequest, TResponse any] interface { Message[TRequest] // Reply publishes a response to the received message. Reply(ctx context.Context, message TResponse) (Message[TResponse], error) }
Request extends Message by allowing replies to be sent for the received message.
type Subscriber ¶
type Subscriber interface { // SubscribeChanges subscribes to the provided topic responding with an ChangeMessage message. SubscribeChanges(ctx context.Context, topic string) (<-chan Message[ChangeMessage], error) // SubscribeEvents subscribes to the provided topic responding with an EventMessage message. SubscribeEvents(ctx context.Context, topic string) (<-chan Message[EventMessage], error) }
Subscriber specifies subscriber methods.