piper

package module
v0.0.0-...-8b8c573 Latest Latest
Published: Aug 4, 2020 License: MIT Imports: 5 Imported by: 0

README

piper

Piper is a customizable data pipeline processing library written in Go.

Piper abstracts the components of a data pipeline into chained processes, which run concurrent batch jobs to execute user-supplied callback functions. Each successful job within a batch moves through the pipeline to the next process until completion. Each failed job within a batch is retried up to the maximum number of retry attempts. Failed jobs that have exceeded the maximum number of retry attempts are passed to a user-supplied callback function, which could, for instance, send them to a dead-letter queue.
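
The retry behaviour described above can be illustrated with a small standalone sketch. It is not piper's implementation: the job type, runWithRetries and the callback shapes are hypothetical names chosen for this example, and it runs a single job sequentially rather than in concurrent batches.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a sample error used by the demo callbacks below.
var errTransient = errors.New("transient failure")

// job tracks one unit of work and how many times it has been attempted.
// (Hypothetical type for this sketch; not part of piper's API.)
type job struct {
	id       string
	attempts int
}

// runWithRetries calls exec until it succeeds or maxRetries retries have
// been used up. A job that still fails is handed to onFailure.
// It returns true when the job eventually succeeded.
func runWithRetries(j *job, maxRetries int, exec func(*job) error, onFailure func(*job, error)) bool {
	for {
		j.attempts++
		err := exec(j)
		if err == nil {
			return true
		}
		// attempts-1 is the number of retries used so far.
		if j.attempts-1 >= maxRetries {
			onFailure(j, err)
			return false
		}
	}
}

func main() {
	var deadLetters []string
	toDeadLetter := func(j *job, err error) { deadLetters = append(deadLetters, j.id) }

	// flaky fails on its first two attempts and succeeds on the third.
	flaky := func(j *job) error {
		if j.attempts < 3 {
			return errTransient
		}
		return nil
	}
	// broken never succeeds.
	broken := func(j *job) error { return errTransient }

	fmt.Println(runWithRetries(&job{id: "a"}, 2, flaky, toDeadLetter), deadLetters)
	fmt.Println(runWithRetries(&job{id: "b"}, 2, broken, toDeadLetter), deadLetters)
}
```

With a retry limit of 2, job "a" succeeds on its third attempt, while job "b" is passed to the failure callback after its third attempt.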

Getting Started

Installation

Install the piper library with the following command:

go get github.com/jasonyunicorn/piper

Then import the piper package in your application as such:

import "github.com/jasonyunicorn/piper"

Usage

To create a batch process, do the following:

  1. define a data structure (containing the fields pertinent to the data pipeline) which implements the piper.DataIF interface
  2. define a batch function which implements the piper.BatchExecutable interface
  3. use the piper.NewProcess API to create a new batch process
  4. initialize the process using the process.Start function
  5. enqueue data for processing by using the process.ProcessData function
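
Steps 1 and 2 might look like the sketch below. To keep it runnable without the dependency, DataIF and BatchExecutable are redeclared locally with the same method sets as documented for piper; order, validator and validateAll are hypothetical names. The sketch also assumes that the map returned by Execute is keyed by GetID() and holds one entry per failed item, which this document does not state.

```go
package main

import (
	"context"
	"fmt"
)

// DataIF and BatchExecutable copy piper's documented interfaces so that this
// sketch compiles on its own. In a real application, use piper.DataIF and
// piper.BatchExecutable instead.
type DataIF interface {
	GetID() string
}

type BatchExecutable interface {
	Execute(context.Context, []DataIF) (map[string]error, error)
}

// order is a hypothetical record type (step 1).
type order struct {
	ID    string
	Total int
}

func (o order) GetID() string { return o.ID }

// validator is a hypothetical batch function (step 2). It rejects orders
// with a non-positive total. Assumption: the returned map is keyed by
// GetID() and contains only the items that failed.
type validator struct{}

func (validator) Execute(ctx context.Context, batch []DataIF) (map[string]error, error) {
	if err := ctx.Err(); err != nil {
		return nil, err // context cancelled: report the whole batch as failed
	}
	failures := make(map[string]error)
	for _, d := range batch {
		o, ok := d.(order)
		if !ok {
			failures[d.GetID()] = fmt.Errorf("unexpected type %T", d)
			continue
		}
		if o.Total <= 0 {
			failures[o.ID] = fmt.Errorf("order %s has non-positive total %d", o.ID, o.Total)
		}
	}
	return failures, nil
}

// Compile-time check that validator satisfies the interface.
var _ BatchExecutable = validator{}

// validateAll runs one batch directly, without a piper process.
func validateAll(batch []DataIF) (map[string]error, error) {
	return validator{}.Execute(context.Background(), batch)
}

func main() {
	failures, err := validateAll([]DataIF{order{ID: "a", Total: 10}, order{ID: "b", Total: 0}})
	fmt.Println(len(failures), err)

	// Steps 3-5, following the signatures documented below:
	//   p := piper.NewProcess("validate", validator{})
	//   p.Start(ctx)
	//   p.ProcessData(order{ID: "c", Total: 5})
}
```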

To create a pipeline (two or more processes chained together), do the following:

  1. follow steps 1-3 above to define a new process for each process in the pipeline
  2. use the piper.NewPipeline API to create a new pipeline
  3. initialize the pipeline using the pipeline.Start function
  4. enqueue data for processing by using the pipeline.ProcessData function
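
The idea of chaining can be shown with a deliberately simplified, sequential sketch. It is not how piper works internally (piper runs concurrent batch jobs with retries); stage and runPipeline are hypothetical names, and plain strings stand in for pipeline data.

```go
package main

import (
	"fmt"
	"strings"
)

// stage is a hypothetical stand-in for one process in a pipeline: it
// transforms an item or reports a failure.
type stage func(string) (string, error)

// runPipeline passes each item through the stages in order. An item that
// fails a stage does not reach the later stages.
func runPipeline(items []string, stages ...stage) (done []string, failed map[string]error) {
	failed = make(map[string]error)
	for _, item := range items {
		cur, ok := item, true
		for _, s := range stages {
			next, err := s(cur)
			if err != nil {
				failed[item] = err
				ok = false
				break
			}
			cur = next
		}
		if ok {
			done = append(done, cur)
		}
	}
	return done, failed
}

func trim(s string) (string, error) { return strings.TrimSpace(s), nil }

func nonEmpty(s string) (string, error) {
	if s == "" {
		return "", fmt.Errorf("empty item")
	}
	return s, nil
}

func upper(s string) (string, error) { return strings.ToUpper(s), nil }

func main() {
	done, failed := runPipeline([]string{" a ", "   ", "b"}, trim, nonEmpty, upper)
	fmt.Println(done, len(failed))
}
```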

Tests

There are no dependencies required to run any of the unit tests.

To run the tests from the command line:

go test -v -cover -race github.com/jasonyunicorn/piper

Contributing

Pull requests for bug fixes, new features or performance enhancements are welcome!

Versioning

This project follows semantic versioning. For the versions available, see the tags on this repository.

Authors

  • Jason Yu

See also the list of contributors who participated in this project.

License

This project is licensed under the MIT License; see the LICENSE.md file for details.

Documentation

Index

Constants

const (
	// default number of concurrent workers processing batch jobs
	DEFAULT_CONCURRENCY int = 5

	// default maximum number of items to queue for the process
	DEFAULT_QUEUE_DEPTH int = 10000

	// default maximum duration to wait to fill a batch before processing what has been batched
	DEFAULT_BATCH_TIMEOUT time.Duration = time.Second

	// default maximum number of items to process in a batch
	DEFAULT_MAX_BATCH_SIZE int = 500

	// default maximum number of retries to attempt before calling a failure callback function
	DEFAULT_MAX_RETRIES int = 10

	// default maximum frequency of batch function calls
	DEFAULT_RATE_LIMIT rate.Limit = rate.Inf
)
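
To make the interplay between the batch timeout and the maximum batch size concrete, here is a minimal standalone sketch of size-or-timeout batching. It is an illustration under assumptions, not piper's code: nextBatch, maxBatchSize and batchTimeout are hypothetical names, and the batch size is kept small so the behaviour is easy to follow.

```go
package main

import (
	"fmt"
	"time"
)

const (
	maxBatchSize = 2           // small value for the demo; the default above is 500
	batchTimeout = time.Second // same value as DEFAULT_BATCH_TIMEOUT
)

// nextBatch collects items from in until maxSize items have been gathered,
// timeout has elapsed since the call started, or in is closed. The second
// return value is false once in has been closed and drained.
func nextBatch(in <-chan string, maxSize int, timeout time.Duration) ([]string, bool) {
	batch := make([]string, 0, maxSize)
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for len(batch) < maxSize {
		select {
		case item, ok := <-in:
			if !ok {
				return batch, false
			}
			batch = append(batch, item)
		case <-timer.C:
			return batch, true // flush whatever has been batched so far
		}
	}
	return batch, true
}

func main() {
	in := make(chan string, 10)
	for _, s := range []string{"a", "b", "c", "d", "e"} {
		in <- s
	}
	close(in)

	for {
		batch, open := nextBatch(in, maxBatchSize, batchTimeout)
		if len(batch) > 0 {
			fmt.Println(batch)
		}
		if !open {
			break
		}
	}
}
```

A larger batch size reduces the number of batch function calls, while a shorter timeout bounds how long a partially filled batch waits.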

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchExecFn

type BatchExecFn func(context.Context, []DataIF) (map[string]error, error)

BatchExecFn is a method signature which defines the expectations of a BatchExecutable Execute function

type BatchExecutable

type BatchExecutable interface {
	Execute(context.Context, []DataIF) (map[string]error, error)
}

BatchExecutable is an interface which exposes the Execute method, which is the user-defined batch execution call

type DataIF

type DataIF interface {
	GetID() string
}

DataIF is an interface for a data struct, the user-defined, fundamental element of the pipeline.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline is an object used for chaining multiple Processes together sequentially

func NewPipeline

func NewPipeline(name string, processes []*Process, fns ...PipelineOptionFn) (*Pipeline, error)

NewPipeline creates a pointer to a Pipeline

func (*Pipeline) ProcessData

func (p *Pipeline) ProcessData(data DataIF)

ProcessData puts data on the queue for batch processing. The method does not wait for the job to be processed: it returns as soon as the job is on the queue, which should be almost instant as long as the number of jobs in the queue is less than the queue depth.

func (*Pipeline) ProcessDataAsync

func (p *Pipeline) ProcessDataAsync(data DataIF)

ProcessDataAsync puts data on the queue for batch processing and waits for the job to finish before returning. It only makes sense to use this method if there is one data point to process. To optimize performance when using this method, set the maxBatchSize to 1.

func (*Pipeline) ProcessDatum

func (p *Pipeline) ProcessDatum(datum []DataIF)

ProcessDatum puts all data on the queue for batch processing. The method does not wait for the jobs to be processed: it returns as soon as the jobs are on the queue, which should be almost instant as long as the number of jobs in the queue is less than the queue depth.

func (*Pipeline) ProcessDatumAsync

func (p *Pipeline) ProcessDatumAsync(datum []DataIF)

ProcessDatumAsync puts all data on the queue for batch processing and waits until all data has been processed.
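
The difference between the methods that return once the data is queued and the methods that wait for processing to finish can be sketched with a buffered channel. This is a standalone illustration, not piper's implementation; queue, enqueue, enqueueAndWait and drain are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical stand-in for a process's job queue.
type queue struct {
	jobs chan func()
	wg   sync.WaitGroup
}

// newQueue starts the given number of workers reading from a buffered
// channel that holds at most depth jobs. The workers run until the
// program exits, which is enough for this sketch.
func newQueue(depth, workers int) *queue {
	q := &queue{jobs: make(chan func(), depth)}
	for i := 0; i < workers; i++ {
		go func() {
			for job := range q.jobs {
				job()
				q.wg.Done()
			}
		}()
	}
	return q
}

// enqueue returns as soon as the job is on the queue. It blocks only
// while the queue already holds depth jobs.
func (q *queue) enqueue(job func()) {
	q.wg.Add(1)
	q.jobs <- job
}

// enqueueAndWait returns only after this particular job has run.
func (q *queue) enqueueAndWait(job func()) {
	done := make(chan struct{})
	q.enqueue(func() {
		defer close(done)
		job()
	})
	<-done
}

// drain blocks until every job enqueued so far has finished.
func (q *queue) drain() { q.wg.Wait() }

func main() {
	q := newQueue(10, 2)

	result := 0
	q.enqueueAndWait(func() { result = 42 })
	fmt.Println("after enqueueAndWait:", result)

	squares := make([]int, 5)
	for i := range squares {
		i := i // capture the loop variable for Go versions before 1.22
		q.enqueue(func() { squares[i] = i * i })
	}
	q.drain()
	fmt.Println("after drain:", squares)
}
```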

func (*Pipeline) Start

func (p *Pipeline) Start(ctx context.Context)

Start is used to trigger the Pipeline's startup sequence

func (*Pipeline) Stop

func (p *Pipeline) Stop(ctx context.Context)

Stop is used to trigger the Pipeline's shutdown sequence

type PipelineOptionFn

type PipelineOptionFn func(p *Pipeline)

PipelineOptionFn is a method signature used for configuring the configurable fields of Pipeline

func PipelineWithWaitGroup

func PipelineWithWaitGroup(wg *sync.WaitGroup) PipelineOptionFn

type Process

type Process struct {
	// contains filtered or unexported fields
}

Process is an object used for managing the execution of batch jobs amongst multiple concurrent workers

func NewProcess

func NewProcess(name string, batchExec BatchExecutable, fns ...ProcessOptionFn) *Process

NewProcess creates a pointer to a Process

func (*Process) ProcessData

func (p *Process) ProcessData(data DataIF)

ProcessData puts data on the queue for batch processing. The method does not wait for the job to be processed: it returns as soon as the job is on the queue, which should be almost instant as long as the number of jobs in the queue is less than the queue depth.

func (*Process) ProcessDataAsync

func (p *Process) ProcessDataAsync(data DataIF)

ProcessDataAsync puts data on the queue for batch processing and waits for the job to finish before returning. It only makes sense to use this method if there is one data point to process. To optimize performance when using this method, set the maxBatchSize to 1.

func (*Process) ProcessDatum

func (p *Process) ProcessDatum(datum []DataIF)

ProcessDatum puts all data on the queue for batch processing. The method does not wait for the jobs to be processed: it returns as soon as the jobs are on the queue, which should be almost instant as long as the number of jobs in the queue is less than the queue depth.

func (*Process) ProcessDatumAsync

func (p *Process) ProcessDatumAsync(datum []DataIF)

ProcessDatumAsync puts all data on the queue for batch processing and waits until all data has been processed.

func (*Process) Start

func (p *Process) Start(ctx context.Context)

Start is used to trigger the Process's startup sequence

func (*Process) Stop

func (p *Process) Stop(ctx context.Context)

Stop is used to trigger the Process's shutdown sequence

type ProcessFn

type ProcessFn func(DataIF) []DataIF

ProcessFn is a method signature which defines the expectations of the OnSuccess and OnFailure callback functions
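
A callback with this signature could look like the sketch below. DataIF and ProcessFn are redeclared locally, matching the documented declarations, so the example runs on its own; line, word and splitWords are hypothetical. What piper does with the returned slice is not stated in this document; the sketch simply assumes a callback may return zero or more derived records.

```go
package main

import (
	"fmt"
	"strings"
)

// DataIF and ProcessFn copy piper's documented declarations so that this
// sketch compiles on its own.
type DataIF interface{ GetID() string }

type ProcessFn func(DataIF) []DataIF

// line and word are hypothetical record types.
type line struct{ ID, Text string }

func (l line) GetID() string { return l.ID }

type word struct{ ID, Text string }

func (w word) GetID() string { return w.ID }

// splitWords is a hypothetical callback that derives one word record per
// whitespace-separated token of a line. Any other input yields no records.
func splitWords(d DataIF) []DataIF {
	l, ok := d.(line)
	if !ok {
		return nil
	}
	var out []DataIF
	for i, w := range strings.Fields(l.Text) {
		out = append(out, word{ID: fmt.Sprintf("%s-%d", l.ID, i), Text: w})
	}
	return out
}

// Compile-time check that splitWords has the ProcessFn signature.
var _ ProcessFn = splitWords

func main() {
	for _, d := range splitWords(line{ID: "l1", Text: "to be or not"}) {
		fmt.Println(d.GetID(), d.(word).Text)
	}
}
```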

type ProcessOptionFn

type ProcessOptionFn func(p *Process)

ProcessOptionFn is a method signature used for configuring the configurable fields of Process
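
ProcessOptionFn follows the functional options pattern: the constructor applies defaults first and then runs each option against the new value. The sketch below reproduces the pattern with a hypothetical process struct so that it runs without piper; the field names are assumptions, and the default values are taken from the constants listed above.

```go
package main

import "fmt"

// process is a hypothetical stand-in for piper.Process, whose real fields
// are unexported.
type process struct {
	name         string
	concurrency  int
	maxBatchSize int
	maxRetries   int
}

// optionFn has the same shape as the documented ProcessOptionFn.
type optionFn func(p *process)

// withConcurrency returns an option that overrides the worker count.
func withConcurrency(n int) optionFn {
	return func(p *process) { p.concurrency = n }
}

// withMaxRetries returns an option that overrides the retry limit.
func withMaxRetries(n int) optionFn {
	return func(p *process) { p.maxRetries = n }
}

// newProcess sets the defaults, then applies the options in order, so a
// later option wins over an earlier one for the same field.
func newProcess(name string, fns ...optionFn) *process {
	p := &process{
		name:         name,
		concurrency:  5,   // DEFAULT_CONCURRENCY
		maxBatchSize: 500, // DEFAULT_MAX_BATCH_SIZE
		maxRetries:   10,  // DEFAULT_MAX_RETRIES
	}
	for _, fn := range fns {
		fn(p)
	}
	return p
}

func main() {
	p := newProcess("load", withConcurrency(2), withMaxRetries(0))
	fmt.Printf("%+v\n", *p)
}
```

With the real library, the equivalent call would pass options such as piper.ProcessWithConcurrency(2) as the trailing arguments to piper.NewProcess.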

func ProcessWithBatchTimeout

func ProcessWithBatchTimeout(timeout time.Duration) ProcessOptionFn

ProcessWithBatchTimeout is an option function for configuring the Process's batchTimeout

func ProcessWithConcurrency

func ProcessWithConcurrency(concurrency int) ProcessOptionFn

ProcessWithConcurrency is an option function for configuring the Process's concurrency

func ProcessWithMaxBatchSize

func ProcessWithMaxBatchSize(size int) ProcessOptionFn

ProcessWithMaxBatchSize is an option function for configuring the Process's maxBatchSize

func ProcessWithMaxRetries

func ProcessWithMaxRetries(retries int) ProcessOptionFn

ProcessWithMaxRetries is an option function for configuring the Process's maxRetries

func ProcessWithOnFailureFns

func ProcessWithOnFailureFns(fns ...ProcessFn) ProcessOptionFn

ProcessWithOnFailureFns is an option function for configuring the Process's onFailureFn callbacks

func ProcessWithOnSuccessFns

func ProcessWithOnSuccessFns(fns ...ProcessFn) ProcessOptionFn

ProcessWithOnSuccessFns is an option function for configuring the Process's onSuccessFn callbacks

func ProcessWithQueueDepth

func ProcessWithQueueDepth(depth int) ProcessOptionFn

ProcessWithQueueDepth is an option function for configuring the Process's queueDepth

func ProcessWithRateLimit

func ProcessWithRateLimit(limit rate.Limit) ProcessOptionFn

ProcessWithRateLimit is an option function for configuring the Process's rateLimit

func ProcessWithWaitGroup

func ProcessWithWaitGroup(wg *sync.WaitGroup) ProcessOptionFn

ProcessWithWaitGroup is an option function for configuring the Process's wait group
