ingestion

package
v0.3.18 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 14, 2022 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Batch

func Batch(values <-chan *model.PulsarEventRow, maxItems int, maxTimeout time.Duration, bufferSize int, clock clock.Clock) chan []*model.PulsarEventRow

Batch batches up events from a channel. Batches are created whenever maxItems InstructionSets have been received or maxTimeout has elapsed since the last batch was created (whichever occurs first). This function has a lot in common with lookoutingester.batch. Hopefully when generics become available we can factor out most of the common code

func Convert

func Convert(ctx context.Context, msgs chan *pulsarutils.ConsumerMessage, bufferSize int, converter *MessageRowConverter) chan *model.PulsarEventRow

Convert takes a channel of pulsar messages and outputs a channel of events that we can insert into the database

func InsertEvents

func InsertEvents(ctx context.Context, db *eventdb.EventDb, msgs chan []*model.PulsarEventRow, bufferSize int) chan []*model.PulsarEventRow

InsertEvents takes a channel of armada events and insets them into the event db the events are republished to an output channel for further processing (e.g. Ackking)

func Run

Run will create a pipeline that will take Armada event messages from Pulsar and update the Events database accordingly. This pipeline will run until a SIGTERM is received

func SendSequenceUpdate

func SendSequenceUpdate(ctx context.Context, inputMsgs []*model.PulsarEventRow, producer pulsar.Producer) []*pulsarutils.ConsumerMessageId

SendSequenceUpdate synchronously sends sequence numbers to Pulsar TODO: Retries if the pulsar send fails

func SendSequenceUpdates

func SendSequenceUpdates(ctx context.Context, producer pulsar.Producer, msgs chan []*model.PulsarEventRow, bufferSize int) chan []*pulsarutils.ConsumerMessageId

SendSequenceUpdates takes a channel of events that have been processed and publishes the corresponding sequence numbers onto pulsar It outputs the pulsar message ids of the originating events so that the messages can be accked

Types

type MessageRowConverter

type MessageRowConverter struct {
	// contains filtered or unexported fields
}

MessageRowConverter raw converts pulsar messages into events that we can insert into the database

func (*MessageRowConverter) ConvertMsg

ConvertMsg converts pulsar messages into events that we can insert into the database

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL