patterns

package
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2023 License: Apache-2.0 Imports: 6 Imported by: 2

Documentation

Index

Constants

View Source
const (
	SegmentCount = 256
)

Variables

View Source
var (
	ErrBrokerClosed       = errors.New("broker closed")
	ErrTopicNotFound      = errors.New("topic not found")
	ErrPublishFailed      = errors.New("publish failed")
	ErrTopicAlreadyExists = errors.New("topic already exists")
)
View Source
var (
	ErrCircuitWait = errors.New("err need more time wait before request")
)
View Source
var (
	ErrEmptyParentContext = errors.New("empty parent context")
)
View Source
var (
	ErrToManyCalls = errors.New("too many calls")
)

Functions

func AggregateChannel

func AggregateChannel[T any](ctx context.Context, inputStream <-chan T, f AggregateFunc[T]) <-chan T

func Detach

func Detach(ctx context.Context) (context.Context, error)

func FanIn added in v1.0.1

func FanIn[T any](sources ...<-chan T) <-chan T

func FanOut added in v1.0.1

func FanOut[T any](source <-chan T, n int) []<-chan T

func FilterChannel

func FilterChannel[T any](ctx context.Context, inputStream <-chan T, f FilterFunc[T]) <-chan T

func Pipeline

func Pipeline[T any](jobs ...Job[T])

func Workers

func Workers[T any](
	ctx context.Context,
	f WorkFunc[T],
	n int,
	wg *sync.WaitGroup,
	source <-chan T,
	errCh chan error,
	panicCh chan struct{},
)

func Wrap

func Wrap(ctx context.Context, f WrappedFunc) error

Types

type AggregateFunc

type AggregateFunc[T any] func(data T) T

type Broadcast

type Broadcast[T any] struct {
	// contains filtered or unexported fields
}

func NewBroadcast

func NewBroadcast[T any](ctx context.Context, source <-chan T) *Broadcast[T]

func (*Broadcast[T]) Subscribe

func (b *Broadcast[T]) Subscribe() <-chan T

func (*Broadcast[T]) SubscribersCount added in v1.0.3

func (b *Broadcast[T]) SubscribersCount() int

func (*Broadcast[T]) Unsubscribe added in v1.0.3

func (b *Broadcast[T]) Unsubscribe(channel <-chan T)

type Broker added in v1.0.3

type Broker[T any] struct {
	// contains filtered or unexported fields
}

func NewBroker added in v1.0.3

func NewBroker[T any]() *Broker[T]

func (*Broker[T]) CloseAll added in v1.0.3

func (b *Broker[T]) CloseAll() error

func (*Broker[T]) CloseTopic added in v1.0.3

func (b *Broker[T]) CloseTopic(topic string) error

func (*Broker[T]) CreateTopic added in v1.0.3

func (b *Broker[T]) CreateTopic(ctx context.Context, topic string) error

func (*Broker[T]) Publish added in v1.0.3

func (b *Broker[T]) Publish(ctx context.Context, topic string, message T) error

func (*Broker[T]) Subscribe added in v1.0.3

func (b *Broker[T]) Subscribe(topic string) (<-chan T, error)

func (*Broker[T]) SubscribersCountAll added in v1.0.3

func (b *Broker[T]) SubscribersCountAll() (int, error)

func (*Broker[T]) SubscribersCountByTopic added in v1.0.3

func (b *Broker[T]) SubscribersCountByTopic(topic string) (int, error)

func (*Broker[T]) Unsubscribe added in v1.0.3

func (b *Broker[T]) Unsubscribe(topic string, ch <-chan T) error

type CacheMap added in v1.0.5

type CacheMap[K comparable, T any] struct {
	// contains filtered or unexported fields
}

func NewCacheMap added in v1.0.5

func NewCacheMap[K comparable, T any](f ReloadFunc[K, T], clearTick time.Duration) *CacheMap[K, T]

func (*CacheMap[K, T]) ClearCache added in v1.0.5

func (cm *CacheMap[K, T]) ClearCache()

func (*CacheMap[K, T]) Get added in v1.0.5

func (cm *CacheMap[K, T]) Get(key K) (T, bool)

func (*CacheMap[K, T]) RemoveCache added in v1.0.5

func (cm *CacheMap[K, T]) RemoveCache()

func (*CacheMap[K, T]) Set added in v1.0.5

func (cm *CacheMap[K, T]) Set(key K, value T)

type Circuit

type Circuit[T any] func(ctx context.Context) (T, error)

func Breaker

func Breaker[T any](circuit Circuit[T], waitDuration time.Duration, failureThreshold uint) Circuit[T]

func Debounce

func Debounce[T any](circuit Circuit[T], d time.Duration) Circuit[T]

type Effector

type Effector[T any] func(ctx context.Context) (T, error)

func Retry

func Retry[T any](effector Effector[T], retries int, delay time.Duration) Effector[T]

func Throttle

func Throttle[T any](e Effector[T], max, refill uint, d time.Duration) Effector[T]

type FilterFunc

type FilterFunc[T any] func(data T) bool

type Iterator

type Iterator[T any] struct {
	// contains filtered or unexported fields
}

func NewIterator

func NewIterator[T any](data []T) *Iterator[T]

func (*Iterator[T]) GetNext

func (i *Iterator[T]) GetNext() T

func (*Iterator[T]) HasNext

func (i *Iterator[T]) HasNext() bool

type Job

type Job[T any] func(in, out chan T)

type MutexBucket added in v1.0.3

type MutexBucket[T any] struct {
	// contains filtered or unexported fields
}

func NewMutexBucket added in v1.0.3

func NewMutexBucket[T any](elems [SegmentCount]*T) *MutexBucket[T]

func (*MutexBucket[T]) UnderLock added in v1.0.3

func (mb *MutexBucket[T]) UnderLock(identifier int64, f func(p *T) error) error

func (*MutexBucket[T]) UnderReadLock added in v1.0.3

func (mb *MutexBucket[T]) UnderReadLock(identifier int64, f func(p *T) error) error

type ReloadFunc added in v1.0.5

type ReloadFunc[K comparable, T any] func(oldCache map[K]T) map[K]T

type Topic added in v1.0.3

type Topic[T any] struct {
	// contains filtered or unexported fields
}

type WorkFunc

type WorkFunc[T any] func(data T) error

type WorkersCurator

type WorkersCurator[T any] struct {
	// contains filtered or unexported fields
}

func NewWorkersCurator

func NewWorkersCurator[T any](
	ctx context.Context,
	f WorkFunc[T],
	source <-chan T,
	workerCount int,
) *WorkersCurator[T]

func (*WorkersCurator[T]) GetErrCh

func (c *WorkersCurator[T]) GetErrCh() chan error

func (*WorkersCurator[T]) Wait

func (c *WorkersCurator[T]) Wait()

type WrappedFunc

type WrappedFunc func() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL