client

package
v0.38.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 30, 2024 License: AGPL-3.0 Imports: 15 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsProducerErrTemporary

func IsProducerErrTemporary(err error) bool

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func New

func New(network string, addresses []string, conf Config) (*Client, error)

New returns a new Kafka client

func NewAzureEventHubs

func NewAzureEventHubs(addresses []string, connectionString string, conf Config) (*Client, error)

NewAzureEventHubs returns a Kafka client pre-configured to connect to Azure Event Hubs addresses should be in the form of "host:port" where the port is usually 9093 on Azure Event Hubs Also make sure to select at least the Standard tier since the Basic tier does not support Kafka

func NewConfluentCloud

func NewConfluentCloud(addresses []string, key, secret string, conf Config) (*Client, error)

NewConfluentCloud returns a Kafka client pre-configured to connect to Confluent Cloud

func (*Client) NewConsumer

func (c *Client) NewConsumer(topic string, conf ConsumerConfig) *Consumer

NewConsumer instantiates a new consumer.

func (*Client) NewProducer

func (c *Client) NewProducer(producerConf ProducerConfig) (p *Producer, err error)

NewProducer instantiates a new producer. To use it asynchronously just do "go p.Publish(ctx, msgs)".

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping is used to check the connectivity only, then it discards the connection Ping ensures that at least one of the provided addresses is reachable.

type Compression

type Compression = kafka.Compression
const (
	CompressionNone   Compression = 0
	CompressionGzip   Compression = kafka.Gzip
	CompressionSnappy Compression = kafka.Snappy
	CompressionLz4    Compression = kafka.Lz4
	CompressionZstd   Compression = kafka.Zstd
)

type Config

type Config struct {
	ClientID    string
	DialTimeout time.Duration
	TLS         *TLS
	SASL        *SASL
	SSHConfig   *SSHConfig
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer provides a high-level API for reading messages from Kafka

func (*Consumer) Ack added in v0.21.0

func (c *Consumer) Ack(ctx context.Context, msgs ...Message) error

func (*Consumer) Close

func (c *Consumer) Close(ctx context.Context) error

Close tries to close the consumer, but it will return sooner if the context is canceled. A routine in background will still try to close the producer since the underlying library does not support contexts on Close().

func (*Consumer) Receive

func (c *Consumer) Receive(ctx context.Context) (Message, error)

Receive reads and returns the next message from the consumer. The method blocks until a message becomes available, or an error occurs. The program may also specify a context to asynchronously cancel the blocking operation.

type ConsumerConfig

type ConsumerConfig struct {
	GroupID             string
	Partition           int
	StartOffset         ConsumerStartOffset
	CommitInterval      time.Duration
	FetchBatchesMaxWait time.Duration
	Logger              Logger
	ErrorLogger         Logger
}

type ConsumerStartOffset

type ConsumerStartOffset int64
const (
	// LastOffset is the most recent offset available for a partition
	LastOffset ConsumerStartOffset = iota
	// FirstOffset is the least recent offset available for a partition
	FirstOffset
)

type KafkaLogger

type KafkaLogger struct {
	Logger        logger
	IsErrorLogger bool
}

func (*KafkaLogger) Printf

func (l *KafkaLogger) Printf(format string, args ...interface{})

type Logger

type Logger interface {
	Printf(format string, args ...interface{})
}

Logger specifies a logger used to report internal changes within the consumer

type Message

type Message struct {
	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
	Headers    []MessageHeader
	Timestamp  time.Time
}

Message is a data structure representing a Kafka message

type MessageHeader

type MessageHeader struct {
	Key   string
	Value []byte
}

MessageHeader is a key/value pair type representing headers set on records

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer provides a high-level API for producing messages to Kafka

func (*Producer) Close

func (p *Producer) Close(ctx context.Context) error

Close tries to close the producer, but it will return sooner if the context is canceled. A routine in background will still try to close the producer since the underlying library does not support contexts on Close().

func (*Producer) Publish

func (p *Producer) Publish(ctx context.Context, msgs ...Message) error

Publish allows the production of one or more message to Kafka. To use it asynchronously just do "go p.Publish(ctx, msgs)".

type ProducerConfig

type ProducerConfig struct {
	ClientID string
	WriteTimeout,
	ReadTimeout,
	BatchTimeout time.Duration
	BatchSize   int
	Compression Compression
	Logger      Logger
	ErrorLogger Logger
}

type SASL

type SASL struct {
	ScramHashGen       ScramHashGenerator
	Username, Password string
}

type SSHConfig

type SSHConfig struct {
	User, Host, PrivateKey string
}

type ScramHashGenerator

type ScramHashGenerator uint8
const (
	ScramPlainText ScramHashGenerator = iota
	ScramSHA256
	ScramSHA512
)

func ScramHashGeneratorFromString

func ScramHashGeneratorFromString(s string) (ScramHashGenerator, error)

ScramHashGeneratorFromString returns the proper ScramHashGenerator from its string counterpart

func (ScramHashGenerator) String

func (s ScramHashGenerator) String() string

type TLS

type TLS struct {
	Cert, Key,
	CACertificate []byte
	WithSystemCertPool,
	InsecureSkipVerify bool
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL