Documentation ¶
Index ¶
- Constants
- Variables
- func AggregateChannel[T any](ctx context.Context, inputStream <-chan T, f AggregateFunc[T]) <-chan T
- func Detach(ctx context.Context) (context.Context, error)
- func Do[T any](sf *singleflight.Group, key string, fn func() (any, error)) (v T, err error, shared bool)
- func DoChan[T any](sf *singleflight.Group, key string, fn func() (any, error)) (<-chan Result[T], error)
- func DoChanCtx[T any](ctx context.Context, sf *singleflight.Group, key string, ...) (<-chan Result[T], error)
- func FanIn[T any](sources ...<-chan T) <-chan T
- func FanOut[T any](source <-chan T, n int) []<-chan T
- func FilterChannel[T any](ctx context.Context, inputStream <-chan T, f FilterFunc[T]) <-chan T
- func Forget(sf *singleflight.Group, key string) error
- func Pipeline[T any](jobs ...Job[T])
- func Ticker(ctx context.Context, interval time.Duration, f TickerFunc)
- func TickerV2(ctx context.Context, interval time.Duration, f TickerFunc)
- func Workers[T any](ctx context.Context, f WorkFunc[T], n uint, source <-chan T) <-chan struct{}
- func Wrap(ctx context.Context, f WrappedFunc) error
- type AggregateFunc
- type Broadcast
- type Broker
- func (b *Broker[T]) CloseAll() error
- func (b *Broker[T]) CloseTopic(topic string) error
- func (b *Broker[T]) CreateTopic(ctx context.Context, topic string) error
- func (b *Broker[T]) Publish(ctx context.Context, topic string, message T) error
- func (b *Broker[T]) Subscribe(topic string) (<-chan T, error)
- func (b *Broker[T]) SubscribersCountAll() (int, error)
- func (b *Broker[T]) SubscribersCountByTopic(topic string) (int, error)
- func (b *Broker[T]) Unsubscribe(topic string, ch <-chan T) error
- type CacheMap
- type Circuit
- type Effector
- type FilterFunc
- type Iterator
- type Job
- type MutexBucket
- type MutexBucketFunc
- type NodeInfo
- type NodeInfoChannel
- type Queue
- func (q *Queue[T, M]) Close()
- func (q *Queue[T, M]) Pop() (NodeInfo[T, M], error)
- func (q *Queue[T, M]) PopByChannelWithReqTime(reqTime time.Duration) (<-chan NodeInfo[T, M], error)
- func (q *Queue[T, M]) PopByKey(key M) (NodeInfo[T, M], error)
- func (q *Queue[T, M]) Push(priority int64, key M, value T)
- func (q *Queue[T, M]) Size() int
- type QueueOption
- type ReloadFunc
- type Result
- type Semaphore
- type TickerFunc
- type Topic
- type WorkFunc
- type WrappedFunc
Constants ¶
View Source
const (
MaxPriority = math.MaxInt64
MinPriority = math.MinInt64
)
View Source
const (
SegmentCount = 256
)
Variables ¶
View Source
var (
ErrBrokerClosed = errors.New("broker closed")
ErrTopicNotFound = errors.New("topic not found")
ErrPublishFailed = errors.New("publish failed")
ErrTopicAlreadyExists = errors.New("topic already exists")
)
View Source
var (
ErrKeyDoesNotExist = errors.New("key does not exist")
ErrEmptyQueue = errors.New("queue is empty")
ErrEmptyChannel = errors.New("channel is empty")
ErrEmptyReqTime = errors.New("request time is empty")
)
View Source
var (
ErrEmptySingleFlight = errors.New("empty singleflight")
ErrFailedTypeAssert = errors.New("failed type assert")
)
View Source
var (
ErrCircuitWait = errors.New("err need more time wait before request")
)
View Source
var (
ErrEmptyParentContext = errors.New("empty parent context")
)
View Source
var (
ErrInvalidCount = errors.New("invalid count")
)
View Source
var (
ErrToManyCalls = errors.New("too many calls")
)
Functions ¶
func AggregateChannel ¶
func AggregateChannel[T any](ctx context.Context, inputStream <-chan T, f AggregateFunc[T]) <-chan T
func FilterChannel ¶
func FilterChannel[T any](ctx context.Context, inputStream <-chan T, f FilterFunc[T]) <-chan T
func Ticker ¶ added in v1.0.6
func Ticker(ctx context.Context, interval time.Duration, f TickerFunc)
func TickerV2 ¶ added in v1.0.13
func TickerV2(ctx context.Context, interval time.Duration, f TickerFunc)
Types ¶
type AggregateFunc ¶
type AggregateFunc[T any] func(data T) T
type Broadcast ¶
type Broadcast[T any] struct {
// contains filtered or unexported fields
}
func (*Broadcast[T]) SubscribersCount ¶ added in v1.0.3
func (*Broadcast[T]) Unsubscribe ¶ added in v1.0.3
func (b *Broadcast[T]) Unsubscribe(channel <-chan T)
type Broker ¶ added in v1.0.3
type Broker[T any] struct {
// contains filtered or unexported fields
}
func (*Broker[T]) CloseTopic ¶ added in v1.0.3
func (*Broker[T]) CreateTopic ¶ added in v1.0.3
func (*Broker[T]) SubscribersCountAll ¶ added in v1.0.3
func (*Broker[T]) SubscribersCountByTopic ¶ added in v1.0.3
func (*Broker[T]) Unsubscribe ¶ added in v1.0.3
type CacheMap ¶ added in v1.0.5
type CacheMap[K comparable, T any] struct {
// contains filtered or unexported fields
}
func NewCacheMap ¶ added in v1.0.5
func NewCacheMap[K comparable, T any](f ReloadFunc[K, T], clearTick time.Duration) *CacheMap[K, T]
func (*CacheMap[K, T]) ClearCache ¶ added in v1.0.5
func (cm *CacheMap[K, T]) ClearCache()
func (*CacheMap[K, T]) RemoveCache ¶ added in v1.0.5
func (cm *CacheMap[K, T]) RemoveCache()
type FilterFunc ¶
type Iterator ¶
type Iterator[T any] struct {
// contains filtered or unexported fields
}
func NewIterator ¶
type MutexBucket ¶ added in v1.0.3
type MutexBucket[T any] struct {
// contains filtered or unexported fields
}
func NewMutexBucket ¶ added in v1.0.3
func NewMutexBucket[T any](elems [SegmentCount]T) *MutexBucket[T]
func (*MutexBucket[T]) UnderLock ¶ added in v1.0.3
func (mb *MutexBucket[T]) UnderLock(identifier int64, f MutexBucketFunc[T]) error
func (*MutexBucket[T]) UnderReadLock ¶ added in v1.0.3
func (mb *MutexBucket[T]) UnderReadLock(identifier int64, f MutexBucketFunc[T]) error
type MutexBucketFunc ¶ added in v1.0.14
type NodeInfo ¶ added in v1.0.17
type NodeInfo[T any, M comparable] struct {
Key M
Value T
Priority int64
}
type NodeInfoChannel ¶ added in v1.0.17
type NodeInfoChannel[T any, M comparable] chan NodeInfo[T, M]
type Queue ¶ added in v1.0.17
type Queue[T any, M comparable] struct {
// contains filtered or unexported fields
}
func NewQueue ¶ added in v1.0.17
func NewQueue[T any, M comparable](opts ...QueueOption[T, M]) *Queue[T, M]
NewQueue creates a new queue with the given options. Use one of the Pop, PopByKey, or PopByChannelWithReqTime methods to get a value from the queue.
func (*Queue[T, M]) PopByChannelWithReqTime ¶ added in v1.0.17
type QueueOption ¶ added in v1.0.17
type QueueOption[T any, M comparable] func(q *Queue[T, M])
func WithChannel ¶ added in v1.0.17
func WithChannel[T any, M comparable]() QueueOption[T, M]
type ReloadFunc ¶ added in v1.0.5
type ReloadFunc[K comparable, T any] func(oldCache map[K]T) map[K]T
type Semaphore ¶ added in v1.0.8
type Semaphore struct {
// contains filtered or unexported fields
}
func NewSemaphore ¶ added in v1.0.8
func (*Semaphore) AcquireCtx ¶ added in v1.0.8
type TickerFunc ¶ added in v1.0.6
type TickerFunc func()
type WrappedFunc ¶
type WrappedFunc func() error
Source Files ¶
Click to show internal directories.
Click to hide internal directories.