Documentation ¶
Index ¶
- func WithRetry(action func() (bool, error), intialBackoff time.Duration, ...) error
- type Batcher
- type EventSequencesWithIds
- type HasPulsarMessageIds
- type IngestionPipeline
- func NewFilteredMsgIngestionPipeline[T HasPulsarMessageIds](pulsarConfig configuration.PulsarConfig, pulsarSubscriptionName string, ...) *IngestionPipeline[T]
- func NewIngestionPipeline[T HasPulsarMessageIds](pulsarConfig configuration.PulsarConfig, pulsarSubscriptionName string, ...) *IngestionPipeline[T]
- type InstructionConverter
- type Sink
Functions ¶
Types ¶
type Batcher ¶
type Batcher[T any] struct {
	// contains filtered or unexported fields
}
Batcher batches up events from a channel. Batches are created whenever maxItems have been received or maxTimeout has elapsed since the last batch was created (whichever occurs first).
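The size-or-timeout rule described above can be illustrated with a small, self-contained sketch. This is not the package's Batcher: the `batch` helper, its `emit` callback, and the choices to skip empty batches and to flush when the input channel closes are all assumptions made for illustration, and a standard `context.Context` stands in for `*armadacontext.Context`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// batch is a hypothetical helper (not the package's Batcher). It reads items
// from in and calls emit once maxItems items have accumulated or maxTimeout
// has elapsed since the previous flush, whichever happens first.
func batch[T any](ctx context.Context, in <-chan T, maxItems int, maxTimeout time.Duration, emit func([]T)) {
	buf := make([]T, 0, maxItems)
	timer := time.NewTimer(maxTimeout)
	defer timer.Stop()

	flush := func() {
		// Assumption: empty batches are not emitted.
		if len(buf) > 0 {
			emit(buf)
			buf = make([]T, 0, maxItems)
		}
		// Restart the timeout window, draining a pending tick if there is one.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(maxTimeout)
	}

	for {
		select {
		case <-ctx.Done():
			return
		case item, ok := <-in:
			if !ok {
				// Assumption: remaining items are flushed when the input closes.
				flush()
				return
			}
			buf = append(buf, item)
			if len(buf) >= maxItems {
				flush()
			}
		case <-timer.C:
			flush()
		}
	}
}

// batchSizes feeds items through batch and records the size of each batch.
// The long timeout means only the size limit triggers flushes here.
func batchSizes(items []int, maxItems int) []int {
	in := make(chan int, len(items))
	for _, it := range items {
		in <- it
	}
	close(in)
	var sizes []int
	batch(context.Background(), in, maxItems, time.Hour, func(b []int) {
		sizes = append(sizes, len(b))
	})
	return sizes
}

func main() {
	fmt.Println(batchSizes([]int{1, 2, 3, 4, 5}, 2)) // prints [2 2 1]
}
```

With five items and a limit of two, the size rule produces two full batches and the final flush carries the remainder.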
func NewBatcher ¶
func (*Batcher[T]) Run ¶
func (b *Batcher[T]) Run(ctx *armadacontext.Context)
type EventSequencesWithIds ¶
type EventSequencesWithIds struct {
	EventSequences []*armadaevents.EventSequence
	MessageIds     []pulsar.MessageID
}
EventSequencesWithIds consists of a batch of event sequences along with the corresponding Pulsar message IDs.
type HasPulsarMessageIds ¶
HasPulsarMessageIds should be implemented by structs that can store a batch of Pulsar message IDs. This is needed so that message IDs can be passed down the pipeline and acked at the end.
type IngestionPipeline ¶
type IngestionPipeline[T HasPulsarMessageIds] struct {
	// contains filtered or unexported fields
}
IngestionPipeline is a pipeline that reads messages from Pulsar and inserts them into a sink. The pipeline handles the following automatically:
- Receiving messages from pulsar
- Combining messages into batches for efficient processing
- Unmarshalling into event sequences
- Acking processed messages
Callers must supply two implementations: an InstructionConverter, which converts event sequences into objects the sink can consume, and a Sink capable of consuming those objects.
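The division of labour between the two caller-supplied pieces can be sketched as follows. Everything here is a stand-in written for illustration: `messageID`, `eventSequence`, `converter`, `sink`, and `processBatch` are hypothetical, the context parameter is omitted, and the `GetMessageIDs` method name is an assumption, since this page does not show the methods of HasPulsarMessageIds. The sketch also assumes that IDs are acked only after the sink has stored the converted batch.

```go
package main

import "fmt"

// Stand-ins for pulsar.MessageID and *armadaevents.EventSequence (assumptions).
type messageID string
type eventSequence struct{ JobSetName string }

type eventSequencesWithIds struct {
	EventSequences []eventSequence
	MessageIds     []messageID
}

// hasMessageIds plays the role of HasPulsarMessageIds; the method name is assumed.
type hasMessageIds interface {
	GetMessageIDs() []messageID
}

// converter and sink mirror the shape of InstructionConverter and Sink,
// without the context parameter.
type converter[T hasMessageIds] interface {
	Convert(msg *eventSequencesWithIds) T
}

type sink[T hasMessageIds] interface {
	Store(msg T) error
}

// jobSetRows is the hypothetical object handed from converter to sink.
type jobSetRows struct {
	Names []string
	ids   []messageID
}

func (r *jobSetRows) GetMessageIDs() []messageID { return r.ids }

type rowConverter struct{}

// Convert keeps the message IDs alongside the converted rows so they can be acked later.
func (rowConverter) Convert(msg *eventSequencesWithIds) *jobSetRows {
	rows := &jobSetRows{ids: msg.MessageIds}
	for _, es := range msg.EventSequences {
		rows.Names = append(rows.Names, es.JobSetName)
	}
	return rows
}

type memorySink struct{ stored []string }

func (s *memorySink) Store(msg *jobSetRows) error {
	s.stored = append(s.stored, msg.Names...)
	return nil
}

// processBatch converts a batch, stores it, and acks the IDs only on success.
func processBatch[T hasMessageIds](c converter[T], s sink[T], batch *eventSequencesWithIds, ack func(messageID)) error {
	converted := c.Convert(batch)
	if err := s.Store(converted); err != nil {
		return err
	}
	for _, id := range converted.GetMessageIDs() {
		ack(id)
	}
	return nil
}

func runDemo() (stored []string, acked []messageID, err error) {
	batch := &eventSequencesWithIds{
		EventSequences: []eventSequence{{JobSetName: "set-a"}, {JobSetName: "set-b"}},
		MessageIds:     []messageID{"m1", "m2"},
	}
	s := &memorySink{}
	err = processBatch[*jobSetRows](rowConverter{}, s, batch, func(id messageID) {
		acked = append(acked, id)
	})
	return s.stored, acked, err
}

func main() {
	stored, acked, err := runDemo()
	fmt.Println(stored, acked, err) // prints [set-a set-b] [m1 m2] <nil>
}
```

Carrying the IDs inside the converted object is what lets the final stage ack exactly the messages that were stored.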
func NewFilteredMsgIngestionPipeline ¶ added in v0.3.49
func NewFilteredMsgIngestionPipeline[T HasPulsarMessageIds](
	pulsarConfig configuration.PulsarConfig,
	pulsarSubscriptionName string,
	pulsarBatchSize int,
	pulsarBatchDuration time.Duration,
	pulsarSubscriptionType pulsar.SubscriptionType,
	msgFilter func(msg pulsar.Message) bool,
	converter InstructionConverter[T],
	sink Sink[T],
	metricsPort uint16,
	metrics *commonmetrics.Metrics,
) *IngestionPipeline[T]
NewFilteredMsgIngestionPipeline creates an IngestionPipeline that processes only the messages selected by the supplied message filter.
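A filter is just a predicate over a message. The sketch below shows one way such a predicate might be built, assuming that a `true` result means the message is processed. The `message` interface is a local stand-in exposing only a `Properties()` method, and the `kind` property and its values are hypothetical.

```go
package main

import "fmt"

// message is a local stand-in for pulsar.Message, reduced to one method.
type message interface {
	Properties() map[string]string
}

type fakeMessage struct{ props map[string]string }

func (m fakeMessage) Properties() map[string]string { return m.props }

// hasProperty returns a filter that accepts messages whose property key
// equals value. The property name used in the demo is hypothetical.
func hasProperty(key, value string) func(msg message) bool {
	return func(msg message) bool {
		return msg.Properties()[key] == value
	}
}

// countKept counts how many messages the filter accepts.
func countKept(msgs []message, keep func(msg message) bool) int {
	n := 0
	for _, m := range msgs {
		if keep(m) {
			n++
		}
	}
	return n
}

func demoCount() int {
	msgs := []message{
		fakeMessage{props: map[string]string{"kind": "control"}},
		fakeMessage{props: map[string]string{"kind": "event"}},
		fakeMessage{props: nil}, // a message without properties is rejected
		fakeMessage{props: map[string]string{"kind": "control"}},
	}
	return countKept(msgs, hasProperty("kind", "control"))
}

func main() {
	fmt.Println(demoCount()) // prints 2
}
```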
func NewIngestionPipeline ¶
func NewIngestionPipeline[T HasPulsarMessageIds](
	pulsarConfig configuration.PulsarConfig,
	pulsarSubscriptionName string,
	pulsarBatchSize int,
	pulsarBatchDuration time.Duration,
	pulsarSubscriptionType pulsar.SubscriptionType,
	converter InstructionConverter[T],
	sink Sink[T],
	metricsPort uint16,
	metrics *commonmetrics.Metrics,
) *IngestionPipeline[T]
NewIngestionPipeline creates an IngestionPipeline that processes all Pulsar messages.
func (*IngestionPipeline[T]) Run ¶
func (ingester *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error
Run runs the ingestion pipeline until the supplied context is cancelled.
type InstructionConverter ¶
type InstructionConverter[T HasPulsarMessageIds] interface {
	Convert(ctx *armadacontext.Context, msg *EventSequencesWithIds) T
}
InstructionConverter should be implemented by structs that can convert a batch of event sequences into an object suitable for passing to the sink.
type Sink ¶
type Sink[T HasPulsarMessageIds] interface {
	// Store should persist msg to the sink. The store is responsible for retrying failed attempts and should only
	// return an error when it is satisfied that the operation cannot be retried.
	Store(ctx *armadacontext.Context, msg T) error
}
Sink should be implemented by the struct responsible for putting the data in its final resting place, e.g. a database.
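The retry contract on Store can be sketched as below. This is an illustration under stated assumptions, not the package's code: `retryingSink`, `errTransient`, and the doubling backoff policy are hypothetical, a standard `context.Context` stands in for `*armadacontext.Context`, and the sketch does not use the package's WithRetry helper.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTransient marks failures worth retrying (hypothetical classification).
var errTransient = errors.New("transient failure")

// retryingSink is a hypothetical sink that wraps a write function.
type retryingSink struct {
	write          func(rows []string) error
	initialBackoff time.Duration
	maxBackoff     time.Duration
	attempts       int
}

// Store retries transient failures with a capped, doubling backoff. It returns
// an error only for non-retryable failures or when the context is cancelled.
func (s *retryingSink) Store(ctx context.Context, rows []string) error {
	backoff := s.initialBackoff
	for {
		s.attempts++
		err := s.write(rows)
		if err == nil {
			return nil
		}
		if !errors.Is(err, errTransient) {
			return err // retrying cannot help, so surface the error
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
		backoff *= 2
		if backoff > s.maxBackoff {
			backoff = s.maxBackoff
		}
	}
}

// attemptsUntilStored simulates a backend that fails transiently a fixed
// number of times before succeeding, and reports how many attempts were made.
func attemptsUntilStored(failures int) (int, error) {
	remaining := failures
	s := &retryingSink{
		write: func([]string) error {
			if remaining > 0 {
				remaining--
				return errTransient
			}
			return nil
		},
		initialBackoff: time.Millisecond,
		maxBackoff:     4 * time.Millisecond,
	}
	err := s.Store(context.Background(), []string{"row"})
	return s.attempts, err
}

func main() {
	n, err := attemptsUntilStored(2)
	fmt.Println(n, err) // prints 3 <nil>
}
```

Keeping retries inside Store means that, in this sketch, an error returned to the caller always signals a failure that further attempts would not fix.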