Documentation
¶
Index ¶
- Constants
- Variables
- func NSKey(ctx context.Context) (string, bool)
- func WithNSKey(ctx context.Context, ns string) context.Context
- type ConsumeCallback
- type ConsumerMessage
- type KafkaClient
- func (kc *KafkaClient) Close() error
- func (kc *KafkaClient) Errors() <-chan *ProducerError
- func (ksc *KafkaClient) Send(ctx context.Context, message *ProducerMessage) (int32, int64, error)
- func (kc *KafkaClient) SendKeyMsg(topic string, key string, msg []byte) errordeprecated
- func (kc *KafkaClient) SendMsg(topic string, msg []byte) errordeprecated
- func (kc *KafkaClient) Success() <-chan *ProducerMessage
- type KafkaConsumeClient
- func (kcc *KafkaConsumeClient) Close() error
- func (kcc *KafkaConsumeClient) CommitUpto(message *ConsumerMessage)
- func (kcc *KafkaConsumeClient) Errors() <-chan error
- func (kcc *KafkaConsumeClient) GetMessages() <-chan *ConsumerMessage
- func (kcc *KafkaConsumeClient) Messages(closeChan chan bool, maxQueueSize int) chan []byte
- type KafkaConsumeConfig
- type KafkaProductConfig
- type KafkaSyncClient
- type ProducerError
- type ProducerMessage
- type RecordHeader
- type SendResponse
- type TestReporter
Constants ¶
const ( KafkaSuccess int = 0 //kafka 0 成功 KafkaSendInnerError int = 200 //kafka 100 内部错误 KafkaSendNotInit int = 201 //kafka 101 未init KafkaSendError int = 202 KafkaConsumeError int = 203 KafkaConsumeInitError int = 204 KafkaProducerInitError int = 205 KafkaGetConsumeClientError int = 206 KafkaGetProducerClientError int = 207 )
Variables ¶
var ( KAFKA_CLIENT_NOT_INIT = errors.New("kafka client not init") KAFKA_PARAMS_ERROR = errors.New("kafka params error") )
var ( REQUIRED_ACK_NO_RESPONSE string = "NoResponse" REQUIRED_ACK_WAIT_FOR_LOCAL string = "WaitForLocal" REQUIRED_ACK_WAIT_FOR_ALL string = "WaitForAll" )
Functions ¶
Types ¶
type ConsumeCallback ¶
type ConsumeCallback interface {
Process(values []byte)
}
type ConsumerMessage ¶
type ConsumerMessage struct {
Key, Value []byte
Topic string
Partition int32
Offset int64
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
MessageID string
CreateAt time.Time
// contains filtered or unexported fields
}
func (*ConsumerMessage) Context ¶
func (m *ConsumerMessage) Context() context.Context
type KafkaClient ¶
type KafkaClient struct {
// contains filtered or unexported fields
}
func NewKafkaClient ¶
func NewKafkaClient(conf KafkaProductConfig) (*KafkaClient, error)
func NewMockAsyncProducerClient ¶
func NewMockAsyncProducerClient() (asyncClient *KafkaClient, asyncMock *mocks.AsyncProducer, err error)
NewMockAsyncProducerClient returns standard async client ,asyncMocker and err. asyncMocker example shows here: http://github.com/Shopify/sarama/mocks
func (*KafkaClient) Close ¶
func (kc *KafkaClient) Close() error
func (*KafkaClient) Errors ¶
func (kc *KafkaClient) Errors() <-chan *ProducerError
func (*KafkaClient) Send ¶
func (ksc *KafkaClient) Send(ctx context.Context, message *ProducerMessage) (int32, int64, error)
func (*KafkaClient) SendKeyMsg
deprecated
func (kc *KafkaClient) SendKeyMsg(topic string, key string, msg []byte) error
Deprecated: SendKeyMsg func should not use anymore. Use Send func instead
func (*KafkaClient) SendMsg
deprecated
func (kc *KafkaClient) SendMsg(topic string, msg []byte) error
Deprecated: SendMsg func should not use anymore. Use Send func instead
func (*KafkaClient) Success ¶
func (kc *KafkaClient) Success() <-chan *ProducerMessage
type KafkaConsumeClient ¶
type KafkaConsumeClient struct {
// contains filtered or unexported fields
}
func NewKafkaConsumeClient ¶
func NewKafkaConsumeClient(conf KafkaConsumeConfig) (*KafkaConsumeClient, error)
func (*KafkaConsumeClient) Close ¶
func (kcc *KafkaConsumeClient) Close() error
func (*KafkaConsumeClient) CommitUpto ¶
func (kcc *KafkaConsumeClient) CommitUpto(message *ConsumerMessage)
func (*KafkaConsumeClient) Errors ¶
func (kcc *KafkaConsumeClient) Errors() <-chan error
func (*KafkaConsumeClient) GetMessages ¶
func (kcc *KafkaConsumeClient) GetMessages() <-chan *ConsumerMessage
type KafkaConsumeConfig ¶
type KafkaConsumeConfig struct { ConsumeFrom string `toml:"consume_from"` Zookeeperhost string `toml:"zkpoints"` Topic string `toml:"topic"` Group string `toml:"group"` Initoffset int `toml:"initoffset"` ProcessTimeout int `toml:"process_timeout"` CommitInterval int `toml:"commit_interval"` GetError bool `toml:"get_error"` TraceEnable bool `toml:"trace_enable"` ConsumeAll bool `toml:"consume_all"` }
type KafkaProductConfig ¶
type KafkaProductConfig struct { ProducerTo string `toml:"producer_to"` Broken string `toml:"kafka_broken"` RetryMax int `toml:"retrymax"` RequiredAcks string `toml:"RequiredAcks"` GetError bool `toml:"get_error"` GetSuccess bool `toml:"get_success"` RequestTimeout int `toml:"request_timeout"` Printf bool UseSync bool }
type KafkaSyncClient ¶
type KafkaSyncClient struct {
// contains filtered or unexported fields
}
func NewMockSyncProducerClient ¶
func NewMockSyncProducerClient() (syncClient *KafkaSyncClient, syncMock *mocks.SyncProducer, err error)
NewMockSyncProducerClient returns standard sync client ,syncMocker and err. syncMocker example shows here: http://github.com/Shopify/sarama/mocks
func NewSyncProducterClient ¶
func NewSyncProducterClient(conf KafkaProductConfig) (*KafkaSyncClient, error)
func (*KafkaSyncClient) Close ¶
func (kc *KafkaSyncClient) Close() error
func (*KafkaSyncClient) Send ¶
func (ksc *KafkaSyncClient) Send(ctx context.Context, message *ProducerMessage) (int32, int64, error)
Send message to kafka cluster, ctx is http/rpc context headers rfs = https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers
func (*KafkaSyncClient) SendSyncMsg
deprecated
type ProducerError ¶
type ProducerError struct { Msg *ProducerMessage Err error }
ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.
type ProducerMessage ¶
type ProducerMessage struct { Topic string // The Kafka topic for this message. // The partitioning key for this message. Pre-existing Encoders include // StringEncoder and ByteEncoder. Key string // The actual message to store in Kafka. Pre-existing Encoders include // StringEncoder and ByteEncoder. Value []byte // This field is used to hold arbitrary data you wish to include so it // will be available when receiving on the Successes and Errors channels. // Sarama completely ignores this field and is only to be used for // pass-through data. Metadata interface{} // Offset is the offset of the message stored on the broker. This is only // guaranteed to be defined if the message was successfully delivered and // RequiredAcks is not NoResponse. Offset int64 // Partition is the partition that the message was sent to. This is only // guaranteed to be defined if the message was successfully delivered. Partition int32 // Timestamp is the timestamp assigned to the message by the broker. This // is only guaranteed to be defined if the message was successfully // delivered, RequiredAcks is not NoResponse, and the Kafka broker is at // least version 0.10.0. Timestamp time.Time // MessageID MessageID string }
ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type RecordHeader ¶
type SendResponse ¶
type TestReporter ¶
type TestReporter struct {
// contains filtered or unexported fields
}
TestReporter records producer/consumer errors
func NewTestReporter ¶
func NewTestReporter() *TestReporter
func (*TestReporter) Errorf ¶
func (tr *TestReporter) Errorf(format string, args ...interface{})