file

package
Version: v1.14.0-beta.2
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Copyright 2021-2023 EMQ Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
// Compression format names.
// NOTE(review): presumably the values accepted by
// FileSourceConfig.Decompression — confirm against the implementation.
const (
	GZIP = "gzip"
	ZSTD = "zstd"
)
View Source
// Error codes.
// NOTE(review): presumably used as ReaderError.Code and as the code argument
// of BuildError — confirm against the implementation.
const (
	// TupleError is the first iota value (0); per the original note it means
	// the error is displayed in the tuple.
	TupleError int = iota // display error in tuple
)

Variables

This section is empty.

Functions

func File

func File() api.Sink

Types

type CsvReader

// CsvReader declares Read and Close methods whose signatures match the
// FormatReader interface. All of its fields are unexported.
// NOTE(review): presumably the concrete reader behind CreateCsvReader — confirm.
type CsvReader struct {
	// contains filtered or unexported fields
}

func (*CsvReader) Close

func (r *CsvReader) Close() error

func (*CsvReader) Read

func (r *CsvReader) Read() (map[string]interface{}, error)

type FileSource

// FileSource is a batch source: it loads the data from a file all at once.
// It declares Configure, Open, Load and Close methods. All of its fields are
// unexported.
type FileSource struct {
	// contains filtered or unexported fields
}

FileSource is a batch source that loads the data from a file all at once.

func (*FileSource) Close

func (fs *FileSource) Close(ctx api.StreamContext) error

func (*FileSource) Configure

func (fs *FileSource) Configure(fileName string, props map[string]interface{}) error

func (*FileSource) Load

func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error

func (*FileSource) Open

func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error)

type FileSourceConfig

// FileSourceConfig is the set of options for a file source. Each field is
// bound to the JSON key named in its struct tag.
//
// FileType uses the FileType string type, whose declared values are json,
// csv, lines and parquet.
//
// NOTE(review): the readings below are inferred from field names and types
// only; units, defaults and exact semantics are not visible here and must be
// confirmed against the implementation:
//   - Interval and SendInterval look like time intervals (units not shown).
//   - ActionAfterRead is an int code; MoveTo is presumably a destination path
//     used by one of those actions.
//   - HasHeader, Columns and Delimiter presumably apply to delimited (CSV)
//     input.
//   - IgnoreStartLines and IgnoreEndLines presumably count lines skipped at
//     the start and end of a file.
//   - Decompression presumably takes the GZIP ("gzip") or ZSTD ("zstd")
//     constants.
type FileSourceConfig struct {
	FileType         FileType `json:"fileType"`
	Path             string   `json:"path"`
	Interval         int      `json:"interval"`
	IsTable          bool     `json:"isTable"`
	Parallel         bool     `json:"parallel"`
	SendInterval     int      `json:"sendInterval"`
	ActionAfterRead  int      `json:"actionAfterRead"`
	MoveTo           string   `json:"moveTo"`
	HasHeader        bool     `json:"hasHeader"`
	Columns          []string `json:"columns"`
	IgnoreStartLines int      `json:"ignoreStartLines"`
	IgnoreEndLines   int      `json:"ignoreEndLines"`
	Delimiter        string   `json:"delimiter"`
	Decompression    string   `json:"decompression"`
}

type FileType

// FileType names a file format. It is the type of FileSourceConfig.FileType
// and of the fileType parameter of GetReader.
type FileType string
// File formats declared for FileType.
// NOTE(review): presumably each value selects the matching Create*Reader
// constructor — confirm against GetReader's implementation.
const (
	JSON_TYPE    FileType = "json"
	CSV_TYPE     FileType = "csv"
	LINES_TYPE   FileType = "lines"
	PARQUET_TYPE FileType = "parquet"
)

type FormatReader

// FormatReader reads records one at a time, each returned as a map keyed by
// string. GetReader and the Create*Reader functions all return this
// interface.
type FormatReader interface {
	Read() (map[string]interface{}, error) // Reads the next record. Returns EOF when the input has reached its end.
	// NOTE(review): Close's contract is not documented here; presumably it
	// releases the underlying input — confirm.
	Close() error
}

func CreateCsvReader

func CreateCsvReader(ctx api.StreamContext, fileStream io.Reader, config *FileSourceConfig) (FormatReader, error)

func CreateJsonReader

func CreateJsonReader(ctx api.StreamContext, fileStream io.Reader, config *FileSourceConfig) (FormatReader, error)

func CreateLineReader

func CreateLineReader(ctx api.StreamContext, fileStream io.Reader, config *FileSourceConfig) (FormatReader, error)

func CreateParquetReader

func CreateParquetReader(ctx api.StreamContext, filename string, config *FileSourceConfig) (FormatReader, error)

func GetReader

func GetReader(ctx api.StreamContext, fileType FileType, fileStream io.Reader, config *FileSourceConfig) (FormatReader, error)

type JsonReader

// JsonReader declares Read and Close methods whose signatures match the
// FormatReader interface. All of its fields are unexported.
// NOTE(review): presumably the concrete reader behind CreateJsonReader — confirm.
type JsonReader struct {
	// contains filtered or unexported fields
}

func (*JsonReader) Close

func (r *JsonReader) Close() error

func (*JsonReader) Read

func (r *JsonReader) Read() (map[string]interface{}, error)

type LineReader

// LineReader declares Read and Close methods whose signatures match the
// FormatReader interface. All of its fields are unexported.
// NOTE(review): presumably the concrete reader behind CreateLineReader — confirm.
type LineReader struct {
	// contains filtered or unexported fields
}

func (*LineReader) Close

func (r *LineReader) Close() error

func (*LineReader) Read

func (r *LineReader) Read() (map[string]interface{}, error)

type ParquetReader

// ParquetReader declares Read and Close methods. Read returns map[string]any,
// which is the same type as the map[string]interface{} required by
// FormatReader. All of its fields are unexported.
// NOTE(review): presumably the concrete reader behind CreateParquetReader,
// which takes a filename rather than an io.Reader — confirm.
type ParquetReader struct {
	// contains filtered or unexported fields
}

func (*ParquetReader) Close

func (pr *ParquetReader) Close() error

func (*ParquetReader) Read

func (pr *ParquetReader) Read() (map[string]any, error)

type ReaderError

// ReaderError is an error value carrying an integer code and a message. Its
// Error method is declared on the value receiver, and BuildError returns a
// *ReaderError built from a code and a message.
// NOTE(review): TupleError is the only code constant visible here; presumably
// it is a valid Code — confirm.
type ReaderError struct {
	Code    int
	Message string
}

func BuildError

func BuildError(code int, msg string) *ReaderError

func (ReaderError) Error

func (e ReaderError) Error() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL