Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrSaramaSessionContextDone = errors.New("session context done")
Functions ¶
Types ¶
type AvroEncoder ¶
AvroEncoder encodes schemaId and Avro message. TODO: move to serializer.
func (*AvroEncoder) Encode ¶
func (a *AvroEncoder) Encode() ([]byte, error)
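The encoder's fields are not shown in this documentation. The sketch below illustrates the common Confluent wire format ("magic" zero byte, 4-byte big-endian schema ID, then the Avro payload) that an Encode method of this shape typically produces; the SchemaID and Content field names are assumptions, not this package's actual fields.

// Sketch only: avroEncoderSketch uses hypothetical SchemaID/Content fields
// and the Confluent wire format; the real AvroEncoder may differ.
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

type avroEncoderSketch struct {
	SchemaID int32  // hypothetical field: schema registry ID
	Content  []byte // hypothetical field: Avro-encoded message body
}

func (a *avroEncoderSketch) Encode() ([]byte, error) {
	var buf bytes.Buffer
	buf.WriteByte(0) // magic byte
	if err := binary.Write(&buf, binary.BigEndian, a.SchemaID); err != nil {
		return nil, err
	}
	buf.Write(a.Content)
	return buf.Bytes(), nil
}

func main() {
	e := &avroEncoderSketch{SchemaID: 42, Content: []byte("avro-bytes")}
	b, _ := e.Encode()
	fmt.Println(len(b)) // 1 magic byte + 4 schema-ID bytes + len(Content)
}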
type AvroProducer ¶
type AvroProducer struct {
// contains filtered or unexported fields
}
func NewAvroProducer ¶
func NewAvroProducer(
	brokers []string,
	kafkaVersion string,
	configTLS TLSConfig,
) (
	*AvroProducer,
	error,
)
func (*AvroProducer) Close ¶
func (c *AvroProducer) Close()
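A minimal usage sketch for the producer, using only the documented NewAvroProducer and Close calls; the import path, broker list, and Kafka version string are placeholders.

// Sketch only: the import path and broker list are placeholders.
package main

import (
	"log"

	kafka "example.com/your/module/pkg/kafka" // placeholder import path
)

func main() {
	producer, err := kafka.NewAvroProducer(
		[]string{"localhost:9092"}, // placeholder brokers
		"2.6.0",                    // Kafka version string
		kafka.TLSConfig{Enable: false},
	)
	if err != nil {
		log.Fatalf("creating avro producer: %v", err)
	}
	defer producer.Close()

	// The produce/send methods are not shown in this documentation,
	// so this sketch only covers construction and cleanup.
}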
type Client ¶
type Client interface {
	// Topics returns all the topics present in Kafka. It keeps a cache
	// which is refreshed every cacheValidity seconds.
	Topics() ([]string, error)

	// LastOffset returns the current offset for the topic partition.
	LastOffset(topic string, partition int32) (int64, error)

	// CurrentOffset talks to Kafka and finds the current offset for the
	// consumer group. It makes calls to all brokers to determine
	// the current offset. If the group is not found it returns -1.
	CurrentOffset(id string, topic string, partition int32) (int64, error)

	// ListConsumerGroups lists the consumer groups available in the cluster.
	ListConsumerGroups() (map[string]string, error)

	// DeleteConsumerGroup deletes a consumer group.
	DeleteConsumerGroup(name string) error
}
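A sketch of consuming the Client interface to report consumer-group lag; it uses only the methods documented above, while printLag and the import path are hypothetical.

// Sketch: printLag is a hypothetical helper built on the Client interface.
package example

import (
	"fmt"

	kafka "example.com/your/module/pkg/kafka" // placeholder import path
)

func printLag(c kafka.Client, group, topic string, partition int32) error {
	last, err := c.LastOffset(topic, partition)
	if err != nil {
		return err
	}
	current, err := c.CurrentOffset(group, topic, partition)
	if err != nil {
		return err
	}
	if current == -1 {
		// CurrentOffset returns -1 when the group is not found.
		fmt.Printf("group %s has no offset for %s/%d\n", group, topic, partition)
		return nil
	}
	fmt.Printf("lag for %s on %s/%d: %d\n", group, topic, partition, last-current)
	return nil
}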
type ConsumerGroupConfig ¶
type ConsumerGroupConfig struct {
	GroupID      string `yaml:"groupID"`
	TopicRegexes string `yaml:"topicRegexes"`
	// TODO: LoaderTopicPrefix is "" for loader consumer groups
	// it should be an optional field, a pointer.
	LoaderTopicPrefix string `yaml:"loaderTopicPrefix"` // default is there

	Kafka  KafkaConfig  `yaml:"kafka"`
	Sarama SaramaConfig `yaml:"sarama"`
}
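A sketch of populating ConsumerGroupConfig in Go code; every value is a placeholder, and KafkaConfig is left at its zero value because its fields are not documented here.

// Sketch only: placeholder values and import path.
package example

import kafka "example.com/your/module/pkg/kafka" // placeholder import path

func exampleConsumerGroupConfig() kafka.ConsumerGroupConfig {
	return kafka.ConsumerGroupConfig{
		GroupID:           "loader-cg", // placeholder
		TopicRegexes:      "^db\\..*",  // placeholder regex
		LoaderTopicPrefix: "",          // "" for loader consumer groups (see TODO above)
		Kafka:             kafka.KafkaConfig{},
		Sarama: kafka.SaramaConfig{
			Assignor:   "range",
			Oldest:     true,
			AutoCommit: true,
		},
	}
}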
type ConsumerGroupInterface ¶
type ConsumerGroupInterface interface {
	Topics() ([]string, error)

	// RefreshMetadata takes a list of topics and
	// queries the cluster to refresh the
	// available metadata for those topics.
	// If no topics are provided, it will refresh
	// metadata for all topics.
	RefreshMetadata(topics ...string) error

	LastOffset(topic string, partition int32) (int64, error)
	Consume(ctx context.Context, topics []string) error
	Close() error
}
func NewConsumerGroup ¶
func NewConsumerGroup(
	config ConsumerGroupConfig,
	consumerGroupHandler sarama.ConsumerGroupHandler,
) (
	ConsumerGroupInterface,
	error,
)
func NewSaramaConsumerGroup ¶
func NewSaramaConsumerGroup(
	config ConsumerGroupConfig,
	consumerGroupHandler sarama.ConsumerGroupHandler,
) (
	ConsumerGroupInterface,
	error,
)
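A usage sketch for NewConsumerGroup: a minimal sarama.ConsumerGroupHandler is passed in and Consume is called once. The handler, topic names, config values, and import paths are placeholders; real code typically re-invokes Consume after each rebalance.

// Sketch only: noopHandler, the topics, and the import paths are placeholders.
package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama" // or github.com/IBM/sarama in newer setups

	kafka "example.com/your/module/pkg/kafka" // placeholder import path
)

type noopHandler struct{}

func (noopHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (noopHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (noopHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("topic=%s partition=%d offset=%d", msg.Topic, msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	cg, err := kafka.NewConsumerGroup(
		kafka.ConsumerGroupConfig{GroupID: "example-cg"}, // placeholder config
		noopHandler{},
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cg.Close()

	// Consume blocks for the lifetime of one session; call it in a loop
	// in real code so consumption resumes after rebalances.
	if err := cg.Consume(context.Background(), []string{"example-topic"}); err != nil {
		log.Fatal(err)
	}
}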
type KafkaConfig ¶
type Manager ¶
type Manager struct {
	// ready is used to signal the main thread about the readiness of
	// the manager
	Ready chan bool
	// contains filtered or unexported fields
}
func NewManager ¶
func NewManager(
	consumerGroup ConsumerGroupInterface,
	consumerGroupID string,
	regexes string,
) *Manager
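A sketch of wiring up a Manager; only the documented NewManager signature and the exported Ready channel are used. Whatever starts the manager's consume loop is unexported here, so the sketch stops at construction and the readiness signal; handler, group ID, regexes, and import paths are placeholders.

// Sketch only: placeholder handler, group ID, regexes, and import paths.
package main

import (
	"log"

	"github.com/Shopify/sarama" // or github.com/IBM/sarama in newer setups

	kafka "example.com/your/module/pkg/kafka" // placeholder import path
)

func main() {
	var handler sarama.ConsumerGroupHandler // placeholder: supply a real handler

	cg, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{GroupID: "example-cg"}, handler)
	if err != nil {
		log.Fatal(err)
	}
	manager := kafka.NewManager(cg, "example-cg", "^db\\..*")

	// Once the (unexported) run loop is started elsewhere, Ready signals
	// the main thread that the manager is up.
	<-manager.Ready
	log.Println("consumer group manager is ready")
}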
type SaramaConfig ¶
type SaramaConfig struct {
	Assignor   string `yaml:"assignor"` // default is there
	Oldest     bool   `yaml:"oldest"`
	Log        bool   `yaml:"log"`
	AutoCommit bool   `yaml:"autoCommit"`

	SessionTimeoutSeconds   *int   `yaml:"sessionTimeoutSeconds,omitempty"`   // default 20s
	HearbeatIntervalSeconds *int   `yaml:"hearbeatIntervalSeconds,omitempty"` // default 6s
	MaxProcessingTime       *int32 `yaml:"maxProcessingTime,omitempty"`       // default is of sarama
}
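A hedged sketch of how fields like these typically map onto a sarama.Config; toSaramaConfig is hypothetical and not this package's actual implementation.

// Sketch only: toSaramaConfig is hypothetical; the mapping mirrors what
// such fields usually control in sarama.
package example

import (
	"log"
	"os"
	"time"

	"github.com/Shopify/sarama" // or github.com/IBM/sarama in newer setups

	kafka "example.com/your/module/pkg/kafka" // placeholder import path
)

func toSaramaConfig(c kafka.SaramaConfig) *sarama.Config {
	sc := sarama.NewConfig()

	switch c.Assignor {
	case "roundrobin":
		sc.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	case "sticky":
		sc.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	default: // "range"
		sc.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	}

	if c.Oldest {
		sc.Consumer.Offsets.Initial = sarama.OffsetOldest
	}
	sc.Consumer.Offsets.AutoCommit.Enable = c.AutoCommit

	if c.Log {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}
	if c.SessionTimeoutSeconds != nil {
		sc.Consumer.Group.Session.Timeout = time.Duration(*c.SessionTimeoutSeconds) * time.Second
	}
	if c.HearbeatIntervalSeconds != nil {
		sc.Consumer.Group.Heartbeat.Interval = time.Duration(*c.HearbeatIntervalSeconds) * time.Second
	}
	if c.MaxProcessingTime != nil {
		sc.Consumer.MaxProcessingTime = time.Duration(*c.MaxProcessingTime) * time.Millisecond
	}
	return sc
}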
type TLSConfig ¶
type TLSConfig struct {
	Enable   bool   `yaml:"enable"`
	UserCert string `yaml:"userCert"`
	UserKey  string `yaml:"userKey"`
	CACert   string `yaml:"caCert"`
}
TLSConfig stores the base64-encoded strings for all the certificates and keys required for TLS setup and authentication with Kafka for the client.
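A hedged sketch of turning these base64-encoded fields into a crypto/tls configuration for the Kafka client; buildTLS is a hypothetical helper, not this package's API.

// Sketch only: buildTLS is hypothetical; it decodes the base64 fields and
// assembles a standard crypto/tls client configuration.
package example

import (
	"crypto/tls"
	"crypto/x509"
	"encoding/base64"
	"fmt"

	kafka "example.com/your/module/pkg/kafka" // placeholder import path
)

func buildTLS(c kafka.TLSConfig) (*tls.Config, error) {
	if !c.Enable {
		return nil, nil
	}
	cert, err := base64.StdEncoding.DecodeString(c.UserCert)
	if err != nil {
		return nil, err
	}
	key, err := base64.StdEncoding.DecodeString(c.UserKey)
	if err != nil {
		return nil, err
	}
	ca, err := base64.StdEncoding.DecodeString(c.CACert)
	if err != nil {
		return nil, err
	}

	pair, err := tls.X509KeyPair(cert, key)
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(ca) {
		return nil, fmt.Errorf("appending CA certificate failed")
	}
	return &tls.Config{
		Certificates: []tls.Certificate{pair},
		RootCAs:      pool,
	}, nil
}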