ingest

package
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithRetry

func WithRetry(action func() (bool, error), intialBackoff time.Duration, maxBackOff time.Duration) error

WithRetry executes the supplied action until it either completes successfully or it returns false, indicating that the error is fatal

Types

type Batcher

type Batcher[T any] struct {
	// contains filtered or unexported fields
}

Batcher batches up events from a channel. Batches are created whenever maxItems have been received or maxTimeout has elapsed since the last batch was created (whichever occurs first).

func NewBatcher

func NewBatcher[T any](input chan T, maxItems int, maxTimeout time.Duration, callback func([]T)) *Batcher[T]

func (*Batcher[T]) BufferLen

func (b *Batcher[T]) BufferLen() int

func (*Batcher[T]) Run

func (b *Batcher[T]) Run(ctx *armadacontext.Context)

type EventSequencesWithIds

type EventSequencesWithIds struct {
	EventSequences []*armadaevents.EventSequence
	MessageIds     []pulsar.MessageID
}

EventSequencesWithIds consists of a batch of Event Sequences along with the corresponding Pulsar Message Ids

type HasPulsarMessageIds

type HasPulsarMessageIds interface {
	GetMessageIDs() []pulsar.MessageID
}

HasPulsarMessageIds should be implemented by structs that can store a batch of pulsar message ids This is needed so we can pass message Ids down the pipeline and ack them at the end

type IngestionPipeline

type IngestionPipeline[T HasPulsarMessageIds] struct {
	// contains filtered or unexported fields
}

IngestionPipeline is a pipeline that reads message from pulsar and inserts them into a sink. The pipeline will handle the following automatically:

  • Receiving messages from pulsar
  • Combining messages into batches for efficient processing
  • Unmarshalling into event sequences
  • Acking processed messages

Callers must supply two structs, an InstructionConverter for converting event sequences into something that can be exhausted and a Sink capable of exhausting these objects

func NewFilteredMsgIngestionPipeline added in v0.3.49

func NewFilteredMsgIngestionPipeline[T HasPulsarMessageIds](
	pulsarConfig configuration.PulsarConfig,
	pulsarSubscriptionName string,
	pulsarBatchSize int,
	pulsarBatchDuration time.Duration,
	pulsarSubscriptionType pulsar.SubscriptionType,
	msgFilter func(msg pulsar.Message) bool,
	converter InstructionConverter[T],
	sink Sink[T],
	metricsConfig configuration.MetricsConfig,
	metrics *commonmetrics.Metrics,
) *IngestionPipeline[T]

NewFilteredMsgIngestionPipeline creates an IngestionPipeline that processes only messages corresponding to the supplied message filter

func NewIngestionPipeline

func NewIngestionPipeline[T HasPulsarMessageIds](
	pulsarConfig configuration.PulsarConfig,
	pulsarSubscriptionName string,
	pulsarBatchSize int,
	pulsarBatchDuration time.Duration,
	pulsarSubscriptionType pulsar.SubscriptionType,
	converter InstructionConverter[T],
	sink Sink[T],
	metricsConfig configuration.MetricsConfig,
	metrics *commonmetrics.Metrics,
) *IngestionPipeline[T]

NewIngestionPipeline creates an IngestionPipeline that processes all pulsar messages

func (*IngestionPipeline[T]) Run

func (ingester *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error

Run will run the ingestion pipeline until the supplied context is shut down

type InstructionConverter

type InstructionConverter[T HasPulsarMessageIds] interface {
	Convert(ctx *armadacontext.Context, msg *EventSequencesWithIds) T
}

InstructionConverter should be implemented by structs that can convert a batch of event sequences into an object suitable for passing to the sink

type Sink

type Sink[T HasPulsarMessageIds] interface {
	// Store should persist the sink.  The store is responsible for retrying failed attempts and should only return an error
	// When it is satisfied that operation cannot be retries.
	Store(ctx *armadacontext.Context, msg T) error
}

Sink should be implemented by the struct responsible for putting the data in its final resting place, e.g. a database.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL