Documentation ¶
Index ¶
- Constants
- func NewRepository(config *KafkaConfig) (*repository, error)
- type CheckpointDB
- type Consumer
- func (c *Consumer) AddTopicAndHandler(event string, handler TopicHandler) error
- func (c *Consumer) Cleanup(s sarama.ConsumerGroupSession) error
- func (c *Consumer) Close() error
- func (c *Consumer) ConsumeClaim(cgs sarama.ConsumerGroupSession, cgc sarama.ConsumerGroupClaim) error
- func (c *Consumer) Errors() <-chan error
- func (c *Consumer) Setup(s sarama.ConsumerGroupSession) error
- func (c *Consumer) Subscribe(ctx context.Context) error
- type ConsumerGroupSession
- type IKey
- type Kafka
- type KafkaConfig
- type Segment
- type TopicHandler
Constants ¶
const ( EventBlockGroup = "blockgroup" EventTraceGroup = "tracegroup" )
const (
	DefaultReplicas             = 1
	DefaultPartitions           = 1
	DefaultTopicEnvironmentName = "local"
	DefaultTopicResourceName    = "en-0"
	DefaultMaxMessageBytes      = 1000000
	DefaultRequiredAcks         = 1
	DefaultSegmentSizeBytes     = 1000000 // 1 MB
	DefaultMaxMessageNumber     = 100     // max number of messages in buffer
)
const (
	// item indices of message header
	MsgHeaderTotalSegments = iota
	MsgHeaderSegmentIdx
	MsgHeaderLength
)
const ( KeyTotalSegments = "totalSegments" KeySegmentIdx = "segmentIdx" )
Variables ¶
This section is empty.
Functions ¶
func NewRepository ¶
func NewRepository(config *KafkaConfig) (*repository, error)
Types ¶
type CheckpointDB ¶
type CheckpointDB struct {
// contains filtered or unexported fields
}
func NewCheckpointDB ¶
func NewCheckpointDB() *CheckpointDB
func (*CheckpointDB) ReadCheckpoint ¶
func (db *CheckpointDB) ReadCheckpoint() (int64, error)
func (*CheckpointDB) SetComponent ¶
func (db *CheckpointDB) SetComponent(component interface{})
func (*CheckpointDB) WriteCheckpoint ¶
func (db *CheckpointDB) WriteCheckpoint(checkpoint int64) error
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a reference structure for subscribing to block or trace groups produced by an EN.
func NewConsumer ¶
func NewConsumer(config *KafkaConfig, groupId string) (*Consumer, error)
func (*Consumer) AddTopicAndHandler ¶
func (c *Consumer) AddTopicAndHandler(event string, handler TopicHandler) error
AddTopicAndHandler adds a topic associated with the given event, along with its handler function, to consume published messages of the topic.
func (*Consumer) Cleanup ¶
func (c *Consumer) Cleanup(s sarama.ConsumerGroupSession) error
Cleanup is called at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.
func (*Consumer) Close ¶
func (c *Consumer) Close() error
Close stops the ConsumerGroup and detaches any running sessions. It is required to call this function before the object passes out of scope, as it will otherwise leak memory.
func (*Consumer) ConsumeClaim ¶
func (c *Consumer) ConsumeClaim(cgs sarama.ConsumerGroupSession, cgc sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.
type ConsumerGroupSession ¶
type ConsumerGroupSession interface {
	MarkOffset(topic string, partition int32, offset int64, metadata string)
	MarkMessage(msg *sarama.ConsumerMessage, metadata string)
}
ConsumerGroupSession is for mocking sarama.ConsumerGroupSession for better testing.
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
Kafka connects to the brokers in an existing kafka cluster.
func NewKafka ¶
func NewKafka(conf *KafkaConfig) (*Kafka, error)
func (*Kafka) CreateTopic ¶
func (*Kafka) DeleteTopic ¶
func (*Kafka) ListTopics ¶
func (k *Kafka) ListTopics() (map[string]sarama.TopicDetail, error)
type KafkaConfig ¶
type KafkaConfig struct {
	SaramaConfig         *sarama.Config // kafka client configurations.
	Brokers              []string       // Brokers is a list of broker URLs.
	TopicEnvironmentName string
	TopicResourceName    string
	Partitions           int32 // Partitions is the number of partitions of a topic.
	Replicas             int16 // Replicas is a replication factor of kafka settings. This is the number of the replicated partitions in the kafka cluster.
	SegmentSizeBytes     int   // SegmentSizeBytes is the size of kafka message segment
	// (number of partitions) * (average size of segments) * buffer size should not be greater than memory size.
	// default max number of messages is 100
	MaxMessageNumber int // MaxMessageNumber is the maximum number of consumer messages.
}
func GetDefaultKafkaConfig ¶
func GetDefaultKafkaConfig() *KafkaConfig
func (*KafkaConfig) GetTopicName ¶
func (c *KafkaConfig) GetTopicName(event string) string
func (*KafkaConfig) String ¶
func (c *KafkaConfig) String() string
type Segment ¶
type Segment struct {
// contains filtered or unexported fields
}
Segment represents a message segment with the parsed headers.
type TopicHandler ¶
type TopicHandler func(message *sarama.ConsumerMessage) error
TopicHandler is a handler function that consumes published messages.