Documentation ¶
Index ¶
- Constants
- Variables
- func GetSink() api.Sink
- func GetSource() api.Source
- type KafkaSink
- func (k *KafkaSink) Close(ctx api.StreamContext) error
- func (k *KafkaSink) Collect(ctx api.StreamContext, item api.MessageTuple) (err error)
- func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error)
- func (k *KafkaSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
- func (k *KafkaSink) Ping(ctx api.StreamContext, props map[string]any) error
- func (k *KafkaSink) Provision(ctx api.StreamContext, configs map[string]any) error
- type KafkaSource
- func (k *KafkaSource) Close(ctx api.StreamContext) error
- func (k *KafkaSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
- func (k *KafkaSource) GetOffset() (interface{}, error)
- func (k *KafkaSource) Ping(ctx api.StreamContext, props map[string]any) error
- func (k *KafkaSource) Provision(ctx api.StreamContext, configs map[string]any) error
- func (k *KafkaSource) ResetOffset(input map[string]interface{}) error
- func (k *KafkaSource) Rewind(offset interface{}) error
- func (k *KafkaSource) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, ingestError api.ErrorIngest) error
Constants ¶
View Source
const (
	LblRequest   = "req"
	LblMessage   = "message"
	LblException = "exception"
)
View Source
const (
	SASL_NONE  = "none"
	SASL_PLAIN = "plain"
	SASL_SCRAM = "scram"
)
Variables ¶
View Source
var (
	KafkaCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "kuiper",
		Subsystem: "io",
		Name:      "kafka_count",
		Help:      "counter of Kafka IO",
	}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})

	KafkaHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "kuiper",
		Subsystem: "io",
		Name:      "kafka_duration",
		Help:      "Historgram of Kafka IO",
	}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})
)
Functions ¶
func GetSink ¶
func GetSink() api.Sink
func GetSource ¶
func GetSource() api.Source
Types ¶
type KafkaSink ¶
type KafkaSink struct {
// contains filtered or unexported fields
}
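func (*KafkaSink) Close ¶
func (k *KafkaSink) Close(ctx api.StreamContext) error
func (*KafkaSink) Collect ¶
func (k *KafkaSink) Collect(ctx api.StreamContext, item api.MessageTuple) (err error)
func (*KafkaSink) CollectList ¶
func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error)
func (*KafkaSink) Connect ¶
func (k *KafkaSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
func (*KafkaSink) Ping ¶
func (k *KafkaSink) Ping(ctx api.StreamContext, props map[string]any) error
func (*KafkaSink) Provision ¶
func (k *KafkaSink) Provision(ctx api.StreamContext, configs map[string]any) error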
type KafkaSource ¶
type KafkaSource struct {
// contains filtered or unexported fields
}
func (*KafkaSource) Close ¶
func (k *KafkaSource) Close(ctx api.StreamContext) error
func (*KafkaSource) Connect ¶
func (k *KafkaSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
func (*KafkaSource) GetOffset ¶
func (k *KafkaSource) GetOffset() (interface{}, error)
func (*KafkaSource) Ping ¶
func (k *KafkaSource) Ping(ctx api.StreamContext, props map[string]any) error
func (*KafkaSource) Provision ¶
func (k *KafkaSource) Provision(ctx api.StreamContext, configs map[string]any) error
func (*KafkaSource) ResetOffset ¶
func (k *KafkaSource) ResetOffset(input map[string]interface{}) error
func (*KafkaSource) Rewind ¶
func (k *KafkaSource) Rewind(offset interface{}) error
func (*KafkaSource) Subscribe ¶
func (k *KafkaSource) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, ingestError api.ErrorIngest) error
Click to show internal directories.
Click to hide internal directories.