Documentation ¶
Index ¶
- Constants
- Variables
- func ConsumeInternal(ctx context.Context, reader *kafkago.Reader, limit int64, ...) []map[string]interface{}
- func FromAvro(message []byte, schema string) interface{}
- func ProduceInternal(ctx context.Context, writer *kafkago.Writer, messages []map[string]interface{}, ...) error
- func ReportError(err error, msg string)
- func ReportReaderStats(ctx context.Context, currentStats kafkago.ReaderStats) error
- func ReportWriterStats(ctx context.Context, currentStats kafkago.WriterStats) error
- func SerializeAvro(configuration Configuration, topic string, data interface{}, keyOrValue string, ...) ([]byte, error)
- func SerializeByteArray(configuration Configuration, topic string, data interface{}, keyOrValue string, ...) ([]byte, error)
- func SerializeString(configuration Configuration, topic string, data interface{}, keyOrValue string, ...) ([]byte, error)
- func ToAvro(value string, schema string) []byte
- type BasicAuth
- type Configuration
- type ConsumerConfiguration
- type Credentials
- type Kafka
- func (*Kafka) Consume(ctx context.Context, reader *kafkago.Reader, limit int64, keySchema string, ...) []map[string]interface{}
- func (*Kafka) ConsumeWithConfiguration(ctx context.Context, reader *kafkago.Reader, limit int64, ...) []map[string]interface{}
- func (*Kafka) CreateTopic(address, topic string, partitions, replicationFactor int, compression string) error
- func (*Kafka) ListTopics(address string) ([]string, error)
- func (*Kafka) Produce(ctx context.Context, writer *kafkago.Writer, messages []map[string]interface{}, ...) error
- func (*Kafka) ProduceWithConfiguration(ctx context.Context, writer *kafkago.Writer, messages []map[string]interface{}, ...) error
- func (*Kafka) Reader(brokers []string, topic string, partition int, groupID string, offset int64, ...) *kafkago.Reader
- func (*Kafka) Writer(brokers []string, topic string, auth string, compression string) *kafkago.Writer
- type ProducerConfiguration
- type SchemaRegistryConfiguration
- type Serializer
Constants ¶
View Source
const ( Plain = "plain" SHA256 = "sha256" SHA512 = "sha512" )
Variables ¶
View Source
var ( ReaderDials = stats.New("kafka.reader.dial.count", stats.Counter) ReaderFetches = stats.New("kafka.reader.fetches.count", stats.Counter) ReaderMessages = stats.New("kafka.reader.message.count", stats.Counter) ReaderBytes = stats.New("kafka.reader.message.bytes", stats.Counter, stats.Data) ReaderRebalances = stats.New("kafka.reader.rebalance.count", stats.Counter) ReaderTimeouts = stats.New("kafka.reader.timeouts.count", stats.Counter) ReaderErrors = stats.New("kafka.reader.error.count", stats.Counter) ReaderDialTime = stats.New("kafka.reader.dial.seconds", stats.Trend, stats.Time) ReaderReadTime = stats.New("kafka.reader.read.seconds", stats.Trend, stats.Time) ReaderWaitTime = stats.New("kafka.reader.wait.seconds", stats.Trend, stats.Time) ReaderFetchSize = stats.New("kafka.reader.fetch.size", stats.Counter) ReaderFetchBytes = stats.New("kafka.reader.fetch.bytes", stats.Counter, stats.Data) ReaderOffset = stats.New("kafka.reader.offset", stats.Gauge) ReaderLag = stats.New("kafka.reader.lag", stats.Gauge) ReaderMinBytes = stats.New("kafka.reader.fetch_bytes.min", stats.Gauge) ReaderMaxBytes = stats.New("kafka.reader.fetch_bytes.max", stats.Gauge) ReaderMaxWait = stats.New("kafka.reader.fetch_wait.max", stats.Gauge, stats.Time) ReaderQueueLength = stats.New("kafka.reader.queue.length", stats.Gauge) ReaderQueueCapacity = stats.New("kafka.reader.queue.capacity", stats.Gauge) WriterDials = stats.New("kafka.writer.dial.count", stats.Counter) WriterWrites = stats.New("kafka.writer.write.count", stats.Counter) WriterMessages = stats.New("kafka.writer.message.count", stats.Counter) WriterBytes = stats.New("kafka.writer.message.bytes", stats.Counter, stats.Data) WriterRebalances = stats.New("kafka.writer.rebalance.count", stats.Counter) WriterErrors = stats.New("kafka.writer.error.count", stats.Counter) WriterDialTime = stats.New("kafka.writer.dial.seconds", stats.Trend, stats.Time) WriterWriteTime = stats.New("kafka.writer.write.seconds", stats.Trend, stats.Time) WriterWaitTime = stats.New("kafka.writer.wait.seconds", stats.Trend, stats.Time) WriterRetries = stats.New("kafka.writer.retries.count", stats.Counter) WriterBatchSize = stats.New("kafka.writer.batch.size", stats.Counter) WriterBatchBytes = stats.New("kafka.writer.batch.bytes", stats.Counter, stats.Data) WriterMaxAttempts = stats.New("kafka.writer.attempts.max", stats.Gauge) WriterMaxBatchSize = stats.New("kafka.writer.batch.max", stats.Gauge) WriterBatchTimeout = stats.New("kafka.writer.batch.timeout", stats.Gauge, stats.Time) WriterReadTimeout = stats.New("kafka.writer.read.timeout", stats.Gauge, stats.Time) WriterWriteTimeout = stats.New("kafka.writer.write.timeout", stats.Gauge, stats.Time) WriterRebalanceInterval = stats.New("kafka.writer.rebalance.interval", stats.Gauge, stats.Time) WriterRequiredAcks = stats.New("kafka.writer.acks.required", stats.Gauge) WriterAsync = stats.New("kafka.writer.async", stats.Rate) WriterQueueLength = stats.New("kafka.writer.queue.length", stats.Gauge) WriterQueueCapacity = stats.New("kafka.writer.queue.capacity", stats.Gauge) )
Functions ¶
func ConsumeInternal ¶
func ProduceInternal ¶
func ReportError ¶
func ReportReaderStats ¶
func ReportReaderStats(ctx context.Context, currentStats kafkago.ReaderStats) error
func ReportWriterStats ¶
func ReportWriterStats(ctx context.Context, currentStats kafkago.WriterStats) error
func SerializeAvro ¶
func SerializeByteArray ¶
func SerializeString ¶
Types ¶
type Configuration ¶
type Configuration struct { Consumer ConsumerConfiguration `json:"consumer"` Producer ProducerConfiguration `json:"producer"` SchemaRegistry SchemaRegistryConfiguration `json:"schemaRegistry"` }
type ConsumerConfiguration ¶
type Credentials ¶
type Kafka ¶
type Kafka struct{}
func (*Kafka) ConsumeWithConfiguration ¶
func (*Kafka) CreateTopic ¶
func (*Kafka) ProduceWithConfiguration ¶
type ProducerConfiguration ¶
type Serializer ¶
type Serializer func(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error)
func GetSerializer ¶
func GetSerializer(serializer string) Serializer
Click to show internal directories.
Click to hide internal directories.