messaging

package
v1.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 20, 2020 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrMessageSizeLimit is the sentinel error used to indicate that a message
// was rejected by the server because of a size limitation.
var ErrMessageSizeLimit = errors.New("message was too large, server rejected it to avoid allocation error")

Functions

func CreateTLSConfig added in v0.6.0

func CreateTLSConfig(tlsConfig auth.TLS) (*tls.Config, error)

CreateTLSConfig returns a TLS config for the given auth.TLS settings, or an error.

Types

type Client

// Client abstracts interaction with the messaging system used for
// replication. Its methods construct Consumer and Producer values.
type Client interface {
	// NewConsumer returns a Consumer for the given application name and
	// consumer name, or an error.
	// NOTE(review): concurrency presumably bounds parallel message
	// processing — confirm against the implementation.
	NewConsumer(appName, consumerName string, concurrency int) (Consumer, error)
	// NewConsumerWithClusterName returns a Consumer for the given current
	// cluster, source cluster and consumer name, or an error.
	NewConsumerWithClusterName(currentCluster, sourceCluster, consumerName string, concurrency int) (Consumer, error)
	// NewProducer returns a Producer for the given application name, or an
	// error.
	NewProducer(appName string) (Producer, error)
	// NewProducerWithClusterName returns a Producer for the given source
	// cluster, or an error.
	NewProducerWithClusterName(sourceCluster string) (Producer, error)
}

Client is the interface used to abstract out interaction with the messaging system for replication.

func NewKafkaClient added in v0.4.0

func NewKafkaClient(kc *KafkaConfig, metricsClient metrics.Client, zLogger *zap.Logger, logger log.Logger, metricScope tally.Scope,
	checkCluster, checkApp bool) Client

NewKafkaClient is used to create an instance of KafkaClient

type CloseableProducer added in v0.27.0

// CloseableProducer is a Producer that can be closed.
type CloseableProducer interface {
	// Producer is embedded, so a CloseableProducer also provides Publish.
	Producer
	// Close closes the producer and reports any error encountered.
	Close() error
}

CloseableProducer is a Producer that can be closed

type ClusterConfig

// ClusterConfig describes the configuration for a single Kafka cluster.
type ClusterConfig struct {
	// Brokers is the list of brokers for the cluster, read from the
	// "brokers" YAML key.
	// NOTE(review): presumably host:port addresses — confirm the expected format.
	Brokers []string `yaml:"brokers"`
}

ClusterConfig describes the configuration for a single Kafka cluster

type Consumer added in v0.4.0

// Consumer is the unified interface for both internal and external Kafka
// clients.
type Consumer interface {
	// Start starts the consumer and reports any error encountered.
	Start() error
	// Stop stops the consumer.
	Stop()
	// Messages returns the receive-only message channel for this consumer.
	Messages() <-chan Message
}

Consumer is the unified interface for both internal and external Kafka clients.

type KafkaConfig

// KafkaConfig describes the configuration needed to connect to all Kafka
// clusters. Each field is populated from the YAML key named in its tag.
type KafkaConfig struct {
	// TLS holds the TLS settings (YAML key "tls").
	TLS            auth.TLS                 `yaml:"tls"`
	// Clusters maps a string key to the configuration of one Kafka cluster
	// (YAML key "clusters").
	// NOTE(review): the key is presumably the Kafka cluster name — confirm.
	Clusters       map[string]ClusterConfig `yaml:"clusters"`
	// Topics maps a string key to a TopicConfig (YAML key "topics").
	// NOTE(review): the key is presumably the topic name — confirm.
	Topics         map[string]TopicConfig   `yaml:"topics"`
	// ClusterToTopic maps a string key to a TopicList (YAML key
	// "temporal-cluster-topics").
	// NOTE(review): the key is presumably a Temporal cluster name — confirm.
	ClusterToTopic map[string]TopicList     `yaml:"temporal-cluster-topics"`
	// Applications maps a string key to a TopicList (YAML key
	// "applications").
	// NOTE(review): the key is presumably an application name — confirm.
	Applications   map[string]TopicList     `yaml:"applications"`
}

KafkaConfig describes the configuration needed to connect to all Kafka clusters.

func (*KafkaConfig) Validate added in v0.4.0

func (k *KafkaConfig) Validate(checkCluster bool, checkApp bool)

Validate validates the Kafka configuration.

type Message added in v0.4.0

// Message is the unified interface for a Kafka message.
type Message interface {
	// Value is a mutable reference to the message's value.
	Value() []byte
	// Partition is the ID of the partition from which the message was read.
	Partition() int32
	// Offset is the message's offset.
	Offset() int64
	// Ack marks the message as successfully processed.
	Ack() error
	// Nack marks the message processing as failed; the message will be
	// retried or sent to the DLQ.
	Nack() error
}

Message is the unified interface for a Kafka message

type Producer

// Producer is the interface used to send replication tasks to other clusters
// through the replicator.
type Producer interface {
	// Publish sends the given message and reports any error encountered.
	// NOTE(review): the accepted concrete message types are not visible
	// here — confirm against the implementations.
	Publish(message interface{}) error
}

Producer is the interface used to send replication tasks to other clusters through the replicator.

func NewKafkaProducer

func NewKafkaProducer(topic string, producer sarama.SyncProducer, logger log.Logger) Producer

NewKafkaProducer is used to create the Kafka based producer implementation

func NewMetricProducer added in v0.5.0

func NewMetricProducer(producer Producer,
	metricsClient metrics.Client) Producer

NewMetricProducer creates a new instance of producer that emits metrics

type TopicConfig

// TopicConfig describes the mapping from a topic to a Kafka cluster.
type TopicConfig struct {
	// Cluster identifies the Kafka cluster for the topic (YAML key
	// "cluster").
	// NOTE(review): presumably a key into KafkaConfig.Clusters — confirm.
	Cluster string `yaml:"cluster"`
}

TopicConfig describes the mapping from topic to Kafka cluster

type TopicList added in v0.3.12

// TopicList describes the topic names for each cluster.
type TopicList struct {
	// Topic is the main topic name (YAML key "topic").
	Topic      string `yaml:"topic"`
	// RetryTopic is the retry topic name (YAML key "retry-topic").
	RetryTopic string `yaml:"retry-topic"`
	// DLQTopic is the DLQ topic name (YAML key "dlq-topic").
	// NOTE(review): DLQ presumably stands for dead-letter queue — confirm.
	DLQTopic   string `yaml:"dlq-topic"`
}

TopicList describes the topic names for each cluster

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL