eventbus

package module
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2024 License: MIT Imports: 3 Imported by: 0

README

EventBus

Package go-eventbus is the little and lightweight eventbus with async compatibility for GoLang.

Installation

Make sure that Go is installed on your computer. Type the following command in your terminal:

go get github.com/sotvokun/go-eventbus

After it the package is ready to use.

Import package in your project

Add following line in your *.go file:

import "github.com/sotvokun/go-eventbus"

If you unhappy to use long eventbus, you can do something like this:

import (
	evbus "github.com/sotvokun/go-eventbus"
)
Example
func calculator(a int, b int) {
	fmt.Printf("%d\n", a + b)
}

func main() {
	bus := eventbus.New();
	bus.Subscribe("main:calculator", calculator);
	bus.Publish("main:calculator", 20, 40);
	bus.Unsubscribe("main:calculator", calculator);
}
Implemented methods
  • New()
  • Subscribe()
  • SubscribeOnce()
  • HasCallback()
  • Unsubscribe()
  • Publish()
  • SubscribeAsync()
  • SubscribeOnceAsync()
  • WaitAsync()
New()

New returns new EventBus with empty handlers.

bus := eventbus.New();
Subscribe(topic string, fn interface{}) error

Subscribe to a topic. Returns error if fn is not a function.

func Handler() { ... }
...
bus.Subscribe("topic:handler", Handler)
SubscribeOnce(topic string, fn interface{}) error

Subscribe to a topic once. Handler will be removed after executing. Returns error if fn is not a function.

func HelloWorld() { ... }
...
bus.SubscribeOnce("topic:handler", HelloWorld)
Unsubscribe(topic string, fn interface{}) error

Remove callback defined for a topic. Returns error if there are no callbacks subscribed to the topic.

bus.Unsubscribe("topic:handler", HelloWord);
HasCallback(topic string) bool

Returns true if exists any callback subscribed to the topic.

Publish(topic string, args ...interface{})

Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.

func Handler(str string) { ... }
...
bus.Subscribe("topic:handler", Handler)
...
bus.Publish("topic:handler", "Hello, World!");
SubscribeAsync(topic string, fn interface{}, transactional bool)

Subscribe to a topic with an asynchronous callback. Returns error if fn is not a function.

func slowCalculator(a, b int) {
	time.Sleep(3 * time.Second)
	fmt.Printf("%d\n", a + b)
}

bus := eventbus.New()
bus.SubscribeAsync("main:slow_calculator", slowCalculator, false)

bus.Publish("main:slow_calculator", 20, 60)

fmt.Println("start: do some stuff while waiting for a result")
fmt.Println("end: do some stuff while waiting for a result")

bus.WaitAsync() // wait for all async callbacks to complete

fmt.Println("do some stuff after waiting for result")

Transactional determines whether subsequent callbacks for a topic are run serially (true) or concurrently(false)

SubscribeOnceAsync(topic string, args ...interface{})

SubscribeOnceAsync works like SubscribeOnce except the callback to executed asynchronously

WaitAsync()

WaitAsync waits for all async callbacks to complete.

Cross Process Events

Works with two rpc services:

  • a client service to listen to remotely published events from a server
  • a server service to listen to client subscriptions

server.go

func main() {
    server := NewServer(":2010", "/_server_bus_", New())
    server.Start()
    // ...
    server.EventBus().Publish("main:calculator", 4, 6)
    // ...
    server.Stop()
}

client.go

func main() {
    client := NewClient(":2015", "/_client_bus_", New())
    client.Start()
    client.Subscribe("main:calculator", calculator, ":2010", "/_server_bus_")
    // ...
    client.Stop()
}
Notes

Documentation is available here: godoc.org. Full information about code coverage is also available here: EventBus on gocover.io.

Support

If you do have a contribution for the package feel free to put up a Pull Request or open Issue.

Special thanks to contributors

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ArgumentProcessor added in v1.1.0

type ArgumentProcessor = func(callback *EventHandler, arg ...any) []reflect.Value

type Bus

type Bus interface {
	BusController
	BusSubscriber
	BusPublisher
}

Bus englobes global (subscribe, publish, control) bus behavior

func New

func New() Bus

New returns new EventBus with empty handlers.

type BusController

type BusController interface {
	HasCallback(topic string) bool
	WaitAsync()
}

BusController defines bus control behavior (checking handler's presence, synchronization)

type BusPublisher

type BusPublisher interface {
	Publish(topic string, args ...any)
}

BusPublisher defines publishing-related bus behavior

type BusSubscriber

type BusSubscriber interface {
	Subscribe(topic string, fn any) error
	SubscribeAsync(topic string, fn any, transactional bool) error
	SubscribeOnce(topic string, fn any) error
	SubscribeOnceAsync(topic string, fn any) error
	Unsubscribe(topic string, handler any) error
	SetArgumentProcessor(topic string, argProc ArgumentProcessor)
	SetDefaultArgumentProcessor(argProc ...ArgumentProcessor)
}

BusSubscriber defines subscription-related bus behavior

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus - box for handlers and callbacks.

func (*EventBus) HasCallback

func (bus *EventBus) HasCallback(topic string) bool

HasCallback returns true if exists any callback subscribed to the topic.

func (*EventBus) Publish

func (bus *EventBus) Publish(topic string, args ...any)

Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.

func (*EventBus) SetArgumentProcessor added in v1.1.0

func (bus *EventBus) SetArgumentProcessor(topic string, argProc ArgumentProcessor)

SetArgumentProcessor sets the argument processor for a topic

The argument processor is useful for customizing how arguments are passed to the subscriber's callback. The default argument processor will be used if no argument processor is set for a topic.

func (*EventBus) SetDefaultArgumentProcessor added in v1.2.0

func (bus *EventBus) SetDefaultArgumentProcessor(argProc ...ArgumentProcessor)

SetDefaultArgumentProcessor sets the default argument processor

Pass no arguments to reset the default argument processor to the built-in default argument processor.

func (*EventBus) Subscribe

func (bus *EventBus) Subscribe(topic string, fn any) error

Subscribe subscribes to a topic. Returns error if `fn` is not a function.

func (*EventBus) SubscribeAsync

func (bus *EventBus) SubscribeAsync(topic string, fn any, transactional bool) error

SubscribeAsync subscribes to a topic with an asynchronous callback Transactional determines whether subsequent callbacks for a topic are run serially (true) or concurrently (false) Returns error if `fn` is not a function.

func (*EventBus) SubscribeOnce

func (bus *EventBus) SubscribeOnce(topic string, fn any) error

SubscribeOnce subscribes to a topic once. Handler will be removed after executing. Returns error if `fn` is not a function.

func (*EventBus) SubscribeOnceAsync

func (bus *EventBus) SubscribeOnceAsync(topic string, fn any) error

SubscribeOnceAsync subscribes to a topic once with an asynchronous callback Handler will be removed after executing. Returns error if `fn` is not a function.

func (*EventBus) Unsubscribe

func (bus *EventBus) Unsubscribe(topic string, handler any) error

Unsubscribe removes callback defined for a topic. Returns error if there are no callbacks subscribed to the topic.

func (*EventBus) WaitAsync

func (bus *EventBus) WaitAsync()

WaitAsync waits for all async callbacks to complete

type EventHandler added in v1.2.1

type EventHandler struct {
	Callback reflect.Value

	sync.Mutex // lock for an event handler - useful for running async callbacks serially
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL