Documentation ¶
Index ¶
- Constants
- Variables
- func MsgByteSize(msg *sarama.ProducerMessage) int
- func SendMsg(msgs []*sarama.ProducerMessage, producer sarama.SyncProducer) int64
- type Kafka
- func (k *Kafka) ProcessGenericEvent(genericMsg *events.GenericEvent) (*KafkaMessage, error)
- func (k *Kafka) ProcessSQLEvent(sqlEvent *events.SQLEvent) (*KafkaMessage, error)
- func (k *Kafka) ProducerLoop(producer sarama.SyncProducer, in chan *KafkaMessage)
- func (k *Kafka) Start(_ ...interface{}) error
- func (k *Kafka) StartConsumer(kafkaChan chan *KafkaMessage)
- func (k *Kafka) StartProducer(in chan *KafkaMessage, stop chan error)
- type KafkaGSSAPI
- type KafkaMessage
- type KafkaSinkConfig
- type KafkaUser
- type Pulsar
- type PulsarSinkConfig
- type Sink
- type SinkI
- type Stdout
Constants ¶
const ( // sink Possible Statuses SinkStatusOnError = "ON_ERROR" SinkStatusRunning = "RUNNING" SinkStatusWaiting = "WAITING" DefaultChannelSize = 100 )
Possible Statuses
const KafkaType = "Kafka"
KafkaType type of sink
const MaxRetry = 20
MaxRetry retry max
const PulsarType = "Pulsar"
PulsarType type of sink
const StdoutType = "Stdout"
StdoutType type of sink
Variables ¶
var Factory = map[string]sinkCreator{ StdoutType: NewStdout, KafkaType: NewKafka, PulsarType: NewPulsar, }
Factory sink Factory
Functions ¶
func MsgByteSize ¶ added in v0.2.2
func MsgByteSize(msg *sarama.ProducerMessage) int
func SendMsg ¶ added in v0.2.2
func SendMsg(msgs []*sarama.ProducerMessage, producer sarama.SyncProducer) int64
Types ¶
type Kafka ¶
type Kafka struct { *Sink KafkaConf *KafkaSinkConfig }
Kafka representation of kafka sink
func (*Kafka) ProcessGenericEvent ¶ added in v0.2.2
func (k *Kafka) ProcessGenericEvent(genericMsg *events.GenericEvent) (*KafkaMessage, error)
ProcessGenericEvent process Generic Event
func (*Kafka) ProcessSQLEvent ¶ added in v0.2.2
func (k *Kafka) ProcessSQLEvent(sqlEvent *events.SQLEvent) (*KafkaMessage, error)
ProcessSQLEvent process Sql Event
func (*Kafka) ProducerLoop ¶ added in v0.2.2
func (k *Kafka) ProducerLoop(producer sarama.SyncProducer, in chan *KafkaMessage)
func (*Kafka) StartConsumer ¶ added in v0.2.2
func (k *Kafka) StartConsumer(kafkaChan chan *KafkaMessage)
StartConsumer consume input chan
func (*Kafka) StartProducer ¶ added in v0.2.2
func (k *Kafka) StartProducer(in chan *KafkaMessage, stop chan error)
StartProducer send message to kafka
type KafkaGSSAPI ¶ added in v0.2.0
type KafkaMessage ¶ added in v0.2.0
type KafkaMessage struct { // The Kafka topic for this message. Topic string // The partitioning key for this message. Key string // The source offset of message Offset *events.Offset // The actual serialized message to store In Kafka. Value []byte }
KafkaSinkConfig representation of Kafka Message
type KafkaSinkConfig ¶ added in v0.2.2
type KafkaSinkConfig struct { TLS bool `json:"tls"` Kerberos bool `json:"kerberos"` ShuffleEvent bool `json:"shuffle_event" mapstructure:"shuffle_event"` Topic string `json:"topic"` TopicPrefix string `json:"topic_prefix" mapstructure:"topic_prefix"` ClientID string `json:"client_id" mapstructure:"client_id"` Brokers []string `json:"brokers"` Producer *KafkaUser `json:"Producer"` Consumer *KafkaUser `json:"consumer"` GSSAPI *KafkaGSSAPI `json:"gssapi"` MaxMessageBytes int `json:"max_message_bytes" mapstructure:"max_message_bytes"` NbProducer int `json:"nb_producer" mapstructure:"nb_producer"` }
KafkaSinkConfig representation of kafka sink config
type Pulsar ¶ added in v0.2.0
type Pulsar struct { *Sink PulsarConf *PulsarSinkConfig Producer pulsar.Producer }
Pulsar representation of Pulsar sink
func (*Pulsar) GetInputChan ¶ added in v0.2.0
func (p *Pulsar) GetInputChan() chan events.LookatchEvent
GetInputChan return the input channel attached to this sink
func (*Pulsar) ProcessEvent ¶ added in v0.2.2
func (p *Pulsar) ProcessEvent(msg events.LookatchEvent) error
ProcessEvent convert LookatchEvent to Pulsar ProducerMessage
func (*Pulsar) StartProducer ¶ added in v0.2.2
func (p *Pulsar) StartProducer()
StartConsumer consume input chan
type PulsarSinkConfig ¶ added in v0.2.2
type PulsarSinkConfig struct { Topic string `json:"topic"` URL string `json:"url"` Token string `json:"token"` }
PulsarSinkConfig representation of kafka sink config
type Sink ¶
type Sink struct { In chan events.LookatchEvent Stop chan error Commit chan interface{} Name string EncryptionKey string Conf *viper.Viper }
Sink representation of sink
func (*Sink) GetCommitChan ¶ added in v0.2.0
func (s *Sink) GetCommitChan() chan interface{}
GetCommitChan return the Commit channel attached to this sink
func (*Sink) GetInputChan ¶ added in v0.2.0
func (s *Sink) GetInputChan() chan events.LookatchEvent
GetInputChan return input channel attach to sink
func (*Sink) SendCommit ¶ added in v0.2.0
func (s *Sink) SendCommit(payload interface{})
SendCommit send a Commit message into the Commit channel of this sink
type SinkI ¶
type SinkI interface { Start(...interface{}) error GetInputChan() chan events.LookatchEvent GetCommitChan() chan interface{} }
SinkI sink interface