Documentation ¶
Index ¶
- Constants
- func CloseProducer(log common.Logger, syncProducer sarama.SyncProducer)
- func DefaultConsumerConfig(clientId string, kafkaVersion sarama.KafkaVersion) *sarama.Config
- func DefaultProducerConfig(clientId string, kafkaVersion sarama.KafkaVersion) *sarama.Config
- func EnableSasl(log common.Logger, conf *sarama.Config, username string, password string, ...) (*sarama.Config, error)
- func NewKafkaClient(log common.Logger, brokers []string, config *sarama.Config) sarama.Client
- func NewProducer(brokers []string, config *sarama.Config) (sarama.SyncProducer, error)
- func TimeBasedPartitionCount(client sarama.Client, topic string) ([]int, []int, []int)
- type GeneralTopicInfo
- type JokkConsumer
- func (jc *JokkConsumer) Cleanup(session sarama.ConsumerGroupSession) error
- func (jc *JokkConsumer) Close() error
- func (jc *JokkConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (jc *JokkConsumer) Setup(session sarama.ConsumerGroupSession) (err error)
- func (jc *JokkConsumer) StartReceivingMessages(topic string)
- type PartitionChannelInfo
- type PartitionCountInfo
- type PartitionDetailCountInfo
- type PartitionDetailInfo
- type PartitionInfo
- type TopicDetailInfo
- type TopicInfo
Constants ¶
View Source
const (
OldestOffset = int64(-2)
)
Variables ¶
This section is empty.
Functions ¶
func CloseProducer ¶
func CloseProducer(log common.Logger, syncProducer sarama.SyncProducer)
func DefaultConsumerConfig ¶
func DefaultConsumerConfig(clientId string, kafkaVersion sarama.KafkaVersion) *sarama.Config
func DefaultProducerConfig ¶
func DefaultProducerConfig(clientId string, kafkaVersion sarama.KafkaVersion) *sarama.Config
func EnableSasl ¶
func NewKafkaClient ¶
func NewProducer ¶
Types ¶
type GeneralTopicInfo ¶
type JokkConsumer ¶
type JokkConsumer struct { MsgChannel chan sarama.ConsumerMessage // contains filtered or unexported fields }
func NewConsumer ¶
func (*JokkConsumer) Cleanup ¶
func (jc *JokkConsumer) Cleanup(session sarama.ConsumerGroupSession) error
func (*JokkConsumer) Close ¶
func (jc *JokkConsumer) Close() error
func (*JokkConsumer) ConsumeClaim ¶
func (jc *JokkConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*JokkConsumer) Setup ¶
func (jc *JokkConsumer) Setup(session sarama.ConsumerGroupSession) (err error)
func (*JokkConsumer) StartReceivingMessages ¶
func (jc *JokkConsumer) StartReceivingMessages(topic string)
type PartitionChannelInfo ¶
type PartitionChannelInfo struct {
// contains filtered or unexported fields
}
type PartitionCountInfo ¶
type PartitionCountInfo struct { TotalMessageCount int64 Partitions []PartitionInfo }
func PartitionMessageCount ¶
func PartitionMessageCount(client sarama.Client, topic string, timestamp int64) PartitionCountInfo
* Set `timestamp` to OldestOffset to use the default time range when calculating messages/partitions.
* Otherwise, set `timestamp` to a time (in milliseconds) to use the most recent offset available at that time.
type PartitionDetailCountInfo ¶
type PartitionDetailCountInfo struct { TotalMessageCount int64 Partitions []PartitionDetailInfo }
func DetailedPartitionInfo ¶
func DetailedPartitionInfo(admin sarama.ClusterAdmin, client sarama.Client, topic string) PartitionDetailCountInfo
type PartitionDetailInfo ¶
type PartitionDetailInfo struct { PartitionInfo PartitionInfo Leader int32 Replicas []int32 Isr []int32 OfflineReplicas []int32 }
type PartitionInfo ¶
type TopicDetailInfo ¶
type TopicDetailInfo struct { GeneralTopicInfo GeneralTopicInfo PartionDetailedInfo []PartitionDetailInfo }
type TopicInfo ¶
type TopicInfo struct { GeneralTopicInfo GeneralTopicInfo PartitionsInfo []PartitionInfo }
Click to show internal directories.
Click to hide internal directories.