Documentation ¶
Index ¶
- Constants
- func MarshalKafkaHeaders(msg *quark.Message) []sarama.RecordHeader
- func MarshalKafkaMessage(msg *quark.Message) *sarama.ProducerMessage
- func NewKafkaBroker(cfg *sarama.Config, opts ...quark.Option) *quark.Broker
- func NewKafkaHeader(msg *sarama.ConsumerMessage) quark.Header
- func UnmarshalKafkaHeaders(headers []*sarama.RecordHeader, msg *quark.Message)
- func UnmarshalKafkaMessage(msgKafka *sarama.ConsumerMessage, msg *quark.Message)
- type KafkaConfiguration
- type KafkaConsumerConfig
- type KafkaConsumerTopicConfig
- type KafkaPartitionConsumer
- type KafkaProducerConfig
- type KafkaPublisher
  - func NewKafkaPublisher(cfg KafkaConfiguration, addrs ...string) *KafkaPublisher
Constants ¶
const (
    // HeaderPartition Topic partition where the Message was stored in the Apache Kafka commit log
    HeaderPartition = "quark-kafka-partition"
    // HeaderOffset Topic partition offset, the item number inside a specific Topic partition
    HeaderOffset = "quark-kafka-offset"
    // HeaderKey Message key on an Apache Kafka Topic, used to group multiple messages by ID; it can also be used
    // to compact commit logs by updating and then deleting previous item versions (removes duplicates)
    HeaderKey = "quark-kafka-key"
    // HeaderValue Apache Kafka message body in binary format
    HeaderValue = "quark-kafka-value"
    // HeaderTimestamp Apache Kafka insertion time
    HeaderTimestamp = "quark-kafka-timestamp"
    // HeaderBlockTimestamp Apache Kafka producer insertion time
    HeaderBlockTimestamp = "quark-kafka-block-timestamp"
    // HeaderMemberId Member unique identifier within an Apache Kafka Consumer Group
    HeaderMemberId = "quark-kafka-member-id"
    // HeaderGenerationId Unique identifier assigned when the Topic partition list is requested while joining an
    // Apache Kafka Consumer Group
    HeaderGenerationId = "quark-kafka-generation-id"
    // HeaderHighWaterMarkOffset Offset of the last message that was successfully copied to all of the log's
    // replicas in an Apache Kafka cluster
    HeaderHighWaterMarkOffset = "quark-kafka-high-water-mark-offset"
)
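For illustration only, a minimal sketch (assuming msg is a *quark.Message obtained elsewhere) that scans the record headers produced by MarshalKafkaHeaders, documented below, for these keys:

// Sketch: locate Kafka-specific metadata among marshaled record headers.
// Assumes msg is a *quark.Message populated elsewhere.
headers := MarshalKafkaHeaders(msg)
for _, h := range headers {
    switch string(h.Key) {
    case HeaderPartition:
        fmt.Printf("partition: %s\n", h.Value)
    case HeaderOffset:
        fmt.Printf("offset: %s\n", h.Value)
    }
}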
Variables ¶
This section is empty.
Functions ¶
func MarshalKafkaHeaders ¶
func MarshalKafkaHeaders(msg *quark.Message) []sarama.RecordHeader
MarshalKafkaHeaders parses the given Message and its metadata into Apache Kafka's header types
func MarshalKafkaMessage ¶
func MarshalKafkaMessage(msg *quark.Message) *sarama.ProducerMessage
MarshalKafkaMessage parses the given Message into an Apache Kafka producer message
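As a usage sketch (not part of this package's API), the marshaled message can be handed to a sarama synchronous producer; the broker address and the origin of msg are assumptions:

// Sketch: publish a quark.Message through a sarama synchronous producer.
// Assumes msg is a *quark.Message and a local broker at localhost:9092.
cfg := sarama.NewConfig()
cfg.Producer.Return.Successes = true // required by sarama's SyncProducer
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
if err != nil {
    log.Fatal(err)
}
defer producer.Close()

partition, offset, err := producer.SendMessage(MarshalKafkaMessage(msg))
if err != nil {
    log.Fatal(err)
}
log.Printf("stored at partition %d, offset %d", partition, offset)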
func NewKafkaBroker ¶
func NewKafkaBroker(cfg *sarama.Config, opts ...quark.Option) *quark.Broker
NewKafkaBroker allocates and returns a Kafka Broker
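A minimal allocation sketch; the chosen sarama cluster version is an assumption:

// Sketch: allocate a Kafka-backed Broker with an overridden sarama config.
cfg := sarama.NewConfig()
cfg.Version = sarama.V2_6_0_0 // assumed cluster version
b := NewKafkaBroker(cfg)
_ = b // register consumers and start serving per the quark.Broker API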
func NewKafkaHeader ¶
func NewKafkaHeader(msg *sarama.ConsumerMessage) quark.Header
NewKafkaHeader creates a Message Header from an Apache Kafka message
func UnmarshalKafkaHeaders ¶
func UnmarshalKafkaHeaders(headers []*sarama.RecordHeader, msg *quark.Message)
UnmarshalKafkaHeaders parses the given Apache Kafka headers into the given Quark Message
func UnmarshalKafkaMessage ¶
func UnmarshalKafkaMessage(msgKafka *sarama.ConsumerMessage, msg *quark.Message)
UnmarshalKafkaMessage parses the given Apache Kafka message into the given Quark Message
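A consumption-side sketch, assuming kmsg is a *sarama.ConsumerMessage received from a consumer loop:

// Sketch: convert an incoming Kafka record into a quark.Message.
var msg quark.Message
UnmarshalKafkaMessage(kmsg, &msg)
// Kafka-specific metadata (partition, offset, ...) now travels with the
// Message under the quark-kafka-* keys listed in Constants.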
Types ¶
type KafkaConfiguration ¶
type KafkaConfiguration struct {
    Config   *sarama.Config
    Consumer KafkaConsumerConfig
    Producer KafkaProducerConfig
}
KafkaConfiguration Apache Kafka-specific Broker and Consumer configuration; overrides default values and ranges from basic settings to hook functions invoked when an action is dispatched
type KafkaConsumerConfig ¶
type KafkaConsumerConfig struct {
    GroupHandler     sarama.ConsumerGroupHandler
    PartitionHandler KafkaPartitionConsumer
    Topic            KafkaConsumerTopicConfig

    // Hooks
    OnReceived func(context.Context, *sarama.ConsumerMessage)
}
KafkaConsumerConfig Apache Kafka consumer configuration
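A configuration sketch wiring the OnReceived hook for instrumentation; the logging body is an assumption:

// Sketch: consumer configuration with a receive hook.
consumerCfg := KafkaConsumerConfig{
    OnReceived: func(ctx context.Context, kmsg *sarama.ConsumerMessage) {
        log.Printf("received topic %s, partition %d, offset %d",
            kmsg.Topic, kmsg.Partition, kmsg.Offset)
    },
}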
type KafkaConsumerTopicConfig ¶
KafkaConsumerTopicConfig Apache Kafka configuration used to override default consuming values
type KafkaPartitionConsumer ¶
type KafkaPartitionConsumer interface {
Consume(context.Context, sarama.PartitionConsumer, *quark.Consumer, quark.EventWriter)
}
KafkaPartitionConsumer is the default way to consume messages from Apache Kafka.
It pulls messages from a specific partition inside an Apache Kafka cluster or Broker. This way of consuming messages is useful when actual parallelization of the process is required: when running in a pool, each consumer pulls messages inside its own Worker, so processing runs concurrently across the worker pool.
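An implementation sketch of this interface; loggingPartitionConsumer is a hypothetical type, and the quark.EventWriter interaction is elided since its surface is not documented here:

// Sketch: a custom partition consumer that drains records until the
// context is cancelled.
type loggingPartitionConsumer struct{}

func (loggingPartitionConsumer) Consume(ctx context.Context, pc sarama.PartitionConsumer,
    c *quark.Consumer, w quark.EventWriter) {
    for {
        select {
        case <-ctx.Done():
            return
        case kmsg := <-pc.Messages():
            var msg quark.Message
            UnmarshalKafkaMessage(kmsg, &msg)
            log.Printf("partition %d, offset %d", kmsg.Partition, kmsg.Offset)
            // hand msg off through w (quark.EventWriter) as needed
        }
    }
}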
type KafkaProducerConfig ¶
type KafkaProducerConfig struct {
    // Hooks
    OnSent func(ctx context.Context, message *sarama.ProducerMessage, partition int32, offset int64)
}
KafkaProducerConfig Apache Kafka producer configuration
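A configuration sketch wiring the OnSent hook for metrics or tracing; the logging body is an assumption:

// Sketch: producer configuration with a sent hook.
producerCfg := KafkaProducerConfig{
    OnSent: func(ctx context.Context, message *sarama.ProducerMessage, partition int32, offset int64) {
        log.Printf("sent topic %s, partition %d, offset %d", message.Topic, partition, offset)
    },
}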
type KafkaPublisher ¶
type KafkaPublisher struct {
// contains filtered or unexported fields
}
KafkaPublisher is Quark's default publisher for Apache Kafka
func NewKafkaPublisher ¶
func NewKafkaPublisher(cfg KafkaConfiguration, addrs ...string) *KafkaPublisher
NewKafkaPublisher allocates a new KafkaPublisher
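A minimal allocation sketch; the broker address and the empty consumer/producer overrides are assumptions:

// Sketch: allocate a publisher backed by a single local broker.
cfg := KafkaConfiguration{
    Config: sarama.NewConfig(),
}
pub := NewKafkaPublisher(cfg, "localhost:9092")
_ = pub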