patterns

package
v1.0.20 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 30, 2024 License: Apache-2.0 Imports: 7 Imported by: 2

Documentation

Index

Constants

View Source
const (
	MaxPriority = math.MaxInt64
	MinPriority = math.MinInt64
)
View Source
const (
	SegmentCount = 256
)

Variables

View Source
var (
	ErrBrokerClosed       = errors.New("broker closed")
	ErrTopicNotFound      = errors.New("topic not found")
	ErrPublishFailed      = errors.New("publish failed")
	ErrTopicAlreadyExists = errors.New("topic already exists")
)
View Source
var (
	ErrKeyDoesNotExist = errors.New("key does not exist")
	ErrEmptyQueue      = errors.New("queue is empty")
	ErrEmptyChannel    = errors.New("channel is empty")
	ErrEmptyReqTime    = errors.New("request time is empty")
)
View Source
var (
	ErrEmptySingleFlight = errors.New("empty singleflight")
	ErrFailedTypeAssert  = errors.New("failed type assert")
)
View Source
var (
	ErrCircuitWait = errors.New("err need more time wait before request")
)
View Source
var (
	ErrEmptyParentContext = errors.New("empty parent context")
)
View Source
var (
	ErrInvalidCount = errors.New("invalid count")
)
View Source
var (
	ErrToManyCalls = errors.New("too many calls")
)

Functions

func AggregateChannel

func AggregateChannel[T any](ctx context.Context, inputStream <-chan T, f AggregateFunc[T]) <-chan T

func Detach

func Detach(ctx context.Context) (context.Context, error)

func Do added in v1.0.13

func Do[T any](
	sf *singleflight.Group,
	key string,
	fn func() (any, error),
) (v T, err error, shared bool)

func DoChan added in v1.0.13

func DoChan[T any](
	sf *singleflight.Group,
	key string,
	fn func() (any, error),
) (<-chan Result[T], error)

func DoChanCtx added in v1.0.13

func DoChanCtx[T any](
	ctx context.Context,
	sf *singleflight.Group,
	key string,
	fn func() (any, error),
) (<-chan Result[T], error)

func FanIn added in v1.0.1

func FanIn[T any](sources ...<-chan T) <-chan T

func FanOut added in v1.0.1

func FanOut[T any](source <-chan T, n int) []<-chan T

func FilterChannel

func FilterChannel[T any](ctx context.Context, inputStream <-chan T, f FilterFunc[T]) <-chan T

func Forget added in v1.0.13

func Forget(sf *singleflight.Group, key string) error

func Pipeline

func Pipeline[T any](jobs ...Job[T])

func Ticker added in v1.0.6

func Ticker(
	ctx context.Context,
	interval time.Duration,
	f TickerFunc,
)

func TickerV2 added in v1.0.13

func TickerV2(
	ctx context.Context,
	interval time.Duration,
	f TickerFunc,
)

func Workers

func Workers[T any](
	ctx context.Context,
	f WorkFunc[T],
	n uint,
	source <-chan T,
) <-chan struct{}

func Wrap

func Wrap(ctx context.Context, f WrappedFunc) error

Types

type AggregateFunc

type AggregateFunc[T any] func(data T) T

type Broadcast

type Broadcast[T any] struct {
	// contains filtered or unexported fields
}

func NewBroadcast

func NewBroadcast[T any](ctx context.Context, source <-chan T) *Broadcast[T]

func (*Broadcast[T]) Subscribe

func (b *Broadcast[T]) Subscribe() <-chan T

func (*Broadcast[T]) SubscribersCount added in v1.0.3

func (b *Broadcast[T]) SubscribersCount() int

func (*Broadcast[T]) Unsubscribe added in v1.0.3

func (b *Broadcast[T]) Unsubscribe(channel <-chan T)

type Broker added in v1.0.3

type Broker[T any] struct {
	// contains filtered or unexported fields
}

func NewBroker added in v1.0.3

func NewBroker[T any]() *Broker[T]

func (*Broker[T]) CloseAll added in v1.0.3

func (b *Broker[T]) CloseAll() error

func (*Broker[T]) CloseTopic added in v1.0.3

func (b *Broker[T]) CloseTopic(topic string) error

func (*Broker[T]) CreateTopic added in v1.0.3

func (b *Broker[T]) CreateTopic(ctx context.Context, topic string) error

func (*Broker[T]) Publish added in v1.0.3

func (b *Broker[T]) Publish(ctx context.Context, topic string, message T) error

func (*Broker[T]) Subscribe added in v1.0.3

func (b *Broker[T]) Subscribe(topic string) (<-chan T, error)

func (*Broker[T]) SubscribersCountAll added in v1.0.3

func (b *Broker[T]) SubscribersCountAll() (int, error)

func (*Broker[T]) SubscribersCountByTopic added in v1.0.3

func (b *Broker[T]) SubscribersCountByTopic(topic string) (int, error)

func (*Broker[T]) Unsubscribe added in v1.0.3

func (b *Broker[T]) Unsubscribe(topic string, ch <-chan T) error

type CacheMap added in v1.0.5

type CacheMap[K comparable, T any] struct {
	// contains filtered or unexported fields
}

func NewCacheMap added in v1.0.5

func NewCacheMap[K comparable, T any](f ReloadFunc[K, T], clearTick time.Duration) *CacheMap[K, T]

func (*CacheMap[K, T]) ClearCache added in v1.0.5

func (cm *CacheMap[K, T]) ClearCache()

func (*CacheMap[K, T]) Get added in v1.0.5

func (cm *CacheMap[K, T]) Get(key K) (T, bool)

func (*CacheMap[K, T]) RemoveCache added in v1.0.5

func (cm *CacheMap[K, T]) RemoveCache()

func (*CacheMap[K, T]) Set added in v1.0.5

func (cm *CacheMap[K, T]) Set(key K, value T)

type Circuit

type Circuit[T any] func(ctx context.Context) (T, error)

func Breaker

func Breaker[T any](circuit Circuit[T], waitDuration time.Duration, failureThreshold uint) Circuit[T]

func Debounce

func Debounce[T any](circuit Circuit[T], d time.Duration) Circuit[T]

type Effector

type Effector[T any] func(ctx context.Context) (T, error)

func Retry

func Retry[T any](effector Effector[T], retries int, delay time.Duration) Effector[T]

func Throttle

func Throttle[T any](e Effector[T], max, refill uint, d time.Duration) Effector[T]

type FilterFunc

type FilterFunc[T any] func(data T) bool

type Iterator

type Iterator[T any] struct {
	// contains filtered or unexported fields
}

func NewIterator

func NewIterator[T any](data []T) *Iterator[T]

func (*Iterator[T]) GetNext

func (i *Iterator[T]) GetNext() T

func (*Iterator[T]) HasNext

func (i *Iterator[T]) HasNext() bool

type Job

type Job[T any] func(in, out chan T)

type MutexBucket added in v1.0.3

type MutexBucket[T any] struct {
	// contains filtered or unexported fields
}

func NewMutexBucket added in v1.0.3

func NewMutexBucket[T any](elems [SegmentCount]T) *MutexBucket[T]

func (*MutexBucket[T]) UnderLock added in v1.0.3

func (mb *MutexBucket[T]) UnderLock(identifier int64, f MutexBucketFunc[T]) error

func (*MutexBucket[T]) UnderReadLock added in v1.0.3

func (mb *MutexBucket[T]) UnderReadLock(identifier int64, f MutexBucketFunc[T]) error

type MutexBucketFunc added in v1.0.14

type MutexBucketFunc[T any] func(p T) error

type NodeInfo added in v1.0.17

type NodeInfo[T any, M comparable] struct {
	Key      M
	Value    T
	Priority int64
}

type NodeInfoChannel added in v1.0.17

type NodeInfoChannel[T any, M comparable] chan NodeInfo[T, M]

type Queue added in v1.0.17

type Queue[T any, M comparable] struct {
	// contains filtered or unexported fields
}

func NewQueue added in v1.0.17

func NewQueue[T any, M comparable](opts ...QueueOption[T, M]) *Queue[T, M]

NewQueue creates a new queue with the given options. Retrieve values from the queue with one of the Pop, PopByKey, or PopByChannelWithReqTime methods.

func (*Queue[T, M]) Close added in v1.0.17

func (q *Queue[T, M]) Close()

func (*Queue[T, M]) Pop added in v1.0.17

func (q *Queue[T, M]) Pop() (NodeInfo[T, M], error)

func (*Queue[T, M]) PopByChannelWithReqTime added in v1.0.17

func (q *Queue[T, M]) PopByChannelWithReqTime(reqTime time.Duration) (<-chan NodeInfo[T, M], error)

func (*Queue[T, M]) PopByKey added in v1.0.17

func (q *Queue[T, M]) PopByKey(key M) (NodeInfo[T, M], error)

func (*Queue[T, M]) Push added in v1.0.17

func (q *Queue[T, M]) Push(
	priority int64,
	key M,
	value T,
)

func (*Queue[T, M]) Size added in v1.0.17

func (q *Queue[T, M]) Size() int

type QueueOption added in v1.0.17

type QueueOption[T any, M comparable] func(q *Queue[T, M])

func WithChannel added in v1.0.17

func WithChannel[T any, M comparable]() QueueOption[T, M]

type ReloadFunc added in v1.0.5

type ReloadFunc[K comparable, T any] func(oldCache map[K]T) map[K]T

type Result added in v1.0.13

type Result[T any] struct {
	Val    T
	Err    error
	Shared bool
}

type Semaphore added in v1.0.8

type Semaphore struct {
	// contains filtered or unexported fields
}

func NewSemaphore added in v1.0.8

func NewSemaphore(n int) (*Semaphore, error)

func (*Semaphore) Acquire added in v1.0.8

func (s *Semaphore) Acquire()

func (*Semaphore) AcquireCtx added in v1.0.8

func (s *Semaphore) AcquireCtx(ctx context.Context) error

func (*Semaphore) Release added in v1.0.8

func (s *Semaphore) Release()

type TickerFunc added in v1.0.6

type TickerFunc func()

type Topic added in v1.0.3

type Topic[T any] struct {
	// contains filtered or unexported fields
}

type WorkFunc

type WorkFunc[T any] func(data T)

type WrappedFunc

type WrappedFunc func() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL