Documentation ¶
Overview ¶
Package beam is a version of an Apache Beam SDK for Go that leverages generics. It is currently aimed at exploring this approach for data processing.
Status: Prototype ¶
This package is published to gather feedback on the approach and design.
This package serves three purposes:
- Implementing DoFns.
- Constructing Pipelines.
- Launching and managing Jobs.
Implementing DoFns ¶
DoFns define the execution-time behavior of the pipeline: processing and transforming elements.
DoFns are primarily intended to be pointers to a struct type that implements the Transform generic interface, using the DFC type.
Further, the ElmC type is used within Process functions for per-element context.
TODO(lostluck): Elaborate on how to build a DoFn.
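The exact Transform and DFC signatures are not reproduced on this page, so the following is only a self-contained model of the shape described above, not this package's API. The names `fakeDFC` and `wordLen` are hypothetical stand-ins: a DoFn is a pointer to a struct, and its `ProcessBundle` method registers a per-element closure with its context, which a runner then calls once per element.

```go
package main

import (
	"fmt"
	"strings"
)

// fakeDFC is a hypothetical stand-in for beam.DFC[E]; it only records the
// per-element function that a ProcessBundle method registers.
type fakeDFC[E any] struct {
	process func(E) error
}

// Process registers the per-element function, mirroring the documented
// requirement that ProcessBundle must call DFC.Process.
func (d *fakeDFC[E]) Process(fn func(E) error) {
	d.process = fn
}

// wordLen is a hypothetical DoFn. Its fields hold configuration and output;
// a plain slice stands in for an output PCol.
type wordLen struct {
	MinLen int
	Output []int
}

// ProcessBundle sets up per-bundle state and registers the element closure.
func (fn *wordLen) ProcessBundle(dfc *fakeDFC[string]) error {
	dfc.Process(func(word string) error {
		if n := len(strings.TrimSpace(word)); n >= fn.MinLen {
			fn.Output = append(fn.Output, n)
		}
		return nil
	})
	return nil
}

func main() {
	fn := &wordLen{MinLen: 3}
	dfc := &fakeDFC[string]{}
	if err := fn.ProcessBundle(dfc); err != nil {
		panic(err)
	}
	// Simulate a runner feeding one bundle of elements through the closure.
	for _, w := range []string{"a", "beam", "go", "generics"} {
		if err := dfc.process(w); err != nil {
			panic(err)
		}
	}
	fmt.Println(fn.Output) // [4 8]
}
```

Keeping the closure inside ProcessBundle lets per-bundle setup live in ordinary local variables, while the struct fields carry configuration that outlives a bundle.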
Constructing Pipelines ¶
Pipelines are constructed and launched by loading construction functions into a Configuration, calling Configuration.Ready to receive a Launcher, and then Launcher.Run to start pipeline execution. This returns a Pipeline handle that can be used to block until the pipeline is complete, or query active pipeline status and metrics.
Simple single pipelines can use the Launch or LaunchAndWait convenience functions.
TODO(lostluck): Elaborate on how to build a pipeline.
Running Jobs ¶
- TODO Cover launching a binary
- TODO Cover testing and Metrics
- TODO Cover flags
Index ¶
- func Expand[I Composite[O], O any](parent *Scope, name string, comp I) O
- func MakeCoder[E any]() coders.Coder[E]
- func ParDo[E Element, DF Transform[E]](s *Scope, input PCol[E], dofn DF, opts ...Options) DF
- type AccumulatorCreator
- type AccumulatorMerger
- type AfterBundle
- type BoundedSDF
- type Combiner
- func AddMerge[A, I Element, IA InputAdder[A, I]](c IA) Combiner[A, I, A, IA]
- func FullCombine[A, I, O Element, C FullCombiner[A, I, O]](c C) Combiner[A, I, O, C]
- func MergeExtract[A, O Element, OE OutputExtractor[A, O]](c OE) Combiner[A, A, O, OE]
- func SimpleMerge[A Element, AM AccumulatorMerger[A]](c AM) Combiner[A, A, A, AM]
- type Composite
- type Configuration
- type CounterInt64
- type DFC
- type DistributionInt64
- type Element
- type ElmC
- type FullCombiner
- type InputAdder
- type Iter
- type KV
- type Keys
- type Launcher
- type Metrics
- type ORTracker
- func (tracker *ORTracker) GetError() error
- func (tracker *ORTracker) GetProgress() (done, remaining float64)
- func (tracker *ORTracker) GetRestriction() OffsetRange
- func (tracker *ORTracker) IsDone() bool
- func (t *ORTracker) Size(rest OffsetRange) float64
- func (tracker *ORTracker) TryClaim(pos int64) bool
- func (tracker *ORTracker) TrySplit(fraction float64) (primary, residual OffsetRange, err error)
- type ObserveWindow
- type OffsetRange
- type OnBundleFinish
- type Options
- type OutputExtractor
- type PCol
- func CombinePerKey[K Keys, A, I, O Element, AM AccumulatorMerger[A]](s *Scope, input PCol[KV[K, I]], comb Combiner[A, I, O, AM]) PCol[KV[K, O]]
- func Create[E Element](s *Scope, values ...E) PCol[E]
- func Flatten[E Element](s *Scope, inputs ...PCol[E]) PCol[E]
- func GBK[K Keys, V Element](s *Scope, input PCol[KV[K, V]], opts ...Options) PCol[KV[K, Iter[V]]]
- func Impulse(s *Scope) PCol[[]byte]
- func Map[I, O Element](s *Scope, input PCol[I], lambda func(I) O, opts ...beamopts.Options) PCol[O]
- func Reshuffle[E Element](s *Scope, input PCol[E], opts ...Options) PCol[E]
- type Pipeline
- type Process
- type ProcessRestriction
- type Restriction
- type RestrictionFactory
- type Scope
- type SideInputIter
- type SideInputMap
- type StateBag
- type StateCombining
- type StateMap
- type StateSet
- type StateValue
- type TODO
- type TimerEvent
- type TimerProcessing
- type Tracker
- type Transform
- type TryClaim
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ParDo ¶
func ParDo[E Element, DF Transform[E]](s *Scope, input PCol[E], dofn DF, opts ...Options) DF
ParDo takes the user's DoFn and returns the same type for downstream pipeline construction.
The returned DoFn's emitter fields can then be used as inputs to other DoFns. What if we used Emitters as PCollections directly? Obviously, we'd rename the type to PCollection or similar, if only to also
Types ¶
type AccumulatorCreator ¶
type AccumulatorCreator[A Element] interface {
	CreateAccumulator() A
	AccumulatorMerger[A]
}
AccumulatorCreator is an interface to allow combiners to produce a more sophisticated accumulator type, when the zero value is inappropriate for accumulation.
type AccumulatorMerger ¶
type AccumulatorMerger[A Element] interface {
	MergeAccumulators(A, A) A
}
AccumulatorMerger is an interface for combiners that only need a binary merge, and whose input, output, and accumulator types are all the same.
type AfterBundle ¶
type AfterBundle struct {
// contains filtered or unexported fields
}
AfterBundle allows a DoFn to register a function that runs after the bundle has been durably committed. Emitting elements here will fail.
TODO: consider moving this to a simple interface method. Upside: users are less likely to incorrectly emit in the closure. Downside: anything cached for finalization must be stored in the DoFn struct, which violates the potential of a ConfigOnly DoFn.
func (*AfterBundle) Do ¶
func (*AfterBundle) Do(dfc bundleFinalizer, finalizeBundle func() error)
type BoundedSDF ¶
type BoundedSDF[FAC RestrictionFactory[E, R, P], E any, T Tracker[R, P], R Restriction[P], P, WES any] struct{}
BoundedSDF indicates this DoFn is able to split elements into independently processable sub-parts, called Restrictions.
Due to the handling required, call the BoundedSDF Process method rather than the one on DFC.
func (BoundedSDF[FAC, E, T, R, P, WES]) Process ¶
func (sdf BoundedSDF[FAC, E, T, R, P, WES]) Process(dfc *DFC[E], makeTracker func(R) T, proc ProcessRestriction[E, R, P]) error
Process is called during ProcessBundle set up to define the processing happening per element.
type Combiner ¶
type Combiner[A, I, O Element, AM AccumulatorMerger[A]] struct {
	// contains filtered or unexported fields
}
Combiner represents an optimizable approach to aggregation that breaks the aggregation down into three component types: the accumulator (A), input (I), and output (O) types.
func AddMerge ¶
func AddMerge[A, I Element, IA InputAdder[A, I]](c IA) Combiner[A, I, A, IA]
AddMerge produces a Combiner from an InputAdder.
func FullCombine ¶
func FullCombine[A, I, O Element, C FullCombiner[A, I, O]](c C) Combiner[A, I, O, C]
FullCombine produces a Combiner from a FullCombiner.
func MergeExtract ¶
func MergeExtract[A, O Element, OE OutputExtractor[A, O]](c OE) Combiner[A, A, O, OE]
MergeExtract produces a Combiner from an OutputExtractor.
func SimpleMerge ¶
func SimpleMerge[A Element, AM AccumulatorMerger[A]](c AM) Combiner[A, A, A, AM]
SimpleMerge produces a Combiner from an AccumulatorMerger.
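To show how these interfaces fit together, here is a self-contained sketch of a mean combiner whose methods use the documented names and signatures (AddInput, MergeAccumulators, ExtractOutput). The interfaces are redeclared locally, and `combine` is a hypothetical, simplified model of how a runner might apply a combiner: fold each bundle into a partial accumulator, merge the partials, then extract once. None of it calls the beam package.

```go
package main

import "fmt"

// Local copies of the documented combiner interfaces, so this compiles
// without importing the beam package.
type AccumulatorMerger[A any] interface{ MergeAccumulators(A, A) A }

type InputAdder[A, I any] interface {
	AddInput(A, I) A
	AccumulatorMerger[A]
}

type OutputExtractor[A, O any] interface {
	AccumulatorMerger[A]
	ExtractOutput(A) O
}

type FullCombiner[A, I, O any] interface {
	InputAdder[A, I]
	AccumulatorMerger[A]
	OutputExtractor[A, O]
}

// meanAcc is the accumulator: a running sum and count.
type meanAcc struct {
	Sum   float64
	Count int
}

// mean implements FullCombiner[meanAcc, float64, float64]. Its zero-value
// accumulator is already valid, so it needs no CreateAccumulator method.
type mean struct{}

func (mean) AddInput(a meanAcc, v float64) meanAcc {
	return meanAcc{Sum: a.Sum + v, Count: a.Count + 1}
}

func (mean) MergeAccumulators(a, b meanAcc) meanAcc {
	return meanAcc{Sum: a.Sum + b.Sum, Count: a.Count + b.Count}
}

func (mean) ExtractOutput(a meanAcc) float64 {
	if a.Count == 0 {
		return 0
	}
	return a.Sum / float64(a.Count)
}

// combine folds each bundle into its own accumulator, merges the partial
// accumulators, and extracts the final output. It relies on the zero value
// of A being a valid empty accumulator.
func combine[A, I, O any](c FullCombiner[A, I, O], bundles [][]I) O {
	var total A
	for _, bundle := range bundles {
		var acc A
		for _, in := range bundle {
			acc = c.AddInput(acc, in)
		}
		total = c.MergeAccumulators(total, acc)
	}
	return c.ExtractOutput(total)
}

func main() {
	got := combine[meanAcc, float64, float64](mean{}, [][]float64{{1, 2, 3}, {4, 5}})
	fmt.Println(got) // 3
}
```

Splitting the work this way is what makes a combiner optimizable: partial accumulators can be computed independently and merged in any grouping, because MergeAccumulators is a binary merge over a single accumulator type.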
type Configuration ¶
type Configuration struct {
// contains filtered or unexported fields
}
Configuration represents static configuration for pipelines.
The same [Configuration] may be used for multiple pipeline launches; however, that requires registering DoFns in advance.
Serializable construction-time options may be registered with the TODO method and referred to in calls to Launch or [Prepare].
You may [Prepare] multiple pipelines in advance to ensure their configuration is registered with an ID. However, these pipelines will not be built until Launch is called with the provided ID.
After prepare has been called,
func New ¶
func New() *Configuration
New produces a new Beam framework configuration to statically configure pipelines for execution.
func (*Configuration) Flags ¶
func (cfg *Configuration) Flags(fs *flag.FlagSet)
Flags initializes this Configuration from the flags of the provided command line set.
func (*Configuration) FromCommandLine ¶
func (cfg *Configuration) FromCommandLine()
FromCommandLine registers this Configuration to be initialized from the command line flags of the binary.
func (*Configuration) Load ¶
func (cfg *Configuration) Load(pid string, expand func(*Scope) error)
Load registers a pipeline construction function to be invoked later. The provided PID must be stable across multiple runs of the binary.
Use Load when there are multiple possible pipelines that may be invoked from this binary.
This can be useful for a server binary that is also used as a processing pipeline, or when it's desirable to run tests for pipelines using a container execution mode, instead of loopback mode.
Load panics if the expand function is nil, or if the PID is already in use.
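The Load contract (a stable PID, and a panic on a nil expand function or a reused PID) can be illustrated with a small in-memory registry. This is a hypothetical model only: `registry`, `scope`, and `load` are illustrative names, not this package's internals.

```go
package main

import "fmt"

// scope is a hypothetical placeholder for *beam.Scope.
type scope struct{}

// registry models a table of construction functions keyed by a PID that
// must stay the same across runs of the binary.
type registry struct {
	pipelines map[string]func(*scope) error
}

// load mirrors the documented Load contract: it panics on a nil expand
// function or on a PID that is already registered.
func (r *registry) load(pid string, expand func(*scope) error) {
	if expand == nil {
		panic(fmt.Sprintf("load: nil expand function for PID %q", pid))
	}
	if r.pipelines == nil {
		r.pipelines = map[string]func(*scope) error{}
	}
	if _, ok := r.pipelines[pid]; ok {
		panic(fmt.Sprintf("load: PID %q already in use", pid))
	}
	r.pipelines[pid] = expand
}

func main() {
	var r registry
	r.load("wordcount", func(*scope) error { return nil })
	r.load("etl", func(*scope) error { return nil })
	fmt.Println(len(r.pipelines)) // 2
}
```

Panicking at registration time surfaces a PID collision when the binary starts, rather than when a worker later tries to look up which pipeline to build.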
func (*Configuration) Ready ¶
func (cfg *Configuration) Ready(ctx context.Context) Launcher
Ready is called once the framework has been configured, and flags have been parsed. Ready serves as an execution split point.
When Ready detects the binary is being executed as a worker, Ready will not return, calling os.Exit when execution is complete. Otherwise Ready will return and allow the remainder of execution to take place.
type CounterInt64 ¶
type CounterInt64 struct {
// contains filtered or unexported fields
}
CounterInt64 represents an int64 counter metric.
func (*CounterInt64) Inc ¶
func (c *CounterInt64) Inc(dfc Metrics, diff int64)
type DFC ¶
type DFC[E Element] struct {
	// contains filtered or unexported fields
}
DFC or the DoFn Context is the local registry and router for Beam features used by the DoFn, in conjunction with reflection analysis of the DoFn's fields.
The generic type `E` represents the type of elements the DoFn processes.
Most ProcessBundle method implementations must call DFC.Process with an element processing func (Process). Failing to do so is a fatal error, caught at runtime.
Within a ProcessBundle method, the DFC may be passed to other Beam feature fields, such as OnBundleFinish, AfterBundle, and metrics like CounterInt64 and DistributionInt64.
Advanced DoFns may use a BoundedSDF, which enables sub-element work stealing at execution time. Such a DoFn is considered a SplittableDoFn.
type DistributionInt64 ¶
type DistributionInt64 struct {
// contains filtered or unexported fields
}
func (*DistributionInt64) Update ¶
func (c *DistributionInt64) Update(dfc Metrics, val int64)
type Element ¶
type Element interface {
	any // Sadly, can't really restrict this without breaking iterators in GBK results.
}
Element represents any user type. Beam processes arbitrary user types, but requires them to be encodable.
type ElmC ¶
type ElmC struct {
// contains filtered or unexported fields
}
ElmC is the per element context.
This includes
- Key (for state and timer using functions)
- Windows
- Timestamp
- Pane
ElmC also provides the downstream emission context, which is what actually sends data to the next DoFn.
Use an ObserveWindow feature to extract the Window and Pane.
type FullCombiner ¶
type FullCombiner[A, I, O Element] interface {
	InputAdder[A, I]
	AccumulatorMerger[A]
	OutputExtractor[A, O]
}
type InputAdder ¶
type InputAdder[A, I Element] interface {
	AddInput(A, I) A
	AccumulatorMerger[A]
}
InputAdder is an interface that allows combiners to incorporate an input type.
type KV ¶
type KV[K, V Element] struct {
	// TODO consider making these methods instead
	// and decode them on demand?
	Key   K
	Value V
}
KV represents key-value pairs. These are useful to Beam pipelines, enabling GroupByKey and stateful transforms.
type Keys ¶
type Keys interface {
	comparable
}
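Keys is constrained to comparable, the same constraint Go places on map keys, so keys can be grouped by equality. The following in-memory sketch models the grouping step a GroupByKey (GBK) performs over KV pairs. KV is redeclared locally, and a slice stands in for Iter[V]; a distributed runner performs this grouping across workers rather than in one map.

```go
package main

import "fmt"

// KV mirrors the documented struct shape.
type KV[K, V any] struct {
	Key   K
	Value V
}

// groupByKey groups values by key. K must be comparable to be a map key,
// which is what the Keys constraint expresses. Output order follows each
// key's first appearance, since Go map iteration order is unspecified.
func groupByKey[K comparable, V any](in []KV[K, V]) []KV[K, []V] {
	idx := map[K]int{}
	var out []KV[K, []V]
	for _, kv := range in {
		i, ok := idx[kv.Key]
		if !ok {
			i = len(out)
			idx[kv.Key] = i
			out = append(out, KV[K, []V]{Key: kv.Key})
		}
		out[i].Value = append(out[i].Value, kv.Value)
	}
	return out
}

func main() {
	in := []KV[string, int]{{"a", 1}, {"b", 2}, {"a", 3}}
	fmt.Println(groupByKey(in)) // [{a [1 3]} {b [2]}]
}
```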
type Launcher ¶
type Launcher struct {
// contains filtered or unexported fields
}
Launcher is able to Run pipelines [Load]ed from a [Ready] Configuration.
type Metrics ¶
type Metrics interface {
// contains filtered or unexported methods
}
Metrics implementations enable users to update beam metrics for a job, to be aggregated by the runner.
Metrics are recorded as a tuple of the transform and the metric in question, so the same metric name used in different transforms is considered distinct.
The 'DFC' type implements Metrics and can be used for standard DoFns. Other contexts where user metrics are appropriate may also have a parameter that implements Metrics.
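Because metrics are keyed by the (transform, metric) tuple, the same name used in two transforms stays distinct. A hypothetical in-memory model of that keying (`metricKey` and `counters` are illustrative names, not this package's types):

```go
package main

import "fmt"

// metricKey models the documented (transform, metric) tuple.
type metricKey struct {
	Transform string
	Name      string
}

// counters is a hypothetical aggregation table for int64 counters.
type counters map[metricKey]int64

// inc adds diff to the counter for this transform and metric name.
func (c counters) inc(transform, name string, diff int64) {
	c[metricKey{transform, name}] += diff
}

func main() {
	c := counters{}
	c.inc("ParseLines", "errors", 1)
	c.inc("WriteOutput", "errors", 2)
	c.inc("ParseLines", "errors", 3)
	fmt.Println(c[metricKey{"ParseLines", "errors"}], c[metricKey{"WriteOutput", "errors"}]) // 4 2
}
```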
type ORTracker ¶
type ORTracker struct {
	Rest OffsetRange
	// contains filtered or unexported fields
}
ORTracker is a tracker for an offset range restriction.
func (*ORTracker) GetError ¶
func (tracker *ORTracker) GetError() error
GetError returns the error that caused the tracker to stop, if there is one.
func (*ORTracker) GetProgress ¶
func (tracker *ORTracker) GetProgress() (done, remaining float64)
GetProgress reports progress based on the claimed size and unclaimed sizes of the restriction.
func (*ORTracker) GetRestriction ¶
func (tracker *ORTracker) GetRestriction() OffsetRange
GetRestriction returns the restriction.
func (*ORTracker) IsDone ¶
func (tracker *ORTracker) IsDone() bool
IsDone returns true if the most recently claimed element is at or past the end of the restriction.
func (*ORTracker) Size ¶
func (t *ORTracker) Size(rest OffsetRange) float64
Size returns an estimate of the amount of work in this restriction.
type ObserveWindow ¶
type ObserveWindow struct{}
ObserveWindow indicates this DoFn needs to be aware of windows explicitly. Required to use as a field, but may be embedded for legibility.
DoFns that observe windows must process the element for each window individually. If ObserveWindow isn't being used, remove it to possibly improve performance.
func (*ObserveWindow) Of ¶
func (*ObserveWindow) Of(ec ElmC) TODO
Of returns the window for this element.
func (*ObserveWindow) PaneOf ¶
func (*ObserveWindow) PaneOf(ec ElmC) TODO
PaneOf returns the pane for this element.
type OffsetRange ¶
type OffsetRange struct {
Min, Max int64
}
OffsetRange is an offset range restriction.
func (OffsetRange) Bounded ¶
func (r OffsetRange) Bounded() bool
func (OffsetRange) End ¶
func (r OffsetRange) End() int64
func (OffsetRange) Start ¶
func (r OffsetRange) Start() int64
type OnBundleFinish ¶
type OnBundleFinish struct{}
OnBundleFinish allows a DoFn to register a function that runs just before a bundle finishes. Elements may be emitted downstream, if an ElmC is retrieved from the DFC.
func (*OnBundleFinish) Do ¶
func (*OnBundleFinish) Do(dfc bundleFinisher, finishBundle func() error)
Do registers a callback to execute after all bundle elements have been processed. Any resources that a DoFn needs cleaned up explicitly, rather than implicitly via garbage collection, should be released here.
Only a single callback may be registered, and it will be the last one passed to Do.
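The "last registration wins" rule can be modeled with a single function-valued field. This is a hypothetical stand-in (`finisher` and `endBundle` are illustrative names), not the package's OnBundleFinish or its bundleFinisher parameter.

```go
package main

import "fmt"

// finisher is a hypothetical stand-in for a bundle-finish hook.
type finisher struct {
	finish func() error
}

// Do stores the callback, replacing any earlier one: the last call wins.
func (f *finisher) Do(fn func() error) { f.finish = fn }

// endBundle runs the registered callback, if any, after all elements.
func (f *finisher) endBundle() error {
	if f.finish == nil {
		return nil
	}
	return f.finish()
}

func main() {
	var f finisher
	var log []string
	f.Do(func() error { log = append(log, "first"); return nil })
	f.Do(func() error { log = append(log, "second"); return nil })
	if err := f.endBundle(); err != nil {
		panic(err)
	}
	fmt.Println(log) // [second]
}
```

Since earlier callbacks are silently replaced, a DoFn with several resources to release should do all of its cleanup inside one callback.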
type Options ¶
Options configure Run, ParDo, and Combine with specific features. Each function takes a variadic list of options, where properties set in later options override the value of previously set properties.
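The override rule (later options win) is the usual left-to-right evaluation of Go variadic options. In the hypothetical sketch below, `settings`, `Option`, `Name`, and `Parallelism` are invented for illustration and are not this package's Options or its properties.

```go
package main

import "fmt"

// settings is a hypothetical set of properties an option can configure.
type settings struct {
	Name        string
	Parallelism int
}

// Option is a hypothetical option: a function that sets some properties.
type Option func(*settings)

func Name(n string) Option     { return func(s *settings) { s.Name = n } }
func Parallelism(p int) Option { return func(s *settings) { s.Parallelism = p } }

// apply evaluates options left to right, so a property set by a later
// option overrides the value from an earlier one.
func apply(opts ...Option) settings {
	var s settings
	for _, o := range opts {
		o(&s)
	}
	return s
}

func main() {
	s := apply(Name("parse"), Parallelism(4), Name("parseLines"))
	fmt.Printf("%+v\n", s) // {Name:parseLines Parallelism:4}
}
```

Properties that no option touches keep their earlier value, so only the overlapping property is overridden.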
type OutputExtractor ¶
type OutputExtractor[A, O Element] interface {
	AccumulatorMerger[A]
	ExtractOutput(A) O
}
type PCol ¶
type PCol[E Element] struct {
	// contains filtered or unexported fields
}
PCol, or PCollection, represents a logical collection of elements produced or consumed by a DoFn.
At pipeline execution time, they are used in a ProcessBundle method to emit elements and pass along per-element context, such as the EventTime and Window.
Used as an exported value field of a DoFn struct, they represent the outputs from the DoFn. After the DoFn is added to the graph, the processed DoFn's PCol fields are initialized and can be passed around by value to further build the pipeline graph.
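func CombinePerKey ¶
func CombinePerKey[K Keys, A, I, O Element, AM AccumulatorMerger[A]](s *Scope, input PCol[KV[K, I]], comb Combiner[A, I, O, AM]) PCol[KV[K, O]]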
func Create ¶
func Create[E Element](s *Scope, values ...E) PCol[E]
Create produces static output for a transform, for simple initialization and testing.
Values in Create are serialized as part of the Beam Pipeline graph, so Create is not suitable for large elements or large numbers of elements.
func Flatten ¶
func Flatten[E Element](s *Scope, inputs ...PCol[E]) PCol[E]
Flatten joins multiple PCols of the same type into a single PCol for downstream consumption.
func Impulse ¶
func Impulse(s *Scope) PCol[[]byte]
Impulse adds an impulse transform to the graph, which emits a single element to downstream transforms, allowing processing to begin.
The element is a single byte slice in the global window, with an event timestamp at the start of the global window.
func Reshuffle ¶
func Reshuffle[E Element](s *Scope, input PCol[E], opts ...Options) PCol[E]
Reshuffle inserts a fusion break in the pipeline, preventing a producer transform from being fused with the consuming transform.
func (*PCol[E]) Emit ¶
Emit the element within the current element's context.
The ElmC value is sourced from the DFC.Process method.
type Pipeline ¶
type Pipeline struct {
	// TODO make these methods instead & support cancellation.
	Counters      map[string]int64
	Distributions map[string]struct{ Count, Sum, Min, Max int64 }
	// contains filtered or unexported fields
}
Pipeline is a handle to a running or terminated pipeline for programmatic access to the given runner.
func Launch ¶
Launch begins to execute the pipeline built in the construction function, returning a pipeline handle.
Launch is non-blocking. Call wait on the pipeline handle to block until the job is finished, or use LaunchAndWait instead.
type Process ¶
Process is the function type for handling a single element in a bundle.
Typically a closure returned from a Transform's ProcessBundle method.
Errors returned from Process functions abort bundle processing, and may cause pipeline termination. A runner may retry a bundle that has failed.
type ProcessRestriction ¶
ProcessRestriction defines processing the given element with respect to the provided restriction.
type Restriction ¶
type Restriction[P any] interface {
	// Start is the earliest position in this restriction.
	Start() P
	// End is the last position that must be processed with this restriction.
	End() P
	// Bounded reports whether this restriction is bounded or not.
	Bounded() bool
}
Restriction is a range of logical positions to be processed for this element. Restriction implementations must be serializable.
type RestrictionFactory ¶
type RestrictionFactory[O Element, R Restriction[P], P any] interface {
	// Setup takes in the DoFn for configuration?
	Setup() error
	// Produce returns a restriction that processes the entire element.
	Produce(O) R
	// InitialSplit returns an iterator over non-overlapping sub-restrictions and their relative sizes.
	InitialSplit(O, R) iter.Seq2[R, float64]
}
A RestrictionFactory must have a valid zero value. Whether it will be serialized is an open question; it might need to support receiving the user DoFn as configuration.
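InitialSplit returns an iter.Seq2[R, float64]. The following is a minimal sketch, assuming a hypothetical factory over OffsetRange (`evenSplitter`) whose element is a record count and whose zero value is usable, as required above. The return type is written out as `func(yield func(OffsetRange, float64) bool)`, which is the underlying type of iter.Seq2[OffsetRange, float64], so the block also compiles on Go versions without the iter package.

```go
package main

import "fmt"

// OffsetRange mirrors the documented restriction type: positions [Min, Max).
type OffsetRange struct{ Min, Max int64 }

// evenSplitter is a hypothetical factory whose zero value is valid:
// N <= 0 falls back to two pieces.
type evenSplitter struct{ N int64 }

// Produce returns a restriction covering the whole element, assumed here
// to be a record count.
func (evenSplitter) Produce(count int64) OffsetRange {
	return OffsetRange{Min: 0, Max: count}
}

// InitialSplit yields non-overlapping sub-restrictions with their sizes.
func (s evenSplitter) InitialSplit(_ int64, r OffsetRange) func(yield func(OffsetRange, float64) bool) {
	n := s.N
	if n <= 0 {
		n = 2
	}
	return func(yield func(OffsetRange, float64) bool) {
		size := r.Max - r.Min
		for i := int64(0); i < n; i++ {
			sub := OffsetRange{Min: r.Min + size*i/n, Max: r.Min + size*(i+1)/n}
			if sub.Min == sub.Max {
				continue // skip empty pieces; zero-size restrictions aren't permitted
			}
			if !yield(sub, float64(sub.Max-sub.Min)) {
				return // the consumer stopped iterating early
			}
		}
	}
}

func main() {
	s := evenSplitter{N: 3}
	s.InitialSplit(10, s.Produce(10))(func(r OffsetRange, size float64) bool {
		fmt.Println(r, size)
		return true
	})
}
```

With N set to 3 on ten records, the pieces are {0 3}, {3 6}, and {6 10}; the integer arithmetic pushes any remainder into the later pieces while keeping the ranges contiguous.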
type Scope ¶
type Scope struct {
// contains filtered or unexported fields
}
Scope is used for building pipeline graphs.
Scope is a hierarchical grouping for composite transforms. Scopes can be enclosed in other scopes and form a tree structure. For pipeline updates, the scope chain forms a unique name. The scope chain can also be used for monitoring and visualization purposes.
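A scope chain that forms a unique name can be modeled as a path of parent pointers. The `scope` type below is hypothetical, not this package's Scope, and the "/" separator is an arbitrary choice for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// scope is a hypothetical model of a hierarchical construction scope.
type scope struct {
	name   string
	parent *scope
}

// child opens a nested scope, as a composite transform would.
func (s *scope) child(name string) *scope {
	return &scope{name: name, parent: s}
}

// fullName walks the chain back to the root and joins the names in order.
func (s *scope) fullName() string {
	var parts []string
	for cur := s; cur != nil; cur = cur.parent {
		parts = append([]string{cur.name}, parts...)
	}
	return strings.Join(parts, "/")
}

func main() {
	root := &scope{name: "root"}
	extract := root.child("CountWords").child("ExtractWords")
	fmt.Println(extract.fullName()) // root/CountWords/ExtractWords
}
```

The resulting path is only unique if sibling scopes have distinct names, which is the property pipeline updates depend on.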
type SideInputIter ¶
type SideInputIter[E Element] struct {
	// contains filtered or unexported fields
}
func AsSideIter ¶
func AsSideIter[E Element](emt PCol[E]) SideInputIter[E]
AsSideIter initializes a SideInputIter from a valid upstream PCol. It allows access to the data of that PCollection.
type SideInputMap ¶
type SideInputMap[K, V Element] struct {
	// contains filtered or unexported fields
}
SideInputMap allows a side input to be accessed via multi-map key lookups.
func AsSideMap ¶
func AsSideMap[K, V Element](emt PCol[KV[K, V]]) SideInputMap[K, V]
AsSideMap initializes a SideInputMap from a valid upstream PCol.
type StateCombining ¶
type StateCombining[E Element] struct {
	// contains filtered or unexported fields
}
type StateValue ¶
type StateValue[E Element] struct {
	// contains filtered or unexported fields
}
type TODO ¶
type TODO any
TODO is a marker indicating that a better type should go here at some point but it's not yet implemented.
type TimerEvent ¶
type TimerEvent struct {
// contains filtered or unexported fields
}
type TimerProcessing ¶
type TimerProcessing struct {
// contains filtered or unexported fields
}
type Tracker ¶
type Tracker[R Restriction[P], P any] interface {
	// Size returns an estimate of the amount of work in this restriction.
	// A zero size restriction isn't permitted.
	Size(R) float64
	// TryClaim attempts to claim the given position within the restriction.
	// Claiming a position at or beyond the end of the restriction signals that the
	// entire restriction has been processed and is now done, at which point this
	// method signals to end processing.
	TryClaim(P) bool
	// TrySplit splits at the nearest position greater than the given fraction of the remainder. If the
	// fraction given is outside of the position's range, it is clamped to Min or Max.
	TrySplit(fraction float64) (primary, residual R, err error)
	IsDone() bool
	GetError() error
	GetProgress() (done, remaining float64)
	GetRestriction() R
}
Tracker manages state around splitting an element.
Tracker implementations are not serialized.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
cmd/beamgo | beamgo is a convenience builder and launcher for Beam Go WASI SDK pipeline jobs. |
 | Package coders is a pair of convenience handles for encoding and decoding values to []byte. |
examples/wordcount | wordcount is an example pipeline that demonstrates the basics of Apache Beam. |
extworker | Package extworker provides an external worker service and related utilities. |
pipelinex | Package pipelinex contains utilities for manipulating Beam proto pipelines. |
runner/prism | Package prism downloads, unzips, and boots up a prism binary to run a pipeline against. |
io/blobio | Package blobio abstracts the scalable reading and writing of file blobs against conformant services. |
io/textio | Package textio contains transforms for reading and writing text blobs. |
transforms/io/synthetic | Package synthetic produces elements and load. |