Documentation ¶
Index ¶
- Constants
- Variables
- type KafkaWriter
- type ToKafkaOpSpec
- type ToKafkaProcedureSpec
- type ToKafkaTransformation
- func (t *ToKafkaTransformation) Finish(id execute.DatasetID, err error)
- func (t *ToKafkaTransformation) Process(id execute.DatasetID, tbl flux.Table) (err error)
- func (t *ToKafkaTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error
- func (t *ToKafkaTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error
- func (t *ToKafkaTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error
Constants ¶
View Source
const (
// ToKafkaKind is the Kind for the ToKafka Flux function
ToKafkaKind = "toKafka"
)
Variables ¶
View Source
var DefaultKafkaWriterFactory = func(conf kafka.WriterConfig) KafkaWriter {
return kafka.NewWriter(conf)
}
DefaultKafkaWriterFactory is a terrible name for a way to make a kafkaWriter that is injectable for testing
Functions ¶
This section is empty.
Types ¶
type KafkaWriter ¶
KafkaWriter is an interface for what we need fromDefaultKafkaWriterFactory
type ToKafkaOpSpec ¶
type ToKafkaOpSpec struct { Brokers []string `json:"brokers"` Topic string `json:"topic"` Balancer string `json:"balancer"` Name string `json:"name"` NameColumn string `json:"nameColumn"` // either name or name_column must be set, if none is set try to use the "_measurement" column. TimeColumn string `json:"timeColumn"` TagColumns []string `json:"tagColumns"` ValueColumns []string `json:"valueColumns"` MsgBufSize int `json:"msgBufferSize"` // the maximim number of messages to buffer before sending to kafka, the library we use defaults to 100 }
func (ToKafkaOpSpec) Kind ¶
func (ToKafkaOpSpec) Kind() flux.OperationKind
func (*ToKafkaOpSpec) ReadArgs ¶
func (o *ToKafkaOpSpec) ReadArgs(args flux.Arguments) error
ReadArgs loads a flux.Arguments into ToKafkaOpSpec. It sets several default values. If the time_column isn't set, it defaults to execute.TimeColLabel. If the value_column isn't set it defaults to a []string{execute.DefaultValueColLabel}.
type ToKafkaProcedureSpec ¶
type ToKafkaProcedureSpec struct { plan.DefaultCost Spec *ToKafkaOpSpec // contains filtered or unexported fields }
func (*ToKafkaProcedureSpec) Copy ¶
func (o *ToKafkaProcedureSpec) Copy() plan.ProcedureSpec
func (*ToKafkaProcedureSpec) Kind ¶
func (o *ToKafkaProcedureSpec) Kind() plan.ProcedureKind
type ToKafkaTransformation ¶
type ToKafkaTransformation struct {
// contains filtered or unexported fields
}
func NewToKafkaTransformation ¶
func NewToKafkaTransformation(d execute.Dataset, deps dependencies.Interface, cache execute.TableBuilderCache, spec *ToKafkaProcedureSpec) (*ToKafkaTransformation, error)
func (*ToKafkaTransformation) Finish ¶
func (t *ToKafkaTransformation) Finish(id execute.DatasetID, err error)
func (*ToKafkaTransformation) RetractTable ¶
func (*ToKafkaTransformation) UpdateProcessingTime ¶
func (*ToKafkaTransformation) UpdateWatermark ¶
Click to show internal directories.
Click to hide internal directories.