Documentation ¶
Index ¶
- Constants
- func GetMessageHasher() core.Hasher
- type CursorHook
- type CursorTracker
- type Message
- func (m *Message) BatchPayload(msgs []interface{}, version int) []byte
- func (m *Message) BatchURL(msgs []interface{}, endpoint string, version int) string
- func (m *Message) GetDebugPath() string
- func (m *Message) GetHeaders(conf sink.HTTPSinkConf) map[string]string
- func (m *Message) GetPayload() []byte
- func (m *Message) GetRawMsg() *pulsar.ConsumerMessage
- func (m *Message) GetURL(endpoint string) string
- func (m *Message) IsProcessed() bool
- func (m *Message) MarkDone()
- type MessageHasher
- type MessageProcessor
- type PulsarConf
- type PulsarCursorTracker
- type PulsarMessageFactoryImpl
- type PulsarSource
- func (p *PulsarSource) Generate(out chan<- interface{})
- func (p *PulsarSource) GetKey(msg interface{}) []byte
- func (p *PulsarSource) GetOffset(msg interface{}) int64
- func (p *PulsarSource) GetPartition(msg interface{}) int32
- func (p *PulsarSource) GetValue(msg interface{}) []byte
- func (p *PulsarSource) RegisterHook(hook SourceHook)
- func (p *PulsarSource) Stop()
- type SourceHook
Constants ¶
const CustomURLKey = "__KEY_NAME__"
CustomURLKey place holder name, which will be replaced by kafka key
Variables ¶
This section is empty.
Functions ¶
func GetMessageHasher ¶
Types ¶
type CursorHook ¶
type CursorHook struct {
// contains filtered or unexported fields
}
func GetPulsarHook ¶
func GetPulsarHook(tracker PulsarCursorTracker, enableDebugLog bool) *CursorHook
func (*CursorHook) PostHTTPCall ¶
func (h *CursorHook) PostHTTPCall(msg interface{}, success bool)
PostHTTPCall is invoked - after HttpSink execution. This implementation calls KafkaMessage MarkDone method on the data argument of Post, to mark this message and successfully processed.
func (*CursorHook) Pre ¶
func (h *CursorHook) Pre(p MessageProcessor)
Pre is invoked - before pulsar source pushes message to DMux. This implementation invokes CursorTracker TrackMe method here, to track Message is queued before its execution
func (*CursorHook) PreHTTPCall ¶
func (h *CursorHook) PreHTTPCall(msg interface{})
PreHTTPCall is invoked - before HttpSink execution.
type CursorTracker ¶
type CursorTracker struct {
// contains filtered or unexported fields
}
func (*CursorTracker) TrackMe ¶
func (t *CursorTracker) TrackMe(msg MessageProcessor)
type Message ¶
type Message struct { Msg *pulsar.ConsumerMessage Processed bool Sidelined bool }
Message is a container of message and it's state and implements HTTPMessage
func (*Message) BatchPayload ¶
BatchPayload implements HTTPMsg interface
func (*Message) GetDebugPath ¶
GetDebugPath implements HTTPMsg interface
func (*Message) GetHeaders ¶
func (m *Message) GetHeaders(conf sink.HTTPSinkConf) map[string]string
GetHeaders implements HTTPMsg interface
func (*Message) GetPayload ¶
func (*Message) GetRawMsg ¶
func (m *Message) GetRawMsg() *pulsar.ConsumerMessage
func (*Message) IsProcessed ¶
type MessageHasher ¶
type MessageHasher struct { }
func (*MessageHasher) ComputeHash ¶
func (m *MessageHasher) ComputeHash(data interface{}) int
type MessageProcessor ¶
type MessageProcessor interface { MarkDone() GetRawMsg() *pulsar.ConsumerMessage IsProcessed() bool }
MessageProcessor is an interface to update Message
type PulsarConf ¶
type PulsarConf struct { SubscriptionName string `json:"name"` Url string `json:"url"` Topic string `json:"topic"` ForceRestart bool `json:"force_restart"` ReadNewest bool `json:"read_newest"` SeekByTime int64 `json:"seek_by_time"` AuthClientId string `json:"client_id"` AuthClientSecret string `json:"auth_client_secret"` AuthIssuerURL string `json:"auth_issuer_url"` AuthAudience string `json:"auth_audience"` SubscriptionType string `json:"subscription_type"` }
type PulsarCursorTracker ¶
type PulsarCursorTracker interface {
TrackMe(p MessageProcessor)
}
func GetCursorTracker ¶
func GetCursorTracker(size int, source *PulsarSource) PulsarCursorTracker
type PulsarMessageFactoryImpl ¶
type PulsarMessageFactoryImpl struct { }
func (*PulsarMessageFactoryImpl) Create ¶
func (*PulsarMessageFactoryImpl) Create(msg pulsar.ConsumerMessage) MessageProcessor
type PulsarSource ¶
type PulsarSource struct {
// contains filtered or unexported fields
}
func GetPulsarSource ¶
func GetPulsarSource(conf PulsarConf) *PulsarSource
func (*PulsarSource) Generate ¶
func (p *PulsarSource) Generate(out chan<- interface{})
Generate is Source method implementation, which connects to Pulsar and pushes PulsarMessage into the channel
func (*PulsarSource) GetKey ¶
func (p *PulsarSource) GetKey(msg interface{}) []byte
func (*PulsarSource) GetOffset ¶
func (p *PulsarSource) GetOffset(msg interface{}) int64
func (*PulsarSource) GetPartition ¶
func (p *PulsarSource) GetPartition(msg interface{}) int32
func (*PulsarSource) GetValue ¶
func (p *PulsarSource) GetValue(msg interface{}) []byte
func (*PulsarSource) RegisterHook ¶
func (p *PulsarSource) RegisterHook(hook SourceHook)
func (*PulsarSource) Stop ¶
func (p *PulsarSource) Stop()
Stop method implements Source interface stop method, to Stop the KafkaConsumer
type SourceHook ¶
type SourceHook interface {
Pre(p MessageProcessor)
}