events

package
v3.31.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2024 License: Apache-2.0 Imports: 32 Imported by: 1

Documentation

Overview

Package events implements event handling through a PubSub interface.

Example
package main

import (
	"fmt"
	"sync"

	"go.thethings.network/lorawan-stack/v3/pkg/events"
	"go.thethings.network/lorawan-stack/v3/pkg/events/basic"
	"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
	"go.thethings.network/lorawan-stack/v3/pkg/util/test"
)

func main() {
	// The context typically comes from the request or something.
	ctx := test.Context()

	// This is required for unit test to pass.
	defer test.SetDefaultEventsPubSub(basic.NewPubSub())() //nolint:revive

	// The WaitGroup is only for synchronizing the unit test
	var wg sync.WaitGroup
	wg.Add(1)

	if err := events.Subscribe(ctx, []string{"ns.mac.adr.send_req"}, nil, events.HandlerFunc(func(e events.Event) {
		fmt.Printf("Received event %s\n", e.Name())

		wg.Done() // only for synchronizing the unit test
	})); err != nil {
		panic(err)
	}

	// You can send any arbitrary event; you don't have to pass any identifiers or data.
	events.Publish(events.New(test.Context(), "test.hello_world", "the events system says hello, world"))

	// Defining the event is not mandatory, but will be needed in order to translate the descriptions.
	// Event names are lowercase snake_case and can be dot-separated as component.subsystem.subsystem.event
	// Event descriptions are short descriptions of what the event means.
	// Visibility rights are optional. If no rights are supplied, then the _ALL right is assumed.
	adrSendEvent := events.Define(
		"ns.mac.adr.send_req", "send ADR request",
		events.WithVisibility(ttnpb.Right_RIGHT_APPLICATION_TRAFFIC_READ),
	)

	// These variables come from the request or you got them from the db or something.
	var (
		dev      ttnpb.EndDevice
		requests []ttnpb.MACCommand_LinkADRReq
	)

	// It's nice to be able to correlate events; we use a Correlation ID for that.
	// In most cases, there will already be a correlation ID in the context;
	// this function will append a new one to the ones already in the context.
	ctx = events.ContextWithCorrelationID(ctx, events.NewCorrelationID())

	// Publishing an event to the events package will dispatch it on the "global" event pubsub.
	events.Publish(adrSendEvent.NewWithIdentifiersAndData(ctx, dev.Ids, requests))

	wg.Wait() // only for synchronizing the unit test

}
Output:

Received event ns.mac.adr.send_req

Index

Examples

Constants

This section is empty.

Variables

View Source
var IncludeCaller bool

IncludeCaller indicates whether the caller of Publish should be included in the event.

Functions

func ContextWithCorrelationID

func ContextWithCorrelationID(ctx context.Context, cids ...string) context.Context

ContextWithCorrelationID returns a derived context with the correlation IDs if they were not already in there.

func CorrelationIDsFromContext

func CorrelationIDsFromContext(ctx context.Context) []string

CorrelationIDsFromContext returns the correlation IDs that are attached to the context.

func DefineFunc

func DefineFunc(name, description string, opts ...Option) func() Builder

DefineFunc generates a function, which returns a Definition with specified name and description. Most callers should be using Define - this function is only useful for helper functions.

func NamesFromPatterns added in v3.28.1

func NamesFromPatterns(definedNames map[string]struct{}, patterns []string) ([]string, error)

NamesFromPatterns returns the event names which match the given patterns. The defined names are a set of event names which are used to match the patterns.

func NewCorrelationID

func NewCorrelationID() string

NewCorrelationID returns a new random correlation ID.

func Proto

func Proto(e Event) (*ttnpb.Event, error)

Proto returns the protobuf representation of the event.

func Publish

func Publish(evs ...Event)

Publish emits events on the default event pubsub.

func RegisterContextMarshaler

func RegisterContextMarshaler(name string, m ContextMarshaler)

RegisterContextMarshaler registers a ContextMarshaler with the given name. This should only be called from init funcs.

func RegisterCorrelationIDPrefix added in v3.28.0

func RegisterCorrelationIDPrefix(
	class correlationIDClass, prefix correlationIDPrefix,
) func(context.Context, ...string) context.Context

RegisterCorrelationIDPrefix register a prefix which ensures that will be unique within the same class. The function returns a function that can be used to add the correlation ID to a context. If the prefix is already registered, the function panics. The function is not goroutine safe, and it is meant to be called during package initialization.

func SetDefaultPubSub

func SetDefaultPubSub(ps PubSub)

SetDefaultPubSub sets pubsub used by the package to ps.

func StreamServerInterceptor

func StreamServerInterceptor(ignoredMethods []string) grpc.StreamServerInterceptor

StreamServerInterceptor returns a new streaming server interceptor that that modifies the context.

func Subscribe

func Subscribe(ctx context.Context, names []string, ids []*ttnpb.EntityIdentifiers, hdl Handler) error

Subscribe subscribes on the default PubSub.

func UnaryServerInterceptor

func UnaryServerInterceptor(ignoredMethods []string) grpc.UnaryServerInterceptor

UnaryServerInterceptor returns a new unary server interceptor that modifies the context to include a correlation ID.

Types

type Builder added in v3.9.0

type Builder interface {
	Definition() Definition

	With(opts ...Option) Builder
	New(ctx context.Context, opts ...Option) Event

	// Convenience function for legacy code. Same as New(ctx, WithIdentifiers(ids), WithData(data)).
	NewWithIdentifiersAndData(ctx context.Context, ids EntityIdentifiers, data any) Event
	// Convenience function for legacy code. Same as With(WithData(data)).
	BindData(data any) Builder
}

Builder is the interface for building events from definitions.

func Define

func Define(name, description string, opts ...Option) Builder

Define a registered event.

type Builders added in v3.9.0

type Builders []Builder

Builders makes it easier to create multiple events at once.

func All added in v3.9.1

func All() Builders

All returns all defined events, sorted by name.

func (Builders) Definitions added in v3.9.1

func (bs Builders) Definitions() []Definition

Definitions returns the definition for each builder in the list.

func (Builders) New added in v3.9.0

func (bs Builders) New(ctx context.Context, opts ...Option) []Event

New returns new events for each builder in the list.

type Channel

type Channel chan Event

Channel is a channel of Events that can be used as an event handler. The channel should be buffered, events will be dropped if the channel blocks. It is typically not safe to close this channel until you're absolutely sure that it is no longer registered as an event handler.

Example
package main

import (
	"context"
	"fmt"

	"go.thethings.network/lorawan-stack/v3/pkg/events"
	"go.thethings.network/lorawan-stack/v3/pkg/util/test"
)

func main() {
	// The context typically comes from the request or something.
	ctx := test.Context()

	eventChan := make(events.Channel, 2)

	subCtx, unsubscribe := context.WithCancel(ctx)
	if err := events.Subscribe(subCtx, []string{"example"}, nil, eventChan); err != nil {
		panic(err)
	}

	// From this moment on, "example" events will be delivered to the channel.
	// As soon as the channel is full, events will be dropped, so it's probably a
	// good idea to start handling the channel before subscribing.

	go func() {
		for e := range eventChan {
			fmt.Printf("Received event %v\n", e)
		}
	}()

	// We want to unsubscribe when this function returns.
	defer unsubscribe()

	// Note that in-transit events may still be delivered after Unsubscribe returns.
	// This means that you can't immediately close the channel after unsubscribing.
}
Output:

func (Channel) Notify

func (ch Channel) Notify(evt Event)

Notify implements the Handler interface.

func (Channel) ReceiveContext

func (ch Channel) ReceiveContext(ctx context.Context) Event

ReceiveContext returns the next event from the channel or returns nil when the context is done.

func (Channel) ReceiveTimeout

func (ch Channel) ReceiveTimeout(timeout time.Duration) Event

ReceiveTimeout returns the next event from the channel or returns nil after a timeout.

type ContextMarshaler

type ContextMarshaler interface {
	MarshalContext(context.Context) []byte
	UnmarshalContext(context.Context, []byte) (context.Context, error)
}

ContextMarshaler interface for marshaling/unmarshaling contextual information to/from events.

type Definition

type Definition interface {
	Name() string
	Description() string
	DataType() proto.Message
	PropagateToParent() bool
}

Definition describes an event definition.

func GetDefinition added in v3.17.2

func GetDefinition(evt Event) Definition

GetDefinition gets the definition for an event.

type DefinitionOption added in v3.9.1

type DefinitionOption interface {
	Option
	// contains filtered or unexported methods
}

DefinitionOption is like Option, but applies to the definition instead.

func WithDataType added in v3.9.1

func WithDataType(t any) DefinitionOption

WithDataType returns an option that sets the data type of the event (for documentation).

func WithErrorDataType added in v3.9.1

func WithErrorDataType() DefinitionOption

WithErrorDataType is a convenience function that sets the data type of the event to an error.

func WithPropagateToParent added in v3.17.2

func WithPropagateToParent() DefinitionOption

WithPropagateToParent returns an option that propagate the event to its parent. Typically used to propagate end device events to applications.

func WithUpdatedFieldsDataType added in v3.9.1

func WithUpdatedFieldsDataType() DefinitionOption

WithUpdatedFieldsDataType is a convenience function that sets the data type of the event to a slice of updated fields.

type EntityIdentifiers added in v3.12.2

type EntityIdentifiers interface {
	GetEntityIdentifiers() *ttnpb.EntityIdentifiers
}

EntityIdentifiers interface is for everything from which we can get entity identifiers.

type Event

type Event interface {
	UniqueID() string
	Context() context.Context
	Name() string
	Time() time.Time
	Identifiers() []*ttnpb.EntityIdentifiers
	Data() any
	CorrelationIds() []string
	Origin() string
	Caller() string
	Visibility() *ttnpb.Rights
	AuthType() string
	AuthTokenID() string
	AuthTokenType() string
	RemoteIP() string
	UserAgent() string
}

Event interface.

func FromProto

func FromProto(pb *ttnpb.Event) (Event, error)

FromProto returns the event from its protobuf representation.

func New

func New(ctx context.Context, name, description string, opts ...Option) Event

New returns a new Event. Instead of using New, most implementations should first define an event, and then create a new event from that definition.

func UnmarshalJSON

func UnmarshalJSON(data []byte) (Event, error)

UnmarshalJSON unmarshals an event as JSON.

type Handler

type Handler interface {
	Notify(Event)
}

Handler interface for event listeners.

func ContextHandler

func ContextHandler(ctx context.Context, handler Handler) Handler

ContextHandler delivers events to the Handler as long as ctx.Err() is non-nil.

Example
package main

import (
	"context"
	"fmt"

	"go.thethings.network/lorawan-stack/v3/pkg/events"
	"go.thethings.network/lorawan-stack/v3/pkg/util/test"
)

func main() {
	// Usually the context comes from somewhere else (e.g. a streaming RPC):
	ctx, cancel := context.WithCancel(test.Context())
	defer cancel()

	eventChan := make(events.Channel, 2)
	handler := events.ContextHandler(ctx, eventChan)

	if err := events.Subscribe(ctx, []string{"example"}, nil, handler); err != nil {
		panic(err)
	}

	// We automatically unsubscribe when he context gets canceled.

	// From this moment on, "example" events will be delivered to the channel.
	// As soon as the channel is full, events will be dropped, so it's probably a
	// good idea to start handling the channel before subscribing.

	go func() {
		for {
			select {
			case <-ctx.Done():
				// The ContextHandler will make sure that no events are delivered after
				// the context is canceled, so it is now safe to close the channel:
				close(eventChan)
				return
			case e := <-eventChan:
				fmt.Printf("Received event %v\n", e)
			}
		}
	}()
}
Output:

func HandlerFunc

func HandlerFunc(handler func(Event)) Handler

HandlerFunc makes the func implement the Handler interface.

Example
package main

import (
	"context"
	"fmt"

	"go.thethings.network/lorawan-stack/v3/pkg/events"
	"go.thethings.network/lorawan-stack/v3/pkg/util/test"
)

func main() {
	// The context typically comes from the request or something.
	ctx := test.Context()

	handler := events.HandlerFunc(func(e events.Event) {
		fmt.Printf("Received event %v\n", e)
	})

	subCtx, unsubscribe := context.WithCancel(ctx)
	if err := events.Subscribe(subCtx, []string{"example"}, nil, handler); err != nil {
		panic(err)
	}

	// From this moment on, "example" events will be delivered to the handler func.

	// We want to unsubscribe when this function returns.
	defer unsubscribe()

	// Note that in-transit events may still be delivered after unsubscribe returns.
}
Output:

type Option added in v3.9.0

type Option interface {
	// contains filtered or unexported methods
}

Option is an option that is used to build events.

func WithAuthFromContext added in v3.9.0

func WithAuthFromContext() Option

WithAuthFromContext returns an option that extracts auth information from the context when the event is created.

func WithClientInfoFromContext added in v3.9.1

func WithClientInfoFromContext() Option

WithClientInfoFromContext returns an option that extracts the UserAgent and the RemoteIP from the request context.

func WithData added in v3.9.0

func WithData(data any) Option

WithData returns an option that sets the data of the event.

func WithIdentifiers added in v3.9.0

func WithIdentifiers(identifiers ...EntityIdentifiers) Option

WithIdentifiers returns an option that sets the identifiers of the event.

func WithVisibility added in v3.9.0

func WithVisibility(rights ...ttnpb.Right) Option

WithVisibility returns an option that sets the visibility of the event.

type PubSub

type PubSub interface {
	Publisher
	Subscriber
}

PubSub interface combines the Publisher and Subscriber interfaces.

func DefaultPubSub

func DefaultPubSub() PubSub

DefaultPubSub returns the default PubSub.

type PublishFunc added in v3.28.0

type PublishFunc func(...Event)

PublishFunc is a function which implements Publisher.

func (PublishFunc) Publish added in v3.28.0

func (f PublishFunc) Publish(evs ...Event)

Publish implements events.Publisher.

type Publisher

type Publisher interface {
	// Publish emits an event on the default event pubsub.
	Publish(evs ...Event)
}

Publisher interface lets you publish events.

type Store added in v3.13.0

type Store interface {
	PubSub
	// FindRelated finds events with matching correlation IDs.
	FindRelated(ctx context.Context, correlationID string) ([]Event, error)
	// FetchHistory fetches the tail (optional) of historical events matching the
	// given names (optional) and identifiers (mandatory) after the given time (optional).
	FetchHistory(
		ctx context.Context, names []string, ids []*ttnpb.EntityIdentifiers, after *time.Time, tail int,
	) ([]Event, error)
	// SubscribeWithHistory is like FetchHistory, but after fetching historical events,
	// this continues sending live events until the context is done.
	SubscribeWithHistory(
		ctx context.Context, names []string, ids []*ttnpb.EntityIdentifiers, after *time.Time, tail int, hdl Handler,
	) error
}

Store extends PubSub implementations with storage of historical events.

type Subscriber

type Subscriber interface {
	// Subscribe to events that match the names and identifiers.
	// The subscription continues until the context is canceled.
	// Events matching _any_ of the names or identifiers will get sent to the handler.
	// The handler must be non-blocking.
	Subscribe(ctx context.Context, names []string, identifiers []*ttnpb.EntityIdentifiers, hdl Handler) error
}

Subscriber interface lets you subscribe to events.

type Subscription added in v3.12.0

type Subscription interface {
	// Match returns whether the event matches the subscription.
	Match(Event) bool
	// Notify notifies the subscription of a new matching event.
	Notify(Event)
}

Subscription is the interface for PubSub subscriptions.

Directories

Path Synopsis
Package basic provides a basic events PubSub implementation.
Package basic provides a basic events PubSub implementation.
Package batch contains a batch publisher implementation of events.Publisher.
Package batch contains a batch publisher implementation of events.Publisher.
Package cloud implements an events.PubSub implementation that uses Go Cloud PubSub.
Package cloud implements an events.PubSub implementation that uses Go Cloud PubSub.
Package grpc contains an implementation of the EventsServer, which is used to stream all events published for a set of identifiers.
Package grpc contains an implementation of the EventsServer, which is used to stream all events published for a set of identifiers.
internal
eventstest
Package eventstest provides a blackbox test for events PubSub implementations.
Package eventstest provides a blackbox test for events PubSub implementations.
Package redis implements an events.PubSub implementation that uses Redis PubSub.
Package redis implements an events.PubSub implementation that uses Redis PubSub.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL