ingest

package
v0.15.7 Latest
Warning

This package is not in the latest version of its module.
Published: Dec 3, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithRetry

func WithRetry(action func() (bool, error), intialBackoff time.Duration, maxBackOff time.Duration) error

WithRetry executes the supplied action repeatedly until it either completes successfully or returns false, which indicates that the error is fatal and should not be retried.
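
The retry contract described above can be sketched as follows. This is a self-contained illustration, not the package's implementation: withRetry is a hypothetical local stand-in that mirrors the documented signature, and doubling the backoff up to the maximum is an assumption, since the documentation does not say how the wait grows between the initial and maximum backoff.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// withRetry is a hypothetical stand-in with the same shape as ingest.WithRetry.
// action returns (retry, err): a nil err means success, and retry == false
// marks err as fatal. Doubling the backoff is an assumption, not documented.
func withRetry(action func() (bool, error), initialBackoff, maxBackoff time.Duration) error {
	backoff := initialBackoff
	for {
		retry, err := action()
		if err == nil {
			return nil
		}
		if !retry {
			return err
		}
		time.Sleep(backoff)
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}

// flakyStore fails with a retryable error twice, then succeeds.
// It returns the number of attempts made and the final error.
func flakyStore() (int, error) {
	attempts := 0
	err := withRetry(func() (bool, error) {
		attempts++
		if attempts < 3 {
			return true, errors.New("transient failure")
		}
		return false, nil
	}, time.Millisecond, 10*time.Millisecond)
	return attempts, err
}

// fatalStore fails once with a non-retryable error.
func fatalStore() (int, error) {
	attempts := 0
	err := withRetry(func() (bool, error) {
		attempts++
		return false, errors.New("fatal failure")
	}, time.Millisecond, 10*time.Millisecond)
	return attempts, err
}

func main() {
	n, err := flakyStore()
	fmt.Println("flaky:", n, err)
	n, err = fatalStore()
	fmt.Println("fatal:", n, err)
}
```

The boolean in the action's return value only matters when the error is non-nil: a nil error ends the loop regardless of the flag.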

Types

type BatchMerger added in v0.15.0

type BatchMerger[T utils.ArmadaEvent] func(batch []*utils.EventsWithIds[T]) *utils.EventsWithIds[T]

BatchMerger merges together events within the batch, where possible

type BatchMetricPublisher added in v0.15.0

type BatchMetricPublisher[T utils.ArmadaEvent] func(metrics *commonmetrics.Metrics, batch *utils.EventsWithIds[T])

BatchMetricPublisher logs a summary of the batching process

type Batcher

type Batcher[T any] struct {
	// contains filtered or unexported fields
}

Batcher batches up events from a channel. Batches are created whenever maxItems have been received or maxTimeout has elapsed since the last batch was created (whichever occurs first).
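
The "maxItems or maxTimeout, whichever occurs first" rule can be illustrated with a small stand-alone sketch. The batch function below is hypothetical and is not the package's Batcher: it uses context.Context instead of *armadacontext.Context, counts each item as one (ignoring itemCountFunc), and flushes any remainder when the input channel closes, all of which are assumptions made for the example.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// batch is a hypothetical helper, not the package's Batcher. It sends a batch
// to publish when maxItems have accumulated or maxTimeout has elapsed since
// the last batch, whichever occurs first. It closes publish when it returns.
func batch[T any](ctx context.Context, input <-chan T, maxItems int, maxTimeout time.Duration, publish chan<- []T) {
	defer close(publish)
	var buffer []T
	deadline := time.After(maxTimeout)
	// flush publishes any buffered items and restarts the timeout window.
	flush := func() {
		if len(buffer) > 0 {
			publish <- buffer
			buffer = nil
		}
		deadline = time.After(maxTimeout)
	}
	for {
		select {
		case <-ctx.Done():
			return
		case item, ok := <-input:
			if !ok {
				flush() // assumption: emit the remainder when input closes
				return
			}
			buffer = append(buffer, item)
			if len(buffer) >= maxItems {
				flush()
			}
		case <-deadline:
			flush()
		}
	}
}

// batchSizes sends five items with maxItems = 2 and a long timeout,
// so batches are cut by size only.
func batchSizes() []int {
	input := make(chan int)
	publish := make(chan []int, 8)
	go batch[int](context.Background(), input, 2, time.Minute, publish)
	for i := 0; i < 5; i++ {
		input <- i
	}
	close(input)
	var sizes []int
	for b := range publish {
		sizes = append(sizes, len(b))
	}
	return sizes
}

// timeoutBatchSize sends one item with a large maxItems,
// so the batch is cut by the timeout.
func timeoutBatchSize() int {
	input := make(chan int)
	publish := make(chan []int, 1)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go batch[int](ctx, input, 100, 20*time.Millisecond, publish)
	input <- 42
	return len(<-publish)
}

func main() {
	fmt.Println(batchSizes())
	fmt.Println(timeoutBatchSize())
}
```

Restarting the timeout window on every flush keeps the two triggers independent: a size-triggered batch does not cause a near-empty batch to be emitted shortly afterwards.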

func NewBatcher

func NewBatcher[T any](input <-chan T, maxItems int, maxTimeout time.Duration, itemCountFunc func(T) int, publish chan []T) *Batcher[T]

func (*Batcher[T]) BufferLen

func (b *Batcher[T]) BufferLen() int

func (*Batcher[T]) Run

func (b *Batcher[T]) Run(ctx *armadacontext.Context)

type EventCounter added in v0.15.0

type EventCounter[T utils.ArmadaEvent] func(events *utils.EventsWithIds[T]) int

EventCounter determines the true count of events, as some utils.ArmadaEvent can contain nested events
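
As a rough illustration of why a dedicated counter is needed, the sketch below counts nested events rather than top-level entries. The event, sequence, and eventsWithIds types are hypothetical stand-ins for a utils.ArmadaEvent and utils.EventsWithIds; their real field names are not shown in this documentation.

```go
package main

import "fmt"

// event and sequence are hypothetical stand-ins for an ArmadaEvent
// that contains nested events.
type event struct{ Name string }

type sequence struct{ Events []event }

// eventsWithIds stands in for utils.EventsWithIds[T].
type eventsWithIds struct{ Events []*sequence }

// countNested has the shape of an EventCounter: it returns the number of
// nested events in the batch, not the number of top-level sequences.
func countNested(batch *eventsWithIds) int {
	total := 0
	for _, seq := range batch.Events {
		total += len(seq.Events)
	}
	return total
}

// demoCounts returns the top-level count and the nested count for a sample batch.
func demoCounts() (int, int) {
	batch := &eventsWithIds{Events: []*sequence{
		{Events: []event{{Name: "submitted"}, {Name: "queued"}}},
		{Events: []event{{Name: "running"}}},
	}}
	return len(batch.Events), countNested(batch)
}

func main() {
	topLevel, nested := demoCounts()
	fmt.Println(topLevel, nested)
}
```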

type HasPulsarMessageIds

type HasPulsarMessageIds interface {
	GetMessageIDs() []pulsar.MessageID
}

HasPulsarMessageIds should be implemented by structs that can store a batch of Pulsar message IDs. This is needed so that message IDs can be passed down the pipeline and acked at the end.

type IngestionPipeline

type IngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent] struct {
	// contains filtered or unexported fields
}

IngestionPipeline is a pipeline that reads messages from Pulsar and inserts them into a sink. The pipeline handles the following automatically:

  • Receiving messages from pulsar
  • Unmarshalling into eventsWithIds
  • Combining messages into batches for efficient processing
  • Publishing relevant metrics related to batch
  • Converting eventsWithIds to instructions
  • Acking processed messages

Callers must supply two structs: an InstructionConverter for converting eventsWithIds into an object that the sink can consume, and a Sink capable of consuming (storing) these objects.
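
The relationship between the converter, the sink, and acking can be sketched with local stand-in types. Everything below (messageID, eventsWithIds, instructions, upperCaser, memorySink, process) is hypothetical and only mirrors the shape of the interfaces documented on this page. The real pipeline uses pulsar.MessageID, utils.EventsWithIds, and *armadacontext.Context, and acks through the Pulsar consumer rather than returning IDs.

```go
package main

import (
	"fmt"
	"strings"
)

// messageID stands in for pulsar.MessageID.
type messageID int

// hasMessageIDs mirrors the shape of HasPulsarMessageIds.
type hasMessageIDs interface {
	GetMessageIDs() []messageID
}

// eventsWithIds stands in for utils.EventsWithIds: events plus the IDs
// of the messages they were read from.
type eventsWithIds struct {
	Events     []string
	MessageIDs []messageID
}

// instructions is what the converter emits and the sink stores.
// It carries the message IDs along so they can be acked after storing.
type instructions struct {
	Rows       []string
	MessageIDs []messageID
}

func (i *instructions) GetMessageIDs() []messageID { return i.MessageIDs }

// converter and sink mirror InstructionConverter and Sink, minus the context argument.
type converter[T hasMessageIDs] interface {
	Convert(batch *eventsWithIds) T
}

type sink[T hasMessageIDs] interface {
	Store(msg T) error
}

// upperCaser is a toy converter that upper-cases each event.
type upperCaser struct{}

func (upperCaser) Convert(batch *eventsWithIds) *instructions {
	rows := make([]string, len(batch.Events))
	for i, e := range batch.Events {
		rows[i] = strings.ToUpper(e)
	}
	return &instructions{Rows: rows, MessageIDs: batch.MessageIDs}
}

// memorySink is a toy sink that keeps rows in memory.
type memorySink struct{ stored []string }

func (s *memorySink) Store(msg *instructions) error {
	s.stored = append(s.stored, msg.Rows...)
	return nil
}

// process converts a batch, stores it, and returns the IDs that are safe to ack.
func process[T hasMessageIDs](c converter[T], s sink[T], batch *eventsWithIds) ([]messageID, error) {
	msg := c.Convert(batch)
	if err := s.Store(msg); err != nil {
		return nil, err
	}
	return msg.GetMessageIDs(), nil
}

// runDemo pushes one batch through the toy converter and sink.
func runDemo() ([]string, []messageID, error) {
	s := &memorySink{}
	batch := &eventsWithIds{Events: []string{"submit", "run"}, MessageIDs: []messageID{1, 2}}
	acked, err := process[*instructions](upperCaser{}, s, batch)
	return s.stored, acked, err
}

func main() {
	stored, acked, err := runDemo()
	fmt.Println(stored, acked, err)
}
```

Because the converter's output type must expose its message IDs, the stage that acks does not need to know anything else about what the sink stored.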

func NewIngestionPipeline

func NewIngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent](
	pulsarConfig commonconfig.PulsarConfig,
	pulsarTopic string,
	pulsarSubscriptionName string,
	pulsarBatchSize int,
	pulsarBatchDuration time.Duration,
	pulsarSubscriptionType pulsar.SubscriptionType,
	eventCounter EventCounter[U],
	messageConverter MessageUnmarshaller[U],
	batchMerger BatchMerger[U],
	metricPublisher BatchMetricPublisher[U],
	converter InstructionConverter[T, U],
	sink Sink[T],
	metrics *commonmetrics.Metrics,
) *IngestionPipeline[T, U]

NewIngestionPipeline creates an IngestionPipeline that processes all pulsar messages

func (*IngestionPipeline[T, U]) Run

func (i *IngestionPipeline[T, U]) Run(ctx *armadacontext.Context) error

Run will run the ingestion pipeline until the supplied context is shut down

type InstructionConverter

type InstructionConverter[T HasPulsarMessageIds, U utils.ArmadaEvent] interface {
	Convert(ctx *armadacontext.Context, msg *utils.EventsWithIds[U]) T
}

InstructionConverter should be implemented by structs that can convert a batch of eventsWithIds into an object suitable for passing to the sink

type MessageUnmarshaller added in v0.15.0

type MessageUnmarshaller[T utils.ArmadaEvent] func(msg pulsar.ConsumerMessage, metrics *commonmetrics.Metrics) *utils.EventsWithIds[T]

MessageUnmarshaller converts consumed pulsar messages to the intermediate type, utils.EventsWithIds.

type Sink

type Sink[T HasPulsarMessageIds] interface {
	// Store should persist the supplied message. The store is responsible for retrying failed attempts and should only
	// return an error when it is satisfied that the operation cannot be retried.
	Store(ctx *armadacontext.Context, msg T) error
}

Sink should be implemented by the struct responsible for putting the data in its final resting place, e.g. a database.
