soiree

package
v0.7.2
Published: Jul 4, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

README

Soiree

Soiree (a fancy event affair, or simply "event") is a library intended to simplify event management inside a Go codebase. More generically, it is a 2-tier channel system: one tier queues jobs and the other controls how many workers operate on that job queue concurrently. The goal is a dead-simple interface for event subscription and handling, using pond for performance management and goroutine pooling, and wrapping an event management interface with thread-safe interactions.
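To make the "2-tier channel system" idea concrete, here is a minimal standard-library sketch. It is not Soiree's implementation (Soiree delegates pooling to pond); the function name runJobs and its parameters are made up for illustration. A buffered channel acts as the job queue, and a fixed number of goroutines drain it.

```go
package main

import (
	"fmt"
	"sync"
)

// runJobs pushes jobs through a buffered queue (tier one) and drains it with
// a fixed number of worker goroutines (tier two). Results keep job order.
// Hypothetical helper for illustration; workers must be at least 1.
func runJobs(jobs []int, workers, queueSize int, work func(int) int) []int {
	type item struct{ idx, val int }

	queue := make(chan item, queueSize)
	results := make([]int, len(jobs))

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range queue {
				// Each index is written by exactly one worker, so no lock is needed.
				results[it.idx] = work(it.val)
			}
		}()
	}

	for i, j := range jobs {
		queue <- item{i, j} // blocks when the queue is full, applying backpressure
	}
	close(queue)
	wg.Wait()

	return results
}

func main() {
	out := runJobs([]int{1, 2, 3, 4}, 2, 2, func(n int) int { return n * n })
	fmt.Println(out) // [1 4 9 16]
}
```

The queue size bounds how much work can pile up, and the worker count bounds how much runs at once; those are the two knobs the paragraph above refers to.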

Overview

Functionally, Soiree is intended to provide:

  • In-Memory management: Host and manage events internally without external dependencies or libraries
  • Listener prioritization: Controls for invocation order
  • Concurrent Processing: Utilize pooled goroutines for handling events in parallel and re-releasing resources, with thread safety
  • Configurable Subscriptions: Leverages basic pattern matching for event subscriptions
  • General Re-use: Configure with custom handlers for errors, IDs, and panics

But why?

In modern software architectures, microservices vs. monoliths is a false dichotomy; the optimum is usually somewhere in the middle. With the Datum service, we're pretty intentionally sticking with a "monolith" in the sense that we produce a single docker image from this codebase, but the often overlooked aspect of these architectures is the context under which the service is started and run. If you assume that the connectivity from your client is created in a homogeneous fashion, e.g.:

┌──────────────┐        .───────.         ┌─────────────────┐
│              │       ╱         ╲        │                 │
│    Client    │──────▶   proxy   ────────▶  Datum Service  │
│              │       `.       ,'        │                 │
└──────────────┘         `─────'          └─────────────────┘

Then all instances of the Datum service will be required to perform things like authorization validation, session issuance, etc. The validity of these actions is managed with external state machines, such as Redis.

                                                                   ┌────────────────┐
┌──────────────┐        .───────.         ┌─────────────────┐      │                │
│              │       ╱         ╲        │                 │      │     Redis,     │
│    Client    │──────▶   proxy   ────────▶  Datum Service  ├─────▶│   PostgreSQL   │
│              │       `.       ,'        │                 │      │                │
└──────────────┘         `─────'          └─────────────────┘      └────────────────┘

We do this because we want to be able to run many instances of the Datum service, for things such as canary deployments, rollouts, etc.

                                          ┌─────────────────┐
                                          │                 │
                                     ┌───▶│  Datum Service  ├──┐
                                     │    │                 │  │
                                     │    └─────────────────┘  │
                                     │                         │   ┌────────────────┐
┌──────────────┐        .───────.    │    ┌─────────────────┐  │   │                │
│              │       ╱         ╲   │    │                 │  │   │     Redis,     │
│    Client    │──────▶   proxy   ───┼────▶  Datum Service  ├──┴┬─▶▶   PostgreSQL   │
│              │       `.       ,'   │    │                 │   │  │                │
└──────────────┘         `─────'     │    └─────────────────┘   │  └────────────────┘
                                     │                          │
                                     │    ┌─────────────────┐   │
                                     │    │                 │   │
                                     └───▶│  Datum Service  │───┘
                                          │                 │
                                          └─────────────────┘

Now, where things start to get more fun is when you layer in the desire to perform I/O operations either managed by us, or externally (e.g. S3), as well as connect to external data stores (e.g. Turso).

                                             ┌──────────────┐
                                             │              │
                                             │      S3      │
                                             │              │           ┌───────────────┐
                                             └───────▲──────┘    ┌─────▶│ Outbound HTTP │
                                                     │           │      │(e.g. webhooks)│
                                                     │           │      └───────────────┘
                                                     │           │
                                                   ┌─┘           │
                                                   │             │
                   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ┬─────────────┐
                                                   │             │       │ Stuff under │
                   │                               │             │       │ our control │
                                          ┌────────┴────────┬────┘       └─────────────┘
                   │                      │                 │                          │
                                     ┌───▶│  Datum Service  ├──┐
                   │                 │    │                 │  │                       │
                                     │    └─────────────────┘  │
                   │                 │                         │   ┌────────────────┐  │
┌──────────────┐        .───────.    │    ┌─────────────────┐  │   │                │
│              │   │   ╱         ╲   │    │                 │  │   │     Redis,     │  │
│    Client    │──────▶   proxy   ───┼────▶  Datum Service  ├──┴┬─▶▶   PostgreSQL   │
│              │   │   `.       ,'   │    │                 │   │  │                │  │
└──────────────┘         `─────'     │    └─────────────────┘   │  └────────────────┘
                   │                 │                          │                      │
                                     │    ┌─────────────────┐   │
                   │                 │    │                 │   │                      │
                                     └───▶│  Datum Service  │───┘
                   │                      │                 │                          │
                                          └────────┬────────┴─────────────┐
                   │                               │                      │            │
                    ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ─
                                                   │                      │
                                                   └─────┐                │
                                                         │                │
                                                         │                │               ┌──────────────┐
                                                         │                │               │ Other future │
                                                         ▼                └──────────────▶│  ridiculous  │
                                                     ┌───────────┐                        │    ideas     │
                                                     │   Turso   │                        └──────────────┘
                                                     └───────────┘

Given that we need to perform all kinds of workload actions, such as writing a file to a bucket, committing a SQL transaction, or sending an HTTP request, we need bounded patterns and degrees of resource control. Which is to say, we need to control resource contention in our runtime, as we don't want someone's regular HTTP request to be blocked because someone else requested a bulk upload to S3. This means creating rough groupings of workload types and bounding them so that you can monitor and control the behavior and lumpiness of each workload type.
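One way to picture "rough groupings of workload types" with bounds is a semaphore per workload class. The sketch below uses only the standard library; the Limiter type, the class names, and the limits are assumptions for illustration, not part of Soiree.

```go
package main

import (
	"fmt"
	"sync"
)

// Limiter bounds how many tasks of each workload class may run at once.
// Hypothetical type for illustration only.
type Limiter struct {
	slots map[string]chan struct{}
}

// NewLimiter builds one counting semaphore per class; limits maps a class
// name to its maximum concurrency.
func NewLimiter(limits map[string]int) *Limiter {
	l := &Limiter{slots: make(map[string]chan struct{}, len(limits))}
	for class, n := range limits {
		l.slots[class] = make(chan struct{}, n)
	}
	return l
}

// Run executes task under the bound for class. It returns false if the
// class is unknown, in which case the task is not run.
func (l *Limiter) Run(class string, task func()) bool {
	sem, ok := l.slots[class]
	if !ok {
		return false
	}
	sem <- struct{}{}        // acquire a slot; blocks while the class is saturated
	defer func() { <-sem }() // release the slot when the task returns
	task()
	return true
}

func main() {
	// The class names and limits here are made-up example values.
	l := NewLimiter(map[string]int{"http": 8, "bulk-upload": 1})

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			l.Run("bulk-upload", func() { fmt.Println("upload", n) })
		}(i)
	}

	// HTTP work has its own budget, so it is never queued behind uploads.
	l.Run("http", func() { fmt.Println("request served") })
	wg.Wait()
}
```

Because each class has its own budget, a burst of bulk uploads can only saturate the "bulk-upload" slots and leaves the "http" slots untouched.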

Check out this blog (there are many on this topic) for some real world examples on how systems with these "lumpy" workload types can become easily bottlenecked with volume. Since we are intending to open the flood gates around event ingestion from other sources (similar to how Posthog, Segment, etc., work) we need to anticipate a very high load of unstructured data which needs to be written efficiently to a myriad of external sources, as well as bulk routines which may be long running such as file imports, uploads, exports, etc.

How many goroutines can / should I have?

A single goroutine currently starts with a minimum stack size of 2 KB. Your actual code will likely also allocate some additional memory on the heap (e.g. for JSON serialization or similar) for each goroutine. This means 1M goroutines could easily require 2-4 GB of RAM (which should be OK for an average environment).
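The arithmetic behind that estimate can be checked with a few lines of Go. The 2 KB of heap per goroutine used for the upper figure is an assumed value chosen for illustration; real heap usage depends entirely on what each goroutine does.

```go
package main

import "fmt"

// estimateBytes returns goroutines * (stackBytes + heapBytes).
func estimateBytes(goroutines, stackBytes, heapBytes int64) int64 {
	return goroutines * (stackBytes + heapBytes)
}

func main() {
	const gib = 1 << 30

	stackOnly := estimateBytes(1_000_000, 2048, 0)
	// 2048 bytes of heap per goroutine is an assumption, not a measurement.
	withHeap := estimateBytes(1_000_000, 2048, 2048)

	fmt.Printf("stack only: %.2f GiB\n", float64(stackOnly)/gib) // 1.91 GiB
	fmt.Printf("with heap:  %.2f GiB\n", float64(withHeap)/gib)  // 3.81 GiB
}
```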

Most operating systems limit the number of open connections in various ways. For TCP/IP there is usually a limit on open ports per interface, about 28K on many modern systems. There is usually an additional per-process limit (e.g. ulimit for the number of open file descriptors), which by default is around 1000. So without changing the OS configuration you will have a maximum of about 1000 concurrent connections on Linux.

So depending on the system you should probably not create more than about 1000 goroutines that each hold an open connection, because they might start failing with “maximum num of file descriptors reached” or even drop packets. If you increase the limits you are still bound by the 28K connections from a single IP address.
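If you want to derive a worker cap from the descriptor limit rather than hard-coding 1000, something like the following sketch could work on Linux and other Unix-like systems. The maxWorkers helper and the reserve of 64 descriptors are assumptions for illustration; syscall.Getrlimit is not available on every platform.

```go
package main

import (
	"fmt"
	"syscall"
)

// maxWorkers caps requested at the soft descriptor limit minus a reserve kept
// for stdio, log files, and similar. It always returns at least 1.
// Hypothetical helper for illustration only.
func maxWorkers(requested int, softLimit, reserve uint64) int {
	if requested < 1 || softLimit <= reserve {
		return 1
	}
	if avail := softLimit - reserve; uint64(requested) > avail {
		return int(avail)
	}
	return requested
}

func main() {
	var rl syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		fmt.Println("could not read RLIMIT_NOFILE:", err)
		return
	}

	// The reserve of 64 descriptors is an arbitrary illustrative value.
	fmt.Println("soft limit:", rl.Cur, "worker cap:", maxWorkers(5000, uint64(rl.Cur), 64))
}
```

The resulting number could then be passed as the worker count when constructing a pool, so that connection-holding tasks never outnumber the descriptors available to the process.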

Quick Start

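package main

import (
	"fmt"

	"github.com/datumforge/datum/pkg/events/soiree"
)

func main() {
	e := soiree.NewEventPool()
	defer e.Close()

	// On returns the listener's ID (usable with Off) and an error
	if _, err := e.On("user.created", func(evt soiree.Event) error {
		fmt.Println("Event received:", evt.Topic())
		return nil
	}); err != nil {
		panic(err)
	}

	// Emit is asynchronous, so drain the returned error channel before main
	// exits; this assumes the channel is closed once all listeners have run
	for err := range e.Emit("user.created", "Matty Ice") {
		fmt.Println("listener error:", err)
	}
}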

Configuration

Your Soiree can come with a few options if you wish:

e := soiree.NewEventPool(
	soiree.WithErrorHandler(customErrorHandler),
	soiree.WithIDGenerator(customIDGenerator),
)

Subscribing to events using basic pattern matching

Following guidance from pub/sub systems such as Kafka, operating a multi-tenant cluster typically requires you to define user spaces for each tenant. For the purpose of this section, "user spaces" are collections of topics that are grouped together under the management of a single entity or user.

In Kafka and many other pub/sub systems, the main unit of data is the topic. Users can create and name each topic. They can also delete them, but it is not possible to rename a topic directly. Instead, to rename a topic, the user must create a new topic, move the messages from the original topic to the new one, and then delete the original. With this in mind, it is recommended to define logical spaces based on a hierarchical topic naming structure. This setup can then be combined with security features, such as prefixed ACLs, to isolate different spaces and tenants, while also minimizing the administrative overhead of securing the data in the cluster.

These logical user spaces can be grouped in different ways, by team or organizational unit: here, the organization is the main aggregator.

Example topic naming structure:

<org_id>.<user_id>.
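To illustrate how dot-separated, hierarchical topic names lend themselves to basic pattern matching, here is a small standard-library sketch. The matchTopic function, the single-segment "*" wildcard rule, and the example topic names are all assumptions for illustration; the matching rules Soiree itself applies may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches pattern segment by segment.
// A "*" segment in the pattern matches exactly one topic segment.
// Hypothetical matcher for illustration only.
func matchTopic(pattern, topic string) bool {
	p := strings.Split(pattern, ".")
	t := strings.Split(topic, ".")
	if len(p) != len(t) {
		return false
	}
	for i := range p {
		if p[i] != "*" && p[i] != t[i] {
			return false
		}
	}
	return true
}

func main() {
	// Example names are made up: organization, user, then an event name.
	fmt.Println(matchTopic("acme.*.created", "acme.u42.created"))   // true
	fmt.Println(matchTopic("acme.*.created", "globex.u42.created")) // false
	fmt.Println(matchTopic("acme.*", "acme.u42.created"))           // false
}
```

With the organization as the leading segment, a prefix-style pattern is enough to scope a subscription to a single tenant.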

Documentation

Overview

Package soiree provides a simple event emitter that allows you to emit events and listen for them

Index

Constants

This section is empty.

Variables

var (
	// ErrNilListener is returned when a listener is nil
	ErrNilListener = errors.New("listener cannot be nil")
	// ErrInvalidTopicName is returned when a topic name is invalid
	ErrInvalidTopicName = errors.New("invalid topic name")
	// ErrInvalidPriority is returned when a priority is invalid
	ErrInvalidPriority = errors.New("invalid priority")
	// ErrTopicNotFound is returned when a topic is not found
	ErrTopicNotFound = errors.New("topic not found")
	// ErrListenerNotFound is returned when a listener is not found
	ErrListenerNotFound = errors.New("listener not found")
	// ErrEventProcessingAborted is returned when event processing is aborted
	ErrEventProcessingAborted = errors.New("event processing aborted")
	// ErrEmitterClosed is returned when the soiree is closed
	ErrEmitterClosed = errors.New("soiree is closed")
	// ErrEmitterAlreadyClosed is returned when the soiree is already closed
	ErrEmitterAlreadyClosed = errors.New("soiree is already closed")
)
var DefaultErrorHandler = func(event Event, err error) error {
	return err
}
var DefaultIDGenerator = func() string {
	return ulids.New().String()
}

DefaultIDGenerator generates a unique identifier

var DefaultPanicHandler = func(p interface{}) {
	fmt.Printf("Panic occurred: %v\n", p)
}

DefaultPanicHandler handles panics by printing the panic value

Functions

This section is empty.

Types

type BaseEvent

type BaseEvent struct {
	// contains filtered or unexported fields
}

BaseEvent serves as a basic implementation of the `Event` interface and contains fields for storing the topic, payload, and aborted status of an event. The struct includes methods to interact with these fields such as getting and setting the payload, setting the aborted status, and checking if the event has been aborted. The struct also includes a `sync.RWMutex` field `mu` to handle concurrent access to the struct's fields in a thread-safe manner
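The description above can be pictured with a small sketch of an RWMutex-guarded struct. The event type and its field names below are stand-ins chosen for illustration; BaseEvent's real fields are unexported and may be laid out differently.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for BaseEvent; field names are assumptions.
type event struct {
	mu      sync.RWMutex
	topic   string
	payload interface{}
	aborted bool
}

func newEvent(topic string, payload interface{}) *event {
	return &event{topic: topic, payload: payload}
}

// Payload takes the read lock, so many readers can proceed in parallel.
func (e *event) Payload() interface{} {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.payload
}

// SetPayload takes the write lock, excluding readers and other writers.
func (e *event) SetPayload(p interface{}) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.payload = p
}

// SetAborted updates the aborted flag under the write lock.
func (e *event) SetAborted(abort bool) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.aborted = abort
}

// IsAborted reads the aborted flag under the read lock.
func (e *event) IsAborted() bool {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.aborted
}

func main() {
	e := newEvent("user.created", 0)

	// Concurrent writers and readers are safe because every access is locked.
	var wg sync.WaitGroup
	for i := 1; i <= 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			e.SetPayload(n)
			_ = e.Payload()
		}(i)
	}
	wg.Wait()

	e.SetAborted(true)
	fmt.Println(e.IsAborted()) // true
}
```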

func NewBaseEvent

func NewBaseEvent(topic string, payload interface{}) *BaseEvent

NewBaseEvent creates a new instance of BaseEvent with a payload

func (*BaseEvent) IsAborted

func (e *BaseEvent) IsAborted() bool

IsAborted checks the event's aborted status

func (*BaseEvent) Payload

func (e *BaseEvent) Payload() interface{}

Payload returns the event's payload

func (*BaseEvent) Properties

func (e *BaseEvent) Properties() Properties

Properties returns the event's properties

func (*BaseEvent) SetAborted

func (e *BaseEvent) SetAborted(abort bool)

SetAborted sets the event's aborted status

func (*BaseEvent) SetPayload

func (e *BaseEvent) SetPayload(payload interface{})

SetPayload sets the event's payload

func (*BaseEvent) SetProperties

func (e *BaseEvent) SetProperties(properties Properties)

SetProperties sets the event's properties

func (*BaseEvent) Topic

func (e *BaseEvent) Topic() string

Topic returns the event's topic

type Event

type Event interface {
	// Topic returns the event's topic
	Topic() string
	// Payload returns the event's payload
	Payload() interface{}
	// Properties returns the event's properties
	Properties() Properties
	// SetPayload sets the event's payload
	SetPayload(interface{})
	// SetProperties sets the event's properties
	SetProperties(Properties)
	// SetAborted sets the event's aborted status
	SetAborted(bool)
	// IsAborted checks the event's aborted status
	IsAborted() bool
}

Event is an interface representing the structure of an instance of an event

type EventPool

type EventPool struct {
	// contains filtered or unexported fields
}

EventPool manages subscribing and unsubscribing listeners to topics, and emits events to all subscribers

func NewEventPool

func NewEventPool(opts ...EventPoolOption) *EventPool

NewEventPool initializes a new EventPool with optional configuration options

func (*EventPool) Close

func (m *EventPool) Close() error

Close terminates the soiree, ensuring all pending events are processed; it performs cleanup and releases resources

func (*EventPool) Emit

func (m *EventPool) Emit(eventName string, payload interface{}) <-chan error

Emit asynchronously dispatches an event to all the subscribers of the event's topic. It returns a channel that will receive any errors encountered during event handling
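The "return a channel of errors" shape can be sketched with the standard library alone. This is a generic illustration of the pattern, not Emit's actual implementation; the emit function, the handler type, and errFailed are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical listener signature used only in this sketch.
type handler func(payload interface{}) error

// errFailed is a sample error for the demonstration below.
var errFailed = errors.New("handler failed")

// emit runs the handlers in a background goroutine and streams their errors
// on the returned channel, which is closed once every handler has run.
func emit(payload interface{}, handlers ...handler) <-chan error {
	// Buffer one slot per handler so the goroutine never blocks on send,
	// even if the caller never reads the channel.
	errs := make(chan error, len(handlers))

	go func() {
		defer close(errs)
		for _, h := range handlers {
			if err := h(payload); err != nil {
				errs <- err
			}
		}
	}()

	return errs
}

func main() {
	ok := func(interface{}) error { return nil }
	bad := func(interface{}) error { return errFailed }

	// Ranging over the channel blocks until it is closed, which doubles as
	// a way to wait for all handlers to finish.
	for err := range emit("Matty Ice", ok, bad) {
		fmt.Println("error:", err)
	}
}
```

A caller that does not care about errors can ignore the returned channel; a caller that does can range over it as shown.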

func (*EventPool) EmitSync

func (m *EventPool) EmitSync(eventName string, payload interface{}) []error

EmitSync dispatches an event synchronously to all subscribers of the event's topic; this method will block until all notifications are completed

func (*EventPool) EnsureTopic

func (m *EventPool) EnsureTopic(topicName string) *Topic

EnsureTopic retrieves or creates a new topic by its name

func (*EventPool) GetTopic

func (m *EventPool) GetTopic(topicName string) (*Topic, error)

GetTopic retrieves a topic by its name. If the topic does not exist, it returns an error

func (*EventPool) Off

func (m *EventPool) Off(topicName string, listenerID string) error

Off unsubscribes a listener from a topic using the listener's unique ID

func (*EventPool) On

func (m *EventPool) On(topicName string, listener Listener, opts ...ListenerOption) (string, error)

On subscribes a listener to a topic with the given name; returns a unique listener ID

func (*EventPool) SetErrChanBufferSize

func (m *EventPool) SetErrChanBufferSize(size int)

SetErrChanBufferSize sets the buffer size for the error channel for the event pool

func (*EventPool) SetErrorHandler

func (m *EventPool) SetErrorHandler(handler func(Event, error) error)

SetErrorHandler sets the error handler for the event pool

func (*EventPool) SetIDGenerator

func (m *EventPool) SetIDGenerator(generator func() string)

SetIDGenerator sets the ID generator for the event pool

func (*EventPool) SetPanicHandler

func (m *EventPool) SetPanicHandler(panicHandler PanicHandler)

SetPanicHandler sets the panic handler for the event pool

func (*EventPool) SetPool

func (m *EventPool) SetPool(pool Pool)

SetPool sets the pool for the event pool

type EventPoolOption

type EventPoolOption func(Soiree)

EventPoolOption defines a function type for Soiree configuration options
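For readers new to this style, the functional options pattern can be sketched as follows. The pool struct, its fields, and the default values here are hypothetical; only the overall shape (a function type applied in the constructor) mirrors EventPoolOption and NewEventPool.

```go
package main

import "fmt"

// pool is a hypothetical stand-in for a configurable type.
type pool struct {
	errBuf int
	idGen  func() string
}

// option mutates a pool during construction.
type option func(*pool)

// withErrBuf overrides the error channel buffer size.
func withErrBuf(n int) option {
	return func(p *pool) { p.errBuf = n }
}

// withIDGen overrides the ID generator.
func withIDGen(g func() string) option {
	return func(p *pool) { p.idGen = g }
}

// newPool applies defaults first, then each option in the order given.
func newPool(opts ...option) *pool {
	// These defaults are assumed values for the sketch.
	p := &pool{errBuf: 10, idGen: func() string { return "default-id" }}
	for _, o := range opts {
		o(p)
	}
	return p
}

func main() {
	p := newPool(
		withErrBuf(3),
		withIDGen(func() string { return "custom-id" }),
	)
	fmt.Println(p.errBuf, p.idGen()) // 3 custom-id
}
```

The benefit of this design is that new settings can be added later without changing the constructor's signature.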

func WithErrChanBufferSize

func WithErrChanBufferSize(size int) EventPoolOption

WithErrChanBufferSize sets the size of the buffered channel for errors returned by asynchronous emits

func WithErrorHandler

func WithErrorHandler(errHandler func(Event, error) error) EventPoolOption

WithErrorHandler sets a custom error handler for a Soiree

func WithIDGenerator

func WithIDGenerator(idGen func() string) EventPoolOption

WithIDGenerator sets a custom ID generator for a Soiree

func WithPanicHandler

func WithPanicHandler(panicHandler PanicHandler) EventPoolOption

WithPanicHandler sets a custom panic handler for a Soiree

func WithPool

func WithPool(pool Pool) EventPoolOption

WithPool sets a custom pool for a Soiree

type Listener

type Listener func(Event) error

Listener is a function type that can handle events of any type. It takes an `Event` as a parameter and returns an `error`, which lets you define functions that conform to this signature and makes it easier to work with event listeners in other parts of the code

type ListenerOption

type ListenerOption func(*listenerItem)

ListenerOption is a function type that configures listener behavior

func WithPriority

func WithPriority(priority Priority) ListenerOption

WithPriority sets the priority of a listener

type PanicHandler

type PanicHandler func(interface{})

PanicHandler is a function type that handles panics
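A panic handler of this shape is typically invoked from a deferred recover. The sketch below shows that general technique with the standard library; safeCall is a hypothetical helper, and how Soiree wires its handler internally is not shown here.

```go
package main

import "fmt"

// safeCall runs task and, if it panics, passes the recovered value to
// onPanic instead of letting the panic crash the program.
// Hypothetical helper for illustration only.
func safeCall(task func(), onPanic func(interface{})) {
	defer func() {
		if r := recover(); r != nil {
			onPanic(r)
		}
	}()
	task()
}

func main() {
	logPanic := func(p interface{}) {
		fmt.Printf("Panic occurred: %v\n", p)
	}

	safeCall(func() { panic("listener exploded") }, logPanic)
	safeCall(func() { fmt.Println("listener ran cleanly") }, logPanic)
}
```

Note that recover only has an effect inside a deferred function running in the same goroutine as the panic, which is why the wrapper has to run the task itself.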

type PondPool

type PondPool struct {
	// contains filtered or unexported fields
}

PondPool is a worker pool implementation using the pond library

func NewNamedPondPool added in v0.6.0

func NewNamedPondPool(maxWorkers, maxCapacity int, name string, options ...pond.Option) *PondPool

NewNamedPondPool creates a new instance of PondPool with the passed options and name

func NewPondPool

func NewPondPool(maxWorkers, maxCapacity int, options ...pond.Option) *PondPool

NewPondPool creates a new instance of PondPool with the passed options

func (*PondPool) CompletedTasks

func (p *PondPool) CompletedTasks() int

CompletedTasks returns the number of tasks that completed either successfully or with a panic

func (*PondPool) FailedTasks

func (p *PondPool) FailedTasks() int

FailedTasks returns the number of tasks that completed with a panic

func (*PondPool) IdleWorkers

func (p *PondPool) IdleWorkers() int

IdleWorkers returns the number of idle workers in the pool

func (*PondPool) NewStatsCollector

func (p *PondPool) NewStatsCollector()

func (*PondPool) Release

func (p *PondPool) Release()

Release stops all workers in the pool and waits for them to finish

func (*PondPool) ReleaseWithDeadline

func (p *PondPool) ReleaseWithDeadline(deadline time.Duration)

ReleaseWithDeadline stops this pool and waits until either all tasks in the queue are completed or the given deadline is reached, whichever comes first
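The "whichever comes first" behavior is a common Go pattern: wait on a sync.WaitGroup in a helper goroutine and race its completion against a timer. The sketch below shows that pattern in isolation; runWithDeadline is a hypothetical function, not how PondPool or pond implement it.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runWithDeadline starts every task in its own goroutine and reports whether
// all of them finished before the deadline elapsed.
// Hypothetical helper for illustration only.
func runWithDeadline(deadline time.Duration, tasks ...func()) bool {
	var wg sync.WaitGroup
	for _, t := range tasks {
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			task()
		}(t)
	}

	// Convert the blocking Wait into a channel so it can be used in select.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return true
	case <-time.After(deadline):
		return false // unfinished tasks keep running; they are not cancelled
	}
}

func main() {
	fast := func() { time.Sleep(10 * time.Millisecond) }
	slow := func() { time.Sleep(500 * time.Millisecond) }

	fmt.Println(runWithDeadline(200*time.Millisecond, fast))      // true
	fmt.Println(runWithDeadline(50*time.Millisecond, fast, slow)) // false
}
```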

func (*PondPool) Running

func (p *PondPool) Running() int

Running returns the number of running workers in the pool

func (*PondPool) Stop

func (p *PondPool) Stop()

Stop causes this pool to stop accepting new tasks and signals all workers to exit. Tasks being executed by workers will continue until completion (unless the process is terminated). Tasks in the queue will not be executed, so any buffered tasks are dropped; ideally use Release or ReleaseWithDeadline instead

func (*PondPool) StopAndWaitFor

func (p *PondPool) StopAndWaitFor(deadline time.Duration)

StopAndWaitFor stops this pool and waits until either all tasks in the queue are completed or the given deadline is reached, whichever comes first

func (*PondPool) Submit

func (p *PondPool) Submit(task func())

Submit submits a task to the worker pool

func (*PondPool) SubmitAndWait

func (p *PondPool) SubmitAndWait(task func())

SubmitAndWait submits a task to the worker pool and waits for it to finish

func (*PondPool) SubmitBefore

func (p *PondPool) SubmitBefore(task func(), deadline time.Duration)

SubmitBefore submits a task to the worker pool, subject to the given deadline

func (*PondPool) SubmitMultipleAndWait added in v0.6.0

func (p *PondPool) SubmitMultipleAndWait(task []func())

SubmitMultipleAndWait submits multiple tasks to the worker pool and waits for all of them to finish

func (*PondPool) SubmittedTasks

func (p *PondPool) SubmittedTasks() int

SubmittedTasks returns the number of tasks submitted to the pool

func (*PondPool) SuccessfulTasks

func (p *PondPool) SuccessfulTasks() int

SuccessfulTasks returns the number of tasks that completed successfully

func (*PondPool) WaitingTasks

func (p *PondPool) WaitingTasks() int

WaitingTasks returns the number of tasks waiting in the pool

type Pool

type Pool interface {
	// Submit submits a task to the worker pool
	Submit(task func())
	// Running returns the number of running workers in the pool
	Running() int
	// Release stops all workers in the pool and waits for them to finish
	Release()
	// ReleaseWithDeadline stops this pool and waits until either all tasks in the queue are completed or the given deadline is reached
	ReleaseWithDeadline(deadline time.Duration)
	// Stop causes this pool to stop accepting new tasks and signals all workers to exit
	Stop()
	// IdleWorkers returns the number of idle workers in the pool
	IdleWorkers() int
	// SubmittedTasks returns the number of tasks submitted to the pool
	SubmittedTasks() int
	// WaitingTasks returns the number of tasks waiting in the pool
	WaitingTasks() int
	// SuccessfulTasks returns the number of tasks that completed successfully
	SuccessfulTasks() int
	// FailedTasks returns the number of tasks that completed with a panic
	FailedTasks() int
	// CompletedTasks returns the number of tasks that completed either successfully or with a panic
	CompletedTasks() int
	// StopAndWaitFor stops this pool and waits until either all tasks in the queue are completed or the given deadline is reached
	StopAndWaitFor(deadline time.Duration)
	// SubmitAndWait submits a task to the worker pool and waits for it to finish
	SubmitAndWait(task func())
	// SubmitBefore submits a task to the worker pool, subject to the given deadline
	SubmitBefore(task func(), deadline time.Duration)
}

Pool is an interface for a worker pool

type Priority

type Priority int

Priority type for listener priority levels

const (
	Lowest Priority = iota + 1 // Lowest priority
	Low
	Normal
	High
	Highest
)
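To show how such priority levels can control invocation order, here is a standard-library sketch that sorts listeners before calling them. The listener struct and the byPriority helper are hypothetical, and the highest-first ordering is an assumption for illustration rather than a documented guarantee of this package.

```go
package main

import (
	"fmt"
	"sort"
)

// Priority mirrors the constants above so the sketch is self-contained.
type Priority int

const (
	Lowest Priority = iota + 1
	Low
	Normal
	High
	Highest
)

// listener is a hypothetical record pairing a name with a priority.
type listener struct {
	name     string
	priority Priority
}

// byPriority returns a copy ordered from highest to lowest priority.
// The sort is stable, so equal priorities keep their registration order.
func byPriority(ls []listener) []listener {
	out := append([]listener(nil), ls...)
	sort.SliceStable(out, func(i, j int) bool {
		return out[i].priority > out[j].priority
	})
	return out
}

func main() {
	registered := []listener{
		{"audit-log", Low},
		{"auth-check", Highest},
		{"metrics", Normal},
		{"notify", Normal},
	}
	for _, l := range byPriority(registered) {
		fmt.Println(l.priority, l.name)
	}
}
```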

type Properties

type Properties map[string]interface{}

Properties is a map of properties to set on an event

func NewProperties

func NewProperties() Properties

NewProperties creates a new Properties map

func (Properties) Set

func (p Properties) Set(name string, value interface{}) Properties

Set a property on the Properties map
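Because Set returns the Properties map, calls can be chained. The sketch below re-declares the type locally so it runs on its own; the method bodies are inferred from the signatures above and are an assumption, not a copy of the package source.

```go
package main

import "fmt"

// Properties is re-declared here so the sketch is self-contained.
type Properties map[string]interface{}

// NewProperties returns an empty, ready-to-use map.
func NewProperties() Properties {
	return make(Properties)
}

// Set stores value under name and returns the same map to allow chaining.
func (p Properties) Set(name string, value interface{}) Properties {
	p[name] = value
	return p
}

func main() {
	// The keys and values are made-up examples.
	props := NewProperties().
		Set("plan", "enterprise").
		Set("seats", 25)

	fmt.Println(len(props), props["plan"], props["seats"]) // 2 enterprise 25
}
```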

type Soiree

type Soiree interface {
	// On registers a listener function to a specific topic
	On(topicName string, listener Listener, opts ...ListenerOption) (string, error)
	// Off removes a listener from a specific topic using the listener's unique ID
	Off(topicName string, listenerID string) error
	// Emit asynchronously sends an event to all subscribers of a topic and returns a channel of errors
	Emit(eventName string, payload interface{}) <-chan error
	// EmitSync sends an event synchronously to all subscribers of a topic; blocks until all listeners have been notified
	EmitSync(eventName string, payload interface{}) []error
	// GetTopic retrieves the Topic object associated with the given topic name
	GetTopic(topicName string) (*Topic, error)
	// EnsureTopic creates a new topic if it does not exist, or returns the existing one
	EnsureTopic(topicName string) *Topic
	// SetErrorHandler assigns a custom error handler function for the Soiree
	SetErrorHandler(func(Event, error) error)
	// SetIDGenerator assigns a function that generates a unique ID string for new listeners
	SetIDGenerator(func() string)
	// SetPool sets a custom goroutine pool for managing concurrency within the Soiree
	SetPool(Pool)
	// SetPanicHandler sets a function that will be called in case of a panic during event handling
	SetPanicHandler(PanicHandler)
	// SetErrChanBufferSize sets the size of the buffered channel for errors returned by asynchronous emits
	SetErrChanBufferSize(int)
	// Close gracefully shuts down the Soiree, ensuring all pending events are processed
	Close() error
}

Soiree is an interface that defines the behavior of your get-together

type Topic

type Topic struct {
	// Name signifies the topic's unique identifier
	Name string
	// contains filtered or unexported fields
}

Topic represents an event channel to which listeners can subscribe

func NewTopic

func NewTopic() *Topic

NewTopic creates a new Topic

func (*Topic) AddListener

func (t *Topic) AddListener(id string, listener Listener, opts ...ListenerOption)

AddListener adds a new listener to the topic under the given identifier, applying any listener options such as priority

func (*Topic) RemoveListener

func (t *Topic) RemoveListener(id string) error

RemoveListener removes a listener from the topic using its identifier

func (*Topic) Trigger

func (t *Topic) Trigger(event Event) []error

Trigger calls all listeners of the topic with the event

Directories

Path Synopsis
examples
