Documentation ¶
Overview ¶
Package dispatch provides a lightweight, type-safe event bus implementation for building event-driven applications in Go. It supports both synchronous and asynchronous event handling, with features like wildcard pattern matching and typed payload handling through generics.
Basic Usage:
logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) dispatcher := dispatch.NewDispatcher(logger) // Register a handler dispatcher.On("user.created", func(ctx context.Context, event dispatch.Event) { // Handle the event }) // Emit an event dispatcher.Emit(context.Background(), "user.created", userData)
Event Signatures:
Events use dot-notation signatures that typically follow the pattern:
<source>.<action>
For example:
- user.created
- order.completed
- email.sent
Wildcard pattern matching is supported when registering handlers:
- "user.*" matches all user events
- "*.created" matches all creation events
- "system.*" matches all system events
Type-Safe Payload Handling:
The package provides several helpers for safe payload type conversion:
// Direct conversion user, err := dispatch.PayloadAs[User](event) // Type-safe handler dispatcher.On("user.created", dispatch.HandlePayload[User](func(ctx context.Context, user User) { // Work with strongly typed user data })) // Collection helpers config, err := dispatch.PayloadAsMap(event) // For map[string]any items, err := dispatch.PayloadAsSlice(event) // For []any regions, err := dispatch.PayloadMapAs[Region](event) // For map[string]Region users, err := dispatch.PayloadSliceAs[User](event) // For []User
Event Emission:
Events can be emitted either asynchronously (non-blocking) or synchronously (blocking):
// Async emission (handlers run in goroutines) dispatcher.Emit(ctx, "user.created", userData) // Sync emission (waits for all handlers to complete) dispatcher.EmitSync(ctx, "user.created", userData)
Context Support:
All event handlers receive a context.Context that can be used for cancellation, timeouts, and passing request-scoped values:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() dispatcher.EmitSync(ctx, "long.process", data)
Thread Safety:
The event dispatcher is thread-safe and can be safely used from multiple goroutines. Handler registration and event emission are protected by appropriate synchronization.
When to Use:
This package is designed for single-binary applications needing simple, type-safe, in-memory event handling. It's ideal for monolithic applications using the Hop framework where events don't need persistence or distributed processing. For distributed systems, message persistence, or advanced features like message routing and transformation, consider using a more comprehensive solution like [Watermill](https://github.com/ThreeDotsLabs/watermill) or a message queue.
Error Handling:
The event dispatcher automatically recovers from panics in event handlers and logs them using the provided logger. This ensures that a failing handler won't affect other handlers or the stability of the event bus.
Index ¶
- func IsPayloadType[T any](e Event) bool
- func MustPayloadAs[T any](e Event) T
- func PayloadAs[T any](e Event) (T, error)
- func PayloadAsMap(e Event) (map[string]any, error)
- func PayloadAsSlice(e Event) ([]any, error)
- func PayloadMapAs[T any](e Event) (map[string]T, error)
- func PayloadSliceAs[T any](e Event) ([]T, error)
- type Dispatcher
- type Event
- type Handler
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsPayloadType ¶
IsPayloadType checks if an event's payload is of the specified type T.
func MustPayloadAs ¶
MustPayloadAs converts an event's payload to the specified type T. Panics if the conversion fails.
func PayloadAs ¶
PayloadAs safely converts an event's payload to the specified type T. Returns the typed payload and any conversion error.
Example ¶
package main import ( "fmt" "github.com/patrickward/hop/dispatch" ) func main() { // Example event with a structured payload type UserCreated struct { ID string Name string } evt := dispatch.NewEvent("user.created", UserCreated{ ID: "123", Name: "John Doe", }) // Safe conversion with error handling user, err := dispatch.PayloadAs[UserCreated](evt) if err != nil { fmt.Printf("Error: %v\n", err) return } fmt.Printf("User created: %s\n", user.Name) }
Output: User created: John Doe
func PayloadAsMap ¶
PayloadAsMap is a convenience function for working with map[string]any payloads, which are common when dealing with JSON data.
Example ¶
package main import ( "fmt" "github.com/patrickward/hop/dispatch" ) func main() { // Create an event with a map payload evt := dispatch.NewEvent("config.updated", map[string]any{ "database": "postgres", "port": 5432, }) // Use the convenience function for map payloads config, err := dispatch.PayloadAsMap(evt) if err != nil { fmt.Printf("Error: %v\n", err) return } if dbName, ok := config["database"].(string); ok { fmt.Printf("Database: %s\n", dbName) } }
Output: Database: postgres
func PayloadAsSlice ¶
PayloadAsSlice is a convenience function for working with []any payloads.
Example ¶
package main import ( "fmt" "github.com/patrickward/hop/dispatch" ) func main() { // Create an event with a slice payload evt := dispatch.NewEvent("users.updated", []any{ "john", "jane", }) // Use the convenience function for slice payloads users, err := dispatch.PayloadAsSlice(evt) if err != nil { fmt.Printf("Error: %v\n", err) return } fmt.Printf("Users: %v\n", users) }
Output: Users: [john jane]
func PayloadMapAs ¶
PayloadMapAs converts a map payload into a map with typed values. Returns an error if the payload is not a map or if any value cannot be converted to type T.
Example:
type User struct { ID string } userMap, err := PayloadMapAs[User](event)
Example ¶
package main import ( "fmt" "sort" "sync" "github.com/patrickward/hop/dispatch" ) func main() { // Define a type we want to convert to type Region struct { Code string Name string } var mu sync.Mutex var results []string // Create an event with a map payload evt := dispatch.NewEvent("regions.updated", map[string]any{ "us-east": Region{Code: "USE", Name: "US East"}, "us-west": Region{Code: "USW", Name: "US West"}, }) // Convert the payload to a map of Regions regions, err := dispatch.PayloadMapAs[Region](evt) if err != nil { fmt.Printf("Error: %v\n", err) return } // Process the typed regions for key, region := range regions { mu.Lock() results = append(results, fmt.Sprintf("Region %s: %s (%s)", key, region.Name, region.Code)) mu.Unlock() } // Sort and print results for consistent output sort.Strings(results) for _, result := range results { fmt.Println(result) } }
Output: Region us-east: US East (USE) Region us-west: US West (USW)
func PayloadSliceAs ¶
PayloadSliceAs converts a slice payload into a slice of typed elements. Returns an error if the payload is not a slice or if any element cannot be converted to type T.
Example:
type User struct { ID string } users, err := PayloadSliceAs[User](event)
Example ¶
package main import ( "fmt" "github.com/patrickward/hop/dispatch" ) func main() { // Define a type we want to convert to type User struct { ID string Name string } // Create an event with a slice payload evt := dispatch.NewEvent("users.imported", []any{ User{ID: "1", Name: "Alice"}, User{ID: "2", Name: "Bob"}, }) // Convert the payload to a slice of Users users, err := dispatch.PayloadSliceAs[User](evt) if err != nil { fmt.Printf("Error: %v\n", err) return } // Process the typed users for _, user := range users { fmt.Printf("User: %s (ID: %s)\n", user.Name, user.ID) } }
Output: User: Alice (ID: 1) User: Bob (ID: 2)
Types ¶
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher manages event publishing and subscription
Example (Basic) ¶
package main import ( "context" "fmt" "log/slog" "os" "time" "github.com/patrickward/hop/dispatch" ) func main() { // Create a new event dispatcher with a basic logger logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) dispatcher := dispatch.NewDispatcher(logger) // Register an event handler dispatcher.On("user.login", func(ctx context.Context, event dispatch.Event) { payload := event.Payload.(map[string]string) fmt.Printf("User logged in: %s\n", payload["username"]) }) // Emit an event dispatcher.Emit(context.Background(), "user.login", map[string]string{ "username": "alice", }) // Wait for async handler to complete time.Sleep(10 * time.Millisecond) }
Output: User logged in: alice
Example (ContextCancellation) ¶
package main import ( "context" "fmt" "log/slog" "os" "time" "github.com/patrickward/hop/dispatch" ) func main() { logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) dispatcher := dispatch.NewDispatcher(logger) // Create a context with cancellation ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel() // Register a handler that respects context cancellation dispatcher.On("long.task", func(ctx context.Context, event dispatch.Event) { select { case <-ctx.Done(): fmt.Println("Task cancelled") return case <-time.After(100 * time.Millisecond): fmt.Println("Task completed") } }) // Emit event with cancellable context dispatcher.EmitSync(ctx, "long.task", nil) }
Output: Task cancelled
Example (MultipleHandlers) ¶
package main import ( "context" "fmt" "log/slog" "os" "sort" "sync" "github.com/patrickward/hop/dispatch" ) func main() { logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) dispatcher := dispatch.NewDispatcher(logger) var mu sync.Mutex var results []string // Register multiple handlers for the same event dispatcher.On("notification.sent", func(ctx context.Context, event dispatch.Event) { mu.Lock() results = append(results, "Logging notification") mu.Unlock() }) dispatcher.On("notification.sent", func(ctx context.Context, event dispatch.Event) { mu.Lock() results = append(results, "Sending analytics") mu.Unlock() }) dispatcher.On("notification.sent", func(ctx context.Context, event dispatch.Event) { mu.Lock() results = append(results, "Updating cache") mu.Unlock() }) // Emit event synchronously - all handlers will be called dispatcher.EmitSync(context.Background(), "notification.sent", nil) // Sort and print results sort.Strings(results) for _, result := range results { fmt.Println(result) } }
Output: Logging notification Sending analytics Updating cache
Example (SyncEmit) ¶
package main import ( "context" "fmt" "log/slog" "os" "time" "github.com/patrickward/hop/dispatch" ) func main() { logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) dispatcher := dispatch.NewDispatcher(logger) // Register event handlers dispatcher.On("task.process", func(ctx context.Context, event dispatch.Event) { fmt.Println("Processing task...") time.Sleep(10 * time.Millisecond) fmt.Println("Task completed") }) // EmitSync will wait for all handlers to complete fmt.Println("Starting task") dispatcher.EmitSync(context.Background(), "task.process", nil) fmt.Println("All processing complete") }
Output: Starting task Processing task... Task completed All processing complete
Example (Wildcards) ¶
package main import ( "context" "fmt" "log/slog" "os" "sort" "sync" "github.com/patrickward/hop/dispatch" ) func main() { logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) dispatcher := dispatch.NewDispatcher(logger) var mu sync.Mutex var results []string // Register handlers with wildcards dispatcher.On("user.*", func(ctx context.Context, event dispatch.Event) { mu.Lock() results = append(results, fmt.Sprintf("User event: %s", event.Signature)) mu.Unlock() }) dispatcher.On("*.created", func(ctx context.Context, event dispatch.Event) { mu.Lock() results = append(results, fmt.Sprintf("Created event: %s", event.Signature)) mu.Unlock() }) // Emit events synchronously dispatcher.EmitSync(context.Background(), "user.created", nil) dispatcher.EmitSync(context.Background(), "user.deleted", nil) dispatcher.EmitSync(context.Background(), "post.created", nil) // Sort and print results sort.Strings(results) for _, result := range results { fmt.Println(result) } }
Output: Created event: post.created Created event: user.created User event: user.created User event: user.deleted
func NewDispatcher ¶
func NewDispatcher(logger *slog.Logger) *Dispatcher
NewDispatcher creates a new event bus/dispatcher
func (*Dispatcher) Emit ¶
func (b *Dispatcher) Emit(ctx context.Context, signature string, payload any)
Emit sends an event to all registered handlers asynchronously
func (*Dispatcher) EmitSync ¶
func (b *Dispatcher) EmitSync(ctx context.Context, signature string, payload any)
EmitSync sends an event and waits for all handlers to complete
func (*Dispatcher) On ¶
func (b *Dispatcher) On(signature string, handler Handler)
On registers a handler for an event signature Supports wildcards: "hop.*" or "*.system.start"
type Event ¶
type Event struct { ID string `json:"id"` Signature string `json:"signature"` // e.g. "hop.system.start" Payload any `json:"payload,omitempty"` Timestamp time.Time `json:"timestamp"` }
Event represents a system event with a simplified structure
func NewEvent ¶
NewEvent creates an event with the given signature and optional payload
Example ¶
package main import ( "fmt" "github.com/patrickward/hop/dispatch" ) func main() { // Create a new event with a payload evt := dispatch.NewEvent("user.created", map[string]string{ "id": "123", "email": "user@example.com", }) fmt.Printf("Signature: %s\n", evt.Signature) }
Output: Signature: user.created
type Handler ¶
Handler processes an event
func HandlePayload ¶
HandlePayload creates an event handler that automatically converts the payload to the specified type T and calls the provided typed handler function. If type conversion fails, logs the error and returns without calling the handler.
Example ¶
type UserCreated struct { ID string Name string } logger := newTestLogger(os.Stderr) // Create a test event bus bus := dispatch.NewDispatcher(logger) // You'd normally pass a logger here // Register handler with automatic payload conversion bus.On("user.created", dispatch.HandlePayload[UserCreated](func(ctx context.Context, user UserCreated) { fmt.Printf("Processing user: %s\n", user.Name) })) // Emit an event ctx := context.Background() bus.EmitSync(ctx, "user.created", UserCreated{ ID: "123", Name: "John Doe", })
Output: Processing user: John Doe