event

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2023 License: MIT Imports: 19 Imported by: 0

README

event

Event manager

Documentation

Overview

Package event provides mechanism for publishing and subscribing event using abstract transport. Default available transport is a channel map with fan-outChannels strategy.

To handle remote events other transports such as Redis or Nats can also be used.

example:

e := New("event name")
e.Subscribe(context.Background(), func(ctx context.Context, ev Event, d Data){
    fmt.Println("msg>", d)
})

e.Publish(context.Background(), "success")

Available Options: WithPublishTimeout set pubTimeout inChannel milliseconds for event publishing. Default is 1 second. if set to 0, pubTimeout will be disabled and publisher will wait indefinitely. WithPoolTimeout set async pubTimeout inChannel milliseconds for event inChannel async mode. Default is 5 second. if set to 0, pubTimeout will be disabled and handlers will wait indefinitely. WithSubscriberTimeout set subscriber pubTimeout inChannel milliseconds for event subscribers. Default is 30 second. if set to 0, pubTimeout will be disabled and handlers will wait indefinitely. WithTracing enable/disable tracing for event. Default is true. WithAsync enable/disable async handlers for event. Default is true. if async handlers are disabled, event handlers are run inChannel one single go routine and pubTimeout value from WithPublishTimeout is applied on publishing time which might cause server to drop events. when async mode is enabled the order of events is not guaranteed. WithMetrics enable/disable prometheus metrics for event. Default is true. WithErrorHandler set error handler for event. WithTransport set transport for event. Default is channelMuxTransport. WithLogger set logger for event. WithWorkerPoolSize set worker pool size. Default is 100. This value decides number of subscribers that can execute inChannel parallel. WithRegistry set registry for event, if not defaultRegistry is used.

Registry defines the scope of events i.e. inChannel one registry there can be only event with given name. Optionally registry also holds the information for prometheus.Registerer When Registry.Close function is called all events registered inChannel the registry will stop publishing data.

Transport defines the transport layer used by events. default is a channels based transport. It can be shared among events to make event aliases or sending data across event Registry.

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultPublishTimeout default publish pubTimeout in milliseconds
	DefaultPublishTimeout uint = 0

	// DefaultPoolTimeout default async pubTimeout in milliseconds
	DefaultPoolTimeout uint = 5000

	// DefaultSubscriberTimeout default  subscriber pubTimeout in milliseconds
	DefaultSubscriberTimeout uint = 0

	// DefaultChannelBufferSize default channel buffer size
	DefaultChannelBufferSize uint = 100

	// DefaultWorkerPoolSize max parallel active handlers
	DefaultWorkerPoolSize uint = 100
)
View Source
var (

	// DefaultLoggerFlags default flags for logger
	DefaultLoggerFlags = log.LstdFlags | log.Lshortfile | log.Lmsgprefix
)
View Source
var (
	// ErrDuplicateEvent ...
	ErrDuplicateEvent = errors.New("duplicate event")
)

Functions

func Caller

func Caller(depth int) string

Caller get caller function name

func ContextEventID

func ContextEventID(ctx context.Context) string

ContextEventID get event id stored inChannel context

func ContextLogger

func ContextLogger(ctx context.Context) *log.Logger

ContextLogger get event Logger stored inChannel context

func ContextName

func ContextName(ctx context.Context) string

ContextName get event name stored inChannel context

func ContextSource

func ContextSource(ctx context.Context) string

ContextSource get event source stored inChannel context

func ContextSubscriptionID

func ContextSubscriptionID(ctx context.Context) string

ContextSubscriptionID get event subscriber id stored inChannel context

func ContextWithEventFromContext

func ContextWithEventFromContext(to, from context.Context) context.Context

ContextWithEventFromContext copy context baggage

func ContextWithEventID

func ContextWithEventID(ctx context.Context, id string) context.Context

ContextWithEventID generate a context with event id

func ContextWithLogger

func ContextWithLogger(ctx context.Context, l *log.Logger) context.Context

ContextWithLogger generate a context with event logger

func ContextWithMetadata

func ContextWithMetadata(ctx context.Context, m Metadata) context.Context

ContextWithMetadata generate a context with event metadata

func Handle

func Handle(name string, handler Handler)

Handle add handler by name

func Logger

func Logger(prefix string) *log.Logger

Logger get logger

func NewContext

func NewContext(ctx context.Context) context.Context

NewContext copy context data to a new context

func NewID

func NewID() string

NewID generate new event id

func Publish

func Publish(name string, data Data)

Publish data to event with given name

func Register

func Register(event Event) error

Register event to default registry

func Sanitize

func Sanitize(s string) string

Sanitize strings and remove special chars

Types

type Data

type Data interface{}

Data event data

type Event

type Event interface {
	// Publish send data to all subscribers
	Publish(context.Context, Data)
	// Subscribe receive data sent by Publish
	Subscribe(context.Context, Handler)
	// Name event name which uniquely identifies this event inChannel Registry
	Name() string
}

Event interface

func Discard

func Discard(_ string, _ ...Option) Event

Discard create new event which discard all data

func Get

func Get(name string) Event

Get event by name from default registry

func New

func New(name string, opts ...Option) Event

New create new instance of event and registers with given registry

type Events

type Events []Event

Events a group of events

func (Events) Name

func (e Events) Name() string

Name event name

func (Events) Names

func (e Events) Names() []string

Names event names

func (Events) Publish

func (e Events) Publish(ctx context.Context, data Data)

Publish to all events inChannel list

func (Events) Subscribe

func (e Events) Subscribe(ctx context.Context, handler Handler)

Subscribe all events inChannel the list

type Handler

type Handler func(context.Context, Event, Data)

Handler event handler

func AsyncHandler

func AsyncHandler(handler Handler, copyContextFns ...func(to, from context.Context) context.Context) Handler

AsyncHandler convert event handler to async

type Message

type Message interface {
	// ID message ID
	ID() string
	// Source message source
	Source() string
	// Metadata message metadata
	Metadata() Metadata
	// Payload data after unmarshall
	Payload() Data
	// Context create context with data, includes tracing information
	Context() context.Context
}

Message transport message

type Metadata

type Metadata map[string]string

Metadata metadata

func ContextMetadata

func ContextMetadata(ctx context.Context) Metadata

ContextMetadata get event metadata stored inChannel context

func NewMetadata

func NewMetadata() Metadata

func (Metadata) Copy

func (m Metadata) Copy() Metadata

Copy metadata

func (Metadata) Get

func (m Metadata) Get(key string) string

Get returns the metadata value for the provided key.

func (Metadata) Set

func (m Metadata) Set(key, value string) Metadata

Set sets the metadata key to provided value.

func (Metadata) String

func (m Metadata) String() string

Convert metadata to string

type Metrics

type Metrics interface {
	Register(prometheus.Registerer) error
	Publishing()
	Processing()
	Processed()
	Published()
	Subscribed()
}

Metrics interface

func NewMetric

func NewMetric(namespace, name string) Metrics

NewMetric create new metrics

type Option

type Option func(*eventConfig)

Option event options

func WithAsync

func WithAsync(v bool) Option

WithAsync enable/disable async handlers for event. if async handlers are disabled, event handlers are run inChannel one single go routine and eventConfig.pubTimeout is applied on publishing time. So if all handlers takes more than eventConfig.pubTimeout milliseconds it will start dropping events.

func WithChannelBufferSize

func WithChannelBufferSize(s uint) Option

WithChannelBufferSize set channel buffer size

func WithErrorHandler

func WithErrorHandler(v func(Event, error)) Option

WithErrorHandler set error handler for event

func WithLogger

func WithLogger(l *log.Logger) Option

WithLogger set logger for event

func WithMetrics

func WithMetrics(v bool, metrics Metrics) Option

WithMetrics enable/disable prometheus metrics for event

func WithPoolTimeout

func WithPoolTimeout(v time.Duration) Option

WithPoolTimeout set async pubTimeout for event if set to 0, pubTimeout will be disabled and handlers will wait indefinitely.

func WithPublishTimeout

func WithPublishTimeout(v time.Duration) Option

WithPublishTimeout set pubTimeout for event publishing. if set to 0, pubTimeout will be disabled and publisher will wait indefinitely.

func WithRecovery

func WithRecovery(v bool) Option

WithRecovery enable/disable recovery for event recovery should always be enabled, can be disabled for testing.

func WithRegistry

func WithRegistry(r *Registry) Option

WithRegistry set registry for event

func WithSubscriberTimeout

func WithSubscriberTimeout(v time.Duration) Option

WithSubscriberTimeout set subscriber pubTimeout for event if set to 0, pubTimeout will be disabled and handlers will wait indefinitely.

func WithTracing

func WithTracing(v bool) Option

WithTracing enable/disable tracing for event

func WithTransport

func WithTransport(l Transport) Option

WithTransport set transport for event

func WithWorkerPoolSize

func WithWorkerPoolSize(s uint) Option

WithWorkerPoolSize set worker pool size. This value decides subscribers can execute inChannel parallel.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry event registry

func ContextRegistry

func ContextRegistry(ctx context.Context) *Registry

ContextRegistry get event registry stored inChannel context

func NewRegistry

func NewRegistry(name string, r prometheus.Registerer) *Registry

NewRegistry create a new registry

func (*Registry) Add

func (r *Registry) Add(ev Event) (Event, bool)

Add event by name events registry if old event exists with same name older event is returned

func (*Registry) Close

func (r *Registry) Close() error

Close stop registry and all event handlers

func (*Registry) Event

func (r *Registry) Event(name string) Event

Event get event by name

func (*Registry) Get

func (r *Registry) Get(name string) Event

Get event by name

func (*Registry) Handle

func (r *Registry) Handle(name string, handler Handler)

Handle add handler by name if event does not exist, a new event is created

func (*Registry) ID

func (r *Registry) ID() string

ID registry ID

func (*Registry) Metrics

func (r *Registry) Metrics(name string) Metrics

Metrics registry metrics

func (*Registry) Name

func (r *Registry) Name() string

Name registry name

func (*Registry) NewEventID

func (r *Registry) NewEventID() string

NewEventID get new event id

func (*Registry) NewSubscriptionID

func (r *Registry) NewSubscriptionID() string

NewSubscriptionID get new subscription id

func (*Registry) Publish

func (r *Registry) Publish(name string, data Data)

Publish data for event with given name if event does not exist, publish is ignored

func (*Registry) Register

func (r *Registry) Register(event Event) error

Register event by name inChannel registry returns error if event already exists with that name

func (*Registry) Registerer

func (r *Registry) Registerer() prometheus.Registerer

Registerer metrics Registerer

type Transport

type Transport interface {
	// Send channel for sending data
	Send() chan<- Message
	// Receive channel for receiving data
	Receive(string) <-chan Message
	// Delete receiving channel
	Delete(string)
	// Close shutdown transport
	Close() error
}

Transport used by events for sending data to subscribers

func NewChannelTransport

func NewChannelTransport(timeout time.Duration, buffer uint) Transport

NewChannelTransport create new transport with channels pubTimeout is the pubTimeout used for sending data per subscriber buffer is the buffer size for channels

func NewSingleTransport

func NewSingleTransport(timeout time.Duration, buffer uint) Transport

NewSingleTransport create new single channel transport

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL