eventbus

package module
v1.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2023 License: MIT Imports: 3 Imported by: 4

README

English | 简体中文

eventbus

A lightweight eventbus that simplifies communication between goroutines, it supports synchronous and asynchronous message publishing.

Installation

Make sure that go(version 1.18+) is installed on your computer. Type the following command:

go get github.com/werbenhu/eventbus

Import package in your project

import (
	"github.com/werbenhu/eventbus"
)

What's eventbus?

EventBus supports both synchronous and asynchronous message publication. it uses a Copy-On-Write map to manage handlers and topics, so it is not recommended for use in scenarios with a large number of frequent subscriptions and unsubscriptions.

Asynchronous Way

In EventBus, each topic corresponds to a channel. The Publish() method pushes the message to the channel, and the handler in the Subscribe() method handles the message that comes out of the channel.If you want to use a buffered EventBus, you can create a buffered EventBus with the eventbus.NewBuffered(bufferSize int) method, which will create a buffered channel for each topic.

Synchronous Way

In the synchronous way, EventBus does not use channels, but passes payloads to subscribers by calling the handler directly. To publish messages synchronously, use the eventbus.PublishSync() function.

eventbus example
package main

import (
	"fmt"
	"time"

	"github.com/werbenhu/eventbus"
)

func handler(topic string, payload int) {
	fmt.Printf("topic:%s, payload:%d\n", topic, payload)
}

func main() {
	bus := eventbus.New()

	// Subscribe to a topic. Returns an error if the handler is not a function.
	// The handler function must have two parameters: the first parameter must be of type string,
	// and the second parameter's type must match the type of `payload` in the `Publish()` function.
	bus.Subscribe("testtopic", handler)

	// Publish a message asynchronously.
	// The `Publish()` function triggers the handler defined for the topic, and passes the `payload` as an argument.
	// The type of `payload` must match the type of the second parameter in the handler function defined in `Subscribe()`.
	bus.Publish("testtopic", 100)

	// Publish a message synchronously.
	bus.PublishSync("testtopic", 200)

	// Wait a bit to ensure that subscribers have received all asynchronous messages before unsubscribing.
	time.Sleep(time.Millisecond)
	bus.Unsubscribe("testtopic", handler)

	// Close the event bus.
	bus.Close()
}

Using the global singleton object of EventBus

To make it more convenient to use EventBus, there is a global singleton object for EventBus. The internal channel of this singleton is unbuffered, and you can directly use eventbus.Subscribe(), eventbus.Publish(), and eventbus.Unsubscribe() to call the corresponding methods of the singleton object.

package main

import (
	"fmt"
	"time"

	"github.com/werbenhu/eventbus"
)

func handler(topic string, payload int) {
	fmt.Printf("topic:%s, payload:%d\n", topic, payload)
}

func main() {

	// eventbus.Subscribe() will call the global singleton's Subscribe() method
	eventbus.Subscribe("testtopic", handler)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		// Asynchronously publish messages
		for i := 0; i < 100; i++ {
			// eventbus.Publish() will call the global singleton's Publish() method
			eventbus.Publish("testtopic", i)
		}
		// Synchronously publish messages
		for i := 100; i < 200; i++ {
			eventbus.PublishSync("testtopic", i)
		}
		wg.Done()
	}()
	wg.Wait()

	time.Sleep(time.Millisecond)
	// eventbus.Unsubscribe() will call the global singleton's Unsubscribe() method
	eventbus.Unsubscribe("testtopic", handler)

	// eventbus.Close() will call the global singleton's Close() method
	eventbus.Close()
}

Use Pipe instead of channel

Pipe is a wrapper for a channel without the concept of topics, with the generic parameter corresponding to the type of the channel. eventbus.NewPipe[T]() is equivalent to make(chan T). Publishers publish messages, and subscribers receive messages. You can use the Pipe.Publish() method instead of chan <-, and the Pipe.Subscribe() method instead of <-chan.

If there are multiple subscribers, each subscriber will receive every message that is published.If you want to use a buffered channel, you can use the eventbus.NewBufferedPipe[T](bufferSize int) method to create a buffered pipe.Pipe also supports synchronous and asynchronous message publishing. If you need to use the synchronous method, call Pipe.PublishSync().

pipe example
func handler1(val string) {
	fmt.Printf("handler1 val:%s\n", val)
}

func handler2(val string) {
	fmt.Printf("handler2 val:%s\n", val)
}

func main() {
	pipe := eventbus.NewPipe[string]()
	pipe.Subscribe(handler1)
	pipe.Subscribe(handler2)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		for i := 0; i < 100; i++ {
			pipe.Publish(strconv.Itoa(i))
		}
		for i := 100; i < 200; i++ {
			pipe.PublishSync(strconv.Itoa(i))
		}
		wg.Done()
	}()
	wg.Wait()

	time.Sleep(time.Millisecond)
	pipe.Unsubscribe(handler1)
	pipe.Unsubscribe(handler2)
	pipe.Close()
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrHandlerIsNotFunc  = err{Code: 10000, Msg: "handler is not a function"}
	ErrHandlerParamNum   = err{Code: 10001, Msg: "the number of parameters of the handler must be two"}
	ErrHandlerFirstParam = err{Code: 10002, Msg: "the first of parameters of the handler must be a string"}
	ErrNoSubscriber      = err{Code: 10003, Msg: "no subscriber on topic"}
	ErrChannelClosed     = err{Code: 10004, Msg: "channel is closed"}
)

Global variables that represent common errors that may be returned by the eventbus functions.

Functions

func Close added in v1.0.3

func Close()

Close closes the singleton instance of EventBus.

func Publish added in v1.0.3

func Publish(topic string, payload any) error

Publish triggers the handlers defined for a topic. The `payload` argument will be passed to the handler. The type of the payload must correspond to the second parameter of the handler in `Subscribe()`.

func PublishSync added in v1.0.4

func PublishSync(topic string, payload any) error

PublishSync is a synchronous version of Publish that triggers the handlers defined for a topic with the given payload. The type of the payload must correspond to the second parameter of the handler in `Subscribe()`.

func ResetSingleton added in v1.0.8

func ResetSingleton()

ResetSingleton resets the singleton object. If the singleton object is not nil, it first closes the old singleton, and then creates a new singleton instance.

func Subscribe added in v1.0.3

func Subscribe(topic string, handler any) error

Subscribe subscribes to a topic, return an error if the handler is not a function. The handler must have two parameters: the first parameter must be a string, and the type of the handler's second parameter must be consistent with the type of the payload in `Publish()`

func Unsubscribe added in v1.0.3

func Unsubscribe(topic string, handler any) error

Unsubscribe removes handler defined for a topic. Returns error if there are no handlers subscribed to the topic.

Types

type CowMap

type CowMap struct {
	sync.Map
}

CowMap is a wrapper of Copy-On-Write map

If a fully meaningful CowMap is implemented, both sync.Map and CowMap utilize atomic.Value atomic operations to access the map during data reading, resulting in similar read performance. In reality, sync.Map is already a read-write separated structure, yet it has better write performance. Therefore, CowMap directly utilizes sync.Map as its internal structure.

func NewCowMap

func NewCowMap() *CowMap

CowMap creates a new CowMap instance

func (*CowMap) Clear

func (c *CowMap) Clear()

Clear Removes all key-value pairs from the map

func (*CowMap) Len

func (c *CowMap) Len() uint32

Len returns the number of key-value pairs stored in the map

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus is a container for event topics. Each topic corresponds to a channel. `eventbus.Publish()` pushes a message to the channel, and the handler in `eventbus.Subscribe()` will process the message coming out of the channel.

func New

func New() *EventBus

New returns new EventBus with empty handlers.

func NewBuffered

func NewBuffered(bufferSize int) *EventBus

NewBuffered returns new EventBus with a buffered channel. The second argument indicate the buffer's length

func (*EventBus) Close

func (e *EventBus) Close()

Close closes the eventbus

func (*EventBus) Publish

func (e *EventBus) Publish(topic string, payload any) error

publish triggers the handlers defined for this channel asynchronously. The `payload` argument will be passed to the handler. It uses the channel to asynchronously call the handler. The type of the payload must correspond to the second parameter of the handler in `Subscribe()`.

func (*EventBus) PublishSync added in v1.0.4

func (e *EventBus) PublishSync(topic string, payload any) error

publishSync triggers the handlers defined for this channel synchronously. The payload argument will be passed to the handler. It does not use channels and instead directly calls the handler function.

func (*EventBus) Subscribe

func (e *EventBus) Subscribe(topic string, handler any) error

Subscribe subscribes to a topic, return an error if the handler is not a function. The handler must have two parameters: the first parameter must be a string, and the type of the handler's second parameter must be consistent with the type of the payload in `Publish()`

func (*EventBus) Unsubscribe

func (e *EventBus) Unsubscribe(topic string, handler any) error

Unsubscribe removes handler defined for a topic. Returns error if there are no handlers subscribed to the topic.

type Handler

type Handler[T any] func(payload T)

type Pipe

type Pipe[T any] struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Pipe is a wrapper for a channel that allows for asynchronous message passing to subscribers. Use Pipe.Publish() instead of `chan<-` and Pipe.Subscribe() instead of `<-chan`. To pass messages to subscribers synchronously, use Pipe.PublishSync(), which does not use a channel. If multiple subscribers exist, each subscriber will receive the message.

func NewBufferedPipe

func NewBufferedPipe[T any](bufferSize int) *Pipe[T]

NewPipe create a buffered pipe, bufferSize is the buffer size of the pipe When create a buffered pipe. You can publish into the Pipe without a corresponding concurrent subscriber.

func NewPipe

func NewPipe[T any]() *Pipe[T]

NewPipe create a unbuffered pipe

func (*Pipe[T]) Close

func (p *Pipe[T]) Close()

close closes the pipe

func (*Pipe[T]) Publish

func (p *Pipe[T]) Publish(payload T) error

Publish triggers the handlers defined for this pipe, transferring the payload to the handlers.

func (*Pipe[T]) PublishSync added in v1.0.4

func (p *Pipe[T]) PublishSync(payload T) error

PublishSync triggers the handlers defined for this pipe synchronously, without using a channel. The payload will be passed directly to the handlers.

func (*Pipe[T]) Subscribe

func (p *Pipe[T]) Subscribe(handler Handler[T]) error

subscribe add a handler to a pipe, return error if the pipe is closed.

func (*Pipe[T]) Unsubscribe

func (p *Pipe[T]) Unsubscribe(handler Handler[T]) error

unsubscribe removes handler defined for this pipe.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL