pusher

package
v0.0.0-rc12
Published: Dec 2, 2024 License: Apache-2.0 Imports: 8 Imported by: 0

README

The Pusher Interface

The Pusher interface lets us implement different pushing behavior depending on the transfer mechanism. It defines two methods, Push and Ack, both of which operate on the chunk of data currently being processed.

The Chunk

A Chunk holds all the information needed during processing:

  • The slice of ChangeItems to push to the target.
  • The name of the file these ChangeItems come from.
  • Whether the current chunk is the last chunk of data from the file.
  • The offset of the data that was read (used in state tracking).
  • The size of the processed data, used to throttle the read speed so the process does not run out of memory.
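The fields above can be illustrated with a minimal sketch of how a reader might cut one file into chunks. This is not the package's own code: `ChangeItem` is a local stand-in for abstract.ChangeItem, `splitIntoChunks` is a hypothetical helper, and treating the offset as a row index and the size as a sum of value lengths are assumptions made only for the example.

```go
package main

import "fmt"

// ChangeItem is a stand-in for abstract.ChangeItem so the sketch compiles on its own.
type ChangeItem struct {
	Value string
}

// Chunk mirrors the fields of the package's Chunk type.
type Chunk struct {
	FilePath  string
	Completed bool
	Offset    any
	Size      int64
	Items     []ChangeItem
}

// splitIntoChunks is a hypothetical helper: it cuts the rows of one file into
// chunks of at most perChunk items and marks only the last chunk as Completed.
func splitIntoChunks(filePath string, rows []ChangeItem, perChunk int) []Chunk {
	if perChunk <= 0 {
		perChunk = 1 // guard against a non-advancing loop
	}
	var chunks []Chunk
	for start := 0; start < len(rows); start += perChunk {
		end := start + perChunk
		if end > len(rows) {
			end = len(rows)
		}
		var size int64
		for _, row := range rows[start:end] {
			size += int64(len(row.Value)) // assumption: size approximated by value length
		}
		chunks = append(chunks, Chunk{
			FilePath:  filePath,
			Completed: end == len(rows),
			Offset:    start, // assumption: offset is the index of the chunk's first row
			Size:      size,
			Items:     rows[start:end],
		})
	}
	return chunks
}

func main() {
	rows := []ChangeItem{{"a"}, {"bb"}, {"ccc"}}
	for _, c := range splitIntoChunks("data/file-1.csv", rows, 2) {
		fmt.Printf("offset=%v size=%d completed=%v items=%d\n",
			c.Offset, c.Size, c.Completed, len(c.Items))
	}
}
```

Only the final chunk carries Completed, which is what lets a pusher decide when a file has been processed from start to finish.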

Push

Push forwards a chunk of data to the underlying pusher, whether that pusher is synchronous or asynchronous.

Ack

Ack removes already processed chunks of data from the state to keep the state clean. It only has an effect in the async pusher.

Snapshotting

For a snapshot transfer we use the default synchronous abstract.Pusher. The sync pusher needs no real state management, since each batch of files is processed from start to finish before moving on to the next.

Replication

For replication, a parsequeue is used for async pushing. The pusher needs to keep state for the files being processed, since the reader keeps reading new files even though previous ones might not have been fully pushed to the target.

Peculiarities of the Parsequeue pusher:

  1. The state of data chunks is tracked in memory so that we know when a file has been processed from start to finish.
  2. Since pushes to the underlying queue happen asynchronously and are buffered in the parsequeue, we need to throttle the push speed so the process does not run out of memory.
  3. State can be kept as small as possible, since files that are already done are either persisted to DB state (for polling replication) or have their messages deleted (for SQS).
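The three points above can be sketched as a small in-memory tracker. This is a simplified illustration under stated assumptions, not the ParsequeuePusher itself: the `state` type, `newState`, `track`, and `ack` are hypothetical names, offsets are assumed to be comparable values, and throttling is modeled by refusing a chunk rather than by blocking the reader.

```go
package main

import (
	"fmt"
	"sync"
)

// Chunk carries only the fields this sketch needs.
type Chunk struct {
	FilePath  string
	Completed bool
	Offset    any
	Size      int64
}

// progress records the chunks of one file that were pushed but not yet acked.
type progress struct {
	readOffsets []any
	done        bool // true once the file's last chunk has been read
}

// state is a hypothetical in-memory tracker with an in-flight byte limit.
type state struct {
	mu            sync.Mutex
	inflightLimit int64
	inflightBytes int64
	files         map[string]*progress
}

func newState(limit int64) *state {
	return &state{inflightLimit: limit, files: make(map[string]*progress)}
}

// track registers a chunk before it is handed to the queue. It returns false
// when the in-flight limit would be exceeded; the caller should retry later.
func (s *state) track(c Chunk) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	// A chunk is always admitted when nothing is in flight, so a single
	// oversized chunk cannot stall the pipeline forever.
	if s.inflightBytes > 0 && s.inflightBytes+c.Size > s.inflightLimit {
		return false
	}
	p, ok := s.files[c.FilePath]
	if !ok {
		p = &progress{}
		s.files[c.FilePath] = p
	}
	p.readOffsets = append(p.readOffsets, c.Offset)
	p.done = p.done || c.Completed
	s.inflightBytes += c.Size
	return true
}

// ack removes a pushed chunk from the state. It returns true when the whole
// file is done, and an error when the chunk is unknown (for example on a
// second ack of the same chunk).
func (s *state) ack(c Chunk) (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	p, ok := s.files[c.FilePath]
	if !ok {
		return false, fmt.Errorf("file %s is not tracked", c.FilePath)
	}
	idx := -1
	for i, off := range p.readOffsets {
		if off == c.Offset { // assumption: offsets are comparable values
			idx = i
			break
		}
	}
	if idx < 0 {
		return false, fmt.Errorf("chunk at offset %v of %s is not in flight", c.Offset, c.FilePath)
	}
	p.readOffsets = append(p.readOffsets[:idx], p.readOffsets[idx+1:]...)
	s.inflightBytes -= c.Size
	if p.done && len(p.readOffsets) == 0 {
		delete(s.files, c.FilePath) // keep the state small once a file is finished
		return true, nil
	}
	return false, nil
}

func main() {
	s := newState(10)
	first := Chunk{FilePath: "file-1", Offset: 0, Size: 6}
	last := Chunk{FilePath: "file-1", Offset: 6, Size: 6, Completed: true}

	fmt.Println("track first:", s.track(first))
	fmt.Println("track last while first is in flight:", s.track(last))
	done, err := s.ack(first)
	fmt.Println("ack first:", done, err)
	fmt.Println("track last after ack:", s.track(last))
	done, err = s.ack(last)
	fmt.Println("ack last:", done, err)
}
```

Deleting a file's entry as soon as its last chunk is acked is what keeps the map small; at that point the caller can persist the file to DB state or delete the SQS message, as described above.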

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Chunk

type Chunk struct {
	FilePath  string
	Completed bool
	Offset    any
	Size      int64
	Items     []abstract.ChangeItem
}

type ParsequeuePusher

type ParsequeuePusher struct {
	State PusherState
	// contains filtered or unexported fields
}

func NewParsequeuePusher

func NewParsequeuePusher(queue *parsequeue.ParseQueue[Chunk], logger log.Logger, inflightLimit int64) *ParsequeuePusher

func (*ParsequeuePusher) Ack

func (p *ParsequeuePusher) Ack(chunk Chunk) (bool, error)

func (*ParsequeuePusher) Push

func (p *ParsequeuePusher) Push(ctx context.Context, chunk Chunk) error

type Progress

type Progress struct {
	ReadOffsets []any
	Done        bool
}

type Pusher

type Pusher interface {
	Push(ctx context.Context, chunk Chunk) error
	// Ack is used in the parsequeue pusher to keep the state of files currently being processed clean.
	// Ack has no effect in the sync pusher: there, files are processed from start to finish before new ones are fetched, so no state is needed.
	// Ack is called by the ack method of the parsequeue once a chunk is pushed.
	// It returns a bool that reports whether a file was fully processed and is done.
	// It returns an error if Ack is called more than once on the same chunk of data.
	Ack(chunk Chunk) (bool, error)
}

func New

func New(pusher abstract.Pusher, queue *parsequeue.ParseQueue[Chunk], logger log.Logger, inflightLimit int64) Pusher

type PusherState

type PusherState struct {
	PushProgress map[string]Progress
	// contains filtered or unexported fields
}

type SyncPusher

type SyncPusher struct {
	// contains filtered or unexported fields
}

func NewSyncPusher

func NewSyncPusher(pusher abstract.Pusher) *SyncPusher

func (*SyncPusher) Ack

func (p *SyncPusher) Ack(chunk Chunk) (bool, error)

func (*SyncPusher) Push

func (p *SyncPusher) Push(_ context.Context, chunk Chunk) error
