Documentation ¶
Overview ¶
Package events provides common utilities and formats for working with infratographer events
Index ¶
- Variables
- func MustViperFlagsForPublisher(v *viper.Viper, flags *pflag.FlagSet, appName string)
- func MustViperFlagsForSubscriber(v *viper.Viper, flags *pflag.FlagSet)
- type ChangeMessage
- type ChangeType
- type EventMessage
- type FieldChange
- type NATSConfig
- type Publisher
- type PublisherConfig
- type Subscriber
- type SubscriberConfig
Constants ¶
This section is empty.
Variables ¶
var ErrMissingEventType = errors.New("event type missing")
ErrMissingEventType is returned when attempting to publish an event without an event type specified
var ErrUnsupportedPubsub = errors.New("unsupported pubsub provider")
ErrUnsupportedPubsub is returned when the pubsub URL is not a supported provider
Functions ¶
func MustViperFlagsForPublisher ¶
MustViperFlagsForPublisher sets up the cobra flags and viper config for an event publisher
Types ¶
type ChangeMessage ¶
type ChangeMessage struct { // SubjectID is the PrefixedID representing the node of the topic of this message SubjectID gidx.PrefixedID `json:"subjectID"` // EventType describes the type of event that has triggered this message EventType string `json:"eventType"` // AdditionalSubjectIDs is a group of PrefixedIDs representing additional nodes associated with this message AdditionalSubjectIDs []gidx.PrefixedID `json:"additionalSubjects"` // ActorID is the PrefixedID representing the identity of the actor that caused this message to be triggered ActorID gidx.PrefixedID `json:"actorID"` // Source is a string representing the identity of the source system that created the message Source string `json:"source"` // Timestamp is the time representing when the message was created Timestamp time.Time `json:"timestamp"` // TraceID is the ID of the trace for this event TraceID string `json:"traceID"` // SpanID is the ID of the span that additional traces should be based off of SpanID string `json:"spanID"` // SubjectFields is a map of the fields on the subject SubjectFields map[string]string `json:"subjectFields"` // FieldChanges is an optional list of the fields that changed, triggering this message; this should be provided if the source can provide a changeset FieldChanges []FieldChange `json:"fieldChanges"` // AdditionalData is a field to store any additional information that may be important to include with your message AdditionalData map[string]interface{} `json:"additionalData"` }
ChangeMessage contains the data structure expected to be received when picking an event from a changes message queue
func UnmarshalChangeMessage ¶
func UnmarshalChangeMessage(b []byte) (ChangeMessage, error)
UnmarshalChangeMessage returns a ChangeMessage from a json []byte.
type ChangeType ¶
type ChangeType string
ChangeType represents the possible event types for a ChangeMessage
var ( // CreateChangeType provides the event type for create events CreateChangeType ChangeType = "create" // UpdateChangeType provides the event type for update events UpdateChangeType ChangeType = "update" // DeleteChangeType provides the event type for delete events DeleteChangeType ChangeType = "delete" )
type EventMessage ¶
type EventMessage struct { // SubjectID is the PrefixedID representing the node of the topic of this message SubjectID gidx.PrefixedID `json:"subjectID"` // EventType describes the type of event that has triggered this message EventType string `json:"eventType"` // AdditionalSubjectIDs is a group of PrefixedIDs representing additional nodes associated with this message AdditionalSubjectIDs []gidx.PrefixedID `json:"additionalSubjects"` // Source is a string representing the identity of the source system that created the message Source string `json:"source"` // Timestamp is the time representing when the message was created Timestamp time.Time `json:"timestamp"` // TraceID is the ID of the trace for this event TraceID string `json:"traceID"` // SpanID is the ID of the span that additional traces should be based off of SpanID string `json:"spanID"` // Data is a field to store any information that may be important to include about the event Data map[string]interface{} `json:"data"` }
EventMessage contains the data structure expected to be received when picking an event from an events message queue
func UnmarshalEventMessage ¶
func UnmarshalEventMessage(b []byte) (EventMessage, error)
UnmarshalEventMessage returns an EventMessage from a json []byte.
type FieldChange ¶
type FieldChange struct { // Field is the name of the field that changed Field string `json:"field"` // PreviousValue is the value the field had before the change PreviousValue string `json:"previousValue"` // CurrentValue is the new value of the field after the change CurrentValue string `json:"currentValue"` }
FieldChange represents a single field that was changed in a changeset and is used to map fields to the old and new values
type NATSConfig ¶
type NATSConfig struct { Token string `mapstructure:"token"` CredsFile string `mapstructure:"credsFile"` }
NATSConfig handles reading in all pubsub values specific to NATS
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher provides a pubsub publisher that uses the watermill pubsub package
func NewPublisher ¶
func NewPublisher(cfg PublisherConfig) (*Publisher, error)
NewPublisher returns a publisher for the given config provided
func NewPublisherWithLogger ¶ added in v0.1.3
func NewPublisherWithLogger(cfg PublisherConfig, logger *zap.SugaredLogger) (*Publisher, error)
NewPublisherWithLogger returns a publisher for the provided config, constructed with the given logger
func (*Publisher) PublishChange ¶
func (p *Publisher) PublishChange(ctx context.Context, subjectType string, change ChangeMessage) error
PublishChange will publish a ChangeMessage to the topic for the change
func (*Publisher) PublishEvent ¶
PublishEvent will publish an EventMessage to the proper topic for that event
type PublisherConfig ¶
type PublisherConfig struct { URL string `mapstructure:"url"` Timeout time.Duration `mapstructure:"timeout"` Prefix string `mapstructure:"prefix"` Source string `mapstructure:"source"` NATSConfig NATSConfig `mapstructure:"nats"` }
PublisherConfig handles reading in all the config values available for setting up a pubsub publisher
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber provides a pubsub subscriber that uses the watermill pubsub package
func NewSubscriber ¶
func NewSubscriber(cfg SubscriberConfig, options ...nats.SubOpt) (*Subscriber, error)
NewSubscriber returns a subscriber for the given config provided
func NewSubscriberWithLogger ¶ added in v0.1.3
func NewSubscriberWithLogger(cfg SubscriberConfig, logger *zap.SugaredLogger, options ...nats.SubOpt) (*Subscriber, error)
NewSubscriberWithLogger returns a subscriber for the provided config, constructed with the given logger
func (*Subscriber) Close ¶ added in v0.3.2
func (s *Subscriber) Close() error
Close will close the subscriber
func (*Subscriber) SubscribeChanges ¶
func (s *Subscriber) SubscribeChanges(ctx context.Context, topic string) (<-chan *message.Message, error)
SubscribeChanges will subscribe you to the changes for a given topic. To receive all changes of any kind you can pass in ">".
func (*Subscriber) SubscribeEvents ¶
func (s *Subscriber) SubscribeEvents(ctx context.Context, topic string) (<-chan *message.Message, error)
SubscribeEvents will subscribe you to the events for a given topic. To receive all events of any kind you can pass in ">".
type SubscriberConfig ¶
type SubscriberConfig struct { URL string `mapstructure:"url"` Timeout time.Duration `mapstructure:"timeout"` Prefix string `mapstructure:"prefix"` QueueGroup string `mapstructure:"queueGroup"` NATSConfig NATSConfig `mapstructure:"nats"` }
SubscriberConfig handles reading in all the config values available for setting up a pubsub subscriber