ikafka

package
v0.0.0-...-13800eb Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2024 | License: MIT | Imports: 15 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

Functions

func NewSyncProducer

func NewSyncProducer(cfg *ProducerConfig) (sarama.SyncProducer, error)

Types

type AsyncProducer

// AsyncProducer is an opaque producer handle: all of its fields are
// unexported. It is created with NewAsyncProducer and exposes the
// AsyncMessage and Close methods.
type AsyncProducer struct {
	// contains filtered or unexported fields
}

func NewAsyncProducer

func NewAsyncProducer(cfg *ProducerConfig) (*AsyncProducer, error)

func (*AsyncProducer) AsyncMessage

func (p *AsyncProducer) AsyncMessage(topic string, val sarama.Encoder, opts ...MessageOption)

func (*AsyncProducer) Close

func (p *AsyncProducer) Close()

type ConfluentConsumer

// ConfluentConsumer is an opaque consumer handle: all of its fields are
// unexported. It is created with NewConfluentConsumer from a ConsumerConfig,
// is driven through Start with a ConfluentDo callback, and is shut down with
// Close, which reports an error.
type ConfluentConsumer struct {
	// contains filtered or unexported fields
}

func NewConfluentConsumer

func NewConfluentConsumer(cfg *ConsumerConfig) (*ConfluentConsumer, error)

func (*ConfluentConsumer) Close

func (s *ConfluentConsumer) Close() error

func (*ConfluentConsumer) Start

func (s *ConfluentConsumer) Start(do ConfluentDo)

type ConfluentDo

// ConfluentDo is the callback type accepted by ConfluentConsumer.Start. It
// receives a *kafka.Consumer and a *kafka.Message and returns nothing.
// NOTE(review): presumably invoked once per consumed message — confirm
// against the Start implementation.
type ConfluentDo func(c *kafka.Consumer, msg *kafka.Message)

type Consumer

// Consumer is an opaque consumer handle: all of its fields are unexported.
// It is created with NewConsumer from a ConsumerConfig and has the methods
// Setup, Cleanup, ConsumeClaim, Start and Close.
// NOTE(review): the Setup/Cleanup/ConsumeClaim method set looks like
// sarama's consumer-group handler interface — confirm.
type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(cfg *ConsumerConfig) (*Consumer, error)

func (*Consumer) Cleanup

func (*Consumer) Close

func (s *Consumer) Close() error

func (*Consumer) ConsumeClaim

func (s *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*Consumer) Setup

func (*Consumer) Start

func (s *Consumer) Start(do Do) error

type ConsumerConfig

// ConsumerConfig groups the settings accepted by NewConsumer and
// NewConfluentConsumer. Each field uses the same snake_case key in its json
// and yaml struct tags.
//
// NOTE(review): the per-field notes below are read off names and types only;
// the code that consumes these fields is not visible here — confirm before
// relying on them.
type ConsumerConfig struct {
	// presumably the broker addresses and the topics to subscribe to — confirm
	Brokers           []string      `json:"brokers" yaml:"brokers"`
	Topics            []string      `json:"topics" yaml:"topics"`
	// presumably the consumer group identifier — confirm
	GroupId           string        `json:"group_id" yaml:"group_id"`
	// presumably credentials used when EnableSASL is set — confirm
	Username          string        `json:"username" yaml:"username"`
	Password          string        `json:"password" yaml:"password"`
	// presumably selects starting from the oldest offset rather than the newest — confirm
	OffsetOldest      bool          `json:"offset_oldest" yaml:"offset_oldest"`
	EnableSASL        bool          `json:"enable_sasl" yaml:"enable_sasl"`
	EnableTLS         bool          `json:"enable_tls" yaml:"enable_tls"`
	// presumably a Kafka protocol version string; accepted format not shown — confirm
	Version           string        `json:"version" yaml:"version"`
	AutoCommit        bool          `json:"auto_commit" yaml:"auto_commit"`
	// time.Duration is an int64 nanosecond count; how it is written in a
	// json/yaml config depends on the decoder in use — TODO confirm
	MaxProcessingTime time.Duration `json:"max_processing_time" yaml:"max_processing_time"`
}

type Do

// Do is the callback type accepted by Consumer.Start. It receives the
// sarama.ConsumerGroupSession and a *sarama.ConsumerMessage and returns
// nothing.
// NOTE(review): presumably invoked once per consumed message — confirm
// against the ConsumeClaim implementation.
type Do func(session sarama.ConsumerGroupSession, message *sarama.ConsumerMessage)

type Log

// Log is an empty struct that carries the Print, Printf and Println methods.
// NOTE(review): this method set matches a standard-logger style interface;
// presumably it is installed as the Kafka client's logger — confirm.
type Log struct{}

func (*Log) Print

func (l *Log) Print(v ...interface{})

func (*Log) Printf

func (l *Log) Printf(format string, v ...interface{})

func (*Log) Println

func (l *Log) Println(v ...interface{})

type MessageOption

// MessageOption is a functional option: a function that mutates an
// unexported messageOption value. Values are built with WithMessageHeaders,
// WithMessageKey, WithMessageMetadata, WithMessageOffset and
// WithMessagePartition, and are accepted variadically by
// AsyncProducer.AsyncMessage and Producer.AsyncMessage.
type MessageOption func(o *messageOption)

func WithMessageHeaders

func WithMessageHeaders(in []sarama.RecordHeader) MessageOption

func WithMessageKey

func WithMessageKey(in sarama.Encoder) MessageOption

func WithMessageMetadata

func WithMessageMetadata(in interface{}) MessageOption

func WithMessageOffset

func WithMessageOffset(in int64) MessageOption

func WithMessagePartition

func WithMessagePartition(in int32) MessageOption

type Producer

// Producer is an opaque producer handle: all of its fields are unexported.
// It is created with NewProducer and has the methods AsyncMessage, Borrow,
// Release and Close.
// NOTE(review): Borrow returns a sarama.AsyncProducer and Release accepts
// one, which suggests a pool of underlying producers — confirm.
type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(cfg *ProducerConfig) (*Producer, error)

func (*Producer) AsyncMessage

func (p *Producer) AsyncMessage(topic string, value sarama.Encoder, opts ...MessageOption)

func (*Producer) Borrow

func (p *Producer) Borrow() (producer sarama.AsyncProducer)

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) Release

func (p *Producer) Release(producer sarama.AsyncProducer)

type ProducerConfig

// ProducerConfig groups the settings accepted by NewSyncProducer,
// NewAsyncProducer and NewProducer.
//
// NOTE(review): the struct tags are not uniform. EnableSASL and EnableTLS use
// snake_case json keys but camelCase yaml keys (enableSASL, enableTLS),
// whereas ConsumerConfig uses enable_sasl / enable_tls for both encodings.
// Algorithm and RequiredAcks have a json tag but no yaml tag. Changing any
// tag would change the keys existing config files must use, so they are left
// as published.
type ProducerConfig struct {
	// presumably the broker addresses to connect to — confirm
	Brokers      []string `json:"brokers" yaml:"brokers"`
	// presumably credentials used when EnableSASL is set — confirm
	Username     string   `json:"username" yaml:"username"`
	Password     string   `json:"password" yaml:"password"`
	// yaml key is camelCase here, unlike the json key
	EnableSASL   bool     `json:"enable_sasl" yaml:"enableSASL"`
	// yaml key is camelCase here, unlike the json key
	EnableTLS    bool     `json:"enable_tls" yaml:"enableTLS"`
	// presumably a Kafka protocol version string; accepted format not shown — confirm
	Version      string   `json:"version" yaml:"version"`
	Topic        string   `json:"topic" yaml:"topic"`
	// no yaml tag; presumably names the SASL/SCRAM hash algorithm — confirm
	Algorithm    string   `json:"algorithm"`
	// no yaml tag; presumably the broker acknowledgement level — confirm accepted values
	RequiredAcks int      `json:"required_acks"`
}

type XDGSCRAMClient

// XDGSCRAMClient bundles three embedded values from the scram package: a
// *scram.Client, a *scram.ClientConversation and a scram.HashGeneratorFcn.
// It has the methods Begin(userName, password, authzID), Step(challenge) and
// Done().
// NOTE(review): the Begin/Step/Done method set looks like the SCRAM client
// interface a Kafka client expects for SASL/SCRAM authentication — confirm.
type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL