Documentation ¶
Index ¶
- Constants
- func File() api.Sink
- type CsvReader
- type FileSource
- func (fs *FileSource) Close(ctx api.StreamContext) error
- func (fs *FileSource) Configure(fileName string, props map[string]interface{}) error
- func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error
- func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error)
- type FileSourceConfig
- type FileType
- type FormatReader
- func CreateCsvReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
- func CreateJsonReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
- func CreateLineReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
- func GetReader(fileType FileType, fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
- type JsonReader
- type LineReader
- type ReaderError
Constants ¶
View Source
const (
GZIP = "gzip"
ZSTD = "zstd"
)
View Source
const (
TupleError int = iota // display error in tuple
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type FileSource ¶
type FileSource struct {
// contains filtered or unexported fields
}
FileSource is a batch source that loads all the data from a file at once.
func (*FileSource) Close ¶
func (fs *FileSource) Close(ctx api.StreamContext) error
func (*FileSource) Configure ¶
func (fs *FileSource) Configure(fileName string, props map[string]interface{}) error
func (*FileSource) Load ¶
func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error
func (*FileSource) Open ¶
func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error)
type FileSourceConfig ¶
type FileSourceConfig struct {
FileType FileType `json:"fileType"`
Path string `json:"path"`
Interval int `json:"interval"`
IsTable bool `json:"isTable"`
Parallel bool `json:"parallel"`
SendInterval int `json:"sendInterval"`
ActionAfterRead int `json:"actionAfterRead"`
MoveTo string `json:"moveTo"`
HasHeader bool `json:"hasHeader"`
Columns []string `json:"columns"`
IgnoreStartLines int `json:"ignoreStartLines"`
IgnoreEndLines int `json:"ignoreEndLines"`
Delimiter string `json:"delimiter"`
Decompression string `json:"decompression"`
}
type FormatReader ¶
type FormatReader interface {
// Read reads the next record. Returns EOF when the input has reached its end.
Read() (map[string]interface{}, error)
Close() error
}
func CreateCsvReader ¶
func CreateCsvReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
func CreateJsonReader ¶
func CreateJsonReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
func CreateLineReader ¶
func CreateLineReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
func GetReader ¶
func GetReader(fileType FileType, fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
type JsonReader ¶
type JsonReader struct {
// contains filtered or unexported fields
}
func (*JsonReader) Close ¶
func (r *JsonReader) Close() error
func (*JsonReader) Read ¶
func (r *JsonReader) Read() (map[string]interface{}, error)
type LineReader ¶
type LineReader struct {
// contains filtered or unexported fields
}
func (*LineReader) Close ¶
func (r *LineReader) Close() error
func (*LineReader) Read ¶
func (r *LineReader) Read() (map[string]interface{}, error)
type ReaderError ¶
func BuildError ¶
func BuildError(code int, msg string) *ReaderError
func (ReaderError) Error ¶
func (e ReaderError) Error() string
Source Files ¶
Click to show internal directories.
Click to hide internal directories.