Documentation ¶
Index ¶
- Constants
- type BatchExecFn
- type BatchExecutable
- type DataIF
- type Pipeline
- type PipelineOptionFn
- type Process
- type ProcessFn
- type ProcessOptionFn
- func ProcessWithBatchTimeout(timeout time.Duration) ProcessOptionFn
- func ProcessWithConcurrency(concurrency int) ProcessOptionFn
- func ProcessWithMaxBatchSize(size int) ProcessOptionFn
- func ProcessWithMaxRetries(retries int) ProcessOptionFn
- func ProcessWithOnFailureFns(fns ...ProcessFn) ProcessOptionFn
- func ProcessWithOnSuccessFns(fns ...ProcessFn) ProcessOptionFn
- func ProcessWithQueueDepth(depth int) ProcessOptionFn
- func ProcessWithRateLimit(limit rate.Limit) ProcessOptionFn
- func ProcessWithWaitGroup(wg *sync.WaitGroup) ProcessOptionFn
Constants ¶
const ( // default number of concurrent workers processing batch jobs DEFAULT_CONCURRENCY int = 5 // default maximum number of items to queue for the process DEFAULT_QUEUE_DEPTH int = 10000 // default maximum duration to wait to fill a batch before processing what has been batched DEFAULT_BATCH_TIMEOUT time.Duration = time.Second // default maximum number of items to process in a batch DEFAULT_MAX_BATCH_SIZE int = 500 // default maximum number of retries to attempt to before calling a failure callback function DEFAULT_MAX_RETRIES int = 10 // default maximum frequency of batch function calls DEFAULT_RATE_LIMIT rate.Limit = rate.Inf )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BatchExecFn ¶
BatchExecFn is a method signature which defines the expectations of a BatchExecutable Execute function
type BatchExecutable ¶
BatchExecutable is an interface which exposes the Execute method, which is the user-defined batch execution call
type DataIF ¶
type DataIF interface {
GetID() string
}
DataIF is an interface for a data struct, the user-defined, fundamental element of the pipeline.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline is an object used for chaining multiple Processes together sequentially
func NewPipeline ¶
func NewPipeline(name string, processes []*Process, fns ...PipelineOptionFn) (*Pipeline, error)
NewPipeline creates a pointer to a Pipeline
func (*Pipeline) ProcessData ¶
ProcessData puts data on the queue for batch processing. The processing is a synchronous operation, so the method returns as soon as the job is put on the queue, which should be almost instantly assuming the number of jobs in the queue is less than the queue depth.
func (*Pipeline) ProcessDataAsync ¶
ProcessDataAsync puts data on the queue for batch processing and waits for the job to finish before returning. It only makes sense to use this method if there is one data point to process. To optimize performance when using this method, set the maxBatchSize to 1.
func (*Pipeline) ProcessDatum ¶
ProcessDatum puts all data on the queue for batch processing. The process is a synchronous operation, so the method returns as soon as the jobs are put on the queue, which should be almost instantly assuming the number of jobs in the queue is less than the queue depth.
func (*Pipeline) ProcessDatumAsync ¶
ProcessDatumAsync puts all data on the queue for batch processing and waits until all data has been processed.
type PipelineOptionFn ¶
type PipelineOptionFn func(p *Pipeline)
PipelineOptionFn is a method signature used for configuring the configurable fields of Pipeline
func PipelineWithWaitGroup ¶
func PipelineWithWaitGroup(wg *sync.WaitGroup) PipelineOptionFn
type Process ¶
type Process struct {
// contains filtered or unexported fields
}
Process is an object used for managing the execution of batch jobs amongst multiple concurrent workers
func NewProcess ¶
func NewProcess(name string, batchExec BatchExecutable, fns ...ProcessOptionFn) *Process
NewProcess creates a pointer to a Process
func (*Process) ProcessData ¶
ProcessData puts data on the queue for batch processing. The processing is a synchronous operation, so the method returns as soon as the job is put on the queue, which should be almost instantly assuming the number of jobs in the queue is less than the queue depth.
func (*Process) ProcessDataAsync ¶
ProcessDataAsync puts data on the queue for batch processing and waits for the job to finish before returning. It only makes sense to use this method if there is one data point to process. To optimize performance when using this method, set the maxBatchSize to 1.
func (*Process) ProcessDatum ¶
ProcessDatum puts all data on the queue for batch processing. The process is a synchronous operation, so the method returns as soon as the jobs are put on the queue, which should be almost instantly assuming the number of jobs in the queue is less than the queue depth.
func (*Process) ProcessDatumAsync ¶
ProcessDatumAsync puts all data on the queue for batch processing and waits until all data has been processed.
type ProcessFn ¶
ProcessFn is a method signature which defines the expectations of the OnSuccess and OnFailure callback functions
type ProcessOptionFn ¶
type ProcessOptionFn func(p *Process)
ProcessOptionFn is a method signature used for configuring the configurable fields of Process
func ProcessWithBatchTimeout ¶
func ProcessWithBatchTimeout(timeout time.Duration) ProcessOptionFn
ProcessWithBatchTimeout is an option function for configuring the Process's batchTimeout
func ProcessWithConcurrency ¶
func ProcessWithConcurrency(concurrency int) ProcessOptionFn
ProcessWithConcurrency is an option function for configuring the Process's concurrency
func ProcessWithMaxBatchSize ¶
func ProcessWithMaxBatchSize(size int) ProcessOptionFn
ProcessWithMaxBatchSize is an option function for configuring the Process's maxBatchSize
func ProcessWithMaxRetries ¶
func ProcessWithMaxRetries(retries int) ProcessOptionFn
ProcessWithMaxRetries is an option function for configuring the Process's maxRetries
func ProcessWithOnFailureFns ¶
func ProcessWithOnFailureFns(fns ...ProcessFn) ProcessOptionFn
ProcessWithOnFailureFn is an option function for configuring the Process's onFailureFn
func ProcessWithOnSuccessFns ¶
func ProcessWithOnSuccessFns(fns ...ProcessFn) ProcessOptionFn
ProcessWithOnSuccessFn is an option function for configuring the Process's onSuccessFn
func ProcessWithQueueDepth ¶
func ProcessWithQueueDepth(depth int) ProcessOptionFn
ProcessWithQueueDepth is an option function for configuring the Process's queueDepth
func ProcessWithRateLimit ¶
func ProcessWithRateLimit(limit rate.Limit) ProcessOptionFn
ProcessWithRateLimit is an option function for configuring the Process's rateLimit
func ProcessWithWaitGroup ¶
func ProcessWithWaitGroup(wg *sync.WaitGroup) ProcessOptionFn
ProcessWithWaitGroup is an option function for configuring the Process's onFailureFn