eventbus

package module
v0.0.0-...-b795b65 Latest
Warning: This package is not in the latest version of its module.
Published: Jul 29, 2023 License: MIT Imports: 4 Imported by: 5

README

Eventbus

A generics-based event bus library, extracted from DefraDB.

Installation

todo

Usage

todo

License

MIT

Documentation

Overview

Package events provides the internal event system.

Index

Constants

This section is empty.

Variables

var (
	ErrInvalidChannelType   = fmt.Errorf("invalid channel type")
	ErrChannelAlreadyExists = fmt.Errorf("channel already exists")
)
var (
	ErrSubscribedToClosedChan = fmt.Errorf("cannot subscribe to a closed channel")
)

Functions

func Publish

func Publish[T any](bus Bus, evt T) error

Publish publishes evt to the channel in the bus that matches the type parameter T.

Types

type Bus

type Bus interface {
	Get(string) (any, bool)
	Set(string, any) error
}

func NewBus

func NewBus() Bus

NewBus returns a new event bus.

type Channel

type Channel[T any] interface {
	// Subscribe subscribes to the Channel, returning a channel from which events
	// can be read, or an error should one occur (e.g. if this object is closed).
	//
	// This function is non-blocking unless the subscription-buffer is full.
	Subscribe() (Subscription[T], error)

	// Unsubscribe unsubscribes from the Channel, closing the provided channel.
	//
	// Will do nothing if this object is already closed.
	Unsubscribe(Subscription[T])

	// Publish pushes the given item into this channel. Non-blocking.
	Publish(item T)

	// Close closes this Channel, and any owned or subscribing channels.
	Close()
}

Channel represents a subscribable type that will expose inputted items to subscribers.
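To make the contract above concrete, here is a minimal fan-out sketch with the same four operations. It is not the library's simpleChannel: the `fanOut` type is hypothetical, it returns a plain `chan T` rather than `Subscription[T]`, and it keeps Publish non-blocking by dropping items for subscribers whose buffer is full, which is an assumption made for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut is a hypothetical, simplified stand-in for a Channel[T]
// implementation; it is not the library's simpleChannel.
type fanOut[T any] struct {
	mu     sync.Mutex
	subs   map[chan T]struct{}
	closed bool
	buf    int
}

func newFanOut[T any](buf int) *fanOut[T] {
	return &fanOut[T]{subs: make(map[chan T]struct{}), buf: buf}
}

// Subscribe registers a new buffered channel, or fails if f is closed.
func (f *fanOut[T]) Subscribe() (chan T, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.closed {
		return nil, fmt.Errorf("cannot subscribe to a closed channel")
	}
	ch := make(chan T, f.buf)
	f.subs[ch] = struct{}{}
	return ch, nil
}

// Unsubscribe removes and closes ch. It does nothing if ch is not
// registered, which is also the case after Close.
func (f *fanOut[T]) Unsubscribe(ch chan T) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if _, ok := f.subs[ch]; !ok {
		return
	}
	delete(f.subs, ch)
	close(ch)
}

// Publish offers item to every subscriber without blocking; a subscriber
// whose buffer is full misses the item (an assumption of this sketch).
func (f *fanOut[T]) Publish(item T) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for ch := range f.subs {
		select {
		case ch <- item:
		default:
		}
	}
}

// Close closes every subscriber channel and rejects later Subscribe calls.
func (f *fanOut[T]) Close() {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.closed {
		return
	}
	f.closed = true
	for ch := range f.subs {
		close(ch)
		delete(f.subs, ch)
	}
}

func main() {
	f := newFanOut[string](1)
	a, _ := f.Subscribe()
	b, _ := f.Subscribe()
	f.Publish("hello")
	fmt.Println(<-a, <-b) // prints "hello hello"

	f.Close()
	_, err := f.Subscribe()
	fmt.Println(err != nil) // prints "true"
}
```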

func New

func New[T any](subscriberBufferSize int, eventBufferSize int) Channel[T]

New creates and returns a new Channel instance.

At the moment this always returns a new simpleChannel; that may change as this feature is fleshed out.

func NewSimpleChannel

func NewSimpleChannel[T any](subscriberBufferSize int, eventBufferSize int) Channel[T]

NewSimpleChannel creates a new simpleChannel with the given subscriberBufferSize and eventBufferSize.

Should the buffers fill up, subsequent calls to functions on this object may block.
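The blocking behavior follows from how buffered Go channels work: a send succeeds immediately while the buffer has room and blocks once it is full. The sketch below shows this with a plain `chan int`; the `trySend` helper is hypothetical and only illustrates the buffer semantics, not the internals of simpleChannel.

```go
package main

import "fmt"

// trySend reports whether v could be placed in ch without blocking.
func trySend[T any](ch chan T, v T) bool {
	select {
	case ch <- v:
		return true
	default:
		// The buffer is full and nobody is receiving; a plain send
		// would block at this point.
		return false
	}
}

func main() {
	ch := make(chan int, 2) // buffer of 2, analogous to eventBufferSize

	fmt.Println(trySend(ch, 1), trySend(ch, 2), trySend(ch, 3)) // prints "true true false"

	<-ch                        // draining one item frees a slot
	fmt.Println(trySend(ch, 4)) // prints "true"
}
```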

type Publisher

type Publisher[T any] struct {
	// contains filtered or unexported fields
}

Publisher holds a reference to the event channel, the associated subscription channel, and the stream channel that returns data to the subscribed client.

func NewPublisher

func NewPublisher[T any](ch Channel[T], streamBufferSize int) (*Publisher[T], error)

NewPublisher creates a new Publisher with the given event Channel, subscribes to the event Channel and opens a new channel for the stream.

func (*Publisher[T]) Event

func (p *Publisher[T]) Event() Subscription[T]

Event returns the subscription channel.

func (*Publisher[T]) Publish

func (p *Publisher[T]) Publish(data any)

Publish sends data to the streaming channel and unsubscribes if the client hangs for too long.

func (*Publisher[T]) Stream

func (p *Publisher[T]) Stream() chan any

Stream returns the streaming channel.

func (*Publisher[T]) Unsubscribe

func (p *Publisher[T]) Unsubscribe()

Unsubscribe unsubscribes the client from the event channel and closes the stream.

type Subscription

type Subscription[T any] chan T

func Subscribe

func Subscribe[T any](bus Bus) (Subscription[T], error)

Subscribe subscribes to the channel in the bus that matches the type parameter T.
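One way a string-keyed Bus can match a channel to a type parameter is to derive the key from the type itself. The sketch below does this with the documented `Get`/`Set` interface. `mapBus`, `keyFor`, `channelFor`, and `UserCreated` are hypothetical, the stored value is a plain `chan T` rather than the package's `Channel[T]`, and using the reflected type name as the key is an assumption about the approach, not the package's confirmed behavior.

```go
package main

import (
	"fmt"
	"reflect"
)

// Bus mirrors the interface documented above.
type Bus interface {
	Get(string) (any, bool)
	Set(string, any) error
}

// mapBus is a hypothetical Bus backed by a map; it is not safe for
// concurrent use.
type mapBus map[string]any

func (b mapBus) Get(k string) (any, bool) { v, ok := b[k]; return v, ok }

func (b mapBus) Set(k string, v any) error {
	if _, ok := b[k]; ok {
		return fmt.Errorf("channel already exists")
	}
	b[k] = v
	return nil
}

// keyFor derives a lookup key from T. Using the reflected type name is
// an assumption of this sketch.
func keyFor[T any]() string {
	return reflect.TypeOf((*T)(nil)).Elem().String()
}

// channelFor returns the chan T stored under T's key, creating it on
// first use.
func channelFor[T any](bus Bus) (chan T, error) {
	key := keyFor[T]()
	if v, ok := bus.Get(key); ok {
		ch, ok := v.(chan T)
		if !ok {
			return nil, fmt.Errorf("invalid channel type")
		}
		return ch, nil
	}
	ch := make(chan T, 8)
	return ch, bus.Set(key, ch)
}

// UserCreated is a hypothetical event type.
type UserCreated struct{ Name string }

func main() {
	bus := mapBus{}

	// Both lookups resolve to the same channel because they share T.
	sub, _ := channelFor[UserCreated](bus)
	pub, _ := channelFor[UserCreated](bus)

	pub <- UserCreated{Name: "ada"}
	fmt.Println((<-sub).Name)  // prints "ada"
	fmt.Println(keyFor[int]()) // prints "int"
}
```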
