events

package
v3.8.6
Warning

This package is not in the latest version of its module.

Published: Jul 10, 2020 License: Apache-2.0 Imports: 26 Imported by: 1

Documentation

Overview

Package events implements event handling through a PubSub interface.

Example
package main

import (
	"fmt"
	"sync"

	"go.thethings.network/lorawan-stack/v3/pkg/events"
	"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
	"go.thethings.network/lorawan-stack/v3/pkg/util/test"
)

func main() {
	// This is required for the unit test to pass.
	defer test.SetDefaultEventsPubSub(events.NewPubSub(10))()

	// The WaitGroup is only for synchronizing the unit test
	var wg sync.WaitGroup
	wg.Add(1)

	events.Subscribe("ns.**", events.HandlerFunc(func(e events.Event) {
		fmt.Printf("Received event %s\n", e.Name())

		wg.Done() // only for synchronizing the unit test
	}))

	// You can send any arbitrary event; you don't have to pass any identifiers or data.
	events.PublishEvent(test.Context(), "test.hello_world", nil, nil)

	// Defining the event is not mandatory, but will be needed in order to translate the descriptions.
	// Event names are lowercase snake_case and can be dot-separated as component.subsystem.subsystem.event
	// Event descriptions are short descriptions of what the event means.
	// Visibility rights are optional. If no rights are supplied, then the _ALL right is assumed.
	var adrSendEvent = events.Define("ns.mac.adr.send_req", "send ADR request", ttnpb.RIGHT_APPLICATION_TRAFFIC_READ)

	// These variables come from the request or you got them from the db or something.
	var (
		ctx      = test.Context()
		dev      ttnpb.EndDevice
		requests []ttnpb.MACCommand_LinkADRReq
	)

	// It's nice to be able to correlate events; we use a Correlation ID for that.
	// In most cases, there will already be a correlation ID in the context; this function will append a new one to the ones already in the context.
	ctx = events.ContextWithCorrelationID(ctx, events.NewCorrelationID())

	// Publishing an event to the events package will dispatch it on the "global" event pubsub.
	events.Publish(adrSendEvent(ctx, dev.EndDeviceIdentifiers, requests))

	wg.Wait() // only for synchronizing the unit test

}
Output:

Received event ns.mac.adr.send_req

Constants

const DefaultBufferSize = 64

DefaultBufferSize is the default number of events that can be buffered before Publish starts to block.

Variables

var Definitions = make(map[string]string)

Definitions of registered events. Events that are defined in init() funcs will be collected for translation.

var IncludeCaller bool

IncludeCaller indicates whether the caller of Publish should be included in the event.

Functions

func ContextWithCorrelationID

func ContextWithCorrelationID(ctx context.Context, cids ...string) context.Context

ContextWithCorrelationID returns a derived context with the correlation IDs if they were not already in there.
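The "if they were not already in there" behavior can be pictured as a merge that skips duplicates. The following is a minimal sketch using only the standard library; `correlationKey`, `withCorrelationIDs`, and `correlationIDs` are hypothetical stand-ins, and the package's real storage and ordering of IDs may differ.

```go
package main

import (
	"context"
	"fmt"
)

// correlationKey is a hypothetical context key used only in this sketch.
type correlationKey struct{}

// withCorrelationIDs returns a derived context carrying the existing IDs plus
// any of cids that are not already present.
func withCorrelationIDs(ctx context.Context, cids ...string) context.Context {
	existing, _ := ctx.Value(correlationKey{}).([]string)
	// Copy so that the slice stored in the parent context is never mutated.
	merged := append([]string(nil), existing...)
	for _, cid := range cids {
		found := false
		for _, e := range merged {
			if e == cid {
				found = true
				break
			}
		}
		if !found {
			merged = append(merged, cid)
		}
	}
	return context.WithValue(ctx, correlationKey{}, merged)
}

// correlationIDs returns the IDs attached to ctx, or nil if there are none.
func correlationIDs(ctx context.Context) []string {
	ids, _ := ctx.Value(correlationKey{}).([]string)
	return ids
}

// demoCorrelation attaches overlapping sets of IDs in two steps.
func demoCorrelation() []string {
	ctx := withCorrelationIDs(context.Background(), "a", "b")
	ctx = withCorrelationIDs(ctx, "b", "c") // "b" is already present and is skipped
	return correlationIDs(ctx)
}

func main() {
	fmt.Println(demoCorrelation()) // prints "[a b c]"
}
```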

func CorrelationIDsFromContext

func CorrelationIDsFromContext(ctx context.Context) []string

CorrelationIDsFromContext returns the correlation IDs that are attached to the context.

func DefineFunc

func DefineFunc(name, description string, visibility ...ttnpb.Right) func() Definition

DefineFunc generates a function that returns a Definition with the specified name and description. Most callers should use Define; this function is only useful for helper functions.

func NewCorrelationID

func NewCorrelationID() string

NewCorrelationID returns a new random correlation ID.

func Proto

func Proto(e Event) (*ttnpb.Event, error)

Proto returns the protobuf representation of the event.

func Publish

func Publish(evts ...Event)

Publish emits events on the default event pubsub.

func PublishEvent

func PublishEvent(ctx context.Context, name string, identifiers CombinedIdentifiers, data interface{}, visibility ...ttnpb.Right)

PublishEvent creates an event and emits it on the default event pubsub. Event names are dot-separated for namespacing. Event identifiers identify the entities that are related to the event. System events have nil identifiers. Event data will in most cases be marshaled to JSON, but ideally is a proto message.

func RegisterContextMarshaler

func RegisterContextMarshaler(name string, m ContextMarshaler)

RegisterContextMarshaler registers a ContextMarshaler with the given name. This should only be called from init funcs.

func SetDefaultPubSub

func SetDefaultPubSub(ps PubSub)

SetDefaultPubSub sets the pubsub used by the package to ps.

func StreamServerInterceptor

func StreamServerInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error

StreamServerInterceptor returns a new streaming server interceptor that modifies the context.

func Subscribe

func Subscribe(name string, hdl Handler) error

Subscribe adds an event handler to the default event pubsub. The name can be a glob in order to catch multiple event types. The handler must be non-blocking.
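To give a feel for how a glob such as "ns.**" can select dot-separated event names, here is a simplified stand-in matcher. The pattern rules are an assumption made for this sketch (`*` matches exactly one segment, `**` matches one or more segments); the glob syntax the package actually accepts is not specified in this documentation and may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSegments reports whether the name segments match the pattern segments.
// Assumed rules for this sketch only: "*" matches exactly one segment and
// "**" matches one or more segments.
func matchSegments(pat, name []string) bool {
	if len(pat) == 0 {
		return len(name) == 0
	}
	switch pat[0] {
	case "**":
		// Try letting "**" consume 1..len(name) segments.
		for i := 1; i <= len(name); i++ {
			if matchSegments(pat[1:], name[i:]) {
				return true
			}
		}
		return false
	case "*":
		return len(name) > 0 && matchSegments(pat[1:], name[1:])
	default:
		return len(name) > 0 && pat[0] == name[0] && matchSegments(pat[1:], name[1:])
	}
}

// matchName splits both the pattern and the event name on dots and compares them.
func matchName(pattern, name string) bool {
	return matchSegments(strings.Split(pattern, "."), strings.Split(name, "."))
}

func main() {
	fmt.Println(matchName("ns.**", "ns.mac.adr.send_req")) // prints "true"
	fmt.Println(matchName("ns.**", "test.hello_world"))    // prints "false"
	fmt.Println(matchName("ns.*", "ns.mac.adr"))           // prints "false"
}
```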

func UnaryServerInterceptor

func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error)

UnaryServerInterceptor returns a new unary server interceptor that modifies the context to include a correlation ID.

func Unsubscribe

func Unsubscribe(name string, hdl Handler)

Unsubscribe removes an event handler from the default event pubsub.

Types

type Channel

type Channel chan Event

Channel is a channel of Events that can be used as an event handler. The channel should be buffered; events will be dropped if the channel blocks. It is typically not safe to close this channel until you're absolutely sure that it is no longer registered as an event handler.
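The drop-when-full behavior is commonly built from a non-blocking send. The sketch below assumes that approach; `event` and `dropChannel` are hypothetical stand-ins for Event and Channel, not the package's own code.

```go
package main

import "fmt"

// event is a stand-in for events.Event in this sketch.
type event string

// dropChannel is a stand-in for a channel-based handler.
type dropChannel chan event

// Notify attempts a non-blocking send and silently drops the event when the
// buffer is full, so the publisher is never held up by a slow consumer.
func (ch dropChannel) Notify(e event) {
	select {
	case ch <- e:
	default: // buffer full: drop the event
	}
}

// demoDrop delivers three events to a channel with room for two and returns
// how many were buffered.
func demoDrop() int {
	ch := make(dropChannel, 2)
	for _, e := range []event{"a", "b", "c"} {
		ch.Notify(e)
	}
	return len(ch)
}

func main() {
	fmt.Println(demoDrop()) // prints "2"
}
```

This also illustrates why closing the channel early is risky: a send on a closed channel panics, even inside a select with a default case.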

Example
package main

import (
	"fmt"

	"go.thethings.network/lorawan-stack/v3/pkg/events"
)

func main() {
	eventChan := make(events.Channel, 2)

	events.Subscribe("example", eventChan)

	// From this moment on, "example" events will be delivered to the channel.
	// As soon as the channel is full, events will be dropped, so it's probably a
	// good idea to start handling the channel before subscribing.

	go func() {
		for e := range eventChan {
			fmt.Printf("Received event %v\n", e)
		}
	}()

	// Later:
	events.Unsubscribe("example", eventChan)

	// Note that in-transit events may still be delivered after Unsubscribe returns.
	// This means that you can't immediately close the channel after unsubscribing.
}

func (Channel) Notify

func (ch Channel) Notify(evt Event)

Notify implements the Handler interface.

func (Channel) ReceiveContext

func (ch Channel) ReceiveContext(ctx context.Context) Event

ReceiveContext returns the next event from the channel or returns nil when the context is done.

func (Channel) ReceiveTimeout

func (ch Channel) ReceiveTimeout(timeout time.Duration) Event

ReceiveTimeout returns the next event from the channel or returns nil after a timeout.
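Both receive helpers can be pictured as a select between the channel and a cancellation signal. This is a sketch under that assumption; `event`, `named`, `eventChannel`, `receiveContext`, and `receiveTimeout` are hypothetical names, and the real methods may be implemented differently.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// event is a reduced stand-in for events.Event.
type event interface{ Name() string }

// named is a trivial event implementation used by the demo.
type named string

func (n named) Name() string { return string(n) }

// eventChannel is a stand-in for events.Channel.
type eventChannel chan event

// receiveContext returns the next event, or nil once ctx is done.
func (ch eventChannel) receiveContext(ctx context.Context) event {
	select {
	case <-ctx.Done():
		return nil
	case e := <-ch:
		return e
	}
}

// receiveTimeout returns the next event, or nil after the timeout elapses.
func (ch eventChannel) receiveTimeout(timeout time.Duration) event {
	select {
	case <-time.After(timeout):
		return nil
	case e := <-ch:
		return e
	}
}

// demoReceive exercises one successful receive, one timeout, and one receive
// on an already-canceled context.
func demoReceive() (first string, timedOut, ctxDone bool) {
	ch := make(eventChannel, 1)
	ch <- named("example")
	first = ch.receiveTimeout(time.Second).Name()
	timedOut = ch.receiveTimeout(10*time.Millisecond) == nil

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	ctxDone = ch.receiveContext(ctx) == nil
	return first, timedOut, ctxDone
}

func main() {
	fmt.Println(demoReceive()) // prints "example true true"
}
```

Callers of either helper need a nil check before using the returned event.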

type CombinedIdentifiers

type CombinedIdentifiers interface {
	CombinedIdentifiers() *ttnpb.CombinedIdentifiers
}

type ContextMarshaler

type ContextMarshaler interface {
	MarshalContext(context.Context) []byte
	UnmarshalContext(context.Context, []byte) (context.Context, error)
}

ContextMarshaler interface for marshaling/unmarshaling contextual information to/from events.

type Definition

type Definition func(ctx context.Context, ids CombinedIdentifiers, data interface{}) Event

Definition of a registered event.

func Define

func Define(name, description string, visibility ...ttnpb.Right) Definition

Define a registered event.

func (Definition) BindData

func (d Definition) BindData(data interface{}) DefinitionDataClosure

BindData partially applies the Definition by binding its data argument.

type DefinitionDataClosure

type DefinitionDataClosure func(ctx context.Context, ids CombinedIdentifiers) Event

DefinitionDataClosure is a partially applied Definition with the data argument bound.

type Event

type Event interface {
	Context() context.Context
	Name() string
	Time() time.Time
	Identifiers() []*ttnpb.EntityIdentifiers
	Data() interface{}
	CorrelationIDs() []string
	Origin() string
	Caller() string
	Visibility() *ttnpb.Rights
}

Event interface

func ApplyDefinitionDataClosures

func ApplyDefinitionDataClosures(ctx context.Context, ids CombinedIdentifiers, closures ...DefinitionDataClosure) []Event

ApplyDefinitionDataClosures applies each DefinitionDataClosure to the given context.Context and CombinedIdentifiers and returns the resulting events.
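The relationship between Definition, BindData, and ApplyDefinitionDataClosures is ordinary partial application. The sketch below uses simplified stand-ins: the context is omitted, identifiers are reduced to a string, and the names `define`, `bindData`, `applyClosures`, and the event names are hypothetical.

```go
package main

import "fmt"

// event is a reduced stand-in for events.Event.
type event struct {
	name string
	ids  string
	data interface{}
}

// definition builds an event from identifiers and data.
type definition func(ids string, data interface{}) event

// dataClosure is a definition whose data argument is already bound.
type dataClosure func(ids string) event

// define returns a definition that stamps events with the given name.
func define(name string) definition {
	return func(ids string, data interface{}) event {
		return event{name: name, ids: ids, data: data}
	}
}

// bindData fixes the data argument and leaves the identifiers open.
func (d definition) bindData(data interface{}) dataClosure {
	return func(ids string) event { return d(ids, data) }
}

// applyClosures supplies the same identifiers to every closure.
func applyClosures(ids string, closures ...dataClosure) []event {
	evts := make([]event, 0, len(closures))
	for _, c := range closures {
		evts = append(evts, c(ids))
	}
	return evts
}

// demoBind queues two events before the identifiers are known, then applies them.
func demoBind() []event {
	first := define("example.first")   // hypothetical event names
	second := define("example.second")
	queued := []dataClosure{first.bindData(1), second.bindData("ok")}
	return applyClosures("device-1", queued...)
}

func main() {
	for _, e := range demoBind() {
		fmt.Printf("%s %s %v\n", e.name, e.ids, e.data)
	}
}
```

This shape is useful when code collects events while processing and only learns the context or identifiers at the end.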

func FromProto

func FromProto(pb *ttnpb.Event) (Event, error)

FromProto returns the event from its protobuf representation.

func New

func New(ctx context.Context, name string, identifiers CombinedIdentifiers, data interface{}, requiredRights ...ttnpb.Right) Event

New returns a new Event. Event names are dot-separated for namespacing. Event identifiers identify the entities that are related to the event. System events have nil identifiers. Event data will in most cases be marshaled to JSON, but ideally is a proto message.

func UnmarshalJSON

func UnmarshalJSON(data []byte) (Event, error)

UnmarshalJSON unmarshals an event as JSON.

type Handler

type Handler interface {
	Notify(Event)
}

Handler interface for event listeners.

func ContextHandler

func ContextHandler(ctx context.Context, handler Handler) Handler

ContextHandler delivers events to the Handler as long as ctx.Err() is nil, that is, until the context is canceled or its deadline expires.
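A wrapper of this kind can be sketched as a handler that checks the context before forwarding, so that nothing is delivered once the context is done. The types below (`event`, `handler`, `handlerFunc`, `contextHandler`) are hypothetical stand-ins, not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
)

// event and handler are reduced stand-ins for events.Event and events.Handler.
type event string

type handler interface{ Notify(event) }

// handlerFunc adapts a plain function to the handler interface.
type handlerFunc func(event)

func (f handlerFunc) Notify(e event) { f(e) }

// contextHandler forwards events to next only while ctx is still live.
type contextHandler struct {
	ctx  context.Context
	next handler
}

func (h contextHandler) Notify(e event) {
	if h.ctx.Err() != nil {
		return // context canceled or expired: drop the event
	}
	h.next.Notify(e)
}

// demoContextHandler delivers one event before and one after cancellation.
func demoContextHandler() []event {
	var got []event
	ctx, cancel := context.WithCancel(context.Background())
	h := contextHandler{
		ctx:  ctx,
		next: handlerFunc(func(e event) { got = append(got, e) }),
	}
	h.Notify("before")
	cancel()
	h.Notify("after")
	return got
}

func main() {
	fmt.Println(demoContextHandler()) // prints "[before]"
}
```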

Example
package main

import (
	"context"
	"fmt"

	"go.thethings.network/lorawan-stack/v3/pkg/events"
	"go.thethings.network/lorawan-stack/v3/pkg/util/test"
)

func main() {
	// Usually the context comes from somewhere else (e.g. a streaming RPC):
	ctx, cancel := context.WithCancel(test.Context())
	defer cancel()

	eventChan := make(events.Channel, 2)
	handler := events.ContextHandler(ctx, eventChan)

	events.Subscribe("example", handler)

	// From this moment on, "example" events will be delivered to the channel.
	// As soon as the channel is full, events will be dropped, so it's probably a
	// good idea to start handling the channel before subscribing.

	go func() {
		for {
			select {
			case <-ctx.Done():
				// Don't forget to unsubscribe:
				events.Unsubscribe("example", handler)

				// The ContextHandler will make sure that no events are delivered after
				// the context is canceled, so it is now safe to close the channel:
				close(eventChan)
				return
			case e := <-eventChan:
				fmt.Printf("Received event %v\n", e)
			}
		}
	}()
}

func HandlerFunc

func HandlerFunc(handler func(Event)) Handler

HandlerFunc makes the func implement the Handler interface.
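The adapter idea resembles `http.HandlerFunc` in the standard library. One design point worth noting: Go func values cannot be compared or used as map keys, so an adapter that must later be passed to Unsubscribe needs some identity. The sketch below assumes a pointer-to-struct wrapper for that purpose; whether the package does this is not stated here, and `funcHandler` and `newHandlerFunc` are hypothetical names.

```go
package main

import "fmt"

// event and handler are reduced stand-ins for events.Event and events.Handler.
type event string

type handler interface{ Notify(event) }

// funcHandler wraps a function so that it satisfies handler.
type funcHandler struct{ fn func(event) }

func (h *funcHandler) Notify(e event) { h.fn(e) }

// newHandlerFunc returns a pointer, which gives the handler an identity that
// can be compared and used as a map key (assumption of this sketch).
func newHandlerFunc(fn func(event)) handler {
	return &funcHandler{fn: fn}
}

// demoHandlerFunc registers the handler in a map and delivers one event.
func demoHandlerFunc() (seen string, registered bool) {
	h := newHandlerFunc(func(e event) { seen = string(e) })
	subs := map[handler]bool{h: true} // a bare func value could not be a key here
	h.Notify("example")
	return seen, subs[h]
}

func main() {
	fmt.Println(demoHandlerFunc()) // prints "example true"
}
```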

Example
package main

import (
	"fmt"

	"go.thethings.network/lorawan-stack/v3/pkg/events"
)

func main() {
	handler := events.HandlerFunc(func(e events.Event) {
		fmt.Printf("Received event %v\n", e)
	})

	events.Subscribe("example", handler)

	// From this moment on, "example" events will be delivered to the handler func.

	events.Unsubscribe("example", handler)

	// Note that in-transit events may still be delivered after Unsubscribe returns.
}

type IdentifierFilter

type IdentifierFilter interface {
	Handler
	Subscribe(ctx context.Context, ids CombinedIdentifiers, handler Handler)
	Unsubscribe(ctx context.Context, ids CombinedIdentifiers, handler Handler)
}

IdentifierFilter can be used as a layer on top of a PubSub to filter events based on the identifiers they contain.
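One way to picture such a layer is a map from identifier to the set of handlers interested in it, with Notify fanning an event out to the handlers registered for any identifier it carries. The sketch below makes several simplifying assumptions: the context arguments are omitted, identifiers are plain strings, and all type names are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a reduced stand-in for events.Event with string identifiers.
type event struct {
	name string
	ids  []string
}

type handler interface{ Notify(event) }

// identifierFilter maps each identifier to the handlers subscribed to it.
type identifierFilter struct {
	mu   sync.RWMutex
	subs map[string]map[handler]struct{}
}

func newIdentifierFilter() *identifierFilter {
	return &identifierFilter{subs: make(map[string]map[handler]struct{})}
}

func (f *identifierFilter) Subscribe(id string, h handler) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.subs[id] == nil {
		f.subs[id] = make(map[handler]struct{})
	}
	f.subs[id][h] = struct{}{}
}

func (f *identifierFilter) Unsubscribe(id string, h handler) {
	f.mu.Lock()
	defer f.mu.Unlock()
	delete(f.subs[id], h)
	if len(f.subs[id]) == 0 {
		delete(f.subs, id)
	}
}

// Notify forwards the event to each interested handler at most once.
func (f *identifierFilter) Notify(e event) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	seen := make(map[handler]struct{})
	for _, id := range e.ids {
		for h := range f.subs[id] {
			if _, dup := seen[h]; !dup {
				seen[h] = struct{}{}
				h.Notify(e)
			}
		}
	}
}

// recorder remembers the names of the events it receives.
type recorder struct{ names []string }

func (r *recorder) Notify(e event) { r.names = append(r.names, e.name) }

func demoFilter() []string {
	f, rec := newIdentifierFilter(), &recorder{}
	f.Subscribe("app-1", rec)
	f.Notify(event{name: "first", ids: []string{"app-1", "dev-1"}})
	f.Notify(event{name: "second", ids: []string{"app-2"}}) // filtered out
	f.Unsubscribe("app-1", rec)
	f.Notify(event{name: "third", ids: []string{"app-1"}}) // no longer subscribed
	return rec.names
}

func main() {
	fmt.Println(demoFilter()) // prints "[first]"
}
```

Because the filter itself satisfies the handler interface, it can be subscribed to a pubsub once and then serve many per-identifier subscriptions.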

func NewIdentifierFilter

func NewIdentifierFilter() IdentifierFilter

NewIdentifierFilter returns a new IdentifierFilter (see interface).

type PubSub

type PubSub interface {
	Publisher
	Subscriber
}

PubSub interface combines the Publisher and Subscriber interfaces.

func DefaultPubSub

func DefaultPubSub() PubSub

DefaultPubSub returns the default PubSub.

func NewPubSub

func NewPubSub(bufSize uint) PubSub

NewPubSub returns a new event pubsub and starts a goroutine for handling.
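The combination of a buffer size and a handling goroutine suggests a buffered channel drained by a dispatcher. The following is a minimal sketch under that assumption, with exact-name matching instead of globs; `pubsub`, `newPubSub`, and the other names are hypothetical and the real implementation may be structured differently.

```go
package main

import (
	"fmt"
	"sync"
)

// event and handler are reduced stand-ins for events.Event and events.Handler.
type event struct{ name string }

type handler func(event)

// pubsub queues published events and dispatches them on a single goroutine.
type pubsub struct {
	mu     sync.RWMutex
	subs   map[string][]handler
	events chan event
}

// newPubSub creates the queue and starts the dispatching goroutine. The
// goroutine runs for the life of the program in this sketch.
func newPubSub(bufSize uint) *pubsub {
	ps := &pubsub{
		subs:   make(map[string][]handler),
		events: make(chan event, bufSize),
	}
	go func() {
		for e := range ps.events {
			ps.mu.RLock()
			hs := append([]handler(nil), ps.subs[e.name]...) // snapshot under lock
			ps.mu.RUnlock()
			for _, h := range hs {
				h(e) // a blocking handler would stall every other subscriber
			}
		}
	}()
	return ps
}

// Subscribe registers h for events with exactly this name.
func (ps *pubsub) Subscribe(name string, h handler) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.subs[name] = append(ps.subs[name], h)
}

// Publish enqueues the event and blocks only when the buffer is full.
func (ps *pubsub) Publish(e event) { ps.events <- e }

// demoPubSub publishes one event and waits for the handler to receive it.
func demoPubSub() string {
	ps := newPubSub(10)
	received := make(chan event, 1)
	ps.Subscribe("example", func(e event) {
		select {
		case received <- e:
		default:
		}
	})
	ps.Publish(event{name: "example"})
	return (<-received).name
}

func main() {
	fmt.Println(demoPubSub()) // prints "example"
}
```

The single dispatcher goroutine is why handlers are required to be non-blocking, and the channel capacity is what makes Publish block once the buffer fills.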

type Publisher

type Publisher interface {
	// Publish emits an event on the default event pubsub.
	Publish(evt Event)
}

Publisher interface lets you publish events.

type Subscriber

type Subscriber interface {
	// Subscribe adds an event handler to the default event pubsub.
	// The name can be a glob in order to catch multiple event types.
	// The handler must be non-blocking.
	Subscribe(name string, hdl Handler) error
	// Unsubscribe removes an event handler from the default event pubsub.
	// Queued or in-transit events may still be delivered to the handler
	// even after Unsubscribe returns.
	Unsubscribe(name string, hdl Handler)
}

Subscriber interface lets you subscribe to events.

Directories

Path    Synopsis
cloud   Package cloud implements an events.PubSub implementation that uses Go Cloud PubSub.
fs      Package fs implements watching files for changes.
grpc    Package grpc contains an implementation of the EventsServer, which is used to stream all events published for a set of identifiers.
redis   Package redis implements an events.PubSub implementation that uses Redis PubSub.
