readloop

package
v0.7.3 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 3, 2023 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Overview

Package readloop is responsible for the first part of the reduce subsystem: it feeds data to the second part, called ProcessAndForward. `readloop` processes the data read from the ISB, writes it to the outbound channel called PBQ so that each message can be processed asynchronously by `ProcessAndForward`, and then closes the partition if watermark progression allows. To write to the outbound channel (PBQ), `readloop` has to partition the message first. Hence, `readloop` also contains the partitioning logic.

The partitioner identifies a set of elements with a common key and time, and buckets them into a common window. A partition is uniquely identified by the tuple {window, key}; the type of window does not matter. The partitioner is responsible for managing the persistence and processing of each partition. It uses PBQ for durable persistence of the elements that belong to a partition and orchestrates their processing using the ProcessAndForward function. The partitioner tracks active partitions, closes partitions based on watermark progression, and coordinates materializing the results and forwarding them to the next vertex in the pipeline.
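To make the {window, key} idea concrete, here is a minimal, self-contained sketch of bucketing elements into partitions. It is not this package's implementation: `PartitionID`, `Message`, `assignFixedWindow`, and `partitionMessages` are hypothetical names, and a fixed (tumbling) window over millisecond event times is assumed purely for illustration, since the overview says the window type does not matter.

```go
package main

import "fmt"

// PartitionID is a hypothetical stand-in for the {window, key} tuple.
// It is NOT the package's partition.ID type.
type PartitionID struct {
	StartMs int64 // inclusive window start, Unix milliseconds
	EndMs   int64 // exclusive window end, Unix milliseconds
	Key     string
}

// Message is a hypothetical element carrying an event time and a key.
type Message struct {
	EventTimeMs int64 // assumed to be non-negative
	Key         string
}

// assignFixedWindow maps a message to the fixed window that contains its
// event time. Fixed windows are an assumption made for this sketch.
func assignFixedWindow(m Message, lengthMs int64) PartitionID {
	start := m.EventTimeMs - m.EventTimeMs%lengthMs
	return PartitionID{StartMs: start, EndMs: start + lengthMs, Key: m.Key}
}

// partitionMessages groups messages that share both a window and a key.
func partitionMessages(msgs []Message, lengthMs int64) map[PartitionID][]Message {
	out := make(map[PartitionID][]Message)
	for _, m := range msgs {
		id := assignFixedWindow(m, lengthMs)
		out[id] = append(out[id], m)
	}
	return out
}

func main() {
	msgs := []Message{
		{EventTimeMs: 1000, Key: "a"},
		{EventTimeMs: 59000, Key: "a"},
		{EventTimeMs: 61000, Key: "a"}, // same key, next window
		{EventTimeMs: 2000, Key: "b"},  // same window, different key
	}
	parts := partitionMessages(msgs, 60000)
	fmt.Println("partitions:", len(parts))
}
```

Because the partition identity is a comparable struct, it can be used directly as a map key; two elements land in the same partition only when both the window bounds and the key match.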

Index

Constants

const (
	LabelReason = "reason"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ReadLoop

type ReadLoop struct {
	UDF applier.ReduceApplier
	// contains filtered or unexported fields
}

ReadLoop is responsible for reading and forwarding the message from ISB to PBQ.

func NewReadLoop

func NewReadLoop(ctx context.Context,
	vertexName string,
	pipelineName string,
	vr int32,
	udf applier.ReduceApplier,
	pbqManager *pbq.Manager,
	windowingStrategy window.Windower,
	toBuffers map[string]isb.BufferWriter,
	whereToDecider forward.ToWhichStepDecider,
	pw map[string]publish.Publisher,
	idleManager *wmb.IdleManager,
) (*ReadLoop, error)

NewReadLoop initializes and returns ReadLoop.

func (*ReadLoop) ClosePartitions added in v0.7.3

func (rl *ReadLoop) ClosePartitions(partitions []partition.ID)

ClosePartitions closes the partitions by invoking close-of-book (COB).

func (*ReadLoop) Process

func (rl *ReadLoop) Process(ctx context.Context, messages []*isb.ReadMessage)

Process is one iteration of the read loop: it writes the messages to the PBQs, acks the messages, and then closes the windows that can be closed.
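The ordering of one iteration (write, then ack, then close) can be sketched with in-memory stand-ins. Everything below is hypothetical: `loop`, `message`, and `partitionID` are illustrative types, the map of slices merely stands in for the PBQs, and the closing rule (a window closes once the watermark reaches its end) is an assumption about what "can be closed" means, not something this page specifies.

```go
package main

import "fmt"

// partitionID is a hypothetical {window, key} identifier.
type partitionID struct {
	StartMs, EndMs int64
	Key            string
}

// message is a hypothetical read message; Acked records the ack step.
type message struct {
	EventTimeMs int64
	Key         string
	Acked       bool
}

// loop is an in-memory stand-in for the read loop's state.
type loop struct {
	lengthMs int64
	queues   map[partitionID][]*message // stand-in for per-partition PBQs
	closed   []partitionID              // partitions closed so far
}

func newLoop(lengthMs int64) *loop {
	return &loop{lengthMs: lengthMs, queues: make(map[partitionID][]*message)}
}

// process runs one iteration in the order the documentation describes.
func (l *loop) process(msgs []*message, watermarkMs int64) {
	// 1. Write every message to the queue of its partition.
	for _, m := range msgs {
		start := m.EventTimeMs - m.EventTimeMs%l.lengthMs
		id := partitionID{StartMs: start, EndMs: start + l.lengthMs, Key: m.Key}
		l.queues[id] = append(l.queues[id], m)
	}
	// 2. Ack only after the writes above have been made.
	for _, m := range msgs {
		m.Acked = true
	}
	// 3. Close partitions whose window end is at or before the watermark
	//    (assumed rule). Deleting during range is safe in Go.
	for id := range l.queues {
		if id.EndMs <= watermarkMs {
			l.closed = append(l.closed, id)
			delete(l.queues, id)
		}
	}
}

func main() {
	l := newLoop(60000)
	msgs := []*message{
		{EventTimeMs: 1000, Key: "a"},
		{EventTimeMs: 61000, Key: "a"},
	}
	l.process(msgs, 60000)
	fmt.Println("open:", len(l.queues), "closed:", len(l.closed))
}
```

Acking after the write means a message is never acknowledged to the source before it has been handed to the partition's queue, which is the property the write-then-ack order is meant to preserve.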

func (*ReadLoop) ShutDown

func (rl *ReadLoop) ShutDown(ctx context.Context)

ShutDown shuts down the read loop.

func (*ReadLoop) Startup

func (rl *ReadLoop) Startup(ctx context.Context) error

Startup starts up the read loop. During boot-up, the read loop has to replay the data from the persistent store of PBQ before it can start reading from ISB, so Startup returns only after the replay has completed.
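Since Startup blocks until the replay finishes, a caller would plausibly sequence the methods as Startup, then repeated Process calls, then ShutDown. The sketch below illustrates that ordering only. The `lifecycle` interface is a hypothetical, simplified mirror of the ReadLoop methods (the real Process takes `[]*isb.ReadMessage`, not `[]string`), and `run`, `recorder`, and `demo` are illustrative helpers, not part of this package.

```go
package main

import (
	"context"
	"fmt"
)

// lifecycle is a hypothetical, simplified mirror of the ReadLoop methods.
type lifecycle interface {
	Startup(ctx context.Context) error
	Process(ctx context.Context, messages []string)
	ShutDown(ctx context.Context)
}

// run replays first, processes batches only if the replay succeeded,
// and shuts down when it is done or the context is cancelled.
func run(ctx context.Context, rl lifecycle, batches [][]string) error {
	if err := rl.Startup(ctx); err != nil {
		return fmt.Errorf("replay failed, not processing new messages: %w", err)
	}
	defer rl.ShutDown(ctx)
	for _, b := range batches {
		if err := ctx.Err(); err != nil {
			return err
		}
		rl.Process(ctx, b)
	}
	return nil
}

// recorder is a fake that only records the order of calls.
type recorder struct {
	calls []string
}

func (r *recorder) Startup(ctx context.Context) error {
	r.calls = append(r.calls, "startup")
	return nil
}

func (r *recorder) Process(ctx context.Context, messages []string) {
	r.calls = append(r.calls, "process")
}

func (r *recorder) ShutDown(ctx context.Context) {
	r.calls = append(r.calls, "shutdown")
}

// demo drives the fake through two batches and returns the call order.
func demo() ([]string, error) {
	rec := &recorder{}
	err := run(context.Background(), rec, [][]string{{"m1", "m2"}, {"m3"}})
	return rec.calls, err
}

func main() {
	calls, err := demo()
	fmt.Println(calls, err)
}
```

Returning early when Startup fails keeps new messages from being processed ahead of unreplayed data, which is the reason the documentation gives for Startup blocking in the first place.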
