Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var ErrWrongEpoch = errors.New("ring buffer: wrong epoch")
ErrWrongEpoch is returned from Push when the supplied epoch does not match the buffer's current epoch.
Functions ¶
func GoWithCancel ¶
GoWithCancel runs a context-aware error-returning function on the provided |*errgroup.Group|. It passes the function a child context of the provided |ctx| and returns a |func()| to cancel that child context. If the provided function returns a |context.Canceled| error, then the |*errgroup.Group| will see a return of |ctx.Err()|, instead of the child context's |Err()|.
Provided the function returns, or appropriately wraps, any |context.Canceled| error that it sees while processing, GoWithCancel allows for dispatching cancelable work on an |*errgroup.Group| and canceling that work without the |*errgroup.Group| itself seeing an |err| and canceling.
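The error translation described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: |cancelable|, |blockUntilDone|, |cancelChildOnly| and |cancelParent| are hypothetical names, and the returned |run| function stands in for what would be handed to |(*errgroup.Group).Go|.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// cancelable (hypothetical) wraps f so that it runs with a child of ctx.
// It returns the wrapped function and a func() that cancels only the child.
func cancelable(ctx context.Context, f func(context.Context) error) (func() error, func()) {
	child, cancel := context.WithCancel(ctx)
	run := func() error {
		defer cancel() // release the child context once f is done
		err := f(child)
		if errors.Is(err, context.Canceled) {
			// Report the parent's state instead of the child's: this is nil
			// unless the parent context itself has ended.
			return ctx.Err()
		}
		return err
	}
	return run, cancel
}

// blockUntilDone is a sample workload that waits for cancellation and reports it.
func blockUntilDone(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// cancelChildOnly cancels just the child context and returns what run reports.
func cancelChildOnly() error {
	run, cancel := cancelable(context.Background(), blockUntilDone)
	cancel()
	return run()
}

// cancelParent cancels the parent context and returns what run reports.
func cancelParent() error {
	parent, stop := context.WithCancel(context.Background())
	run, cancel := cancelable(parent, blockUntilDone)
	defer cancel()
	stop()
	return run()
}

func main() {
	fmt.Println(cancelChildOnly()) // child canceled while the parent is still live
	fmt.Println(cancelParent())    // parent canceled
}
```

In the first case the caller sees no error, so a group running |run| would keep going; in the second the parent's own cancellation is passed through.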
Types ¶
type ActionExecutor ¶
type ActionExecutor struct {
// contains filtered or unexported fields
}
ActionExecutor is designed for asynchronous workloads that should run when a new task is available. The closest analog is a long-running goroutine that receives from a channel, but ActionExecutor differs in three major ways. First, there is no need to close the queue, because goroutines exit automatically when the queue is empty. Second, a concurrency parameter may be set, which spins up goroutines as needed until the maximum number is reached. Third, the buffer size does not have to be declared beforehand as with channels, which lets the queue respond to demand. A max buffer may still be declared for RAM-limited situations; appends then block until the buffer is below the given max.
func NewActionExecutor ¶
func NewActionExecutor(ctx context.Context, action Action, concurrency uint32, maxBuffer uint64) *ActionExecutor
NewActionExecutor returns an ActionExecutor that will run the given action on each appended value, and run up to the max number of goroutines as defined by concurrency. If concurrency is 0, then it is set to 1. If maxBuffer is 0, then it is unlimited. Panics on a nil action.
func (*ActionExecutor) Execute ¶
func (aq *ActionExecutor) Execute(val interface{})
Execute adds the value to the end of the queue to be executed. If any action encountered an error before this call, then the value is not added and this returns immediately.
func (*ActionExecutor) WaitForEmpty ¶
func (aq *ActionExecutor) WaitForEmpty() error
WaitForEmpty waits until all the work that has been submitted before the call to |WaitForEmpty| has completed. It returns any errors that any actions may have encountered.
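The on-demand worker behavior described for ActionExecutor can be sketched roughly as follows. This is a simplified stand-in under stated assumptions, not the package's code: |miniExecutor| and its methods are hypothetical names, and the sketch leaves out the context and the max buffer entirely.

```go
package main

import (
	"fmt"
	"sync"
)

// action is a local stand-in for the package's Action type (signature assumed).
type action func(val interface{}) error

// miniExecutor (hypothetical) starts workers on demand, up to max, and lets
// them exit as soon as the queue is empty.
type miniExecutor struct {
	mu      sync.Mutex
	idle    *sync.Cond // broadcast when the last running worker exits
	queue   []interface{}
	workers int
	max     int
	act     action
	err     error
}

func newMiniExecutor(act action, concurrency int) *miniExecutor {
	if concurrency < 1 {
		concurrency = 1
	}
	e := &miniExecutor{act: act, max: concurrency}
	e.idle = sync.NewCond(&e.mu)
	return e
}

// execute queues val unless an earlier action has already failed.
func (e *miniExecutor) execute(val interface{}) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.err != nil {
		return
	}
	e.queue = append(e.queue, val)
	if e.workers < e.max {
		e.workers++
		go e.work()
	}
}

// work drains the queue and returns when it is empty or an action has failed.
func (e *miniExecutor) work() {
	e.mu.Lock()
	for len(e.queue) > 0 && e.err == nil {
		val := e.queue[0]
		e.queue = e.queue[1:]
		e.mu.Unlock()
		err := e.act(val) // run the action without holding the lock
		e.mu.Lock()
		if err != nil && e.err == nil {
			e.err = err
		}
	}
	e.workers--
	if e.workers == 0 {
		e.idle.Broadcast()
	}
	e.mu.Unlock()
}

// waitForEmpty blocks until no worker is running and returns the first error.
func (e *miniExecutor) waitForEmpty() error {
	e.mu.Lock()
	defer e.mu.Unlock()
	for e.workers > 0 {
		e.idle.Wait()
	}
	return e.err
}

func main() {
	squares := make(chan int, 5)
	e := newMiniExecutor(func(val interface{}) error {
		n := val.(int)
		squares <- n * n
		return nil
	}, 2)
	for i := 1; i <= 5; i++ {
		e.execute(i)
	}
	err := e.waitForEmpty()
	close(squares)
	sum := 0
	for s := range squares {
		sum += s
	}
	fmt.Println(sum, err)
}
```

Because workers re-check the queue under the lock before exiting, a value appended while every worker is busy is still picked up without any explicit close step.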
type AsyncReader ¶
type AsyncReader struct {
// contains filtered or unexported fields
}
AsyncReader is a TableReadCloser implementation that spins up a goroutine to keep reading data into a buffer so that it is ready when the caller wants it.
func NewAsyncReader ¶
func NewAsyncReader(rf ReadFunc, bufferSize int) *AsyncReader
NewAsyncReader creates a new AsyncReader.
func (*AsyncReader) Close ¶
func (asRd *AsyncReader) Close() error
Close releases resources being held.
func (*AsyncReader) Read ¶
func (asRd *AsyncReader) Read() (interface{}, error)
Read reads an object.
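The read-ahead idea behind AsyncReader can be illustrated with a goroutine that fills a buffered channel. The sketch below is only an approximation of that idea: |prefetcher|, |readFunc|, |result| and |countTo| are hypothetical names, and the real type may buffer and shut down differently.

```go
package main

import (
	"fmt"
	"io"
)

// readFunc is a local stand-in for ReadFunc (signature assumed).
type readFunc func() (interface{}, error)

// result pairs an item with the error returned alongside it.
type result struct {
	item interface{}
	err  error
}

// prefetcher (hypothetical) reads ahead on a background goroutine.
type prefetcher struct {
	results chan result
	stop    chan struct{}
}

func newPrefetcher(rf readFunc, bufferSize int) *prefetcher {
	p := &prefetcher{
		results: make(chan result, bufferSize),
		stop:    make(chan struct{}),
	}
	go func() {
		defer close(p.results)
		for {
			item, err := rf()
			select {
			case p.results <- result{item, err}:
			case <-p.stop:
				return
			}
			if err != nil {
				return // io.EOF or a real error ends the read-ahead loop
			}
		}
	}()
	return p
}

// read returns the next buffered item, or io.EOF once the stream has ended.
func (p *prefetcher) read() (interface{}, error) {
	r, ok := <-p.results
	if !ok {
		return nil, io.EOF
	}
	return r.item, r.err
}

// shutdown asks the background goroutine to stop; call it at most once.
func (p *prefetcher) shutdown() {
	close(p.stop)
}

// countTo returns a readFunc that yields 1..n and then (nil, io.EOF).
func countTo(n int) readFunc {
	i := 0
	return func() (interface{}, error) {
		if i >= n {
			return nil, io.EOF
		}
		i++
		return i, nil
	}
}

func main() {
	p := newPrefetcher(countTo(3), 2)
	defer p.shutdown()
	for {
		item, err := p.read()
		if err != nil {
			break
		}
		fmt.Println(item)
	}
}
```

The channel capacity plays the role of the buffer size: the goroutine can run at most that many items ahead of the caller before it blocks.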
type ReadFunc ¶
ReadFunc is a function that is called repeatedly in order to retrieve a stream of objects. When all objects have been read from the stream, (nil, io.EOF) should be returned.
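As an illustration of that contract, here is a small function that serves a slice and then reports the end of the stream. The signature of ReadFunc is not shown in this listing, so |readFunc| below assumes |func() (interface{}, error)| to match |AsyncReader.Read|; |sliceReadFunc| and |drain| are hypothetical helpers.

```go
package main

import (
	"fmt"
	"io"
)

// readFunc is a local stand-in; the real ReadFunc signature is assumed.
type readFunc func() (interface{}, error)

// sliceReadFunc returns a readFunc that yields each element of items in
// order and then (nil, io.EOF) on every later call.
func sliceReadFunc(items []interface{}) readFunc {
	next := 0
	return func() (interface{}, error) {
		if next >= len(items) {
			return nil, io.EOF
		}
		item := items[next]
		next++
		return item, nil
	}
}

// drain calls rf until it reports io.EOF and collects everything it produced.
func drain(rf readFunc) ([]interface{}, error) {
	var out []interface{}
	for {
		item, err := rf()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, item)
	}
}

func main() {
	items, err := drain(sliceReadFunc([]interface{}{"a", "b", "c"}))
	fmt.Println(items, err)
}
```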
type RingBuffer ¶
type RingBuffer struct {
// contains filtered or unexported fields
}
RingBuffer is a dynamically sized ring buffer that is thread-safe.
func NewRingBuffer ¶
func NewRingBuffer(allocSize int) *RingBuffer
NewRingBuffer creates a new RingBuffer instance.
func (*RingBuffer) Close ¶
func (rb *RingBuffer) Close() error
Close closes a RingBuffer so that no new items can be pushed onto it. Items that are already in the buffer can still be read via Pop and TryPop. Close will broadcast to all goroutines waiting inside Pop.
func (*RingBuffer) Pop ¶
func (rb *RingBuffer) Pop() (item interface{}, err error)
Pop will return the next item in the RingBuffer. If there are no items available, Pop will wait until a new item is pushed, or the RingBuffer is closed.
func (*RingBuffer) Push ¶
func (rb *RingBuffer) Push(item interface{}, epoch int) error
Push will add a new item to the RingBuffer and signal a goroutine waiting inside Pop that new data is available. If the supplied epoch does not match the buffer's current epoch, Push returns ErrWrongEpoch.
func (*RingBuffer) Reset ¶
func (rb *RingBuffer) Reset() int
Reset clears a ring buffer so that it can be reused.
func (*RingBuffer) TryPop ¶
func (rb *RingBuffer) TryPop() (item interface{}, ok bool)
TryPop will return the next item in the RingBuffer. If there are no items available, TryPop will return immediately with `ok` set to false.
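The interplay of epochs, blocking pops and closing can be sketched with a mutex and a condition variable. Everything here is an assumption made for illustration: |epochBuffer| and its methods are hypothetical, the storage is a plain slice rather than a ring, |reset| is assumed to return the new epoch, and the errors returned after closing are invented for the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errWrongEpoch = errors.New("sketch: wrong epoch")
	errClosed     = errors.New("sketch: buffer closed")
)

// epochBuffer (hypothetical) is a closable FIFO that rejects stale pushes.
type epochBuffer struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []interface{}
	epoch  int
	closed bool
}

func newEpochBuffer() *epochBuffer {
	b := &epochBuffer{}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// push appends item if epoch is current and wakes one waiting pop.
func (b *epochBuffer) push(item interface{}, epoch int) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return errClosed
	}
	if epoch != b.epoch {
		return errWrongEpoch
	}
	b.items = append(b.items, item)
	b.cond.Signal()
	return nil
}

// pop blocks until an item is available or the buffer is closed and empty.
func (b *epochBuffer) pop() (interface{}, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for len(b.items) == 0 && !b.closed {
		b.cond.Wait()
	}
	if len(b.items) == 0 {
		return nil, errClosed
	}
	item := b.items[0]
	b.items = b.items[1:]
	return item, nil
}

// tryPop never blocks; ok is false when nothing is buffered.
func (b *epochBuffer) tryPop() (interface{}, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.items) == 0 {
		return nil, false
	}
	item := b.items[0]
	b.items = b.items[1:]
	return item, true
}

// reset drops buffered items and starts a new epoch, which it returns.
func (b *epochBuffer) reset() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.items = nil
	b.epoch++
	return b.epoch
}

// shutdown rejects further pushes and wakes every goroutine blocked in pop.
func (b *epochBuffer) shutdown() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.closed = true
	b.cond.Broadcast()
}

func main() {
	b := newEpochBuffer()
	epoch := b.reset()
	fmt.Println(b.push("fresh", epoch))   // accepted: epoch is current
	fmt.Println(b.push("stale", epoch-1)) // rejected: epoch predates the reset
	item, ok := b.tryPop()
	fmt.Println(item, ok)
}
```

The epoch check is what lets a producer that started before a reset discover that its data no longer belongs in the buffer, instead of silently mixing old and new items.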