Documentation ¶
Index ¶
- func NewLogger() *logger
- func SetConsumer(name string, consumer *Consumer)
- func SetProducer(name string, producer *Producer)
- type Acknowledgment
- type BatchListener
- type Callback
- type Consumer
- type ConsumerMessage
- type ConsumerMessageCarrier
- type Kafka
- type Listener
- type Option
- type Producer
- func (p *Producer) Close()
- func (p *Producer) Options() Option
- func (p *Producer) Send(topic, value string, cb Callback)
- func (p *Producer) SendMessage(message ProducerMessage, cb Callback)
- func (p *Producer) SyncSend(ctx context.Context, topic, value string) error
- func (p *Producer) SyncSendMessage(ctx context.Context, message ProducerMessage) error
- type ProducerMessage
- type ProducerMessageCarrier
- type RecordMetadata
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SetConsumer ¶
func SetProducer ¶
Types ¶
type Acknowledgment ¶
type Acknowledgment struct {
// contains filtered or unexported fields
}
func (*Acknowledgment) Acknowledge ¶
func (a *Acknowledgment) Acknowledge()
type BatchListener ¶
type BatchListener interface {
	Listen([]ConsumerMessage, *Acknowledgment)
	BatchCount() int
}
type Callback ¶
type Callback func(*RecordMetadata, error)
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a consumer that belongs to a single consumer group.
func GetConsumer ¶
func NewConsumer ¶
func (*Consumer) AddBatchListener ¶
func (c *Consumer) AddBatchListener(topic string, listener BatchListener)
func (*Consumer) AddListener ¶
type ConsumerMessage ¶
type ConsumerMessageCarrier ¶
type ConsumerMessageCarrier struct {
	*sarama.ConsumerMessage
	// contains filtered or unexported fields
}
ConsumerMessageCarrier is a carrier that wraps a *sarama.ConsumerMessage and exposes Get, Set, and Keys methods.
func NewConsumerMessageCarrier ¶
func NewConsumerMessageCarrier(m *sarama.ConsumerMessage) *ConsumerMessageCarrier
func (ConsumerMessageCarrier) Get ¶
func (carrier ConsumerMessageCarrier) Get(key string) string
func (ConsumerMessageCarrier) Keys ¶
func (carrier ConsumerMessageCarrier) Keys() []string
func (ConsumerMessageCarrier) Set ¶
func (carrier ConsumerMessageCarrier) Set(key string, value string)
type Listener ¶
type Listener interface {
Listen(ConsumerMessage, *Acknowledgment)
}
type Option ¶
type Option struct {
	Name string
	Addr []string
	Version string
	MaxOpenRequests int
	DialTimeout time.Duration
	ReadTimeout time.Duration
	WriteTimeout time.Duration
	SASL struct {
		// Whether or not to use SASL authentication when connecting to the broker
		// (defaults to false).
		Enable bool
		// SASLMechanism is the name of the enabled SASL mechanism.
		// Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN).
		Mechanism sarama.SASLMechanism
		// Version is the SASL Protocol Version to use
		// Kafka > 1.x should use V1, except on Azure EventHub which uses V0
		Version int16
		// Whether or not to send the Kafka SASL handshake first if enabled
		// (defaults to true). You should only set this to false if you're using
		// a non-Kafka SASL proxy.
		Handshake bool
		// AuthIdentity is an (optional) authorization identity (authzid) to
		// use for SASL/PLAIN authentication (if different from User) when
		// an authenticated user is permitted to act as the presented
		// alternative user. See RFC4616 for details.
		AuthIdentity string
		// User is the authentication identity (authcid) to present for
		// SASL/PLAIN or SASL/SCRAM authentication
		User string
		// Password for SASL/PLAIN authentication
		Password string
		// authz id used for SASL/SCRAM authentication
		SCRAMAuthzID string
		// SCRAMClientGeneratorFunc is a generator of a user provided implementation of a SCRAM
		// client used to perform the SCRAM exchange with the server.
		SCRAMClientGeneratorFunc func() sarama.SCRAMClient
		// TokenProvider is a user-defined callback for generating
		// access tokens for SASL/OAUTHBEARER auth. See the
		// AccessTokenProvider interface docs for proper implementation
		// guidelines.
		TokenProvider sarama.AccessTokenProvider
		GSSAPI sarama.GSSAPIConfig
	}
	Metadata struct {
		Retries int
		Timeout time.Duration
	}
	Consumer struct {
		Group string
		EnableAutoCommit bool
		AutoCommitInterval time.Duration
		InitialOffset int64
		SessionTimeout time.Duration
		MinFetchBytes int32
		DefaultFetchBytes int32
		MaxFetchBytes int32
		MaxFetchWait time.Duration
		Retries int
	}
	Producer struct {
		MaxMessageBytes int
		Acks sarama.RequiredAcks
		Timeout time.Duration
		Retries int
		MaxFlushBytes int
		MaxFlushMessages int
		FlushFrequency time.Duration
		Idempotent bool
	}
	EnableMetrics bool
	EnableTracer bool
}
func NewDefaultOptions ¶
func NewDefaultOptions() *Option
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func GetProducer ¶
func NewProducer ¶
func NewProducer(opt *Option, logger xlog.Logger, metrics metrics.Provider, tracer tracer.Provider) (*Producer, error)
NewProducer creates an asynchronous producer.
func (*Producer) SendMessage ¶
func (p *Producer) SendMessage(message ProducerMessage, cb Callback)
SendMessage is the asynchronous send API.
func (*Producer) SyncSendMessage ¶
func (p *Producer) SyncSendMessage(ctx context.Context, message ProducerMessage) error
SyncSendMessage is the synchronous send API.
type ProducerMessage ¶
type ProducerMessageCarrier ¶
type ProducerMessageCarrier struct {
	*sarama.ProducerMessage
	// contains filtered or unexported fields
}
ProducerMessageCarrier is a carrier that wraps a *sarama.ProducerMessage and exposes Get, Set, and Keys methods.
func NewProducerMessageCarrier ¶
func NewProducerMessageCarrier(m *sarama.ProducerMessage) *ProducerMessageCarrier
func (ProducerMessageCarrier) Get ¶
func (carrier ProducerMessageCarrier) Get(key string) string
func (ProducerMessageCarrier) Keys ¶
func (carrier ProducerMessageCarrier) Keys() []string
func (ProducerMessageCarrier) Set ¶
func (carrier ProducerMessageCarrier) Set(key string, value string)
Click to show internal directories.
Click to hide internal directories.