dispatch

package
v0.0.27 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2025 License: Apache-2.0 Imports: 7 Imported by: 0

README

Dispatch Package

The dispatch package provides a simple, type-safe event bus system for Go applications, supporting both synchronous and asynchronous event handling with wildcards and typed payloads. This package is designed for single-binary applications needing simple, type-safe, in-memory event handling. It's ideal for monolithic applications using the Hop framework where events don't need persistence or distributed processing. For distributed systems, message persistence, or advanced features like message routing and transformation, consider using a more comprehensive solution like Watermill or a message queue.

Features

  • 🔄 Asynchronous and synchronous event emission
  • 🎯 Type-safe payload handling with generics
  • 🌟 Wildcard pattern matching for event signatures
  • 🛡️ Panic recovery in event handlers
  • 📝 Structured logging integration
  • 🔍 Context support for cancellation

Installation

go get github.com/patrickward/hop/dispatch

Quick Start

package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"

	"github.com/patrickward/hop/dispatch"
)

func main() {
	// Create a new event dispatcher
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	dispatcher := dispatch.NewDispatcher(logger)

	// Register an event handler
	dispatcher.On("user.created", func(ctx context.Context, event dispatch.Event) {
		if user, err := dispatch.PayloadAs[User](event); err == nil {
			fmt.Printf("New user created: %s\n", user.Name)
		}
	})

	// Emit an event
	dispatcher.Emit(context.Background(), "user.created", User{
		ID:   "123",
		Name: "John Doe",
	})
}

type User struct {
	ID   string
	Name string
}

Event Signatures

Events use dot-notation signatures (e.g., "user.created", "system.startup"). The signature format is typically:

<source>.<event_type>

Examples:

  • user.created
  • order.completed
  • system.startup
  • email.sent
Wildcard Support

You can use wildcards (*) in event signatures when registering handlers:

// Handle all user events
dispatcher.On("user.*", handler)

// Handle all created events from any source
dispatcher.On("*.created", handler)

// Handle all system events
dispatcher.On("system.*", handler)

Working with Payloads

Type-Safe Payload Handling

The package provides several helpers for type-safe payload handling:

// Direct type conversion
userEvent, err := dispatch.PayloadAs[User](event)

// Must variant (panics on error)
userEvent := dispatch.MustPayloadAs[User](event)

// Type checking
if dispatch.IsPayloadType[User](event) {
    // Handle user event
}

// Automatic payload handling
dispatcher.On("user.created", dispatch.HandlePayload[User](func(ctx context.Context, user User) {
    fmt.Printf("New user: %s\n", user.Name)
}))
Collection Payloads

Special helpers for common collection types:

// Working with map payloads
config, err := dispatch.PayloadAsMap(event)
regions, err := dispatch.PayloadMapAs[Region](event)

// Working with slice payloads
items, err := dispatch.PayloadAsSlice(event)
users, err := dispatch.PayloadSliceAs[User](event)

Synchronous vs Asynchronous

Asynchronous Emission (Default)
// Emit events asynchronously (non-blocking)
dispatcher.Emit(ctx, "user.created", user)
Synchronous Emission
// Emit events synchronously (blocks until all handlers complete)
dispatcher.EmitSync(ctx, "user.created", user)

Context Support

All event handlers receive a context.Context, which can be used for cancellation:

dispatcher.On("long.process", func(ctx context.Context, event dispatch.Event) {
    select {
    case <-ctx.Done():
        return // Context cancelled
    case <-time.After(time.Second):
        // Continue processing
    }
})

Error Handling

The dispatcher automatically recovers from panics in event handlers and logs them:

dispatcher.On("risky.operation", func(ctx context.Context, event dispatch.Event) {
    // Even if this panics, other handlers will still run
    panic("something went wrong")
})

Best Practices

  1. Event Naming: Use consistent naming patterns for events (e.g., resource.action)
  2. Type Safety: Use PayloadAs and type-safe handlers where possible
  3. Context Usage: Pass appropriate contexts for cancellation support
  4. Error Handling: Always check errors when converting payloads
  5. Documentation: Document event signatures and their expected payloads

Thread Safety

The dispatcher is thread-safe and can be safely used from multiple goroutines.

Performance Considerations

  • Async event emission (Emit) returns immediately and runs handlers in goroutines
  • Sync event emission (EmitSync) waits for all handlers to complete
  • Wildcard pattern matching adds minimal overhead
  • Consider using sync emission for critical path operations where order matters

Example: Complete System

package main

import (
	"context"
	"log/slog"
	"os"
	"time"

	"github.com/patrickward/hop/dispatch"
)

type OrderCreated struct {
	ID        string
	UserID    string
	Amount    float64
	CreatedAt time.Time
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	dispatcher := dispatch.NewDispatcher(logger)

	// Register multiple handlers
	dispatcher.On("order.created", dispatch.HandlePayload[OrderCreated](handleNewOrder))
	dispatcher.On("order.*", handleAnyOrderEvent)
	dispatcher.On("*.created", handleAnyCreatedEvent)

	// Emit an event
	order := OrderCreated{
		ID:        "ord_123",
		UserID:    "usr_456",
		Amount:    99.99,
		CreatedAt: time.Now(),
	}

	ctx := context.Background()
	dispatcher.EmitSync(ctx, "order.created", order)
}

func handleNewOrder(ctx context.Context, order OrderCreated) {
	// Process new order
}

func handleAnyOrderEvent(ctx context.Context, event dispatch.Event) {
	// Handle any order-related event
}

func handleAnyCreatedEvent(ctx context.Context, event dispatch.Event) {
	// Handle any creation event
}

Documentation

Overview

Package dispatch provides a lightweight, type-safe event bus implementation for building event-driven applications in Go. It supports both synchronous and asynchronous event handling, with features like wildcard pattern matching and typed payload handling through generics.

Basic Usage:

logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
dispatcher := dispatch.NewDispatcher(logger)

// Register a handler
dispatcher.On("user.created", func(ctx context.Context, event dispatch.Event) {
    // Handle the event
})

// Emit an event
dispatcher.Emit(context.Background(), "user.created", userData)

Event Signatures:

Events use dot-notation signatures that typically follow the pattern:

<source>.<action>

For example:

  • user.created
  • order.completed
  • email.sent

Wildcard pattern matching is supported when registering handlers:

  • "user.*" matches all user events
  • "*.created" matches all creation events
  • "system.*" matches all system events

Type-Safe Payload Handling:

The package provides several helpers for safe payload type conversion:

// Direct conversion
user, err := dispatch.PayloadAs[User](event)

// Type-safe handler
dispatcher.On("user.created", dispatch.HandlePayload[User](func(ctx context.Context, user User) {
    // Work with strongly typed user data
}))

// Collection helpers
config, err := dispatch.PayloadAsMap(event)                // For map[string]any
items, err := dispatch.PayloadAsSlice(event)              // For []any
regions, err := dispatch.PayloadMapAs[Region](event)      // For map[string]Region
users, err := dispatch.PayloadSliceAs[User](event)        // For []User

Event Emission:

Events can be emitted either asynchronously (non-blocking) or synchronously (blocking):

// Async emission (handlers run in goroutines)
dispatcher.Emit(ctx, "user.created", userData)

// Sync emission (waits for all handlers to complete)
dispatcher.EmitSync(ctx, "user.created", userData)

Context Support:

All event handlers receive a context.Context that can be used for cancellation, timeouts, and passing request-scoped values:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

dispatcher.EmitSync(ctx, "long.process", data)

Thread Safety:

The event dispatcher is thread-safe and can be safely used from multiple goroutines. Handler registration and event emission are protected by appropriate synchronization.

When to Use:

This package is designed for single-binary applications needing simple, type-safe, in-memory event handling. It's ideal for monolithic applications using the Hop framework where events don't need persistence or distributed processing. For distributed systems, message persistence, or advanced features like message routing and transformation, consider using a more comprehensive solution like [Watermill](https://github.com/ThreeDotsLabs/watermill) or a message queue.

Error Handling:

The event dispatcher automatically recovers from panics in event handlers and logs them using the provided logger. This ensures that a failing handler won't affect other handlers or the stability of the event bus.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsPayloadType

func IsPayloadType[T any](e Event) bool

IsPayloadType checks if an event's payload is of the specified type T.

func MustPayloadAs

func MustPayloadAs[T any](e Event) T

MustPayloadAs converts an event's payload to the specified type T. Panics if the conversion fails.

func PayloadAs

func PayloadAs[T any](e Event) (T, error)

PayloadAs safely converts an event's payload to the specified type T. Returns the typed payload and any conversion error.

Example
package main

import (
	"fmt"

	"github.com/patrickward/hop/dispatch"
)

func main() {
	// Example event with a structured payload
	type UserCreated struct {
		ID   string
		Name string
	}

	evt := dispatch.NewEvent("user.created", UserCreated{
		ID:   "123",
		Name: "John Doe",
	})

	// Safe conversion with error handling
	user, err := dispatch.PayloadAs[UserCreated](evt)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}
	fmt.Printf("User created: %s\n", user.Name)
}
Output:

User created: John Doe

func PayloadAsMap

func PayloadAsMap(e Event) (map[string]any, error)

PayloadAsMap is a convenience function for working with map[string]any payloads, which are common when dealing with JSON data.

Example
package main

import (
	"fmt"

	"github.com/patrickward/hop/dispatch"
)

func main() {
	// Create an event with a map payload
	evt := dispatch.NewEvent("config.updated", map[string]any{
		"database": "postgres",
		"port":     5432,
	})

	// Use the convenience function for map payloads
	config, err := dispatch.PayloadAsMap(evt)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	if dbName, ok := config["database"].(string); ok {
		fmt.Printf("Database: %s\n", dbName)
	}
}
Output:

Database: postgres

func PayloadAsSlice

func PayloadAsSlice(e Event) ([]any, error)

PayloadAsSlice is a convenience function for working with []any payloads.

Example
package main

import (
	"fmt"

	"github.com/patrickward/hop/dispatch"
)

func main() {
	// Create an event with a slice payload
	evt := dispatch.NewEvent("users.updated", []any{
		"john",
		"jane",
	})

	// Use the convenience function for slice payloads
	users, err := dispatch.PayloadAsSlice(evt)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Printf("Users: %v\n", users)

}
Output:

Users: [john jane]

func PayloadMapAs

func PayloadMapAs[T any](e Event) (map[string]T, error)

PayloadMapAs converts a map payload into a map with typed values. Returns an error if the payload is not a map or if any value cannot be converted to type T.

Example:

type User struct { ID string }
userMap, err := PayloadMapAs[User](event)
Example
package main

import (
	"fmt"
	"sort"
	"sync"

	"github.com/patrickward/hop/dispatch"
)

func main() {
	// Define a type we want to convert to
	type Region struct {
		Code string
		Name string
	}

	var mu sync.Mutex
	var results []string

	// Create an event with a map payload
	evt := dispatch.NewEvent("regions.updated", map[string]any{
		"us-east": Region{Code: "USE", Name: "US East"},
		"us-west": Region{Code: "USW", Name: "US West"},
	})

	// Convert the payload to a map of Regions
	regions, err := dispatch.PayloadMapAs[Region](evt)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	// Process the typed regions
	for key, region := range regions {
		mu.Lock()
		results = append(results, fmt.Sprintf("Region %s: %s (%s)", key, region.Name, region.Code))
		mu.Unlock()
	}

	// Sort and print results for consistent output
	sort.Strings(results)
	for _, result := range results {
		fmt.Println(result)
	}

}
Output:

Region us-east: US East (USE)
Region us-west: US West (USW)

func PayloadSliceAs

func PayloadSliceAs[T any](e Event) ([]T, error)

PayloadSliceAs converts a slice payload into a slice of typed elements. Returns an error if the payload is not a slice or if any element cannot be converted to type T.

Example:

type User struct { ID string }
users, err := PayloadSliceAs[User](event)
Example
package main

import (
	"fmt"

	"github.com/patrickward/hop/dispatch"
)

func main() {
	// Define a type we want to convert to
	type User struct {
		ID   string
		Name string
	}

	// Create an event with a slice payload
	evt := dispatch.NewEvent("users.imported", []any{
		User{ID: "1", Name: "Alice"},
		User{ID: "2", Name: "Bob"},
	})

	// Convert the payload to a slice of Users
	users, err := dispatch.PayloadSliceAs[User](evt)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	// Process the typed users
	for _, user := range users {
		fmt.Printf("User: %s (ID: %s)\n", user.Name, user.ID)
	}
}
Output:

User: Alice (ID: 1)
User: Bob (ID: 2)

Types

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher manages event publishing and subscription

Example (Basic)
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/patrickward/hop/dispatch"
)

func main() {
	// Create a new event dispatcher with a basic logger
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	dispatcher := dispatch.NewDispatcher(logger)

	// Register an event handler
	dispatcher.On("user.login", func(ctx context.Context, event dispatch.Event) {
		payload := event.Payload.(map[string]string)
		fmt.Printf("User logged in: %s\n", payload["username"])
	})

	// Emit an event
	dispatcher.Emit(context.Background(), "user.login", map[string]string{
		"username": "alice",
	})

	// Wait for async handler to complete
	time.Sleep(10 * time.Millisecond)
}
Output:

User logged in: alice
Example (ContextCancellation)
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/patrickward/hop/dispatch"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	dispatcher := dispatch.NewDispatcher(logger)

	// Create a context with cancellation
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// Register a handler that respects context cancellation
	dispatcher.On("long.task", func(ctx context.Context, event dispatch.Event) {
		select {
		case <-ctx.Done():
			fmt.Println("Task cancelled")
			return
		case <-time.After(100 * time.Millisecond):
			fmt.Println("Task completed")
		}
	})

	// Emit event with cancellable context
	dispatcher.EmitSync(ctx, "long.task", nil)
}
Output:

Task cancelled
Example (MultipleHandlers)
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"sort"
	"sync"

	"github.com/patrickward/hop/dispatch"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	dispatcher := dispatch.NewDispatcher(logger)

	var mu sync.Mutex
	var results []string

	// Register multiple handlers for the same event
	dispatcher.On("notification.sent", func(ctx context.Context, event dispatch.Event) {
		mu.Lock()
		results = append(results, "Logging notification")
		mu.Unlock()
	})

	dispatcher.On("notification.sent", func(ctx context.Context, event dispatch.Event) {
		mu.Lock()
		results = append(results, "Sending analytics")
		mu.Unlock()
	})

	dispatcher.On("notification.sent", func(ctx context.Context, event dispatch.Event) {
		mu.Lock()
		results = append(results, "Updating cache")
		mu.Unlock()
	})

	// Emit event synchronously - all handlers will be called
	dispatcher.EmitSync(context.Background(), "notification.sent", nil)

	// Sort and print results
	sort.Strings(results)
	for _, result := range results {
		fmt.Println(result)
	}

}
Output:

Logging notification
Sending analytics
Updating cache
Example (SyncEmit)
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/patrickward/hop/dispatch"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	dispatcher := dispatch.NewDispatcher(logger)

	// Register event handlers
	dispatcher.On("task.process", func(ctx context.Context, event dispatch.Event) {
		fmt.Println("Processing task...")
		time.Sleep(10 * time.Millisecond)
		fmt.Println("Task completed")
	})

	// EmitSync will wait for all handlers to complete
	fmt.Println("Starting task")
	dispatcher.EmitSync(context.Background(), "task.process", nil)
	fmt.Println("All processing complete")

}
Output:

Starting task
Processing task...
Task completed
All processing complete
Example (Wildcards)
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"sort"
	"sync"

	"github.com/patrickward/hop/dispatch"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	dispatcher := dispatch.NewDispatcher(logger)

	var mu sync.Mutex
	var results []string

	// Register handlers with wildcards
	dispatcher.On("user.*", func(ctx context.Context, event dispatch.Event) {
		mu.Lock()
		results = append(results, fmt.Sprintf("User event: %s", event.Signature))
		mu.Unlock()
	})

	dispatcher.On("*.created", func(ctx context.Context, event dispatch.Event) {
		mu.Lock()
		results = append(results, fmt.Sprintf("Created event: %s", event.Signature))
		mu.Unlock()
	})

	// Emit events synchronously
	dispatcher.EmitSync(context.Background(), "user.created", nil)
	dispatcher.EmitSync(context.Background(), "user.deleted", nil)
	dispatcher.EmitSync(context.Background(), "post.created", nil)

	// Sort and print results
	sort.Strings(results)
	for _, result := range results {
		fmt.Println(result)
	}

}
Output:

Created event: post.created
Created event: user.created
User event: user.created
User event: user.deleted

func NewDispatcher

func NewDispatcher(logger *slog.Logger) *Dispatcher

NewDispatcher creates a new event bus/dispatcher

func (*Dispatcher) Emit

func (b *Dispatcher) Emit(ctx context.Context, signature string, payload any)

Emit sends an event to all registered handlers asynchronously

func (*Dispatcher) EmitSync

func (b *Dispatcher) EmitSync(ctx context.Context, signature string, payload any)

EmitSync sends an event and waits for all handlers to complete

func (*Dispatcher) On

func (b *Dispatcher) On(signature string, handler Handler)

On registers a handler for an event signature Supports wildcards: "hop.*" or "*.system.start"

type Event

type Event struct {
	ID        string    `json:"id"`
	Signature string    `json:"signature"` // e.g. "hop.system.start"
	Payload   any       `json:"payload,omitempty"`
	Timestamp time.Time `json:"timestamp"`
}

Event represents a system event with a simplified structure

func NewEvent

func NewEvent(signature string, payload any) Event

NewEvent creates an event with the given signature and optional payload

Example
package main

import (
	"fmt"

	"github.com/patrickward/hop/dispatch"
)

func main() {
	// Create a new event with a payload
	evt := dispatch.NewEvent("user.created", map[string]string{
		"id":    "123",
		"email": "user@example.com",
	})

	fmt.Printf("Signature: %s\n", evt.Signature)
}
Output:

Signature: user.created

type Handler

type Handler func(ctx context.Context, event Event)

Handler processes an event

func HandlePayload

func HandlePayload[T any](handler func(context.Context, T)) Handler

HandlePayload creates an event handler that automatically converts the payload to the specified type T and calls the provided typed handler function. If type conversion fails, logs the error and returns without calling the handler.

Example
type UserCreated struct {
	ID   string
	Name string
}

logger := newTestLogger(os.Stderr)
// Create a test event bus
bus := dispatch.NewDispatcher(logger) // You'd normally pass a logger here

// Register handler with automatic payload conversion
bus.On("user.created", dispatch.HandlePayload[UserCreated](func(ctx context.Context, user UserCreated) {
	fmt.Printf("Processing user: %s\n", user.Name)
}))

// Emit an event
ctx := context.Background()
bus.EmitSync(ctx, "user.created", UserCreated{
	ID:   "123",
	Name: "John Doe",
})
Output:

Processing user: John Doe

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL