Documentation ¶
Index ¶
- func ByteSerializer(value interface{}) ([]byte, error)
- func Critical(tag interface{}, message interface{})
- func Criticalf(tag interface{}, message interface{}, params ...interface{})
- func Debug(tag interface{}, message interface{})
- func Debugf(tag interface{}, message interface{}, params ...interface{})
- func Error(tag interface{}, message interface{})
- func Errorf(tag interface{}, message interface{}, params ...interface{})
- func Info(tag interface{}, message interface{})
- func Infof(tag interface{}, message interface{}, params ...interface{})
- func StringSerializer(value interface{}) ([]byte, error)
- func Trace(tag interface{}, message interface{})
- func Tracef(tag interface{}, message interface{}, params ...interface{})
- func Warn(tag interface{}, message interface{})
- func Warnf(tag interface{}, message interface{}, params ...interface{})
- type ConnectionRequest
- type DefaultLogger
- func (dl *DefaultLogger) Critical(message string, params ...interface{})
- func (dl *DefaultLogger) Debug(message string, params ...interface{})
- func (dl *DefaultLogger) Error(message string, params ...interface{})
- func (dl *DefaultLogger) Info(message string, params ...interface{})
- func (dl *DefaultLogger) Trace(message string, params ...interface{})
- func (dl *DefaultLogger) Warn(message string, params ...interface{})
- type HashPartitioner
- type KafkaLogger
- type KafkaProducer
- type LogLevel
- type ManualPartitioner
- type NetworkClient
- type NetworkClientConfig
- type NetworkRequest
- type Partitioner
- type Producer
- type ProducerConfig
- type ProducerRecord
- type RandomPartitioner
- type RecordAccumulator
- type RecordAccumulatorConfig
- type RecordMetadata
- type Selector
- type SelectorConfig
- type Serializer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ByteSerializer ¶
func Critical ¶
func Critical(tag interface{}, message interface{})
Critical writes a given message with a given tag to log with level Critical.
func Criticalf ¶
func Criticalf(tag interface{}, message interface{}, params ...interface{})
Criticalf formats a given message according to given params with a given tag to log with level Critical.
func Debug ¶
func Debug(tag interface{}, message interface{})
Debug writes a given message with a given tag to log with level Debug.
func Debugf ¶
func Debugf(tag interface{}, message interface{}, params ...interface{})
Debugf formats a given message according to given params with a given tag to log with level Debug.
func Error ¶
func Error(tag interface{}, message interface{})
Error writes a given message with a given tag to log with level Error.
func Errorf ¶
func Errorf(tag interface{}, message interface{}, params ...interface{})
Errorf formats a given message according to given params with a given tag to log with level Error.
func Info ¶
func Info(tag interface{}, message interface{})
Info writes a given message with a given tag to log with level Info.
func Infof ¶
func Infof(tag interface{}, message interface{}, params ...interface{})
Infof formats a given message according to given params with a given tag to log with level Info.
func StringSerializer ¶
func Trace ¶
func Trace(tag interface{}, message interface{})
Trace writes a given message with a given tag to log with level Trace.
func Tracef ¶
func Tracef(tag interface{}, message interface{}, params ...interface{})
Tracef formats a given message according to given params with a given tag to log with level Trace.
Types ¶
type ConnectionRequest ¶
type ConnectionRequest struct {
// contains filtered or unexported fields
}
type DefaultLogger ¶
type DefaultLogger struct {
// contains filtered or unexported fields
}
DefaultLogger is the default implementation of the KafkaLogger interface used in this client.
func NewDefaultLogger ¶
func NewDefaultLogger(Level LogLevel) *DefaultLogger
NewDefaultLogger creates a new DefaultLogger that is configured to write messages to console with minimum log level Level.
func (*DefaultLogger) Critical ¶
func (dl *DefaultLogger) Critical(message string, params ...interface{})
Critical formats a given message according to given params to log with level Critical.
func (*DefaultLogger) Debug ¶
func (dl *DefaultLogger) Debug(message string, params ...interface{})
Debug formats a given message according to given params to log with level Debug.
func (*DefaultLogger) Error ¶
func (dl *DefaultLogger) Error(message string, params ...interface{})
Error formats a given message according to given params to log with level Error.
func (*DefaultLogger) Info ¶
func (dl *DefaultLogger) Info(message string, params ...interface{})
Info formats a given message according to given params to log with level Info.
func (*DefaultLogger) Trace ¶
func (dl *DefaultLogger) Trace(message string, params ...interface{})
Trace formats a given message according to given params to log with level Trace.
func (*DefaultLogger) Warn ¶
func (dl *DefaultLogger) Warn(message string, params ...interface{})
Warn formats a given message according to given params to log with level Warn.
type HashPartitioner ¶
type HashPartitioner struct {
// contains filtered or unexported fields
}
func NewHashPartitioner ¶
func NewHashPartitioner() *HashPartitioner
func (*HashPartitioner) Partition ¶
func (hp *HashPartitioner) Partition(record *ProducerRecord, partitions []int32) (int32, error)
type KafkaLogger ¶
type KafkaLogger interface {
	//Formats a given message according to given params to log with level Trace.
	Trace(message string, params ...interface{})
	//Formats a given message according to given params to log with level Debug.
	Debug(message string, params ...interface{})
	//Formats a given message according to given params to log with level Info.
	Info(message string, params ...interface{})
	//Formats a given message according to given params to log with level Warn.
	Warn(message string, params ...interface{})
	//Formats a given message according to given params to log with level Error.
	Error(message string, params ...interface{})
	//Formats a given message according to given params to log with level Critical.
	Critical(message string, params ...interface{})
}
KafkaLogger is a logger interface. It lets you plug in your own logging library instead of using the built-in one.
var Logger KafkaLogger = NewDefaultLogger(InfoLevel)
Logger used by this client. Defaults to the built-in logger with Info log level.
type KafkaProducer ¶
type KafkaProducer struct {
// contains filtered or unexported fields
}
func NewKafkaProducer ¶
func NewKafkaProducer(config *ProducerConfig, keySerializer Serializer, valueSerializer Serializer, connector siesta.Connector) *KafkaProducer
func (*KafkaProducer) Close ¶
func (kp *KafkaProducer) Close()
func (*KafkaProducer) Send ¶
func (kp *KafkaProducer) Send(record *ProducerRecord) <-chan *RecordMetadata
type LogLevel ¶
type LogLevel string
LogLevel represents a logging level.
const (
	// TraceLevel is used for debugging to find problems in functions, variables etc.
	TraceLevel LogLevel = "trace"
	// DebugLevel is used for detailed system reports and diagnostic messages.
	DebugLevel LogLevel = "debug"
	// InfoLevel is used for general information about a running application.
	InfoLevel LogLevel = "info"
	// WarnLevel is used to indicate small errors and failures that should not happen normally but are recovered automatically.
	WarnLevel LogLevel = "warn"
	// ErrorLevel is used to indicate severe errors that affect application workflow and are not handled automatically.
	ErrorLevel LogLevel = "error"
	// CriticalLevel is used to indicate fatal errors that may cause data corruption or loss.
	CriticalLevel LogLevel = "critical"
)
type ManualPartitioner ¶
type ManualPartitioner struct{}
func NewManualPartitioner ¶
func NewManualPartitioner() *ManualPartitioner
func (*ManualPartitioner) Partition ¶
func (mp *ManualPartitioner) Partition(record *ProducerRecord, partitions []int32) (int32, error)
type NetworkClient ¶
type NetworkClient struct { *NetworkClientConfig // contains filtered or unexported fields }
func NewNetworkClient ¶
func NewNetworkClient(config *NetworkClientConfig, connector siesta.Connector, selector *Selector) *NetworkClient
type NetworkClientConfig ¶
type NetworkRequest ¶
type NetworkRequest struct {
// contains filtered or unexported fields
}
TODO better struct name
type Partitioner ¶
type Partitioner interface {
Partition(record *ProducerRecord, partitions []int32) (int32, error)
}
type Producer ¶
type Producer interface {
	// Send the given record asynchronously and return a channel which will eventually contain the response information.
	Send(*ProducerRecord) <-chan *RecordMetadata
	// Tries to close the producer cleanly.
	Close()
}
type ProducerConfig ¶
type ProducerConfig struct { Partitioner Partitioner CompressionType string BatchSize int Linger time.Duration Retries int RetryBackoff time.Duration ClientID string MaxRequests int SendRoutines int ReceiveRoutines int ReadTimeout time.Duration WriteTimeout time.Duration RequiredAcks int AckTimeoutMs int32 BrokerList []string }
func NewProducerConfig ¶
func NewProducerConfig() *ProducerConfig
func ProducerConfigFromFile ¶
func ProducerConfigFromFile(filename string) (*ProducerConfig, error)
type ProducerRecord ¶
type RandomPartitioner ¶
type RandomPartitioner struct{}
func NewRandomPartitioner ¶
func NewRandomPartitioner() *RandomPartitioner
func (*RandomPartitioner) Partition ¶
func (rp *RandomPartitioner) Partition(record *ProducerRecord, partitions []int32) (int32, error)
type RecordAccumulator ¶
type RecordAccumulator struct {
// contains filtered or unexported fields
}
func NewRecordAccumulator ¶
func NewRecordAccumulator(config *RecordAccumulatorConfig, networkClient *NetworkClient) *RecordAccumulator
type RecordAccumulatorConfig ¶
type RecordAccumulatorConfig struct {
// contains filtered or unexported fields
}
type RecordMetadata ¶
type RecordMetadata struct { Record *ProducerRecord Offset int64 Topic string Partition int32 Error error }
type Selector ¶
type Selector struct {
// contains filtered or unexported fields
}
func NewSelector ¶
func NewSelector(config *SelectorConfig) *Selector
type SelectorConfig ¶
type SelectorConfig struct { ClientID string MaxRequests int SendRoutines int ReceiveRoutines int ReadTimeout time.Duration WriteTimeout time.Duration RequiredAcks int }
TODO proper config entry names that match upstream Kafka
func DefaultSelectorConfig ¶
func DefaultSelectorConfig() *SelectorConfig
func NewSelectorConfig ¶
func NewSelectorConfig(producerConfig *ProducerConfig) *SelectorConfig