Documentation ¶
Types ¶
type BatchMerger ¶ added in v0.15.0
type BatchMerger[T utils.ArmadaEvent] func(batch []*utils.EventsWithIds[T]) *utils.EventsWithIds[T]
BatchMerger merges together events within the batch, where possible
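As a rough illustration of the shape of a BatchMerger, the sketch below concatenates the events and message ids of every element in a batch. The eventsWithIds type and its Events and MessageIds fields are simplified stand-ins assumed for this example, not the real utils.EventsWithIds, and a real merger may combine events more selectively.

```go
package main

import "fmt"

// eventsWithIds is a simplified, hypothetical stand-in for utils.EventsWithIds.
// The field names and the string id type are assumptions made for this sketch.
type eventsWithIds[T any] struct {
	Events     []T
	MessageIds []string
}

// mergeBatch folds a batch into a single value by concatenating the events
// and message ids of each element in order. Nil elements are skipped.
func mergeBatch[T any](batch []*eventsWithIds[T]) *eventsWithIds[T] {
	merged := &eventsWithIds[T]{}
	for _, item := range batch {
		if item == nil {
			continue
		}
		merged.Events = append(merged.Events, item.Events...)
		merged.MessageIds = append(merged.MessageIds, item.MessageIds...)
	}
	return merged
}

func main() {
	batch := []*eventsWithIds[string]{
		{Events: []string{"submit"}, MessageIds: []string{"m1"}},
		{Events: []string{"run", "done"}, MessageIds: []string{"m2"}},
	}
	merged := mergeBatch(batch)
	fmt.Println(len(merged.Events), len(merged.MessageIds)) // prints "3 2"
}
```

Keeping the message ids alongside the merged events matters because the ids are what allow the messages to be acked once the batch has been processed.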
type BatchMetricPublisher ¶ added in v0.15.0
type BatchMetricPublisher[T utils.ArmadaEvent] func(metrics *commonmetrics.Metrics, batch *utils.EventsWithIds[T])
BatchMetricPublisher logs a summary of the batching process
type Batcher ¶
type Batcher[T any] struct {
	// contains filtered or unexported fields
}
Batcher batches up events from a channel. Batches are created whenever maxItems have been received or maxTimeout has elapsed since the last batch was created (whichever occurs first).
func NewBatcher ¶
func (*Batcher[T]) Run ¶
func (b *Batcher[T]) Run(ctx *armadacontext.Context)
type EventCounter ¶ added in v0.15.0
type EventCounter[T utils.ArmadaEvent] func(events *utils.EventsWithIds[T]) int
EventCounter determines the true count of events, as some utils.ArmadaEvent can contain nested events
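To show why a separate counter is useful, the sketch below counts events nested inside each top-level element rather than the elements themselves. The eventSequence and eventsWithIds types are hypothetical stand-ins invented for the example; the real utils types may be shaped differently.

```go
package main

import "fmt"

// eventSequence is a hypothetical event type that nests several events.
type eventSequence struct {
	Events []string
}

// eventsWithIds is a simplified, hypothetical stand-in for utils.EventsWithIds.
type eventsWithIds[T any] struct {
	Events     []T
	MessageIds []string
}

// countNested returns the number of nested events across all sequences,
// which can be larger than len(batch.Events).
func countNested(batch *eventsWithIds[*eventSequence]) int {
	total := 0
	for _, seq := range batch.Events {
		if seq != nil {
			total += len(seq.Events)
		}
	}
	return total
}

func main() {
	batch := &eventsWithIds[*eventSequence]{
		Events: []*eventSequence{
			{Events: []string{"submit", "queued"}},
			{Events: []string{"running"}},
		},
		MessageIds: []string{"m1", "m2"},
	}
	// Two top-level elements, three nested events.
	fmt.Println(len(batch.Events), countNested(batch)) // prints "2 3"
}
```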
type HasPulsarMessageIds ¶
HasPulsarMessageIds should be implemented by structs that can store a batch of pulsar message ids. This is needed so that message ids can be passed down the pipeline and acked at the end.
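The interface definition is not shown here, so the following is only a sketch of the idea: a batch type carries the ids of the messages it was built from so they can be acked later. The hasMessageIds interface, its GetMessageIDs method name, the string id type, and the dbOperations struct are all assumptions made for illustration.

```go
package main

import "fmt"

// hasMessageIds is a hypothetical stand-in for HasPulsarMessageIds. The
// method name and the string id type are assumptions made for this sketch.
type hasMessageIds interface {
	GetMessageIDs() []string
}

// dbOperations is a hypothetical batch of work that remembers which
// messages it was built from.
type dbOperations struct {
	Rows       []string
	MessageIds []string
}

// GetMessageIDs returns the ids of the messages behind this batch.
func (d *dbOperations) GetMessageIDs() []string { return d.MessageIds }

// ackAll acknowledges every message id carried by the batch and returns
// the number of ids it acknowledged.
func ackAll(batch hasMessageIds, ack func(id string)) int {
	ids := batch.GetMessageIDs()
	for _, id := range ids {
		ack(id)
	}
	return len(ids)
}

func main() {
	ops := &dbOperations{Rows: []string{"row-1"}, MessageIds: []string{"m1", "m2"}}
	n := ackAll(ops, func(id string) { fmt.Println("ack", id) })
	fmt.Println("acked", n)
}
```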
type IngestionPipeline ¶
type IngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent] struct {
	// contains filtered or unexported fields
}
IngestionPipeline is a pipeline that reads messages from pulsar and inserts them into a sink. The pipeline will handle the following automatically:
- Receiving messages from pulsar
- Unmarshalling into eventsWithIds
- Combining messages into batches for efficient processing
- Publishing metrics for each batch
- Converting eventsWithIds to instructions
- Acking processed messages
Callers must supply two structs: an InstructionConverter that converts eventsWithIds into something the sink can consume, and a Sink capable of consuming these objects.
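The division of labour between the pipeline, the converter, and the sink can be sketched as follows. This is a heavily simplified, single-pass model and not the real pipeline: message, events, instructions, converter, sink, and runOnce are hypothetical names, there is no pulsar client, and batching and metrics are omitted. The point it illustrates is that messages are acked only after the sink has stored the converted batch.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// message is a hypothetical stand-in for a consumed pulsar message.
type message struct {
	ID      string
	Payload string
}

// events and instructions are hypothetical intermediate and sink-ready types.
type events struct {
	Events     []string
	MessageIds []string
}

type instructions struct {
	Rows       []string
	MessageIds []string
}

// converter and sink mirror the roles of InstructionConverter and Sink.
type converter interface {
	Convert(batch *events) *instructions
}

type sink interface {
	Store(ins *instructions) error
}

// upperConverter turns each event into an upper-cased row.
type upperConverter struct{}

func (upperConverter) Convert(batch *events) *instructions {
	ins := &instructions{MessageIds: batch.MessageIds}
	for _, e := range batch.Events {
		ins.Rows = append(ins.Rows, strings.ToUpper(e))
	}
	return ins
}

// memorySink keeps rows in memory; failingSink always rejects them.
type memorySink struct{ rows []string }

func (s *memorySink) Store(ins *instructions) error {
	s.rows = append(s.rows, ins.Rows...)
	return nil
}

type failingSink struct{}

func (failingSink) Store(*instructions) error { return errors.New("store failed") }

// runOnce unmarshals messages into one batch, converts it, stores it, and
// acks the originating messages only if the store succeeded.
func runOnce(msgs []message, c converter, s sink, ack func(id string)) error {
	batch := &events{}
	for _, m := range msgs {
		batch.Events = append(batch.Events, m.Payload)
		batch.MessageIds = append(batch.MessageIds, m.ID)
	}
	ins := c.Convert(batch)
	if err := s.Store(ins); err != nil {
		return err
	}
	for _, id := range ins.MessageIds {
		ack(id)
	}
	return nil
}

func main() {
	msgs := []message{{ID: "m1", Payload: "submit"}, {ID: "m2", Payload: "run"}}
	store := &memorySink{}
	err := runOnce(msgs, upperConverter{}, store, func(id string) { fmt.Println("ack", id) })
	fmt.Println(store.rows, err)
}
```

Because acks happen last, a message whose batch fails to store is never acknowledged and can be redelivered.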
func NewIngestionPipeline ¶
func NewIngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent](
	pulsarConfig commonconfig.PulsarConfig,
	pulsarTopic string,
	pulsarSubscriptionName string,
	pulsarBatchSize int,
	pulsarBatchDuration time.Duration,
	pulsarSubscriptionType pulsar.SubscriptionType,
	eventCounter EventCounter[U],
	messageConverter MessageUnmarshaller[U],
	batchMerger BatchMerger[U],
	metricPublisher BatchMetricPublisher[U],
	converter InstructionConverter[T, U],
	sink Sink[T],
	metrics *commonmetrics.Metrics,
) *IngestionPipeline[T, U]
NewIngestionPipeline creates an IngestionPipeline that processes all pulsar messages
func (*IngestionPipeline[T, U]) Run ¶
func (i *IngestionPipeline[T, U]) Run(ctx *armadacontext.Context) error
Run will run the ingestion pipeline until the supplied context is shut down
type InstructionConverter ¶
type InstructionConverter[T HasPulsarMessageIds, U utils.ArmadaEvent] interface {
	Convert(ctx *armadacontext.Context, msg *utils.EventsWithIds[U]) T
}
InstructionConverter should be implemented by structs that can convert a batch of eventsWithIds into an object suitable for passing to the sink
type MessageUnmarshaller ¶ added in v0.15.0
type MessageUnmarshaller[T utils.ArmadaEvent] func(msg pulsar.ConsumerMessage, metrics *commonmetrics.Metrics) *utils.EventsWithIds[T]
MessageUnmarshaller converts consumed pulsar messages to the intermediate type, utils.EventsWithIds.
type Sink ¶
type Sink[T HasPulsarMessageIds] interface {
	// Store should persist the sink. The store is responsible for retrying failed attempts and should only return an error
	// when it is satisfied that the operation cannot be retried.
	Store(ctx *armadacontext.Context, msg T) error
}
Sink should be implemented by the struct responsible for putting the data in its final resting place, e.g. a database.