Documentation
¶
Index ¶
- Variables
- type Option
- func WithBrokers(brokers ...string) Option
- func WithClientID(clientID string) Option
- func WithClusterConfig(clusterConfig *cluster.Config) Option
- func WithCompression(codec sarama.CompressionCodec, level int) Option
- func WithErrorHandler(h nc.ErrorHandler) Option
- func WithFlashFrequency(frequency time.Duration) Option
- func WithFlashMessages(messageCount int) Option
- func WithGroupName(name string) Option
- func WithKafkaURL(urlString string) Option
- func WithKafkaVersion(version sarama.KafkaVersion) Option
- func WithPanicHandler(h nc.PanicHandler) Option
- func WithPublisherErrorHandler(h PublisherErrorHandler) Option
- func WithPublisherSuccessHandler(h PublisherSuccessHandler) Option
- func WithSaramaConfig(streamConfig *sarama.Config) Option
- func WithSubscriberNotificationHandler(h SubscriberNotificationHandler) Option
- func WithTopics(topics ...string) Option
- type Options
- type Publisher
- type PublisherErrorHandler
- type PublisherSuccessHandler
- type Subscriber
- type SubscriberNotificationHandler
Constants ¶
This section is empty.
Variables ¶
var ErrMessageInvalidConsumer = errors.New(`[message] invalid consumer`)
ErrMessageInvalidConsumer error
Functions ¶
This section is empty.
Types ¶
type Option ¶ added in v1.0.0
type Option func(options *Options)
Option function type
func WithBrokers ¶ added in v1.1.0
WithBrokers overrides the list of brokers to connect
func WithClusterConfig ¶ added in v1.0.0
WithClusterConfig custom config
func WithCompression ¶ added in v1.0.0
func WithCompression(codec sarama.CompressionCodec, level int) Option
WithCompression of the pipe
func WithErrorHandler ¶ added in v1.0.0
func WithErrorHandler(h nc.ErrorHandler) Option
WithErrorHandler set handler of error processing
func WithFlashFrequency ¶ added in v1.0.0
WithFlashFrequency of flushing
func WithFlashMessages ¶ added in v1.0.0
WithFlashMessages minimal count
func WithGroupName ¶ added in v1.0.0
WithGroupName of the message consuming
func WithKafkaURL ¶ added in v1.1.0
WithKafkaURL is an Option to set the URL the client should connect to. The url can contain username/password semantics. e.g. kafka://derek:pass@localhost:4222/{groupName}?topics=topic1,topic2 Comma separated arrays are also supported, e.g. urlA, urlB.
func WithKafkaVersion ¶ added in v1.0.0
func WithKafkaVersion(version sarama.KafkaVersion) Option
WithKafkaVersion minimal version
func WithPanicHandler ¶ added in v1.0.0
func WithPanicHandler(h nc.PanicHandler) Option
WithPanicHandler set handler of panic processing
func WithPublisherErrorHandler ¶ added in v1.0.0
func WithPublisherErrorHandler(h PublisherErrorHandler) Option
WithPublisherErrorHandler set handler of the sarama errors
func WithPublisherSuccessHandler ¶ added in v1.0.0
func WithPublisherSuccessHandler(h PublisherSuccessHandler) Option
WithPublisherSuccessHandler set handler of the sarama success
func WithSaramaConfig ¶ added in v1.0.0
WithSaramaConfig custom config
func WithSubscriberNotificationHandler ¶ added in v1.0.0
func WithSubscriberNotificationHandler(h SubscriberNotificationHandler) Option
WithSubscriberNotificationHandler set handler of the cluster group notifications
func WithTopics ¶ added in v1.1.0
WithTopics will set the list of topics for publishing or subscribing
type Options ¶ added in v1.0.0
type Options struct { ClusterConfig cluster.Config // IsSynchronous type of producer // TODO: make it work for sync publisher IsSynchronous bool // Brokers contains list of broker hosts with port Brokers []string // Name of the subscription group GroupName string // Names of topics for subscribing or publishing Topics []string // ErrorHandler of message processing ErrorHandler nc.ErrorHandler // PanicHandler process panic PanicHandler nc.PanicHandler // PublisherErrorHandler provides handler of message send errors PublisherErrorHandler PublisherErrorHandler // PublisherSuccessHandler provides handler of message send success PublisherSuccessHandler PublisherSuccessHandler // SubscriberNotificationHandler provides handler of received messages SubscriberNotificationHandler SubscriberNotificationHandler // Message encoder interface Encoder encoder.Encoder // Logger of subscriber Logger loggerInterface }
Options for publisher or subscriber
type Publisher ¶ added in v1.0.0
type Publisher struct {
// contains filtered or unexported fields
}
Publisher implementation of Publisher interface for the Kafka driver
func MustNewPublisher ¶ added in v1.0.0
MustNewPublisher connection or panic
func NewPublisher ¶ added in v1.0.0
NewPublisher to the kafka with some brokers and topics for sending
type PublisherErrorHandler ¶ added in v1.0.0
type PublisherErrorHandler func(*sarama.ProducerError)
PublisherErrorHandler callback function
type PublisherSuccessHandler ¶ added in v1.0.0
type PublisherSuccessHandler func(*sarama.ProducerMessage)
PublisherSuccessHandler callback function
type Subscriber ¶
type Subscriber struct { notificationcenter.ModelSubscriber // contains filtered or unexported fields }
Subscriber for kafka
func NewSubscriber ¶
func NewSubscriber(options ...Option) (*Subscriber, error)
NewSubscriber connection to kafka "group" from list of topics
type SubscriberNotificationHandler ¶ added in v1.0.0
type SubscriberNotificationHandler func(notification *cluster.Notification)
SubscriberNotificationHandler callback function