Documentation

Overview
Carbo is a package that gives Gophers a framework for building data pipelines.
There are multiple headaches when building an application that contains a data pipeline, such as controlling concurrency, scalability, and back pressure. This framework is built to deal with these issues without much effort.
Why Carbo?
As far as I know, there are many great frameworks for orchestrating tasks and their dependencies in complex workflows, and such tools also provide ways to monitor task status. But there are cases where configuring a cluster and running a large number of tasks on it feels like overkill. Carbo fits such cases. It is a pure Golang implementation that helps run small tasks in a single process with easy control over concurrency. Additionally, Carbo provides an easy way to feed data from one process to another with gRPC, which offers enough scalability in many cases.
Exposing / pulling data through gRPC
As described above, Carbo provides an easy way to feed data from one process to another with gRPC. This way, you can, for example, split a data pipeline into a CPU-intensive part and an IO-intensive part running as separate processes, each with its own concurrency limit.
Additionally, this means that Carbo doesn't force you to stick to the framework itself, or even to Golang, because gRPC is a programming-language-agnostic RPC protocol. For example, you can pull data from a Golang process that uses Carbo with grpcurl for debugging. Or you can write another program, for example in Python, to pull data via gRPC. This is convenient when, for example, you want to write a fast data pipeline in Golang and feed its output into Python to build a machine-learning model with scikit-learn.
Example (Flow)
Build a flow and directly run it.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hiroara/carbo/flow"
	"github.com/hiroara/carbo/pipe"
	"github.com/hiroara/carbo/sink"
	"github.com/hiroara/carbo/source"
	"github.com/hiroara/carbo/task"
)

func main() {
	ss := source.FromSlice([]string{"a", "b", "c"})
	ds := task.Connect(
		ss.AsTask(),
		pipe.Map(func(ctx context.Context, s string) (string, error) {
			return s + s, nil
		}).AsTask(),
		1,
	)
	pr := task.Connect(
		ds,
		sink.ElementWise(func(ctx context.Context, s string) error {
			fmt.Println(s)
			return nil
		}).AsTask(),
		1,
	)
	err := flow.FromTask(pr).Run(context.Background())
	if err != nil {
		log.Fatal(err)
	}
}
```

Output:

```
aa
bb
cc
```
Example (FlowFactory)
Define a flow factory function to build a flow with a config struct, and run the flow.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hiroara/carbo/flow"
	"github.com/hiroara/carbo/sink"
	"github.com/hiroara/carbo/source"
	"github.com/hiroara/carbo/task"
)

type MyConfig struct {
	StringField string `yaml:"string_field"`
	IntField    int    `yaml:"int_field"`
}

func main() {
	fac := func(cfg *MyConfig) (*flow.Flow, error) {
		ss := source.FromSlice([]string{cfg.StringField})
		pr := task.Connect(
			ss.AsTask(),
			sink.ElementWise(func(ctx context.Context, s string) error {
				fmt.Println(s)
				return nil
			}).AsTask(),
			1,
		)
		return flow.FromTask(pr), nil
	}
	err := flow.RunWithConfig(context.Background(), fac, "testdata/config.yaml")
	if err != nil {
		log.Fatal(err)
	}
}
```

Output:

```
value-from-string-field
```
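The example reads its configuration from testdata/config.yaml. That file is not shown here, but given the yaml tags on MyConfig and the output above, it presumably contains something like:

```yaml
# Assumed contents of testdata/config.yaml (not taken from the repository).
string_field: value-from-string-field
```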
Example (Registry)
Define multiple flow factories, register them to a registry, and run a flow. This is useful to make an executable that takes a subcommand.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hiroara/carbo/flow"
	"github.com/hiroara/carbo/registry"
	"github.com/hiroara/carbo/sink"
	"github.com/hiroara/carbo/source"
	"github.com/hiroara/carbo/task"
)

type MyConfig struct {
	StringField string `yaml:"string_field"`
	IntField    int    `yaml:"int_field"`
}

func main() {
	fac1 := func() (*flow.Flow, error) {
		ss := source.FromSlice([]string{"item1"})
		pr := task.Connect(
			ss.AsTask(),
			sink.ElementWise(func(ctx context.Context, s string) error {
				fmt.Println(s)
				return nil
			}).AsTask(),
			1,
		)
		return flow.FromTask(pr), nil
	}
	fac2 := func(cfg *MyConfig) (*flow.Flow, error) {
		ss := source.FromSlice([]int{cfg.IntField})
		pr := task.Connect(
			ss.AsTask(),
			sink.ElementWise(func(ctx context.Context, i int) error {
				fmt.Println(i)
				return nil
			}).AsTask(),
			1,
		)
		return flow.FromTask(pr), nil
	}
	r := registry.New()
	r.Register("flow1", flow.NewFactory(fac1))
	r.Register("flow2", flow.NewFactoryWithConfig(fac2, "testdata/config.yaml"))
	err := r.Run(context.Background(), "flow2")
	if err != nil {
		log.Fatal(err)
	}
}
```

Output:

```
100
```
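Here, fac2 prints IntField as loaded from testdata/config.yaml, so producing the output above assumes the config file sets that field, along the lines of:

```yaml
# Assumed contents of testdata/config.yaml (not taken from the repository).
int_field: 100
```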
Directories

Path | Synopsis
---|---
cache | Package cache provides a way to define caching behavior used in a data pipeline.
cache/store | Package store defines the Store interface for cache.
config | Package config implements parsing a YAML file as a configuration struct.
deferrer | Package deferrer provides a struct-embeddable implementation of Defer, which the task.Task interface requires.
flow | Package flow defines the Flow type, which represents an entire data pipeline.
flow/internal | |
marshal | Package marshal defines Spec, a type that defines marshaling behavior.
pipe | Package pipe defines Pipe, which is a type of task.
registry | Package registry defines Registry, which is a place to register flows.
sink | Package sink defines Sink, which is a type of task.
source | Package source defines Source, which is a type of task.
task | Package task defines the Task interface, a core component for building a data pipeline.