pubsub

package module
v0.0.0-...-962d1e3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 24, 2024 License: GPL-3.0 Imports: 5 Imported by: 0

README

pubsub

Golang implementation of publish-subscribe pattern using channels.

err := pubsub.NewTopic[int]("topic", pubsub.NewRingBuffer[int](16))
if err != nil {
    // ...
}

subscriber, err = pubsub.Subscribe[int]("topic")
if err != nil {
    // ...
}

publisher, err = pubsub.NewPublisher[int]("topic")
if err != nil {
    // ...
}
  • Topics are explicitly created instead of implicit

  • Messages are not persisted to disk

  • Each subscriber has its own internal buffer to prevent deadlocks

  • Does not deep copy structs

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTopicDoesNotExist = errors.New("topic does not exist")
	ErrTopicExists       = errors.New("topic exists")
	ErrTypeMismatch      = errors.New("type mismatch")
)

Functions

func Close

func Close[T any](name string)

func NewTopic

func NewTopic[T any](name string, history ...ICache[T]) error

Types

type EmptyCache

type EmptyCache[T any] struct {
}

func (*EmptyCache[T]) Iter

func (h *EmptyCache[T]) Iter(f func(T))

func (*EmptyCache[T]) Push

func (h *EmptyCache[T]) Push(item T)

type ICache

type ICache[T any] interface {
	Push(item T)
	Iter(func(T))
}

func NewEmptyCache

func NewEmptyCache[T any]() ICache[T]

func NewRingCache

func NewRingCache[T any](bufferSize ...int) ICache[T]

type Publisher

type Publisher[T any] struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher[T any](name string) (*Publisher[T], error)

func (*Publisher[T]) Publish

func (p *Publisher[T]) Publish(msg T) error

type RingCache

type RingCache[T any] struct {
	// contains filtered or unexported fields
}

func (*RingCache[T]) Iter

func (r *RingCache[T]) Iter(f func(T))

func (*RingCache[T]) Push

func (r *RingCache[T]) Push(item T)

type Subscriber

type Subscriber[T any] struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber[T any](name string) (*Subscriber[T], error)

func (*Subscriber[T]) Read

func (s *Subscriber[T]) Read() []T

func (*Subscriber[T]) Recv

func (s *Subscriber[T]) Recv() <-chan struct{}

func (*Subscriber[T]) Unsubscribe

func (s *Subscriber[T]) Unsubscribe()

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL