Documentation ¶
Overview ¶
Package harness implements the SDK side of the Beam FnAPI.
Index ¶
- Constants
- func EnableCaptureHook(name string, opts []string)
- func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error
- func RegisterCaptureHook(name string, c CaptureHookFactory)
- type CaptureHook
- type CaptureHookFactory
- type DataChannel
- type DataChannelManager
- type ScopedDataManager
- type ScopedStateReader
- func (s *ScopedStateReader) Close() error
- func (s *ScopedStateReader) GetSideInputCache() exec.SideCache
- func (s *ScopedStateReader) OpenBagUserStateAppender(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.Writer, error)
- func (s *ScopedStateReader) OpenBagUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.Writer, error)
- func (s *ScopedStateReader) OpenBagUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.ReadCloser, error)
- func (s *ScopedStateReader) OpenIterable(ctx context.Context, id exec.StreamID, key []byte) (io.ReadCloser, error)
- func (s *ScopedStateReader) OpenIterableSideInput(ctx context.Context, id exec.StreamID, sideInputID string, w []byte) (io.ReadCloser, error)
- func (s *ScopedStateReader) OpenMultiMapSideInput(ctx context.Context, id exec.StreamID, sideInputID string, key, w []byte) (io.ReadCloser, error)
- func (s *ScopedStateReader) OpenMultimapKeysUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.Writer, error)
- func (s *ScopedStateReader) OpenMultimapKeysUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.ReadCloser, error)
- func (s *ScopedStateReader) OpenMultimapUserStateAppender(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.Writer, error)
- func (s *ScopedStateReader) OpenMultimapUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.Writer, error)
- func (s *ScopedStateReader) OpenMultimapUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, ...) (io.ReadCloser, error)
- type StateChannel
- type StateChannelManager
Constants ¶
const DefaultRemoteLoggingHook = "default_remote_logging"
DefaultRemoteLoggingHook is the key used for the default remote logging hook. A runner that wants to use an alternative logging solution can disable this hook with hooks.DisableHook(harness.DefaultRemoteLoggingHook).
const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"
URNMonitoringInfoShortID is a URN indicating support for short monitoring info IDs.
Variables ¶
This section is empty.
Functions ¶
func EnableCaptureHook ¶
func EnableCaptureHook(name string, opts []string)
EnableCaptureHook is called to request the use of a hook in a pipeline. It updates the supplied pipelines to capture this request.
func Main ¶
func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error
Main is the main entrypoint for the Go harness. It runs at "runtime" -- not "pipeline-construction time" -- on each worker. It is a FnAPI client and is ultimately responsible for correctly executing user code.
func RegisterCaptureHook ¶
func RegisterCaptureHook(name string, c CaptureHookFactory)
RegisterCaptureHook registers a CaptureHookFactory for the supplied identifier.
Types ¶
type CaptureHook ¶
type CaptureHook io.WriteCloser
CaptureHook writes the messaging content consumed and produced by the worker, allowing the data to be used as an input for the session runner. Since workers can exist in a variety of environments, this allows the runner to tailor the behavior best for its particular needs.
type CaptureHookFactory ¶
type CaptureHookFactory func([]string) CaptureHook
CaptureHookFactory produces a CaptureHook from the supplied options.
type DataChannel ¶
type DataChannel struct {
// contains filtered or unexported fields
}
DataChannel manages a single gRPC stream over the Data API. Data from multiple bundles can be multiplexed over this stream. Data is pushed over the channel, so data for a reader may arrive before the reader connects. Thread-safe.
func (*DataChannel) OpenRead ¶
func (c *DataChannel) OpenRead(ctx context.Context, ptransformID string, instID instructionID) io.ReadCloser
OpenRead returns an io.ReadCloser of the data elements for the given instruction and ptransform.
func (*DataChannel) OpenWrite ¶
func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, instID instructionID) io.WriteCloser
OpenWrite returns an io.WriteCloser of the data elements for the given instruction and ptransform.
type DataChannelManager ¶
type DataChannelManager struct {
// contains filtered or unexported fields
}
DataChannelManager manages data channels over the Data API. A fixed number of channels are generally used, each managing multiple logical byte streams. Thread-safe.
func (*DataChannelManager) Open ¶
func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataChannel, error)
Open opens a read/write DataChannel over the given port.
type ScopedDataManager ¶
type ScopedDataManager struct {
// contains filtered or unexported fields
}
ScopedDataManager scopes the global gRPC data manager to a single instruction. The indirection makes it easier to control access.
func NewScopedDataManager ¶
func NewScopedDataManager(mgr *DataChannelManager, instID instructionID) *ScopedDataManager
NewScopedDataManager returns a ScopedDataManager for the given instruction.
func (*ScopedDataManager) Close ¶
func (s *ScopedDataManager) Close() error
Close prevents new IO for this instruction.
func (*ScopedDataManager) OpenRead ¶
func (s *ScopedDataManager) OpenRead(ctx context.Context, id exec.StreamID) (io.ReadCloser, error)
OpenRead opens an io.ReadCloser on the given stream.
func (*ScopedDataManager) OpenWrite ¶
func (s *ScopedDataManager) OpenWrite(ctx context.Context, id exec.StreamID) (io.WriteCloser, error)
OpenWrite opens an io.WriteCloser on the given stream.
type ScopedStateReader ¶
type ScopedStateReader struct {
// contains filtered or unexported fields
}
ScopedStateReader scopes the global gRPC state manager to a single instruction for side input use. The indirection makes it easier to control access.
func NewScopedStateReader ¶
func NewScopedStateReader(mgr *StateChannelManager, instID instructionID) *ScopedStateReader
NewScopedStateReader returns a ScopedStateReader for the given instruction.
func NewScopedStateReaderWithCache ¶ added in v2.34.0
func NewScopedStateReaderWithCache(mgr *StateChannelManager, instID instructionID, cache *statecache.SideInputCache) *ScopedStateReader
NewScopedStateReaderWithCache returns a ScopedStateReader for the given instruction with a pointer to a SideInputCache.
func (*ScopedStateReader) Close ¶
func (s *ScopedStateReader) Close() error
Close closes all open readers.
func (*ScopedStateReader) GetSideInputCache ¶ added in v2.34.0
func (s *ScopedStateReader) GetSideInputCache() exec.SideCache
GetSideInputCache returns a pointer to the SideInputCache being used by the SDK harness.
func (*ScopedStateReader) OpenBagUserStateAppender ¶ added in v2.42.0
func (s *ScopedStateReader) OpenBagUserStateAppender(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)
OpenBagUserStateAppender opens a byte stream for appending user bag state.
func (*ScopedStateReader) OpenBagUserStateClearer ¶ added in v2.42.0
func (s *ScopedStateReader) OpenBagUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)
OpenBagUserStateClearer opens a byte stream for clearing user bag state.
func (*ScopedStateReader) OpenBagUserStateReader ¶ added in v2.42.0
func (s *ScopedStateReader) OpenBagUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.ReadCloser, error)
OpenBagUserStateReader opens a byte stream for reading user bag state.
func (*ScopedStateReader) OpenIterable ¶
func (s *ScopedStateReader) OpenIterable(ctx context.Context, id exec.StreamID, key []byte) (io.ReadCloser, error)
OpenIterable opens a byte stream for reading unwindowed iterables from the runner.
func (*ScopedStateReader) OpenIterableSideInput ¶ added in v2.37.0
func (s *ScopedStateReader) OpenIterableSideInput(ctx context.Context, id exec.StreamID, sideInputID string, w []byte) (io.ReadCloser, error)
OpenIterableSideInput opens a byte stream for reading iterable side input.
func (*ScopedStateReader) OpenMultiMapSideInput ¶ added in v2.37.0
func (s *ScopedStateReader) OpenMultiMapSideInput(ctx context.Context, id exec.StreamID, sideInputID string, key, w []byte) (io.ReadCloser, error)
OpenMultiMapSideInput opens a byte stream for reading multimap side input.
func (*ScopedStateReader) OpenMultimapKeysUserStateClearer ¶ added in v2.42.0
func (s *ScopedStateReader) OpenMultimapKeysUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)
OpenMultimapKeysUserStateClearer opens a byte stream for clearing all keys of user multimap state.
func (*ScopedStateReader) OpenMultimapKeysUserStateReader ¶ added in v2.42.0
func (s *ScopedStateReader) OpenMultimapKeysUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte) (io.ReadCloser, error)
OpenMultimapKeysUserStateReader opens a byte stream for reading the keys of user multimap state.
func (*ScopedStateReader) OpenMultimapUserStateAppender ¶ added in v2.42.0
func (s *ScopedStateReader) OpenMultimapUserStateAppender(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte, mk []byte) (io.Writer, error)
OpenMultimapUserStateAppender opens a byte stream for appending user multimap state.
func (*ScopedStateReader) OpenMultimapUserStateClearer ¶ added in v2.42.0
func (s *ScopedStateReader) OpenMultimapUserStateClearer(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte, mk []byte) (io.Writer, error)
OpenMultimapUserStateClearer opens a byte stream for clearing user multimap state by key.
func (*ScopedStateReader) OpenMultimapUserStateReader ¶ added in v2.42.0
func (s *ScopedStateReader) OpenMultimapUserStateReader(ctx context.Context, id exec.StreamID, userStateID string, key []byte, w []byte, mk []byte) (io.ReadCloser, error)
OpenMultimapUserStateReader opens a byte stream for reading user multimap state.
type StateChannel ¶
type StateChannel struct {
	DoneCh <-chan struct{}
	// contains filtered or unexported fields
}
StateChannel manages state transactions over a single gRPC connection. It does not need to track readers and writers as carefully as the DataChannel, because the state protocol is request-based.
func (*StateChannel) Send ¶
func (c *StateChannel) Send(req *fnpb.StateRequest) (*fnpb.StateResponse, error)
Send sends a state request and returns the response.
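One way a request-based exchange can share a single connection is to tag each request with an ID and route each response back to the caller waiting on that ID. The sketch below illustrates that pattern only; it assumes responses echo the request ID, and `stateChan`, `send`, `deliver`, and `fakeRunner` are hypothetical names that do not reflect the harness's internals or the fnpb message types.

```go
package main

import (
	"fmt"
	"sync"
)

// request and response are simplified stand-ins for state messages.
type request struct {
	id  string
	key string
}

type response struct {
	id    string
	value string
}

// stateChan is a hypothetical channel that matches responses to requests.
type stateChan struct {
	mu      sync.Mutex
	nextID  int
	pending map[string]chan response
	out     chan request
}

func newStateChan() *stateChan {
	return &stateChan{
		pending: make(map[string]chan response),
		out:     make(chan request),
	}
}

// send assigns an ID, registers a waiter, emits the request, and blocks
// until the matching response is delivered.
func (c *stateChan) send(key string) response {
	c.mu.Lock()
	c.nextID++
	id := fmt.Sprintf("req-%d", c.nextID)
	ch := make(chan response, 1)
	c.pending[id] = ch
	c.mu.Unlock()

	c.out <- request{id: id, key: key}
	return <-ch
}

// deliver routes a response to the caller waiting on its ID, if any.
func (c *stateChan) deliver(r response) {
	c.mu.Lock()
	ch, ok := c.pending[r.id]
	delete(c.pending, r.id)
	c.mu.Unlock()
	if ok {
		ch <- r
	}
}

// fakeRunner stands in for the remote side and answers every request.
func fakeRunner(c *stateChan) {
	for req := range c.out {
		c.deliver(response{id: req.id, value: "value-for-" + req.key})
	}
}

func main() {
	c := newStateChan()
	go fakeRunner(c)

	// Several callers issue requests concurrently over the same channel.
	var wg sync.WaitGroup
	results := make([]string, 3)
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = c.send(fmt.Sprintf("k%d", i)).value
		}(i)
	}
	wg.Wait()
	fmt.Println(results)
}
```

Since every call is a self-contained request and response pair, the channel only has to remember outstanding IDs rather than long-lived readers and writers.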
type StateChannelManager ¶
type StateChannelManager struct {
// contains filtered or unexported fields
}
StateChannelManager manages state channels over the State API. A fixed number of channels are generally used, each managing multiple logical byte streams. Thread-safe.
func (*StateChannelManager) Open ¶
func (m *StateChannelManager) Open(ctx context.Context, port exec.Port) (*StateChannel, error)
Open opens a read/write StateChannel over the given port.
Directories ¶
Path | Synopsis |
---|---|
init | Package init contains the harness initialization code defined by the FnAPI. |
statecache | Package statecache implements the state caching feature described by the Beam Fn API. |