Documentation ¶
Index ¶
- Constants
- type KafkaReader
- type KafkaWriter
- type Logger
- type LoggerFunc
- type ReadMessage
- type Reader
- type ReaderConfig
- type StatsdClient
- func (m *StatsdClient) PublishKafkaReadError()
- func (m *StatsdClient) PublishKafkaReadOps(topic string)
- func (m *StatsdClient) PublishKafkaWriteError(topic string)
- func (m *StatsdClient) PublishKafkaWriteLatency(topic string, startTime time.Time)
- func (m *StatsdClient) PublishKafkaWriteOps(topic string)
- type StatsdConfig
- type WriteMessage
- type Writer
- type WriterConfig
Constants ¶
View Source
const (
	LastOffset  int64 = -1 // The most recent offset available for a partition.
	FirstOffset int64 = -2 // The least recent offset available for a partition.
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KafkaReader ¶
type KafkaReader struct {
// contains filtered or unexported fields
}
func NewKafkaReader ¶
func NewKafkaReader(config *ReaderConfig) *KafkaReader
func (*KafkaReader) FetchMessage ¶
func (r *KafkaReader) FetchMessage(ctx context.Context) (ReadMessage, error)
FetchMessage does not commit the message. Invoke CommitMessage to commit the messages.
func (*KafkaReader) ReadMessage ¶ added in v0.0.5
func (r *KafkaReader) ReadMessage(ctx context.Context) (ReadMessage, error)
ReadMessage also commits before returning.
type KafkaWriter ¶
type KafkaWriter struct {
// contains filtered or unexported fields
}
func NewKafkaWriter ¶
func NewKafkaWriter(writerConfig *WriterConfig) *KafkaWriter
func (*KafkaWriter) Dispose ¶
func (w *KafkaWriter) Dispose() error
func (*KafkaWriter) Write ¶
func (w *KafkaWriter) Write(ctx context.Context, logs ...WriteMessage) error
type Logger ¶
type Logger interface {
Printf(string, ...interface{})
}
Logger is the interface API for log.Logger.
type LoggerFunc ¶
type LoggerFunc func(string, ...interface{})
LoggerFunc is a bridge between Logger and any third-party logger. Usage:
l := NewLogger() // some logger
r := kafka.NewReader(kafka.ReaderConfig{
	Logger:      kafka.LoggerFunc(l.Infof),
	ErrorLogger: kafka.LoggerFunc(l.Errorf),
})
func (LoggerFunc) Printf ¶
func (f LoggerFunc) Printf(msg string, args ...interface{})
type ReadMessage ¶ added in v0.0.5
type Reader ¶
type Reader interface {
	ReadMessage(ctx context.Context) (ReadMessage, error)
	FetchMessage(ctx context.Context) (ReadMessage, error)
	Close(ctx context.Context) error
}
func NewReader ¶
func NewReader(readerConfig *ReaderConfig) Reader
type ReaderConfig ¶
type ReaderConfig struct {
	Endpoint []string
	GroupId  string
	Topics   []string

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset. If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	// Default is LastOffset.
	StartOffset int64

	QueueCapacity  int
	CommitInterval time.Duration
}
type StatsdClient ¶
type StatsdClient struct {
// contains filtered or unexported fields
}
func NewStatsdClient ¶
func NewStatsdClient(config StatsdConfig) (*StatsdClient, error)
func (*StatsdClient) PublishKafkaReadError ¶
func (m *StatsdClient) PublishKafkaReadError()
func (*StatsdClient) PublishKafkaReadOps ¶
func (m *StatsdClient) PublishKafkaReadOps(topic string)
func (*StatsdClient) PublishKafkaWriteError ¶
func (m *StatsdClient) PublishKafkaWriteError(topic string)
func (*StatsdClient) PublishKafkaWriteLatency ¶
func (m *StatsdClient) PublishKafkaWriteLatency(topic string, startTime time.Time)
func (*StatsdClient) PublishKafkaWriteOps ¶
func (m *StatsdClient) PublishKafkaWriteOps(topic string)
type StatsdConfig ¶
type WriteMessage ¶ added in v0.0.5
type Writer ¶
type Writer interface {
Write(ctx context.Context, logs ...WriteMessage) error
}
func NewWriter ¶
func NewWriter(writerConfig *WriterConfig) Writer
Click to show internal directories.
Click to hide internal directories.