Documentation ¶
Index ¶
- func ByteSerializer(value interface{}) ([]byte, error)
- func Critical(tag interface{}, message interface{})
- func Criticalf(tag interface{}, message interface{}, params ...interface{})
- func Debug(tag interface{}, message interface{})
- func Debugf(tag interface{}, message interface{}, params ...interface{})
- func Error(tag interface{}, message interface{})
- func Errorf(tag interface{}, message interface{}, params ...interface{})
- func Info(tag interface{}, message interface{})
- func Infof(tag interface{}, message interface{}, params ...interface{})
- func StringSerializer(value interface{}) ([]byte, error)
- func Trace(tag interface{}, message interface{})
- func Tracef(tag interface{}, message interface{}, params ...interface{})
- func Warn(tag interface{}, message interface{})
- func Warnf(tag interface{}, message interface{}, params ...interface{})
- type ConnectionRequest
- type DefaultLogger
- func (dl *DefaultLogger) Critical(message string, params ...interface{})
- func (dl *DefaultLogger) Debug(message string, params ...interface{})
- func (dl *DefaultLogger) Error(message string, params ...interface{})
- func (dl *DefaultLogger) Info(message string, params ...interface{})
- func (dl *DefaultLogger) Trace(message string, params ...interface{})
- func (dl *DefaultLogger) Warn(message string, params ...interface{})
- type HashPartitioner
- type KafkaLogger
- type KafkaProducer
- type LogLevel
- type ManualPartitioner
- type Metadata
- type Metric
- type NetworkClient
- type NetworkClientConfig
- type NetworkRequest
- type PartitionInfo
- type Partitioner
- type Producer
- type ProducerConfig
- type ProducerRecord
- type RandomPartitioner
- type RecordAccumulator
- type RecordAccumulatorConfig
- type RecordBatch
- type RecordMetadata
- type Selector
- type SelectorConfig
- type Serializer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ByteSerializer ¶
func ByteSerializer(value interface{}) ([]byte, error)
func Critical ¶
func Critical(tag interface{}, message interface{})
Critical writes a given message with a given tag to log with level Critical.
func Criticalf ¶
func Criticalf(tag interface{}, message interface{}, params ...interface{})
Criticalf formats a given message according to given params with a given tag to log with level Critical.
func Debug ¶
func Debug(tag interface{}, message interface{})
Debug writes a given message with a given tag to log with level Debug.
func Debugf ¶
func Debugf(tag interface{}, message interface{}, params ...interface{})
Debugf formats a given message according to given params with a given tag to log with level Debug.
func Error ¶
func Error(tag interface{}, message interface{})
Error writes a given message with a given tag to log with level Error.
func Errorf ¶
func Errorf(tag interface{}, message interface{}, params ...interface{})
Errorf formats a given message according to given params with a given tag to log with level Error.
func Info ¶
func Info(tag interface{}, message interface{})
Info writes a given message with a given tag to log with level Info.
func Infof ¶
func Infof(tag interface{}, message interface{}, params ...interface{})
Infof formats a given message according to given params with a given tag to log with level Info.
func StringSerializer ¶
func StringSerializer(value interface{}) ([]byte, error)
func Trace ¶
func Trace(tag interface{}, message interface{})
Trace writes a given message with a given tag to log with level Trace.
func Tracef ¶
func Tracef(tag interface{}, message interface{}, params ...interface{})
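Tracef formats a given message according to given params with a given tag to log with level Trace.
func Warn ¶
func Warn(tag interface{}, message interface{})
Warn writes a given message with a given tag to log with level Warn.
func Warnf ¶
func Warnf(tag interface{}, message interface{}, params ...interface{})
Warnf formats a given message according to given params with a given tag to log with level Warn.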
Types ¶
type ConnectionRequest ¶
type ConnectionRequest struct {
// contains filtered or unexported fields
}
type DefaultLogger ¶
type DefaultLogger struct {
// contains filtered or unexported fields
}
DefaultLogger is a default implementation of KafkaLogger interface used in this client.
func NewDefaultLogger ¶
func NewDefaultLogger(Level LogLevel) *DefaultLogger
NewDefaultLogger creates a new DefaultLogger that is configured to write messages to console with minimum log level Level.
func (*DefaultLogger) Critical ¶
func (dl *DefaultLogger) Critical(message string, params ...interface{})
Critical formats a given message according to given params to log with level Critical.
func (*DefaultLogger) Debug ¶
func (dl *DefaultLogger) Debug(message string, params ...interface{})
Debug formats a given message according to given params to log with level Debug.
func (*DefaultLogger) Error ¶
func (dl *DefaultLogger) Error(message string, params ...interface{})
Error formats a given message according to given params to log with level Error.
func (*DefaultLogger) Info ¶
func (dl *DefaultLogger) Info(message string, params ...interface{})
Info formats a given message according to given params to log with level Info.
func (*DefaultLogger) Trace ¶
func (dl *DefaultLogger) Trace(message string, params ...interface{})
Trace formats a given message according to given params to log with level Trace.
func (*DefaultLogger) Warn ¶
func (dl *DefaultLogger) Warn(message string, params ...interface{})
Warn formats a given message according to given params to log with level Warn.
type HashPartitioner ¶
type HashPartitioner struct {
// contains filtered or unexported fields
}
func NewHashPartitioner ¶
func NewHashPartitioner() *HashPartitioner
func (*HashPartitioner) Partition ¶
func (hp *HashPartitioner) Partition(record *ProducerRecord, partitions []int32) (int32, error)
type KafkaLogger ¶
type KafkaLogger interface {
	//Formats a given message according to given params to log with level Trace.
	Trace(message string, params ...interface{})

	//Formats a given message according to given params to log with level Debug.
	Debug(message string, params ...interface{})

	//Formats a given message according to given params to log with level Info.
	Info(message string, params ...interface{})

	//Formats a given message according to given params to log with level Warn.
	Warn(message string, params ...interface{})

	//Formats a given message according to given params to log with level Error.
	Error(message string, params ...interface{})

	//Formats a given message according to given params to log with level Critical.
	Critical(message string, params ...interface{})
}
KafkaLogger is a logger interface. It lets you plug in your own logging library instead of using the built-in one.
var Logger KafkaLogger = NewDefaultLogger(InfoLevel)
Logger is the logger used by this client. It defaults to the built-in logger with the Info log level.
type KafkaProducer ¶
type KafkaProducer struct {
	RecordsMetadata chan *RecordMetadata
	// contains filtered or unexported fields
}
func NewKafkaProducer ¶
func NewKafkaProducer(config *ProducerConfig, keySerializer Serializer, valueSerializer Serializer, connector siesta.Connector) *KafkaProducer
func (*KafkaProducer) Close ¶
func (kp *KafkaProducer) Close(timeout time.Duration)
TODO return channel and remove timeout
func (*KafkaProducer) Flush ¶
func (kp *KafkaProducer) Flush()
func (*KafkaProducer) Metrics ¶
func (kp *KafkaProducer) Metrics() map[string]Metric
func (*KafkaProducer) PartitionsFor ¶
func (kp *KafkaProducer) PartitionsFor(topic string) []PartitionInfo
func (*KafkaProducer) Send ¶
func (kp *KafkaProducer) Send(record *ProducerRecord) <-chan *RecordMetadata
type LogLevel ¶
type LogLevel string
LogLevel represents a logging level.
const (
	// TraceLevel is used for debugging to find problems in functions, variables etc.
	TraceLevel LogLevel = "trace"

	// DebugLevel is used for detailed system reports and diagnostic messages.
	DebugLevel LogLevel = "debug"

	// InfoLevel is used for general information about a running application.
	InfoLevel LogLevel = "info"

	// WarnLevel is used to indicate small errors and failures that should not happen normally but are recovered automatically.
	WarnLevel LogLevel = "warn"

	// ErrorLevel is used to indicate severe errors that affect application workflow and are not handled automatically.
	ErrorLevel LogLevel = "error"

	// CriticalLevel is used to indicate fatal errors that may cause data corruption or loss.
	CriticalLevel LogLevel = "critical"
)
type ManualPartitioner ¶
type ManualPartitioner struct{}
func NewManualPartitioner ¶
func NewManualPartitioner() *ManualPartitioner
func (*ManualPartitioner) Partition ¶
func (mp *ManualPartitioner) Partition(record *ProducerRecord, partitions []int32) (int32, error)
type Metadata ¶
type Metadata struct {
// contains filtered or unexported fields
}
func NewMetadata ¶
type NetworkClient ¶
type NetworkClient struct {
// contains filtered or unexported fields
}
func NewNetworkClient ¶
func NewNetworkClient(config NetworkClientConfig, connector siesta.Connector, producerConfig *ProducerConfig) *NetworkClient
type NetworkClientConfig ¶
type NetworkClientConfig struct { }
type NetworkRequest ¶
type NetworkRequest struct {
// contains filtered or unexported fields
}
TODO better struct name
type PartitionInfo ¶
type PartitionInfo struct{}
type Partitioner ¶
type Partitioner interface {
Partition(record *ProducerRecord, partitions []int32) (int32, error)
}
type Producer ¶
type Producer interface {
	// Send the given record asynchronously and return a channel which will eventually contain the response information.
	Send(*ProducerRecord) <-chan *RecordMetadata

	// Flush any accumulated records from the producer. Blocks until all sends are complete.
	Flush()

	// Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
	// over time so this list should not be cached.
	PartitionsFor(topic string) []PartitionInfo

	// Return a map of metrics maintained by the producer
	Metrics() map[string]Metric

	// Tries to close the producer cleanly within the specified timeout. If the close does not complete within the
	// timeout, fail any pending send requests and force close the producer.
	Close(timeout time.Duration)
}
type ProducerConfig ¶
type ProducerConfig struct {
	Partitioner       Partitioner
	MetadataExpire    time.Duration
	CompressionType   string
	BatchSize         int
	Linger            time.Duration
	Retries           int
	RetryBackoff      time.Duration
	BlockOnBufferFull bool
	ClientID          string
	MaxRequests       int
	SendRoutines      int
	ReceiveRoutines   int
	ReadTimeout       time.Duration
	WriteTimeout      time.Duration
	RequiredAcks      int
	AckTimeoutMs      int32
	BrokerList        []string
}
func NewProducerConfig ¶
func NewProducerConfig() *ProducerConfig
func ProducerConfigFromFile ¶
func ProducerConfigFromFile(filename string) (*ProducerConfig, error)
type ProducerRecord ¶
type RandomPartitioner ¶
type RandomPartitioner struct{}
func NewRandomPartitioner ¶
func NewRandomPartitioner() *RandomPartitioner
func (*RandomPartitioner) Partition ¶
func (rp *RandomPartitioner) Partition(record *ProducerRecord, partitions []int32) (int32, error)
type RecordAccumulator ¶
type RecordAccumulator struct {
// contains filtered or unexported fields
}
func NewRecordAccumulator ¶
func NewRecordAccumulator(config *RecordAccumulatorConfig, metadataChan chan *RecordMetadata) *RecordAccumulator
type RecordAccumulatorConfig ¶
type RecordAccumulatorConfig struct {
// contains filtered or unexported fields
}
type RecordBatch ¶
type RecordMetadata ¶
type RecordMetadata struct {
	Record    *ProducerRecord
	Offset    int64
	Topic     string
	Partition int32
	Error     error
}
type Selector ¶
type Selector struct {
// contains filtered or unexported fields
}
func NewSelector ¶
func NewSelector(config *SelectorConfig) *Selector
type SelectorConfig ¶
type SelectorConfig struct {
	ClientID        string
	MaxRequests     int
	SendRoutines    int
	ReceiveRoutines int
	ReadTimeout     time.Duration
	WriteTimeout    time.Duration
	RequiredAcks    int
}
TODO proper config entry names that match upstream Kafka
func DefaultSelectorConfig ¶
func DefaultSelectorConfig() *SelectorConfig
func NewSelectorConfig ¶
func NewSelectorConfig(producerConfig *ProducerConfig) *SelectorConfig