Documentation ¶
Overview ¶
Package engine handles the operational components of a runner: tracking elements, watermarks, timers, triggers, etc.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Variables
- type Block
- type BlockKind
- type Config
- type ElementManager
- func NewElementManager(config Config) *ElementManager
- func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, sides []LinkID)
- func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder
- func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) <-chan RunBundle
- func (em *ElementManager) DataAndTimerInputForBundle(rb RunBundle, info PColInfo) ([]*Block, int)
- func (em *ElementManager) FailBundle(rb RunBundle)
- func (em *ElementManager) GetSideData(sID, tID, inputID string, watermark mtime.Time) map[typex.Window][][]byte
- func (em *ElementManager) Impulse(stageID string)
- func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte
- func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, ...)
- func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time)
- func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals Residuals)
- func (em *ElementManager) StageAggregates(ID string)
- func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[string]bool)
- func (em *ElementManager) StageStateful(ID string)
- func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData
- type LinkID
- type PColInfo
- type Residual
- type Residuals
- type RunBundle
- type StateData
- type TentativeData
- func (d *TentativeData) AppendBagState(stateID LinkID, wKey, uKey, data []byte)
- func (d *TentativeData) AppendMultimapState(stateID LinkID, wKey, uKey, mapKey, data []byte)
- func (d *TentativeData) ClearBagState(stateID LinkID, wKey, uKey []byte)
- func (d *TentativeData) ClearMultimapKeysState(stateID LinkID, wKey, uKey []byte)
- func (d *TentativeData) ClearMultimapState(stateID LinkID, wKey, uKey, mapKey []byte)
- func (d *TentativeData) GetBagState(stateID LinkID, wKey, uKey []byte) [][]byte
- func (d *TentativeData) GetMultimapKeysState(stateID LinkID, wKey, uKey []byte) [][]byte
- func (d *TentativeData) GetMultimapState(stateID LinkID, wKey, uKey, mapKey []byte) [][]byte
- func (d *TentativeData) WriteData(colID string, data []byte)
- func (d *TentativeData) WriteTimers(transformID, familyID string, timers []byte)
- type TestStreamBuilder
- type TestStreamElement
- type TimerKey
Constants ¶
This section is empty.
Variables ¶
var (
	OneKeyPerBundle  bool // OneKeyPerBundle sets if a bundle is restricted to a single key.
	OneElementPerKey bool // OneElementPerKey sets if a key in a bundle is restricted to one element.
)
TODO: Move to a better place for configuration.
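To illustrate what the two flags restrict, here is a minimal sketch, not Prism's actual selection logic. It assumes a hypothetical `elem` type carrying a string key and a hypothetical `selectBundle` function that scans pending elements in order and admits those allowed into the next bundle.

```go
package main

import "fmt"

// elem is a hypothetical stand-in for a pending element and its key.
type elem struct {
	key   string
	value int
}

// selectBundle is a hypothetical illustration of how OneKeyPerBundle and
// OneElementPerKey could limit which pending elements join a single bundle.
func selectBundle(pending []elem, oneKeyPerBundle, oneElementPerKey bool) []elem {
	var bundle []elem
	seen := map[string]bool{}
	for _, e := range pending {
		if oneKeyPerBundle && len(bundle) > 0 && bundle[0].key != e.key {
			continue // Only the first key admitted may appear in this bundle.
		}
		if oneElementPerKey && seen[e.key] {
			continue // This key already contributed an element.
		}
		seen[e.key] = true
		bundle = append(bundle, e)
	}
	return bundle
}

func main() {
	pending := []elem{{"a", 1}, {"b", 2}, {"a", 3}}
	fmt.Println(len(selectBundle(pending, false, false))) // 3
	fmt.Println(len(selectBundle(pending, true, false)))  // 2: both "a" elements
	fmt.Println(len(selectBundle(pending, false, true)))  // 2: one element per key
	fmt.Println(len(selectBundle(pending, true, true)))   // 1
}
```

Elements that are skipped simply stay pending for a later bundle in this model.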
Functions ¶
This section is empty.
Types ¶
type BlockKind ¶
type BlockKind int32
BlockKind indicates how the block is to be handled.
const (
	BlockData  BlockKind = iota // BlockData represents data for the bundle.
	BlockTimer                  // BlockTimer represents timers for the bundle.
)
type Config ¶
type Config struct {
	// MaxBundleSize caps the number of elements permitted in a bundle.
	// 0 or less means this is ignored.
	MaxBundleSize int
}
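The cap on MaxBundleSize can be pictured as splitting pending work into bundles of at most that many elements. This is a sketch only: `Config` is redeclared locally so the example compiles on its own, and `chunk` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Config mirrors the documented field so the sketch is self-contained.
type Config struct {
	MaxBundleSize int
}

// chunk is a hypothetical helper that splits n pending elements into bundle
// sizes, honoring MaxBundleSize (0 or less disables the cap).
func chunk(cfg Config, n int) []int {
	if n <= 0 {
		return nil
	}
	if cfg.MaxBundleSize <= 0 {
		return []int{n} // No cap: everything fits in one bundle.
	}
	var sizes []int
	for n > 0 {
		s := cfg.MaxBundleSize
		if n < s {
			s = n
		}
		sizes = append(sizes, s)
		n -= s
	}
	return sizes
}

func main() {
	fmt.Println(chunk(Config{MaxBundleSize: 4}, 10)) // [4 4 2]
	fmt.Println(chunk(Config{}, 10))                 // [10]
}
```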
type ElementManager ¶
type ElementManager struct {
// contains filtered or unexported fields
}
ElementManager handles elements, watermarks, and related errata to determine if a stage is able to be executed. It is the core execution engine of Prism.
Essentially, it needs to track the current watermarks for each PCollection and transform/stage. But it's tricky, since the watermarks for the PCollections are always relative to transforms/stages.
Key parts:
- The parallel input's PCollection's watermark is relative to committed consumed elements. That is, input elements consumed by the transform in a successfully committed bundle can advance the watermark, based on the minimum of those elements' timestamps.
- An output PCollection's watermark is relative to its producing transform, which relates to *all of its outputs*.
This means that a PCollection's watermark is the minimum across all of its consuming transforms.
So, the watermark manager needs to track:
- Pending elements for each stage, along with their windows and timestamps.
- Each transform's view of the watermarks for the PCollections.
Watermarks are advanced based on consumed input, except if the stage produces residuals.
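The two minimum rules above can be sketched with a deliberately simplified model: watermarks are plain int64 timestamps instead of mtime.Time, and the hypothetical `pcollection` type stores each consuming stage's view of its watermark. None of these names come from the package.

```go
package main

import (
	"fmt"
	"math"
)

// pcollection is a hypothetical model where each consuming stage keeps its
// own view of this PCollection's watermark.
type pcollection struct {
	consumerViews map[string]int64 // stage ID -> that stage's watermark view
}

// watermark returns the minimum across all consuming stages' views.
func (p *pcollection) watermark() int64 {
	wm := int64(math.MaxInt64)
	for _, v := range p.consumerViews {
		if v < wm {
			wm = v
		}
	}
	return wm
}

// minPending returns the earliest timestamp among still-pending elements,
// or MaxInt64 when nothing is pending.
func minPending(timestamps []int64) int64 {
	m := int64(math.MaxInt64)
	for _, ts := range timestamps {
		if ts < m {
			m = ts
		}
	}
	return m
}

func main() {
	p := &pcollection{consumerViews: map[string]int64{"s1": 10, "s2": 7}}
	fmt.Println(p.watermark()) // 7: the slowest consumer holds it back
	// After a bundle commits, only the unconsumed elements hold the watermark.
	fmt.Println(minPending([]int64{12, 9, 15})) // 9
}
```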
func NewElementManager ¶
func NewElementManager(config Config) *ElementManager
func (*ElementManager) AddStage ¶
func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, sides []LinkID)
AddStage adds a stage to this element manager, connecting its PCollections and nodes to the watermark propagation graph.
func (*ElementManager) AddTestStream ¶
func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder
AddTestStream provides a builder interface for the execution layer to build the test stream from the protos.
func (*ElementManager) Bundles ¶
func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) <-chan RunBundle
Bundles is the core execution loop. It produces a sequence of bundles that are able to be executed. The returned channel is closed when the context is canceled, or when there are no pending elements remaining.
func (*ElementManager) DataAndTimerInputForBundle ¶
func (em *ElementManager) DataAndTimerInputForBundle(rb RunBundle, info PColInfo) ([]*Block, int)
DataAndTimerInputForBundle returns pre-allocated data for the given bundle and the estimated number of elements. Elements are encoded with the PCollection's coders.
func (*ElementManager) FailBundle ¶
func (em *ElementManager) FailBundle(rb RunBundle)
FailBundle clears the extant data, allowing the execution to shut down.
func (*ElementManager) GetSideData ¶
func (em *ElementManager) GetSideData(sID, tID, inputID string, watermark mtime.Time) map[typex.Window][][]byte
GetSideData returns side input data for the provided stage+transform+input tuple, valid to the watermark.
func (*ElementManager) Impulse ¶
func (em *ElementManager) Impulse(stageID string)
Impulse marks and initializes the given stage as an impulse, which is a root transform that starts processing.
func (*ElementManager) InputForBundle ¶
func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte
InputForBundle returns pre-allocated data for the given bundle, encoding the elements using the PCollection's coders.
func (*ElementManager) PersistBundle ¶
func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, inputInfo PColInfo, residuals Residuals)
PersistBundle uses the tentative bundle output to update the watermarks for the stage. Each stage has two monotonically increasing watermarks: the input watermark and the output watermark.
InputWatermark = MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
OutputWatermark = MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))
PersistBundle takes in the stage ID, the ID of the bundle associated with the pending input elements, and the committed output elements.
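The two watermark update rules can be checked with a small sketch. It models watermarks as int64 timestamps rather than mtime.Time, and it assumes PendingElements, InputPCollectionWatermarks, and WatermarkHolds have each already been reduced to a single minimum timestamp; the function names are hypothetical.

```go
package main

import "fmt"

// min2 and max2 are small helpers (Go 1.21's built-in min/max would also work).
func min2(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

func max2(a, b int64) int64 {
	if a > b {
		return a
	}
	return b
}

// nextInputWatermark applies
// MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks)).
func nextInputWatermark(current, minPending, minInputPCol int64) int64 {
	return max2(current, min2(minPending, minInputPCol))
}

// nextOutputWatermark applies
// MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds)).
func nextOutputWatermark(current, inputWM, minHold int64) int64 {
	return max2(current, min2(inputWM, minHold))
}

func main() {
	in := nextInputWatermark(5, 12, 10)  // MIN(12, 10) = 10, which exceeds 5
	out := nextOutputWatermark(3, in, 8) // the hold at 8 keeps output behind input
	fmt.Println(in, out)                 // 10 8
	// The outer MAX keeps watermarks monotonic: they never move backwards.
	fmt.Println(nextInputWatermark(20, 12, 10)) // 20
}
```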
func (*ElementManager) ProcessingTimeNow ¶
func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time)
ProcessingTimeNow gives the current processing time for the runner.
func (*ElementManager) ReturnResiduals ¶
func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals Residuals)
ReturnResiduals is called after a successful split, so the remaining work can be re-assigned to a new bundle.
func (*ElementManager) StageAggregates ¶
func (em *ElementManager) StageAggregates(ID string)
StageAggregates marks the given stage as an aggregation, which means elements will only be processed based on windowing strategies.
func (*ElementManager) StageProcessingTimeTimers ¶
func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[string]bool)
StageProcessingTimeTimers indicates which timers are processing-time domain timers.
func (*ElementManager) StageStateful ¶
func (em *ElementManager) StageStateful(ID string)
StageStateful marks the given stage as stateful, which means elements are processed by key.
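"Processed by key" can be pictured as grouping a stage's pending elements so that each key's elements are handled together with that key's state. This is an illustrative sketch with a hypothetical `keyed` element type and `groupByKey` helper, not how Prism stores pending elements.

```go
package main

import (
	"fmt"
	"sort"
)

// keyed is a hypothetical pending element for a stateful stage.
type keyed struct {
	Key   string
	Value int
}

// groupByKey collects each key's elements together, preserving arrival order
// within a key.
func groupByKey(elems []keyed) map[string][]int {
	groups := make(map[string][]int)
	for _, e := range elems {
		groups[e.Key] = append(groups[e.Key], e.Value)
	}
	return groups
}

func main() {
	g := groupByKey([]keyed{{"a", 1}, {"b", 2}, {"a", 3}})
	keys := make([]string, 0, len(g))
	for k := range g {
		keys = append(keys, k)
	}
	sort.Strings(keys) // Map iteration order is random; sort for stable output.
	for _, k := range keys {
		fmt.Println(k, g[k])
	}
}
```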
func (*ElementManager) StateForBundle ¶
func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData
StateForBundle retrieves relevant state for the given bundle, with respect to the data in the bundle.
TODO(lostluck): Consider unifying with InputForBundle, to reduce lock contention.
type LinkID ¶
type LinkID struct {
Transform, Local, Global string
}
LinkID represents a fully qualified input or output.
type Residual ¶
type Residual struct {
	Element []byte
	Delay   time.Duration // The relative time delay.
	Bounded bool          // Whether this element is finite or not.
}
Residual represents the unprocessed portion of a single element to be rescheduled for processing later.
type Residuals ¶
type Residuals struct {
	Data []Residual

	TransformID, InputID string // Prism only allows one SDF at the root of a bundledescriptor so there should only be one each.

	MinOutputWatermarks map[string]mtime.Time // Output watermarks (technically per Residual, but aggregated here until it makes a difference.)
}
Residuals is used to specify process continuations within a bundle.
type StateData ¶
StateData is a "union" between Bag state and MultiMap state, so that more code can be shared between them.
type TentativeData ¶
TentativeData is where data for in-progress bundles is put until the bundle executes successfully.
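The buffer-then-commit idea can be sketched with two hypothetical types: `tentative` buffers writes per collection ID in the style of WriteData, and `store` is the committed view. Data only reaches the store when `commit` is called for a successful bundle; a failed bundle's buffer is simply dropped. This is a simplification, not the real TentativeData layout.

```go
package main

import "fmt"

// tentative is a hypothetical, simplified analogue of TentativeData.
type tentative struct {
	raw map[string][][]byte
}

// WriteData buffers an encoded element for the given collection ID.
func (t *tentative) WriteData(colID string, data []byte) {
	if t.raw == nil {
		t.raw = make(map[string][][]byte)
	}
	t.raw[colID] = append(t.raw[colID], data)
}

// store is a hypothetical committed view of each collection.
type store map[string][][]byte

// commit publishes buffered data; call it only if the bundle succeeded.
func (s store) commit(t *tentative) {
	for col, elems := range t.raw {
		s[col] = append(s[col], elems...)
	}
}

func main() {
	committed := store{}

	ok := &tentative{}
	ok.WriteData("pc1", []byte("x"))
	committed.commit(ok) // Bundle succeeded: its data is published.

	failed := &tentative{}
	failed.WriteData("pc1", []byte("y"))
	// Bundle failed: the tentative data is discarded, never committed.

	fmt.Println(len(committed["pc1"])) // 1
}
```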
func (*TentativeData) AppendBagState ¶
func (d *TentativeData) AppendBagState(stateID LinkID, wKey, uKey, data []byte)
AppendBagState appends the incoming data to the existing tentative data bundle.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) AppendMultimapState ¶
func (d *TentativeData) AppendMultimapState(stateID LinkID, wKey, uKey, mapKey, data []byte)
AppendMultimapState appends the incoming data to the existing tentative data bundle.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) ClearBagState ¶
func (d *TentativeData) ClearBagState(stateID LinkID, wKey, uKey []byte)
ClearBagState clears any tentative data for the state. Since state data is only initialized if any exists, Clear does not create state that doesn't already exist. Existing state is zeroed so that the cleared value can be committed after the bundle completes.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
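The Append/Get/Clear behavior for bag state can be sketched as follows. It uses a hypothetical `bagKey` that stores the transform, user state ID, window key, and user key as strings, because Go byte slices cannot be map keys (the real methods take []byte and LinkID). Clear never creates a missing entry but zeroes an existing one, so an empty value remains to be committed.

```go
package main

import "fmt"

// bagKey is a hypothetical composite key: transform, user state ID, window
// key, and user key, with the []byte keys converted to strings.
type bagKey struct {
	transform, stateID, wKey, uKey string
}

type bags map[bagKey][][]byte

func (b bags) Append(k bagKey, data []byte) {
	b[k] = append(b[k], data)
}

func (b bags) Get(k bagKey) [][]byte {
	return b[k]
}

// Clear never creates an entry for absent state, but zeroes existing state so
// the empty value is kept for committing after the bundle completes.
func (b bags) Clear(k bagKey) {
	if _, ok := b[k]; !ok {
		return
	}
	b[k] = [][]byte{}
}

func main() {
	b := bags{}
	k := bagKey{"t1", "s1", "w", "u"}
	b.Append(k, []byte("a"))
	b.Clear(k)
	_, present := b[k]
	fmt.Println(len(b.Get(k)), present) // 0 true

	other := bagKey{"t1", "s1", "w", "v"}
	b.Clear(other)
	_, present = b[other]
	fmt.Println(present) // false
}
```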
func (*TentativeData) ClearMultimapKeysState ¶
func (d *TentativeData) ClearMultimapKeysState(stateID LinkID, wKey, uKey []byte)
ClearMultimapKeysState clears tentative data for all user map keys. Since state data is only initialized if any exists, Clear does not create state that doesn't already exist. Existing state is zeroed so that the cleared value can be committed after the bundle completes.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) ClearMultimapState ¶
func (d *TentativeData) ClearMultimapState(stateID LinkID, wKey, uKey, mapKey []byte)
ClearMultimapState clears any tentative data for the state. Since state data is only initialized if any exists, Clear does not create state that doesn't already exist. Existing state is zeroed so that the cleared value can be committed after the bundle completes.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
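Multimap state adds one more level: each user map key holds its own list of values. The sketch models a single (transform, state ID, window, user key) multimap as a hypothetical `multimap` type whose methods loosely parallel AppendMultimapState, GetMultimapKeysState, ClearMultimapState, and ClearMultimapKeysState. As an assumption, zeroed keys are not reported by Keys.

```go
package main

import (
	"fmt"
	"sort"
)

// multimap is a hypothetical model: user map key -> encoded values.
type multimap map[string][][]byte

func (m multimap) Append(mapKey string, data []byte) {
	m[mapKey] = append(m[mapKey], data)
}

// Keys lists user map keys that still hold values (assumption: cleared,
// empty keys are not reported).
func (m multimap) Keys() []string {
	keys := make([]string, 0, len(m))
	for k, vs := range m {
		if len(vs) > 0 {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys) // Deterministic order for the example.
	return keys
}

// ClearKey zeroes one existing map key without creating absent ones.
func (m multimap) ClearKey(mapKey string) {
	if _, ok := m[mapKey]; ok {
		m[mapKey] = [][]byte{}
	}
}

// ClearAll zeroes every user map key.
func (m multimap) ClearAll() {
	for k := range m {
		m[k] = [][]byte{}
	}
}

func main() {
	m := multimap{}
	m.Append("k1", []byte("a"))
	m.Append("k2", []byte("b"))
	m.ClearKey("k1")
	fmt.Println(m.Keys()) // [k2]
	m.ClearAll()
	fmt.Println(len(m.Keys())) // 0
}
```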
func (*TentativeData) GetBagState ¶
func (d *TentativeData) GetBagState(stateID LinkID, wKey, uKey []byte) [][]byte
GetBagState retrieves available state from the tentative bundle data. The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) GetMultimapKeysState ¶
func (d *TentativeData) GetMultimapKeysState(stateID LinkID, wKey, uKey []byte) [][]byte
GetMultimapKeysState retrieves all available user map keys.
The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) GetMultimapState ¶
func (d *TentativeData) GetMultimapState(stateID LinkID, wKey, uKey, mapKey []byte) [][]byte
GetMultimapState retrieves available state from the tentative bundle data. The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively.
func (*TentativeData) WriteData ¶
func (d *TentativeData) WriteData(colID string, data []byte)
WriteData adds data to a given global collectionID.
func (*TentativeData) WriteTimers ¶
func (d *TentativeData) WriteTimers(transformID, familyID string, timers []byte)
WriteTimers adds timers to the associated transform handler.
type TestStreamBuilder ¶
type TestStreamBuilder interface {
	AddElementEvent(tag string, elements []TestStreamElement)
	AddWatermarkEvent(tag string, newWatermark mtime.Time)
	AddProcessingTimeEvent(d time.Duration)
}
TestStreamBuilder builds a synthetic sequence of events for the engine to execute. A pipeline may only have a single TestStream; a pipeline with more than one may panic.
type TestStreamElement ¶
TestStreamElement wraps the provided bytes and timestamp for ingestion and use.