eventbus

package module
v0.0.0-...-7f27ebc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

README

eventbus

纯内存实现的发布/订阅模式事件总线,订阅逻辑仿照 NATS 实现。

特点

  1. 并发安全
  2. 通过泛型提供有限的类型安全
  3. 支持回调和channel两种订阅方式
  4. 支持订阅者取消订阅

TODO

  • 支持queue模式
  • 支持通配匹配和前向匹配
  • 支持同步的Request/Reply模式

使用方法

安装

go get github.com/ganyyy/eventbus

使用

package main

import (
	"sync"
	"sync/atomic"

	"github.com/ganyyy/eventbus"
)

// main subscribes a callback and a channel to the same topic, publishes
// Num integers concurrently, and prints the sum seen by each subscriber.
func main() {
	const (
		Topic = "test.1"
		Num   = 100
	)

	bus := eventbus.NewMultiBus(4)

	var callbackSum, chanSum atomic.Int64

	// Callback-style subscriber: accumulates each received value.
	cbSub := eventbus.NewSubs(Topic, eventbus.Callback(func(v eventbus.Var[int]) {
		callbackSum.Add(int64(v.Val()))
	}))

	// Channel-style subscriber: the buffer has room for Num values,
	// one per publish issued below.
	events := make(chan eventbus.Var[int], Num)
	chSub := eventbus.NewSubs(Topic, eventbus.Chan(events))

	// Drain the channel in the background; drained is closed once
	// events has been closed and fully consumed.
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		for ev := range events {
			chanSum.Add(int64(ev.Val()))
		}
	}()

	bus.Subscribe(cbSub)
	bus.Subscribe(chSub)

	// Deferred calls run in reverse order: cbSub is removed first, then chSub.
	defer bus.Unsubscribe(chSub)
	defer bus.Unsubscribe(cbSub)

	// Publish 0..Num-1, each from its own goroutine.
	var publishers sync.WaitGroup
	for i := 0; i < Num; i++ {
		publishers.Add(1)
		go func(n int) {
			defer publishers.Done()
			bus.Publish(Topic, n)
		}(i)
	}
	publishers.Wait()

	// All publishers have returned, so the channel can be closed and the
	// drain goroutine awaited before reading the totals.
	close(events)
	<-drained

	// If every publish reached both subscribers, the two sums are equal.
	println(callbackSum.Load(), chanSum.Load())
}

Documentation

Index

Constants

View Source
// Separator constants: the same "." character, once as a string and once
// as a rune literal.
// NOTE(review): presumably the delimiter between subject tokens
// (cf. SplitSubject) — confirm against the implementation.
const (
	TSep  = "."
	BtSep = '.'
)
View Source
// Cache tuning constants.
// NOTE(review): PListCacheMin presumably governs when Node.Plist is built
// from Node.Psubs — confirm against the implementation.
const (
	PListCacheMin  = 128 // build a quick cache when the number of subscriptions is greater than this value
	StackCacheSize = 32  // normal stack cache size
)
View Source
// Initial sizing constants (both 16).
// NOTE(review): the names suggest the initial capacity of a node's
// subscription set and of a level's node map respectively — confirm
// against NewNode / NewLevel.
const (
	InitNodeSubCache   = 16
	InitLevelNodeCache = 16
)

Variables

View Source
// Sentinel errors exported by the package. Being package-level values, they
// can be matched with errors.Is.
var (
	// ErrInvalidSubject reports an invalid subject.
	ErrInvalidSubject = errors.New("sublist: invalid subject")
	// ErrSublistNil reports that the sublist is nil.
	ErrSublistNil     = errors.New("sublist: sublist is nil")
	// ErrNotFound reports that something was not found.
	// NOTE(review): presumably a subscription being removed — confirm.
	ErrNotFound       = errors.New("sublist: not found")
	// ErrSlowConsumer reports a slow consumer.
	// NOTE(review): presumably a channel subscriber that could not accept
	// a message in time — confirm.
	ErrSlowConsumer   = errors.New("subjection: slow consumer")
)
View Source
// Default is a package-level *MultiSublist created with NewMultiBus(32).
// NOTE(review): 32 is NewMultiBus's length argument, presumably the number
// of internal sublists — confirm.
var Default = NewMultiBus(32)

Default is the default multi-sublist.

Functions

func MatchLevel

func MatchLevel(level *Level, tokens []string, ret *sublistResult)

MatchLevel

func SplitSubject

func SplitSubject(subject string, cache []string) ([]string, bool)

SplitSubject splits the subject into tokens.

func ValidSubject

func ValidSubject(subject string) bool

ValidSubject returns true if the subject is valid.

Types

type ICall

type ICall interface {
	// contains filtered or unexported methods
}

func Any

func Any[T any, F chan Var[T] | func(Var[T])](inner F) ICall

func Callback

func Callback[T any](cb func(Var[T])) ICall

Callback creates a new subscribe with a callback

func Chan

func Chan[T any](notify chan Var[T]) ICall

Chan creates a new subscribe with a channel

type ISet

// ISet is a generic set interface over comparable elements.
type ISet[T comparable] interface {
	// Add adds the given elements to the set.
	Add(e ...T)
	// Remove removes the given elements from the set.
	Remove(e ...T)
	// Contains checks if an element is in the set.
	Contains(e T) bool
	// Len returns the number of elements in the set.
	Len() int
	// AppendToSlice appends the elements in the set to slice and returns
	// the resulting slice.
	AppendToSlice(slice []T) []T
	// Clear presumably removes every element — undocumented; confirm.
	Clear()
	// Range presumably calls f for each element and stops when f returns
	// false — undocumented; confirm.
	Range(f func(e T) bool)
}

ISet is a set interface.

type ISublist

// ISublist is the event-bus interface: subscribe, publish, unsubscribe and
// read counters. Sublist and MultiSublist both declare this method set.
type ISublist interface {
	// Subscribe adds the subscription into the sublist.
	Subscribe(sub *Subscription) error
	// Publish publishes param under subject.
	// NOTE(review): presumably delivered to every subscription whose
	// subject matches — confirm.
	Publish(subject string, param any) error

	// Unsubscribe removes a single subscription.
	Unsubscribe(sub *Subscription) error
	// UnsubscribeBatch removes several subscriptions in one call.
	UnsubscribeBatch(subs []*Subscription) error

	// SnmpInfo returns the Snmp counters.
	SnmpInfo() *Snmp
}

type ITransformSet

// ITransformSet is an ISet that can transform itself into another set.
type ITransformSet[T comparable] interface {
	ISet[T]
	// Transform returns the set to use from now on. Per their docs, the
	// implementations switch between a slice and a map representation
	// depending on size.
	Transform() ITransformSet[T]
}

ITransformSet is a set interface that can transform to another set.

type Level

// Level is one layer of the Level/Node tree: it maps string keys to nodes,
// and each Node may point to a further Level through Node.Next.
type Level struct {
	Nodes map[string]*Node // nodes at this level; presumably keyed by subject token — confirm

}

func NewLevel

func NewLevel() *Level

func (*Level) NumNodes

func (l *Level) NumNodes() int

NumNodes

func (*Level) PruneNode

func (l *Level) PruneNode(topic string)

PruneNode

type LevelCache

// LevelCache groups a Level, a Node and a topic string.
// NOTE(review): presumably records one step of a walk down the tree (the
// level visited, the node chosen in it, and the token used) so empty nodes
// can be pruned afterwards via Level.PruneNode — confirm.
type LevelCache struct {
	Level *Level
	Node  *Node
	Topic string
}

type MixSet

// MixSet is a set that uses a slice for small sets and a map for large sets.
// It embeds an ITransformSet, which holds the current representation.
type MixSet[T comparable] struct{ ITransformSet[T] }

MixSet is a set that uses a slice for small sets and a map for large sets.

func InitMixSet

func InitMixSet[T comparable]() MixSet[T]

InitMixSet creates a new set with an initial capacity.

func NewMixSet

func NewMixSet[T comparable]() *MixSet[T]

NewMixSet creates a new set.

func (*MixSet[T]) Add

func (s *MixSet[T]) Add(e ...T)

Add adds an element to the set.

func (*MixSet[T]) Remove

func (s *MixSet[T]) Remove(e ...T)

Remove removes an element from the set.

func (*MixSet[T]) Transform

func (s *MixSet[T]) Transform() ITransformSet[T]

Transform transforms the set to a map if the size is greater than mixSetMaxSliceSize, or to a slice if the size is less than mixSetMinMapSize.

type MultiSublist

type MultiSublist struct {
	// contains filtered or unexported fields
}

func NewMultiBus

func NewMultiBus(length uint) *MultiSublist

func (*MultiSublist) Publish

func (m *MultiSublist) Publish(subject string, param any) error

Publish

func (*MultiSublist) SnmpInfo

func (m *MultiSublist) SnmpInfo() *Snmp

SnmpInfo

func (*MultiSublist) Subscribe

func (m *MultiSublist) Subscribe(sub *Subscription) error

Subscribe

func (*MultiSublist) Unsubscribe

func (m *MultiSublist) Unsubscribe(subs *Subscription) error

Unsubscribe

func (*MultiSublist) UnsubscribeBatch

func (m *MultiSublist) UnsubscribeBatch(subs []*Subscription) error

UnsubscribeBatch

type Node

// Node is one entry in a Level: it holds the subscriptions registered at
// this point and a link to the next Level.
// NOTE(review): Plist is presumably rebuilt from Psubs once the set grows
// past PListCacheMin — confirm.
type Node struct {
	Next  *Level              // next level
	Psubs ISet[*Subscription] // original set
	Plist []*Subscription     // cache list
}

func NewNode

func NewNode() *Node

func (*Node) IsEmpty

func (n *Node) IsEmpty() bool

IsEmpty

type Opt

// Opt is a functional option that mutates a subsOption. NewSubs accepts
// any number of them; Once and Queue are the constructors in this package.
type Opt func(*subsOption)

func Once

func Once() Opt

Once

func Queue

func Queue() Opt

Queue

type Set

// Set is a map-backed set: elements are the map keys and the values are
// empty structs.
type Set[T comparable] map[T]struct{}

func NewSet

func NewSet[T comparable](cache uint) Set[T]

func NewSetNoCache

func NewSetNoCache[T comparable]() Set[T]

func (Set[T]) Add

func (s Set[T]) Add(e ...T)

Add adds an element to the set.

func (Set[T]) AppendToSlice

func (s Set[T]) AppendToSlice(slice []T) []T

AppendToSlice appends the elements in the set to a slice.

func (Set[T]) Clear

func (s Set[T]) Clear()

Clear

func (Set[T]) Contains

func (s Set[T]) Contains(e T) bool

Contains checks if an element is in the set.

func (Set[T]) Len

func (s Set[T]) Len() int

Len returns the number of elements in the set.

func (Set[T]) Range

func (s Set[T]) Range(f func(e T) bool)

Range

func (Set[T]) Remove

func (s Set[T]) Remove(e ...T)

Remove removes an element from the set.

func (Set[T]) Transform

func (s Set[T]) Transform() ITransformSet[T]

Transform transforms the set to a slice if the size is less than mixSetMinMapSize.

type SliceSet

type SliceSet[T comparable] struct {
	// contains filtered or unexported fields
}

func NewSliceSet

func NewSliceSet[T comparable]() *SliceSet[T]

NewSliceSet creates a new set.

func (*SliceSet[T]) Add

func (s *SliceSet[T]) Add(e ...T)

Add adds an element to the set.

func (*SliceSet[T]) AppendToSlice

func (s *SliceSet[T]) AppendToSlice(slice []T) []T

AppendToSlice appends the elements in the set to a slice.

func (*SliceSet[T]) Clear

func (s *SliceSet[T]) Clear()

Clear

func (*SliceSet[T]) Contains

func (s *SliceSet[T]) Contains(e T) bool

Contains checks if an element is in the set.

func (*SliceSet[T]) Len

func (s *SliceSet[T]) Len() int

Len returns the number of elements in the set.

func (*SliceSet[T]) Range

func (s *SliceSet[T]) Range(f func(e T) bool)

Range

func (*SliceSet[T]) Remove

func (s *SliceSet[T]) Remove(e ...T)

Remove removes an element from the set.

func (*SliceSet[T]) Transform

func (s *SliceSet[T]) Transform() ITransformSet[T]

Transform transforms the set to a map if the size is greater than mixSetMaxSliceSize.

type Snmp

// Snmp is a group of four atomic counters, returned by SnmpInfo.
// NOTE(review): the per-field meanings below are inferred from the field
// names only — confirm against the code that updates them.
type Snmp struct {
	Matches atomic.Uint64 // presumably number of match/publish lookups
	Count   atomic.Uint64 // presumably current number of subscriptions
	Inserts atomic.Uint64 // presumably number of subscriptions added
	Removes atomic.Uint64 // presumably number of subscriptions removed
}

type Sublist

type Sublist struct {
	Snmp
	// contains filtered or unexported fields
}

func NewBus

func NewBus() *Sublist

NewBus

func (*Sublist) Publish

func (s *Sublist) Publish(subject string, param any) (err error)

Publish

func (*Sublist) SnmpInfo

func (s *Sublist) SnmpInfo() *Snmp

SnmpInfo

func (*Sublist) Subscribe

func (s *Sublist) Subscribe(sub *Subscription) error

Subscribe adds the subscription into the sublist.

func (*Sublist) Unsubscribe

func (s *Sublist) Unsubscribe(sub *Subscription) error

func (*Sublist) UnsubscribeBatch

func (s *Sublist) UnsubscribeBatch(subs []*Subscription) error

UnsubscribeBatch

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

func NewSubs

func NewSubs(subject string, call ICall, opts ...Opt) *Subscription

NewSubs

func (*Subscription) Subject

func (s *Subscription) Subject() string

Subject

type Var

type Var[T any] struct {
	// contains filtered or unexported fields
}

func PackParam

func PackParam[T any](val any) Var[T]

func (Var[T]) Raw

func (o Var[T]) Raw() any

Raw

func (Var[T]) Val

func (o Var[T]) Val() T

Val

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL