Documentation ¶
Overview ¶
Package worker handles interactions with SDK side workers, representing the worker services, communicating with those services, and SDK environments.
Index ¶
- type B
- func (b *B) Cleanup(wk *W)
- func (b *B) DataOrTimerDone()
- func (b *B) Init()
- func (b *B) LogValue() slog.Value
- func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{}
- func (b *B) Progress(ctx context.Context, wk *W) (*fnpb.ProcessBundleProgressResponse, error)
- func (b *B) Respond(resp *fnpb.InstructionResponse)
- func (b *B) Split(ctx context.Context, wk *W, fraction float64, allowedSplits []int64) (*fnpb.ProcessBundleSplitResponse, error)
- type SideInputKey
- type W
- func (wk *W) Connected() bool
- func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error
- func (wk *W) Data(data fnpb.BeamFnData_DataServer) error
- func (wk *W) Endpoint() string
- func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error)
- func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest) (*fnpb.GetProvisionInfoResponse, error)
- func (wk *W) LogValue() slog.Value
- func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error
- func (wk *W) MonitoringMetadata(ctx context.Context, unknownIDs []string) *fnpb.MonitoringInfosMetadataResponse
- func (wk *W) NextInst() string
- func (wk *W) Serve()
- func (wk *W) State(state fnpb.BeamFnState_StateServer) error
- func (wk *W) Stop()
- func (wk *W) Stopped() bool
- func (wk *W) String() string
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type B ¶
type B struct { InstID string // ID for the instruction processing this bundle. PBDID string // ID for the ProcessBundleDescriptor // InputTransformID is where data is being sent to in the SDK. InputTransformID string Input []*engine.Block // Data and Timers for this bundle. EstimatedInputElements int HasTimers []string // IterableSideInputData is a map from transformID + inputID, to window, to data. IterableSideInputData map[SideInputKey]map[typex.Window][][]byte // MultiMapSideInputData is a map from transformID + inputID, to window, to data key, to data values. MultiMapSideInputData map[SideInputKey]map[typex.Window]map[string][][]byte // OutputCount is the number of data or timer outputs this bundle has. // We need to see this many closed data channels before the bundle is complete. OutputCount int // DataWait is how we determine if a bundle is finished, by waiting for each of // a Bundle's DataSinks to produce their last output. // After this point we can "commit" the bundle's output for downstream use. DataWait chan struct{} OutputData engine.TentativeData Resp chan *fnpb.ProcessBundleResponse BundleErr error SinkToPCollection map[string]string // contains filtered or unexported fields }
B represents an extant ProcessBundle instruction sent to an SDK worker. Generally manipulated by another package to interact with a worker.
func (*B) DataOrTimerDone ¶
func (b *B) DataOrTimerDone()
DataOrTimerDone indicates a final element has been received from a Data or Timer output.
func (*B) Init ¶
func (b *B) Init()
Init initializes the bundle's internal state for waiting on all data and for relaying a response back.
func (*B) ProcessOn ¶
ProcessOn executes the given bundle on the given W. The returned channel is closed once all expected data is returned.
Assumes the bundle is initialized (all maps are non-nil, and data waitgroup is set, response channel initialized) Assumes the bundle descriptor is already registered with the W.
While this method mostly manipulates a W, putting it on a B avoids mixing the workers public GRPC APIs up with local calls.
func (*B) Progress ¶
Progress sends a progress request for the given bundle to the passed in worker, blocking on the response.
func (*B) Respond ¶
func (b *B) Respond(resp *fnpb.InstructionResponse)
type SideInputKey ¶
type SideInputKey struct {
TransformID, Local string
}
SideInputKey is for data lookups for a given bundle.
type W ¶
type W struct { fnpb.UnimplementedBeamFnControlServer fnpb.UnimplementedBeamFnDataServer fnpb.UnimplementedBeamFnStateServer fnpb.UnimplementedBeamFnLoggingServer fnpb.UnimplementedProvisionServiceServer ID, Env string JobKey, ArtifactEndpoint string EnvPb *pipepb.Environment PipelineOptions *structpb.Struct InstReqs chan *fnpb.InstructionRequest DataReqs chan *fnpb.Elements Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID // contains filtered or unexported fields }
A W manages worker environments, sending them work that they're able to execute, and manages the server side handlers for FnAPI RPCs.
func (*W) Control ¶
func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error
Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
Requests come from the runner, and are sent to the client in the SDK.
func (*W) Data ¶
func (wk *W) Data(data fnpb.BeamFnData_DataServer) error
Data relays elements and timer bytes to SDKs and back again, coordinated via ProcessBundle instructionIDs, and receiving input transforms.
Data is multiplexed on a single stream for all active bundles on a worker.
func (*W) GetProcessBundleDescriptor ¶
func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error)
func (*W) GetProvisionInfo ¶
func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest) (*fnpb.GetProvisionInfoResponse, error)
func (*W) Logging ¶
func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error
Logging relates SDK worker messages back to the job that spawned them. Messages are received from the SDK,
func (*W) MonitoringMetadata ¶
func (wk *W) MonitoringMetadata(ctx context.Context, unknownIDs []string) *fnpb.MonitoringInfosMetadataResponse
MonitoringMetadata is a convenience method to request the metadata for monitoring shortIDs.