messagebroker

package
v0.0.0-...-58c0a64
Published: Sep 26, 2024 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

const (
	// Kafka identifier
	Kafka = "kafka"
	// Pulsar identifier
	Pulsar = "pulsar"
)
const (

	// UberTraceID contains tracing information of the publisher span
	UberTraceID = "uber-trace-id"
)

Variables

This section is empty.

Functions

func GetSpanContext

func GetSpanContext(ctx context.Context, attributes map[string]string) opentracing.SpanContext

GetSpanContext will extract information from attributes and return a SpanContext

func IsErrorRecoverable

func IsErrorRecoverable(err error) bool

IsErrorRecoverable reports whether the given error is recoverable

func SpanContextOption

func SpanContextOption(messageContext opentracing.SpanContext) opentracing.StartSpanOption

SpanContextOption returns a StartSpanOption appropriate for a consumer span, with `messageContext` representing the metadata of the producer span if available; otherwise the span will be a root span
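
A minimal sketch of how GetSpanContext and SpanContextOption fit together on the consumer side, written as if it lived inside this package (so the module import path, elided above, is not needed). consumeWithTracing and the "messagebroker.consume" operation name are hypothetical, and a global opentracing tracer is assumed to be configured elsewhere.

package messagebroker

import (
	"context"

	opentracing "github.com/opentracing/opentracing-go"
)

// consumeWithTracing rebuilds the publisher's span context from the message
// attributes (the UberTraceID key) and starts a consumer span around handling.
func consumeWithTracing(ctx context.Context, attributes map[string]string) {
	// Extract the producer span context carried in the attributes, if any.
	parent := GetSpanContext(ctx, attributes)

	// Start a consumer span; when no producer context is found this becomes a root span.
	span := opentracing.StartSpan("messagebroker.consume", SpanContextOption(parent))
	defer span.Finish()

	// ... handle the message under the consumer span ...
}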

Types

type AddTopicPartitionRequest

type AddTopicPartitionRequest struct {
	Name          string
	NumPartitions int
}

AddTopicPartitionRequest ...

type AddTopicPartitionResponse

type AddTopicPartitionResponse struct {
	Response interface{}
}

AddTopicPartitionResponse ...

type Admin

type Admin interface {
	// CreateTopic creates a new topic if not available
	CreateTopic(context.Context, CreateTopicRequest) (CreateTopicResponse, error)

	// DeleteTopic deletes an existing topic
	DeleteTopic(context.Context, DeleteTopicRequest) (DeleteTopicResponse, error)

	// AddTopicPartitions adds partitions to an existing topic
	AddTopicPartitions(context.Context, AddTopicPartitionRequest) (*AddTopicPartitionResponse, error)

	// AlterTopicConfigs alters topic configuration
	AlterTopicConfigs(context.Context, ModifyTopicConfigRequest) ([]string, error)

	// DescribeTopicConfigs describes topic configuration
	DescribeTopicConfigs(context.Context, []string) (map[string]map[string]string, error)

	// IsHealthy checks health of underlying broker
	IsHealthy(context.Context) (bool, error)

	// FetchProjectTopics fetches a list of all topics for a given project
	FetchProjectTopics(ctx context.Context, project string) (map[string]bool, error)
}

Admin provides admin operations on topics, partitions, schema registry updates, etc

func NewAdminClient

func NewAdminClient(ctx context.Context, variant string, bConfig *BrokerConfig, options *AdminClientOptions) (Admin, error)

NewAdminClient returns an admin client instance, either Kafka or Pulsar
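
A hypothetical sketch of the admin flow under the Kafka variant, written as in-package code: build a BrokerConfig, create an admin client, and create a topic if it is not already available. The broker address, topic name, and topic config values are placeholders.

package messagebroker

import (
	"context"
	"fmt"
)

// ensureTopic creates an admin client for the Kafka variant and creates a topic.
// Broker address, topic name, and topic config below are placeholders.
func ensureTopic(ctx context.Context) error {
	cfg := &BrokerConfig{
		Brokers: []string{"localhost:9092"},
		Admin:   &AdminConfig{},
	}

	admin, err := NewAdminClient(ctx, Kafka, cfg, &AdminClientOptions{})
	if err != nil {
		return fmt.Errorf("creating admin client: %w", err)
	}

	// CreateTopic creates the topic only if it is not already available.
	_, err = admin.CreateTopic(ctx, CreateTopicRequest{
		Name:          "orders",
		NumPartitions: 3,
		Config:        map[string]string{"retention.ms": "86400000"},
	})
	return err
}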

type AdminClientOptions

type AdminClientOptions struct{}

AdminClientOptions holds client specific configuration for admin

type AdminConfig

type AdminConfig struct {
	EnableTopicCleanUp bool
}

AdminConfig holds configuration for admin APIs

type Broker

type Broker interface {
	Admin
	Producer
	Consumer
}

Broker interface for all types of ops

type BrokerConfig

type BrokerConfig struct {
	// A list of host/port pairs to use for establishing the initial connection to the broker cluster
	Brokers             []string
	EnableTLS           bool
	UserCertificate     string
	UserKey             string
	CACertificate       string
	CertDir             string
	Version             string
	DebugEnabled        bool
	OperationTimeoutMs  int
	ConnectionTimeoutMs int
	Producer            *ProducerConfig
	Consumer            *ConsumerConfig
	Admin               *AdminConfig
}

BrokerConfig holds broker's configuration
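
A sketch of a fuller BrokerConfig with TLS and per-role sub-configs, written as in-package code. Every value is an illustrative placeholder rather than a recommended default, and whether the certificate fields expect file paths or raw contents depends on the deployment.

package messagebroker

import "time"

// exampleBrokerConfig sketches a fuller configuration; all values are placeholders.
func exampleBrokerConfig() *BrokerConfig {
	return &BrokerConfig{
		Brokers:             []string{"broker-1:9093", "broker-2:9093"},
		EnableTLS:           true,
		UserCertificate:     "client.crt", // placeholder certificate material
		UserKey:             "client.key",
		CACertificate:       "ca.crt",
		Version:             "2.8.0",
		OperationTimeoutMs:  5000,
		ConnectionTimeoutMs: 5000,
		Producer: &ProducerConfig{
			MaxRetry:     3,
			RetryBackoff: 100 * time.Millisecond,
		},
		Consumer: &ConsumerConfig{
			OffsetReset:      "earliest",
			EnableAutoCommit: false,
		},
		Admin: &AdminConfig{
			EnableTopicCleanUp: false,
		},
	}
}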

type CommitOnTopicRequest

type CommitOnTopicRequest struct {
	Topic     string
	Partition int32
	Offset    int32
	ID        string
}

CommitOnTopicRequest ...

type CommitOnTopicResponse

type CommitOnTopicResponse struct {
	Response interface{}
}

CommitOnTopicResponse ...

type Consumer

type Consumer interface {
	// ReceiveMessages tries to fetch the number of messages specified in the request's
	// NumOfMessages field, starting from the previously committed offset. If fewer messages
	// are available in the queue, it returns however many are available.
	ReceiveMessages(context.Context, GetMessagesFromTopicRequest) (*GetMessagesFromTopicResponse, error)

	// CommitByPartitionAndOffset commits messages, if any.
	// This func will commit the messages consumed
	// by all the previous calls to ReceiveMessages.
	CommitByPartitionAndOffset(context.Context, CommitOnTopicRequest) (CommitOnTopicResponse, error)

	// CommitByMsgID Commits a message by ID
	CommitByMsgID(context.Context, CommitOnTopicRequest) (CommitOnTopicResponse, error)

	// GetTopicMetadata gets the topic metadata
	GetTopicMetadata(context.Context, GetTopicMetadataRequest) (GetTopicMetadataResponse, error)

	// Pause pauses the consumer
	Pause(context.Context, PauseOnTopicRequest) error

	// Resume resumes the consumer
	Resume(context.Context, ResumeOnTopicRequest) error

	// FetchConsumerLag returns the watermark and current offset for a consumer
	FetchConsumerLag(context.Context) (map[string]uint64, error)

	// Close closes the consumer
	Close(context.Context) error
}

Consumer interface for consuming messages

func NewConsumerClient

func NewConsumerClient(ctx context.Context, variant string, bConfig *BrokerConfig, options *ConsumerClientOptions) (Consumer, error)

NewConsumerClient returns a consumer client instance, either Kafka or Pulsar
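
A hypothetical sketch of the consumer flow, written as in-package code: create a Kafka consumer, fetch a batch of messages, and commit each one by partition and offset. Topic, group, and batch values are placeholders, and pollOnce is not part of this package.

package messagebroker

import (
	"context"
	"fmt"
)

// pollOnce creates a Kafka consumer, fetches up to 10 messages, and commits
// each one by partition and offset. Topic, group, and batch values are placeholders.
func pollOnce(ctx context.Context, cfg *BrokerConfig) error {
	consumer, err := NewConsumerClient(ctx, Kafka, cfg, &ConsumerClientOptions{
		Topics:          []string{"orders"},
		GroupID:         "orders-worker",
		AutoOffsetReset: "earliest",
	})
	if err != nil {
		return fmt.Errorf("creating consumer: %w", err)
	}
	defer consumer.Close(ctx)

	resp, err := consumer.ReceiveMessages(ctx, GetMessagesFromTopicRequest{
		NumOfMessages: 10,
		TimeoutMs:     1000,
	})
	if err != nil {
		return err
	}

	for _, msg := range resp.Messages {
		// ... handle msg.Data ...

		if _, err := consumer.CommitByPartitionAndOffset(ctx, CommitOnTopicRequest{
			Topic:     msg.Topic,
			Partition: msg.Partition,
			Offset:    msg.Offset,
		}); err != nil {
			return err
		}
	}
	return nil
}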

type ConsumerClientOptions

type ConsumerClientOptions struct {
	// Specify a list of topics to consume messages from
	Topics []string
	// Specify the subscription name for this consumer. Only used for pulsar
	Subscription string
	// A unique string that identifies the consumer group this consumer belongs to.
	GroupID string
	// A unique identifier of the consumer instance provided by the end user.
	// Only non-empty strings are permitted. If set, the consumer is treated as a static member,
	// which means that only one instance with this ID is allowed in the consumer group at any time
	GroupInstanceID string
	// Specify the auto offset reset value in case no offset is committed yet.
	AutoOffsetReset string
}

ConsumerClientOptions holds client specific configuration for consumer

type ConsumerConfig

type ConsumerConfig struct {
	SubscriptionType int
	OffsetReset      string
	EnableAutoCommit bool
}

ConsumerConfig holds consumer's configuration

type CreateTopicRequest

type CreateTopicRequest struct {
	Name          string
	NumPartitions int
	Config        map[string]string
}

CreateTopicRequest ...

type CreateTopicResponse

type CreateTopicResponse struct {
	Response interface{}
}

CreateTopicResponse ...

type DeleteTopicRequest

type DeleteTopicRequest struct {
	Name           string
	Force          bool //  only required for pulsar and ignored for kafka
	NonPartitioned bool //  only required for pulsar and ignored for kafka
}

DeleteTopicRequest ...

type DeleteTopicResponse

type DeleteTopicResponse struct {
	Response interface{}
}

DeleteTopicResponse ...

type GetMessagesFromTopicRequest

type GetMessagesFromTopicRequest struct {
	NumOfMessages int32
	TimeoutMs     int
}

GetMessagesFromTopicRequest ...

type GetMessagesFromTopicResponse

type GetMessagesFromTopicResponse struct {
	Messages []ReceivedMessage
}

GetMessagesFromTopicResponse ...

func (*GetMessagesFromTopicResponse) HasNonZeroMessages

func (g *GetMessagesFromTopicResponse) HasNonZeroMessages() bool

HasNonZeroMessages reports whether the response contains at least one message

type GetTopicMetadataRequest

type GetTopicMetadataRequest struct {
	Topic     string
	Partition int32
}

GetTopicMetadataRequest ...

type GetTopicMetadataResponse

type GetTopicMetadataResponse struct {
	Topic     string
	Partition int32
	Offset    int32
}

GetTopicMetadataResponse ...

type KafkaBroker

type KafkaBroker struct {
	Producer *kafkapkg.Producer
	Consumer *kafkapkg.Consumer
	Admin    *kafkapkg.AdminClient

	// holds the broker config
	Config *BrokerConfig

	// holds the client configs
	POptions *ProducerClientOptions
	COptions *ConsumerClientOptions
	AOptions *AdminClientOptions
	// contains filtered or unexported fields
}

KafkaBroker for kafka

func (*KafkaBroker) AddTopicPartitions

func (k *KafkaBroker) AddTopicPartitions(ctx context.Context, request AddTopicPartitionRequest) (*AddTopicPartitionResponse, error)

AddTopicPartitions adds partitions to an existing topic. NOTE: use with extreme caution! Calling this will trigger a consumer-group rebalance and could result in undesired behaviour if the topic is being used for ordered messages.

func (*KafkaBroker) AlterTopicConfigs

func (k *KafkaBroker) AlterTopicConfigs(ctx context.Context, request ModifyTopicConfigRequest) ([]string, error)

AlterTopicConfigs alters the topic config

func (*KafkaBroker) Close

func (k *KafkaBroker) Close(ctx context.Context) error

Close closes the consumer

func (*KafkaBroker) CommitByMsgID

CommitByMsgID Commits a message by ID

func (*KafkaBroker) CommitByPartitionAndOffset

func (k *KafkaBroker) CommitByPartitionAndOffset(ctx context.Context, request CommitOnTopicRequest) (CommitOnTopicResponse, error)

CommitByPartitionAndOffset commits messages, if any. This func will commit the messages consumed by all the previous calls to ReceiveMessages.

func (*KafkaBroker) CreateTopic

func (k *KafkaBroker) CreateTopic(ctx context.Context, request CreateTopicRequest) (CreateTopicResponse, error)

CreateTopic creates a new topic if not available

func (*KafkaBroker) DeleteTopic

func (k *KafkaBroker) DeleteTopic(ctx context.Context, request DeleteTopicRequest) (DeleteTopicResponse, error)

DeleteTopic deletes an existing topic

func (*KafkaBroker) DescribeTopicConfigs

func (k *KafkaBroker) DescribeTopicConfigs(ctx context.Context, names []string) (map[string]map[string]string, error)

DescribeTopicConfigs describes the topic config

func (*KafkaBroker) FetchConsumerLag

func (k *KafkaBroker) FetchConsumerLag(ctx context.Context) (map[string]uint64, error)

FetchConsumerLag calculates consumer lag for all assigned partitions

func (*KafkaBroker) FetchProjectTopics

func (k *KafkaBroker) FetchProjectTopics(ctx context.Context, project string) (map[string]bool, error)

FetchProjectTopics fetches a list of all topics for a given project

func (*KafkaBroker) Flush

func (k *KafkaBroker) Flush(timeoutMs int) error

Flush flushes the producer buffer

func (*KafkaBroker) GetTopicMetadata

GetTopicMetadata fetches the given topic's metadata stored in the broker

func (*KafkaBroker) IsClosed

func (k *KafkaBroker) IsClosed(_ context.Context) bool

IsClosed checks if producer has been closed

func (*KafkaBroker) IsHealthy

func (k *KafkaBroker) IsHealthy(ctx context.Context) (bool, error)

IsHealthy checks the health of the Kafka broker

func (*KafkaBroker) Pause

func (k *KafkaBroker) Pause(ctx context.Context, request PauseOnTopicRequest) error

Pause pauses the consumer

func (*KafkaBroker) ReceiveMessages

ReceiveMessages tries to fetch the number of messages specified in NumOfMessages, starting from the previously committed offset. If fewer messages are available in the queue, it returns however many are available.

func (*KafkaBroker) Resume

func (k *KafkaBroker) Resume(_ context.Context, request ResumeOnTopicRequest) error

Resume resumes the consumer

func (*KafkaBroker) SendMessage

SendMessage sends a message on the topic

func (*KafkaBroker) Shutdown

func (k *KafkaBroker) Shutdown(ctx context.Context)

Shutdown closes the producer

type MessageHeader

type MessageHeader struct {
	// the unique identifier of each broker message
	MessageID string
	// time at which the message was pushed onto the broker
	PublishTime time.Time
	// the first topic the message originated from;
	// the message can eventually cycle through multiple delay and dead-letter topics
	SourceTopic string
	// the current retry topic from which the message is read
	RetryTopic string
	// the subscription name from where this message originated
	Subscription string
	// the number of retries already attempted for the current message
	CurrentRetryCount int32
	// the maximum number of attempts, based on the dead-letter policy configured in the subscription
	MaxRetryCount int32
	// the current topic where the message is being read from
	CurrentTopic string
	// the delay with which retry began
	InitialDelayInterval uint
	// the current delay, calculated after applying backoff
	CurrentDelayInterval uint
	// the actual interval being used from the pre-defined list of intervals
	ClosestDelayInterval uint
	// the dead-letter topic name where the message will be pushed after all retries are exhausted
	DeadLetterTopic string
	// the time interval after which this message can be read by the delay-consumer on a retry cycle
	NextDeliveryTime time.Time
	// Sequence number for ordered message delivery
	CurrentSequence int32
	// sequence number of the previous message in ordered message delivery
	PrevSequence int32
}

MessageHeader contains the fields passed around in the message headers

func (MessageHeader) LogFields

func (mh MessageHeader) LogFields() []interface{}

LogFields returns the message header fields for logging

type ModifyTopicConfigRequest

type ModifyTopicConfigRequest struct {
	TopicConfigs []TopicConfig
}

ModifyTopicConfigRequest ...

func NewModifyConfigRequest

func NewModifyConfigRequest(topicConfigsMap map[string]TopicConfig) ModifyTopicConfigRequest

NewModifyConfigRequest creates a ModifyTopicConfigRequest from the given map of topic configs
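
A hypothetical sketch of altering a topic's configuration via the Admin interface, using NewModifyConfigRequest to build the request; the topic name, config key, and raiseRetention helper are placeholders, and the snippet is written as in-package code.

package messagebroker

import "context"

// raiseRetention bumps the retention of a single topic; the config key and
// value are placeholders.
func raiseRetention(ctx context.Context, admin Admin) ([]string, error) {
	req := NewModifyConfigRequest(map[string]TopicConfig{
		"orders": {
			Name:   "orders",
			Config: map[string]string{"retention.ms": "604800000"},
		},
	})
	return admin.AlterTopicConfigs(ctx, req)
}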

type PartitionOffset

type PartitionOffset struct {
	Partition int32
	Offset    int32
}

PartitionOffset ...

func NewPartitionOffset

func NewPartitionOffset(partition, offset int32) PartitionOffset

NewPartitionOffset returns a PartitionOffset for the given partition and offset

func (PartitionOffset) String

func (po PartitionOffset) String() string

type PauseOnTopicRequest

type PauseOnTopicRequest struct {
	Topic     string
	Partition int32
}

PauseOnTopicRequest ...

type Producer

type Producer interface {
	// SendMessage sends a message on the topic
	SendMessage(context.Context, SendMessageToTopicRequest) (*SendMessageToTopicResponse, error)

	// IsClosed checks if producer has been closed
	IsClosed(context.Context) bool

	// Shutdown closes the producer
	Shutdown(context.Context)

	// Flush flushes the producer buffer
	Flush(timeoutMs int) error
}

Producer for produce operations

func NewProducerClient

func NewProducerClient(ctx context.Context, variant string, bConfig *BrokerConfig, options *ProducerClientOptions) (Producer, error)

NewProducerClient returns a producer client instance, either Kafka or Pulsar
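
A hypothetical sketch of the producer flow, written as in-package code: create a Kafka producer, send one message, flush the buffer, and shut down. Topic, timeout, and payload values are placeholders.

package messagebroker

import (
	"context"
	"fmt"
)

// publishOne creates a Kafka producer, sends a single message, flushes the
// buffer, and shuts the producer down. Topic, timeout, and payload are placeholders.
func publishOne(ctx context.Context, cfg *BrokerConfig) error {
	producer, err := NewProducerClient(ctx, Kafka, cfg, &ProducerClientOptions{
		Topic:     "orders",
		TimeoutMs: 5000,
	})
	if err != nil {
		return fmt.Errorf("creating producer: %w", err)
	}
	defer producer.Shutdown(ctx)

	resp, err := producer.SendMessage(ctx, SendMessageToTopicRequest{
		Topic:     "orders",
		Message:   []byte(`{"id":"123"}`),
		TimeoutMs: 5000,
	})
	if err != nil {
		return err
	}
	fmt.Println("published message", resp.MessageID)

	// Flush any buffered messages before the deferred shutdown.
	return producer.Flush(5000)
}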

type ProducerClientOptions

type ProducerClientOptions struct {
	Topic     string
	TimeoutMs int64
}

ProducerClientOptions holds client specific configuration for producer

type ProducerConfig

type ProducerConfig struct {
	RetryBackoff       time.Duration
	Partitioner        string
	MaxRetry           int
	MaxMessages        int
	CompressionEnabled bool
	CompressionType    string
	RetryAck           string
}

ProducerConfig holds producer's configuration

type PulsarBroker

type PulsarBroker struct {
	Consumer pulsar.Consumer
	Producer pulsar.Producer
	Admin    pulsarctl.Client

	// holds the broker config
	Config *BrokerConfig

	// holds the client configs
	POptions *ProducerClientOptions
	COptions *ConsumerClientOptions
	AOptions *AdminClientOptions
}

PulsarBroker for pulsar

func (*PulsarBroker) AddTopicPartitions

AddTopicPartitions adds partitions to an existing topic

func (*PulsarBroker) AlterTopicConfigs

func (p *PulsarBroker) AlterTopicConfigs(ctx context.Context, request ModifyTopicConfigRequest) ([]string, error)

AlterTopicConfigs alters the topic config

func (*PulsarBroker) Close

func (p *PulsarBroker) Close(_ context.Context) error

Close closes the consumer

func (*PulsarBroker) CommitByMsgID

func (p *PulsarBroker) CommitByMsgID(ctx context.Context, request CommitOnTopicRequest) (CommitOnTopicResponse, error)

CommitByMsgID Commits a message by ID

func (*PulsarBroker) CommitByPartitionAndOffset

func (p *PulsarBroker) CommitByPartitionAndOffset(_ context.Context, _ CommitOnTopicRequest) (CommitOnTopicResponse, error)

CommitByPartitionAndOffset commits messages, if any. This func will commit the messages consumed by all the previous calls to ReceiveMessages.

func (*PulsarBroker) CreateTopic

func (p *PulsarBroker) CreateTopic(ctx context.Context, request CreateTopicRequest) (CreateTopicResponse, error)

CreateTopic creates a new topic if not available

func (*PulsarBroker) DeleteTopic

func (p *PulsarBroker) DeleteTopic(ctx context.Context, request DeleteTopicRequest) (DeleteTopicResponse, error)

DeleteTopic deletes an existing topic

func (*PulsarBroker) DescribeTopicConfigs

func (p *PulsarBroker) DescribeTopicConfigs(ctx context.Context, names []string) (map[string]map[string]string, error)

DescribeTopicConfigs describes the topic config

func (*PulsarBroker) FetchConsumerLag

func (p *PulsarBroker) FetchConsumerLag(ctx context.Context) (map[string]uint64, error)

FetchConsumerLag returns the watermark and current offset for the consumer

func (*PulsarBroker) FetchProjectTopics

func (p *PulsarBroker) FetchProjectTopics(ctx context.Context, project string) (map[string]bool, error)

FetchProjectTopics fetches a list of all topics for a given project

func (*PulsarBroker) Flush

func (p *PulsarBroker) Flush(timeoutMs int) error

Flush flushes the producer buffer

func (*PulsarBroker) GetTopicMetadata

GetTopicMetadata gets the topic metadata

func (*PulsarBroker) IsClosed

func (p *PulsarBroker) IsClosed(_ context.Context) bool

IsClosed checks if producer has been closed

func (*PulsarBroker) IsHealthy

func (p *PulsarBroker) IsHealthy(_ context.Context) (bool, error)

IsHealthy checks the health of the Pulsar broker

func (*PulsarBroker) Pause

Pause pauses the consumer

func (PulsarBroker) ReceiveMessages

ReceiveMessages tries to fetch the number of messages specified in NumOfMessages, starting from the previously committed offset. If fewer messages are available in the queue, it returns however many are available.

func (*PulsarBroker) Resume

Resume resumes the consumer

func (PulsarBroker) SendMessage

SendMessage sends a message on the topic

func (*PulsarBroker) Shutdown

func (p *PulsarBroker) Shutdown(ctx context.Context)

Shutdown closes the producer

type ReceivedMessage

type ReceivedMessage struct {
	Data        []byte
	Topic       string
	Partition   int32
	Offset      int32
	OrderingKey string
	Attributes  []map[string][]byte
	MessageHeader
}

ReceivedMessage ...

func (ReceivedMessage) CanProcessMessage

func (rm ReceivedMessage) CanProcessMessage() bool

CanProcessMessage reports whether a message can be processed: only if the current time is greater than the next delivery time, OR the number of allowed retries has been exhausted
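
A sketch of how a delay or retry consumer might use CanProcessMessage to skip messages that are not yet due for delivery; processEligible and its handle callback are hypothetical, written as in-package code.

package messagebroker

// processEligible skips messages that are not yet eligible for processing and
// hands the rest to the handle callback.
func processEligible(msgs []ReceivedMessage, handle func(ReceivedMessage)) {
	for _, msg := range msgs {
		if !msg.CanProcessMessage() {
			// Not yet due for delivery; leave it for a later poll.
			continue
		}
		handle(msg)
	}
}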

func (ReceivedMessage) HasReachedRetryThreshold

func (rm ReceivedMessage) HasReachedRetryThreshold() bool

HasReachedRetryThreshold reports whether the message has exhausted its maximum retry count

func (ReceivedMessage) RequiresOrdering

func (rm ReceivedMessage) RequiresOrdering() bool

RequiresOrdering checks if the message contains an ordering key

type ResumeOnTopicRequest

type ResumeOnTopicRequest struct {
	Topic     string
	Partition int32
}

ResumeOnTopicRequest ...

type SendMessageToTopicRequest

type SendMessageToTopicRequest struct {
	Topic       string
	Message     []byte
	OrderingKey string
	TimeoutMs   int
	Attributes  []map[string][]byte
	MessageHeader
}

SendMessageToTopicRequest ...

type SendMessageToTopicResponse

type SendMessageToTopicResponse struct {
	MessageID string
}

SendMessageToTopicResponse ...

type TopicConfig

type TopicConfig struct {
	Name   string
	Config map[string]string
}

TopicConfig ...
