events

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 19, 2023 License: Apache-2.0 Imports: 18 Imported by: 12

Documentation

Overview

Package events provides common utilities and formats for working with infratographer events

Index

Constants

This section is empty.

Variables

View Source
var ErrMissingEventType = errors.New("event type missing")

ErrMissingEventType is returned when attempting to publish an event without an event type specified

View Source
var ErrUnsupportedPubsub = errors.New("unsupported pubsub provider")

ErrUnsupportedPubsub is returned when the pubsub URL is not a supported provider

Functions

func MustViperFlagsForPublisher

func MustViperFlagsForPublisher(v *viper.Viper, flags *pflag.FlagSet, appName string)

MustViperFlagsForPublisher returns the cobra flags and viper config for an event publisher

func MustViperFlagsForSubscriber

func MustViperFlagsForSubscriber(v *viper.Viper, flags *pflag.FlagSet)

MustViperFlagsForSubscriber returns the cobra flags and viper config for an event subscriber

Types

type ChangeMessage

type ChangeMessage struct {
	// SubjectID is the PrefixedID representing the node of the topic of this message
	SubjectID gidx.PrefixedID `json:"subjectID"`
	// EventType describes the type of event that has triggered this message
	EventType string `json:"eventType"`
	// AdditionalSubjectIDs is a group of PrefixedIDs representing additional nodes associated with this message
	AdditionalSubjectIDs []gidx.PrefixedID `json:"additionalSubjects"`
	// ActorID is the PrefixedID representing the identity of the actor that caused this message to be triggered
	ActorID gidx.PrefixedID `json:"actorID"`
	// Source is a string representing the identity of the source system that created the message
	Source string `json:"source"`
	// Timestamp is the time representing when the message was created
	Timestamp time.Time `json:"timestamp"`
	// TraceID is the ID of the trace for this event
	TraceID string `json:"traceID"`
	// SpanID is the ID of the span that additional traces should based off of
	SpanID string `json:"spanID"`
	// SubjectFields is a map of the fields on the subject
	SubjectFields map[string]string `json:"subjectFields"`
	// Changeset is an optional map of the fields that changed triggering this message, this should be provided if the source can provide a changeset
	FieldChanges []FieldChange `json:"fieldChanges"`
	// AdditionalData is a field to store any addition information that may be important to include with your message
	AdditionalData map[string]interface{} `json:"additionalData"`
}

ChangeMessage contains the data structure expected to be received when picking an event from a changes message queue

func UnmarshalChangeMessage

func UnmarshalChangeMessage(b []byte) (ChangeMessage, error)

UnmarshalChangeMessage returns a ChangeMessage from a json []byte.

type ChangeType

type ChangeType string

ChangeType represents the possible event types for a ChangeMessage

var (
	// CreateChangeType provides the event type for create events
	CreateChangeType ChangeType = "create"
	// UpdateChangeType provides the event type for update events
	UpdateChangeType ChangeType = "update"
	// DeleteChangeType provides the event type for delete events
	DeleteChangeType ChangeType = "delete"
)

type EventMessage

type EventMessage struct {
	// SubjectID is the PrefixedID representing the node of the topic of this message
	SubjectID gidx.PrefixedID `json:"subjectID"`
	// EventType describes the type of event that has triggered this message
	EventType string `json:"eventType"`
	// AdditionalSubjectIDs is a group of PrefixedIDs representing additional nodes associated with this message
	AdditionalSubjectIDs []gidx.PrefixedID `json:"additionalSubjects"`
	// Source is a string representing the identity of the source system that created the message
	Source string `json:"source"`
	// Timestamp is the time representing when the message was created
	Timestamp time.Time `json:"timestamp"`
	// TraceID is the ID of the trace for this event
	TraceID string `json:"traceID"`
	// SpanID is the ID of the span that additional traces should based off of
	SpanID string `json:"spanID"`
	// Data is a field to store any information that may be important to include about the event
	Data map[string]interface{} `json:"data"`
}

EventMessage contains the data structure expected to be received when picking an event from an events message queue

func UnmarshalEventMessage

func UnmarshalEventMessage(b []byte) (EventMessage, error)

UnmarshalEventMessage returns a EventMessage from a json []byte.

type FieldChange

type FieldChange struct {
	// Field is the name of the field that changed
	Field string `json:"field"`
	// PreviousValue is the value the field had before the change
	PreviousValue string `json:"previousValue"`
	// CurrentValue is the new value of the field after the change
	CurrentValue string `json:"currentValue"`
}

FieldChange represents a single field that was changed in a changeset and is used to map fields to the old and new values

type NATSConfig

type NATSConfig struct {
	Token     string `mapstructure:"token"`
	CredsFile string `mapstructure:"credsFile"`
}

NATSConfig handles reading in all pubsub values specific to NATS

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher provides a pubsub publisher that uses the watermill pubsub package

func NewPublisher

func NewPublisher(cfg PublisherConfig) (*Publisher, error)

NewPublisher returns a publisher for the given config provided

func NewPublisherWithLogger added in v0.1.3

func NewPublisherWithLogger(cfg PublisherConfig, logger *zap.SugaredLogger) (*Publisher, error)

NewPublisherWithLogger returns a publisher for the given config provided

func (*Publisher) Close added in v0.3.2

func (p *Publisher) Close() error

Close will close the publisher

func (*Publisher) PublishChange

func (p *Publisher) PublishChange(ctx context.Context, subjectType string, change ChangeMessage) error

PublishChange will publish a ChangeMessage to the topic for the change

func (*Publisher) PublishEvent

func (p *Publisher) PublishEvent(_ context.Context, subjectType string, event EventMessage) error

PublishEvent will publish an EventMessage to the proper topic for that event

type PublisherConfig

type PublisherConfig struct {
	URL        string        `mapstructure:"url"`
	Timeout    time.Duration `mapstructure:"timeout"`
	Prefix     string        `mapstructure:"prefix"`
	Source     string        `mapstructure:"source"`
	NATSConfig NATSConfig    `mapstructure:"nats"`
}

PublisherConfig handles reading in all the config values available for setting up a pubsub publisher

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber provides a pubsub subscriber that uses the watermill pubsub package

func NewSubscriber

func NewSubscriber(cfg SubscriberConfig, options ...nats.SubOpt) (*Subscriber, error)

NewSubscriber returns a subscriber for the given config provided

func NewSubscriberWithLogger added in v0.1.3

func NewSubscriberWithLogger(cfg SubscriberConfig, logger *zap.SugaredLogger, options ...nats.SubOpt) (*Subscriber, error)

NewSubscriberWithLogger returns a subscriber for the given config provided

func (*Subscriber) Close added in v0.3.2

func (s *Subscriber) Close() error

Close will close the subscriber

func (*Subscriber) SubscribeChanges

func (s *Subscriber) SubscribeChanges(ctx context.Context, topic string) (<-chan *message.Message, error)

SubscribeChanges will subscribe you to the changes for a given topic. To receive all changes of any kind you can pass in ">".

func (*Subscriber) SubscribeEvents

func (s *Subscriber) SubscribeEvents(ctx context.Context, topic string) (<-chan *message.Message, error)

SubscribeEvents will subscribe you to the events for a given topic. To receive all changes of any kind you can pass in ">".

type SubscriberConfig

type SubscriberConfig struct {
	URL        string        `mapstructure:"url"`
	Timeout    time.Duration `mapstructure:"timeout"`
	Prefix     string        `mapstructure:"prefix"`
	QueueGroup string        `mapstructure:"queueGroup"`
	NATSConfig NATSConfig    `mapstructure:"nats"`
}

SubscriberConfig handles reading in all the config values available for setting up a pubsub publisher

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL