Documentation ¶
Index ¶
- func InitOffsetFile()
- func ListDir(dirPth string, suffix string) (files []string, err error)
- func Max(a, b int64) int64
- func Partitions(addr []string, topic string, conf *Config) ([]int32, error)
- func Topics(addr []string, conf *Config) ([]string, error)
- type AsyncProducer
- type Config
- type Consumer
- func (cs *Consumer) Close() error
- func (cs *Consumer) CommitOffsets() error
- func (cs *Consumer) Errors() <-chan error
- func (cs *Consumer) MarkOffset(topic string, partition int32, offset int64, groupId string, ifExactOnce bool)
- func (cs *Consumer) Notifications() <-chan *NotifyMessage
- func (cs *Consumer) Recv() <-chan *ConsumerMessage
- func (cs *Consumer) ResetOffset(topic string, partition int32, offset int64, groupId string, ifExactOnce bool)
- type ConsumerError
- type ConsumerMessage
- type NotifyMessage
- type PartitionConsumer
- type PartitionOffsetManager
- func (pom *PartitionOffsetManager) Close() (error, error)
- func (pom *PartitionOffsetManager) Errors() <-chan *ConsumerError
- func (pom *PartitionOffsetManager) MarkOffset(topic string, partition int32, offset int64, groupId string, ifExactOnce bool)
- func (pom *PartitionOffsetManager) NextOffset() (offset int64)
- func (pom *PartitionOffsetManager) ResetOffset(topic string, partition int32, offset int64, groupId string, ifExactOnce bool)
- type ProducerError
- type ProducerMessage
- type SyncProducer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AsyncProducer ¶
type AsyncProducer struct { Id int ProducerGroupId string // contains filtered or unexported fields }
func InitManualRetryAsyncProducer ¶
func InitManualRetryAsyncProducer(addr []string, conf *Config) (*AsyncProducer, error)
one asyncProducer without retry
func InitManualRetryAsyncProducerGroup ¶
func InitManualRetryAsyncProducerGroup(addr []string, conf *Config, groupId string) ([]*AsyncProducer, error)
some(config.AsyncProducerAmount) asyncProducer without retry
func (*AsyncProducer) Close ¶
func (asp *AsyncProducer) Close() (err error)
func (*AsyncProducer) Errors ¶
func (asp *AsyncProducer) Errors() <-chan *ProducerError
func (*AsyncProducer) Successes ¶
func (asp *AsyncProducer) Successes() <-chan *ProducerMessage
type Config ¶
type Consumer ¶
func InitConsumersOfGroup ¶
func InitOneConsumerOfGroup ¶
func (*Consumer) CommitOffsets ¶
func (*Consumer) MarkOffset ¶
func (*Consumer) Notifications ¶
func (cs *Consumer) Notifications() <-chan *NotifyMessage
func (*Consumer) Recv ¶
func (cs *Consumer) Recv() <-chan *ConsumerMessage
type ConsumerError ¶
type ConsumerError = sarama.ConsumerError
type ConsumerMessage ¶
type ConsumerMessage = sarama.ConsumerMessage
type NotifyMessage ¶
type NotifyMessage = sarama_cluster.Notification
type PartitionConsumer ¶
type PartitionConsumer struct { Topic string Partition int32 GroupId string // contains filtered or unexported fields }
func InitPartitionConsumer ¶
func InitPartitionConsumers ¶
func (*PartitionConsumer) Close ¶
func (pcs *PartitionConsumer) Close() error
func (*PartitionConsumer) Errors ¶
func (pcs *PartitionConsumer) Errors() <-chan *ConsumerError
func (*PartitionConsumer) Recv ¶
func (pcs *PartitionConsumer) Recv() <-chan *ConsumerMessage
type PartitionOffsetManager ¶
type PartitionOffsetManager struct {
// contains filtered or unexported fields
}
func (*PartitionOffsetManager) Close ¶
func (pom *PartitionOffsetManager) Close() (error, error)
func (*PartitionOffsetManager) Errors ¶
func (pom *PartitionOffsetManager) Errors() <-chan *ConsumerError
func (*PartitionOffsetManager) MarkOffset ¶
func (*PartitionOffsetManager) NextOffset ¶
func (pom *PartitionOffsetManager) NextOffset() (offset int64)
func (*PartitionOffsetManager) ResetOffset ¶
type ProducerError ¶
type ProducerError = sarama.ProducerError
type ProducerMessage ¶
type ProducerMessage = sarama.ProducerMessage
type SyncProducer ¶
type SyncProducer struct { Id int ProducerGroupId string // contains filtered or unexported fields }
func InitManualRetrySyncProducer ¶
func InitManualRetrySyncProducer(addr []string, conf *Config) (*SyncProducer, error)
one syncProducer without retry
func InitManualRetrySyncProducerGroup ¶
func InitManualRetrySyncProducerGroup(addr []string, conf *Config, groupId string) ([]*SyncProducer, error)
some(config.SyncProducerAmount) syncProducer without retry
func (*SyncProducer) Close ¶
func (sp *SyncProducer) Close() (err error)
func (*SyncProducer) SendMessage ¶
func (sp *SyncProducer) SendMessage(msg *ProducerMessage) (string, int32, int64, error)
Click to show internal directories.
Click to hide internal directories.