Documentation ¶
Index ¶
- Constants
- Variables
- func GetDefaultProducerId() string
- func NewRepository(config *KafkaConfig) (*repository, error)
- type CheckpointDB
- type Consumer
- func (c *Consumer) AddTopicAndHandler(event string, handler TopicHandler) error
- func (c *Consumer) Cleanup(s sarama.ConsumerGroupSession) error
- func (c *Consumer) Close() error
- func (c *Consumer) ConsumeClaim(cgs sarama.ConsumerGroupSession, cgc sarama.ConsumerGroupClaim) error
- func (c *Consumer) Errors() <-chan error
- func (c *Consumer) Setup(s sarama.ConsumerGroupSession) error
- func (c *Consumer) Subscribe(ctx context.Context) error
- type ConsumerGroupSession
- type IKey
- type Kafka
- type KafkaConfig
- type Segment
- type TopicHandler
Constants ¶
const ( EventBlockGroup = "blockgroup" EventTraceGroup = "tracegroup" )
const ( DefaultReplicas = 1 DefaultPartitions = 1 DefaultTopicEnvironmentName = "local" DefaultTopicResourceName = "en-0" DefaultMaxMessageBytes = 1000000 DefaultRequiredAcks = 1 DefaultSegmentSizeBytes = 1000000 // 1 MB DefaultMaxMessageNumber = 100 // max number of messages in buffer DefaultKafkaMessageVersion = MsgVersion1_0 DefaultProducerIdPrefix = "producer-" DefaultExpirationTime = time.Duration(0) )
const ( // item indices of message header MsgHeaderTotalSegments = iota MsgHeaderSegmentIdx MsgHeaderVersion MsgHeaderProducerId MsgHeaderLength )
const ( KeyTotalSegments = "totalSegments" KeySegmentIdx = "segmentIdx" KeyVersion = "version" KeyProducerId = "producerId" )
const LegacyMsgHeaderLength = 2
const (
MsgVersion1_0 = "1.0"
)
Variables ¶
var ( DefaultSetup = func(s sarama.ConsumerGroupSession) error { return nil } DefaultCleanup = func(s sarama.ConsumerGroupSession) error { return nil } )
Logger is the sarama.StdLogger instance to which chaindatafetcher writes SDK-level information. By default it prints all log messages to standard output, but you can redirect it wherever you want.
Functions ¶
func GetDefaultProducerId ¶
func GetDefaultProducerId() string
func NewRepository ¶
func NewRepository(config *KafkaConfig) (*repository, error)
Types ¶
type CheckpointDB ¶
type CheckpointDB struct {
// contains filtered or unexported fields
}
func NewCheckpointDB ¶
func NewCheckpointDB() *CheckpointDB
func (*CheckpointDB) ReadCheckpoint ¶
func (db *CheckpointDB) ReadCheckpoint() (int64, error)
func (*CheckpointDB) SetComponent ¶
func (db *CheckpointDB) SetComponent(component interface{})
func (*CheckpointDB) WriteCheckpoint ¶
func (db *CheckpointDB) WriteCheckpoint(checkpoint int64) error
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a reference structure for subscribing to the block or trace groups produced by an EN.
func NewConsumer ¶
func NewConsumer(config *KafkaConfig, groupId string) (*Consumer, error)
func (*Consumer) AddTopicAndHandler ¶
func (c *Consumer) AddTopicAndHandler(event string, handler TopicHandler) error
AddTopicAndHandler adds a topic associated with the given event, along with the handler function that consumes the messages published to that topic.
func (*Consumer) Cleanup ¶
func (c *Consumer) Cleanup(s sarama.ConsumerGroupSession) error
Cleanup is called at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.
func (*Consumer) Close ¶
Close stops the ConsumerGroup and detaches any running sessions. It is required to call this function before the object passes out of scope, as it will otherwise leak memory.
func (*Consumer) ConsumeClaim ¶
func (c *Consumer) ConsumeClaim(cgs sarama.ConsumerGroupSession, cgc sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.
type ConsumerGroupSession ¶
type ConsumerGroupSession interface { MarkOffset(topic string, partition int32, offset int64, metadata string) MarkMessage(msg *sarama.ConsumerMessage, metadata string) }
ConsumerGroupSession is for mocking sarama.ConsumerGroupSession for better testing.
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
Kafka connects to the brokers in an existing kafka cluster.
func NewKafka ¶
func NewKafka(conf *KafkaConfig) (*Kafka, error)
func (*Kafka) CreateTopic ¶
func (*Kafka) DeleteTopic ¶
func (*Kafka) ListTopics ¶
func (k *Kafka) ListTopics() (map[string]sarama.TopicDetail, error)
type KafkaConfig ¶
type KafkaConfig struct { SaramaConfig *sarama.Config `json:"-"` // kafka client configurations. MsgVersion string // MsgVersion is the version of Kafka message. ProducerId string // ProducerId is for the identification of the message publisher. Brokers []string // Brokers is a list of broker URLs. TopicEnvironmentName string TopicResourceName string Partitions int32 // Partitions is the number of partitions of a topic. Replicas int16 // Replicas is a replication factor of kafka settings. This is the number of the replicated partitions in the kafka cluster. SegmentSizeBytes int // SegmentSizeBytes is the size of kafka message segment // (number of partitions) * (average size of segments) * buffer size should not be greater than memory size. // default max number of messages is 100 MaxMessageNumber int // MaxMessageNumber is the maximum number of consumer messages. ExpirationTime time.Duration ErrCallback func(string) error Setup func(s sarama.ConsumerGroupSession) error Cleanup func(s sarama.ConsumerGroupSession) error }
func GetDefaultKafkaConfig ¶
func GetDefaultKafkaConfig() *KafkaConfig
func (*KafkaConfig) GetTopicName ¶
func (c *KafkaConfig) GetTopicName(event string) string
func (*KafkaConfig) String ¶
func (c *KafkaConfig) String() string
type Segment ¶
type Segment struct {
// contains filtered or unexported fields
}
Segment represents a message segment with the parsed headers.
type TopicHandler ¶
type TopicHandler func(message *sarama.ConsumerMessage) error
TopicHandler is a handler function that consumes published messages.