Documentation ¶
Index ¶
- Constants
- func GetSpanContext(ctx context.Context, attributes map[string]string) opentracing.SpanContext
- func IsErrorRecoverable(err error) bool
- func SpanContextOption(messageContext opentracing.SpanContext) opentracing.StartSpanOption
- type AddTopicPartitionRequest
- type AddTopicPartitionResponse
- type Admin
- type AdminClientOptions
- type AdminConfig
- type Broker
- type BrokerConfig
- type CommitOnTopicRequest
- type CommitOnTopicResponse
- type Consumer
- type ConsumerClientOptions
- type ConsumerConfig
- type CreateTopicRequest
- type CreateTopicResponse
- type DeleteTopicRequest
- type DeleteTopicResponse
- type GetMessagesFromTopicRequest
- type GetMessagesFromTopicResponse
- type GetTopicMetadataRequest
- type GetTopicMetadataResponse
- type KafkaBroker
- func (k *KafkaBroker) AddTopicPartitions(ctx context.Context, request AddTopicPartitionRequest) (*AddTopicPartitionResponse, error)
- func (k *KafkaBroker) AlterTopicConfigs(ctx context.Context, request ModifyTopicConfigRequest) ([]string, error)
- func (k *KafkaBroker) Close(ctx context.Context) error
- func (k *KafkaBroker) CommitByMsgID(_ context.Context, _ CommitOnTopicRequest) (CommitOnTopicResponse, error)
- func (k *KafkaBroker) CommitByPartitionAndOffset(ctx context.Context, request CommitOnTopicRequest) (CommitOnTopicResponse, error)
- func (k *KafkaBroker) CreateTopic(ctx context.Context, request CreateTopicRequest) (CreateTopicResponse, error)
- func (k *KafkaBroker) DeleteTopic(ctx context.Context, request DeleteTopicRequest) (DeleteTopicResponse, error)
- func (k *KafkaBroker) DescribeTopicConfigs(ctx context.Context, names []string) (map[string]map[string]string, error)
- func (k *KafkaBroker) FetchConsumerLag(ctx context.Context) (map[string]uint64, error)
- func (k *KafkaBroker) FetchProjectTopics(ctx context.Context, project string) (map[string]bool, error)
- func (k *KafkaBroker) Flush(timeoutMs int) error
- func (k *KafkaBroker) GetTopicMetadata(ctx context.Context, request GetTopicMetadataRequest) (GetTopicMetadataResponse, error)
- func (k *KafkaBroker) IsClosed(_ context.Context) bool
- func (k *KafkaBroker) IsHealthy(ctx context.Context) (bool, error)
- func (k *KafkaBroker) Pause(ctx context.Context, request PauseOnTopicRequest) error
- func (k *KafkaBroker) ReceiveMessages(ctx context.Context, request GetMessagesFromTopicRequest) (*GetMessagesFromTopicResponse, error)
- func (k *KafkaBroker) Resume(_ context.Context, request ResumeOnTopicRequest) error
- func (k *KafkaBroker) SendMessage(ctx context.Context, request SendMessageToTopicRequest) (*SendMessageToTopicResponse, error)
- func (k *KafkaBroker) Shutdown(ctx context.Context)
- type MessageHeader
- type ModifyTopicConfigRequest
- type PartitionOffset
- type PauseOnTopicRequest
- type Producer
- type ProducerClientOptions
- type ProducerConfig
- type PulsarBroker
- func (p *PulsarBroker) AddTopicPartitions(_ context.Context, _ AddTopicPartitionRequest) (*AddTopicPartitionResponse, error)
- func (p *PulsarBroker) AlterTopicConfigs(ctx context.Context, request ModifyTopicConfigRequest) ([]string, error)
- func (p *PulsarBroker) Close(_ context.Context) error
- func (p *PulsarBroker) CommitByMsgID(ctx context.Context, request CommitOnTopicRequest) (CommitOnTopicResponse, error)
- func (p *PulsarBroker) CommitByPartitionAndOffset(_ context.Context, _ CommitOnTopicRequest) (CommitOnTopicResponse, error)
- func (p *PulsarBroker) CreateTopic(ctx context.Context, request CreateTopicRequest) (CreateTopicResponse, error)
- func (p *PulsarBroker) DeleteTopic(ctx context.Context, request DeleteTopicRequest) (DeleteTopicResponse, error)
- func (p *PulsarBroker) DescribeTopicConfigs(ctx context.Context, names []string) (map[string]map[string]string, error)
- func (p *PulsarBroker) FetchConsumerLag(ctx context.Context) (map[string]uint64, error)
- func (p *PulsarBroker) FetchProjectTopics(ctx context.Context, project string) (map[string]bool, error)
- func (p *PulsarBroker) Flush(timeoutMs int) error
- func (p *PulsarBroker) GetTopicMetadata(ctx context.Context, request GetTopicMetadataRequest) (GetTopicMetadataResponse, error)
- func (p *PulsarBroker) IsClosed(_ context.Context) bool
- func (p *PulsarBroker) IsHealthy(_ context.Context) (bool, error)
- func (p *PulsarBroker) Pause(_ context.Context, _ PauseOnTopicRequest) error
- func (p PulsarBroker) ReceiveMessages(ctx context.Context, request GetMessagesFromTopicRequest) (*GetMessagesFromTopicResponse, error)
- func (p *PulsarBroker) Resume(_ context.Context, _ ResumeOnTopicRequest) error
- func (p PulsarBroker) SendMessage(ctx context.Context, request SendMessageToTopicRequest) (*SendMessageToTopicResponse, error)
- func (p *PulsarBroker) Shutdown(ctx context.Context)
- type ReceivedMessage
- type ResumeOnTopicRequest
- type SendMessageToTopicRequest
- type SendMessageToTopicResponse
- type TopicConfig
Constants ¶
const ( // Kafka identifier Kafka = "kafka" // Pulsar identifier Pulsar = "pulsar" )
const (
// UberTraceID contains tracing information of the publisher span
UberTraceID = "uber-trace-id"
)
Variables ¶
This section is empty.
Functions ¶
func GetSpanContext ¶
GetSpanContext will extract information from attributes and return a SpanContext
func SpanContextOption ¶
func SpanContextOption(messageContext opentracing.SpanContext) opentracing.StartSpanOption
SpanContextOption returns a StartSpanOption appropriate for a Consumer span, with `messageContext` representing the metadata for the producer Span if available. Otherwise it will be a root span.
Types ¶
type AddTopicPartitionRequest ¶
AddTopicPartitionRequest ...
type AddTopicPartitionResponse ¶
type AddTopicPartitionResponse struct {
Response interface{}
}
AddTopicPartitionResponse ...
type Admin ¶
type Admin interface { // CreateTopic creates a new topic if not available CreateTopic(context.Context, CreateTopicRequest) (CreateTopicResponse, error) // DeleteTopic deletes an existing topic DeleteTopic(context.Context, DeleteTopicRequest) (DeleteTopicResponse, error) // AddTopicPartitions adds partitions to an existing topic AddTopicPartitions(context.Context, AddTopicPartitionRequest) (*AddTopicPartitionResponse, error) // AlterTopicConfigs alters topic configuration AlterTopicConfigs(context.Context, ModifyTopicConfigRequest) ([]string, error) // DescribeTopicConfigs describes topic configuration DescribeTopicConfigs(context.Context, []string) (map[string]map[string]string, error) // IsHealthy checks health of underlying broker IsHealthy(context.Context) (bool, error) // FetchProjectTopics fetches a list of all topics for a given project FetchProjectTopics(ctx context.Context, project string) (map[string]bool, error) }
Admin for admin operations on topics, partitions, updating schema registry etc
func NewAdminClient ¶
func NewAdminClient(ctx context.Context, variant string, bConfig *BrokerConfig, options *AdminClientOptions) (Admin, error)
NewAdminClient returns an instance of an admin, kafka or pulsar
type AdminClientOptions ¶
type AdminClientOptions struct{}
AdminClientOptions holds client specific configuration for admin
type AdminConfig ¶
type AdminConfig struct {
EnableTopicCleanUp bool
}
AdminConfig holds configuration for admin APIs
type BrokerConfig ¶
type BrokerConfig struct { // A list of host/port pairs to use for establishing the initial connection to the broker cluster Brokers []string EnableTLS bool UserCertificate string UserKey string CACertificate string CertDir string Version string DebugEnabled bool OperationTimeoutMs int ConnectionTimeoutMs int Producer *ProducerConfig Consumer *ConsumerConfig Admin *AdminConfig }
BrokerConfig holds broker's configuration
type CommitOnTopicRequest ¶
CommitOnTopicRequest ...
type CommitOnTopicResponse ¶
type CommitOnTopicResponse struct {
Response interface{}
}
CommitOnTopicResponse ...
type Consumer ¶
type Consumer interface { // ReceiveMessages gets tries to get the number of messages mentioned in the param "numOfMessages" // from the previous committed offset. If the available messages in the queue are less, returns // how many ever messages are available ReceiveMessages(context.Context, GetMessagesFromTopicRequest) (*GetMessagesFromTopicResponse, error) // CommitByPartitionAndOffset Commits messages if any // This func will commit the message consumed // by all the previous calls to GetMessages CommitByPartitionAndOffset(context.Context, CommitOnTopicRequest) (CommitOnTopicResponse, error) // CommitByMsgID Commits a message by ID CommitByMsgID(context.Context, CommitOnTopicRequest) (CommitOnTopicResponse, error) // GetTopicMetadata gets the topic metadata GetTopicMetadata(context.Context, GetTopicMetadataRequest) (GetTopicMetadataResponse, error) // Pause pause the consumer Pause(context.Context, PauseOnTopicRequest) error // Resume resume the consumer Resume(context.Context, ResumeOnTopicRequest) error // FetchConsumerLag returns the watermark and current offset for a consumer FetchConsumerLag(context.Context) (map[string]uint64, error) // Close closes the consumer Close(context.Context) error }
Consumer interface for consuming messages
func NewConsumerClient ¶
func NewConsumerClient(ctx context.Context, variant string, bConfig *BrokerConfig, options *ConsumerClientOptions) (Consumer, error)
NewConsumerClient returns an instance of a consumer, kafka or pulsar
type ConsumerClientOptions ¶
type ConsumerClientOptions struct { // Specify a list of topics to consume messages from Topics []string // Specify the subscription name for this consumer. Only used for pulsar Subscription string // A unique string that identifies the consumer group this consumer belongs to. GroupID string // A unique identifier of the consumer instance provided by the end user. // Only non-empty strings are permitted. If set, the consumer is treated as a static member, // which means that only one instance with this ID is allowed in the consumer group at any time GroupInstanceID string // Specify the auto offset reset value in case no offset is committed yet. AutoOffsetReset string }
ConsumerClientOptions holds client specific configuration for consumer
type ConsumerConfig ¶
ConsumerConfig holds consumer's configuration
type CreateTopicRequest ¶
CreateTopicRequest ...
type CreateTopicResponse ¶
type CreateTopicResponse struct {
Response interface{}
}
CreateTopicResponse ...
type DeleteTopicRequest ¶
type DeleteTopicRequest struct { Name string Force bool // only required for pulsar and ignored for kafka NonPartitioned bool // only required for pulsar and ignored for kafka }
DeleteTopicRequest ...
type DeleteTopicResponse ¶
type DeleteTopicResponse struct {
Response interface{}
}
DeleteTopicResponse ...
type GetMessagesFromTopicRequest ¶
GetMessagesFromTopicRequest ...
type GetMessagesFromTopicResponse ¶
type GetMessagesFromTopicResponse struct {
Messages []ReceivedMessage
}
GetMessagesFromTopicResponse ...
func (*GetMessagesFromTopicResponse) HasNonZeroMessages ¶
func (g *GetMessagesFromTopicResponse) HasNonZeroMessages() bool
HasNonZeroMessages ...
type GetTopicMetadataRequest ¶
GetTopicMetadataRequest ...
type GetTopicMetadataResponse ¶
GetTopicMetadataResponse ...
type KafkaBroker ¶
type KafkaBroker struct { Producer *kafkapkg.Producer Consumer *kafkapkg.Consumer Admin *kafkapkg.AdminClient // holds the broker config Config *BrokerConfig // holds the client configs POptions *ProducerClientOptions COptions *ConsumerClientOptions AOptions *AdminClientOptions // contains filtered or unexported fields }
KafkaBroker for kafka
func (*KafkaBroker) AddTopicPartitions ¶
func (k *KafkaBroker) AddTopicPartitions(ctx context.Context, request AddTopicPartitionRequest) (*AddTopicPartitionResponse, error)
AddTopicPartitions adds partitions to an existing topic. NOTE: Use with extreme caution! Calling this will result in a consumer-group re-balance and could result in undesired behaviour if the topic is being used for ordered messages.
func (*KafkaBroker) AlterTopicConfigs ¶
func (k *KafkaBroker) AlterTopicConfigs(ctx context.Context, request ModifyTopicConfigRequest) ([]string, error)
AlterTopicConfigs alters the topic config
func (*KafkaBroker) Close ¶
func (k *KafkaBroker) Close(ctx context.Context) error
Close closes the consumer
func (*KafkaBroker) CommitByMsgID ¶
func (k *KafkaBroker) CommitByMsgID(_ context.Context, _ CommitOnTopicRequest) (CommitOnTopicResponse, error)
CommitByMsgID Commits a message by ID
func (*KafkaBroker) CommitByPartitionAndOffset ¶
func (k *KafkaBroker) CommitByPartitionAndOffset(ctx context.Context, request CommitOnTopicRequest) (CommitOnTopicResponse, error)
CommitByPartitionAndOffset commits messages, if any. This func will commit the messages consumed by all the previous calls to GetMessages.
func (*KafkaBroker) CreateTopic ¶
func (k *KafkaBroker) CreateTopic(ctx context.Context, request CreateTopicRequest) (CreateTopicResponse, error)
CreateTopic creates a new topic if not available
func (*KafkaBroker) DeleteTopic ¶
func (k *KafkaBroker) DeleteTopic(ctx context.Context, request DeleteTopicRequest) (DeleteTopicResponse, error)
DeleteTopic deletes an existing topic
func (*KafkaBroker) DescribeTopicConfigs ¶
func (k *KafkaBroker) DescribeTopicConfigs(ctx context.Context, names []string) (map[string]map[string]string, error)
DescribeTopicConfigs describes the topic config
func (*KafkaBroker) FetchConsumerLag ¶
FetchConsumerLag calculates consumer lag for all assigned partitions
func (*KafkaBroker) FetchProjectTopics ¶
func (k *KafkaBroker) FetchProjectTopics(ctx context.Context, project string) (map[string]bool, error)
FetchProjectTopics fetches a list of all topics for a given project
func (*KafkaBroker) Flush ¶
func (k *KafkaBroker) Flush(timeoutMs int) error
Flush flushes the producer buffer
func (*KafkaBroker) GetTopicMetadata ¶
func (k *KafkaBroker) GetTopicMetadata(ctx context.Context, request GetTopicMetadataRequest) (GetTopicMetadataResponse, error)
GetTopicMetadata fetches the given topic's metadata stored in the broker
func (*KafkaBroker) IsClosed ¶
func (k *KafkaBroker) IsClosed(_ context.Context) bool
IsClosed checks if producer has been closed
func (*KafkaBroker) IsHealthy ¶
func (k *KafkaBroker) IsHealthy(ctx context.Context) (bool, error)
IsHealthy checks the health of kafka
func (*KafkaBroker) Pause ¶
func (k *KafkaBroker) Pause(ctx context.Context, request PauseOnTopicRequest) error
Pause pauses the consumer
func (*KafkaBroker) ReceiveMessages ¶
func (k *KafkaBroker) ReceiveMessages(ctx context.Context, request GetMessagesFromTopicRequest) (*GetMessagesFromTopicResponse, error)
ReceiveMessages tries to get the number of messages mentioned in the param "numOfMessages" from the previous committed offset. If fewer messages are available in the queue, it returns however many messages are available.
func (*KafkaBroker) Resume ¶
func (k *KafkaBroker) Resume(_ context.Context, request ResumeOnTopicRequest) error
Resume resumes the consumer
func (*KafkaBroker) SendMessage ¶
func (k *KafkaBroker) SendMessage(ctx context.Context, request SendMessageToTopicRequest) (*SendMessageToTopicResponse, error)
SendMessage sends a message on the topic
func (*KafkaBroker) Shutdown ¶
func (k *KafkaBroker) Shutdown(ctx context.Context)
Shutdown closes the producer
type MessageHeader ¶
type MessageHeader struct { // the unique identifier of each broker message MessageID string // time at which the message was pushed onto the broker PublishTime time.Time // the first topic from where the message originated from // the message eventually can cycle through multiple delay and dead-letter topics SourceTopic string // the current retry topic through which the message is read from RetryTopic string // the subscription name from where this message originated Subscription string // the number of retries already attempted for the current message CurrentRetryCount int32 // max attempted based on the dead-letter policy configured in the subscription MaxRetryCount int32 // the current topic where the message is being read from CurrentTopic string // the delay with which retry began InitialDelayInterval uint // the current delay calculated being after applying backoff CurrentDelayInterval uint // the actual interval being used from the pre-defined list of intervals ClosestDelayInterval uint // the dead letter topic name where is message will be pushed after exhausted all retries DeadLetterTopic string // the time interval after which this message can be read by the delay-consumer on a retry cycle NextDeliveryTime time.Time // Sequence number for ordered message delivery CurrentSequence int32 // sequence number of the previous message in ordered message delivery PrevSequence int32 }
MessageHeader contains the fields passed around in the message headers
type ModifyTopicConfigRequest ¶
type ModifyTopicConfigRequest struct {
TopicConfigs []TopicConfig
}
ModifyTopicConfigRequest ...
func NewModifyConfigRequest ¶
func NewModifyConfigRequest(topicConfigsMap map[string]TopicConfig) ModifyTopicConfigRequest
NewModifyConfigRequest ...
type PartitionOffset ¶
PartitionOffset ...
func NewPartitionOffset ¶
func NewPartitionOffset(partition, offset int32) PartitionOffset
NewPartitionOffset ...
func (PartitionOffset) String ¶
func (po PartitionOffset) String() string
type PauseOnTopicRequest ¶
PauseOnTopicRequest ...
type Producer ¶
type Producer interface { // SendMessage sends a message on the topic SendMessage(context.Context, SendMessageToTopicRequest) (*SendMessageToTopicResponse, error) // IsClosed checks if producer has been closed IsClosed(context.Context) bool // Shutdown closes the producer Shutdown(context.Context) // Flush flushes the producer buffer Flush(timeoutMs int) error }
Producer for produce operations
func NewProducerClient ¶
func NewProducerClient(ctx context.Context, variant string, bConfig *BrokerConfig, options *ProducerClientOptions) (Producer, error)
NewProducerClient returns an instance of a producer, kafka or pulsar
type ProducerClientOptions ¶
ProducerClientOptions holds client specific configuration for producer
type ProducerConfig ¶
type ProducerConfig struct { RetryBackoff time.Duration Partitioner string MaxRetry int MaxMessages int CompressionEnabled bool CompressionType string RetryAck string }
ProducerConfig holds producer's configuration
type PulsarBroker ¶
type PulsarBroker struct { Consumer pulsar.Consumer Producer pulsar.Producer Admin pulsarctl.Client // holds the broker config Config *BrokerConfig // holds the client configs POptions *ProducerClientOptions COptions *ConsumerClientOptions AOptions *AdminClientOptions }
PulsarBroker for pulsar
func (*PulsarBroker) AddTopicPartitions ¶
func (p *PulsarBroker) AddTopicPartitions(_ context.Context, _ AddTopicPartitionRequest) (*AddTopicPartitionResponse, error)
AddTopicPartitions adds partitions to an existing topic
func (*PulsarBroker) AlterTopicConfigs ¶
func (p *PulsarBroker) AlterTopicConfigs(ctx context.Context, request ModifyTopicConfigRequest) ([]string, error)
AlterTopicConfigs alters the topic config
func (*PulsarBroker) Close ¶
func (p *PulsarBroker) Close(_ context.Context) error
Close closes the consumer
func (*PulsarBroker) CommitByMsgID ¶
func (p *PulsarBroker) CommitByMsgID(ctx context.Context, request CommitOnTopicRequest) (CommitOnTopicResponse, error)
CommitByMsgID Commits a message by ID
func (*PulsarBroker) CommitByPartitionAndOffset ¶
func (p *PulsarBroker) CommitByPartitionAndOffset(_ context.Context, _ CommitOnTopicRequest) (CommitOnTopicResponse, error)
CommitByPartitionAndOffset commits messages, if any. This func will commit the messages consumed by all the previous calls to GetMessages.
func (*PulsarBroker) CreateTopic ¶
func (p *PulsarBroker) CreateTopic(ctx context.Context, request CreateTopicRequest) (CreateTopicResponse, error)
CreateTopic creates a new topic if not available
func (*PulsarBroker) DeleteTopic ¶
func (p *PulsarBroker) DeleteTopic(ctx context.Context, request DeleteTopicRequest) (DeleteTopicResponse, error)
DeleteTopic deletes an existing topic
func (*PulsarBroker) DescribeTopicConfigs ¶
func (p *PulsarBroker) DescribeTopicConfigs(ctx context.Context, names []string) (map[string]map[string]string, error)
DescribeTopicConfigs describes the topic config
func (*PulsarBroker) FetchConsumerLag ¶
FetchConsumerLag ...
func (*PulsarBroker) FetchProjectTopics ¶
func (p *PulsarBroker) FetchProjectTopics(ctx context.Context, project string) (map[string]bool, error)
FetchProjectTopics fetches a list of all topics for a given project
func (*PulsarBroker) Flush ¶
func (p *PulsarBroker) Flush(timeoutMs int) error
Flush flushes the producer buffer
func (*PulsarBroker) GetTopicMetadata ¶
func (p *PulsarBroker) GetTopicMetadata(ctx context.Context, request GetTopicMetadataRequest) (GetTopicMetadataResponse, error)
GetTopicMetadata ...
func (*PulsarBroker) IsClosed ¶
func (p *PulsarBroker) IsClosed(_ context.Context) bool
IsClosed checks if producer has been closed
func (*PulsarBroker) IsHealthy ¶
func (p *PulsarBroker) IsHealthy(_ context.Context) (bool, error)
IsHealthy checks the health of pulsar
func (*PulsarBroker) Pause ¶
func (p *PulsarBroker) Pause(_ context.Context, _ PauseOnTopicRequest) error
Pause pauses the consumer
func (PulsarBroker) ReceiveMessages ¶
func (p PulsarBroker) ReceiveMessages(ctx context.Context, request GetMessagesFromTopicRequest) (*GetMessagesFromTopicResponse, error)
ReceiveMessages tries to get the number of messages mentioned in the param "numOfMessages" from the previous committed offset. If fewer messages are available in the queue, it returns however many messages are available.
func (*PulsarBroker) Resume ¶
func (p *PulsarBroker) Resume(_ context.Context, _ ResumeOnTopicRequest) error
Resume resumes the consumer
func (PulsarBroker) SendMessage ¶
func (p PulsarBroker) SendMessage(ctx context.Context, request SendMessageToTopicRequest) (*SendMessageToTopicResponse, error)
SendMessage sends a message on the topic
func (*PulsarBroker) Shutdown ¶
func (p *PulsarBroker) Shutdown(ctx context.Context)
Shutdown closes the producer
type ReceivedMessage ¶
type ReceivedMessage struct { Data []byte Topic string Partition int32 Offset int32 OrderingKey string Attributes []map[string][]byte MessageHeader }
ReceivedMessage ...
func (ReceivedMessage) CanProcessMessage ¶
func (rm ReceivedMessage) CanProcessMessage() bool
CanProcessMessage: a message can be processed only if the current time is greater than the next delivery time OR the number of allowed retries has been exhausted
func (ReceivedMessage) HasReachedRetryThreshold ¶
func (rm ReceivedMessage) HasReachedRetryThreshold() bool
HasReachedRetryThreshold ...
func (ReceivedMessage) RequiresOrdering ¶
func (rm ReceivedMessage) RequiresOrdering() bool
RequiresOrdering checks if the message contains an ordering key
type ResumeOnTopicRequest ¶
ResumeOnTopicRequest ...
type SendMessageToTopicRequest ¶
type SendMessageToTopicRequest struct { Topic string Message []byte OrderingKey string TimeoutMs int Attributes []map[string][]byte MessageHeader }
SendMessageToTopicRequest ...
type SendMessageToTopicResponse ¶
type SendMessageToTopicResponse struct {
MessageID string
}
SendMessageToTopicResponse ...