Documentation
¶
Index ¶
- func Ack(ctx context.Context, consumers []pulsar.Consumer, ...)
- func CompactAndPublishSequences(ctx context.Context, sequences []*armadaevents.EventSequence, ...) error
- func IsPulsarError(err error) bool
- func NewMessageId(id int) pulsar.MessageID
- func NewPulsarClient(config *configuration.PulsarConfig) (pulsar.Client, error)
- func ParsePulsarCompressionLevel(compressionLevelStr string) (pulsar.CompressionLevel, error)
- func ParsePulsarCompressionType(compressionTypeStr string) (pulsar.CompressionType, error)
- func PublishSequences(ctx context.Context, producer pulsar.Producer, ...) error
- func PulsarError(err error) *pulsar.Error
- func Receive(ctx context.Context, consumer pulsar.Consumer, consumerId int, bufferSize int, ...) chan *ConsumerMessage
- type ConsumerMessage
- type ConsumerMessageId
- type MockMessageId
- type MockPulsarMessage
- type PulsarMessageId
- func (id *PulsarMessageId) BatchIdx() int32
- func (id *PulsarMessageId) EntryID() int64
- func (id *PulsarMessageId) Equal(other pulsar.MessageID) (bool, error)
- func (id *PulsarMessageId) Greater(other pulsar.MessageID) (bool, error)
- func (id *PulsarMessageId) GreaterEqual(other pulsar.MessageID) (bool, error)
- func (id *PulsarMessageId) LedgerID() int64
- func (id *PulsarMessageId) PartitionIdx() int32
- func (id *PulsarMessageId) Serialize() []byte
- func (id *PulsarMessageId) String() string
- type PulsarToChannel
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Ack ¶ added in v0.3.13
func Ack(ctx context.Context, consumers []pulsar.Consumer, msgs chan []*ConsumerMessageId, wg *sync.WaitGroup)
Ack will ack all pulsar messages coming in on the msgs channel. The incoming messages contain a consumer id which corresponds to the index of the consumer that should be used to perform the ack. In theory, the acks could be done in parallel, however its unlikely that they will be a performance bottleneck
func CompactAndPublishSequences ¶ added in v0.3.32
func CompactAndPublishSequences(ctx context.Context, sequences []*armadaevents.EventSequence, producer pulsar.Producer, maxMessageSizeInBytes int) error
CompactAndPublishSequences reduces the number of sequences to the smallest possible, while respecting per-job set ordering and max Pulsar message size, and then publishes to Pulsar.
func IsPulsarError ¶
IsPulsarError returns true if there is a pulsar.Error in the chain.
func NewMessageId ¶ added in v0.3.13
func NewPulsarClient ¶
func NewPulsarClient(config *configuration.PulsarConfig) (pulsar.Client, error)
func ParsePulsarCompressionLevel ¶
func ParsePulsarCompressionLevel(compressionLevelStr string) (pulsar.CompressionLevel, error)
func ParsePulsarCompressionType ¶
func ParsePulsarCompressionType(compressionTypeStr string) (pulsar.CompressionType, error)
func PublishSequences ¶ added in v0.3.32
func PublishSequences(ctx context.Context, producer pulsar.Producer, sequences []*armadaevents.EventSequence) error
PublishSequence publishes several event sequences to Pulsar. For efficiency, all sequences are queued for publishing and then flushed. Returns once all sequences have been received by Pulsar.
To reduce the number of separate sequences sent and ensure limit message size, call eventutil.CompactEventSequences(sequences) and eventutil.LimitSequencesByteSize(sequences, int(srv.MaxAllowedMessageSize)) before passing to this function.
func PulsarError ¶
PulsarError returns the first pulsar.Error in the chain, or nil if no such error is found.
func Receive ¶ added in v0.3.13
func Receive( ctx context.Context, consumer pulsar.Consumer, consumerId int, bufferSize int, receiveTimeout time.Duration, backoffTime time.Duration, ) chan *ConsumerMessage
Receive returns a channel containing messages received from pulsar. This channel will remain open until the supplied context is closed. consumerId: Internal Id of the consumer. We use this so that when messages from different consumers are
multiplexed, we know which messages originated form which consumers
bufferSize: sets the size of the buffer in the returned channel receiveTimeout: sets how long the pulsar consumer will wait for a message before retrying backoffTime: sets how long the consumer will wait before retrying if the pulsar consumer indicates an error receiving from pulsar.
Types ¶
type ConsumerMessage ¶ added in v0.3.13
ConsumerMessage wraps a pulsar message and an identifier for the consumer which originally received the corresponding message. This exists because we need to track which messages came from which consumers so that we can ACK them on the correct consumer.
type ConsumerMessageId ¶ added in v0.3.13
ConsumerMessageId wraps a pulsar message id and an identifier for the consumer which originally received the corresponding message. This exists because we need to track which messages came from which consumers so that we can ACK them on the correct consumer.
func NewConsumerMessageId ¶ added in v0.3.13
func NewConsumerMessageId(id int) *ConsumerMessageId
type MockMessageId ¶ added in v0.3.13
type MockPulsarMessage ¶ added in v0.3.13
func EmptyPulsarMessage ¶ added in v0.3.13
func EmptyPulsarMessage(id int, publishTime time.Time) MockPulsarMessage
func NewPulsarMessage ¶ added in v0.3.13
func NewPulsarMessage(id int, publishTime time.Time, payload []byte) MockPulsarMessage
func (MockPulsarMessage) ID ¶ added in v0.3.13
func (m MockPulsarMessage) ID() pulsar.MessageID
func (MockPulsarMessage) Payload ¶ added in v0.3.13
func (m MockPulsarMessage) Payload() []byte
func (MockPulsarMessage) Properties ¶ added in v0.3.13
func (MockPulsarMessage) Properties() map[string]string
func (MockPulsarMessage) PublishTime ¶ added in v0.3.13
func (m MockPulsarMessage) PublishTime() time.Time
type PulsarMessageId ¶
type PulsarMessageId struct {
// contains filtered or unexported fields
}
PulsarMessageId implements the pulsar.MessageID interface (which uniquely identifies a Pulsar message). We need this since the pulsar client library does not export a MessageID implementation. For PulsarMessageId, we provide, e.g., comparison functions.
func FromMessageId ¶
func FromMessageId(id pulsar.MessageID) *PulsarMessageId
FromMessageId converts a pulsar.MessageID interface type to a *PulsarMessageId, which can be used, e.g., for comparison.
func New ¶
func New(ledgerID, entryID int64, partitionIdx, batchIdx int32) *PulsarMessageId
func (*PulsarMessageId) BatchIdx ¶
func (id *PulsarMessageId) BatchIdx() int32
func (*PulsarMessageId) EntryID ¶
func (id *PulsarMessageId) EntryID() int64
func (*PulsarMessageId) Equal ¶
func (id *PulsarMessageId) Equal(other pulsar.MessageID) (bool, error)
func (*PulsarMessageId) Greater ¶
func (id *PulsarMessageId) Greater(other pulsar.MessageID) (bool, error)
Greater returns true if id occurred after other, or an error if the message ids are not comparable (i.e., if they are from different partitions).
func (*PulsarMessageId) GreaterEqual ¶
func (id *PulsarMessageId) GreaterEqual(other pulsar.MessageID) (bool, error)
func (*PulsarMessageId) LedgerID ¶
func (id *PulsarMessageId) LedgerID() int64
func (*PulsarMessageId) PartitionIdx ¶
func (id *PulsarMessageId) PartitionIdx() int32
func (*PulsarMessageId) Serialize ¶
func (id *PulsarMessageId) Serialize() []byte
func (*PulsarMessageId) String ¶ added in v0.3.32
func (id *PulsarMessageId) String() string
type PulsarToChannel ¶ added in v0.3.32
PulsarToChannel is a service for receiving messages from Pulsar and forwarding those on C.
func NewPulsarToChannel ¶ added in v0.3.32
func NewPulsarToChannel(consumer pulsar.Consumer) *PulsarToChannel