Documentation ¶
Overview ¶
Package stagedpipe offers a generic, concurrent and parallel pipeline based on a statemachine to process work. For N number of Stages in the StateMachine, N Stages can concurrently be processed. You can run pipelines in parallel. So X Pipelines with N Stages will have X*N Stages processing.
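The X*N arithmetic can be illustrated with plain channels and goroutines. This is a minimal standard-library sketch, not the stagedpipe implementation: `pipeline` and `runParallel` are hypothetical helpers, and each stage here is an ordinary `func(int) int`.

```go
package main

import (
	"fmt"
	"sync"
)

// pipeline chains stages with channels; every stage runs in its own goroutine,
// so N stages can each hold a different item at the same time.
func pipeline(in <-chan int, stages ...func(int) int) <-chan int {
	for _, s := range stages {
		out := make(chan int)
		go func(s func(int) int, in <-chan int, out chan<- int) {
			defer close(out)
			for v := range in {
				out <- s(v)
			}
		}(s, in, out)
		in = out
	}
	return in
}

// runParallel starts x copies of the pipeline on one shared input and sums
// everything they emit, giving x*len(stages) stage goroutines in total.
func runParallel(x int, inputs []int, stages ...func(int) int) int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range inputs {
			in <- v
		}
	}()

	results := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < x; i++ {
		out := pipeline(in, stages...)
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range out {
				results <- v
			}
		}()
	}
	go func() {
		wg.Wait()
		close(results)
	}()

	sum := 0
	for v := range results {
		sum += v
	}
	return sum
}

func main() {
	addOne := func(v int) int { return v + 1 }
	double := func(v int) int { return v * 2 }
	// 2 pipelines x 2 stages = 4 stage goroutines.
	fmt.Println(runParallel(2, []int{1, 2, 3, 4}, addOne, double)) // prints 28
}
```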
This library requires working knowledge of both the specific type of Go statemachine implementation and basic Go pipelining.
Full introduction including a hello world example can be found here: https://vimeo.com/879175351?share=copy
Please view the README.md for more detailed information on how to get started.
Every pipeline will receive a Request, which contains the data to be manipulated. Each Request is designed to be stack allocated, meaning the data should not be a pointer unless absolutely necessary.
You define a StateMachine object that satisfies the StateMachine interface. These states represent the stages of the pipeline. All StateMachine methods that implement a Stage MUST BE PUBLIC.
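The stage-to-stage hand-off can be sketched with local stand-in types. `Request`, `Stage`, `SM` and `run` below are assumptions that mirror the shapes described in this documentation; they are not the stagedpipe types, and the real package drives the stages for you.

```go
package main

import (
	"fmt"
	"strings"
)

// Request and Stage are local stand-ins, not the stagedpipe types.
type Request[T any] struct {
	Data T
	Err  error
	Next Stage[T]
}

type Stage[T any] func(req Request[T]) Request[T]

// SM is a hypothetical state machine whose public methods are the stages.
type SM struct{}

// Start is the first stage: it trims whitespace and routes to Upper.
func (s SM) Start(req Request[string]) Request[string] {
	req.Data = strings.TrimSpace(req.Data)
	req.Next = s.Upper
	return req
}

// Upper is the last stage: it upper-cases the data and exits by setting Next to nil.
func (s SM) Upper(req Request[string]) Request[string] {
	req.Data = strings.ToUpper(req.Data)
	req.Next = nil
	return req
}

// run drives one Request through the stages until Next is nil or Err is set.
func run[T any](start Stage[T], req Request[T]) Request[T] {
	req.Next = start
	for req.Next != nil && req.Err == nil {
		req = req.Next(req)
	}
	return req
}

func main() {
	out := run[string](SM{}.Start, Request[string]{Data: "  hello  "})
	fmt.Println(out.Data) // prints HELLO
}
```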
A RequestGroup represents a set of related Request(s) that should be processed together. A new RequestGroup can be created with Pipelines.NewRequestGroup().
Requests enter the Pipelines via the RequestGroup.Submit() method. Requests are received with RequestGroup.Out(), which returns a channel of Request(s).
Multiple RequestGroup(s) can send into the Pipelines for processing, as everything is muxed into the Pipelines and demuxed out to the RequestGroup.Out() channel.
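One way to picture the mux/demux behavior is to tag each item with the output channel of the group that submitted it. The sketch below uses only the standard library; `item` and `process` are hypothetical names, and the real package's internals may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// item pairs a value with the output channel of the group that submitted it.
type item struct {
	val int
	out chan<- int
}

// process doubles every value from every group through one shared set of
// workers and returns the per-group sums.
func process(groups [][]int, workers int) []int {
	in := make(chan item)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range in {
				it.out <- it.val * 2 // demux: reply on the submitting group's channel
			}
		}()
	}

	outs := make([]chan int, len(groups))
	for g, vals := range groups {
		outs[g] = make(chan int, len(vals)) // buffered so workers never block
		for _, v := range vals {
			in <- item{val: v, out: outs[g]} // mux: all groups share one input
		}
	}
	close(in)
	wg.Wait()

	sums := make([]int, len(groups))
	for g, out := range outs {
		close(out)
		for v := range out {
			sums[g] += v
		}
	}
	return sums
}

func main() {
	fmt.Println(process([][]int{{1, 2, 3}, {10, 20}}, 2)) // prints [12 60]
}
```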
A CLI application called `stagedpipe-cli`, located in the `tools/` directory, generates all the boilerplate you see below for a working example. You can install it like this:
```
go install github.com/gostdlib/concurrency/pipelines/stagedpipe/tools/stagedpipe-cli@latest
```

Simply enter into your new package's directory, and type: `stagedpipe-cli -m -p "[package root]/sm"` to get:

```
├──myPipeline
    ├── main.go
    └──sm
        ├── data.go
        └── sm.go
```

Run `go mod init <path>`, `go mod tidy` and `go fmt ./...`, to get a running program:

```
├──myPipeline
    ├── go.mod
    ├── go.sum
    ├── main.go
    └──sm
        ├── data.go
        └── sm.go
```
Type `go run .` to run the basic pipeline that you can change to fit your needs.
Here is an example that runs inside the playground: https://go.dev/play/p/zaiNU_kbp6_3
Here is an ETL pipeline example: https://github.com/johnsiilver/concurrency/pipelines/tree/main/stagedpipe/examples/etl/bostonFoodViolations/pipelined
A video introduction to the ETL pipeline: https://player.vimeo.com/video/879203973?h=24035c0a82
Note: This package supports OTEL spans and will record information into OTEL spans if provided.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsErrCyclic ¶
IsErrCyclic returns true if the error is a cyclic error. A cyclic error is when a stage is called more than once in a single Request. This is only returned if the DAG() option is set.
Types ¶
type Error ¶
type Error struct {
	// Type is the type of error.
	Type string
	// Msg is the message of the error.
	Msg string
}
Error represents a typed error that this package can return. Not all errors are of this type.
type IngestStats ¶
type IngestStats struct {
	// Min is the minimum running time for a Request.
	Min time.Duration
	// Avg is the avg running time for a Request.
	Avg time.Duration
	// Max is the maximum running time for a Request.
	Max time.Duration
}
IngestStats details how long a request waits for a Pipeline to be ready.
type Option ¶
Option is an option for the New() constructor.
func CountSubStages ¶
CountSubStages is used when the StateMachine object does not hold all the Stage(s). This allows you to design multiple pipelines that use the same data object but will be executed as a single pipeline. CountSubStages is used to correctly calculate the concurrency. Without this, only stages in the StateMachine object will be counted toward the concurrency count.
func DAG ¶
DAG makes the StateMachine a Directed Acyclic Graph. This means that no Stage can be called more than once in a single Request. If a Stage is called more than once, the request will exit with a cyclic error that can be detected with IsErrCyclic().
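The acyclic rule amounts to tracking which stages a single request has already visited. The following is a minimal sketch under that assumption; `errCyclic`, `step`, `runDAG` and `isCyclic` are hypothetical local names, and the package's own detection and error type may work differently.

```go
package main

import (
	"errors"
	"fmt"
)

// errCyclic is a local sentinel standing in for the package's cyclic error.
var errCyclic = errors.New("cyclic error")

// step is a hypothetical named stage that returns the name of the next
// stage, or "" to exit.
type step func() (next string)

// runDAG executes steps by name beginning at start and fails as soon as any
// step would be visited a second time.
func runDAG(steps map[string]step, start string) ([]string, error) {
	visited := map[string]bool{}
	var order []string
	for name := start; name != ""; {
		if visited[name] {
			return order, fmt.Errorf("stage %q called twice: %w", name, errCyclic)
		}
		fn, ok := steps[name]
		if !ok {
			return order, fmt.Errorf("unknown stage %q", name)
		}
		visited[name] = true
		order = append(order, name)
		name = fn()
	}
	return order, nil
}

// isCyclic reports whether err wraps the local cyclic sentinel.
func isCyclic(err error) bool { return errors.Is(err, errCyclic) }

func main() {
	steps := map[string]step{
		"Start": func() string { return "Work" },
		"Work":  func() string { return "Start" }, // loops back to Start
	}
	_, err := runDAG(steps, "Start")
	fmt.Println(isCyclic(err)) // prints true
}
```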
func DelayWarning ¶
DelayWarning will send a log message whenever pushing entries to the out channel takes longer than the supplied time.Duration. Not setting this results in no warnings. Useful when chaining Pipelines and figuring out where something is stuck.
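A delayed-send warning of this kind can be sketched with a `select` over the send and a ticker. `sendWithWarning` is a hypothetical helper written for illustration; how the package actually logs, and how often, is not specified here.

```go
package main

import (
	"fmt"
	"time"
)

// sendWithWarning sends v on out and calls warn every time the send has been
// blocked for another delay interval. It returns the number of warnings.
func sendWithWarning[T any](out chan<- T, v T, delay time.Duration, warn func(msg string)) int {
	start := time.Now()
	ticker := time.NewTicker(delay)
	defer ticker.Stop()
	warnings := 0
	for {
		select {
		case out <- v:
			return warnings
		case <-ticker.C:
			warnings++
			warn(fmt.Sprintf("send blocked for %v", time.Since(start).Round(delay)))
		}
	}
}

func main() {
	out := make(chan int) // unbuffered: the send blocks until someone reads
	go func() {
		time.Sleep(120 * time.Millisecond) // a slow consumer
		<-out
	}()
	n := sendWithWarning(out, 1, 50*time.Millisecond, func(msg string) { fmt.Println(msg) })
	fmt.Println("warnings:", n)
}
```

The number of warnings printed depends on scheduling and timer granularity, so no fixed output is given.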
func Ordered ¶
Ordered makes the Pipelines output requests in the order they are received by a request group. This can slow down output as it stores finished requests until older ones finish processing and are output.
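The cost described above comes from holding finished results until everything older has been emitted. A minimal sketch of such a reorder buffer follows; `result` and `reorder` are hypothetical names, and sequence numbers are assumed to start at 0 with no gaps.

```go
package main

import "fmt"

// result is a finished unit of work tagged with its submission sequence number.
type result struct {
	seq int
	val string
}

// reorder reads results that may arrive out of order and emits them in seq
// order (0, 1, 2, ...), holding back finished results until older ones arrive.
func reorder(in <-chan result, out chan<- result) {
	defer close(out)
	pending := map[int]result{}
	next := 0
	for r := range in {
		pending[r.seq] = r
		for {
			p, ok := pending[next]
			if !ok {
				break // the next expected result has not finished yet
			}
			delete(pending, next)
			out <- p
			next++
		}
	}
}

func main() {
	in := make(chan result, 3)
	out := make(chan result, 3)
	for _, r := range []result{{2, "c"}, {0, "a"}, {1, "b"}} {
		in <- r
	}
	close(in)
	reorder(in, out)
	for r := range out {
		fmt.Print(r.val)
	}
	fmt.Println() // the line printed is abc
}
```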
func PreProcessors ¶
func PreProcessors[T any](p ...PreProcesor[T]) Option[T]
PreProcessors provides a set of functions that are called in order at each stage in the StateMachine. This is used to do work that is common to each stage instead of having to call the same code. Similar to http.HandleFunc wrapping techniques.
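The wrapping idea can be sketched with local stand-in types. The signature of a pre-processor is not shown in this documentation, so `PreProc` below assumes it has the same shape as a stage; `Request`, `Stage`, `PreProc` and `withPre` are all hypothetical and are not the stagedpipe types.

```go
package main

import (
	"errors"
	"fmt"
)

// Request, Stage and PreProc are local stand-ins with assumed shapes.
type Request[T any] struct {
	Data T
	Err  error
}

type Stage[T any] func(Request[T]) Request[T]

type PreProc[T any] func(Request[T]) Request[T]

// withPre returns a stage that runs every pre-processor in order before the
// wrapped stage and stops early once one of them sets Err.
func withPre[T any](stage Stage[T], pre ...PreProc[T]) Stage[T] {
	return func(req Request[T]) Request[T] {
		for _, p := range pre {
			req = p(req)
			if req.Err != nil {
				return req
			}
		}
		return stage(req)
	}
}

// nonNegative is a pre-processor that rejects negative inputs.
func nonNegative(req Request[int]) Request[int] {
	if req.Data < 0 {
		req.Err = errors.New("negative input")
	}
	return req
}

// double is the stage being wrapped.
func double(req Request[int]) Request[int] {
	req.Data *= 2
	return req
}

func main() {
	stage := withPre[int](double, nonNegative)
	fmt.Println(stage(Request[int]{Data: 4}).Data)        // prints 8
	fmt.Println(stage(Request[int]{Data: -1}).Err != nil) // prints true
}
```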
type Pipelines ¶
type Pipelines[T any] struct { // contains filtered or unexported fields }
Pipelines provides access to a set of Pipelines that processes DBD information.
func New ¶
func New[T any](name string, num int, sm StateMachine[T], options ...Option[T]) (*Pipelines[T], error)
New creates a new Pipelines object with "num" pipelines running in parallel. Each underlying pipeline runs its stages concurrently. Execution of every Request begins at StateMachine.Start().
func (*Pipelines[T]) Close ¶
func (p *Pipelines[T]) Close()
Close closes the ingestion of the Pipeline. No further Submit calls should be made. If called more than once Close will panic.
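When several code paths might call Close, a caller-side guard keeps the second call from panicking. This is a sketch using `sync.Once`; `onceCloser` is a hypothetical wrapper and `closeFn` stands in for whatever Close method is being protected.

```go
package main

import (
	"fmt"
	"sync"
)

// onceCloser is a hypothetical caller-side wrapper; closeFn stands in for a
// Close method that panics on a second call.
type onceCloser struct {
	once    sync.Once
	closeFn func()
}

// Close runs closeFn at most once, no matter how many callers invoke it.
func (c *onceCloser) Close() { c.once.Do(c.closeFn) }

func main() {
	in := make(chan int)
	c := &onceCloser{closeFn: func() { close(in) }} // closing a channel twice panics
	c.Close()
	c.Close() // no-op
	_, open := <-in
	fmt.Println(open) // prints false
}
```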
func (*Pipelines[T]) NewRequestGroup ¶
func (p *Pipelines[T]) NewRequestGroup() *RequestGroup[T]
NewRequestGroup returns a RequestGroup that can be used to process requests in this set of Pipelines.
type PreProcesor ¶
PreProcessor is called before each Stage. If req.Err is set, execution of the Request in the StateMachine stops.
type Request ¶
type Request[T any] struct {
	// Ctx is a Context scoped for this request or set of requests.
	Ctx context.Context

	// Data is data that is processed in this Request.
	Data T

	// Err, if set, is an error for the Request. This type of error is for unrecoverable
	// errors in processing, not errors for the data being processed. For example, if it
	// can't communicate with a database or RPC service. For errors with the data itself,
	// add the error to the underlying data type as a separate error.
	Err error

	// Next is the next stage to be executed. Must be set at each stage of a StateMachine.
	// If set to nil, exits the pipeline.
	Next Stage[T]
	// contains filtered or unexported fields
}
Request is a request to be processed by a pipeline.
func (Request[T]) Event ¶
Event records an OTEL event into the Request span with name and keyvalues. This allows stages in your statemachine to record events inside each stage. keyvalues must contain an even number of elements: every even-indexed element (0, 2, 4, ...) is a string key, and the element that follows it is the value associated with that key. The following value types are supported:
- bool/[]bool
- float64/[]float64
- int/[]int
- int64/[]int64
- string/[]string
- time.Duration/[]time.Duration
Note: This is a no-op if the Request is not recording.
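The key/value convention can be checked before calling Event. The helper below is a hypothetical validator written against the rules listed above; it is not part of the package, and how Event itself treats malformed input is not stated here.

```go
package main

import (
	"fmt"
	"time"
)

// pairs validates an alternating key, value list and returns it as a map.
// Keys must be strings; values must be one of the supported types.
func pairs(keyvalues ...any) (map[string]any, error) {
	if len(keyvalues)%2 != 0 {
		return nil, fmt.Errorf("keyvalues has %d elements, want an even number", len(keyvalues))
	}
	m := make(map[string]any, len(keyvalues)/2)
	for i := 0; i < len(keyvalues); i += 2 {
		k, ok := keyvalues[i].(string)
		if !ok {
			return nil, fmt.Errorf("element %d is %T, want a string key", i, keyvalues[i])
		}
		switch keyvalues[i+1].(type) {
		case bool, []bool, float64, []float64, int, []int, int64, []int64,
			string, []string, time.Duration, []time.Duration:
			m[k] = keyvalues[i+1]
		default:
			return nil, fmt.Errorf("value for key %q has unsupported type %T", k, keyvalues[i+1])
		}
	}
	return m, nil
}

func main() {
	m, err := pairs("user", "alice", "retries", 3, "elapsed", 2*time.Second)
	fmt.Println(len(m), err) // prints 3 <nil>

	_, err = pairs("user")
	fmt.Println(err != nil) // prints true
}
```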
type RequestGroup ¶
type RequestGroup[T any] struct {
	// Name is the name of the RequestGroup. This is used in OTEL tracing only and is not required.
	Name string
	// contains filtered or unexported fields
}
RequestGroup provides in and out channels to send a group of related data into the Pipelines and receive the processed data. This allows multiple callers to multiplex onto the same Pipelines. A RequestGroup is created with Pipelines.NewRequestGroup().
func (*RequestGroup[T]) Close ¶
func (r *RequestGroup[T]) Close()
Close signals that the input is done and will wait for all Request objects to finish processing, then close the output channel. The owner of the RequestGroup is still required to pull all entries out of the RequestGroup via .Out(), and until that occurs, Close() will not return.
func (*RequestGroup[T]) Out ¶
func (r *RequestGroup[T]) Out() chan Request[T]
Out returns a channel to receive Request(s) that have been processed. It is unsafe to close the output channel. Instead, use .Close() when all input has been sent and the output channel will close once all data has been processed. You MUST get all data from Out() until it closes, even if you run into an error. Otherwise the pipelines become stuck.
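The drain-until-closed requirement can be sketched with a plain channel standing in for the output of a request group. `result`, `produce` and `drain` are hypothetical names; the point is only that the reader keeps receiving after the first error instead of returning early.

```go
package main

import (
	"errors"
	"fmt"
)

// result is a stand-in for a processed request.
type result struct {
	id  int
	err error
}

// produce stands in for a request group's output: it emits n results on an
// unbuffered channel, marks the one with id failID as failed, then closes it.
func produce(n, failID int) <-chan result {
	out := make(chan result)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			r := result{id: i}
			if i == failID {
				r.err = errors.New("stage failed")
			}
			out <- r
		}
	}()
	return out
}

// drain reads every value until out closes, remembering only the first
// error so the sender is never left blocked.
func drain(out <-chan result) (ok int, firstErr error) {
	for r := range out {
		if r.err != nil {
			if firstErr == nil {
				firstErr = r.err
			}
			continue // keep reading; returning here would strand the sender
		}
		ok++
	}
	return ok, firstErr
}

func main() {
	ok, err := drain(produce(5, 2))
	fmt.Println(ok, err) // prints 4 stage failed
}
```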
func (*RequestGroup[T]) Submit ¶
func (r *RequestGroup[T]) Submit(req Request[T]) error
Submit submits a new Request into the Pipelines. A Request with a nil Context will cause a panic.
type StateMachine ¶
type StateMachine[T any] interface {
	// Start is the starting Stage of the StateMachine.
	Start(req Request[T]) Request[T]
	// Close stops the StateMachine.
	Close()
}
StateMachine represents a state machine where the methods that implement Stage are the States and execution starts with the Start() method.
type Stats ¶
type Stats struct {
	// Running is the number of currently running Request(s).
	Running int64
	// Completed is the number of completed Request(s).
	Completed int64
	// Min is the minimum running time for a Request.
	Min time.Duration
	// Avg is the avg running time for a Request.
	Avg time.Duration
	// Max is the maximum running time for a Request.
	Max time.Duration
	// IngestStats contains information on Pipeline ingestion.
	IngestStats IngestStats
}
Stats are the stats for the Pipeline.
Directories ¶
Path | Synopsis |
---|---|
examples | |
examples/etl/bostonFoodViolations/pipelined/etl | Package etl contains the ETL statemachine for translating the boston food violations file into a postgres database. |
internal | |
internal/queue | Package queue provides a simple queue primitive that blocks on Pop() until an entry is available and can block on Push() when a size limit is applied. |
testing | |
testing/client | Package client provides a fake client to a fictional "identity" service to use in testing. |
tools | |