Documentation ¶
Overview ¶
Copyright 2021-2023 EMQ Technologies Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- func File() api.Sink
- type CsvReader
- type FileSource
- func (fs *FileSource) Close(ctx api.StreamContext) error
- func (fs *FileSource) Configure(fileName string, props map[string]interface{}) error
- func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error
- func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error)
- type FileSourceConfig
- type FileType
- type FormatReader
- func CreateCsvReader(ctx api.StreamContext, fileStream io.Reader, config *FileSourceConfig) (FormatReader, error)
- func CreateJsonReader(ctx api.StreamContext, fileStream io.Reader, config *FileSourceConfig) (FormatReader, error)
- func CreateLineReader(ctx api.StreamContext, fileStream io.Reader, config *FileSourceConfig) (FormatReader, error)
- func CreateParquetReader(ctx api.StreamContext, filename string, config *FileSourceConfig) (FormatReader, error)
- func GetReader(ctx api.StreamContext, fileType FileType, fileStream io.Reader, config *FileSourceConfig) (FormatReader, error)
- type JsonReader
- type LineReader
- type ParquetReader
- type ReaderError
Constants ¶
View Source
const (
	GZIP = "gzip"
	ZSTD = "zstd"
)
View Source
const (
TupleError int = iota // display error in tuple
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type FileSource ¶
type FileSource struct {
// contains filtered or unexported fields
}
FileSource is a batch source: it loads the data from a file all at once.
func (*FileSource) Close ¶
func (fs *FileSource) Close(ctx api.StreamContext) error
func (*FileSource) Configure ¶
func (fs *FileSource) Configure(fileName string, props map[string]interface{}) error
func (*FileSource) Load ¶
func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error
func (*FileSource) Open ¶
func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error)
type FileSourceConfig ¶
type FileSourceConfig struct {
	FileType         FileType `json:"fileType"`
	Path             string   `json:"path"`
	Interval         int      `json:"interval"`
	IsTable          bool     `json:"isTable"`
	Parallel         bool     `json:"parallel"`
	SendInterval     int      `json:"sendInterval"`
	ActionAfterRead  int      `json:"actionAfterRead"`
	MoveTo           string   `json:"moveTo"`
	HasHeader        bool     `json:"hasHeader"`
	Columns          []string `json:"columns"`
	IgnoreStartLines int      `json:"ignoreStartLines"`
	IgnoreEndLines   int      `json:"ignoreEndLines"`
	Delimiter        string   `json:"delimiter"`
	Decompression    string   `json:"decompression"`
}
type FormatReader ¶
type FormatReader interface {
	Read() (map[string]interface{}, error) // Reads the next record. Returns EOF when the input has reached its end.
	Close() error
}
func CreateCsvReader ¶
func CreateCsvReader(ctx api.StreamContext, fileStream io.Reader, config *FileSourceConfig) (FormatReader, error)
func CreateJsonReader ¶
func CreateJsonReader(ctx api.StreamContext, fileStream io.Reader, config *FileSourceConfig) (FormatReader, error)
func CreateLineReader ¶
func CreateLineReader(ctx api.StreamContext, fileStream io.Reader, config *FileSourceConfig) (FormatReader, error)
func CreateParquetReader ¶
func CreateParquetReader(ctx api.StreamContext, filename string, config *FileSourceConfig) (FormatReader, error)
func GetReader ¶
func GetReader(ctx api.StreamContext, fileType FileType, fileStream io.Reader, config *FileSourceConfig) (FormatReader, error)
type JsonReader ¶
type JsonReader struct {
// contains filtered or unexported fields
}
func (*JsonReader) Close ¶
func (r *JsonReader) Close() error
func (*JsonReader) Read ¶
func (r *JsonReader) Read() (map[string]interface{}, error)
type LineReader ¶
type LineReader struct {
// contains filtered or unexported fields
}
func (*LineReader) Close ¶
func (r *LineReader) Close() error
func (*LineReader) Read ¶
func (r *LineReader) Read() (map[string]interface{}, error)
type ParquetReader ¶
type ParquetReader struct {
// contains filtered or unexported fields
}
func (*ParquetReader) Close ¶
func (pr *ParquetReader) Close() error
type ReaderError ¶
func BuildError ¶
func BuildError(code int, msg string) *ReaderError
func (ReaderError) Error ¶
func (e ReaderError) Error() string
Source Files ¶
Click to show internal directories.
Click to hide internal directories.