Documentation ¶
Index ¶
Constants ¶
View Source
const (
	EndpointTypeKafka = 1
	EndpointTypeES    = 2
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BulkItem ¶
type BulkItem esutil.BulkIndexerItem
type Config ¶
type Config struct {
	EndpointType  int
	Kafka         KafkaConfig
	Elasticsearch EsConfig
}
type ElasticsearchConfig ¶
type ElasticsearchConfig elasticsearch.Config
type EsBulkIdxCfg ¶
type EsBulkIdxCfg esutil.BulkIndexerConfig
type EsConfig ¶
type EsConfig struct {
	BulkIndexerCfg   EsBulkIdxCfg
	ElasticsearchCfg ElasticsearchConfig
	OnSuccess        func(context.Context, esutil.BulkIndexerItem, esutil.BulkIndexerResponseItem)
	OnFailure        func(context.Context, esutil.BulkIndexerItem, esutil.BulkIndexerResponseItem, error)
	Hook             func(data *handler.BinlogEvent) (item BulkItem, err error)
}
type EsEndpoint ¶
type EsEndpoint struct {
	Cli       *elasticsearch.Client
	OnSuccess func(context.Context, esutil.BulkIndexerItem, esutil.BulkIndexerResponseItem)
	OnFailure func(context.Context, esutil.BulkIndexerItem, esutil.BulkIndexerResponseItem, error)
	Hook      func(data *handler.BinlogEvent) (item BulkItem, err error)
	// contains filtered or unexported fields
}
func NewEsEndpoint ¶
func NewEsEndpoint(cfg EsConfig) (*EsEndpoint, error)
func (*EsEndpoint) Add ¶
func (e *EsEndpoint) Add(item BulkItem) error
func (*EsEndpoint) Close ¶
func (e *EsEndpoint) Close() (err error)
func (*EsEndpoint) Send ¶
func (e *EsEndpoint) Send(data *handler.BinlogEvent) error
type IEndpoint ¶
type IEndpoint interface {
	Send(data *handler.BinlogEvent) error
	Close() error
}
func NewEndPoint ¶
type KafkaConfig ¶
type KafkaConfig struct {
	Addrs          []string
	Cfg            KafkaCfg
	OnSuccess      func(msg *sarama.ProducerMessage)
	OnFailure      func(err *sarama.ProducerError)
	TopicConfigMap map[string]TopicCfg `json:"topic_config_map"`
}
type KafkaEndPoint ¶
type KafkaEndPoint struct {
	Addrs       []string
	Cfg         sarama.Config
	TopicCfgMap map[string]TopicCfg
	// contains filtered or unexported fields
}
var DefaultEndpoint *KafkaEndPoint
func NewKafkaEndpoint ¶
func NewKafkaEndpoint(kafkaCfg KafkaConfig) (*KafkaEndPoint, error)
func (*KafkaEndPoint) Close ¶
func (p *KafkaEndPoint) Close() (err error)
func (*KafkaEndPoint) Init ¶
func (p *KafkaEndPoint) Init() error
func (*KafkaEndPoint) Ping ¶
func (p *KafkaEndPoint) Ping() error
func (*KafkaEndPoint) Send ¶
func (p *KafkaEndPoint) Send(data *handler.BinlogEvent) error
Click to show internal directories.
Click to hide internal directories.