Documentation ¶
Overview ¶
Package orchestrator provides orchestration and supervision of data pipelines.
These pipelines are made up of inputs and processes: an input is a long-running function which listens for events (such as database triggers, Kafka topics, webhooks, or anything really), and a process is a job (such as a docker container, a webhook dispatcher, a kubernetes job, or anything that runs once) that does something with that event.
By building a system of inputs, processes, and persistence layers it becomes easy to build sophisticated data pipelines.
For instance, consider an ecommerce analytics pipeline where:
1. A user places an order
2. A backend microservice places that order onto a Kafka topic for back-office processing
3. An analytics consumer slurps these messages into a data warehouse
4. A write to this data warehouse is picked up by an input
5. That input triggers a process which runs a series of validation and enrichment steps
6. The process then writes the enriched, validated data to a different table in the warehouse
7. That write triggers a different input, which listens for enriched data
8. That input triggers a process which does some kind of final cleansing for gold-standard reporting
In effect, this gives us:
[svc] -> (topic) -> [consumer] -> {data warehouse} -> [input -> [enrichment process]] -> {data warehouse} -> [input -> [reporting process]] -> {reporting system}
Or, if you like, a way of building a lightweight [medallion architecture](https://www.databricks.com/glossary/medallion-architecture)
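A rough sketch of wiring one warehouse stage of the pipeline above with this package follows; the import path, connection string, and container image are illustrative, and the way the Orchestrator value is obtained is an assumption:

package main

import (
	"context"
	"log"

	"github.com/example/orchestrator" // illustrative import path
)

func main() {
	ctx := context.Background()

	// However your setup obtains an Orchestrator; a bare value is assumed usable here
	var o orchestrator.Orchestrator

	// Input: watch the warehouse for newly written orders
	warehouse, err := orchestrator.NewPostgresInput(orchestrator.InputConfig{
		Name:             "warehouse-orders",
		Type:             "postgres",
		ConnectionString: "postgres://user:pass@warehouse:5432/orders",
		Operations:       []orchestrator.Operation{orchestrator.OperationCreate},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Process: run an enrichment container for each new order
	enrich, err := orchestrator.NewContainerProcess(orchestrator.ProcessConfig{
		Name:             "enrich-orders",
		Type:             "docker",
		ExecutionContext: map[string]string{"image": "registry.example.com/enrich:latest"},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Wire them together: the input's events trigger the process
	if err := o.AddInput(ctx, warehouse); err != nil {
		log.Fatal(err)
	}
	if err := o.AddProcess(enrich); err != nil {
		log.Fatal(err)
	}
	if err := o.AddLink(warehouse, enrich); err != nil {
		log.Fatal(err)
	}

	// Inputs run in their own goroutines; their errors arrive here
	for err := range o.ErrorChan {
		log.Println(err)
	}
}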
When should you use this package? ¶
This package is useful for building customised data pipeline orchestrators, and for building customised components (where off-the-shelf tools, such as Databricks, are fiddly or can't be customised to the same level).
This package is also useful for running pipelines cheaply, or locally: it requires no outside services (unless you write one into your own inputs or processes), and doesn't need complicated master/worker configurations or anything else really
When should you _not_ use this package? ¶
This package will not give you the same things that off-the-shelf tools, such as Databricks, will give you. There's no easy way to visualise DAGs, and no simple API for updating configuration (unless you write your own).
This package won't do a lot of what you might need; it exists to serve as the engine of a pipeline tool, and you must build the rest yourself.
Index ¶
- Variables
- type ContainerImageMissingErr
- type ContainerNonZeroExit
- type ContainerProcess
- type Event
- type Input
- type InputConfig
- type NewInputFunc
- type NewProcessFunc
- type Operation
- type Orchestrator
- type PostgresInput
- type Process
- type ProcessConfig
- type ProcessExitStatus
- type ProcessInterfaceConversionError
- type ProcessStatus
- type UnknownProcessError
Constants ¶
This section is empty.
Variables ¶
var ConcurrentProcessors int64 = 8
ConcurrentProcessors limits the number of processes which can be kicked off at once
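For example, to allow more simultaneous processes you might raise it before wiring up the Orchestrator (a sketch; whether changing it while processes are already running is safe isn't documented here):

orchestrator.ConcurrentProcessors = 16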
Functions ¶
This section is empty.
Types ¶
type ContainerImageMissingErr ¶
type ContainerImageMissingErr struct{}
ContainerImageMissingErr is returned when the ExecutionContext passed to NewContainerProcess doesn't contain the key "image"
To fix this, ensure that a container image is set
func (ContainerImageMissingErr) Error ¶
func (e ContainerImageMissingErr) Error() string
Error implements the error interface and returns a contextual message
This error, while simple and (at least on the face of it) an over-engineered version of fmt.Errorf("container image missing"), is verbosely implemented so that callers may use errors.Is(err, orchestrator.ContainerImageMissingErr{}) to handle this error case specifically
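For instance, a caller might branch on this error as in the sketch below; the function name is illustrative, and the errors and fmt imports are assumed:

func newEnrichProcess(cfg orchestrator.ProcessConfig) (orchestrator.ContainerProcess, error) {
	p, err := orchestrator.NewContainerProcess(cfg)
	if errors.Is(err, orchestrator.ContainerImageMissingErr{}) {
		// the config's execution_context is missing the "image" key
		return p, fmt.Errorf("set execution_context.image for process %q: %w", cfg.Name, err)
	}

	return p, err
}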
type ContainerNonZeroExit ¶
type ContainerNonZeroExit int64
ContainerNonZeroExit is returned when the container exits with anything other than exit code 0
Container logs should shed light on what went wrong
func (ContainerNonZeroExit) Error ¶
func (e ContainerNonZeroExit) Error() string
Error returns the error message associated with this error
type ContainerProcess ¶
type ContainerProcess struct {
// contains filtered or unexported fields
}
ContainerProcess allows for processes to be run via a container
func NewContainerProcess ¶
func NewContainerProcess(conf ProcessConfig) (c ContainerProcess, err error)
NewContainerProcess connects to a container socket, and returns a ContainerProcess which can be then used to run jobs
func (ContainerProcess) ID ¶
func (c ContainerProcess) ID() string
ID returns a unique ID for a process manager
func (ContainerProcess) Run ¶
func (c ContainerProcess) Run(ctx context.Context, e Event) (ps ProcessStatus, err error)
Run takes an Event, and passes it to a container to run
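A sketch of running a single Event through a container-backed process; the image name and event fields are made up, the "image" ExecutionContext key follows from ContainerImageMissingErr above, and the context and log imports are assumed:

proc, err := orchestrator.NewContainerProcess(orchestrator.ProcessConfig{
	Name:             "cleanse",
	Type:             "docker",
	ExecutionContext: map[string]string{"image": "alpine:3"},
})
if err != nil {
	log.Fatal(err)
}

status, err := proc.Run(context.Background(), orchestrator.Event{
	Location:  "public.orders",
	Operation: orchestrator.OperationCreate,
	ID:        "order-1234",
})
if err != nil {
	log.Fatal(err)
}
if status.Status != orchestrator.ProcessSuccess {
	log.Println(status.Logs)
}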
type Event ¶
type Event struct {
	// Location could be a table name, a topic, an event from some store,
	// or anything really - it is up to both the Input and the Process to
	// agree on what this means
	Location  string    `json:"location"`
	Operation Operation `json:"operation"`
	ID        string    `json:"id"`

	// Trigger is the name or ID of the Input which triggers this
	// process, which can be useful for routing/ flow control in
	// triggers
	Trigger string `json:"trigger"`
}
Event represents basic metadata that each Input provides
type Input ¶
type Input interface {
	// Handle inputs from this input source, creating Events and
	// streaming down the Event channel
	Handle(context.Context, chan Event) error

	ID() string
}
Input is a simple interface, and exposes a long running process called Handle which is expected to stream Events.
It is the job of the Orchestrator to understand which channel is assigned to which input and to route messages accordingly
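A minimal custom Input might look like the sketch below: a ticker which emits a synthetic Event on an interval. The TickerInput type is invented for illustration, and the context and time imports are assumed:

// TickerInput emits a synthetic Event on a fixed interval
type TickerInput struct {
	name     string
	interval time.Duration
}

// ID returns the name this input was configured with
func (t TickerInput) ID() string { return t.name }

// Handle streams an Event down the channel on every tick until the context is cancelled
func (t TickerInput) Handle(ctx context.Context, c chan orchestrator.Event) error {
	ticker := time.NewTicker(t.interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ts := <-ticker.C:
			c <- orchestrator.Event{
				Location:  "ticker",
				Operation: orchestrator.OperationCreate,
				ID:        ts.Format(time.RFC3339Nano),
				Trigger:   t.name,
			}
		}
	}
}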
type InputConfig ¶
type InputConfig struct {
	Name             string      `toml:"name"`
	Type             string      `toml:"type"`
	ConnectionString string      `toml:"connection_string"`
	Operations       []Operation `toml:"operation"`
}
InputConfig contains the necessary values for configuring an Input, such as how to connect to the input source, and the operations the input supports
func (InputConfig) ID ¶
func (ic InputConfig) ID() string
ID returns a (hopefully) unique value for this InputConfig
type NewInputFunc ¶ added in v0.1.1
type NewInputFunc func(InputConfig) (Input, error)
NewInputFunc is the suggested function signature for instantiating an Input and, as such, can be used when creating a registry of the Inputs an orchestrator supports, for example when creating Inputs dynamically from a config file or from an API.
For instance:
var inputs = map[string]orchestrator.NewInputFunc{
	"postgres": orchestrator.NewPostgresInput,
	"webhook":  webhooks.NewWebhookInput,
}

func createInput(cfg orchestrator.InputConfig) (orchestrator.Input, error) {
	return inputs[cfg.Type](cfg)
}
type NewProcessFunc ¶ added in v0.1.1
type NewProcessFunc func(ProcessConfig) (Process, error)
NewProcessFunc is the suggested function signature for instantiating a Process and, as such, can be used when creating a registry of the Processes an orchestrator supports, for example when creating Processes dynamically from a config file or from an API.
For instance:
var processes = map[string]orchestrator.NewProcessFunc{
	"docker":  orchestrator.NewContainerProcess,
	"webhook": webhooks.NewWebhookProcess,
}

func createProcess(cfg orchestrator.ProcessConfig) (orchestrator.Process, error) {
	return processes[cfg.Type](cfg)
}
type Operation ¶
type Operation uint8
Operation represents one of the basic CRUD operations on a piece of data, and can be used by Inputs to do clever things, such as ignoring certain events
const (
	OperationUnknown Operation = iota
	OperationCreate
	OperationRead
	OperationUpdate
	OperationDelete
)
Supported set of operations
func (Operation) MarshalJSON ¶
func (o Operation) MarshalJSON() ([]byte, error)
MarshalJSON implements the json.Marshaler interface, which allows an Operation to be represented in json (in practice, as a json string)
func (Operation) MarshalText ¶
func (o Operation) MarshalText() ([]byte, error)
MarshalText implements the encoding.TextMarshaler interface in order to get a textual representation of an Operation
func (Operation) String ¶
func (o Operation) String() string
String returns the string representation of an Operation, or "unknown" for any Operation value it doesn't know about
func (*Operation) UnmarshalJSON ¶
func (o *Operation) UnmarshalJSON(b []byte) error
UnmarshalJSON implements the json.Unmarshaler interface, allowing an Operation to be parsed from json properly
func (*Operation) UnmarshalText ¶
func (o *Operation) UnmarshalText(text []byte) error
UnmarshalText implements the encoding.TextUnmarshaler interface, allowing a byte slice containing one of the crud operations to be converted to an Operation
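A short sketch of round-tripping an Operation through json (the exact string each Operation marshals to isn't reproduced here; the encoding/json and log imports are assumed):

b, err := json.Marshal(orchestrator.OperationUpdate)
if err != nil {
	log.Fatal(err)
}

var op orchestrator.Operation
if err := json.Unmarshal(b, &op); err != nil {
	log.Fatal(err)
}
// op is now orchestrator.OperationUpdate again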
type Orchestrator ¶
type Orchestrator struct {
	*dag.DAG

	ErrorChan chan error
	// contains filtered or unexported fields
}
Orchestrator is the workhorse of this package. It:
- Supervises inputs
- Manages the lifecycle of processes, which run on events
- Syncs events from inputs across multiple processes in a DAG
Multiple Orchestrators _can_ be run, for instance in a cluster, but out of the box there is no logic to synchronise inputs and/or processes that won't cluster natively (such as the postgres sample input)
func (*Orchestrator) AddInput ¶
func (d *Orchestrator) AddInput(ctx context.Context, i Input) (err error)
AddInput takes an Input, adds it to the Orchestrator's DAG, and runs it ready for events to flow through
AddInput will error when duplicate input IDs are specified. Any other error from the running of an Input comes via the Orchestrator's ErrorChan - this is because Inputs are run in separate goroutines
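Because of this, a caller will usually want to drain ErrorChan in its own goroutine; a sketch, assuming o, ctx, and input already exist and the log import is available:

go func() {
	for err := range o.ErrorChan {
		log.Printf("input error: %v", err)
	}
}()

if err := o.AddInput(ctx, input); err != nil {
	// a duplicate input ID, or some other wiring problem
	log.Fatal(err)
}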
func (Orchestrator) AddLink ¶
func (d Orchestrator) AddLink(input Input, process Process) (err error)
AddLink accepts an Input and a Process, and links them so that when the input triggers an event, the specified process is called
func (Orchestrator) AddProcess ¶
func (d Orchestrator) AddProcess(p Process) error
AddProcess adds a Process to the Orchestrator's DAG, ready to be triggered by Inputs.
Processes are not run until an Event is generated by an Input, and that Input is linked to the specified Process.
This means long-running processes with state should either be re-architected to use some kind of persistence layer, or should be a separate service which exposes (say) a webhook or similar trigger
type PostgresInput ¶
type PostgresInput struct {
// contains filtered or unexported fields
}
PostgresInput represents a sample postgres input source
This source will:
- Create a function which notifies a channel with a json payload representing an operation
- Add a trigger to every table in a database to call that function on Create, Update, and Delete operations
- Listen to the channel created in step 1
The operations passed by the database can then be passed to a Process
func NewPostgresInput ¶
func NewPostgresInput(ic InputConfig) (p PostgresInput, err error)
NewPostgresInput accepts an InputConfig and returns a PostgresInput, which implements the orchestrator.Input interface
The InputConfig.ConnectionString argument can be a DSN, or a postgres URL
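For example, either style should work (the values are illustrative):

cfg := orchestrator.InputConfig{
	Name:             "warehouse",
	Type:             "postgres",
	ConnectionString: "postgres://user:pass@localhost:5432/warehouse?sslmode=disable",
	Operations:       []orchestrator.Operation{orchestrator.OperationCreate, orchestrator.OperationUpdate},
}

// or, as a DSN
cfg.ConnectionString = "host=localhost port=5432 user=user password=pass dbname=warehouse sslmode=disable"

pg, err := orchestrator.NewPostgresInput(cfg)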
type Process ¶
Process is an interface which processes must implement

The interface is pretty simple: given a context and an Event, do some work and return a ProcessStatus (and an error where appropriate)
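As a sketch, and assuming the interface mirrors the methods ContainerProcess exposes (ID() string and Run(context.Context, Event) (ProcessStatus, error)), a trivial custom Process could look like this; the LogProcess type is invented for illustration, and the context and fmt imports are assumed:

// LogProcess is an example process which just records the Event it was given
type LogProcess struct {
	name string
}

// ID returns the name this process was configured with
func (l LogProcess) ID() string { return l.name }

// Run records the Event and reports success
func (l LogProcess) Run(ctx context.Context, e orchestrator.Event) (orchestrator.ProcessStatus, error) {
	return orchestrator.ProcessStatus{
		Name:   l.name,
		Logs:   []string{fmt.Sprintf("received %s on %s", e.ID, e.Location)},
		Status: orchestrator.ProcessSuccess,
	}, nil
}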
type ProcessConfig ¶
type ProcessConfig struct {
	Name             string            `toml:"name"`
	Type             string            `toml:"type"`
	ExecutionContext map[string]string `toml:"execution_context"`
}
ProcessConfig contains configuration options for processes, including a free-form map[string]string (ExecutionContext) for arbitrary values
func (ProcessConfig) ID ¶
func (pc ProcessConfig) ID() string
ID returns a (hopefully) unique value for this ProcessConfig
type ProcessExitStatus ¶
type ProcessExitStatus uint8
ProcessExitStatus represents the final status of a Process
const (
	ProcessUnknown ProcessExitStatus = iota
	ProcessUnstarted
	ProcessSuccess
	ProcessFail
)
Provided set of ExitStatuses
type ProcessInterfaceConversionError ¶
type ProcessInterfaceConversionError struct {
// contains filtered or unexported fields
}
ProcessInterfaceConversionError is returned when loading a process from our internal process store returns completely unexpected data
This error represents a huge failure somewhere and should cause a stop-the-world event
func NewTestProcessInterfaceConversionError ¶
func NewTestProcessInterfaceConversionError(input, process string, iface any) ProcessInterfaceConversionError
NewTestProcessInterfaceConversionError can be used to return a testable error (in tests)
func (ProcessInterfaceConversionError) Error ¶
func (e ProcessInterfaceConversionError) Error() string
Error returns a descriptive error message
type ProcessStatus ¶
type ProcessStatus struct {
	Name   string
	Logs   []string
	Status ProcessExitStatus
}
ProcessStatus contains the various bits and pieces a process might return, such as logs and status codes
type UnknownProcessError ¶
type UnknownProcessError struct {
// contains filtered or unexported fields
}
UnknownProcessError is returned when an input tries to trigger a process which doesn't exist
func NewTestUnknownProcessError ¶
func NewTestUnknownProcessError(input, process string) UnknownProcessError
NewTestUnknownProcessError can be used to return a testable error (in tests)
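In a test this might be used as in the sketch below, assuming the error produced by the orchestrator compares equal to the constructed one; the triggerMissingProcess helper is illustrative, and the errors and testing imports are assumed:

func TestMissingProcess(t *testing.T) {
	err := triggerMissingProcess() // illustrative helper under test
	expect := orchestrator.NewTestUnknownProcessError("warehouse", "enrich")

	if !errors.Is(err, expect) {
		t.Errorf("expected %v, received %v", expect, err)
	}
}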
func (UnknownProcessError) Error ¶
func (e UnknownProcessError) Error() string
Error returns a descriptive error message