Documentation ¶
Overview ¶
Package engine handles the operational components of a runner, tracking elements, watermarks, timers, triggers, etc.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Variables
- type Block
- type BlockKind
- type Config
- type ElementManager
- func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, sides []LinkID)
- func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder
- func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.CancelCauseFunc, ...) <-chan RunBundle
- func (em *ElementManager) DataAndTimerInputForBundle(rb RunBundle, info PColInfo) ([]*Block, int)
- func (em *ElementManager) FailBundle(rb RunBundle)
- func (em *ElementManager) GetSideData(sID, tID, inputID string, watermark mtime.Time) map[typex.Window][][]byte
- func (em *ElementManager) Impulse(stageID string)
- func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte
- func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, ...)
- func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time)
- func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals Residuals)
- func (em *ElementManager) StageAggregates(ID string)
- func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[string]bool)
- func (em *ElementManager) StageStateful(ID string)
- func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData
- type LinkID
- type PColInfo
- type Residual
- type Residuals
- type RunBundle
- type StateData
- type TentativeData
- func (d *TentativeData) AppendBagState(stateID LinkID, wKey, uKey, data []byte)
- func (d *TentativeData) AppendMultimapState(stateID LinkID, wKey, uKey, mapKey, data []byte)
- func (d *TentativeData) ClearBagState(stateID LinkID, wKey, uKey []byte)
- func (d *TentativeData) ClearMultimapKeysState(stateID LinkID, wKey, uKey []byte)
- func (d *TentativeData) ClearMultimapState(stateID LinkID, wKey, uKey, mapKey []byte)
- func (d *TentativeData) GetBagState(stateID LinkID, wKey, uKey []byte) [][]byte
- func (d *TentativeData) GetMultimapKeysState(stateID LinkID, wKey, uKey []byte) [][]byte
- func (d *TentativeData) GetMultimapState(stateID LinkID, wKey, uKey, mapKey []byte) [][]byte
- func (d *TentativeData) WriteData(colID string, data []byte)
- func (d *TentativeData) WriteTimers(transformID, familyID string, timers []byte)
- type TestStreamBuilder
- type TestStreamElement
- type TimerKey
- type WinCoderType
Constants ¶
This section is empty.
Variables ¶
var (
	OneKeyPerBundle  bool // OneKeyPerBundle sets if a bundle is restricted to a single key.
	OneElementPerKey bool // OneElementPerKey sets if a key in a bundle is restricted to one element.
)
TODO: Move to better place for configuration
Functions ¶
This section is empty.
Types ¶
type Block ¶ added in v2.54.0
Block represents a contiguous set of data or timers for the same destination.
type BlockKind ¶ added in v2.54.0
type BlockKind int32
BlockKind indicates how the block is to be handled.
const (
	BlockData  BlockKind // BlockData represents data for the bundle.
	BlockTimer           // BlockTimer represents timers for the bundle.
)
type Config ¶
type Config struct {
	// MaxBundleSize caps the number of elements permitted in a bundle.
	// 0 or less means this is ignored.
	MaxBundleSize int
}
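As a rough illustration of what a cap like MaxBundleSize implies, the sketch below partitions a slice of pending elements into bundles. It is not Prism's bundling logic: the lowercase `config` type and the `splitIntoBundles` helper are hypothetical names introduced only for this example.

```go
package main

import "fmt"

// config mirrors the shape of engine.Config for illustration only.
type config struct {
	MaxBundleSize int
}

// splitIntoBundles is a hypothetical helper that groups elements into bundles
// of at most MaxBundleSize elements. A cap of 0 or less is ignored, so all
// elements land in a single bundle.
func splitIntoBundles(cfg config, elements []string) [][]string {
	if len(elements) == 0 {
		return nil
	}
	if cfg.MaxBundleSize <= 0 {
		return [][]string{elements}
	}
	var out [][]string
	for start := 0; start < len(elements); start += cfg.MaxBundleSize {
		end := start + cfg.MaxBundleSize
		if end > len(elements) {
			end = len(elements)
		}
		out = append(out, elements[start:end])
	}
	return out
}

func main() {
	elements := []string{"a", "b", "c", "d", "e"}
	for _, size := range []int{2, 0} {
		bundles := splitIntoBundles(config{MaxBundleSize: size}, elements)
		fmt.Printf("MaxBundleSize=%d -> %d bundle(s)\n", size, len(bundles))
	}
}
```

Treating non-positive values as "no cap" keeps the zero value of the struct usable without extra configuration.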
type ElementManager ¶
type ElementManager struct {
// contains filtered or unexported fields
}
ElementManager handles elements, watermarks, and related errata to determine if a stage is able to be executed. It is the core execution engine of Prism.
Essentially, it needs to track the current watermarks for each PCollection and transform/stage. But it's tricky, since the watermarks for the PCollections are always relative to transforms/stages.
Key parts:
- The parallel input's PCollection's watermark is relative to committed consumed elements. That is, the input elements consumed by the transform after a successful bundle can advance the watermark, based on the minimum timestamp among those elements.
- An output PCollection's watermark is relative to its producing transform, which relates to *all of its outputs*.
This means that a PCollection's watermark is the minimum of the watermarks of all its consuming transforms.
So, the watermark manager needs to track:
- Pending elements for each stage, along with their windows and timestamps.
- Each transform's view of the watermarks for the PCollections.
Watermarks are advanced based on consumed input, except if the stage produces residuals.
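The rule that a PCollection's watermark is the minimum across its consuming transforms can be sketched in a few lines. This is a minimal illustration, not the ElementManager's internals: the `watermark` type (plain milliseconds standing in for mtime.Time), the `minWatermark` helper, and the transform names are all hypothetical.

```go
package main

import "fmt"

// watermark is a hypothetical stand-in for mtime.Time (event time in milliseconds).
type watermark int64

// minWatermark returns the smallest watermark among the given per-transform
// views, and false when there are no views to compare.
func minWatermark(views map[string]watermark) (watermark, bool) {
	found := false
	var lowest watermark
	for _, wm := range views {
		if !found || wm < lowest {
			lowest = wm
			found = true
		}
	}
	return lowest, found
}

func main() {
	// Each consuming transform holds its own view of the same PCollection.
	views := map[string]watermark{
		"transformA": 120,
		"transformB": 90,
		"transformC": 150,
	}
	if wm, ok := minWatermark(views); ok {
		fmt.Println("PCollection watermark:", wm)
	}
}
```

The slowest consumer holds the PCollection's watermark back, which is why the manager has to keep a view per transform rather than a single value per PCollection.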
func NewElementManager ¶
func NewElementManager(config Config) *ElementManager
func (*ElementManager) AddStage ¶
func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, sides []LinkID)
AddStage adds a stage to this element manager, connecting its PCollections and nodes to the watermark propagation graph.
func (*ElementManager) AddTestStream ¶ added in v2.55.0
func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder
AddTestStream provides a builder interface for the execution layer to build the test stream from the protos.
func (*ElementManager) Bundles ¶
func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.CancelCauseFunc, nextBundID func() string) <-chan RunBundle
Bundles is the core execution loop. It produces a sequence of bundles able to be executed. The returned channel is closed when the context is canceled, or there are no pending elements remaining.
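The channel contract described above (closed on cancellation or when work runs out) means a caller can simply range over the result. The sketch below imitates that contract with stand-ins: `runBundle`, `bundles`, and `collectIDs` are hypothetical and do not reproduce the real signature, which also takes an upstream cancel function and a bundle ID generator.

```go
package main

import (
	"context"
	"fmt"
)

// runBundle is a hypothetical stand-in for engine.RunBundle.
type runBundle struct {
	StageID, BundleID string
}

// bundles is a simplified stand-in for ElementManager.Bundles: it emits the
// pending bundles and closes the channel when the work runs out or the
// context is canceled.
func bundles(ctx context.Context, pending []runBundle) <-chan runBundle {
	out := make(chan runBundle)
	go func() {
		defer close(out)
		for _, rb := range pending {
			select {
			case out <- rb:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// collectIDs ranges over the channel until it is closed and returns the
// bundle IDs in the order they were received.
func collectIDs(pending []runBundle) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var ids []string
	for rb := range bundles(ctx, pending) {
		ids = append(ids, rb.BundleID)
	}
	return ids
}

func main() {
	pending := []runBundle{
		{StageID: "stage1", BundleID: "b1"},
		{StageID: "stage1", BundleID: "b2"},
		{StageID: "stage2", BundleID: "b3"},
	}
	fmt.Println(collectIDs(pending))
}
```

Closing the channel from the producing goroutine lets the consumer's `range` loop end without a separate done signal.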
func (*ElementManager) DataAndTimerInputForBundle ¶ added in v2.54.0
func (em *ElementManager) DataAndTimerInputForBundle(rb RunBundle, info PColInfo) ([]*Block, int)
DataAndTimerInputForBundle returns pre-allocated data for the given bundle and the estimated number of elements. Elements are encoded with the PCollection's coders.
func (*ElementManager) FailBundle ¶ added in v2.51.0
func (em *ElementManager) FailBundle(rb RunBundle)
FailBundle clears the extant data allowing the execution to shut down.
func (*ElementManager) GetSideData ¶ added in v2.53.0
func (em *ElementManager) GetSideData(sID, tID, inputID string, watermark mtime.Time) map[typex.Window][][]byte
GetSideData returns side input data for the provided stage+transform+input tuple, valid to the watermark.
func (*ElementManager) Impulse ¶
func (em *ElementManager) Impulse(stageID string)
Impulse marks and initializes the given stage as an impulse, which is a root transform that starts processing.
func (*ElementManager) InputForBundle ¶
func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte
InputForBundle returns pre-allocated data for the given bundle, encoding the elements using the PCollection's coders.
func (*ElementManager) PersistBundle ¶
func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, inputInfo PColInfo, residuals Residuals)
PersistBundle uses the tentative bundle output to update the watermarks for the stage. Each stage has two monotonically increasing watermarks, the input watermark, and the output watermark.
InputWatermark = MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
OutputWatermark = MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))
PersistBundle takes in the stage ID, ID of the bundle associated with the pending input elements, and the committed output elements.
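The two watermark formulas can be checked with a small worked example. This is only a sketch of the arithmetic, assuming each MIN term has already been reduced to a single value; `timeMs`, `advanceInput`, and `advanceOutput` are hypothetical names, and plain milliseconds stand in for mtime.Time.

```go
package main

import "fmt"

// timeMs is a hypothetical stand-in for mtime.Time.
type timeMs int64

func minTime(a, b timeMs) timeMs {
	if a < b {
		return a
	}
	return b
}

func maxTime(a, b timeMs) timeMs {
	if a > b {
		return a
	}
	return b
}

// advanceInput applies MAX(current, MIN(pending, upstream)), so the input
// watermark never moves backwards.
func advanceInput(current, minPending, upstream timeMs) timeMs {
	return maxTime(current, minTime(minPending, upstream))
}

// advanceOutput applies MAX(current, MIN(input, hold)), so the output
// watermark never passes the input watermark or the earliest hold.
func advanceOutput(current, input, minHold timeMs) timeMs {
	return maxTime(current, minTime(input, minHold))
}

func main() {
	in := advanceInput(100, 150, 130)
	out := advanceOutput(90, in, 110)
	fmt.Println("input watermark:", in)
	fmt.Println("output watermark:", out)
}
```

The outer MAX is what makes both watermarks monotonically increasing: a late-arriving pending element with an earlier timestamp cannot pull an already-advanced watermark back.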
func (*ElementManager) ProcessingTimeNow ¶ added in v2.57.0
func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time)
ProcessingTimeNow gives the current processing time for the runner.
func (*ElementManager) ReturnResiduals ¶ added in v2.48.0
func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals Residuals)
ReturnResiduals is called after a successful split, so the remaining work can be re-assigned to a new bundle.
func (*ElementManager) StageAggregates ¶
func (em *ElementManager) StageAggregates(ID string)
StageAggregates marks the given stage as an aggregation, which means elements will only be processed based on windowing strategies.
func (*ElementManager) StageProcessingTimeTimers ¶ added in v2.57.0
func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[string]bool)
StageProcessingTimeTimers indicates which timers are processingTime domain timers.
func (*ElementManager) StageStateful ¶ added in v2.54.0
func (em *ElementManager) StageStateful(ID string)
StageStateful marks the given stage as stateful, which means elements are processed by key.
func (*ElementManager) StateForBundle ¶ added in v2.54.0
func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData
StateForBundle retrieves relevant state for the given bundle, with respect to the data in the bundle.
TODO(lostluck): Consider unifying with InputForBundle, to reduce lock contention.
type LinkID ¶ added in v2.53.0
type LinkID struct {
Transform, Local, Global string
}
LinkID represents a fully qualified input or output.
type PColInfo ¶
type PColInfo struct {
	GlobalID    string
	WindowCoder WinCoderType
	WDec        exec.WindowDecoder
	WEnc        exec.WindowEncoder
	EDec        func(io.Reader) []byte
	KeyDec      func(io.Reader) []byte
}
type Residual ¶ added in v2.57.0
type Residual struct {
	Element []byte
	Delay   time.Duration // The relative time delay.
	Bounded bool          // Whether this element is finite or not.
}
Residual represents the unprocessed portion of a single element to be rescheduled for processing later.
type Residuals ¶ added in v2.57.0
type Residuals struct {
	Data                 []Residual
	TransformID, InputID string                // Prism only allows one SDF at the root of a bundledescriptor so there should only be one each.
	MinOutputWatermarks  map[string]mtime.Time // Output watermarks (technically per Residual, but aggregated here until it makes a difference.)
}
Residuals is used to specify process continuations within a bundle.
type StateData ¶ added in v2.54.0
StateData is a "union" between Bag state and MultiMap state to increase common code.
type TentativeData ¶
TentativeData is where data for in progress bundles is put until the bundle executes successfully.
func (*TentativeData) AppendBagState ¶ added in v2.54.0
func (d *TentativeData) AppendBagState(stateID LinkID, wKey, uKey, data []byte)
AppendBagState appends the incoming data to the existing tentative data bundle.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) AppendMultimapState ¶ added in v2.54.0
func (d *TentativeData) AppendMultimapState(stateID LinkID, wKey, uKey, mapKey, data []byte)
AppendMultimapState appends the incoming data to the existing tentative data bundle.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) ClearBagState ¶ added in v2.54.0
func (d *TentativeData) ClearBagState(stateID LinkID, wKey, uKey []byte)
ClearBagState clears any tentative data for the state. Since state data is only initialized if any exists, Clear takes the approach of not creating state that doesn't already exist. Existing state is zeroed to allow that to be committed post bundle completion.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
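The "zero what exists, never create" behavior described above can be sketched with a plain nested map. This is an illustration under simplifying assumptions, not the TentativeData layout: `bagState` and its `clear` method are hypothetical, the state ID is collapsed to a single string, and the window key is omitted.

```go
package main

import "fmt"

// bagState is a hypothetical, simplified store: state ID -> user key -> values.
type bagState map[string]map[string][][]byte

// clear zeroes an existing entry but never creates a new one.
func (s bagState) clear(stateID, userKey string) {
	keys, ok := s[stateID]
	if !ok {
		return // No state for this ID, so there is nothing to clear.
	}
	if _, ok := keys[userKey]; !ok {
		return // No state for this key, so do not create any.
	}
	// The entry stays present but empty, so the clear itself can be committed.
	keys[userKey] = nil
}

func main() {
	s := bagState{"transform/state": {"key1": {[]byte("a"), []byte("b")}}}
	s.clear("transform/state", "key1") // Zeroes the existing entry.
	s.clear("transform/state", "key2") // No-op: key2 was never written.

	_, hasKey1 := s["transform/state"]["key1"]
	_, hasKey2 := s["transform/state"]["key2"]
	fmt.Println("key1 present:", hasKey1, "values:", len(s["transform/state"]["key1"]))
	fmt.Println("key2 present:", hasKey2)
}
```

Keeping the emptied entry is what distinguishes "cleared during this bundle" from "never touched", which matters when tentative data is later merged into committed state.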
func (*TentativeData) ClearMultimapKeysState ¶ added in v2.54.0
func (d *TentativeData) ClearMultimapKeysState(stateID LinkID, wKey, uKey []byte)
ClearMultimapKeysState clears tentative data for all user map keys. Since state data is only initialized if any exists, Clear takes the approach of not creating state that doesn't already exist. Existing state is zeroed to allow that to be committed post bundle completion.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) ClearMultimapState ¶ added in v2.54.0
func (d *TentativeData) ClearMultimapState(stateID LinkID, wKey, uKey, mapKey []byte)
ClearMultimapState clears any tentative data for the state. Since state data is only initialized if any exists, Clear takes the approach of not creating state that doesn't already exist. Existing state is zeroed to allow that to be committed post bundle completion.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) GetBagState ¶ added in v2.54.0
func (d *TentativeData) GetBagState(stateID LinkID, wKey, uKey []byte) [][]byte
GetBagState retrieves available state from the tentative bundle data. The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) GetMultimapKeysState ¶ added in v2.54.0
func (d *TentativeData) GetMultimapKeysState(stateID LinkID, wKey, uKey []byte) [][]byte
GetMultimapKeysState retrieves all available user map keys.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) GetMultimapState ¶ added in v2.54.0
func (d *TentativeData) GetMultimapState(stateID LinkID, wKey, uKey, mapKey []byte) [][]byte
GetMultimapState retrieves available state from the tentative bundle data. The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) WriteData ¶
func (d *TentativeData) WriteData(colID string, data []byte)
WriteData adds data to a given global collectionID.
func (*TentativeData) WriteTimers ¶ added in v2.54.0
func (d *TentativeData) WriteTimers(transformID, familyID string, timers []byte)
WriteTimers adds timers to the associated transform handler.
type TestStreamBuilder ¶ added in v2.55.0
type TestStreamBuilder interface {
	AddElementEvent(tag string, elements []TestStreamElement)
	AddWatermarkEvent(tag string, newWatermark mtime.Time)
	AddProcessingTimeEvent(d time.Duration)
}
TestStreamBuilder builds a synthetic sequence of events for the engine to execute. A pipeline may only have a single TestStream; adding more than one may panic.
type TestStreamElement ¶ added in v2.55.0
TestStreamElement wraps the provided bytes and timestamp for ingestion and use.
type TimerKey ¶ added in v2.54.0
type TimerKey struct {
Transform, Family string
}
TimerKey is for use as a key for timers.
type WinCoderType ¶ added in v2.59.0
type WinCoderType int
WinCoderType indicates what kind of coder the window is using. There are only 3 valid single window encodings.
- Global (for Global windows)
- Interval (for fixed, sliding, and session windows)
- Custom (for custom user windows)
TODO: Handle custom variants with built in "known" coders, and length prefixed ones as separate cases. As a rule we don't care about the bytes, but we do need to be able to get to the next element.
const (
	// WinGlobal indicates the window is empty coded, with 0 bytes.
	WinGlobal WinCoderType = iota
	// WinInterval indicates the window is interval coded with the end event time timestamp followed by the duration in milliseconds.
	WinInterval
	// WinCustom indicates the window is custom coded with the end event time timestamp followed by a custom coder.
	WinCustom
)
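The TODO's point about needing "to get to the next element" can be illustrated by computing how many bytes an encoded window occupies. The sketch below is not the engine's decoder. It assumes an 8-byte timestamp and a varint-encoded duration for interval windows, which the descriptions above do not spell out, and `winCoderType` and `windowLen` are hypothetical names.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// winCoderType is a local stand-in for engine.WinCoderType.
type winCoderType int

const (
	winGlobal winCoderType = iota
	winInterval
	winCustom
)

// windowLen reports how many leading bytes of buf belong to the encoded
// window, so a reader can skip ahead to the element that follows.
// Assumption: interval windows are an 8-byte timestamp plus a varint duration.
func windowLen(t winCoderType, buf []byte) (int, error) {
	switch t {
	case winGlobal:
		return 0, nil // Global windows are coded with 0 bytes.
	case winInterval:
		if len(buf) < 8 {
			return 0, errors.New("buffer too short for window timestamp")
		}
		_, n := binary.Uvarint(buf[8:])
		if n <= 0 {
			return 0, errors.New("malformed varint duration")
		}
		return 8 + n, nil
	default:
		// The length of a custom window depends on its coder, which this
		// sketch does not know.
		return 0, errors.New("custom window length depends on its coder")
	}
}

func main() {
	// 8 timestamp bytes, the duration 300 as a varint (0xAC 0x02), then the element bytes "hi".
	buf := []byte{0, 0, 0, 0, 0, 0, 0x03, 0xE8, 0xAC, 0x02, 'h', 'i'}
	n, err := windowLen(winInterval, buf)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("window bytes: %d, element: %s\n", n, buf[n:])
}
```

The custom case returns an error on purpose: without a known or length-prefixed coder there is no way to find the element boundary, which is the gap the TODO describes.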