Documentation
¶
Index ¶
- func Batch(values <-chan *model.PulsarEventRow, maxItems int, maxTimeout time.Duration, ...) chan []*model.PulsarEventRow
- func Convert(ctx context.Context, msgs chan *pulsarutils.ConsumerMessage, bufferSize int, ...) chan *model.PulsarEventRow
- func InsertEvents(ctx context.Context, db *eventdb.EventDb, msgs chan []*model.PulsarEventRow, ...) chan []*model.PulsarEventRow
- func Run(config *configuration.EventIngesterConfiguration)
- func SendSequenceUpdate(ctx context.Context, inputMsgs []*model.PulsarEventRow, ...) []*pulsarutils.ConsumerMessageId
- func SendSequenceUpdates(ctx context.Context, producer pulsar.Producer, ...) chan []*pulsarutils.ConsumerMessageId
- type MessageRowConverter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Batch ¶
func Batch(values <-chan *model.PulsarEventRow, maxItems int, maxTimeout time.Duration, bufferSize int, clock clock.Clock) chan []*model.PulsarEventRow
Batch batches up events from a channel. Batches are created whenever maxItems InstructionSets have been received or maxTimeout has elapsed since the last batch was created (whichever occurs first). This function has a lot in common with lookoutingester.batch. Hopefully when generics become available we can factor out most of the common code
func Convert ¶
func Convert(ctx context.Context, msgs chan *pulsarutils.ConsumerMessage, bufferSize int, converter *MessageRowConverter) chan *model.PulsarEventRow
Convert takes a channel of pulsar messages and outputs a channel of events that we can insert into the database
func InsertEvents ¶
func InsertEvents(ctx context.Context, db *eventdb.EventDb, msgs chan []*model.PulsarEventRow, bufferSize int) chan []*model.PulsarEventRow
InsertEvents takes a channel of armada events and insets them into the event db the events are republished to an output channel for further processing (e.g. Ackking)
func Run ¶
func Run(config *configuration.EventIngesterConfiguration)
Run will create a pipeline that will take Armada event messages from Pulsar and update the Events database accordingly. This pipeline will run until a SIGTERM is received
func SendSequenceUpdate ¶
func SendSequenceUpdate(ctx context.Context, inputMsgs []*model.PulsarEventRow, producer pulsar.Producer) []*pulsarutils.ConsumerMessageId
SendSequenceUpdate synchronously sends sequence numbers to Pulsar TODO: Retries if the pulsar send fails
func SendSequenceUpdates ¶
func SendSequenceUpdates(ctx context.Context, producer pulsar.Producer, msgs chan []*model.PulsarEventRow, bufferSize int) chan []*pulsarutils.ConsumerMessageId
SendSequenceUpdates takes a channel of events that have been processed and publishes the corresponding sequence numbers onto pulsar It outputs the pulsar message ids of the originating events so that the messages can be accked
Types ¶
type MessageRowConverter ¶
type MessageRowConverter struct {
// contains filtered or unexported fields
}
MessageRowConverter raw converts pulsar messages into events that we can insert into the database
func (*MessageRowConverter) ConvertMsg ¶
func (rc *MessageRowConverter) ConvertMsg(ctx context.Context, msg *pulsarutils.ConsumerMessage) (*model.PulsarEventRow, error)
ConvertMsg converts pulsar messages into events that we can insert into the database