Documentation ¶
Constants ¶
const (
	ReaderError errors.Code = errors.ErrInternal + 1
	ConsumerError
)
Variables ¶
var ErrMissingBrokers = errors.New("kafka: missing broker(s) address(es)")
Functions ¶
func NewTransport ¶
func NewTransport(auth *Authentication) (*kafka.Transport, error)
NewTransport creates a new transport that authenticates with the SCRAM-SHA-512 SASL mechanism.
Types ¶
type Authentication ¶
Authentication holds the username and password used when dialing the connection. It is used by the SCRAM-SHA-512 SASL mechanism. This authentication method requires Kafka version 0.10.2.0 or later.
type Consumer ¶
type Consumer struct {
	*kafka.Reader
	// contains filtered or unexported fields
}
Consumer wraps the Kafka reader.
func NewConsumer ¶
func NewConsumer(opt *ConsumerOptions) (*Consumer, error)
NewConsumer creates a new message reader. Returns ErrMissingBrokers if len(opt.Brokers) == 0. The remaining parameters aren't checked. It is assumed that the topic exists.
func (*Consumer) StartConsumption ¶ added in v0.6.0
func (c *Consumer) StartConsumption(fn MessageConsumer) *sub.Controller
StartConsumption starts a new reader loop that runs in the background. The loop terminates only when *Controller.Shutdown is called.
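The pattern of a background loop that stops only on an explicit shutdown can be sketched with the standard library alone. The `controller` type and `startLoop` function below are hypothetical stand-ins for `*sub.Controller` and `StartConsumption`; how the real package implements its loop is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// controller is a hypothetical stand-in for *sub.Controller.
type controller struct {
	stop chan struct{} // closed to request termination
	done chan struct{} // closed once the loop has exited
	once sync.Once
}

// Shutdown asks the loop to stop and waits until it has exited.
// Calling it more than once is safe.
func (c *controller) Shutdown() {
	c.once.Do(func() { close(c.stop) })
	<-c.done
}

// startLoop runs fn for every value received from src in a background
// goroutine. The loop exits only when Shutdown is called.
func startLoop(src <-chan string, fn func(string)) *controller {
	c := &controller{
		stop: make(chan struct{}),
		done: make(chan struct{}),
	}
	go func() {
		defer close(c.done)
		for {
			select {
			case <-c.stop:
				return
			case m := <-src:
				fn(m)
			}
		}
	}()
	return c
}

func main() {
	src := make(chan string)
	processed := make(chan string, 1)

	c := startLoop(src, func(m string) { processed <- m })
	src <- "hello"
	fmt.Println(<-processed)

	c.Shutdown() // returns after the goroutine has exited
}
```

Waiting on `done` inside `Shutdown` means the caller knows the goroutine has finished before releasing any resources the handler uses.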
type ConsumerOptions ¶
type ConsumerOptions struct {
	// Brokers contains a list of addresses
	// in the form <ip/domain>:<port>.
	Brokers []string
	Topic   string
	GroupID string
	*Authentication
	// Timeout is the max time spent trying
	// to complete the connection process.
	Timeout time.Duration
	// QueueCapacity defines the internal queue size.
	QueueCapacity int
	// ReadLagInterval is the time between the
	// latest message in a Kafka topic and
	// a message that the consumer has processed.
	ReadLagInterval time.Duration
	// QueuedErrors contains the amount of
	// errors that the Controller.ErrCh can hold.
	QueuedErrors   int
	ErrLimiterOpts *sub.ErrorLimiterOptions
}
ConsumerOptions contains a subset of kafka reader options.
type MessageConsumer ¶ added in v0.6.0
type MessageConsumer func(m *kafka.Message) error
MessageConsumer is responsible for processing the given Kafka message.
type Publisher ¶
type Publisher struct {
*kafka.Writer
}
Publisher wraps the Kafka writer.
func NewPublisher ¶
func NewPublisher(
	opt *PublisherOptions,
	tr *kafka.Transport,
) (*Publisher, error)
NewPublisher creates a new message writer. Returns ErrMissingBrokers if len(opt.Brokers) == 0. The remaining parameters aren't checked. The transport must be non-nil. It is assumed that the topic exists.
type PublisherOptions ¶
type PublisherOptions struct {
	// Brokers contains a list of addresses in the form <ip/domain>:<port>.
	Brokers []string
	Topic   string
	*Authentication
}
PublisherOptions contains a subset of kafka writer options.