Documentation ¶
Index ¶
- type Option
- func WithBrokers(brokers ...string) Option
- func WithClientID(clientID string) Option
- func WithCompression(codec sarama.CompressionCodec, level int) Option
- func WithErrorHandler(h nc.ErrorHandler) Option
- func WithFlashFrequency(frequency time.Duration) Option
- func WithFlashMessages(messageCount int) Option
- func WithGroupName(name string) Option
- func WithKafkaURL(urlString string) Option
- func WithKafkaVersion(version sarama.KafkaVersion) Option
- func WithPanicHandler(h nc.PanicHandler) Option
- func WithPublisherErrorHandler(h PublisherErrorHandler) Option
- func WithPublisherSuccessHandler(h PublisherSuccessHandler) Option
- func WithSaramaConfig(streamConfig *sarama.Config) Option
- func WithTopics(topics ...string) Option
- type Options
- type Publisher
- type PublisherErrorHandler
- type PublisherSuccessHandler
- type Subscriber
- func (s *Subscriber) Cleanup(sarama.ConsumerGroupSession) error
- func (s *Subscriber) Close() error
- func (s *Subscriber) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (s *Subscriber) Listen(ctx context.Context) error
- func (s *Subscriber) Setup(sarama.ConsumerGroupSession) error
Types ¶
type Option ¶
type Option func(options *Options)
Option is a function that modifies Options.
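The functional-options pattern behind this type can be sketched with the standard library alone. The `Options` struct below is a reduced stand-in, and `withBrokers`, `withGroupName`, `withTopics`, and `applyOptions` are hypothetical names chosen for illustration; they are not the package's own implementations.

```go
package main

import "fmt"

// Options is a reduced, hypothetical stand-in for the package's Options struct.
type Options struct {
	Brokers   []string
	GroupName string
	Topics    []string
}

// Option mirrors the documented shape: type Option func(options *Options).
type Option func(options *Options)

// withBrokers is an illustrative option, not the package's WithBrokers.
func withBrokers(brokers ...string) Option {
	return func(o *Options) { o.Brokers = brokers }
}

// withGroupName is an illustrative option that sets the group name.
func withGroupName(name string) Option {
	return func(o *Options) { o.GroupName = name }
}

// withTopics is an illustrative option that replaces the topic list.
func withTopics(topics ...string) Option {
	return func(o *Options) { o.Topics = topics }
}

// applyOptions applies each option in order, so a later option
// overrides an earlier one that touches the same field.
func applyOptions(opts ...Option) Options {
	var o Options
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(
		withBrokers("localhost:9092"),
		withGroupName("group1"),
		withTopics("topic1", "topic2"),
	)
	fmt.Printf("%+v\n", o)
}
```

Because every option is just a function over `*Options`, a constructor such as `NewSubscriber(options ...Option)` can accept any number of them without changing its signature.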
func WithBrokers ¶
func WithBrokers(brokers ...string) Option
WithBrokers overrides the list of brokers to connect to.
func WithCompression ¶
func WithCompression(codec sarama.CompressionCodec, level int) Option
WithCompression sets the compression codec and compression level.
func WithErrorHandler ¶
func WithErrorHandler(h nc.ErrorHandler) Option
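WithErrorHandler sets the handler for message processing errors.
func WithFlashFrequency ¶
func WithFlashFrequency(frequency time.Duration) Option
WithFlashFrequency sets the flush frequency.
func WithFlashMessages ¶
func WithFlashMessages(messageCount int) Option
WithFlashMessages sets the minimal number of messages that triggers a flush.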
func WithKafkaURL ¶
func WithKafkaURL(urlString string) Option
WithKafkaURL is an Option that sets the URL the client should connect to. The URL can contain a username and password, e.g. kafka://derek:pass@localhost:4222/{groupName}?topics=topic1,topic2. Comma-separated lists of URLs are also supported, e.g. urlA, urlB.
func WithKafkaVersion ¶
func WithKafkaVersion(version sarama.KafkaVersion) Option
WithKafkaVersion sets the minimal Kafka version.
func WithPanicHandler ¶
func WithPanicHandler(h nc.PanicHandler) Option
WithPanicHandler sets the handler for panics raised during processing.
func WithPublisherErrorHandler ¶
func WithPublisherErrorHandler(h PublisherErrorHandler) Option
WithPublisherErrorHandler sets the handler for sarama producer errors.
func WithPublisherSuccessHandler ¶
func WithPublisherSuccessHandler(h PublisherSuccessHandler) Option
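WithPublisherSuccessHandler sets the handler for successfully sent sarama messages.
func WithSaramaConfig ¶
func WithSaramaConfig(streamConfig *sarama.Config) Option
WithSaramaConfig sets a custom sarama config.
func WithTopics ¶
func WithTopics(topics ...string) Option
WithTopics sets the list of topics for publishing or subscribing.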
type Options ¶
type Options struct {
	ClusterConfig sarama.Config

	// IsSynchronous type of producer
	// TODO: make it work for sync publisher
	IsSynchronous bool

	// Brokers contains list of broker hosts with port
	Brokers []string

	// Name of the subscription group
	GroupName string

	// Names of topics for subscribing or publishing
	Topics []string

	// ErrorHandler of message processing
	ErrorHandler nc.ErrorHandler

	// PanicHandler process panic
	PanicHandler nc.PanicHandler

	// PublisherErrorHandler provides handler of message send errors
	PublisherErrorHandler PublisherErrorHandler

	// PublisherSuccessHandler provides handler of message send success
	PublisherSuccessHandler PublisherSuccessHandler

	// Message encoder interface
	Encoder encoder.Encoder

	// Logger of subscriber
	Logger loggerInterface
}
Options for publisher or subscriber
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher implements the Publisher interface for the Kafka driver.
func MustNewPublisher ¶
MustNewPublisher creates a new Publisher connection or panics on error.
func NewPublisher ¶
NewPublisher creates a publisher that connects to the Kafka brokers and sends messages to the configured topics.
type PublisherErrorHandler ¶
type PublisherErrorHandler func(*sarama.ProducerError)
PublisherErrorHandler is a callback invoked with a producer error.
type PublisherSuccessHandler ¶
type PublisherSuccessHandler func(*sarama.ProducerMessage)
PublisherSuccessHandler is a callback invoked with a successfully sent message.
type Subscriber ¶
type Subscriber struct {
	notificationcenter.ModelSubscriber
	// contains filtered or unexported fields
}
Subscriber for Kafka
func NewSubscriber ¶
func NewSubscriber(options ...Option) (*Subscriber, error)
NewSubscriber creates a subscriber that connects to the Kafka consumer group and consumes from the list of topics.
func (*Subscriber) Cleanup ¶
func (s *Subscriber) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.
func (*Subscriber) ConsumeClaim ¶
func (s *Subscriber) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.
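The loop contract described above can be sketched without a Kafka connection. In this sketch `message` and `claim` are hypothetical stand-ins for sarama's consumer message and `ConsumerGroupClaim`, and `consumeClaim` is an illustrative function, not the Subscriber's actual method body.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a consumed Kafka message.
type message struct {
	Topic string
	Value []byte
}

// claim is a hypothetical stand-in for sarama.ConsumerGroupClaim;
// only the Messages() accessor is modelled.
type claim struct{ ch chan *message }

func (c claim) Messages() <-chan *message { return c.ch }

// consumeClaim ranges over Messages() and returns once the channel is
// closed, which is the exit condition the documentation requires.
// It stops early if the handler reports an error.
func consumeClaim(c claim, handle func(*message) error) (int, error) {
	processed := 0
	for msg := range c.Messages() {
		if err := handle(msg); err != nil {
			return processed, err
		}
		processed++
	}
	return processed, nil
}

func main() {
	c := claim{ch: make(chan *message, 2)}
	c.ch <- &message{Topic: "topic1", Value: []byte("a")}
	c.ch <- &message{Topic: "topic1", Value: []byte("b")}
	close(c.ch) // closing the channel ends the loop

	n, err := consumeClaim(c, func(m *message) error {
		fmt.Printf("%s: %s\n", m.Topic, m.Value)
		return nil
	})
	fmt.Println(n, err)
}
```

With the real sarama types, a handler would typically also mark each processed message on the session (for example via `session.MarkMessage`) so that offsets can be committed; that step is omitted here because the session is not modelled.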
func (*Subscriber) Listen ¶
func (s *Subscriber) Listen(ctx context.Context) error
Listen starts consuming messages from Kafka.
func (*Subscriber) Setup ¶
func (s *Subscriber) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim.