eventbus

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2023 License: MIT Imports: 2 Imported by: 0

README

Event Bus

pipeline codecov Go Report Card Go Reference

GoLang Library for implementing event-driven architecture

EventBus can be used to implement event-driven architectures in Golang, Each bus can have multiple subscribers to different topics.

Features

  • Listening on events using channels or callbacks with the ability to cancel subscriptions at any time.
  • Once subscriptions, After receiving the first event, Subscription is automatically cancelled.
  • Non-blocking publishing, Yet event ordering is guaranteed for each subscriber.

Getting Started

Installation
go get github.com/optimus-hft/event-bus
Usage
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"

	eventbus "github.com/optimus-hft/event-bus"
)

func main() {
	bus := eventbus.New[int]()
	channel, unsub := bus.Subscribe("t1")

	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	go func() {
		for {
			select {
			case event := <-channel:
				fmt.Println("event", event)
			case <-ctx.Done():
				unsub()
				return
			}
		}
	}()

	bus.Publish("t1", 1)
	bus.Publish("t1", 2)
	bus.Publish("t1", 3)

	time.Sleep(time.Second)
}

Contributing

Pull requests and bug reports are welcome. For major changes, please open an issue first to discuss what you would like to change.

License

This project is licensed under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus

type Bus[T any] struct {
	// contains filtered or unexported fields
}

Bus can be used to implement event-driven architectures in Golang. Each bus can have multiple subscribers to different topics.

func New added in v0.1.1

func New[T any]() *Bus[T]

New creates a new event bus.

func (*Bus[T]) On

func (b *Bus[T]) On(topic string, callback func(event T)) func()

On executes the given callback whenever an event is sent to the given topic, Also an unsubscribe functions is returned that can be used to cancel the subscription.

func (*Bus[T]) Once

func (b *Bus[T]) Once(topic string, callback func(event T)) func()

Once executes the given callback as soon as receiving the first event on the given topic. After one callback execution, Callback will not be called again. Also, an unsubscribe functions is returned that can be used to cancel the subscription before receiving any events.

func (*Bus[T]) Publish

func (b *Bus[T]) Publish(topic string, event T)

Publish sends the given event to all the subscribers of the given topic. Publish is non-blocking and doesn't wait for subscribers. Although this is non-blocking, event ordering is guaranteed in a FIFO manner. Each subscriber gets events in the same order they were published. A slow subscriber doesn't block other subscribers, Ordering is handled for each subscriber separately and different subscribers on the same topic can be reading different events at a given time.

func (*Bus[T]) Subscribe

func (b *Bus[T]) Subscribe(topic string) (<-chan T, func())

Subscribe returns a channel for listening on a given topic events, Also an unsubscribe functions is returned that can be used to cancel the subscription.

func (*Bus[T]) SubscribeOnce

func (b *Bus[T]) SubscribeOnce(topic string) (<-chan T, func())

SubscribeOnce returns a channel for listening on a given topic events, After the first event, subscription is automatically cancelled. Also, an unsubscribe functions is returned that can be used to cancel the subscription before receiving any events.

type Serializer added in v0.1.1

type Serializer struct {
	// contains filtered or unexported fields
}

Serializer can be used to serialize execution of some goroutines according to their sequence number. If goroutine A with a higher sequence number gets scheduled before goroutine B with a lower sequence number, A will be queued and will be executed after execution of B.

func NewSerializer added in v0.1.1

func NewSerializer(lastExecutedSequence uint64) *Serializer

NewSerializer creates a new serializer.

func (*Serializer) Execute added in v0.1.1

func (s *Serializer) Execute(cb callback, sequence uint64)

Execute runs the given callbacks serially according to each callback sequence number.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL