eventbus

package module
v0.0.0-...-164d218 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

README

eventbus

纯内存实现的发布订阅模式事件总线,订阅逻辑仿照 NATS 实现

特点

  1. 并发安全
  2. 通过泛型提供有限的类型安全
  3. 支持回调和channel两种订阅方式
  4. 支持订阅者取消订阅

TODO

  • 支持queue模式
  • 支持通配匹配和前向匹配
  • 支持同步的Request模式

使用方法

安装

go get github.com/ganyyy/eventbus

使用

package main

import (
	"sync"
	"sync/atomic"

	"github.com/ganyyy/eventbus"
)

// main exercises both subscription styles on a single topic: it publishes
// msgCount integers concurrently, sums them once through a callback
// subscription and once through a channel subscription, then prints both sums.
func main() {
	const (
		topic    = "test.1"
		msgCount = 100
	)

	bus := eventbus.NewMultiBus(4)

	// One running sum per subscription style.
	var callbackSum, chanSum atomic.Int64

	// Callback style: every value handed to the callback is added to callbackSum.
	cbSub := eventbus.CallbackSubs(topic, func(m eventbus.Msg[int]) {
		callbackSum.Add(int64(m.Val()))
	})

	// Channel style: the channel capacity equals the number of messages published.
	rx, chSub := eventbus.ChanSubs[int](topic, msgCount)

	publishDone := make(chan struct{}) // closed once every publisher has returned
	drained := make(chan struct{})     // closed once the consumer has emptied rx

	go func() {
		defer close(drained)
		for {
			select {
			case m := <-rx:
				chanSum.Add(int64(m.Val()))
			case <-publishDone:
				// Publishing is over: take whatever is still buffered
				// without blocking, then stop.
				for {
					select {
					case m := <-rx:
						chanSum.Add(int64(m.Val()))
					default:
						return
					}
				}
			}
		}
	}()

	bus.Subscribe(cbSub)
	bus.Subscribe(chSub)

	// Deferred calls run in reverse order: cbSub is removed first, then chSub.
	defer bus.Unsubscribe(chSub)
	defer bus.Unsubscribe(cbSub)

	var publishers sync.WaitGroup
	publishers.Add(msgCount)

	for i := 0; i < msgCount; i++ {
		go func(v int) {
			bus.Publish(topic, v)
			publishers.Done()
		}(i)
	}

	publishers.Wait()
	close(publishDone)
	<-drained

	println(callbackSum.Load(), chanSum.Load())
}

Documentation

Index

Constants

View Source
const (
	TSep  = "."
	BtSep = '.'
)
View Source
const (
	PListCacheMin  = 128 // build a quick cache when the number of subscriptions is greater than this value
	StackCacheSize = 32  // normal stack cache size
)
View Source
const (
	InitNodeSubCache   = 16
	InitLevelNodeCache = 16
)

Variables

View Source
var (
	ErrInvalidSubject = errors.New("sublist: invalid subject")
	ErrSublistNil     = errors.New("sublist: sublist is nil")
	ErrNotFound       = errors.New("sublist: not found")
	ErrSlowConsumer   = errors.New("subjection: slow consumer")
)
View Source
var Default = NewMultiBus(32)

Default is the default multi-sublist.

Functions

func MakeChan

func MakeChan[T any](capacity int) (tx chan<- Msg[T], rx <-chan Msg[T])

MakeChan

func MatchLevel

func MatchLevel(level *Level, tokens []string, ret *sublistResult)

MatchLevel

func SplitSubject

func SplitSubject(subject string, cache []string) ([]string, bool)

SplitSubject splits the subject into tokens.

func ValidSubject

func ValidSubject(subject string) bool

ValidSubject returns true if the subject is valid.

Types

type ICall

type ICall interface {
	// contains filtered or unexported methods
}

func Any

func Any[T any, F chan Msg[T] | func(Msg[T])](inner F) ICall

func Callback

func Callback[T any](cb func(Msg[T])) ICall

Callback creates a new subscribe with a callback

func Chan

func Chan[T any](notify chan<- Msg[T]) ICall

Chan creates a new subscribe with a channel

type IReply

type IReply interface {
	// contains filtered or unexported methods
}

type ISet

type ISet[T comparable] interface {
	Add(e ...T)
	Remove(e ...T)
	Contains(e T) bool
	Len() int
	AppendToSlice(slice []T) []T
	Clear()
	Range(f func(e T) bool)
}

ISet is a set interface.

type ISublist

type ISublist interface {
	Subscribe(sub *Subscription) error
	Publish(subject string, param any) error
	Request(subject string, param any, reply IReply) error

	Unsubscribe(sub *Subscription) error
	UnsubscribeBatch(subs []*Subscription) error

	SnmpInfo() *Snmp
}

type ITransformSet

type ITransformSet[T comparable] interface {
	ISet[T]
	Transform() ITransformSet[T]
}

ITransformSet is a set interface that can transform to another set.

type Level

type Level struct {
	Nodes map[string]*Node // nodes of this level, keyed by string — presumably one subject token per key; TODO confirm

}

func NewLevel

func NewLevel() *Level

func (*Level) NumNodes

func (l *Level) NumNodes() int

NumNodes

func (*Level) PruneNode

func (l *Level) PruneNode(topic string)

PruneNode

type LevelCache

type LevelCache struct {
	Level *Level
	Node  *Node
	Topic string
}

type MixSet

type MixSet[T comparable] struct{ ITransformSet[T] }

MixSet is a set that uses a slice for small sets and a map for large sets.

func InitMixSet

func InitMixSet[T comparable]() MixSet[T]

InitMixSet creates a new set with an initial capacity.

func NewMixSet

func NewMixSet[T comparable]() *MixSet[T]

NewMixSet creates a new set.

func (*MixSet[T]) Add

func (s *MixSet[T]) Add(e ...T)

Add adds the given elements to the set.

func (*MixSet[T]) Remove

func (s *MixSet[T]) Remove(e ...T)

Remove removes the given elements from the set.

func (*MixSet[T]) Transform

func (s *MixSet[T]) Transform() ITransformSet[T]

Transform transforms the set to a map if the size is greater than mixSetMaxSliceSize, or to a slice if the size is less than mixSetMinMapSize.

type Msg

type Msg[T any] struct {
	Var[T]
	// contains filtered or unexported fields
}

func (Msg[T]) Reply

func (o Msg[T]) Reply(val any) bool

Reply

type MultiSublist

type MultiSublist struct {
	// contains filtered or unexported fields
}

func NewMultiBus

func NewMultiBus(length uint) *MultiSublist

func (*MultiSublist) Publish

func (m *MultiSublist) Publish(subject string, param any) error

Publish

func (*MultiSublist) Request

func (m *MultiSublist) Request(subject string, param any, reply IReply) error

Request

func (*MultiSublist) SnmpInfo

func (m *MultiSublist) SnmpInfo() *Snmp

SnmpInfo

func (*MultiSublist) Subscribe

func (m *MultiSublist) Subscribe(sub *Subscription) error

Subscribe

func (*MultiSublist) Unsubscribe

func (m *MultiSublist) Unsubscribe(subs *Subscription) error

Unsubscribe

func (*MultiSublist) UnsubscribeBatch

func (m *MultiSublist) UnsubscribeBatch(subs []*Subscription) error

UnsubscribeBatch

type Node

type Node struct {
	Next  *Level                         // next level
	Psubs ISet[*Subscription]            // original set
	Qsubs map[string]ISet[*Subscription] // queue set
	Plist []*Subscription                // cache list
}

func NewNode

func NewNode() *Node

func (*Node) IsEmpty

func (n *Node) IsEmpty() bool

IsEmpty

type Opt

type Opt func(*subsOption)

func Once

func Once() Opt

Once

func Queue

func Queue(queue string) Opt

Queue

type Pool

type Pool[T any] struct {
	// Reset resets a T and returns true if it can be reused.
	// If Reset is nil, the value will not be reset and put back to the pool.
	// NOTE(review): it is unclear from here whether a nil Reset means values
	// are pooled without being reset or are not pooled at all — confirm
	// against Put.
	Reset func(T) bool
	// contains filtered or unexported fields
}

T must be a pointer type.

func NewPool

func NewPool[T any](newFunc func() T, resetFunc func(T) bool) *Pool[T]

func (*Pool[T]) Get

func (p *Pool[T]) Get() T

Get returns a T from the pool or create a new T.

func (*Pool[T]) Put

func (p *Pool[T]) Put(v T)

Put puts a T back to the pool.

type Reply

type Reply[T any] struct {
	// contains filtered or unexported fields
}

func NewReply

func NewReply[T any]() *Reply[T]

func (*Reply[T]) Resp

func (r *Reply[T]) Resp(ctx context.Context) (Var[T], bool)

Resp

type Set

type Set[T comparable] map[T]struct{}

func NewSet

func NewSet[T comparable](cache uint) Set[T]

func NewSetNoCache

func NewSetNoCache[T comparable]() Set[T]

func (Set[T]) Add

func (s Set[T]) Add(e ...T)

Add adds the given elements to the set.

func (Set[T]) AppendToSlice

func (s Set[T]) AppendToSlice(slice []T) []T

AppendToSlice appends the elements in the set to a slice.

func (Set[T]) Clear

func (s Set[T]) Clear()

Clear

func (Set[T]) Contains

func (s Set[T]) Contains(e T) bool

Contains checks if an element is in the set.

func (Set[T]) Len

func (s Set[T]) Len() int

Len returns the number of elements in the set.

func (Set[T]) Range

func (s Set[T]) Range(f func(e T) bool)

Range

func (Set[T]) Remove

func (s Set[T]) Remove(e ...T)

Remove removes the given elements from the set.

func (Set[T]) Transform

func (s Set[T]) Transform() ITransformSet[T]

Transform transforms the set to a slice if the size is less than mixSetMinMapSize.

type SliceSet

type SliceSet[T comparable] struct {
	// contains filtered or unexported fields
}

func NewSliceSet

func NewSliceSet[T comparable]() *SliceSet[T]

NewSliceSet creates a new set.

func (*SliceSet[T]) Add

func (s *SliceSet[T]) Add(e ...T)

Add adds the given elements to the set.

func (*SliceSet[T]) AppendToSlice

func (s *SliceSet[T]) AppendToSlice(slice []T) []T

AppendToSlice appends the elements in the set to a slice.

func (*SliceSet[T]) Clear

func (s *SliceSet[T]) Clear()

Clear

func (*SliceSet[T]) Contains

func (s *SliceSet[T]) Contains(e T) bool

Contains checks if an element is in the set.

func (*SliceSet[T]) Len

func (s *SliceSet[T]) Len() int

Len returns the number of elements in the set.

func (*SliceSet[T]) Range

func (s *SliceSet[T]) Range(f func(e T) bool)

Range

func (*SliceSet[T]) Remove

func (s *SliceSet[T]) Remove(e ...T)

Remove removes the given elements from the set.

func (*SliceSet[T]) Transform

func (s *SliceSet[T]) Transform() ITransformSet[T]

Transform transforms the set to a map if the size is greater than mixSetMaxSliceSize.

type Snmp

type Snmp struct {
	Matches atomic.Uint64
	Count   atomic.Uint64
	Inserts atomic.Uint64
	Removes atomic.Uint64
}

type Sublist

type Sublist struct {
	Snmp
	// contains filtered or unexported fields
}

func NewBus

func NewBus() *Sublist

NewBus

func (*Sublist) Publish

func (s *Sublist) Publish(subject string, param any) (err error)

Publish

func (*Sublist) Request

func (s *Sublist) Request(subject string, param any, reply IReply) (err error)

Request

func (*Sublist) SnmpInfo

func (s *Sublist) SnmpInfo() *Snmp

SnmpInfo

func (*Sublist) Subscribe

func (s *Sublist) Subscribe(sub *Subscription) error

Subscribe adds the subscription into the sublist.

func (*Sublist) Unsubscribe

func (s *Sublist) Unsubscribe(sub *Subscription) error

func (*Sublist) UnsubscribeBatch

func (s *Sublist) UnsubscribeBatch(subs []*Subscription) error

UnsubscribeBatch

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

func CallbackSubs

func CallbackSubs[T any](topic string, cb func(Msg[T]), opts ...Opt) *Subscription

CallbackSubs

func ChanSubs

func ChanSubs[T any](topic string, capacity int, opts ...Opt) (rx <-chan Msg[T], sub *Subscription)

ChanSubs

func NewSubs

func NewSubs(subject string, call ICall, opts ...Opt) *Subscription

NewSubs

func (*Subscription) Subject

func (s *Subscription) Subject() string

Subject

type Var

type Var[T any] struct {
	// contains filtered or unexported fields
}

func (Var[T]) Raw

func (v Var[T]) Raw() any

Raw

func (Var[T]) Val

func (v Var[T]) Val() T

Val

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL