Documentation ¶
Index ¶
- Constants
- func GetKafkaMsgHasher() core.Hasher
- type KafkaFoxtrotConn
- type KafkaFoxtrotConnConfig
- type KafkaFoxtrotMessage
- func (k *KafkaFoxtrotMessage) BatchPayload(msgs []interface{}, version int) []byte
- func (k *KafkaFoxtrotMessage) BatchURL(msgs []interface{}, endpoint string, version int) string
- func (k *KafkaFoxtrotMessage) GetDebugPath() string
- func (k *KafkaFoxtrotMessage) GetHeaders(conf sink.HTTPSinkConf) map[string]string
- func (k *KafkaFoxtrotMessage) GetPayload() []byte
- func (k *KafkaFoxtrotMessage) GetURL(endpoint string) string
- type KafkaHTTPConn
- type KafkaHTTPConnConfig
- type KafkaMessage
- func (k *KafkaMessage) BatchPayload(msgs []interface{}, version int) []byte
- func (k *KafkaMessage) BatchURL(msgs []interface{}, endpoint string, version int) string
- func (k *KafkaMessage) GetDebugPath() string
- func (k *KafkaMessage) GetHeaders(conf sink.HTTPSinkConf) map[string]string
- func (k *KafkaMessage) GetPayload() []byte
- func (k *KafkaMessage) GetRawMsg() *sarama.ConsumerMessage
- func (k *KafkaMessage) GetURL(endpoint string) string
- func (k *KafkaMessage) IsProcessed() bool
- func (k *KafkaMessage) MarkDone()
- type KafkaMsgHasher
- type KafkaOffsetHook
Constants ¶
const CustomURLKey = "__KEY_NAME__"
CustomURLKey place holder name, which will be replaced by kafka key
Variables ¶
This section is empty.
Functions ¶
func GetKafkaMsgHasher ¶
GetKafkaMsgHasher is Gloab function to get instance of KafkaMsgHasher
Types ¶
type KafkaFoxtrotConn ¶
type KafkaFoxtrotConn struct { EnableDebugLog bool Conf interface{} }
KafkaFoxtrotConn struct to abstract this connections Run
func (*KafkaFoxtrotConn) Run ¶
func (c *KafkaFoxtrotConn) Run()
Run method to start this Connection from source to sink
type KafkaFoxtrotConnConfig ¶
type KafkaFoxtrotConnConfig struct {
KafkaHTTPConnConfig
}
KafkaFoxtrotConnConfig holds config to connect KafkaSource to http_sink
type KafkaFoxtrotMessage ¶
type KafkaFoxtrotMessage struct {
KafkaMessage
}
KafkaFoxtrotMessage is data attribute that will be passed from Source to Sink
func (*KafkaFoxtrotMessage) BatchPayload ¶
func (k *KafkaFoxtrotMessage) BatchPayload(msgs []interface{}, version int) []byte
BatchPayload implements HTTPMsg for HttpSink processing
func (*KafkaFoxtrotMessage) BatchURL ¶
func (k *KafkaFoxtrotMessage) BatchURL(msgs []interface{}, endpoint string, version int) string
BatchURL implements HTTPMsg for HttpSink processing This implementation passes in query parameter partition and offset for debuggin
func (*KafkaFoxtrotMessage) GetDebugPath ¶
func (k *KafkaFoxtrotMessage) GetDebugPath() string
func (*KafkaFoxtrotMessage) GetHeaders ¶
func (k *KafkaFoxtrotMessage) GetHeaders(conf sink.HTTPSinkConf) map[string]string
GetHeaders implements HTTPMsg for HttpSink processing
func (*KafkaFoxtrotMessage) GetPayload ¶
func (k *KafkaFoxtrotMessage) GetPayload() []byte
GetPayload implements HTTPMsg for HttpSink processing
func (*KafkaFoxtrotMessage) GetURL ¶
func (k *KafkaFoxtrotMessage) GetURL(endpoint string) string
GetURL implements HTTPMsg for HttpSink processing This implementation passes in query parameter partition and offset for debuggin
type KafkaHTTPConn ¶
type KafkaHTTPConn struct { EnableDebugLog bool Conf interface{} SidelinePlugin interface{} }
KafkaHTTPConn struct to abstract this connections Run
func (*KafkaHTTPConn) Run ¶
func (c *KafkaHTTPConn) Run()
Run method to start this Connection from source to sink
type KafkaHTTPConnConfig ¶
type KafkaHTTPConnConfig struct { Dmux core.DmuxConf `json:"dmux"` Source source.KafkaConf `json:"source"` Sink sink.HTTPSinkConf `json:"sink"` PendingAcks int `json:"pending_acks"` }
KafkaHTTPConnConfig holds config to connect KafkaSource to http_sink
type KafkaMessage ¶
type KafkaMessage struct { Msg *sarama.ConsumerMessage Processed bool //marker to know once this message has been processed by Sink Sidelined bool // marker to know if the message gets sideliend URL string // added to avoid GetURLPath to repeate concat during logging }
KafkaMessage is data attribute that will be passed from Source to Sink
func (*KafkaMessage) BatchPayload ¶
func (k *KafkaMessage) BatchPayload(msgs []interface{}, version int) []byte
BatchPayload implements HTTPMsg for HttpSink processing
func (*KafkaMessage) BatchURL ¶
func (k *KafkaMessage) BatchURL(msgs []interface{}, endpoint string, version int) string
BatchURL implements HTTPMsg for HttpSink processing
func (*KafkaMessage) GetDebugPath ¶
func (k *KafkaMessage) GetDebugPath() string
func (*KafkaMessage) GetHeaders ¶
func (k *KafkaMessage) GetHeaders(conf sink.HTTPSinkConf) map[string]string
GetHeaders implements HTTPMsg for HttpSink processing
func (*KafkaMessage) GetPayload ¶
func (k *KafkaMessage) GetPayload() []byte
GetPayload implements HTTPMsg for HttpSink processing
func (*KafkaMessage) GetRawMsg ¶
func (k *KafkaMessage) GetRawMsg() *sarama.ConsumerMessage
GetRawMsg returns saram.ConsumerMessage under KafkaMessage
func (*KafkaMessage) GetURL ¶
func (k *KafkaMessage) GetURL(endpoint string) string
GetURL implements HTTPMsg for HttpSink processing
func (*KafkaMessage) IsProcessed ¶
func (k *KafkaMessage) IsProcessed() bool
IsProcessed returns true if KafkaMessage was MarkDone
func (*KafkaMessage) MarkDone ¶
func (k *KafkaMessage) MarkDone()
MarkDone the KafkaMessage as processed
type KafkaMsgHasher ¶
type KafkaMsgHasher struct{}
KafkaMsgHasher implements hasher a hash logic implementation of KafkaMessage. This runs consistenHashin on Key of KafkaKeyedMessage
func (*KafkaMsgHasher) ComputeHash ¶
func (o *KafkaMsgHasher) ComputeHash(data interface{}) int
ComputeHash method for KafkaMessage
type KafkaOffsetHook ¶
type KafkaOffsetHook struct {
// contains filtered or unexported fields
}
KafkaOffsetHook implments HTTPSinkHook amd KafkaSourceHook interface to track kafka offsets
func GetKafkaHook ¶
func GetKafkaHook(offsetTracker source.OffsetTracker, enableDebugLog bool) *KafkaOffsetHook
GetKafkaHook is a global function that returns instance of KafkaOffsetHook
func (*KafkaOffsetHook) PostHTTPCall ¶
func (h *KafkaOffsetHook) PostHTTPCall(msg interface{}, success bool)
PostHTTPCall is invoked - after HttpSink execution. This implementation calls KafkaMessage MarkDone method on the data argument of Post, to mark this message and sucessfuly processed.
func (*KafkaOffsetHook) Pre ¶
func (h *KafkaOffsetHook) Pre(data source.KafkaMsg)
Pre is invoked - before KafaSource pushes message to DMux. This implementation invokes OffsetTracker TrackMe method here, to ensure the Message to track is queued before its execution
func (*KafkaOffsetHook) PreHTTPCall ¶
func (h *KafkaOffsetHook) PreHTTPCall(msg interface{})
PreHTTPCall is invoked - before HttpSink exection.