Documentation ¶
Index ¶
- Constants
- Variables
- func RunToError(fns ...func() error) error
- type AutoflushBuffer
- func (ab *AutoflushBuffer) Add(obj interface{})
- func (ab *AutoflushBuffer) AddMany(objs ...interface{})
- func (ab *AutoflushBuffer) Flush()
- func (ab *AutoflushBuffer) FlushAsync()
- func (ab *AutoflushBuffer) Interval() time.Duration
- func (ab *AutoflushBuffer) MaxLen() int
- func (ab *AutoflushBuffer) NotifyStarted() <-chan struct{}
- func (ab *AutoflushBuffer) NotifyStopped() <-chan struct{}
- func (ab *AutoflushBuffer) ShouldFlushOnAbort() bool
- func (ab *AutoflushBuffer) Start() error
- func (ab *AutoflushBuffer) Stop() error
- func (ab *AutoflushBuffer) WithFlushHandler(handler func(objs []interface{})) *AutoflushBuffer
- func (ab *AutoflushBuffer) WithFlushOnAbort(should bool) *AutoflushBuffer
- type Interval
- func (i *Interval) Action() func() error
- func (i *Interval) Delay() time.Duration
- func (i *Interval) Errors() chan error
- func (i Interval) Interval() time.Duration
- func (i *Interval) IsRunning() bool
- func (i *Interval) Latch() *Latch
- func (i *Interval) NotifyStarted() <-chan struct{}
- func (i *Interval) NotifyStopped() <-chan struct{}
- func (i *Interval) Start() error
- func (i *Interval) Stop() error
- func (i *Interval) WithAction(action func() error) *Interval
- func (i *Interval) WithDelay(d time.Duration) *Interval
- func (i *Interval) WithErrors(errors chan error) *Interval
- func (i *Interval) WithInterval(d time.Duration) *Interval
- type Latch
- func (l *Latch) CanStart() bool
- func (l *Latch) CanStop() bool
- func (l *Latch) IsRunning() bool
- func (l *Latch) IsStarting() bool
- func (l *Latch) IsStopped() (isStopped bool)
- func (l *Latch) IsStopping() bool
- func (l *Latch) NotifyStarted() (notifyStarted <-chan struct{})
- func (l *Latch) NotifyStarting() (notifyStarting <-chan struct{})
- func (l *Latch) NotifyStopped() (notifyStopped <-chan struct{})
- func (l *Latch) NotifyStopping() (notifyStopping <-chan struct{})
- func (l *Latch) Reset()
- func (l *Latch) Started()
- func (l *Latch) Starting()
- func (l *Latch) Stopped()
- func (l *Latch) Stopping()
- type QueueWorker
- func (qw *QueueWorker) Enqueue(obj interface{})
- func (qw *QueueWorker) ErrorCollector() chan error
- func (qw *QueueWorker) Latch() *Latch
- func (qw *QueueWorker) MaxWork() int
- func (qw *QueueWorker) Start()
- func (qw *QueueWorker) Stop()
- func (qw *QueueWorker) WithErrorCollector(errors chan error) *QueueWorker
- func (qw *QueueWorker) WithMaxWork(maxWork int) *QueueWorker
Constants ¶
const (
// LatchStopped is a latch lifecycle state.
LatchStopped int32 = 0
// LatchStarting is a latch lifecycle state.
LatchStarting int32 = 1
// LatchRunning is a latch lifecycle state.
LatchRunning int32 = 2
// LatchStopping is a latch lifecycle state.
LatchStopping int32 = 3
)
const (
// DefaultQueueWorkerMaxWork is the maximum number of work items before queueing blocks.
DefaultQueueWorkerMaxWork = 1 << 10
)
Variables ¶
var (
ErrCannotStart exception.Class = "cannot start; already started"
ErrCannotStop exception.Class = "cannot stop; already stopped"
)
Errors
Functions ¶
func RunToError ¶
RunToError runs a given set of functions until one of them panics or errors. It is useful when you need to start multiple servers and exit if any of them crashes.
Types ¶
type AutoflushBuffer ¶
type AutoflushBuffer struct {
// contains filtered or unexported fields
}
AutoflushBuffer is a backing store that operates either on a fixed length flush or a fixed interval flush. A handler should be provided but without one the buffer will just clear. Adds that would cause fixed length flushes do not block on the flush handler.
func NewAutoflushBuffer ¶
func NewAutoflushBuffer(maxLen int, interval time.Duration) *AutoflushBuffer
NewAutoflushBuffer creates a new autoflush buffer.
func (*AutoflushBuffer) Add ¶
func (ab *AutoflushBuffer) Add(obj interface{})
Add adds a new object to the buffer, blocking if it triggers a flush. If the buffer is full, it will call the flush handler on a separate goroutine.
func (*AutoflushBuffer) AddMany ¶
func (ab *AutoflushBuffer) AddMany(objs ...interface{})
AddMany adds many objects to the buffer at once.
func (*AutoflushBuffer) Flush ¶
func (ab *AutoflushBuffer) Flush()
Flush clears the buffer; if a handler is provided, it is passed the contents of the buffer. This call is synchronous, in that it calls the flush handler on the same goroutine.
func (*AutoflushBuffer) FlushAsync ¶
func (ab *AutoflushBuffer) FlushAsync()
FlushAsync clears the buffer; if a handler is provided, it is passed the contents of the buffer. This call is asynchronous, in that it calls the flush handler on its own goroutine.
func (*AutoflushBuffer) Interval ¶
func (ab *AutoflushBuffer) Interval() time.Duration
Interval returns the flush interval.
func (*AutoflushBuffer) MaxLen ¶
func (ab *AutoflushBuffer) MaxLen() int
MaxLen returns the maximum buffer length before a flush is triggered.
func (*AutoflushBuffer) NotifyStarted ¶ added in v0.3.1
func (ab *AutoflushBuffer) NotifyStarted() <-chan struct{}
NotifyStarted returns the started signal.
func (*AutoflushBuffer) NotifyStopped ¶ added in v0.3.1
func (ab *AutoflushBuffer) NotifyStopped() <-chan struct{}
NotifyStopped returns the stopped signal.
func (*AutoflushBuffer) ShouldFlushOnAbort ¶
func (ab *AutoflushBuffer) ShouldFlushOnAbort() bool
ShouldFlushOnAbort returns if the buffer will do one final flush on abort.
func (*AutoflushBuffer) Start ¶
func (ab *AutoflushBuffer) Start() error
Start starts the buffer flusher.
func (*AutoflushBuffer) Stop ¶
func (ab *AutoflushBuffer) Stop() error
Stop stops the buffer flusher.
func (*AutoflushBuffer) WithFlushHandler ¶
func (ab *AutoflushBuffer) WithFlushHandler(handler func(objs []interface{})) *AutoflushBuffer
WithFlushHandler sets the buffer flush handler and returns a reference to the buffer.
func (*AutoflushBuffer) WithFlushOnAbort ¶
func (ab *AutoflushBuffer) WithFlushOnAbort(should bool) *AutoflushBuffer
WithFlushOnAbort sets if we should flush on aborts or not. This defaults to true.
type Interval ¶
type Interval struct {
// contains filtered or unexported fields
}
Interval is a managed goroutine that runs an action on an interval.
func NewInterval ¶
NewInterval returns a new worker that runs an action on an interval.
func (*Interval) NotifyStarted ¶ added in v0.3.1
func (i *Interval) NotifyStarted() <-chan struct{}
NotifyStarted returns the notify started signal.
func (*Interval) NotifyStopped ¶ added in v0.3.1
func (i *Interval) NotifyStopped() <-chan struct{}
NotifyStopped returns the notify stopped signal.
func (*Interval) WithAction ¶
WithAction sets the interval action.
func (*Interval) WithErrors ¶
WithErrors sets the error channel and returns a reference to the interval.
type Latch ¶
Latch is a helper to coordinate goroutine lifecycles. The lifecycle is generally as follows:
0 - stopped / idle
1 - starting
2 - running
3 - stopping
goto 0
Each state includes a transition notification, e.g. `Starting()` triggers `NotifyStarting`.
func (*Latch) IsStarting ¶
IsStarting indicates the latch is waiting to be scheduled.
func (*Latch) IsStopping ¶
IsStopping returns if the latch is waiting to finish stopping.
func (*Latch) NotifyStarted ¶
func (l *Latch) NotifyStarted() (notifyStarted <-chan struct{})
NotifyStarted returns the started signal. It is used to coordinate the transition from starting -> started.
func (*Latch) NotifyStarting ¶
func (l *Latch) NotifyStarting() (notifyStarting <-chan struct{})
NotifyStarting returns the starting signal. It is used to coordinate the transition from stopped -> starting.
func (*Latch) NotifyStopped ¶
func (l *Latch) NotifyStopped() (notifyStopped <-chan struct{})
NotifyStopped returns the stopped signal. It is used to coordinate the transition from stopping -> stopped.
func (*Latch) NotifyStopping ¶
func (l *Latch) NotifyStopping() (notifyStopping <-chan struct{})
NotifyStopping returns the should stop signal. It is used to trigger the transition from running -> stopping -> stopped.
func (*Latch) Started ¶
func (l *Latch) Started()
Started signals that the latch is started and has entered the `IsRunning` state.
type QueueWorker ¶
type QueueWorker struct {
// contains filtered or unexported fields
}
QueueWorker is a worker that is pushed work over a channel.
func NewQueue ¶
func NewQueue(action func(interface{}) error) *QueueWorker
NewQueue returns a new queue worker.
func (*QueueWorker) Enqueue ¶
func (qw *QueueWorker) Enqueue(obj interface{})
Enqueue adds an item to the work queue.
func (*QueueWorker) ErrorCollector ¶
func (qw *QueueWorker) ErrorCollector() chan error
ErrorCollector returns a channel to read action errors from.
func (*QueueWorker) MaxWork ¶
func (qw *QueueWorker) MaxWork() int
MaxWork returns the maximum work.
func (*QueueWorker) WithErrorCollector ¶
func (qw *QueueWorker) WithErrorCollector(errors chan error) *QueueWorker
WithErrorCollector sets the error collector channel and returns a reference to the queue worker.
func (*QueueWorker) WithMaxWork ¶
func (qw *QueueWorker) WithMaxWork(maxWork int) *QueueWorker
WithMaxWork sets the worker max work.