Documentation ¶
Overview ¶
Package reduce reads messages from the ISB and implements the first part of the reduce subsystem. It feeds data to the second part of the reduce subsystem, processAndForward: it processes the data read from the ISB, writes it to the outbound channel called PBQ so the messages can be processed asynchronously by processAndForward, and then closes partitions where watermark progression allows. To write to the PBQ, it must first partition the message, so the package also contains the partitioning logic.
The partitioner identifies a set of elements with a common key and time and buckets them into a common window. A partition is uniquely identified by the tuple {window, key}; the type of window does not matter. The partitioner manages the persistence and processing of each partition. It uses the PBQ for durable persistence of the elements that belong to a partition and orchestrates their processing through the processAndForward function. The partitioner tracks active partitions, closes partitions based on watermark progression, and coordinates the materialization and forwarding of results to the next vertex in the pipeline.
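The {window, key} identification described above can be sketched as follows. This is a minimal illustration, not the package's implementation: partitionID, assignPartition, at, and windowLength are hypothetical names, and fixed one-minute windows are assumed only to keep the example short.

```go
package main

import (
	"fmt"
	"time"
)

// windowLength is an illustrative fixed-window size (an assumption).
const windowLength = time.Minute

// partitionID is a hypothetical stand-in for the {window, key} tuple.
// Window bounds are stored as Unix milliseconds so values compare with ==.
type partitionID struct {
	Start int64 // inclusive window start
	End   int64 // exclusive window end
	Key   string
}

// assignPartition buckets an element into the fixed window that contains its
// event time and pairs that window with the element's key.
func assignPartition(eventTime time.Time, key string, length time.Duration) partitionID {
	start := eventTime.Truncate(length)
	return partitionID{
		Start: start.UnixMilli(),
		End:   start.Add(length).UnixMilli(),
		Key:   key,
	}
}

// at returns the instant sec seconds after the Unix epoch.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

func main() {
	a := assignPartition(at(10), "even", windowLength)
	b := assignPartition(at(50), "even", windowLength) // same window, same key
	c := assignPartition(at(50), "odd", windowLength)  // same window, other key
	d := assignPartition(at(70), "even", windowLength) // next window, same key

	fmt.Println(a == b, a == c, a == d)
}
```

Elements land in the same partition only when both the window and the key match, which is why a and b compare equal while c and d do not.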
Index ¶
- Constants
- type DataForward
- func (df *DataForward) ClosePartitions(partitions []partition.ID)
- func (df *DataForward) Process(ctx context.Context, messages []*isb.ReadMessage)
- func (df *DataForward) ReplayPersistedMessages(ctx context.Context) error
- func (df *DataForward) ShutDown(ctx context.Context)
- func (df *DataForward) Start()
- type Option
- type Options
Constants ¶
const (
	LabelReason = "reason"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DataForward ¶
type DataForward struct {
	// contains filtered or unexported fields
}
DataForward is responsible for reading messages from the ISB and forwarding them to the PBQ.
func NewDataForward ¶
func NewDataForward(ctx context.Context, vertexInstance *dfv1.VertexInstance, fromBuffer isb.BufferReader, toBuffers map[string][]isb.BufferWriter, pbqManager *pbq.Manager, whereToDecider forward.ToWhichStepDecider, fw fetch.Fetcher, watermarkPublishers map[string]publish.Publisher, windowingStrategy window.Windower, idleManager *wmb.IdleManager, of *pnf.OrderedProcessor, opts ...Option) (*DataForward, error)
NewDataForward creates a new DataForward.
func (*DataForward) ClosePartitions ¶ added in v0.8.1
func (df *DataForward) ClosePartitions(partitions []partition.ID)
ClosePartitions closes the partitions by invoking close-of-book (COB).
func (*DataForward) Process ¶ added in v0.8.1
func (df *DataForward) Process(ctx context.Context, messages []*isb.ReadMessage)
Process is one iteration of the read loop: it writes the messages to the PBQs, acks them, and then closes the windows that can be closed.
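A rough sketch of one such iteration is shown below. It is a toy model, not the package's code: the forwarder type, its in-memory map standing in for PBQs, and the rule that a partition closes once the watermark reaches its exclusive window end are all assumptions made for illustration.

```go
package main

import "fmt"

// partitionID is a hypothetical {window, key} identifier; only the window end
// is kept because that is all this sketch needs.
type partitionID struct {
	WindowEnd int64 // exclusive window end, Unix milliseconds
	Key       string
}

// message is a simplified read message that already knows its partition.
type message struct {
	Offset    int64
	Partition partitionID
}

// forwarder is a toy stand-in for the read loop's state.
type forwarder struct {
	pbqs   map[partitionID][]message // in-memory slices play the role of PBQs
	acked  []int64                   // offsets acknowledged so far
	closed []partitionID             // partitions closed so far
}

func newForwarder() *forwarder {
	return &forwarder{pbqs: make(map[partitionID][]message)}
}

// process runs one iteration: write, then ack, then close what can be closed.
func (f *forwarder) process(batch []message, watermark int64) {
	// 1. Write every message to the queue of its partition.
	for _, m := range batch {
		f.pbqs[m.Partition] = append(f.pbqs[m.Partition], m)
	}
	// 2. Ack only after the writes have happened.
	for _, m := range batch {
		f.acked = append(f.acked, m.Offset)
	}
	// 3. Close partitions whose window the watermark has passed (assumed rule).
	for id := range f.pbqs {
		if id.WindowEnd <= watermark {
			f.closed = append(f.closed, id)
			delete(f.pbqs, id)
		}
	}
}

func main() {
	f := newForwarder()
	early := partitionID{WindowEnd: 60_000, Key: "even"}
	later := partitionID{WindowEnd: 120_000, Key: "even"}

	f.process([]message{{Offset: 1, Partition: early}, {Offset: 2, Partition: later}}, 60_000)

	fmt.Println("acked:", f.acked)
	fmt.Println("closed:", f.closed)
	fmt.Println("still open:", len(f.pbqs))
}
```

Acking after the write matters for durability: a message is acknowledged upstream only once it is in the queue, so a crash between the two steps leads to a re-read instead of a loss.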
func (*DataForward) ReplayPersistedMessages ¶ added in v0.8.1
func (df *DataForward) ReplayPersistedMessages(ctx context.Context) error
ReplayPersistedMessages replays persisted messages. During boot up, the data in the PBQ's persistent store must be replayed before reading from the ISB can start. ReplayPersistedMessages returns only after the replay has completed.
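The ordering constraint can be sketched with a small interface. The forwarder interface below is hypothetical and only mirrors the ReplayPersistedMessages and Start signatures listed on this page; boot, bootOrder, fakeForwarder, and errStore are illustrative names, and nothing here states how Start behaves in the real package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// forwarder is a hypothetical interface that mirrors two method signatures
// listed on this page; it is not defined by the package.
type forwarder interface {
	ReplayPersistedMessages(ctx context.Context) error
	Start()
}

// boot replays persisted data first and starts reading only if that succeeded.
func boot(ctx context.Context, f forwarder) error {
	if err := f.ReplayPersistedMessages(ctx); err != nil {
		return fmt.Errorf("replay persisted messages: %w", err)
	}
	f.Start()
	return nil
}

// fakeForwarder records the order of calls so the sequencing can be observed.
type fakeForwarder struct {
	replayErr error
	calls     []string
}

func (f *fakeForwarder) ReplayPersistedMessages(context.Context) error {
	f.calls = append(f.calls, "replay")
	return f.replayErr
}

func (f *fakeForwarder) Start() { f.calls = append(f.calls, "start") }

// errStore is an illustrative replay failure.
var errStore = errors.New("store unavailable")

// bootOrder runs boot against a fake and returns the recorded calls.
func bootOrder(replayErr error) ([]string, error) {
	f := &fakeForwarder{replayErr: replayErr}
	err := boot(context.Background(), f)
	return f.calls, err
}

func main() {
	calls, err := bootOrder(nil)
	fmt.Println(calls, err)

	calls, err = bootOrder(errStore)
	fmt.Println(calls, err)
}
```

In this sketch a failed replay prevents the read loop from starting at all, which is one reasonable way to honor the "replay before read" rule.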
func (*DataForward) ShutDown ¶ added in v0.8.1
func (df *DataForward) ShutDown(ctx context.Context)
ShutDown shuts down the read loop.
type Option ¶
func WithAllowedLateness ¶ added in v0.8.0
WithAllowedLateness sets allowedLateness.
func WithReadBatchSize ¶
WithReadBatchSize sets the read batch size.
type Options ¶
type Options struct {
	// contains filtered or unexported fields
}
Options holds the options for forwarding messages.
func DefaultOptions ¶
func DefaultOptions() *Options
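The Option and Options types, together with the variadic opts parameter of NewDataForward, suggest the common Go functional-options pattern. The sketch below shows how that pattern usually works. The lower-cased names, the field names, the option signature, the default of 100, and the validation rules are all assumptions for illustration, not the package's definitions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// options is an illustrative settings struct (field names are assumptions).
type options struct {
	readBatchSize   int64
	allowedLateness time.Duration
}

// option mutates options and may reject an invalid value.
type option func(*options) error

// defaultOptions returns illustrative defaults; the value 100 is made up.
func defaultOptions() *options {
	return &options{readBatchSize: 100, allowedLateness: 0}
}

// withReadBatchSize sets the read batch size.
func withReadBatchSize(n int64) option {
	return func(o *options) error {
		if n <= 0 {
			return errors.New("read batch size must be positive")
		}
		o.readBatchSize = n
		return nil
	}
}

// withAllowedLateness sets how late an element may arrive.
func withAllowedLateness(d time.Duration) option {
	return func(o *options) error {
		if d < 0 {
			return errors.New("allowed lateness must not be negative")
		}
		o.allowedLateness = d
		return nil
	}
}

// build starts from the defaults and applies each option in order.
func build(opts ...option) (*options, error) {
	o := defaultOptions()
	for _, opt := range opts {
		if err := opt(o); err != nil {
			return nil, err
		}
	}
	return o, nil
}

func main() {
	o, err := build(withReadBatchSize(500), withAllowedLateness(5*time.Second))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(o.readBatchSize, o.allowedLateness)
}
```

The pattern keeps a constructor's required parameters explicit while letting callers override only the settings they care about; anything left out falls back to the defaults.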