Documentation ¶
Overview ¶
Package events implements event handling through a PubSub interface.
Example ¶
package main import ( "fmt" "sync" "go.thethings.network/lorawan-stack/v3/pkg/events" "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" "go.thethings.network/lorawan-stack/v3/pkg/util/test" ) func main() { // This is required for unit test to pass. defer test.SetDefaultEventsPubSub(events.NewPubSub(10))() // The WaitGroup is only for synchronizing the unit test var wg sync.WaitGroup wg.Add(1) events.Subscribe("ns.**", events.HandlerFunc(func(e events.Event) { fmt.Printf("Received event %s\n", e.Name()) wg.Done() // only for synchronizing the unit test })) // You can send any arbitrary event; you don't have to pass any identifiers or data. events.Publish(events.New(test.Context(), "test.hello_world", "the events system says hello, world")) // Defining the event is not mandatory, but will be needed in order to translate the descriptions. // Event names are lowercase snake_case and can be dot-separated as component.subsystem.subsystem.event // Event descriptions are short descriptions of what the event means. // Visibility rights are optional. If no rights are supplied, then the _ALL right is assumed. adrSendEvent := events.Define("ns.mac.adr.send_req", "send ADR request", events.WithVisibility(ttnpb.RIGHT_APPLICATION_TRAFFIC_READ)) // These variables come from the request or you got them from the db or something. var ( ctx = test.Context() dev ttnpb.EndDevice requests []ttnpb.MACCommand_LinkADRReq ) // It's nice to be able to correlate events; we use a Correlation ID for that. // In most cases, there will already be a correlation ID in the context; this function will append a new one to the ones already in the context. ctx = events.ContextWithCorrelationID(ctx, events.NewCorrelationID()) // Publishing an event to the events package will dispatch it on the "global" event pubsub. events.Publish(adrSendEvent.NewWithIdentifiersAndData(ctx, dev.EndDeviceIdentifiers, requests)) wg.Wait() // only for synchronizing the unit test }
Output: Received event ns.mac.adr.send_req
Index ¶
- Constants
- Variables
- func ContextWithCorrelationID(ctx context.Context, cids ...string) context.Context
- func CorrelationIDsFromContext(ctx context.Context) []string
- func DefineFunc(name, description string, opts ...Option) func() Builder
- func NewCorrelationID() string
- func Proto(e Event) (*ttnpb.Event, error)
- func Publish(evts ...Event)
- func RegisterContextMarshaler(name string, m ContextMarshaler)
- func SetDefaultPubSub(ps PubSub)
- func StreamServerInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, ...) error
- func Subscribe(name string, hdl Handler) error
- func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, ...) (interface{}, error)
- func Unsubscribe(name string, hdl Handler)
- type Builder
- type Builders
- type Channel
- type CombinedIdentifiers
- type ContextMarshaler
- type Definition
- type DefinitionOption
- type Event
- type Handler
- type IdentifierFilter
- type Option
- type PubSub
- type Publisher
- type Subscriber
Examples ¶
Constants ¶
const DefaultBufferSize = 64
DefaultBufferSize is the default number of events that can be buffered before Publish starts to block.
Variables ¶
var IncludeCaller bool
IncludeCaller indicates whether the caller of Publish should be included in the event
Functions ¶
func ContextWithCorrelationID ¶
ContextWithCorrelationID returns a derived context with the correlation IDs if they were not already in there.
func CorrelationIDsFromContext ¶
CorrelationIDsFromContext returns the correlation IDs that are attached to the context.
func DefineFunc ¶
DefineFunc generates a function, which returns a Definition with specified name and description. Most callers should be using Define - this function is only useful for helper functions.
func NewCorrelationID ¶
func NewCorrelationID() string
NewCorrelationID returns a new random correlation ID.
func RegisterContextMarshaler ¶
func RegisterContextMarshaler(name string, m ContextMarshaler)
RegisterContextMarshaler registers a ContextMarshaler with the given name. This should only be called from init funcs.
func SetDefaultPubSub ¶
func SetDefaultPubSub(ps PubSub)
SetDefaultPubSub sets pubsub used by the package to ps.
func StreamServerInterceptor ¶
func StreamServerInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error
StreamServerInterceptor returns a new streaming server interceptor that that modifies the context.
func Subscribe ¶
Subscribe adds an event handler to the default event pubsub. The name can be a glob in order to catch multiple event types. The handler must be non-blocking.
func UnaryServerInterceptor ¶
func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error)
UnaryServerInterceptor returns a new unary server interceptor that modifies the context to include a correlation ID.
func Unsubscribe ¶
Unsubscribe removes an event handler from the default event pubsub.
Types ¶
type Builder ¶ added in v3.9.0
type Builder interface { Definition() Definition With(opts ...Option) Builder New(ctx context.Context, opts ...Option) Event // Convenience function for legacy code. Same as New(ctx, WithIdentifiers(ids), WithData(data)). NewWithIdentifiersAndData(ctx context.Context, ids CombinedIdentifiers, data interface{}) Event // Convenience function for legacy code. Same as With(WithData(data)). BindData(data interface{}) Builder }
Builder is the interface for building events from definitions.
type Builders ¶ added in v3.9.0
type Builders []Builder
Builders makes it easier to create multiple events at once.
func (Builders) Definitions ¶ added in v3.9.1
func (bs Builders) Definitions() []Definition
Definitions returns the definition for each builder in the list.
type Channel ¶
type Channel chan Event
Channel is a channel of Events that can be used as an event handler. The channel should be buffered, events will be dropped if the channel blocks. It is typically not safe to close this channel until you're absolutely sure that it is no longer registered as an event handler.
Example ¶
package main import ( "fmt" "go.thethings.network/lorawan-stack/v3/pkg/events" ) func main() { eventChan := make(events.Channel, 2) events.Subscribe("example", eventChan) // From this moment on, "example" events will be delivered to the channel. // As soon as the channel is full, events will be dropped, so it's probably a // good idea to start handling the channel before subscribing. go func() { for e := range eventChan { fmt.Printf("Received event %v\n", e) } }() // Later: events.Unsubscribe("example", eventChan) // Note that in-transit events may still be delivered after Unsubscribe returns. // This means that you can't immediately close the channel after unsubscribing. }
Output:
func (Channel) ReceiveContext ¶
ReceiveContext returns the next event from the channel or returns nil when the context is done.
type CombinedIdentifiers ¶
type CombinedIdentifiers interface {
CombinedIdentifiers() *ttnpb.CombinedIdentifiers
}
type ContextMarshaler ¶
type ContextMarshaler interface { MarshalContext(context.Context) []byte UnmarshalContext(context.Context, []byte) (context.Context, error) }
ContextMarshaler interface for marshaling/unmarshaling contextual information to/from events.
type Definition ¶
Definition describes an event definition.
type DefinitionOption ¶ added in v3.9.1
type DefinitionOption interface { Option // contains filtered or unexported methods }
DefinitionOption is like Option, but applies to the definition instead.
func WithDataType ¶ added in v3.9.1
func WithDataType(t interface{}) DefinitionOption
WithDataType returns an option that sets the data type of the event (for documentation).
func WithErrorDataType ¶ added in v3.9.1
func WithErrorDataType() DefinitionOption
WithErrorDataType is a convenience function that sets the data type of the event to an error.
func WithUpdatedFieldsDataType ¶ added in v3.9.1
func WithUpdatedFieldsDataType() DefinitionOption
WithUpdatedFieldsDataType is a convenience function that sets the data type of the event to a slice of updated fields.
type Event ¶
type Event interface { Context() context.Context Name() string Time() time.Time Identifiers() []*ttnpb.EntityIdentifiers Data() interface{} CorrelationIDs() []string Origin() string Caller() string Visibility() *ttnpb.Rights AuthType() string AuthTokenID() string AuthTokenType() string RemoteIP() string UserAgent() string }
Event interface
func New ¶
New returns a new Event. Instead of using New, most implementations should first define an event, and then create a new event from that definition.
func UnmarshalJSON ¶
UnmarshalJSON unmarshals an event as JSON.
type Handler ¶
type Handler interface {
Notify(Event)
}
Handler interface for event listeners.
func ContextHandler ¶
ContextHandler delivers events to the Handler as long as ctx.Err() is non-nil.
Example ¶
package main import ( "context" "fmt" "go.thethings.network/lorawan-stack/v3/pkg/events" "go.thethings.network/lorawan-stack/v3/pkg/util/test" ) func main() { // Usually the context comes from somewhere else (e.g. a streaming RPC): ctx, cancel := context.WithCancel(test.Context()) defer cancel() eventChan := make(events.Channel, 2) handler := events.ContextHandler(ctx, eventChan) events.Subscribe("example", handler) // From this moment on, "example" events will be delivered to the channel. // As soon as the channel is full, events will be dropped, so it's probably a // good idea to start handling the channel before subscribing. go func() { for { select { case <-ctx.Done(): // Don't forget to unsubscribe: events.Unsubscribe("example", handler) // The ContextHandler will make sure that no events are delivered after // the context is canceled, so it is now safe to close the channel: close(eventChan) return case e := <-eventChan: fmt.Printf("Received event %v\n", e) } } }() }
Output:
func HandlerFunc ¶
HandlerFunc makes the func implement the Handler interface.
Example ¶
package main import ( "fmt" "go.thethings.network/lorawan-stack/v3/pkg/events" ) func main() { handler := events.HandlerFunc(func(e events.Event) { fmt.Printf("Received event %v\n", e) }) events.Subscribe("example", handler) // From this moment on, "example" events will be delivered to the handler func. events.Unsubscribe("example", handler) // Note that in-transit events may still be delivered after Unsubscribe returns. }
Output:
type IdentifierFilter ¶
type IdentifierFilter interface { Handler Subscribe(ctx context.Context, ids CombinedIdentifiers, handler Handler) Unsubscribe(ctx context.Context, ids CombinedIdentifiers, handler Handler) }
IdentifierFilter can be used as a layer on top of a PubSub to filter events based on the identifiers they contain.
func NewIdentifierFilter ¶
func NewIdentifierFilter() IdentifierFilter
NewIdentifierFilter returns a new IdentifierFilter (see interface).
type Option ¶ added in v3.9.0
type Option interface {
// contains filtered or unexported methods
}
Option is an option that is used to build events.
func WithAuthFromContext ¶ added in v3.9.0
func WithAuthFromContext() Option
WithAuthFromContext returns an option that extracts auth information from the context when the event is created.
func WithClientInfoFromContext ¶ added in v3.9.1
func WithClientInfoFromContext() Option
WithClientInfoFromContext returns an option that extracts the UserAgent and the RemoteIP from the request context.
func WithData ¶ added in v3.9.0
func WithData(data interface{}) Option
WithData returns an option that sets the data of the event.
func WithIdentifiers ¶ added in v3.9.0
func WithIdentifiers(identifiers ...ttnpb.Identifiers) Option
WithIdentifiers returns an option that sets the identifiers of the event.
func WithVisibility ¶ added in v3.9.0
WithVisibility returns an option that sets the visibility of the event.
type PubSub ¶
type PubSub interface { Publisher Subscriber }
PubSub interface combines the Publisher and Subscriber interfaces.
type Publisher ¶
type Publisher interface { // Publish emits an event on the default event pubsub. Publish(evt Event) }
Publisher interface lets you publish events.
type Subscriber ¶
type Subscriber interface { // Subscribe adds an event handler to the default event pubsub. // The name can be a glob in order to catch multiple event types. // The handler must be non-blocking. Subscribe(name string, hdl Handler) error // Unsubscribe removes an event handler from the default event pubsub. // Queued or in-transit events may still be delivered to the handler // even after Unsubscribe returns. Unsubscribe(name string, hdl Handler) }
Subscriber interface lets you subscribe to events.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package cloud implements an events.PubSub implementation that uses Go Cloud PubSub.
|
Package cloud implements an events.PubSub implementation that uses Go Cloud PubSub. |
Package fs implements watching files for changes.
|
Package fs implements watching files for changes. |
Package grpc contains an implementation of the EventsServer, which is used to stream all events published for a set of identifiers.
|
Package grpc contains an implementation of the EventsServer, which is used to stream all events published for a set of identifiers. |
Package redis implements an events.PubSub implementation that uses Redis PubSub.
|
Package redis implements an events.PubSub implementation that uses Redis PubSub. |