events

package
v0.3.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 28, 2023 License: Apache-2.0 Imports: 22 Imported by: 12

Documentation

Overview

Package events provides common utilities and formats for working with infratographer events

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors covering provider configuration, message validation and request handling.
var (
	// ErrProviderNotConfigured is an error packages should return if no events provider is configured.
	ErrProviderNotConfigured = errors.New("events provider not configured")

	// ErrMissingChangeMessageEventType reports that a change message is missing its required EventType field.
	ErrMissingChangeMessageEventType = errors.New("change message EventType field required")
	// ErrMissingChangeMessageSubjectID reports that a change message is missing its required SubjectID field.
	ErrMissingChangeMessageSubjectID = errors.New("change message SubjectID field required")

	// ErrMissingEventMessageEventType reports that an event message is missing its required EventType field.
	ErrMissingEventMessageEventType = errors.New("event message EventType field required")
	// ErrMissingEventMessageSubjectID reports that an event message is missing its required SubjectID field.
	ErrMissingEventMessageSubjectID = errors.New("event message SubjectID field required")

	// ErrInvalidAuthRelationshipRequestAction reports that an auth relationship request has an Action other than write or delete.
	ErrInvalidAuthRelationshipRequestAction = errors.New("auth relationship request message Action field must be write or delete")
	// ErrMissingAuthRelationshipRequestObjectID reports that an auth relationship request is missing its required ObjectID field.
	ErrMissingAuthRelationshipRequestObjectID = errors.New("auth relationship request message ObjectID field required")
	// ErrMissingAuthRelationshipRequestRelation is returned when the auth relationship request has no relations defined.
	ErrMissingAuthRelationshipRequestRelation = errors.New("auth relationship request message Relations field required")
	// ErrMissingAuthRelationshipRequestRelationRelation reports that an entry in Relations is missing its required Relation field.
	ErrMissingAuthRelationshipRequestRelationRelation = errors.New("auth relationship request message Relations Relation field required")
	// ErrMissingAuthRelationshipRequestRelationSubjectID reports that an entry in Relations is missing its required SubjectID field.
	ErrMissingAuthRelationshipRequestRelationSubjectID = errors.New("auth relationship request message Relations SubjectID field required")

	// ErrRequestNoResponders is returned when a request is attempted but no responder is listening.
	ErrRequestNoResponders = errors.New("no responders for request")
)
View Source
// Well-known ChangeType and AuthRelationshipAction values.
// These are declared with var rather than const, so they are assignable package-level variables.
var (
	// CreateChangeType provides the event type for create events.
	CreateChangeType ChangeType = "create"
	// UpdateChangeType provides the event type for update events.
	UpdateChangeType ChangeType = "update"
	// DeleteChangeType provides the event type for delete events.
	DeleteChangeType ChangeType = "delete"
	// WriteAuthRelationshipAction provides the auth relationship action for write requests.
	WriteAuthRelationshipAction AuthRelationshipAction = "write"
	// DeleteAuthRelationshipAction provides the auth relationship action for delete requests.
	DeleteAuthRelationshipAction AuthRelationshipAction = "delete"
)
View Source
// Default values for the NATS provider.
// NOTE(review): presumably applied by NATSConfig.WithDefaults to unset fields — confirm against the implementation.
var (
	// NATSDefaultConnectTimeout is the default connection timeout.
	NATSDefaultConnectTimeout = 5 * time.Second
	// NATSDefaultSubscriberFetchBatchSize is the default pull subscribe batch size.
	NATSDefaultSubscriberFetchBatchSize = 20
	// NATSDefaultSubscriberFetchTimeout is the max time a fetch will block before releasing.
	NATSDefaultSubscriberFetchTimeout = 5 * time.Second
	// NATSDefaultSubscriberFetchBackoff is the delay between batch fetch attempts.
	NATSDefaultSubscriberFetchBackoff = 5 * time.Second
	// NATSDefaultShutdownTimeout is the timeout for a shutdown to complete.
	NATSDefaultShutdownTimeout = 5 * time.Second
)
View Source
// Sentinel errors specific to the NATS provider.
var (
	// ErrNATSInvalidAuthConfiguration is returned when the config has both Token and CredsFile specified.
	ErrNATSInvalidAuthConfiguration = errors.New("invalid nats configuration, both token and creds file are specified")

	// ErrNATSInvalidDeliveryPolicy is returned when an incorrect delivery policy is provided.
	ErrNATSInvalidDeliveryPolicy = errors.New("invalid delivery policy, expected all|last|last-per-subject|new|start-sequence|start-time")

	// ErrNATSMessageNoReplySubject is returned when calling ReplyAuthRelationshipRequest when the request has no reply subject defined.
	ErrNATSMessageNoReplySubject = errors.New("unable to reply to auth relationship request, no reply subject specified")
)

Functions

func MustViperFlags added in v0.3.5

func MustViperFlags(v *viper.Viper, flags *pflag.FlagSet, appName string)

MustViperFlags returns the cobra flags and viper config for events.

func MustViperFlagsForNATS added in v0.3.5

func MustViperFlagsForNATS(v *viper.Viper, flags *pflag.FlagSet, appName string)

MustViperFlagsForNATS returns the cobra flags and viper config for a nats handler.

func NATSConsumerDurableName added in v0.3.5

func NATSConsumerDurableName(queueGroup, subject string) string

NATSConsumerDurableName is the generator function to create a new durable consumer name.

Types

type AuthRelationshipAction added in v0.3.4

// AuthRelationshipAction is the string-backed action of an AuthRelationshipRequest.
// The values declared in this package are WriteAuthRelationshipAction ("write")
// and DeleteAuthRelationshipAction ("delete").
type AuthRelationshipAction string

AuthRelationshipAction represents the possible auth relationship request actions for an AuthRelationshipRequest

type AuthRelationshipPublisher added in v0.3.5

// AuthRelationshipPublisher is the publishing half of the auth relationship
// request/response exchange.
type AuthRelationshipPublisher interface {
	// PublishAuthRelationshipRequest publishes to the specified topic with the message given.
	// On success it returns a Message wrapping the AuthRelationshipResponse.
	// The NATS implementation in this package is documented as blocking until a response is provided.
	PublishAuthRelationshipRequest(ctx context.Context, topic string, message AuthRelationshipRequest) (Message[AuthRelationshipResponse], error)
}

AuthRelationshipPublisher specifies the auth relationship publisher methods.

type AuthRelationshipRelation added in v0.3.5

// AuthRelationshipRelation is one entry of AuthRelationshipRequest.Relations.
type AuthRelationshipRelation struct {
	// Relation is the name of the relation the object from AuthRelationshipRequest has to the subject.
	Relation string `json:"relation"`
	// SubjectID is the PrefixedID of the subject the relation is to.
	SubjectID gidx.PrefixedID `json:"subjectID"`
}

AuthRelationshipRelation defines the relation an object from an AuthRelationshipRequest has to a subject.

func (AuthRelationshipRelation) Validate added in v0.3.5

func (r AuthRelationshipRelation) Validate() error

Validate ensures the message has all the required fields.

type AuthRelationshipRequest added in v0.3.3

// AuthRelationshipRequest is the message used to write or delete auth
// relationships for a single object.
type AuthRelationshipRequest struct {
	// Action describes the type of action being performed. Valid options are "write" and "delete".
	Action AuthRelationshipAction `json:"action"`
	// ObjectID is the PrefixedID of the object the permissions will be granted on.
	ObjectID gidx.PrefixedID `json:"objectID"`
	// Relations defines all relations which should be written or deleted for this object.
	Relations []AuthRelationshipRelation `json:"relations"`
	// ConditionName represents the name of a conditional check that will be applied to this relationship. (Optional)
	// In SpiceDB this would be a caveat name.
	ConditionName string `json:"conditionName"`
	// ConditionValues are the condition values to be used on the condition check. (Optional)
	// Note that the JSON key is the singular "conditionValue"; it is kept as-is for wire compatibility.
	ConditionValues map[string]any `json:"conditionValue"`
	// TraceContext is a map of values used for OpenTelemetry context propagation.
	TraceContext map[string]string `json:"traceContext"`
	// TraceID is the ID of the trace for this event.
	//
	// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
	TraceID string `json:"traceID"`
	// SpanID is the ID of the span that additional traces should be based off of.
	//
	// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
	SpanID string `json:"spanID"`
}

AuthRelationshipRequest contains the data structure expected to be used to write or delete an auth relationship from PermissionsAPI

func UnmarshalAuthRelationshipRequest added in v0.3.3

func UnmarshalAuthRelationshipRequest(b []byte) (AuthRelationshipRequest, error)

UnmarshalAuthRelationshipRequest returns an AuthRelationshipRequest from a json []byte.

func (AuthRelationshipRequest) GetTraceContext added in v0.3.5

func (m AuthRelationshipRequest) GetTraceContext(ctx context.Context) context.Context

GetTraceContext creates a new OpenTelemetry context for the message.

func (AuthRelationshipRequest) Validate added in v0.3.5

func (m AuthRelationshipRequest) Validate() error

Validate ensures the message has all the required fields.

type AuthRelationshipResponse added in v0.3.3

// AuthRelationshipResponse is the reply to an AuthRelationshipRequest.
type AuthRelationshipResponse struct {
	// Errors contains any errors, if empty the request was successful.
	// NOTE(review): encoding/json has no built-in encoding for error interface values —
	// confirm how this field round-trips through marshalling before relying on it.
	Errors []error `json:"errors"`
	// TraceContext is a map of values used for OpenTelemetry context propagation.
	TraceContext map[string]string `json:"traceContext"`
	// TraceID is the ID of the trace for this event.
	//
	// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
	TraceID string `json:"traceID"`
	// SpanID is the ID of the span that additional traces should be based off of.
	//
	// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
	SpanID string `json:"spanID"`
}

AuthRelationshipResponse contains the data structure expected to be received from an AuthRelationshipRequest message to write or delete an auth relationship from PermissionsAPI

func UnmarshalAuthRelationshipResponse added in v0.3.3

func UnmarshalAuthRelationshipResponse(b []byte) (AuthRelationshipResponse, error)

UnmarshalAuthRelationshipResponse returns an AuthRelationshipResponse from a json []byte.

func (AuthRelationshipResponse) GetTraceContext added in v0.3.5

func (m AuthRelationshipResponse) GetTraceContext(ctx context.Context) context.Context

GetTraceContext creates a new OpenTelemetry context for the message.

func (AuthRelationshipResponse) Validate added in v0.3.5

func (m AuthRelationshipResponse) Validate() error

Validate ensures the message has all the required fields.

type AuthRelationshipSubscriber added in v0.3.5

// AuthRelationshipSubscriber is the subscribing half of the auth relationship
// request/response exchange.
type AuthRelationshipSubscriber interface {
	// SubscribeAuthRelationshipRequests subscribes to the provided topic and returns a
	// receive-only channel of requests. Each delivered value is a Request, so the
	// receiver can answer it with an AuthRelationshipResponse via Reply.
	SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Request[AuthRelationshipRequest, AuthRelationshipResponse], error)
}

AuthRelationshipSubscriber specifies the auth relationship subscriber methods.

type ChangeMessage

// ChangeMessage is the payload carried on a changes message queue.
type ChangeMessage struct {
	// SubjectID is the PrefixedID representing the node of the topic of this message.
	SubjectID gidx.PrefixedID `json:"subjectID"`
	// EventType describes the type of event that has triggered this message.
	EventType string `json:"eventType"`
	// AdditionalSubjectIDs is a group of PrefixedIDs representing additional nodes associated with this message.
	AdditionalSubjectIDs []gidx.PrefixedID `json:"additionalSubjects"`
	// ActorID is the PrefixedID representing the identity of the actor that caused this message to be triggered.
	ActorID gidx.PrefixedID `json:"actorID"`
	// Source is a string representing the identity of the source system that created the message.
	Source string `json:"source"`
	// Timestamp is the time representing when the message was created.
	Timestamp time.Time `json:"timestamp"`
	// TraceContext is a map of values used for OpenTelemetry context propagation.
	TraceContext map[string]string `json:"traceContext"`
	// TraceID is the ID of the trace for this event.
	//
	// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
	TraceID string `json:"traceID"`
	// SpanID is the ID of the span that additional traces should be based off of.
	//
	// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
	SpanID string `json:"spanID"`
	// SubjectFields is a map of the fields on the subject.
	SubjectFields map[string]string `json:"subjectFields"`
	// FieldChanges is an optional list of the fields that changed triggering this message;
	// it should be provided if the source can provide a changeset.
	FieldChanges []FieldChange `json:"fieldChanges"`
	// AdditionalData is a field to store any additional information that may be important to include with your message.
	AdditionalData map[string]any `json:"additionalData"`
}

ChangeMessage contains the data structure expected to be received when picking an event from a changes message queue

func UnmarshalChangeMessage

func UnmarshalChangeMessage(b []byte) (ChangeMessage, error)

UnmarshalChangeMessage returns a ChangeMessage from a json []byte.

func (ChangeMessage) GetTraceContext added in v0.3.5

func (m ChangeMessage) GetTraceContext(ctx context.Context) context.Context

GetTraceContext creates a new OpenTelemetry context for the message.

func (ChangeMessage) Validate added in v0.3.5

func (m ChangeMessage) Validate() error

Validate ensures the message has all the required fields.

type ChangeType

// ChangeType is the string-backed event type of a ChangeMessage.
// The values declared in this package are CreateChangeType ("create"),
// UpdateChangeType ("update") and DeleteChangeType ("delete").
type ChangeType string

ChangeType represents the possible event types for a ChangeMessage

type Config added in v0.3.5

// Config groups the per-provider event configuration.
type Config struct {
	// NATS holds the NATS provider configuration; it is mapped from the "nats" key by mapstructure.
	NATS NATSConfig `mapstructure:"nats"`
}

Config contains event provider configs.

type Connection added in v0.3.5

// Connection is the provider-independent handle combining lifecycle control
// with every publish and subscribe capability of the package.
type Connection interface {
	// Shutdown gracefully closes the connection.
	Shutdown(ctx context.Context) error

	// Source gives you the raw underlying connection object.
	Source() any

	// Subscriber and Publisher contribute the ChangeMessage and EventMessage
	// subscribe and publish methods.
	Subscriber
	Publisher

	// AuthRelationshipSubscriber and AuthRelationshipPublisher contribute the
	// auth relationship request subscribe and publish methods.
	AuthRelationshipSubscriber
	AuthRelationshipPublisher
}

Connection defines a connection handler.

func NewConnection added in v0.3.5

func NewConnection(config Config, options ...Option) (Connection, error)

NewConnection creates a new Connection from the provided config.

type EventMessage

// EventMessage is the payload carried on an events message queue.
type EventMessage struct {
	// SubjectID is the PrefixedID representing the node of the topic of this message.
	SubjectID gidx.PrefixedID `json:"subjectID"`
	// EventType describes the type of event that has triggered this message.
	EventType string `json:"eventType"`
	// AdditionalSubjectIDs is a group of PrefixedIDs representing additional nodes associated with this message.
	AdditionalSubjectIDs []gidx.PrefixedID `json:"additionalSubjects"`
	// Source is a string representing the identity of the source system that created the message.
	Source string `json:"source"`
	// Timestamp is the time representing when the message was created.
	Timestamp time.Time `json:"timestamp"`
	// TraceContext is a map of values used for OpenTelemetry context propagation.
	TraceContext map[string]string `json:"traceContext"`
	// TraceID is the ID of the trace for this event.
	//
	// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
	TraceID string `json:"traceID"`
	// SpanID is the ID of the span that additional traces should be based off of.
	//
	// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
	SpanID string `json:"spanID"`
	// Data is a field to store any information that may be important to include about the event.
	Data map[string]any `json:"data"`
}

EventMessage contains the data structure expected to be received when picking an event from an events message queue

func UnmarshalEventMessage

func UnmarshalEventMessage(b []byte) (EventMessage, error)

UnmarshalEventMessage returns an EventMessage from a json []byte.

func (EventMessage) GetTraceContext added in v0.3.5

func (m EventMessage) GetTraceContext(ctx context.Context) context.Context

GetTraceContext creates a new OpenTelemetry context for the message.

func (EventMessage) Validate added in v0.3.5

func (m EventMessage) Validate() error

Validate ensures the message has all the required fields.

type FieldChange

// FieldChange describes one changed field; ChangeMessage.FieldChanges carries a
// slice of these. Both the previous and current values are carried as strings.
type FieldChange struct {
	// Field is the name of the field that changed.
	Field string `json:"field"`
	// PreviousValue is the value the field had before the change.
	PreviousValue string `json:"previousValue"`
	// CurrentValue is the new value of the field after the change.
	CurrentValue string `json:"currentValue"`
}

FieldChange represents a single field that was changed in a changeset and is used to map fields to the old and new values

type Message added in v0.3.5

// Message is the provider-independent view of a published or received message
// whose decoded payload has type T.
type Message[T any] interface {
	// Connection returns the underlying connection the message was received on.
	Connection() Connection

	// ID returns the unique message id.
	ID() string
	// Topic returns the topic the message was sent to.
	Topic() string
	// Message returns the decoded message object.
	Message() T
	// Ack acks the message.
	Ack() error
	// Nak nacks the message with the provided delay.
	Nak(delay time.Duration) error
	// Term terminates the message.
	// The NATS implementation documents this as preventing the message from being processed again.
	Term() error
	// Timestamp returns the time the message was submitted.
	Timestamp() time.Time
	// Deliveries returns the number of times the message was delivered.
	Deliveries() uint64

	// Error returns any error encountered while decoding the message.
	Error() error

	// Source returns the underlying message object.
	Source() any
}

Message contains a message which has been published or received from a subscription.

type NATSAuthRelationshipRequest added in v0.3.5

// NATSAuthRelationshipRequest wraps a NATS message carrying an AuthRelationshipRequest.
type NATSAuthRelationshipRequest struct {
	// The embedded NATSMessage promotes its methods onto this type; Reply is declared separately on *NATSAuthRelationshipRequest.
	*NATSMessage[AuthRelationshipRequest]
}

NATSAuthRelationshipRequest implements Request for AuthRelationshipRequest / AuthRelationshipResponse

func (*NATSAuthRelationshipRequest) Reply added in v0.3.5

Reply responds to an AuthRelationshipRequest with an AuthRelationshipResponse.

type NATSConfig

// NATSConfig lists the exported NATS settings; unexported fields are not shown in this listing.
type NATSConfig struct {
	URL             string
	SubscribePrefix string
	PublishPrefix   string
	QueueGroup      string
	// Token and CredsFile must not both be set: ErrNATSInvalidAuthConfiguration is
	// documented as the error for a config that specifies both.
	Token           string
	CredsFile       string
	Source          string

	// NOTE(review): the timeout, batch size and backoff fields below presumably fall back
	// to the matching NATSDefault* package variables via WithDefaults — confirm.
	ConnectTimeout           time.Duration
	ShutdownTimeout          time.Duration
	SubscriberFetchBatchSize int
	SubscriberFetchTimeout   time.Duration
	SubscriberFetchBackoff   time.Duration
	SubscriberNoAckExplicit  bool
	SubscriberNoManualAck    bool

	// SubscriberDeliveryPolicy selects the delivery policy. Per the ErrNATSInvalidDeliveryPolicy
	// message the expected values are all, last, last-per-subject, new, start-sequence and start-time.
	SubscriberDeliveryPolicy string
	// NOTE(review): SubscriberStartSequence and SubscriberStartTime presumably apply to the
	// start-sequence and start-time delivery policies respectively — confirm.
	SubscriberStartSequence  uint64
	SubscriberStartTime      time.Time
	// contains filtered or unexported fields
}

NATSConfig defines the NATS connection configuration.

func (NATSConfig) Configured added in v0.3.5

func (c NATSConfig) Configured() bool

Configured checks whether the provider has been configured.

func (NATSConfig) Validate added in v0.3.5

func (c NATSConfig) Validate() error

Validate ensures the configuration is valid.

func (NATSConfig) WithDefaults added in v0.3.5

func (c NATSConfig) WithDefaults() NATSConfig

WithDefaults returns a copy of the config with default values set for any unset fields.

type NATSConnection added in v0.3.5

type NATSConnection struct {
	// contains filtered or unexported fields
}

NATSConnection implements Connection.

func NewNATSConnection added in v0.3.5

func NewNATSConnection(config NATSConfig, options ...NATSOption) (*NATSConnection, error)

NewNATSConnection creates a new nats jetstream connection.

func (*NATSConnection) PublishAuthRelationshipRequest added in v0.3.5

func (c *NATSConnection) PublishAuthRelationshipRequest(ctx context.Context, topic string, message AuthRelationshipRequest) (Message[AuthRelationshipResponse], error)

PublishAuthRelationshipRequest publishes an AuthRelationshipRequest message and blocks until an AuthRelationshipResponse is provided.

func (*NATSConnection) PublishChange added in v0.3.5

func (c *NATSConnection) PublishChange(ctx context.Context, topic string, message ChangeMessage) (Message[ChangeMessage], error)

PublishChange publishes a ChangeMessage.

func (*NATSConnection) PublishEvent added in v0.3.5

func (c *NATSConnection) PublishEvent(ctx context.Context, topic string, message EventMessage) (Message[EventMessage], error)

PublishEvent publishes an EventMessage.

func (*NATSConnection) Shutdown added in v0.3.5

func (c *NATSConnection) Shutdown(ctx context.Context) error

Shutdown gracefully drains the connection.

func (*NATSConnection) Source added in v0.3.5

func (c *NATSConnection) Source() any

Source returns the underlying NATS Connection.

func (*NATSConnection) SubscribeAuthRelationshipRequests added in v0.3.5

func (c *NATSConnection) SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Request[AuthRelationshipRequest, AuthRelationshipResponse], error)

SubscribeAuthRelationshipRequests creates a new pull subscription parsing incoming messages as AuthRelationshipRequest messages and returning a new Request channel.

func (*NATSConnection) SubscribeChanges added in v0.3.5

func (c *NATSConnection) SubscribeChanges(ctx context.Context, topic string) (<-chan Message[ChangeMessage], error)

SubscribeChanges creates a new pull subscription parsing incoming messages as ChangeMessage messages and returning a new Message channel.

func (*NATSConnection) SubscribeEvents added in v0.3.5

func (c *NATSConnection) SubscribeEvents(ctx context.Context, topic string) (<-chan Message[EventMessage], error)

SubscribeEvents creates a new pull subscription parsing incoming messages as EventMessage messages and returning a new Message channel.

type NATSMessage added in v0.3.5

type NATSMessage[T any] struct {
	// contains filtered or unexported fields
}

NATSMessage implements Message

func (*NATSMessage[T]) Ack added in v0.3.5

func (m *NATSMessage[T]) Ack() error

Ack acks the message.

func (*NATSMessage[T]) Connection added in v0.3.5

func (m *NATSMessage[T]) Connection() Connection

Connection returns the underlying Connection.

func (*NATSMessage[T]) Deliveries added in v0.3.5

func (m *NATSMessage[T]) Deliveries() uint64

Deliveries returns the number of times the message was delivered.

func (*NATSMessage[T]) Error added in v0.3.5

func (m *NATSMessage[T]) Error() error

Error returns any error with the message.

func (*NATSMessage[T]) ID added in v0.3.5

func (m *NATSMessage[T]) ID() string

ID returns the nats message sequence number for the consumer.

func (*NATSMessage[T]) Message added in v0.3.5

func (m *NATSMessage[T]) Message() T

Message returns the decoded message object.

func (*NATSMessage[T]) Nak added in v0.3.5

func (m *NATSMessage[T]) Nak(delay time.Duration) error

Nak calls a Nak with the provided delay.

func (*NATSMessage[T]) Source added in v0.3.5

func (m *NATSMessage[T]) Source() any

Source returns the underlying nats message.

func (*NATSMessage[T]) Term added in v0.3.5

func (m *NATSMessage[T]) Term() error

Term terminates the message from being processed again.

func (*NATSMessage[T]) Timestamp added in v0.3.5

func (m *NATSMessage[T]) Timestamp() time.Time

Timestamp returns the timestamp of the message.

func (*NATSMessage[T]) Topic added in v0.3.5

func (m *NATSMessage[T]) Topic() string

Topic returns the nats subject.

type NATSOption added in v0.3.5

// NATSOption is a functional option: it receives the *NATSConfig being built
// and may return an error.
type NATSOption func(c *NATSConfig) error

NATSOption defines a nats configuration option.

func WithNATSConnectOptions added in v0.3.5

func WithNATSConnectOptions(options ...nats.Option) NATSOption

WithNATSConnectOptions configures the connection options for nats.

func WithNATSJetStreamOptions added in v0.3.5

func WithNATSJetStreamOptions(options ...nats.JSOpt) NATSOption

WithNATSJetStreamOptions configures the jetstream connection options.

func WithNATSLogger added in v0.3.5

func WithNATSLogger(logger *zap.SugaredLogger) NATSOption

WithNATSLogger sets the logger for the nats connection.

func WithNATSSubscribeOptions added in v0.3.5

func WithNATSSubscribeOptions(options ...nats.SubOpt) NATSOption

WithNATSSubscribeOptions configures the subscribe options for new subscriptions.

type Option added in v0.3.5

// Option is a functional option: it receives the *Config being built and may
// return an error. NewConnection accepts a variadic list of these.
type Option func(config *Config) error

Option configures a connection option.

func WithLogger added in v0.3.5

func WithLogger(logger *zap.SugaredLogger) Option

WithLogger sets the logger for the connection.

func WithNATSOptions added in v0.3.5

func WithNATSOptions(options ...NATSOption) Option

WithNATSOptions configures nats options.

type Publisher

// Publisher is the set of methods for publishing change and event messages.
type Publisher interface {
	// PublishChange publishes the given ChangeMessage to the specified topic
	// and returns the published message wrapped in a Message.
	PublishChange(ctx context.Context, topic string, message ChangeMessage) (Message[ChangeMessage], error)
	// PublishEvent publishes the given EventMessage to the specified topic
	// and returns the published message wrapped in a Message.
	PublishEvent(ctx context.Context, topic string, message EventMessage) (Message[EventMessage], error)
}

Publisher specifies publisher methods.

type Request added in v0.3.5

// Request is a received message of type TRequest that can be answered with a
// TResponse.
type Request[TRequest, TResponse any] interface {
	// Message is embedded so a Request exposes every Message method for the TRequest payload.
	Message[TRequest]

	// Reply publishes a response to the received message and returns the
	// response wrapped in a Message.
	Reply(ctx context.Context, message TResponse) (Message[TResponse], error)
}

Request extends Message by allowing replies to be sent for the received message.

type Subscriber

// Subscriber is the set of methods for subscribing to change and event messages.
type Subscriber interface {
	// SubscribeChanges subscribes to the provided topic and returns a receive-only
	// channel delivering each message as a Message[ChangeMessage].
	SubscribeChanges(ctx context.Context, topic string) (<-chan Message[ChangeMessage], error)
	// SubscribeEvents subscribes to the provided topic and returns a receive-only
	// channel delivering each message as a Message[EventMessage].
	SubscribeEvents(ctx context.Context, topic string) (<-chan Message[EventMessage], error)
}

Subscriber specifies subscriber methods.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL