pulsarutils

package
v0.3.17
Warning

This package is not in the latest version of its module.

Published: Jun 29, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func Ack added in v0.3.13

func Ack(ctx context.Context, consumers []pulsar.Consumer, msgs chan []*ConsumerMessageId, wg *sync.WaitGroup)

Ack acks all pulsar messages arriving on the msgs channel. Each incoming message carries a consumer id, which is the index (into consumers) of the consumer that should perform the ack. In theory the acks could be done in parallel; however, it's unlikely that they will be a performance bottleneck.
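The index-based routing described above can be sketched without the Pulsar client. This is a minimal illustration, not the package's implementation: the acker interface, fakeConsumer, taggedID and ackAll are hypothetical stand-ins for pulsar.Consumer, ConsumerMessageId and Ack, and returning when the channel closes or the context is cancelled is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// acker is a hypothetical stand-in for the one consumer capability this sketch needs.
type acker interface {
	AckID(id int)
}

// fakeConsumer records the ids it was asked to ack.
type fakeConsumer struct {
	mu    sync.Mutex
	acked []int
}

func (c *fakeConsumer) AckID(id int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.acked = append(c.acked, id)
}

// taggedID mirrors the idea behind ConsumerMessageId: a message id plus the
// index of the consumer that originally received the message.
type taggedID struct {
	ID         int
	ConsumerID int
}

// ackAll acks every id on the consumer at index ConsumerID, one at a time.
// It returns when msgs is closed or ctx is cancelled (an assumption of this sketch).
func ackAll(ctx context.Context, consumers []acker, msgs <-chan []taggedID, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case batch, ok := <-msgs:
			if !ok {
				return
			}
			for _, m := range batch {
				if m.ConsumerID < 0 || m.ConsumerID >= len(consumers) {
					continue // unknown consumer index; real code might log or fail here
				}
				consumers[m.ConsumerID].AckID(m.ID)
			}
		}
	}
}

// demoAck sends two batches and returns the ids acked by each fake consumer.
func demoAck() [][]int {
	a, b := &fakeConsumer{}, &fakeConsumer{}
	msgs := make(chan []taggedID, 2)
	msgs <- []taggedID{{ID: 1, ConsumerID: 0}, {ID: 2, ConsumerID: 1}}
	msgs <- []taggedID{{ID: 3, ConsumerID: 1}}
	close(msgs)

	var wg sync.WaitGroup
	wg.Add(1)
	go ackAll(context.Background(), []acker{a, b}, msgs, &wg)
	wg.Wait()
	return [][]int{a.acked, b.acked}
}

func main() {
	fmt.Println(demoAck()) // prints [[1] [2 3]]
}
```

Acking sequentially in a single goroutine keeps the sketch simple, in line with the note that acks are unlikely to be a bottleneck.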

func IsPulsarError

func IsPulsarError(err error) bool

IsPulsarError returns true if there is a pulsar.Error in the chain.

func NewMessageId added in v0.3.13

func NewMessageId(id int) pulsar.MessageID

func NewPulsarClient

func NewPulsarClient(config *configuration.PulsarConfig) (pulsar.Client, error)

func ParsePulsarCompressionLevel

func ParsePulsarCompressionLevel(compressionLevelStr string) (pulsar.CompressionLevel, error)

func ParsePulsarCompressionType

func ParsePulsarCompressionType(compressionTypeStr string) (pulsar.CompressionType, error)

func PulsarError

func PulsarError(err error) *pulsar.Error

PulsarError returns the first pulsar.Error in the chain, or nil if no such error is found.
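Searching an error chain for a specific error type can be sketched with the standard library's errors.As. The documentation does not say how PulsarError and IsPulsarError walk the chain, so the use of errors.As here is an assumption, and clientError, firstClientError and isClientError are hypothetical stand-ins for pulsar.Error, PulsarError and IsPulsarError.

```go
package main

import (
	"errors"
	"fmt"
)

// clientError is a hypothetical stand-in for pulsar.Error.
type clientError struct {
	msg string
}

func (e *clientError) Error() string { return e.msg }

// firstClientError returns the first *clientError in err's chain, or nil if there is none.
func firstClientError(err error) *clientError {
	var target *clientError
	if errors.As(err, &target) {
		return target
	}
	return nil
}

// isClientError reports whether err's chain contains a *clientError.
func isClientError(err error) bool {
	return firstClientError(err) != nil
}

// demoErrors checks a wrapped client error and an unrelated error.
func demoErrors() (wrappedFound bool, wrappedMsg string, unrelatedFound bool) {
	wrapped := fmt.Errorf("publishing event: %w", &clientError{msg: "connection closed"})
	unrelated := errors.New("unrelated failure")
	return isClientError(wrapped), firstClientError(wrapped).msg, isClientError(unrelated)
}

func main() {
	found, msg, other := demoErrors()
	fmt.Println(found, msg, other) // prints true connection closed false
}
```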

func Receive added in v0.3.13

func Receive(ctx context.Context, consumer pulsar.Consumer, consumerId int, bufferSize int, receiveTimeout time.Duration, backoffTime time.Duration) chan *ConsumerMessage

Receive returns a channel containing messages received from pulsar. This channel will remain open until the supplied context is cancelled.

consumerId: internal id of the consumer. We use this so that when messages from different consumers are multiplexed, we know which messages originated from which consumers.

bufferSize: sets the size of the buffer in the returned channel.

receiveTimeout: sets how long the pulsar consumer will wait for a message before retrying.

backoffTime: sets how long the consumer will wait before retrying if the pulsar consumer indicates an error receiving from pulsar.
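The roles of bufferSize, receiveTimeout and backoffTime can be illustrated with a small receive loop. This is a sketch under stated assumptions rather than the package's code: the receiver interface, fakeReceiver, taggedMessage and receiveLoop are hypothetical, and the real function works with a pulsar.Consumer and returns chan *ConsumerMessage.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// receiver is a hypothetical stand-in for a consumer's blocking receive call.
type receiver interface {
	Receive(ctx context.Context) (string, error)
}

// taggedMessage pairs a payload with the id of the consumer that received it.
type taggedMessage struct {
	Payload    string
	ConsumerID int
}

// receiveLoop forwards messages from r until ctx is cancelled, then closes the channel.
func receiveLoop(ctx context.Context, r receiver, consumerID, bufferSize int,
	receiveTimeout, backoffTime time.Duration) <-chan taggedMessage {
	out := make(chan taggedMessage, bufferSize)
	go func() {
		defer close(out)
		for ctx.Err() == nil {
			// Bound each receive attempt so cancellation is noticed promptly.
			rctx, cancel := context.WithTimeout(ctx, receiveTimeout)
			payload, err := r.Receive(rctx)
			cancel()
			switch {
			case err == nil:
				select {
				case out <- taggedMessage{Payload: payload, ConsumerID: consumerID}:
				case <-ctx.Done():
					return
				}
			case errors.Is(err, context.DeadlineExceeded), errors.Is(err, context.Canceled):
				// No message within receiveTimeout, or shutting down: retry without backoff.
			default:
				// Any other error: wait backoffTime before retrying.
				select {
				case <-time.After(backoffTime):
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// fakeReceiver yields "a", then an error, then "b", then blocks until its context ends.
type fakeReceiver struct{ calls int }

func (f *fakeReceiver) Receive(ctx context.Context) (string, error) {
	f.calls++
	switch f.calls {
	case 1:
		return "a", nil
	case 2:
		return "", errors.New("transient broker error")
	case 3:
		return "b", nil
	default:
		<-ctx.Done()
		return "", ctx.Err()
	}
}

// demoReceive collects the first two messages, formatted as "consumerID:payload".
func demoReceive() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ch := receiveLoop(ctx, &fakeReceiver{}, 7, 2, 20*time.Millisecond, time.Millisecond)
	var got []string
	for len(got) < 2 {
		m := <-ch
		got = append(got, fmt.Sprintf("%d:%s", m.ConsumerID, m.Payload))
	}
	return got
}

func main() {
	fmt.Println(demoReceive()) // prints [7:a 7:b]
}
```

Treating a timeout as "no message yet" and backing off only on other errors is one reasonable reading of the parameter descriptions above.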

Types

type ConsumerMessage added in v0.3.13

type ConsumerMessage struct {
	Message    pulsar.Message
	ConsumerId int
}

ConsumerMessage wraps a pulsar message and an identifier for the consumer which originally received the corresponding message. This exists because we need to track which messages came from which consumers so that we can ACK them on the correct consumer.
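To show why the consumer identifier matters, the sketch below merges messages from two consumers into a single channel while keeping track of their origin. The names taggedMessage, produce and merge are hypothetical and use a plain string payload in place of pulsar.Message; how the package itself multiplexes consumers is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// taggedMessage plays the role of ConsumerMessage: a payload plus the id of
// the consumer that received it.
type taggedMessage struct {
	Payload    string
	ConsumerID int
}

// produce returns a closed, pre-filled channel simulating one consumer's output.
func produce(consumerID int, payloads ...string) <-chan taggedMessage {
	ch := make(chan taggedMessage, len(payloads))
	for _, p := range payloads {
		ch <- taggedMessage{Payload: p, ConsumerID: consumerID}
	}
	close(ch)
	return ch
}

// merge fans several per-consumer channels into one channel, which is closed
// once every input has been drained.
func merge(inputs ...<-chan taggedMessage) <-chan taggedMessage {
	out := make(chan taggedMessage)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan taggedMessage) {
			defer wg.Done()
			for m := range in {
				out <- m
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// demoMerge groups the merged stream by originating consumer.
func demoMerge() map[int][]string {
	byConsumer := map[int][]string{}
	for m := range merge(produce(0, "a", "b"), produce(1, "c")) {
		byConsumer[m.ConsumerID] = append(byConsumer[m.ConsumerID], m.Payload)
	}
	return byConsumer
}

func main() {
	fmt.Println(demoMerge()) // prints map[0:[a b] 1:[c]]
}
```

After merging, the interleaving across consumers is arbitrary, so the tag is the only way to route each ack back to the consumer that received the message.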

type ConsumerMessageId added in v0.3.13

type ConsumerMessageId struct {
	MessageId  pulsar.MessageID
	Index      int64
	ConsumerId int
}

ConsumerMessageId wraps a pulsar message id and an identifier for the consumer which originally received the corresponding message. This exists because we need to track which messages came from which consumers so that we can ACK them on the correct consumer.

func NewConsumerMessageId added in v0.3.13

func NewConsumerMessageId(id int) *ConsumerMessageId

type MockMessageId added in v0.3.13

type MockMessageId struct {
	pulsar.MessageID
	// contains filtered or unexported fields
}

type MockPulsarMessage added in v0.3.13

type MockPulsarMessage struct {
	pulsar.Message
	// contains filtered or unexported fields
}

func EmptyPulsarMessage added in v0.3.13

func EmptyPulsarMessage(id int, publishTime time.Time) MockPulsarMessage

func NewPulsarMessage added in v0.3.13

func NewPulsarMessage(id int, publishTime time.Time, payload []byte) MockPulsarMessage

func (MockPulsarMessage) ID added in v0.3.13

func (MockPulsarMessage) ID() pulsar.MessageID

func (MockPulsarMessage) Payload added in v0.3.13

func (m MockPulsarMessage) Payload() []byte

func (MockPulsarMessage) Properties added in v0.3.13

func (MockPulsarMessage) Properties() map[string]string

func (MockPulsarMessage) PublishTime added in v0.3.13

func (m MockPulsarMessage) PublishTime() time.Time

type PulsarMessageId

type PulsarMessageId struct {
	// contains filtered or unexported fields
}

PulsarMessageId implements the pulsar.MessageID interface (which uniquely identifies a Pulsar message). We need this since the pulsar client library does not export a MessageID implementation. PulsarMessageId also provides additional functionality, e.g., comparison functions.

func FromMessageId

func FromMessageId(id pulsar.MessageID) *PulsarMessageId

FromMessageId converts a pulsar.MessageID interface type to a *PulsarMessageId, which can be used, e.g., for comparison.

func New

func New(ledgerID, entryID int64, partitionIdx, batchIdx int32) *PulsarMessageId

func (*PulsarMessageId) BatchIdx

func (id *PulsarMessageId) BatchIdx() int32

func (*PulsarMessageId) EntryID

func (id *PulsarMessageId) EntryID() int64

func (*PulsarMessageId) Equal

func (id *PulsarMessageId) Equal(other *PulsarMessageId) (bool, error)

func (*PulsarMessageId) Greater

func (id *PulsarMessageId) Greater(other *PulsarMessageId) (bool, error)

Greater returns true if id occurred after other, or an error if the message ids are not comparable (i.e., if they are from different partitions).
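A comparison with this contract might look like the following sketch. The messageID type and its greater method are hypothetical stand-ins for PulsarMessageId and Greater; ordering ids within a partition by ledger id, then entry id, then batch index is an assumption of this sketch, since the documentation only states the partition rule.

```go
package main

import (
	"errors"
	"fmt"
)

// messageID is a hypothetical stand-in carrying the same four fields that New accepts.
type messageID struct {
	ledgerID, entryID      int64
	partitionIdx, batchIdx int32
}

// errNotComparable is returned when the two ids belong to different partitions.
var errNotComparable = errors.New("message ids are from different partitions")

// greater reports whether id occurred after other within the same partition.
// Assumed ordering: ledger id first, then entry id, then batch index.
func (id messageID) greater(other messageID) (bool, error) {
	if id.partitionIdx != other.partitionIdx {
		return false, errNotComparable
	}
	if id.ledgerID != other.ledgerID {
		return id.ledgerID > other.ledgerID, nil
	}
	if id.entryID != other.entryID {
		return id.entryID > other.entryID, nil
	}
	return id.batchIdx > other.batchIdx, nil
}

func main() {
	a := messageID{ledgerID: 1, entryID: 5, partitionIdx: 0, batchIdx: 0}
	b := messageID{ledgerID: 1, entryID: 4, partitionIdx: 0, batchIdx: 3}
	c := messageID{ledgerID: 9, entryID: 9, partitionIdx: 1, batchIdx: 0}

	gt, err := a.greater(b)
	fmt.Println(gt, err) // prints true <nil>

	_, err = a.greater(c)
	fmt.Println(err) // prints message ids are from different partitions
}
```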

func (*PulsarMessageId) GreaterEqual

func (id *PulsarMessageId) GreaterEqual(other *PulsarMessageId) (bool, error)

func (*PulsarMessageId) LedgerID

func (id *PulsarMessageId) LedgerID() int64

func (*PulsarMessageId) PartitionIdx

func (id *PulsarMessageId) PartitionIdx() int32

func (*PulsarMessageId) Serialize

func (id *PulsarMessageId) Serialize() []byte

