Documentation ¶
Overview ¶
Package event provides mechanism for publishing and subscribing event using abstract transport. Default available transport is a channel map with fan-outChannels strategy.
To handle remote events other transports such as Redis or Nats can also be used.
example:
e := New("event name") e.Subscribe(context.Background(), func(ctx context.Context, ev Event, d Data){ fmt.Println("msg>", d) }) e.Publish(context.Background(), "success")
Available Options: WithPublishTimeout set pubTimeout inChannel milliseconds for event publishing. Default is 1 second. if set to 0, pubTimeout will be disabled and publisher will wait indefinitely. WithPoolTimeout set async pubTimeout inChannel milliseconds for event inChannel async mode. Default is 5 second. if set to 0, pubTimeout will be disabled and handlers will wait indefinitely. WithSubscriberTimeout set subscriber pubTimeout inChannel milliseconds for event subscribers. Default is 30 second. if set to 0, pubTimeout will be disabled and handlers will wait indefinitely. WithTracing enable/disable tracing for event. Default is true. WithAsync enable/disable async handlers for event. Default is true. if async handlers are disabled, event handlers are run inChannel one single go routine and pubTimeout value from WithPublishTimeout is applied on publishing time which might cause server to drop events. when async mode is enabled the order of events is not guaranteed. WithMetrics enable/disable prometheus metrics for event. Default is true. WithErrorHandler set error handler for event. WithTransport set transport for event. Default is channelMuxTransport. WithLogger set logger for event. WithWorkerPoolSize set worker pool size. Default is 100. This value decides number of subscribers that can execute inChannel parallel. WithRegistry set registry for event, if not defaultRegistry is used.
Registry defines the scope of events i.e. inChannel one registry there can be only event with given name. Optionally registry also holds the information for prometheus.Registerer When Registry.Close function is called all events registered inChannel the registry will stop publishing data.
Transport defines the transport layer used by events. default is a channels based transport. It can be shared among events to make event aliases or sending data across event Registry.
Index ¶
- Variables
- func Caller(depth int) string
- func ContextEventID(ctx context.Context) string
- func ContextLogger(ctx context.Context) *log.Logger
- func ContextName(ctx context.Context) string
- func ContextSource(ctx context.Context) string
- func ContextSubscriptionID(ctx context.Context) string
- func ContextWithEventFromContext(to, from context.Context) context.Context
- func ContextWithEventID(ctx context.Context, id string) context.Context
- func ContextWithLogger(ctx context.Context, l *log.Logger) context.Context
- func ContextWithMetadata(ctx context.Context, m Metadata) context.Context
- func Handle(name string, handler Handler)
- func Logger(prefix string) *log.Logger
- func NewContext(ctx context.Context) context.Context
- func NewID() string
- func Publish(name string, data Data)
- func Register(event Event) error
- func Sanitize(s string) string
- type Data
- type Event
- type Events
- type Handler
- type Message
- type Metadata
- type Metrics
- type Option
- func WithAsync(v bool) Option
- func WithChannelBufferSize(s uint) Option
- func WithErrorHandler(v func(Event, error)) Option
- func WithLogger(l *log.Logger) Option
- func WithMetrics(v bool, metrics Metrics) Option
- func WithPoolTimeout(v time.Duration) Option
- func WithPublishTimeout(v time.Duration) Option
- func WithRecovery(v bool) Option
- func WithRegistry(r *Registry) Option
- func WithSubscriberTimeout(v time.Duration) Option
- func WithTracing(v bool) Option
- func WithTransport(l Transport) Option
- func WithWorkerPoolSize(s uint) Option
- type Registry
- func (r *Registry) Add(ev Event) (Event, bool)
- func (r *Registry) Close() error
- func (r *Registry) Event(name string) Event
- func (r *Registry) Get(name string) Event
- func (r *Registry) Handle(name string, handler Handler)
- func (r *Registry) ID() string
- func (r *Registry) Metrics(name string) Metrics
- func (r *Registry) Name() string
- func (r *Registry) NewEventID() string
- func (r *Registry) NewSubscriptionID() string
- func (r *Registry) Publish(name string, data Data)
- func (r *Registry) Register(event Event) error
- func (r *Registry) Registerer() prometheus.Registerer
- type Transport
Constants ¶
This section is empty.
Variables ¶
var ( // DefaultPublishTimeout default publish pubTimeout in milliseconds DefaultPublishTimeout uint = 0 // DefaultPoolTimeout default async pubTimeout in milliseconds DefaultPoolTimeout uint = 5000 // DefaultSubscriberTimeout default subscriber pubTimeout in milliseconds DefaultSubscriberTimeout uint = 0 // DefaultChannelBufferSize default channel buffer size DefaultChannelBufferSize uint = 100 // DefaultWorkerPoolSize max parallel active handlers DefaultWorkerPoolSize uint = 100 )
var ( // DefaultLoggerFlags default flags for logger DefaultLoggerFlags = log.LstdFlags | log.Lshortfile | log.Lmsgprefix )
var ( // ErrDuplicateEvent ... ErrDuplicateEvent = errors.New("duplicate event") )
Functions ¶
func ContextEventID ¶
ContextEventID get event id stored inChannel context
func ContextLogger ¶
ContextLogger get event Logger stored inChannel context
func ContextName ¶
ContextName get event name stored inChannel context
func ContextSource ¶
ContextSource get event source stored inChannel context
func ContextSubscriptionID ¶
ContextSubscriptionID get event subscriber id stored inChannel context
func ContextWithEventFromContext ¶
ContextWithEventFromContext copy context baggage
func ContextWithEventID ¶
ContextWithEventID generate a context with event id
func ContextWithLogger ¶
ContextWithLogger generate a context with event logger
func ContextWithMetadata ¶
ContextWithMetadata generate a context with event metadata
func NewContext ¶
NewContext copy context data to a new context
Types ¶
type Event ¶
type Event interface { // Publish send data to all subscribers Publish(context.Context, Data) // Subscribe receive data sent by Publish Subscribe(context.Context, Handler) // Name event name which uniquely identifies this event inChannel Registry Name() string }
Event interface
type Events ¶
type Events []Event
Events a group of events
type Message ¶
type Message interface { // ID message ID ID() string // Source message source Source() string // Metadata message metadata Metadata() Metadata // Payload data after unmarshall Payload() Data // Context create context with data, includes tracing information Context() context.Context }
Message transport message
type Metadata ¶
Metadata metadata
func ContextMetadata ¶
ContextMetadata get event metadata stored inChannel context
func NewMetadata ¶
func NewMetadata() Metadata
type Metrics ¶
type Metrics interface { Register(prometheus.Registerer) error Publishing() Processing() Processed() Published() Subscribed() }
Metrics interface
type Option ¶
type Option func(*eventConfig)
Option event options
func WithAsync ¶
WithAsync enable/disable async handlers for event. if async handlers are disabled, event handlers are run inChannel one single go routine and eventConfig.pubTimeout is applied on publishing time. So if all handlers takes more than eventConfig.pubTimeout milliseconds it will start dropping events.
func WithChannelBufferSize ¶
WithChannelBufferSize set channel buffer size
func WithErrorHandler ¶
WithErrorHandler set error handler for event
func WithMetrics ¶
WithMetrics enable/disable prometheus metrics for event
func WithPoolTimeout ¶
WithPoolTimeout set async pubTimeout for event if set to 0, pubTimeout will be disabled and handlers will wait indefinitely.
func WithPublishTimeout ¶
WithPublishTimeout set pubTimeout for event publishing. if set to 0, pubTimeout will be disabled and publisher will wait indefinitely.
func WithRecovery ¶
WithRecovery enable/disable recovery for event recovery should always be enabled, can be disabled for testing.
func WithSubscriberTimeout ¶
WithSubscriberTimeout set subscriber pubTimeout for event if set to 0, pubTimeout will be disabled and handlers will wait indefinitely.
func WithWorkerPoolSize ¶
WithWorkerPoolSize set worker pool size. This value decides subscribers can execute inChannel parallel.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry event registry
func ContextRegistry ¶
ContextRegistry get event registry stored inChannel context
func NewRegistry ¶
func NewRegistry(name string, r prometheus.Registerer) *Registry
NewRegistry create a new registry
func (*Registry) Add ¶
Add event by name events registry if old event exists with same name older event is returned
func (*Registry) Handle ¶
Handle add handler by name if event does not exist, a new event is created
func (*Registry) NewSubscriptionID ¶
NewSubscriptionID get new subscription id
func (*Registry) Publish ¶
Publish data for event with given name if event does not exist, publish is ignored
func (*Registry) Register ¶
Register event by name inChannel registry returns error if event already exists with that name
func (*Registry) Registerer ¶
func (r *Registry) Registerer() prometheus.Registerer
Registerer metrics Registerer
type Transport ¶
type Transport interface { // Send channel for sending data Send() chan<- Message // Receive channel for receiving data Receive(string) <-chan Message // Delete receiving channel Delete(string) // Close shutdown transport Close() error }
Transport used by events for sending data to subscribers
func NewChannelTransport ¶
NewChannelTransport create new transport with channels pubTimeout is the pubTimeout used for sending data per subscriber buffer is the buffer size for channels