Documentation ¶
Overview ¶
Package soiree provides a simple event emitter that allows you to emit events and listen for them
Index ¶
- Variables
- type BaseEvent
- func (e *BaseEvent) IsAborted() bool
- func (e *BaseEvent) Payload() interface{}
- func (e *BaseEvent) Properties() Properties
- func (e *BaseEvent) SetAborted(abort bool)
- func (e *BaseEvent) SetPayload(payload interface{})
- func (e *BaseEvent) SetProperties(properties Properties)
- func (e *BaseEvent) Topic() string
- type Event
- type EventPool
- func (m *EventPool) Close() error
- func (m *EventPool) Emit(eventName string, payload interface{}) <-chan error
- func (m *EventPool) EmitSync(eventName string, payload interface{}) []error
- func (m *EventPool) EnsureTopic(topicName string) *Topic
- func (m *EventPool) GetTopic(topicName string) (*Topic, error)
- func (m *EventPool) Off(topicName string, listenerID string) error
- func (m *EventPool) On(topicName string, listener Listener, opts ...ListenerOption) (string, error)
- func (m *EventPool) SetErrChanBufferSize(size int)
- func (m *EventPool) SetErrorHandler(handler func(Event, error) error)
- func (m *EventPool) SetIDGenerator(generator func() string)
- func (m *EventPool) SetPanicHandler(panicHandler PanicHandler)
- func (m *EventPool) SetPool(pool Pool)
- type EventPoolOption
- type Listener
- type ListenerOption
- type PanicHandler
- type PondPool
- func (p *PondPool) CompletedTasks() int
- func (p *PondPool) FailedTasks() int
- func (p *PondPool) IdleWorkers() int
- func (p *PondPool) NewStatsCollector()
- func (p *PondPool) Release()
- func (p *PondPool) ReleaseWithDeadline(deadline time.Duration)
- func (p *PondPool) Running() int
- func (p *PondPool) Stop()
- func (p *PondPool) StopAndWaitFor(deadline time.Duration)
- func (p *PondPool) Submit(task func())
- func (p *PondPool) SubmitAndWait(task func())
- func (p *PondPool) SubmitBefore(task func(), deadline time.Duration)
- func (p *PondPool) SubmitMultipleAndWait(task []func())
- func (p *PondPool) SubmittedTasks() int
- func (p *PondPool) SuccessfulTasks() int
- func (p *PondPool) WaitingTasks() int
- type Pool
- type Priority
- type Properties
- type Soiree
- type Topic
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNilListener is returned when a listener is nil ErrNilListener = errors.New("listener cannot be nil") // ErrInvalidTopicName is returned when a topic name is invalid ErrInvalidTopicName = errors.New("invalid topic name") // ErrInvalidPriority is returned when a priority is invalid ErrInvalidPriority = errors.New("invalid priority") // ErrTopicNotFound is returned when a listener option is invalid ErrTopicNotFound = errors.New("topic not found") // ErrListenerNotFound is returned when a listener is not found ErrListenerNotFound = errors.New("listener not found") // ErrEventProcessingAborted is returned when event processing is aborted ErrEventProcessingAborted = errors.New("event processing aborted") // ErrEmitterClosed is returned when the soiree is closed ErrEmitterClosed = errors.New("soiree is closed") // ErrEmitterAlreadyClosed is returned when the soiree is already closed ErrEmitterAlreadyClosed = errors.New("soiree is already closed") )
var DefaultErrorHandler = func(event Event, err error) error {
return err
}
var DefaultIDGenerator = func() string { return ulids.New().String() }
DefaultIDGenerator generates a unique identifier
var DefaultPanicHandler = func(p interface{}) { log.Error().Msgf("Panic occurred: %v", p) }
DefaultPanicHandler handles panics by printing the panic value
Functions ¶
This section is empty.
Types ¶
type BaseEvent ¶
type BaseEvent struct {
// contains filtered or unexported fields
}
BaseEvent serves as a basic implementation of the `Event` interface and contains fields for storing the topic, payload, and aborted status of an event. The struct includes methods to interact with these fields such as getting and setting the payload, setting the aborted status, and checking if the event has been aborted. The struct also includes a `sync.RWMutex` field `mu` to handle concurrent access to the struct's fields in a thread-safe manner
func NewBaseEvent ¶
NewBaseEvent creates a new instance of BaseEvent with a payload
func (*BaseEvent) Payload ¶
func (e *BaseEvent) Payload() interface{}
Payload returns the event's payload
func (*BaseEvent) Properties ¶
func (e *BaseEvent) Properties() Properties
Properties returns the event's properties
func (*BaseEvent) SetAborted ¶
SetAborted sets the event's aborted status
func (*BaseEvent) SetPayload ¶
func (e *BaseEvent) SetPayload(payload interface{})
SetPayload sets the event's payload
func (*BaseEvent) SetProperties ¶
func (e *BaseEvent) SetProperties(properties Properties)
SetProperties sets the event's properties
type Event ¶
type Event interface { // Topic returns the event's topic Topic() string // Payload returns the event's payload Payload() interface{} // Properties returns the event's properties Properties() Properties // SetPayload sets the event's payload SetPayload(interface{}) // SetProperties sets the event's properties SetProperties(Properties) // SetAborted sets the event's aborted status SetAborted(bool) // IsAborted checks the event's aborted status IsAborted() bool }
Event is an interface representing the structure of an instance of an event
type EventPool ¶
type EventPool struct {
// contains filtered or unexported fields
}
EventPool struct is controlling subscribing and unsubscribing listeners to topics, and emitting events to all subscribers
func NewEventPool ¶
func NewEventPool(opts ...EventPoolOption) *EventPool
NewEventPool initializes a new EventPool with optional configuration options
func (*EventPool) Close ¶
Close terminates the soiree, ensuring all pending events are processed; it performs cleanup and releases resources
func (*EventPool) Emit ¶
Emit asynchronously dispatches an event to all the subscribers of the event's topic It returns a channel that will receive any errors encountered during event handling
func (*EventPool) EmitSync ¶
EmitSync dispatches an event synchronously to all subscribers of the event's topic; his method will block until all notifications are completed
func (*EventPool) EnsureTopic ¶
EnsureTopic retrieves or creates a new topic by its name
func (*EventPool) GetTopic ¶
GetTopic retrieves a topic by its name. If the topic does not exist, it returns an error
func (*EventPool) On ¶
On subscribes a listener to a topic with the given name; returns a unique listener ID
func (*EventPool) SetErrChanBufferSize ¶
SetErrChanBufferSize sets the buffer size for the error channel for the event pool
func (*EventPool) SetErrorHandler ¶
SetErrorHandler sets the error handler for the event pool
func (*EventPool) SetIDGenerator ¶
SetIDGenerator sets the ID generator for the event pool
func (*EventPool) SetPanicHandler ¶
func (m *EventPool) SetPanicHandler(panicHandler PanicHandler)
SetPanicHandler sets the panic handler for the event pool
type EventPoolOption ¶
type EventPoolOption func(Soiree)
EventPoolOption defines a function type for Soiree configuration options
func WithErrChanBufferSize ¶
func WithErrChanBufferSize(size int) EventPoolOption
WithErrChanBufferSize sets the size of the buffered channel for errors returned by asynchronous emits
func WithErrorHandler ¶
func WithErrorHandler(errHandler func(Event, error) error) EventPoolOption
WithErrorHandler sets a custom error handler for an Soiree
func WithIDGenerator ¶
func WithIDGenerator(idGen func() string) EventPoolOption
WithIDGenerator sets a custom ID generator for an Soiree
func WithPanicHandler ¶
func WithPanicHandler(panicHandler PanicHandler) EventPoolOption
WithPanicHandler sets a custom panic handler for an Soiree
type Listener ¶
Listener is a function type that can handle events of any type Listener takes an `Event` as a parameter and returns an `error`. This allows you to define functions that conform to this specific signature, making it easier to work with event listeners in the other parts of the code
type ListenerOption ¶
type ListenerOption func(*listenerItem)
ListenerOption is a function type that configures listener behavior
func WithPriority ¶
func WithPriority(priority Priority) ListenerOption
WithPriority sets the priority of a listener
type PanicHandler ¶
type PanicHandler func(interface{})
PanicHandler is a function type that handles panics
type PondPool ¶
type PondPool struct {
// contains filtered or unexported fields
}
PondPool is a worker pool implementation using the pond library
func NewNamedPondPool ¶
NewNamedPondPool creates a new instance of PondPool with the passed options and name
func NewPondPool ¶
NewPondPool creates a new instance of PondPool with the passed options
func (*PondPool) CompletedTasks ¶
CompletedTasks returns the number of tasks that completed either successfully or with a panic
func (*PondPool) FailedTasks ¶
FailedTasks returns the number of tasks that completed with a panic
func (*PondPool) IdleWorkers ¶
IdleWorkers returns the number of idle workers in the pool
func (*PondPool) NewStatsCollector ¶
func (p *PondPool) NewStatsCollector()
func (*PondPool) Release ¶
func (p *PondPool) Release()
Release stops all workers in the pool and waits for them to finish
func (*PondPool) ReleaseWithDeadline ¶
ReleaseWithDeadline stops this pool and waits until either all tasks in the queue are completed or the given deadline is reached, whichever comes first
func (*PondPool) Stop ¶
func (p *PondPool) Stop()
Stop causes this pool to stop accepting new tasks and signals all workers to exit Tasks being executed by workers will continue until completion (unless the process is terminated) Tasks in the queue will not be executed (so will drop any buffered tasks - ideally use Release or ReleaseWithDeadline)
func (*PondPool) StopAndWaitFor ¶
StopAndWaitFor stops this pool and waits until either all tasks in the queue are completed or the given deadline is reached, whichever comes first
func (*PondPool) Submit ¶
func (p *PondPool) Submit(task func())
Submit submits a task to the worker pool
func (*PondPool) SubmitAndWait ¶
func (p *PondPool) SubmitAndWait(task func())
SubmitAndWait submits a task to the worker pool and waits for it to finish
func (*PondPool) SubmitBefore ¶
SubmitBefore submits a task to the worker pool before a specified task
func (*PondPool) SubmitMultipleAndWait ¶
func (p *PondPool) SubmitMultipleAndWait(task []func())
SubmitMultipleAndWait submits multiple tasks to the worker pool and waits for all them to finish
func (*PondPool) SubmittedTasks ¶
SubmittedTasks returns the number of tasks submitted to the pool
func (*PondPool) SuccessfulTasks ¶
SuccessfulTasks returns the number of tasks that completed successfully
func (*PondPool) WaitingTasks ¶
WaitingTasks returns the number of tasks waiting in the pool
type Pool ¶
type Pool interface { // Submit submits a task to the worker pool Submit(task func()) // Running returns the number of running workers in the pool Running() int // Release stops all workers in the pool and waits for them to finish Release() // ReleaseWithDeadline stops this pool and waits until either all tasks in the queue are completed or the given deadline is reached ReleaseWithDeadline(deadline time.Duration) // Stop causes this pool to stop accepting new tasks and signals all workers to exit Stop() // IdleWorkers returns the number of idle workers in the pool IdleWorkers() int // SubmittedTasks returns the number of tasks submitted to the pool SubmittedTasks() int // WaitingTasks returns the number of tasks waiting in the pool WaitingTasks() int // SuccessfulTasks returns the number of tasks that completed successfully SuccessfulTasks() int // FailedTasks returns the number of tasks that completed with a panic FailedTasks() int // CompletedTasks returns the number of tasks that completed either successfully or with a panic CompletedTasks() int // StopAndWaitFor stops this pool and waits until either all tasks in the queue are completed or the given deadline is reached StopAndWaitFor(deadline time.Duration) // SubmitAndWait submits a task to the worker pool and waits for it to finish SubmitAndWait(task func()) // SubmitBefore submits a task to the worker pool before a specified task SubmitBefore(task func(), deadline time.Duration) }
Pool is an interface for a worker pool
type Properties ¶
type Properties map[string]interface{}
Properties is a map of properties to set on an event
func (Properties) Set ¶
func (p Properties) Set(name string, value interface{}) Properties
Set a property on the Properties map
type Soiree ¶
type Soiree interface { // On registers a listener function to a specific topic On(topicName string, listener Listener, opts ...ListenerOption) (string, error) // Off removes a listener from a specific topic using the listener's unique ID Off(topicName string, listenerID string) error // Emit asynchronously sends an event to all subscribers of a topic and returns a channel of errors Emit(eventName string, payload interface{}) <-chan error // EmitSync sends an event synchronously to all subscribers of a topic; blocks until all listeners have been notified EmitSync(eventName string, payload interface{}) []error // GetTopic retrieves the Topic object associated with the given topic name GetTopic(topicName string) (*Topic, error) // EnsureTopic creates a new topic if it does not exist, or returns the existing one EnsureTopic(topicName string) *Topic // SetErrorHandler assigns a custom error handler function for the Soiree SetErrorHandler(func(Event, error) error) // SetIDGenerator assigns a function that generates a unique ID string for new listeners SetIDGenerator(func() string) // SetPool sets a custom goroutine pool for managing concurrency within the Soiree SetPool(Pool) // SetPanicHandler sets a function that will be called in case of a panic during event handling SetPanicHandler(PanicHandler) // SetErrChanBufferSize sets the size of the buffered channel for errors returned by asynchronous emits SetErrChanBufferSize(int) // Close gracefully shuts down the Soiree, ensuring all pending events are processed Close() error }
Soiree is an interface that defines the behavior of your get-together
type Topic ¶
type Topic struct { // Name signifies the topic's unique identifier Name string // contains filtered or unexported fields }
Topic represents an event channel to which listeners can subscribe
func (*Topic) AddListener ¶
func (t *Topic) AddListener(id string, listener Listener, opts ...ListenerOption)
AddListener adds a new listener to the topic with a specified priority and returns an identifier for the listener
func (*Topic) RemoveListener ¶
RemoveListener removes a listener from the topic using its identifier