Documentation ¶
Index ¶
- Constants
- Variables
- func DefaultDial(ctx context.Context, endpoint string, timeout time.Duration) (*grpc.ClientConn, error)
- func Main(ctx context.Context, controlEndpoint string, opts Options, exec ExecFunc) (err error)
- type Control
- type DataChannel
- func (c *DataChannel) OpenElementChan(ctx context.Context, ptransformID string, instID instructionID, ...) (<-chan Elements, error)
- func (c *DataChannel) OpenTimerWrite(ctx context.Context, ptransformID string, instID instructionID, family string) io.WriteCloser
- func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, instID instructionID) io.WriteCloser
- type DataChannelManager
- type DataContext
- type Elements
- type ExecFunc
- type Monitor
- type NextBuffer
- type Options
- type Port
- type ScopedDataManager
- func (s *ScopedDataManager) Close() error
- func (s *ScopedDataManager) OpenElementChan(ctx context.Context, id StreamID, expectedTimerTransforms []string) (<-chan Elements, error)
- func (s *ScopedDataManager) OpenTimerWrite(ctx context.Context, id StreamID, family string) (io.WriteCloser, error)
- func (s *ScopedDataManager) OpenWrite(ctx context.Context, id StreamID) (io.WriteCloser, error)
- type ScopedStateManager
- type Splitter
- type StateChannel
- type StateChannelManager
- type StreamID
Constants ¶
const (
	StateWriteAppend writeTypeEnum = 0
	StateWriteClear  writeTypeEnum = 1
)
Variables ¶
var Dial = DefaultDial
Dial is a convenience wrapper over grpc.Dial. It can be overridden to provide customized dialing behavior.
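The override mechanism can be illustrated with a small, self-contained sketch. It does not use the real package or gRPC: conn, dialFunc, defaultDial, dial and withPrefix are hypothetical stand-ins that only mirror the shape of the Dial variable and the DefaultDial signature shown above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// conn is a hypothetical stand-in for *grpc.ClientConn.
type conn struct{ endpoint string }

// dialFunc mirrors the shape of the DefaultDial signature.
type dialFunc func(ctx context.Context, endpoint string, timeout time.Duration) (*conn, error)

// defaultDial is a stand-in dialer that honors the timeout through the context.
func defaultDial(ctx context.Context, endpoint string, timeout time.Duration) (*conn, error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if endpoint == "" {
		return nil, errors.New("empty endpoint")
	}
	return &conn{endpoint: endpoint}, nil
}

// dial plays the role of the package-level Dial variable.
var dial dialFunc = defaultDial

// withPrefix returns a dialer that rewrites the endpoint before delegating to base.
func withPrefix(prefix string, base dialFunc) dialFunc {
	return func(ctx context.Context, endpoint string, timeout time.Duration) (*conn, error) {
		return base(ctx, prefix+endpoint, timeout)
	}
}

// run overrides the dialer and reports the endpoint that was actually dialed.
func run() (string, error) {
	dial = withPrefix("dns:///", defaultDial)
	c, err := dial(context.Background(), "localhost:1234", time.Second)
	if err != nil {
		return "", err
	}
	return c.endpoint, nil
}

func main() {
	fmt.Println(run()) // prints "dns:///localhost:1234 <nil>"
}
```

Because the variable is package-level, an override of this kind affects every later caller, so it is usually assigned once during start-up.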
Functions ¶
func DefaultDial ¶
func DefaultDial(ctx context.Context, endpoint string, timeout time.Duration) (*grpc.ClientConn, error)
DefaultDial is a dialer that specifies an insecure blocking connection with a timeout.
Types ¶
type Control ¶
type Control struct {
// contains filtered or unexported fields
}
func (*Control) GetOrLookupPlan ¶
func (ctrl *Control) GetOrLookupPlan(dc DataContext, unmarshal func(pbd *fnpb.ProcessBundleDescriptor) any) (any, error)
GetOrLookupPlan does a layered check to get an execution plan.
It first checks whether a cached plan is already available; if so, that plan is returned. Otherwise a new plan must be built from a ProcessBundleDescriptor. The local descriptor cache is checked first, and only if the descriptor is missing is it requested from the runner, which reduces duplicate requests.
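The layered check described above can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: descriptor, plan, planCache, getOrLookup and release are hypothetical names, and fetch stands in for the request to the runner.

```go
package main

import (
	"fmt"
	"sync"
)

// descriptor and plan are hypothetical stand-ins for a
// ProcessBundleDescriptor and the opaque plan value.
type descriptor struct{ id string }
type plan struct{ id string }

// planCache sketches the layers: idle plans, cached descriptors, then the runner.
type planCache struct {
	mu          sync.Mutex
	plans       map[string][]*plan
	descriptors map[string]*descriptor
	fetch       func(id string) (*descriptor, error) // stand-in for the runner request
}

func newPlanCache(fetch func(string) (*descriptor, error)) *planCache {
	return &planCache{
		plans:       make(map[string][]*plan),
		descriptors: make(map[string]*descriptor),
		fetch:       fetch,
	}
}

// getOrLookup returns an idle plan if one exists, otherwise builds a new one.
// Holding the lock across fetch is a simple way to avoid duplicate requests.
func (c *planCache) getOrLookup(id string, build func(*descriptor) *plan) (*plan, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Layer 1: reuse an idle plan.
	if ps := c.plans[id]; len(ps) > 0 {
		p := ps[len(ps)-1]
		c.plans[id] = ps[:len(ps)-1]
		return p, nil
	}
	// Layer 2: the locally cached descriptor.
	d, ok := c.descriptors[id]
	if !ok {
		// Layer 3: ask the runner and remember the answer.
		var err error
		if d, err = c.fetch(id); err != nil {
			return nil, err
		}
		c.descriptors[id] = d
	}
	return build(d), nil
}

// release hands a finished plan back so a later call can reuse it.
func (c *planCache) release(p *plan) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.plans[p.id] = append(c.plans[p.id], p)
}

func main() {
	fetches := 0
	c := newPlanCache(func(id string) (*descriptor, error) {
		fetches++
		return &descriptor{id: id}, nil
	})
	build := func(d *descriptor) *plan { return &plan{id: d.id} }

	p1, _ := c.getOrLookup("bundle-1", build) // fetches the descriptor, builds a plan
	p2, _ := c.getOrLookup("bundle-1", build) // descriptor cached, builds a second plan
	c.release(p1)
	p3, _ := c.getOrLookup("bundle-1", build) // reuses the released plan
	fmt.Println(fetches, p1 == p3, p1 == p2)  // prints "1 true false"
}
```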
func (*Control) RegisterMonitor ¶
func (ctrl *Control) RegisterMonitor(dc DataContext, monFn Monitor)
func (*Control) RegisterSplitter ¶
func (ctrl *Control) RegisterSplitter(dc DataContext, splitFn Splitter)
type DataChannel ¶
type DataChannel struct {
// contains filtered or unexported fields
}
DataChannel manages a single gRPC stream over the Data API. Data from multiple bundles can be multiplexed over this stream. Data is pushed over the channel, so data for a reader may arrive before the reader connects. Thread-safe.
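The following minimal sketch shows one way pushed data for several bundles can share a connection and still be held until a reader connects. It is not the DataChannel implementation: streamKey, demux, push and open are hypothetical names, and the fixed buffer size is an arbitrary choice for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// streamKey identifies one logical stream on the shared connection.
type streamKey struct{ instID, ptransformID string }

// demux routes pushed data to per-stream channels. A stream is created on
// first use by either side, so early data waits until a reader opens it.
type demux struct {
	mu      sync.Mutex
	streams map[streamKey]chan []byte
}

func newDemux() *demux { return &demux{streams: make(map[streamKey]chan []byte)} }

// stream returns the channel for k, creating it if needed.
func (d *demux) stream(k streamKey) chan []byte {
	d.mu.Lock()
	defer d.mu.Unlock()
	ch, ok := d.streams[k]
	if !ok {
		ch = make(chan []byte, 16) // bounded buffer; push blocks once it is full
		d.streams[k] = ch
	}
	return ch
}

// push is what a single receive loop would call for each incoming message.
func (d *demux) push(k streamKey, data []byte) { d.stream(k) <- data }

// open is what a reader calls; it may run after data has already arrived.
func (d *demux) open(k streamKey) <-chan []byte { return d.stream(k) }

func main() {
	d := newDemux()
	a := streamKey{"inst1", "pt1"}
	b := streamKey{"inst2", "pt1"}

	// Data for two bundles arrives before any reader has connected.
	d.push(a, []byte("early"))
	d.push(b, []byte("other bundle"))

	fmt.Println(string(<-d.open(a)), "/", string(<-d.open(b))) // prints "early / other bundle"
}
```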
func (*DataChannel) OpenElementChan ¶
func (c *DataChannel) OpenElementChan(ctx context.Context, ptransformID string, instID instructionID, expectedTimerTransforms []string) (<-chan Elements, error)
OpenElementChan returns a channel of Elements for the given instruction and ptransform.
func (*DataChannel) OpenTimerWrite ¶
func (c *DataChannel) OpenTimerWrite(ctx context.Context, ptransformID string, instID instructionID, family string) io.WriteCloser
OpenTimerWrite returns an io.WriteCloser for the given timer family, instruction and ptransform.
func (*DataChannel) OpenWrite ¶
func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, instID instructionID) io.WriteCloser
OpenWrite returns an io.WriteCloser of the data elements for the given instruction and ptransform.
type DataChannelManager ¶
type DataChannelManager struct {
// contains filtered or unexported fields
}
DataChannelManager manages data channels over the Data API. A fixed number of channels are generally used, each managing multiple logical byte streams. Thread-safe.
func (*DataChannelManager) Open ¶
func (m *DataChannelManager) Open(ctx context.Context, port Port) (*DataChannel, error)
Open opens a read/write DataChannel over the given port.
type DataContext ¶
type DataContext struct {
	Data  *ScopedDataManager
	State *ScopedStateManager
	// contains filtered or unexported fields
}
DataContext holds connectors to various data connections, including state and side input.
func (*DataContext) LoggerForTransform ¶
func (dc *DataContext) LoggerForTransform(transformID string) *slog.Logger
LoggerForTransform produces a logger for the transform with the given transformID. The ID must be sourced from a ProcessBundleDescriptor so that messages can be matched up with their respective transform.
type Elements ¶
Elements holds data or timers sent across the data channel. If TimerFamilyID is populated, it's a timer, otherwise it's data elements.
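A consumer can branch on that rule as sketched below. The type declaration is not shown on this page, so the elements struct here is a hypothetical stand-in: only TimerFamilyID comes from the description above, and the Data, Timers and PtransformID fields are assumptions made for the example.

```go
package main

import "fmt"

// elements is a hypothetical stand-in for the Elements type. Only
// TimerFamilyID is named in the documentation; the other fields are assumed.
type elements struct {
	Data, Timers  []byte
	TimerFamilyID string
	PtransformID  string
}

// describe applies the documented rule: a populated TimerFamilyID means timers.
func describe(e elements) string {
	if e.TimerFamilyID != "" {
		return fmt.Sprintf("timer family %q for %s (%d bytes)", e.TimerFamilyID, e.PtransformID, len(e.Timers))
	}
	return fmt.Sprintf("data for %s (%d bytes)", e.PtransformID, len(e.Data))
}

func main() {
	ch := make(chan elements, 2)
	ch <- elements{PtransformID: "pt1", Data: []byte("abc")}
	ch <- elements{PtransformID: "pt1", TimerFamilyID: "flush", Timers: []byte("t")}
	close(ch)

	// Drain the channel the way a reader of an element channel might.
	for e := range ch {
		fmt.Println(describe(e))
	}
}
```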
type ExecFunc ¶
type ExecFunc func(context.Context, *Control, DataContext) (*fnpb.ProcessBundleResponse, error)
type Monitor ¶
Monitor is a function that returns any new labels, and the set of payloads being returned to the runner.
type NextBuffer ¶
type Options ¶
type Options struct {
	RunnerCapabilities []string // URNs for what runners are able to understand over the FnAPI.
	LoggingEndpoint    string   // Endpoint for remote logging.
	StatusEndpoint     string   // Endpoint for worker status reporting.
}
Options for harness.Main that affect execution of the harness, such as runner capabilities.
type Port ¶
type Port struct {
URL string
}
Port represents the connection port of external operations.
type ScopedDataManager ¶
type ScopedDataManager struct {
// contains filtered or unexported fields
}
ScopedDataManager scopes the global gRPC data manager to a single instruction. The indirection makes it easier to control access.
func NewScopedDataManager ¶
func NewScopedDataManager(mgr *DataChannelManager, instID instructionID) *ScopedDataManager
NewScopedDataManager returns a ScopedDataManager for the given instruction.
func (*ScopedDataManager) Close ¶
func (s *ScopedDataManager) Close() error
Close prevents new IO for this instruction.
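The scoping idea can be sketched with stand-in types. This is an assumption-laden illustration rather than the package's code: globalManager, scoped, openWrite and contents are hypothetical names, and an in-memory buffer replaces the gRPC stream.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"sync"
)

// globalManager is a hypothetical stand-in for the shared data manager.
type globalManager struct {
	mu   sync.Mutex
	bufs map[string]*bytes.Buffer
}

func newGlobalManager() *globalManager {
	return &globalManager{bufs: make(map[string]*bytes.Buffer)}
}

// open returns the writer for one instruction and ptransform pair.
func (g *globalManager) open(instID, ptransformID string) io.Writer {
	g.mu.Lock()
	defer g.mu.Unlock()
	key := instID + "/" + ptransformID
	if g.bufs[key] == nil {
		g.bufs[key] = &bytes.Buffer{}
	}
	return g.bufs[key]
}

// contents reports what has been written to a stream so far.
func (g *globalManager) contents(instID, ptransformID string) string {
	g.mu.Lock()
	defer g.mu.Unlock()
	if b := g.bufs[instID+"/"+ptransformID]; b != nil {
		return b.String()
	}
	return ""
}

// scoped pins the instruction ID so callers cannot reach other instructions,
// and refuses new IO once closed.
type scoped struct {
	mgr    *globalManager
	instID string
	mu     sync.Mutex
	closed bool
}

func newScoped(mgr *globalManager, instID string) *scoped {
	return &scoped{mgr: mgr, instID: instID}
}

func (s *scoped) openWrite(ptransformID string) (io.Writer, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return nil, errors.New("instruction " + s.instID + " is closed for new IO")
	}
	return s.mgr.open(s.instID, ptransformID), nil
}

// Close prevents any further streams from being opened for this instruction.
func (s *scoped) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	return nil
}

func main() {
	g := newGlobalManager()
	s := newScoped(g, "inst1")
	w, _ := s.openWrite("pt1")
	io.WriteString(w, "hello")
	s.Close()
	_, err := s.openWrite("pt2")
	fmt.Println(g.contents("inst1", "pt1"), err != nil) // prints "hello true"
}
```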
func (*ScopedDataManager) OpenElementChan ¶
func (s *ScopedDataManager) OpenElementChan(ctx context.Context, id StreamID, expectedTimerTransforms []string) (<-chan Elements, error)
OpenElementChan returns a channel of Elements on the given stream.
func (*ScopedDataManager) OpenTimerWrite ¶
func (s *ScopedDataManager) OpenTimerWrite(ctx context.Context, id StreamID, family string) (io.WriteCloser, error)
OpenTimerWrite opens an io.WriteCloser on the given stream to write timers.
func (*ScopedDataManager) OpenWrite ¶
func (s *ScopedDataManager) OpenWrite(ctx context.Context, id StreamID) (io.WriteCloser, error)
OpenWrite opens an io.WriteCloser on the given stream.
type ScopedStateManager ¶
type ScopedStateManager struct {
// contains filtered or unexported fields
}
ScopedStateManager scopes the global gRPC state manager to a single instruction for side input use. The indirection makes it easier to control access.
func NewScopedStateManager ¶
func NewScopedStateManager(mgr *StateChannelManager, instID instructionID) *ScopedStateManager
NewScopedStateManager returns a ScopedStateManager for the given instruction.
func (*ScopedStateManager) Close ¶
func (s *ScopedStateManager) Close() error
Close closes all open readers.
func (*ScopedStateManager) OpenReader ¶
func (s *ScopedStateManager) OpenReader(ctx context.Context, url string, key *fnpb.StateKey) (NextBuffer, error)
type Splitter ¶
type Splitter func(map[string]*fnpb.ProcessBundleSplitRequest_DesiredSplit) (*fnpb.ProcessBundleSplitResponse, error)
Splitter is a function that requests a split from a given set of plans.
type StateChannel ¶
type StateChannel struct {
	DoneCh <-chan struct{}
	// contains filtered or unexported fields
}
StateChannel manages state transactions over a single gRPC connection. It does not need to track readers and writers as carefully as the DataChannel, because the state protocol is request-based.
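A request-based protocol over one shared stream can be sketched as below. The sketch assumes that each request carries an ID that its response echoes back; request, response, stateChannel, send and newLoopback are hypothetical names, and plain Go channels with a loopback goroutine stand in for the gRPC stream and the runner.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// request and response are hypothetical stand-ins for state messages;
// the id field is what ties a response to its request.
type request struct{ id, key string }
type response struct{ id, value string }

// stateChannel tracks only the in-flight requests, keyed by ID.
type stateChannel struct {
	mu      sync.Mutex
	nextID  int
	pending map[string]chan response
	out     chan<- request
}

func newStateChannel(out chan<- request, in <-chan response) *stateChannel {
	c := &stateChannel{pending: make(map[string]chan response), out: out}
	// A single reader hands each response to the caller waiting on its ID.
	go func() {
		for resp := range in {
			c.mu.Lock()
			ch := c.pending[resp.id]
			delete(c.pending, resp.id)
			c.mu.Unlock()
			if ch != nil {
				ch <- resp
			}
		}
	}()
	return c
}

// send registers the request, writes it out, and blocks until its response arrives.
func (c *stateChannel) send(key string) response {
	c.mu.Lock()
	c.nextID++
	id := strconv.Itoa(c.nextID)
	ch := make(chan response, 1)
	c.pending[id] = ch
	c.mu.Unlock()

	c.out <- request{id: id, key: key}
	return <-ch
}

// newLoopback wires the channel to a fake runner that answers every request.
func newLoopback() *stateChannel {
	out := make(chan request)
	in := make(chan response)
	go func() {
		for req := range out {
			in <- response{id: req.id, value: "value-for-" + req.key}
		}
	}()
	return newStateChannel(out, in)
}

func main() {
	c := newLoopback()
	fmt.Println(c.send("a").value, c.send("b").value) // prints "value-for-a value-for-b"
}
```

Since every caller waits only on its own ID, no per-stream ordering or buffering is needed in this sketch, which is the kind of simplification the description attributes to a request-based protocol.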
func (*StateChannel) Send ¶
func (c *StateChannel) Send(req *fnpb.StateRequest) (*fnpb.StateResponse, error)
Send sends a state request and returns the response.
type StateChannelManager ¶
type StateChannelManager struct {
// contains filtered or unexported fields
}
StateChannelManager manages state channels over the State API. A fixed number of channels are generally used, each managing multiple logical byte streams. Thread-safe.
func (*StateChannelManager) Open ¶
func (m *StateChannelManager) Open(ctx context.Context, url string) (*StateChannel, error)
Open opens a read/write StateChannel over the given URL.