Documentation ¶
Index ¶
- Variables
- type AcceptMessageFunc
- type ConsumG
- type ConsumerConfiguration
- type KafkaClient
- type KafkaClientImpl
- func (this *KafkaClientImpl) ConfigurationName() string
- func (this *KafkaClientImpl) GetHost() []string
- func (this *KafkaClientImpl) GetMetrics() map[string]*Metrics
- func (this *KafkaClientImpl) GetTopicMetrics(metricTopicIdx string) *Metrics
- func (this *KafkaClientImpl) GroupLoop() fault.TypedError
- func (this *KafkaClientImpl) Init(log *logger.Logger, kafkaConfig *KafkaConfig)
- func (this *KafkaClientImpl) IsWorked() bool
- func (this *KafkaClientImpl) KafkaConfig() *KafkaConfig
- func (this *KafkaClientImpl) Loop() fault.TypedError
- func (this *KafkaClientImpl) Ping() fault.TypedError
- func (this *KafkaClientImpl) Pong() (fault.TypedError, string)
- func (this *KafkaClientImpl) Push(message []byte) fault.TypedError
- func (this *KafkaClientImpl) PushEvent(message interface{}) fault.TypedError
- func (this *KafkaClientImpl) PushText(message string) fault.TypedError
- func (this *KafkaClientImpl) SetAcceptMessageFunc(acceptMessageFunc AcceptMessageFunc)
- func (this *KafkaClientImpl) Status() (status string)
- func (this *KafkaClientImpl) Version() (version string)
- type KafkaConfig
- type Metrics
Constants ¶
This section is empty.
Variables ¶
View Source
var KafkaClientRef = &KafkaClientImpl{}
View Source
var SampleKafkaConfig = KafkaConfig{ Brokers: "127.0.0.1:9092", Version: "", Topic: "sarama", Verbose: true, Consumer: ConsumerConfiguration{ Group: "example", Oldest: true, }, }
Functions ¶
This section is empty.
Types ¶
type AcceptMessageFunc ¶
type AcceptMessageFunc func(logrus.FieldLogger, []byte) (fault.TypedError, string)
type ConsumG ¶
type ConsumG struct { Log *logger.Logger AcceptMessageFunc AcceptMessageFunc KafkaClient *KafkaClientImpl // contains filtered or unexported fields }
ConsumG represents a Sarama consumer group consumer.
func (*ConsumG) Cleanup ¶
func (consumer *ConsumG) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (*ConsumG) ConsumeClaim ¶
func (this *ConsumG) ConsumeClaim( session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim, ) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
type ConsumerConfiguration ¶
type ConsumerConfiguration struct { Group string Oldest bool AcceptMessageFunc AcceptMessageFunc }
type KafkaClient ¶
type KafkaClient interface { Init(*logger.Logger, *KafkaConfig) KafkaConfig() *KafkaConfig Loop() fault.TypedError GroupLoop() fault.TypedError ConfigurationName() string Version() string Push([]byte) fault.TypedError PushText(string) fault.TypedError PushEvent(interface{}) fault.TypedError Ping() fault.TypedError Status() string IsWorked() bool GetMetrics() map[string]*Metrics GetHost() []string SetAcceptMessageFunc(AcceptMessageFunc) }
type KafkaClientImpl ¶
type KafkaClientImpl struct { Log *logger.Logger Topic string Partition int32 Offset string MsgCount int64 IsWorkedStatus bool StatusError error Metrics map[string]*Metrics Brokers []string // contains filtered or unexported fields }
func (*KafkaClientImpl) ConfigurationName ¶
func (this *KafkaClientImpl) ConfigurationName() string
func (*KafkaClientImpl) GetHost ¶
func (this *KafkaClientImpl) GetHost() []string
func (*KafkaClientImpl) GetMetrics ¶
func (this *KafkaClientImpl) GetMetrics() map[string]*Metrics
func (*KafkaClientImpl) GetTopicMetrics ¶
func (this *KafkaClientImpl) GetTopicMetrics(metricTopicIdx string) *Metrics
func (*KafkaClientImpl) GroupLoop ¶
func (this *KafkaClientImpl) GroupLoop() fault.TypedError
func (*KafkaClientImpl) Init ¶
func (this *KafkaClientImpl) Init( log *logger.Logger, kafkaConfig *KafkaConfig, )
func (*KafkaClientImpl) IsWorked ¶
func (this *KafkaClientImpl) IsWorked() bool
func (*KafkaClientImpl) KafkaConfig ¶
func (this *KafkaClientImpl) KafkaConfig() *KafkaConfig
func (*KafkaClientImpl) Loop ¶
func (this *KafkaClientImpl) Loop() fault.TypedError
func (*KafkaClientImpl) Ping ¶
func (this *KafkaClientImpl) Ping() fault.TypedError
func (*KafkaClientImpl) Pong ¶
func (this *KafkaClientImpl) Pong() (fault.TypedError, string)
func (*KafkaClientImpl) Push ¶
func (this *KafkaClientImpl) Push( message []byte, ) fault.TypedError
func (*KafkaClientImpl) PushEvent ¶
func (this *KafkaClientImpl) PushEvent( message interface{}, ) fault.TypedError
func (*KafkaClientImpl) PushText ¶
func (this *KafkaClientImpl) PushText(message string) fault.TypedError
func (*KafkaClientImpl) SetAcceptMessageFunc ¶
func (this *KafkaClientImpl) SetAcceptMessageFunc(acceptMessageFunc AcceptMessageFunc)
func (*KafkaClientImpl) Status ¶
func (this *KafkaClientImpl) Status() (status string)
func (*KafkaClientImpl) Version ¶
func (this *KafkaClientImpl) Version() (version string)
type KafkaConfig ¶
Click to show internal directories.
Click to hide internal directories.