Documentation ¶
Overview ¶
Package soiree provides a simple event emitter that allows you to emit events and listen for them
Index ¶
- Variables
- type BaseEvent
- func (e *BaseEvent) Client() interface{}
- func (e *BaseEvent) Context() context.Context
- func (e *BaseEvent) IsAborted() bool
- func (e *BaseEvent) Payload() interface{}
- func (e *BaseEvent) Properties() Properties
- func (e *BaseEvent) SetAborted(abort bool)
- func (e *BaseEvent) SetClient(client interface{})
- func (e *BaseEvent) SetContext(ctx context.Context)
- func (e *BaseEvent) SetPayload(payload interface{})
- func (e *BaseEvent) SetProperties(properties Properties)
- func (e *BaseEvent) Topic() string
- type Event
- type EventPool
- func (m *EventPool) Close() error
- func (m *EventPool) Emit(eventName string, payload interface{}) <-chan error
- func (m *EventPool) EmitSync(eventName string, payload interface{}) []error
- func (m *EventPool) EnsureTopic(topicName string) *Topic
- func (m *EventPool) GetClient() interface{}
- func (m *EventPool) GetTopic(topicName string) (*Topic, error)
- func (m *EventPool) Off(topicName string, listenerID string) error
- func (m *EventPool) On(topicName string, listener Listener, opts ...ListenerOption) (string, error)
- func (m *EventPool) SetClient(client interface{})
- func (m *EventPool) SetErrChanBufferSize(size int)
- func (m *EventPool) SetErrorHandler(handler func(Event, error) error)
- func (m *EventPool) SetIDGenerator(generator func() string)
- func (m *EventPool) SetPanicHandler(panicHandler PanicHandler)
- func (m *EventPool) SetPool(pool Pool)
- type EventPoolOption
- func WithClient(client interface{}) EventPoolOption
- func WithErrChanBufferSize(size int) EventPoolOption
- func WithErrorHandler(errHandler func(Event, error) error) EventPoolOption
- func WithIDGenerator(idGen func() string) EventPoolOption
- func WithPanicHandler(panicHandler PanicHandler) EventPoolOption
- func WithPool(pool Pool) EventPoolOption
- type Listener
- type ListenerOption
- type PanicHandler
- type PondPool
- func (p *PondPool) CompletedTasks() int
- func (p *PondPool) FailedTasks() int
- func (p *PondPool) NewStatsCollector()
- func (p *PondPool) Release()
- func (p *PondPool) Running() int64
- func (p *PondPool) Stop()
- func (p *PondPool) Submit(task func())
- func (p *PondPool) SubmitMultipleAndWait(task []func())
- func (p *PondPool) SubmittedTasks() int
- func (p *PondPool) SuccessfulTasks() int
- func (p *PondPool) WaitingTasks() int
- type Pool
- type PoolOptions
- type Priority
- type Properties
- type Soiree
- type Topic
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNilListener is returned when a listener is nil ErrNilListener = errors.New("listener cannot be nil") // ErrInvalidTopicName is returned when a topic name is invalid ErrInvalidTopicName = errors.New("invalid topic name") // ErrInvalidPriority is returned when a priority is invalid ErrInvalidPriority = errors.New("invalid priority") // ErrTopicNotFound is returned when a listener option is invalid ErrTopicNotFound = errors.New("topic not found") // ErrListenerNotFound is returned when a listener is not found ErrListenerNotFound = errors.New("listener not found") // ErrEventProcessingAborted is returned when event processing is aborted ErrEventProcessingAborted = errors.New("event processing aborted") // ErrEmitterClosed is returned when the soiree is closed ErrEmitterClosed = errors.New("soiree is closed") // ErrEmitterAlreadyClosed is returned when the soiree is already closed ErrEmitterAlreadyClosed = errors.New("soiree is already closed") )
var DefaultErrorHandler = func(event Event, err error) error {
return err
}
var DefaultIDGenerator = func() string { return ulids.New().String() }
DefaultIDGenerator generates a unique identifier
var DefaultPanicHandler = func(p interface{}) { log.Error().Msgf("Panic occurred processing event: %v", p) }
DefaultPanicHandler handles panics by printing the panic value
Functions ¶
This section is empty.
Types ¶
type BaseEvent ¶
type BaseEvent struct {
// contains filtered or unexported fields
}
BaseEvent serves as a basic implementation of the `Event` interface and contains fields for storing the topic, payload, and aborted status of an event. The struct includes methods to interact with these fields such as getting and setting the payload, setting the aborted status, and checking if the event has been aborted. The struct also includes a `sync.RWMutex` field `mu` to handle concurrent access to the struct's fields in a thread-safe manner
func NewBaseEvent ¶
NewBaseEvent creates a new instance of BaseEvent with a payload
func (*BaseEvent) Client ¶ added in v0.4.1
func (e *BaseEvent) Client() interface{}
Client returns the event's client
func (*BaseEvent) Payload ¶
func (e *BaseEvent) Payload() interface{}
Payload returns the event's payload
func (*BaseEvent) Properties ¶
func (e *BaseEvent) Properties() Properties
Properties returns the event's properties
func (*BaseEvent) SetAborted ¶
SetAborted sets the event's aborted status
func (*BaseEvent) SetClient ¶ added in v0.4.1
func (e *BaseEvent) SetClient(client interface{})
SetClient sets the event's client
func (*BaseEvent) SetContext ¶ added in v0.4.1
SetContext sets the event's context
func (*BaseEvent) SetPayload ¶
func (e *BaseEvent) SetPayload(payload interface{})
SetPayload sets the event's payload
func (*BaseEvent) SetProperties ¶
func (e *BaseEvent) SetProperties(properties Properties)
SetProperties sets the event's properties
type Event ¶
type Event interface { // Topic returns the event's topic Topic() string // Payload returns the event's payload Payload() interface{} // Properties returns the event's properties Properties() Properties // SetPayload sets the event's payload SetPayload(interface{}) // SetProperties sets the event's properties SetProperties(Properties) // SetAborted sets the event's aborted status SetAborted(bool) // IsAborted checks the event's aborted status IsAborted() bool // Context returns the event's context Context() context.Context // SetContext sets the event's context SetContext(context.Context) // Client returns the event's client Client() interface{} // SetClient sets the event's client SetClient(interface{}) }
Event is an interface representing the structure of an instance of an event
type EventPool ¶
type EventPool struct {
// contains filtered or unexported fields
}
EventPool struct is controlling subscribing and unsubscribing listeners to topics, and emitting events to all subscribers
func NewEventPool ¶
func NewEventPool(opts ...EventPoolOption) *EventPool
NewEventPool initializes a new EventPool with optional configuration options
func (*EventPool) Close ¶
Close terminates the soiree, ensuring all pending events are processed; it performs cleanup and releases resources
func (*EventPool) Emit ¶
Emit asynchronously dispatches an event to all the subscribers of the event's topic It returns a channel that will receive any errors encountered during event handling
func (*EventPool) EmitSync ¶
EmitSync dispatches an event synchronously to all subscribers of the event's topic; his method will block until all notifications are completed
func (*EventPool) EnsureTopic ¶
EnsureTopic retrieves or creates a new topic by its name
func (*EventPool) GetClient ¶ added in v0.4.1
func (m *EventPool) GetClient() interface{}
GetClient fetches the set client on the event pool
func (*EventPool) GetTopic ¶
GetTopic retrieves a topic by its name. If the topic does not exist, it returns an error
func (*EventPool) On ¶
On subscribes a listener to a topic with the given name; returns a unique listener ID
func (*EventPool) SetClient ¶ added in v0.4.1
func (m *EventPool) SetClient(client interface{})
SetClient sets a client that can be used as a part of the event pool
func (*EventPool) SetErrChanBufferSize ¶
SetErrChanBufferSize sets the buffer size for the error channel for the event pool
func (*EventPool) SetErrorHandler ¶
SetErrorHandler sets the error handler for the event pool
func (*EventPool) SetIDGenerator ¶
SetIDGenerator sets the ID generator for the event pool
func (*EventPool) SetPanicHandler ¶
func (m *EventPool) SetPanicHandler(panicHandler PanicHandler)
SetPanicHandler sets the panic handler for the event pool
type EventPoolOption ¶
type EventPoolOption func(Soiree)
EventPoolOption defines a function type for Soiree configuration options
func WithClient ¶ added in v0.4.1
func WithClient(client interface{}) EventPoolOption
WithClient sets a custom client for the Soiree
func WithErrChanBufferSize ¶
func WithErrChanBufferSize(size int) EventPoolOption
WithErrChanBufferSize sets the size of the buffered channel for errors returned by asynchronous emits
func WithErrorHandler ¶
func WithErrorHandler(errHandler func(Event, error) error) EventPoolOption
WithErrorHandler sets a custom error handler for an Soiree
func WithIDGenerator ¶
func WithIDGenerator(idGen func() string) EventPoolOption
WithIDGenerator sets a custom ID generator for an Soiree
func WithPanicHandler ¶
func WithPanicHandler(panicHandler PanicHandler) EventPoolOption
WithPanicHandler sets a custom panic handler for an Soiree
type Listener ¶
Listener is a function type that can handle events of any type Listener takes an `Event` as a parameter and returns an `error`. This allows you to define functions that conform to this specific signature, making it easier to work with event listeners in the other parts of the code
type ListenerOption ¶
type ListenerOption func(*listenerItem)
ListenerOption is a function type that configures listener behavior
func WithListenerClient ¶ added in v0.4.1
func WithListenerClient(client interface{}) ListenerOption
WithClient sets the client of a listener
func WithPriority ¶
func WithPriority(priority Priority) ListenerOption
WithPriority sets the priority of a listener
type PanicHandler ¶
type PanicHandler func(interface{})
PanicHandler is a function type that handles panics
type PondPool ¶
type PondPool struct { // MaxWorkers is the maximum number of workers in the pool MaxWorkers int `json:"maxWorkers" koanf:"maxWorkers" default:"100"` // contains filtered or unexported fields }
PondPool is a worker pool implementation using the pond library
func NewPondPool ¶
func NewPondPool(opts ...PoolOptions) *PondPool
NewPondPool creates a new worker pool using the pond library
func (*PondPool) CompletedTasks ¶
CompletedTasks returns the number of tasks that completed either successfully or with a panic
func (*PondPool) FailedTasks ¶
FailedTasks returns the number of tasks that completed with a panic
func (*PondPool) NewStatsCollector ¶
func (p *PondPool) NewStatsCollector()
func (*PondPool) Release ¶
func (p *PondPool) Release()
Release stops all workers in the pool and waits for them to finish
func (*PondPool) Stop ¶
func (p *PondPool) Stop()
Stop causes this pool to stop accepting new tasks and signals all workers to exit Tasks being executed by workers will continue until completion (unless the process is terminated) Tasks in the queue will not be executed (so will drop any buffered tasks - ideally use Release)
func (*PondPool) Submit ¶
func (p *PondPool) Submit(task func())
Submit submits a task to the worker pool
func (*PondPool) SubmitMultipleAndWait ¶
func (p *PondPool) SubmitMultipleAndWait(task []func())
SubmitMultipleAndWait submits multiple tasks to the worker pool and waits for all them to finish
func (*PondPool) SubmittedTasks ¶
SubmittedTasks returns the number of tasks submitted to the pool
func (*PondPool) SuccessfulTasks ¶
SuccessfulTasks returns the number of tasks that completed successfully
func (*PondPool) WaitingTasks ¶
WaitingTasks returns the number of tasks waiting in the pool
type Pool ¶
type Pool interface { // Submit submits a task to the worker pool Submit(task func()) // Running returns the number of running workers in the pool Running() int64 // Release stops all workers in the pool and waits for them to finish Release() // Stop causes this pool to stop accepting new tasks and signals all workers to exit Stop() // SubmittedTasks returns the number of tasks submitted to the pool SubmittedTasks() int // WaitingTasks returns the number of tasks waiting in the pool WaitingTasks() int // SuccessfulTasks returns the number of tasks that completed successfully SuccessfulTasks() int // FailedTasks returns the number of tasks that completed with a panic FailedTasks() int // CompletedTasks returns the number of tasks that completed either successfully or with a panic CompletedTasks() int }
Pool is an interface for a worker pool
type PoolOptions ¶ added in v0.4.1
type PoolOptions func(*PondPool)
PoolOptions is a type for setting options on the pool
func WithMaxWorkers ¶ added in v0.4.1
func WithMaxWorkers(maxWorkers int) PoolOptions
WithMaxWorkers sets the maximum number of workers in the pool
func WithName ¶ added in v0.4.1
func WithName(name string) PoolOptions
WithName sets the name of the pool
func WithOptions ¶ added in v0.4.1
func WithOptions(opts ...pond.Option) PoolOptions
WithOptions sets the options for the pool
type Properties ¶
type Properties map[string]interface{}
Properties is a map of properties to set on an event
func (Properties) GetKey ¶ added in v0.4.1
func (p Properties) GetKey(key string) interface{}
Get a property from the Properties map
func (Properties) Set ¶
func (p Properties) Set(name string, value interface{}) Properties
Set a property on the Properties map
type Soiree ¶
type Soiree interface { // On registers a listener function to a specific topic On(topicName string, listener Listener, opts ...ListenerOption) (string, error) // Off removes a listener from a specific topic using the listener's unique ID Off(topicName string, listenerID string) error // Emit asynchronously sends an event to all subscribers of a topic and returns a channel of errors Emit(eventName string, payload interface{}) <-chan error // EmitSync sends an event synchronously to all subscribers of a topic; blocks until all listeners have been notified EmitSync(eventName string, payload interface{}) []error // GetTopic retrieves the Topic object associated with the given topic name GetTopic(topicName string) (*Topic, error) // EnsureTopic creates a new topic if it does not exist, or returns the existing one EnsureTopic(topicName string) *Topic // SetErrorHandler assigns a custom error handler function for the Soiree SetErrorHandler(func(Event, error) error) // SetIDGenerator assigns a function that generates a unique ID string for new listeners SetIDGenerator(func() string) // SetPool sets a custom goroutine pool for managing concurrency within the Soiree SetPool(Pool) // SetPanicHandler sets a function that will be called in case of a panic during event handling SetPanicHandler(PanicHandler) // SetErrChanBufferSize sets the size of the buffered channel for errors returned by asynchronous emits SetErrChanBufferSize(int) // Close gracefully shuts down the Soiree, ensuring all pending events are processed Close() error // SetClient sets the client for the Soiree SetClient(client interface{}) // GetClient gets the client for the Soiree GetClient() interface{} }
Soiree is an interface that defines the behavior of your get-together
type Topic ¶
type Topic struct { // Name signifies the topic's unique identifier Name string // contains filtered or unexported fields }
Topic represents an event channel to which listeners can subscribe
func (*Topic) AddListener ¶
func (t *Topic) AddListener(id string, listener Listener, opts ...ListenerOption)
AddListener adds a new listener to the topic with a specified priority and returns an identifier for the listener
func (*Topic) RemoveListener ¶
RemoveListener removes a listener from the topic using its identifier