client

package
v0.1.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2022 License: AGPL-3.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsProducerErrTemporary

func IsProducerErrTemporary(err error) bool

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func New

func New(network string, addresses []string, conf Config) (*Client, error)

New returns a new Kafka client

func NewAzureEventHubs

func NewAzureEventHubs(address, connectionString string, conf Config) (*Client, error)

NewAzureEventHubs returns a Kafka client pre-configured to connect to Azure Event Hubs

func NewConfluentCloud

func NewConfluentCloud(address, key, secret string, conf Config) (*Client, error)

NewConfluentCloud returns a Kafka client pre-configured to connect to Confluent Cloud

func (*Client) NewConsumer

func (c *Client) NewConsumer(topic string, conf ConsumerConfig) *Consumer

NewConsumer instantiates a new consumer.

func (*Client) NewProducer

func (c *Client) NewProducer(topic string, producerConf ProducerConfig) (p *Producer, err error)

NewProducer instantiates a new producer. To use it asynchronously just do "go p.Publish(ctx, msgs)".

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping is used to check the connectivity only, then it discards the connection

type Config

type Config struct {
	ClientID    string
	DialTimeout time.Duration
	TLS         *TLS
	SASL        *SASL
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer provides a high-level API for reading messages from Kafka

func (*Consumer) Close

func (c *Consumer) Close(ctx context.Context) error

Close tries to close the consumer, but it will return sooner if the context is canceled. A routine in background will still try to close the producer since the underlying library does not support contexts on Close().

func (*Consumer) Receive

func (c *Consumer) Receive(ctx context.Context) (Message, error)

Receive reads and returns the next message from the consumer. The method blocks until a message becomes available, or an error occurs. The program may also specify a context to asynchronously cancel the blocking operation.

type ConsumerConfig

type ConsumerConfig struct {
	GroupID             string
	Partition           int
	StartOffset         ConsumerStartOffset
	CommitInterval      time.Duration
	FetchBatchesMaxWait time.Duration
	Logger              Logger
	ErrorLogger         Logger
}

type ConsumerStartOffset

type ConsumerStartOffset int64
const (
	// LastOffset is the most recent offset available for a partition
	LastOffset ConsumerStartOffset = iota
	// FirstOffset is the least recent offset available for a partition
	FirstOffset
)

type Logger

type Logger interface {
	Printf(format string, args ...interface{})
}

Logger specifies a logger used to report internal changes within the consumer

type Message

type Message struct {
	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
	Headers    []MessageHeader
	Timestamp  time.Time
}

Message is a data structure representing a Kafka message

type MessageHeader

type MessageHeader struct {
	Key   string
	Value []byte
}

MessageHeader is a key/value pair type representing headers set on records

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer provides a high-level API for producing messages to Kafka

func (*Producer) Close

func (p *Producer) Close(ctx context.Context) error

Close tries to close the producer, but it will return sooner if the context is canceled. A routine in background will still try to close the producer since the underlying library does not support contexts on Close().

func (*Producer) Publish

func (p *Producer) Publish(ctx context.Context, msgs ...Message) error

Publish allows the production of one or more message to Kafka. To use it asynchronously just do "go p.Publish(ctx, msgs)".

type ProducerConfig

type ProducerConfig struct {
	ClientID string
	WriteTimeout,
	ReadTimeout time.Duration
	Logger      Logger
	ErrorLogger Logger
}

type SASL

type SASL struct {
	ScramHashGen       ScramHashGenerator
	Username, Password string
}

type ScramHashGenerator

type ScramHashGenerator uint8
const (
	ScramPlainText ScramHashGenerator = iota
	ScramSHA256
	ScramSHA512
)

func ScramHashGeneratorFromString

func ScramHashGeneratorFromString(s string) (ScramHashGenerator, error)

ScramHashGeneratorFromString returns the proper ScramHashGenerator from its string counterpart

func (ScramHashGenerator) String

func (s ScramHashGenerator) String() string

type TLS

type TLS struct {
	Cert, Key,
	CACertificate []byte
	WithSystemCertPool,
	InsecureSkipVerify bool
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL