Documentation ¶
Index ¶
- Constants
- Variables
- func AggregateChannel[T any](ctx context.Context, inputStream <-chan T, f AggregateFunc[T]) <-chan T
- func Detach(ctx context.Context) (context.Context, error)
- func FanIn[T any](sources ...<-chan T) <-chan T
- func FanOut[T any](source <-chan T, n int) []<-chan T
- func FilterChannel[T any](ctx context.Context, inputStream <-chan T, f FilterFunc[T]) <-chan T
- func Pipeline[T any](jobs ...Job[T])
- func Workers[T any](ctx context.Context, f WorkFunc[T], n int, wg *sync.WaitGroup, source <-chan T, ...)
- func Wrap(ctx context.Context, f WrappedFunc) error
- type AggregateFunc
- type Broadcast
- type Broker
- func (b *Broker[T]) CloseAll() error
- func (b *Broker[T]) CloseTopic(topic string) error
- func (b *Broker[T]) CreateTopic(ctx context.Context, topic string) error
- func (b *Broker[T]) Publish(ctx context.Context, topic string, message T) error
- func (b *Broker[T]) Subscribe(topic string) (<-chan T, error)
- func (b *Broker[T]) SubscribersCountAll() (int, error)
- func (b *Broker[T]) SubscribersCountByTopic(topic string) (int, error)
- func (b *Broker[T]) Unsubscribe(topic string, ch <-chan T) error
- type Circuit
- type Effector
- type FilterFunc
- type Iterator
- type Job
- type MutexBucket
- type Topic
- type WorkFunc
- type WorkersCurator
- type WrappedFunc
Constants ¶
View Source
const (
SegmentCount = 256
)
Variables ¶
View Source
var (
ErrBrokerClosed = errors.New("broker closed")
ErrTopicNotFound = errors.New("topic not found")
ErrPublishFailed = errors.New("publish failed")
ErrTopicAlreadyExists = errors.New("topic already exists")
)
View Source
var (
ErrCircuitWait = errors.New("err need more time wait before request")
)
View Source
var (
ErrEmptyParentContext = errors.New("empty parent context")
)
View Source
var (
ErrToManyCalls = errors.New("too many calls")
)
Functions ¶
func AggregateChannel ¶
func AggregateChannel[T any](ctx context.Context, inputStream <-chan T, f AggregateFunc[T]) <-chan T
func FilterChannel ¶
func FilterChannel[T any](ctx context.Context, inputStream <-chan T, f FilterFunc[T]) <-chan T
Types ¶
type AggregateFunc ¶
type AggregateFunc[T any] func(data T) T
type Broadcast ¶
type Broadcast[T any] struct {
// contains filtered or unexported fields
}
func (*Broadcast[T]) SubscribersCount ¶ added in v1.0.3
func (*Broadcast[T]) Unsubscribe ¶ added in v1.0.3
func (b *Broadcast[T]) Unsubscribe(channel <-chan T)
type Broker ¶ added in v1.0.3
type Broker[T any] struct {
// contains filtered or unexported fields
}
func (*Broker[T]) CloseTopic ¶ added in v1.0.3
func (*Broker[T]) CreateTopic ¶ added in v1.0.3
func (*Broker[T]) SubscribersCountAll ¶ added in v1.0.3
func (*Broker[T]) SubscribersCountByTopic ¶ added in v1.0.3
func (*Broker[T]) Unsubscribe ¶ added in v1.0.3
type FilterFunc ¶
type Iterator ¶
type Iterator[T any] struct {
// contains filtered or unexported fields
}
func NewIterator ¶
type MutexBucket ¶ added in v1.0.3
type MutexBucket[T any] struct {
// contains filtered or unexported fields
}
func NewMutexBucket ¶ added in v1.0.3
func NewMutexBucket[T any](elems [SegmentCount]*T) *MutexBucket[T]
func (*MutexBucket[T]) UnderLock ¶ added in v1.0.3
func (mb *MutexBucket[T]) UnderLock(identifier int64, f func(p *T) error) error
func (*MutexBucket[T]) UnderReadLock ¶ added in v1.0.3
func (mb *MutexBucket[T]) UnderReadLock(identifier int64, f func(p *T) error) error
type WorkersCurator ¶
type WorkersCurator[T any] struct {
// contains filtered or unexported fields
}
func NewWorkersCurator ¶
func (*WorkersCurator[T]) GetErrCh ¶
func (c *WorkersCurator[T]) GetErrCh() chan error
func (*WorkersCurator[T]) Wait ¶
func (c *WorkersCurator[T]) Wait()
type WrappedFunc ¶
type WrappedFunc func() error
Click to show internal directories.
Click to hide internal directories.