async

package
v0.0.0-...-eb02b72 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2019 License: MIT Imports: 10 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var CLOSED = errors.New("the semaphore has been closed")
View Source
var QFULL = errors.New("the channel is full, nothing can be inserted until consumers catch up")
View Source
var UNIQUEUE_CLOSED = errors.New("this uniqueue is closed")

Functions

func TickerLog

func TickerLog(d time.Duration, ctx context.Context, logfn func())

Types

type DedupWorkerPool

type DedupWorkerPool struct {
	// contains filtered or unexported fields
}

This worker pool doesn't block on submit or repeat operations over the time period specified by the passed uniqueue

func NewDedupWorkerPool

func NewDedupWorkerPool(workers int, uniq *UniQueue, se SideEffectFn, parentCtx context.Context, qcanc context.CancelFunc) *DedupWorkerPool

func (*DedupWorkerPool) Close

func (w *DedupWorkerPool) Close()

func (*DedupWorkerPool) Submit

func (w *DedupWorkerPool) Submit(work string) error

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

func NewRateLimiter

func NewRateLimiter(maxparallel, maxpersecond int, ctx context.Context) *RateLimiter

func (*RateLimiter) Acquire

func (r *RateLimiter) Acquire()

func (*RateLimiter) Release

func (r *RateLimiter) Release()

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

func NewSemaphore

func NewSemaphore(max int, parentCtx context.Context) *Semaphore

func (*Semaphore) Acquire

func (s *Semaphore) Acquire() error

func (*Semaphore) Release

func (s *Semaphore) Release()

type SideEffectFn

type SideEffectFn func(string, context.Context)

A work function that has sideeffects rather than return values.

type SideEffectWorker

type SideEffectWorker struct {
	// contains filtered or unexported fields
}

func NewSideEffectWorker

func NewSideEffectWorker(ctx context.Context, parentwg *sync.WaitGroup, fn SideEffectFn) *SideEffectWorker

func (*SideEffectWorker) SetWorkChan

func (w *SideEffectWorker) SetWorkChan(ch <-chan string)

type Stream

type Stream interface {
	Insert(string) error
	GetChan() <-chan string
}

A Stream is a struct with a non-blocking Insert and a GetChan that returns a read-only channel When the insert would have blocked, or the stream is closed an error is returned. Streams should likely be closed by context. String was chosen because it can be a single arg, a list of delimited args, or json

type UniQueue

type UniQueue struct {
	// contains filtered or unexported fields
}

func NewUniQueue

func NewUniQueue(name string, maxdedup, maxinflight int, deduptime time.Duration, parentCtx context.Context) *UniQueue

func (*UniQueue) Check

func (c *UniQueue) Check(s string) bool

Checks if the value exists in the cache most callers should call insert and read the error

func (*UniQueue) GetChan

func (c *UniQueue) GetChan() <-chan string

GetChan returns the output channel, used by n consumers to consume deduped records.

func (*UniQueue) Insert

func (c *UniQueue) Insert(s string) error

Insert attempts to push another string to the channel. checks for existence, chan and cache sizes before writing to the internal inflight channel.

func (*UniQueue) UnCache

func (c *UniQueue) UnCache(s string)

Uncaches a specific dedup cache entry, which unblocks it from flowing through the channel

type WorkerPool

type WorkerPool interface {
	Submit(string) error
	Close()
}

A Pool is a struct that performs a preset operation given the submitted string Submit is non-blocking. It returns an error if it would have blocked, if the pool has been closed or if a submission error occurs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL