pulsarutils

package
v0.3.35 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2022 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Ack added in v0.3.13

func Ack(ctx context.Context, consumers []pulsar.Consumer, msgs chan []*ConsumerMessageId, wg *sync.WaitGroup)

Ack will ack all pulsar messages coming in on the msgs channel. The incoming messages contain a consumer id which corresponds to the index of the consumer that should be used to perform the ack. In theory, the acks could be done in parallel, however its unlikely that they will be a performance bottleneck

func CompactAndPublishSequences added in v0.3.32

func CompactAndPublishSequences(ctx context.Context, sequences []*armadaevents.EventSequence, producer pulsar.Producer, maxMessageSizeInBytes int) error

CompactAndPublishSequences reduces the number of sequences to the smallest possible, while respecting per-job set ordering and max Pulsar message size, and then publishes to Pulsar.

func IsPulsarError

func IsPulsarError(err error) bool

IsPulsarError returns true if there is a pulsar.Error in the chain.

func NewMessageId added in v0.3.13

func NewMessageId(id int) pulsar.MessageID

func NewPulsarClient

func NewPulsarClient(config *configuration.PulsarConfig) (pulsar.Client, error)

func ParsePulsarCompressionLevel

func ParsePulsarCompressionLevel(compressionLevelStr string) (pulsar.CompressionLevel, error)

func ParsePulsarCompressionType

func ParsePulsarCompressionType(compressionTypeStr string) (pulsar.CompressionType, error)

func PublishSequences added in v0.3.32

func PublishSequences(ctx context.Context, producer pulsar.Producer, sequences []*armadaevents.EventSequence) error

PublishSequence publishes several event sequences to Pulsar. For efficiency, all sequences are queued for publishing and then flushed. Returns once all sequences have been received by Pulsar.

To reduce the number of separate sequences sent and ensure limit message size, call eventutil.CompactEventSequences(sequences) and eventutil.LimitSequencesByteSize(sequences, int(srv.MaxAllowedMessageSize)) before passing to this function.

func PulsarError

func PulsarError(err error) *pulsar.Error

PulsarError returns the first pulsar.Error in the chain, or nil if no such error is found.

func Receive added in v0.3.13

func Receive(
	ctx context.Context,
	consumer pulsar.Consumer,
	consumerId int,
	bufferSize int,
	receiveTimeout time.Duration,
	backoffTime time.Duration,
) chan *ConsumerMessage

Receive returns a channel containing messages received from pulsar. This channel will remain open until the supplied context is closed. consumerId: Internal Id of the consumer. We use this so that when messages from different consumers are

multiplexed, we know which messages originated form which consumers

bufferSize: sets the size of the buffer in the returned channel receiveTimeout: sets how long the pulsar consumer will wait for a message before retrying backoffTime: sets how long the consumer will wait before retrying if the pulsar consumer indicates an error receiving from pulsar.

Types

type ConsumerMessage added in v0.3.13

type ConsumerMessage struct {
	Message    pulsar.Message
	ConsumerId int
}

ConsumerMessage wraps a pulsar message and an identifier for the consumer which originally received the corresponding message. This exists because we need to track which messages came from which consumers so that we can ACK them on the correct consumer.

type ConsumerMessageId added in v0.3.13

type ConsumerMessageId struct {
	MessageId  pulsar.MessageID
	Index      int64
	ConsumerId int
}

ConsumerMessageId wraps a pulsar message id and an identifier for the consumer which originally received the corresponding message. This exists because we need to track which messages came from which consumers so that we can ACK them on the correct consumer.

func NewConsumerMessageId added in v0.3.13

func NewConsumerMessageId(id int) *ConsumerMessageId

type MockMessageId added in v0.3.13

type MockMessageId struct {
	pulsar.MessageID
	// contains filtered or unexported fields
}

type MockPulsarMessage added in v0.3.13

type MockPulsarMessage struct {
	pulsar.Message
	// contains filtered or unexported fields
}

func EmptyPulsarMessage added in v0.3.13

func EmptyPulsarMessage(id int, publishTime time.Time) MockPulsarMessage

func NewPulsarMessage added in v0.3.13

func NewPulsarMessage(id int, publishTime time.Time, payload []byte) MockPulsarMessage

func (MockPulsarMessage) ID added in v0.3.13

func (MockPulsarMessage) Payload added in v0.3.13

func (m MockPulsarMessage) Payload() []byte

func (MockPulsarMessage) Properties added in v0.3.13

func (MockPulsarMessage) Properties() map[string]string

func (MockPulsarMessage) PublishTime added in v0.3.13

func (m MockPulsarMessage) PublishTime() time.Time

type PulsarMessageId

type PulsarMessageId struct {
	// contains filtered or unexported fields
}

PulsarMessageId implements the pulsar.MessageID interface (which uniquely identifies a Pulsar message). We need this since the pulsar client library does not export a MessageID implementation. For PulsarMessageId, we provide, e.g., comparison functions.

func FromMessageId

func FromMessageId(id pulsar.MessageID) *PulsarMessageId

FromMessageId converts a pulsar.MessageID interface type to a *PulsarMessageId, which can be used, e.g., for comparison.

func New

func New(ledgerID, entryID int64, partitionIdx, batchIdx int32) *PulsarMessageId

func (*PulsarMessageId) BatchIdx

func (id *PulsarMessageId) BatchIdx() int32

func (*PulsarMessageId) EntryID

func (id *PulsarMessageId) EntryID() int64

func (*PulsarMessageId) Equal

func (id *PulsarMessageId) Equal(other pulsar.MessageID) (bool, error)

func (*PulsarMessageId) Greater

func (id *PulsarMessageId) Greater(other pulsar.MessageID) (bool, error)

Greater returns true if id occurred after other, or an error if the message ids are not comparable (i.e., if they are from different partitions).

func (*PulsarMessageId) GreaterEqual

func (id *PulsarMessageId) GreaterEqual(other pulsar.MessageID) (bool, error)

func (*PulsarMessageId) LedgerID

func (id *PulsarMessageId) LedgerID() int64

func (*PulsarMessageId) PartitionIdx

func (id *PulsarMessageId) PartitionIdx() int32

func (*PulsarMessageId) Serialize

func (id *PulsarMessageId) Serialize() []byte

func (*PulsarMessageId) String added in v0.3.32

func (id *PulsarMessageId) String() string

type PulsarToChannel added in v0.3.32

type PulsarToChannel struct {
	Consumer pulsar.Consumer
	C        chan pulsar.Message
}

PulsarToChannel is a service for receiving messages from Pulsar and forwarding those on C.

func NewPulsarToChannel added in v0.3.32

func NewPulsarToChannel(consumer pulsar.Consumer) *PulsarToChannel

func (*PulsarToChannel) Run added in v0.3.32

func (srv *PulsarToChannel) Run(ctx context.Context) error

Run starts the service.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL