Documentation
¶
Index ¶
- Variables
- func AutoDecompress(reader io.Reader) (gReader io.Reader, err error)
- func CleanHeaderRow(header []string) []string
- func CleanName(name string) (newName string)
- func CompareColumns(columns1 Columns, columns2 Columns) (reshape bool, err error)
- func CreateDummyFields(numCols int) (cols []string)
- func DecimalByteArrayToString(dec []byte, precision int, scale int) string
- func GetISO8601DateMap(t time.Time) map[string]interface{}
- func IsDummy(columns []Column) bool
- func Iso8601ToGoLayout(dateFormat string) (goDateFormat string)
- func MakeDecNumScale(scale int) *big.Rat
- func MakeRowsChan() chan []any
- func NewJSONStream(ds *Datastream, decoder decoderLike, flatten bool, jmespath string) *jsonStream
- func ParseFIX(message string) (fixMap map[string]any, err error)
- func ReplaceNonPrintable(val string) string
- func Row(vals ...any) []any
- func ScanCarrRet(data []byte, atEOF bool) (advance int, token []byte, err error)
- func StrIntToBinary(num string, order string, length int, signed bool) string
- func StringToDecimalByteArray(s string, numSca *big.Rat, pType parquet.Type, length int) (decBytes []byte)
- func Unzip(src string, dest string) (nodes []map[string]any, err error)
- type Avro
- type Batch
- func (b *Batch) AddTransform(transf func(row []any) []any)
- func (b *Batch) Close()
- func (b *Batch) ColumnsChanged() bool
- func (b *Batch) Ds() *Datastream
- func (b *Batch) ID() string
- func (b *Batch) IsFirst() bool
- func (b *Batch) Push(row []any)
- func (b *Batch) Shape(tgtColumns Columns, pause ...bool) (err error)
- type BatchReader
- type CSV
- func (c *CSV) InferSchema() error
- func (c *CSV) NewReader() (*io.PipeReader, error)
- func (c *CSV) Read() (data Dataset, err error)
- func (c *CSV) ReadStream() (ds *Datastream, err error)
- func (c *CSV) ReadStreamContext(ctx context.Context) (ds *Datastream, err error)
- func (c *CSV) Sample(n int) (Dataset, error)
- func (c *CSV) SetFields(fields []string)
- func (c *CSV) WriteStream(ds *Datastream) (cnt uint64, err error)
- type Column
- func (col *Column) GoType() reflect.Type
- func (col *Column) HasNulls() bool
- func (col *Column) HasNullsPlus1() bool
- func (col *Column) IsBinary() bool
- func (col *Column) IsBool() bool
- func (col *Column) IsDate() bool
- func (col *Column) IsDatetime() bool
- func (col *Column) IsDecimal() bool
- func (col *Column) IsFloat() bool
- func (col *Column) IsInteger() bool
- func (col *Column) IsKeyType(keyType KeyType) bool
- func (col *Column) IsNumber() bool
- func (col *Column) IsString() bool
- func (col *Column) IsUnique() bool
- func (col *Column) Key() string
- func (col *Column) SetLengthPrecisionScale()
- func (col *Column) SetMetadata(key string, value string)
- type ColumnStats
- type ColumnType
- func (ct ColumnType) IsBinary() bool
- func (ct ColumnType) IsBool() bool
- func (ct ColumnType) IsDate() bool
- func (ct ColumnType) IsDatetime() bool
- func (ct ColumnType) IsDecimal() bool
- func (ct ColumnType) IsFloat() bool
- func (ct ColumnType) IsInteger() bool
- func (ct ColumnType) IsJSON() bool
- func (ct ColumnType) IsNumber() bool
- func (ct ColumnType) IsString() bool
- func (ct ColumnType) IsValid() bool
- type Columns
- func (cols Columns) Clone() (newCols Columns)
- func (cols Columns) Coerce(castCols Columns, hasHeader bool) (newCols Columns)
- func (cols Columns) Data(includeParent bool) (fields []string, rows [][]any)
- func (cols Columns) Dataset() Dataset
- func (cols Columns) DbTypes(args ...bool) []string
- func (cols Columns) FieldMap(toLower bool) map[string]int
- func (cols Columns) GetColumn(name string) Column
- func (cols Columns) GetKeys(keyType KeyType) Columns
- func (cols Columns) GetMissing(newCols ...Column) (missing Columns)
- func (cols Columns) IsDifferent(newCols Columns) bool
- func (cols Columns) IsDummy() bool
- func (cols Columns) IsSimilarTo(otherCols Columns) bool
- func (cols Columns) JSON(includeParent bool) (output string)
- func (cols Columns) Keys() []string
- func (cols Columns) MakeRec(row []any) map[string]any
- func (cols Columns) MakeShaper(tgtColumns Columns) (shaper *Shaper, err error)
- func (cols Columns) Merge(newCols Columns, overwrite bool) (col2 Columns, added schemaChg, changed []schemaChg)
- func (cols Columns) Names(args ...bool) []string
- func (cols Columns) PrettyTable(includeParent bool) (output string)
- func (cols Columns) SetKeys(keyType KeyType, colNames ...string) (err error)
- func (cols Columns) Sourced() (sourced bool)
- func (cols Columns) Types(args ...bool) []string
- func (cols Columns) WithoutMeta() (newCols Columns)
- type Compressor
- type CompressorType
- type Dataflow
- func (df *Dataflow) AddColumns(newCols Columns, overwrite bool, exceptDs ...string) (added Columns, processOk bool)
- func (df *Dataflow) AddEgressBytes(bytes uint64)
- func (df *Dataflow) Bytes() (inBytes, outBytes uint64)
- func (df *Dataflow) ChangeColumn(i int, newType ColumnType, exceptDs ...string) bool
- func (df *Dataflow) CleanUp()
- func (df *Dataflow) Close()
- func (df *Dataflow) CloseCurrentBatches()
- func (df *Dataflow) Collect() (data Dataset, err error)
- func (df *Dataflow) Count() (cnt uint64)
- func (df *Dataflow) Defer(f func())
- func (df *Dataflow) DsTotalBytes() (bytes uint64)
- func (df *Dataflow) Err() (err error)
- func (df *Dataflow) IsClosed() bool
- func (df *Dataflow) IsEmpty() bool
- func (df *Dataflow) MakeStreamCh(forceMerge bool) (streamCh chan *Datastream)
- func (df *Dataflow) MergeColumns(columns []Column, inferred bool) (processOk bool)
- func (df *Dataflow) Pause(exceptDs ...string) bool
- func (df *Dataflow) PushStreamChan(dsCh chan *Datastream)
- func (df *Dataflow) ResetConfig()
- func (df *Dataflow) SetConfig(cfg *StreamConfig)
- func (df *Dataflow) SetEmpty()
- func (df *Dataflow) SetReady()
- func (df *Dataflow) Size() int
- func (df *Dataflow) SyncColumns()
- func (df *Dataflow) SyncStats()
- func (df *Dataflow) Unpause(exceptDs ...string)
- func (df *Dataflow) WaitClosed()
- func (df *Dataflow) WaitReady() error
- type Dataset
- func (data *Dataset) AddColumns(newCols Columns, overwrite bool) (added Columns)
- func (data *Dataset) Append(row ...[]any)
- func (data *Dataset) ColValues(col int) []interface{}
- func (data *Dataset) ColValuesStr(col int) []string
- func (data *Dataset) FirstRow() []interface{}
- func (data *Dataset) FirstVal() interface{}
- func (data *Dataset) GetFields(lower ...bool) []string
- func (data *Dataset) InferColumnTypes()
- func (data *Dataset) Pick(colNames ...string) (nData Dataset)
- func (data *Dataset) PrettyTable(fields ...string) (output string)
- func (data *Dataset) Print(limit int)
- func (data *Dataset) Records(lower ...bool) []map[string]interface{}
- func (data *Dataset) RecordsCasted(lower ...bool) []map[string]interface{}
- func (data *Dataset) RecordsString(lower ...bool) []map[string]interface{}
- func (data *Dataset) SetFields(fields []string)
- func (data *Dataset) Sort(args ...any)
- func (data *Dataset) Stream(Props ...map[string]string) *Datastream
- func (data *Dataset) ToJSONMap() map[string]interface{}
- func (data *Dataset) WriteCsv(dest io.Writer) (tbw int, err error)
- type Datastream
- func MergeDataflow(df *Dataflow) (dsN *Datastream)
- func NewDatastream(columns Columns) (ds *Datastream)
- func NewDatastreamContext(ctx context.Context, columns Columns) (ds *Datastream)
- func NewDatastreamIt(ctx context.Context, columns Columns, nextFunc func(it *Iterator) bool) (ds *Datastream)
- func ReadCsvStream(path string) (ds *Datastream, err error)
- func (ds *Datastream) AddBytes(b int64)
- func (ds *Datastream) AddColumns(newCols Columns, overwrite bool) (added Columns)
- func (ds *Datastream) CastRowToString(row []any) []string
- func (ds *Datastream) CastRowToStringSafe(row []any) []string
- func (ds *Datastream) ChangeColumn(i int, newType ColumnType)
- func (ds *Datastream) Chunk(limit uint64) (chDs chan *Datastream)
- func (ds *Datastream) Close()
- func (ds *Datastream) Collect(limit int) (Dataset, error)
- func (ds *Datastream) ConsumeAvroReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeAvroReaderSeeker(reader io.ReadSeeker) (err error)
- func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeCsvReaderChl(readerChn chan *ReaderReady) (err error)
- func (ds *Datastream) ConsumeExcelReader(reader io.Reader, props map[string]string) (err error)
- func (ds *Datastream) ConsumeExcelReaderSeeker(reader io.ReadSeeker, props map[string]string) (err error)
- func (ds *Datastream) ConsumeJsonReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeJsonReaderChl(readerChn chan *ReaderReady, isXML bool) (err error)
- func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeParquetReaderSeeker(reader *os.File) (err error)
- func (ds *Datastream) ConsumeSASReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeSASReaderSeeker(reader io.ReadSeeker) (err error)
- func (ds *Datastream) ConsumeXmlReader(reader io.Reader) (err error)
- func (ds *Datastream) Defer(f func())
- func (ds *Datastream) Df() *Dataflow
- func (ds *Datastream) Err() (err error)
- func (ds *Datastream) GetConfig() (configMap map[string]string)
- func (ds *Datastream) GetFields(args ...bool) []string
- func (ds *Datastream) IsClosed() bool
- func (ds *Datastream) LatestBatch() *Batch
- func (ds *Datastream) Limited(limit ...int) bool
- func (ds *Datastream) Map(newColumns Columns, transf func([]any) []any) (nDs *Datastream)
- func (ds *Datastream) MapParallel(transf func([]any) []any, numWorkers int) (nDs *Datastream)
- func (ds *Datastream) NewBatch(columns Columns) *Batch
- func (ds *Datastream) NewCsvBufferReader(limit int, bytesLimit int64) *bytes.Reader
- func (ds *Datastream) NewCsvBufferReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *bytes.Reader)
- func (ds *Datastream) NewCsvBytesChnl(chunkRowSize int) (dataChn chan *[]byte)
- func (ds *Datastream) NewCsvReader(rowLimit int, bytesLimit int64) *io.PipeReader
- func (ds *Datastream) NewCsvReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *BatchReader)
- func (ds *Datastream) NewExcelReaderChnl(rowLimit int, bytesLimit int64, sheetName string) (readerChn chan *BatchReader)
- func (ds *Datastream) NewIterator(columns Columns, nextFunc func(it *Iterator) bool) *Iterator
- func (ds *Datastream) NewJsonLinesReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)
- func (ds *Datastream) NewJsonReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)
- func (ds *Datastream) NewParquetArrowReaderChnl(rowLimit int, bytesLimit int64, compression CompressorType) (readerChn chan *BatchReader)
- func (ds *Datastream) NewParquetReaderChnl(rowLimit int, bytesLimit int64, compression CompressorType) (readerChn chan *BatchReader)
- func (ds *Datastream) Pause()
- func (ds *Datastream) Push(row []any)
- func (ds *Datastream) Records() <-chan map[string]any
- func (ds *Datastream) Rows() chan []any
- func (ds *Datastream) SetConfig(configMap map[string]string)
- func (ds *Datastream) SetEmpty()
- func (ds *Datastream) SetFields(fields []string)
- func (ds *Datastream) SetFileURI()
- func (ds *Datastream) SetIterator(it *Iterator)
- func (ds *Datastream) SetMetadata(jsonStr string)
- func (ds *Datastream) SetReady()
- func (ds *Datastream) Shape(columns Columns) (nDs *Datastream, err error)
- func (ds *Datastream) Split(numStreams ...int) (dss []*Datastream)
- func (ds *Datastream) Start() (err error)
- func (ds *Datastream) TryPause() bool
- func (ds *Datastream) Unpause()
- func (ds *Datastream) WaitClosed()
- func (ds *Datastream) WaitReady() error
- type Excel
- func (xls *Excel) GetDataset(sheet string) (data Dataset)
- func (xls *Excel) GetDatasetFromRange(sheet, cellRange string) (data Dataset, err error)
- func (xls *Excel) RefreshSheets() (err error)
- func (xls *Excel) WriteSheet(shtName string, ds *Datastream, mode string) (err error)
- func (xls *Excel) WriteToFile(path string) (err error)
- func (xls *Excel) WriteToWriter(w io.Writer) (err error)
- type GoogleSheet
- func (ggs *GoogleSheet) DeleteSheet(shtName string) (err error)
- func (ggs *GoogleSheet) GetDataset(shtName string) (data Dataset, err error)
- func (ggs *GoogleSheet) GetDatasetFromRange(shtName, cellRange string) (data Dataset, err error)
- func (ggs *GoogleSheet) RefreshSheets() (err error)
- func (ggs *GoogleSheet) URL() string
- func (ggs *GoogleSheet) WriteSheet(shtName string, ds *Datastream, mode string) (err error)
- type GzipCompressor
- type Iterator
- type KeyType
- type KeyValue
- type Metadata
- type NoneCompressor
- type Parquet
- type ParquetArrowDumper
- type ParquetArrowReader
- type ParquetArrowWriter
- type ParquetWriter
- type ReaderReady
- type RecNode
- func (rn *RecNode) Compression() compress.Codec
- func (rn *RecNode) Encoding() encoding.Encoding
- func (rn *RecNode) Fields() []parquet.Field
- func (rn *RecNode) GoType() reflect.Type
- func (rn *RecNode) ID() int
- func (rn *RecNode) Leaf() bool
- func (rn *RecNode) Optional() bool
- func (rn *RecNode) Repeated() bool
- func (rn *RecNode) Required() bool
- func (rn *RecNode) String() string
- func (rn *RecNode) Type() parquet.Type
- type Record
- type SAS
- type SSHClient
- func (s *SSHClient) Close()
- func (s *SSHClient) Connect() (err error)
- func (s *SSHClient) GetOutput() (stdout string, stderr string)
- func (s *SSHClient) OpenPortForward() (localPort int, err error)
- func (s *SSHClient) RunAsProcess() (localPort int, err error)
- func (s *SSHClient) SftpClient() (sftpClient *sftp.Client, err error)
- type Shaper
- type SnappyCompressor
- type StreamConfig
- type StreamProcessor
- func (sp *StreamProcessor) CastRow(row []interface{}, columns Columns) []interface{}
- func (sp *StreamProcessor) CastToString(i int, val interface{}, valType ...ColumnType) string
- func (sp *StreamProcessor) CastToStringSafe(i int, val interface{}, valType ...ColumnType) string
- func (sp *StreamProcessor) CastToTime(i interface{}) (t time.Time, err error)
- func (sp *StreamProcessor) CastType(val interface{}, typ ColumnType) interface{}
- func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interface{}
- func (sp *StreamProcessor) CastValWithoutStats(i int, val interface{}, typ ColumnType) interface{}
- func (sp *StreamProcessor) EncodingTransform(t Transform, val string) (newVal string, err error)
- func (sp *StreamProcessor) GetType(val interface{}) (typ ColumnType)
- func (sp *StreamProcessor) ParseString(s string, jj ...int) interface{}
- func (sp *StreamProcessor) ParseTime(i interface{}) (t time.Time, err error)
- func (sp *StreamProcessor) ParseVal(val interface{}) interface{}
- func (sp *StreamProcessor) ProcessRow(row []interface{}) []interface{}
- func (sp *StreamProcessor) ProcessVal(val interface{}) interface{}
- func (sp *StreamProcessor) ResetConfig()
- func (sp *StreamProcessor) SetConfig(configMap map[string]string)
- type Transform
- type TransformFunc
- type Transformers
- type ZStandardCompressor
Constants ¶
This section is empty.
Variables ¶
var (
    // RemoveTrailingDecZeros removes the trailing zeros in CastToString
    RemoveTrailingDecZeros = false
    SampleSize             = 900
)
var KeyTypes = []KeyType{AggregateKey, ClusterKey, DuplicateKey, HashKey, PartitionKey, PrimaryKey, SortKey, UniqueKey, UpdateKey}
var Transforms = map[Transform]TransformFunc{}
Functions ¶
func AutoDecompress ¶
AutoDecompress auto-detects the compression of the reader and decompresses it. If no compression is detected, it returns the same reader.
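A minimal usage sketch (hedged: the import path github.com/slingdata-io/sling-cli/core/dbio/iop is inferred from the benchmark command further below, and the file name is hypothetical):

package main

import (
    "io"
    "os"

    "github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func main() {
    // "data.csv.gz" is a hypothetical file; AutoDecompress sniffs the stream
    // and returns a decompressing reader, or the same reader if uncompressed.
    f, err := os.Open("data.csv.gz")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    r, err := iop.AutoDecompress(f)
    if err != nil {
        panic(err)
    }
    _, _ = io.Copy(os.Stdout, r) // stream the decompressed content
}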
func CleanHeaderRow ¶
CleanHeaderRow cleans the header row of incompatible characters
func CompareColumns ¶
CompareColumns compares two sets of columns to see if they are similar
func CreateDummyFields ¶
CreateDummyFields creates dummy columns for csvs with no header row
func DecimalByteArrayToString ¶ added in v1.1.6
DecimalByteArrayToString converts a decimal byte array to its decimal string representation. Adapted from https://github.com/xitongsys/parquet-go/blob/8ca067b2bd324788a77bf61d4e1ef9a5f8b4b1d2/types/converter.go#L131
func GetISO8601DateMap ¶
GetISO8601DateMap returns a map of date parts for string formatting
func Iso8601ToGoLayout ¶
Iso8601ToGoLayout converts an ISO 8601 date format string into a Go time layout. References: https://www.w3.org/QA/Tips/iso-date https://www.w3.org/TR/NOTE-datetime https://www.iso.org/iso-8601-date-and-time-format.html
func MakeDecNumScale ¶ added in v1.1.6
func MakeRowsChan ¶
func MakeRowsChan() chan []any
MakeRowsChan returns a buffered channel with default size
func NewJSONStream ¶
func NewJSONStream(ds *Datastream, decoder decoderLike, flatten bool, jmespath string) *jsonStream
func ReplaceNonPrintable ¶ added in v1.1.13
ReplaceNonPrintable replaces non-printable characters in a string. References: https://stackoverflow.com/a/46637343/2295355 https://web.itu.edu.tr/sgunduz/courses/mikroisl/ascii.html
func ScanCarrRet ¶
ScanCarrRet removes the \r runes that are not immediately followed by a \n
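Since the signature matches bufio.SplitFunc, here is a hedged sketch of plugging it into a bufio.Scanner; the exact token granularity it produces is an assumption:

package main

import (
    "bufio"
    "fmt"
    "strings"

    "github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func main() {
    scanner := bufio.NewScanner(strings.NewReader("a,b\r\nc,d\r\n"))
    scanner.Split(iop.ScanCarrRet) // signature matches bufio.SplitFunc
    for scanner.Scan() {
        // tokens come through with stray \r runes removed;
        // how the input is chunked into tokens depends on the implementation
        fmt.Println(scanner.Text())
    }
}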
func StrIntToBinary ¶ added in v1.1.6
order is "LittleEndian" or "BigEndian"; length is the number of bytes
func StringToDecimalByteArray ¶ added in v1.1.6
func StringToDecimalByteArray(s string, numSca *big.Rat, pType parquet.Type, length int) (decBytes []byte)
StringToDecimalByteArray converts a string decimal to bytes. Improvised from https://github.com/xitongsys/parquet-go/blob/8ca067b2bd324788a77bf61d4e1ef9a5f8b4b1d2/types/types.go#L81 This function is costly and slows writing dramatically. TODO: find ways to optimize, if possible
Types ¶
type Avro ¶
type Avro struct {
    Path   string
    Reader *goavro.OCFReader
    Data   *Dataset
    // contains filtered or unexported fields
}
Avro is an Avro object
func NewAvroStream ¶
func NewAvroStream(reader io.ReadSeeker, columns Columns) (a *Avro, err error)
type Batch ¶
type Batch struct {
    Columns  Columns
    Rows     chan []any
    Previous *Batch
    Count    int64
    Limit    int64
    // contains filtered or unexported fields
}
func (*Batch) AddTransform ¶
func (*Batch) ColumnsChanged ¶
func (*Batch) Ds ¶
func (b *Batch) Ds() *Datastream
type BatchReader ¶
type CSV ¶
type CSV struct {
    Path            string
    NoHeader        bool
    Delimiter       rune
    Escape          string
    FieldsPerRecord int
    Columns         []Column
    File            *os.File
    Data            Dataset
    Reader          io.Reader
    Config          map[string]string
    NoDebug         bool
    // contains filtered or unexported fields
}
CSV is a csv object
func (*CSV) NewReader ¶
func (c *CSV) NewReader() (*io.PipeReader, error)
NewReader creates a Reader
func (*CSV) ReadStream ¶
func (c *CSV) ReadStream() (ds *Datastream, err error)
ReadStream returns the read CSV stream with Line 1 as header
func (*CSV) ReadStreamContext ¶ added in v1.1.15
func (c *CSV) ReadStreamContext(ctx context.Context) (ds *Datastream, err error)
ReadStreamContext returns the read CSV stream with Line 1 as header, using the provided context
func (*CSV) WriteStream ¶
func (c *CSV) WriteStream(ds *Datastream) (cnt uint64, err error)
WriteStream to CSV file
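A hedged end-to-end sketch using the CSV type; the file names are hypothetical and only the fields and methods listed above are used:

package main

import (
    "log"

    "github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func main() {
    src := iop.CSV{Path: "input.csv"} // hypothetical source file
    ds, err := src.ReadStream()       // line 1 is treated as the header
    if err != nil {
        log.Fatal(err)
    }

    dst := iop.CSV{Path: "output.csv"} // hypothetical destination file
    cnt, err := dst.WriteStream(ds)    // write the datastream out as CSV
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("wrote %d rows", cnt)
}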
type Column ¶
type Column struct {
    Position    int               `json:"position"`
    Name        string            `json:"name"`
    Type        ColumnType        `json:"type"`
    DbType      string            `json:"db_type,omitempty"`
    DbPrecision int               `json:"db_precision,omitempty"`
    DbScale     int               `json:"db_scale,omitempty"`
    Sourced     bool              `json:"-"` // whether col was sourced/inferred from a typed source
    Stats       ColumnStats       `json:"stats,omitempty"`
    Table       string            `json:"table,omitempty"`
    Schema      string            `json:"schema,omitempty"`
    Database    string            `json:"database,omitempty"`
    Description string            `json:"description,omitempty"`
    FileURI     string            `json:"file_uri,omitempty"`
    Metadata    map[string]string `json:"metadata,omitempty"`
    // contains filtered or unexported fields
}
Column represents a schemata column
func InferFromStats ¶
InferFromStats uses the stats to infer data types
func (*Column) HasNullsPlus1 ¶ added in v1.1.7
HasNullsPlus1 denotes when a column is all nulls plus 1 non-null
func (*Column) IsDatetime ¶
IsDatetime returns whether the column is a datetime object
func (*Column) SetLengthPrecisionScale ¶
func (col *Column) SetLengthPrecisionScale()
SetLengthPrecisionScale parses the length, precision and scale
func (*Column) SetMetadata ¶
type ColumnStats ¶
type ColumnStats struct {
    MinLen      int    `json:"min_len,omitempty"`
    MaxLen      int    `json:"max_len,omitempty"`
    MaxDecLen   int    `json:"max_dec_len,omitempty"`
    Min         int64  `json:"min"`
    Max         int64  `json:"max"`
    NullCnt     int64  `json:"null_cnt"`
    IntCnt      int64  `json:"int_cnt,omitempty"`
    DecCnt      int64  `json:"dec_cnt,omitempty"`
    BoolCnt     int64  `json:"bool_cnt,omitempty"`
    JsonCnt     int64  `json:"json_cnt,omitempty"`
    StringCnt   int64  `json:"string_cnt,omitempty"`
    DateCnt     int64  `json:"date_cnt,omitempty"`
    DateTimeCnt int64  `json:"datetime_cnt,omitempty"`
    TotalCnt    int64  `json:"total_cnt"`
    UniqCnt     int64  `json:"uniq_cnt"`
    Checksum    uint64 `json:"checksum"`
}
ColumnStats holds statistics for a column
func (*ColumnStats) DistinctPercent ¶
func (cs *ColumnStats) DistinctPercent() float64
func (*ColumnStats) DuplicateCount ¶
func (cs *ColumnStats) DuplicateCount() int64
func (*ColumnStats) DuplicatePercent ¶
func (cs *ColumnStats) DuplicatePercent() float64
type ColumnType ¶
type ColumnType string
const (
    BigIntType     ColumnType = "bigint"
    BinaryType     ColumnType = "binary"
    BoolType       ColumnType = "bool"
    DateType       ColumnType = "date"
    DatetimeType   ColumnType = "datetime"
    DecimalType    ColumnType = "decimal"
    IntegerType    ColumnType = "integer"
    JsonType       ColumnType = "json"
    SmallIntType   ColumnType = "smallint"
    StringType     ColumnType = "string"
    TextType       ColumnType = "text"
    TimestampType  ColumnType = "timestamp"
    TimestampzType ColumnType = "timestampz"
    FloatType      ColumnType = "float"
    TimeType       ColumnType = "time"
    TimezType      ColumnType = "timez"
)
func (ColumnType) IsBinary ¶ added in v1.2.3
func (ct ColumnType) IsBinary() bool
IsBinary returns whether the column is a binary
func (ColumnType) IsBool ¶
func (ct ColumnType) IsBool() bool
IsBool returns whether the column is a boolean
func (ColumnType) IsDate ¶ added in v1.1.8
func (ct ColumnType) IsDate() bool
IsDate returns whether the column is a date object
func (ColumnType) IsDatetime ¶
func (ct ColumnType) IsDatetime() bool
IsDatetime returns whether the column is a datetime object
func (ColumnType) IsDecimal ¶
func (ct ColumnType) IsDecimal() bool
IsDecimal returns whether the column is a decimal
func (ColumnType) IsFloat ¶ added in v1.1.14
func (ct ColumnType) IsFloat() bool
IsFloat returns whether the column is a float
func (ColumnType) IsInteger ¶
func (ct ColumnType) IsInteger() bool
IsInteger returns whether the column is an integer
func (ColumnType) IsJSON ¶
func (ct ColumnType) IsJSON() bool
IsJSON returns whether the column is a json
func (ColumnType) IsNumber ¶
func (ct ColumnType) IsNumber() bool
IsNumber returns whether the column is a decimal or an integer
func (ColumnType) IsString ¶
func (ct ColumnType) IsString() bool
IsString returns whether the column is a string
func (ColumnType) IsValid ¶
func (ct ColumnType) IsValid() bool
IsValid returns whether the column has a valid type
type Columns ¶
type Columns []Column
Columns represent many columns
func NewColumns ¶
NewColumns creates a new Columns collection from the provided columns
func NewColumnsFromFields ¶
NewColumnsFromFields creates Columns from fields
func (Columns) DbTypes ¶
DbTypes returns the column names/db types. args -> (lower bool, cleanUp bool)
func (Columns) FieldMap ¶
FieldMap returns a map of field name to index. When `toLower` is true, field keys are lower-cased
func (Columns) GetMissing ¶ added in v1.1.8
GetMissing returns the missing columns from newCols
func (Columns) IsDifferent ¶
func (Columns) IsSimilarTo ¶
IsSimilarTo returns true if the collection has the same number of columns and contains the same columns, possibly in a different order
func (Columns) MakeShaper ¶
func (Columns) PrettyTable ¶ added in v1.1.8
PrettyTable returns a text pretty table
func (Columns) WithoutMeta ¶ added in v1.2.2
WithoutMeta returns the columns without metadata columns
type Compressor ¶
type Compressor interface {
    Self() Compressor
    Compress(io.Reader) io.Reader
    Decompress(io.Reader) (io.Reader, error)
    Suffix() string
}
Compressor implements different kinds of compression
func NewCompressor ¶
func NewCompressor(cpType CompressorType) Compressor
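A hedged round-trip sketch with the Compressor interface (gzip chosen arbitrarily; only the interface methods listed above are used):

package main

import (
    "bytes"
    "fmt"
    "io"
    "strings"

    "github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func main() {
    cp := iop.NewCompressor(iop.GzipCompressorType)

    // Compress a plain-text reader, buffer the result, then decompress it back.
    compressed := cp.Compress(strings.NewReader("hello,world\n"))

    var buf bytes.Buffer
    if _, err := io.Copy(&buf, compressed); err != nil {
        panic(err)
    }

    decompressed, err := cp.Decompress(&buf)
    if err != nil {
        panic(err)
    }
    out, _ := io.ReadAll(decompressed)
    fmt.Printf("suffix=%s content=%s", cp.Suffix(), out)
}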
type CompressorType ¶
type CompressorType string
CompressorType is a string enum for the compressor type
const (
    // AutoCompressorType is for auto compression
    AutoCompressorType CompressorType = "AUTO"
    // NoneCompressorType is for no compression
    NoneCompressorType CompressorType = "NONE"
    // ZipCompressorType is for Zip compression
    ZipCompressorType CompressorType = "ZIP"
    // GzipCompressorType is for Gzip compression
    GzipCompressorType CompressorType = "GZIP"
    // SnappyCompressorType is for Snappy compression
    SnappyCompressorType CompressorType = "SNAPPY"
    // ZStandardCompressorType is for ZStandard
    ZStandardCompressorType CompressorType = "ZSTD"
)
func CompressorTypePtr ¶
func CompressorTypePtr(v CompressorType) *CompressorType
CompressorTypePtr returns a pointer to the CompressorType value passed in.
type Dataflow ¶
type Dataflow struct {
    Columns         Columns
    Buffer          [][]interface{}
    StreamCh        chan *Datastream
    Streams         []*Datastream
    Context         *g.Context
    Limit           uint64
    EgressBytes     uint64
    Ready           bool
    Inferred        bool
    FsURL           string
    OnColumnChanged func(col Column) error
    OnColumnAdded   func(col Column) error
    StreamMap       map[string]*Datastream
    SchemaVersion   int // for column type version
    // contains filtered or unexported fields
}
Dataflow is a collection of concurrent Datastreams
func MakeDataFlow ¶
func MakeDataFlow(dss ...*Datastream) (df *Dataflow, err error)
MakeDataFlow create a dataflow from datastreams
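A hedged sketch chaining a CSV stream into a dataflow and collecting it (the file name is hypothetical; ReadCsvStream, MakeDataFlow and Collect are used as listed in this documentation):

package main

import (
    "fmt"
    "log"

    "github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func main() {
    ds, err := iop.ReadCsvStream("input.csv") // hypothetical CSV file
    if err != nil {
        log.Fatal(err)
    }

    df, err := iop.MakeDataFlow(ds) // wrap the datastream(s) in a dataflow
    if err != nil {
        log.Fatal(err)
    }

    data, err := df.Collect() // drain all streams into a Dataset
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(len(data.Rows), "rows collected")
}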
func NewDataflowContext ¶ added in v1.1.15
func (*Dataflow) AddColumns ¶
func (df *Dataflow) AddColumns(newCols Columns, overwrite bool, exceptDs ...string) (added Columns, processOk bool)
AddColumns adds new columns to the dataflow
func (*Dataflow) AddEgressBytes ¶ added in v1.2.2
AddEgressBytes adds egress bytes
func (*Dataflow) ChangeColumn ¶
func (df *Dataflow) ChangeColumn(i int, newType ColumnType, exceptDs ...string) bool
ChangeColumn applies a column type change across the dataflow
func (*Dataflow) CloseCurrentBatches ¶
func (df *Dataflow) CloseCurrentBatches()
func (*Dataflow) Defer ¶
func (df *Dataflow) Defer(f func())
Defer runs a given function when the Dataflow closes
func (*Dataflow) DsTotalBytes ¶
func (*Dataflow) MakeStreamCh ¶
func (df *Dataflow) MakeStreamCh(forceMerge bool) (streamCh chan *Datastream)
MakeStreamCh determines whether to merge all the streams into one or keep them separate. If the data per stream is small, it's best to merge. For example, BigQuery has limits on the number of operations that can be called within a time limit.
func (*Dataflow) MergeColumns ¶ added in v1.1.15
MergeColumns merges the provided columns into the dataflow columns
func (*Dataflow) PushStreamChan ¶
func (df *Dataflow) PushStreamChan(dsCh chan *Datastream)
func (*Dataflow) ResetConfig ¶ added in v1.1.15
func (df *Dataflow) ResetConfig()
ResetConfig resets the Sp config, so that, for example, delimiter settings are not carried through.
func (*Dataflow) SetConfig ¶ added in v1.1.15
func (df *Dataflow) SetConfig(cfg *StreamConfig)
SetConfig set the Sp config
func (*Dataflow) SetEmpty ¶
func (df *Dataflow) SetEmpty()
SetEmpty sets all underlying datastreams empty
func (*Dataflow) SyncColumns ¶
func (df *Dataflow) SyncColumns()
SyncColumns is a workaround to sync the ds.Columns to the df.Columns
func (*Dataflow) SyncStats ¶
func (df *Dataflow) SyncStats()
SyncStats syncs the aggregated stream processor stats to the df.Columns
func (*Dataflow) WaitClosed ¶
func (df *Dataflow) WaitClosed()
WaitClosed waits until the dataflow is closed (a hack to make sure all streams are pushed)
type Dataset ¶
type Dataset struct {
    Result        *sqlx.Rows
    Columns       Columns
    Rows          [][]interface{}
    SQL           string
    Duration      float64
    Sp            *StreamProcessor
    Inferred      bool
    SafeInference bool
    NoDebug       bool
}
Dataset is a dataset returned from a query
func NewDatasetFromMap ¶
NewDatasetFromMap returns a new dataset from a map
func NewExcelDataset ¶ added in v1.2.2
func (*Dataset) AddColumns ¶
AddColumns adds new columns to the dataset
func (*Dataset) ColValuesStr ¶
ColValuesStr returns the values of one column as an array of strings
func (*Dataset) FirstRow ¶
func (data *Dataset) FirstRow() []interface{}
FirstRow returns the first row
func (*Dataset) FirstVal ¶
func (data *Dataset) FirstVal() interface{}
FirstVal returns the first value from the first row
func (*Dataset) InferColumnTypes ¶
func (data *Dataset) InferColumnTypes()
InferColumnTypes determines the columns types
func (*Dataset) PrettyTable ¶ added in v1.1.8
func (*Dataset) RecordsCasted ¶
RecordsCasted returns rows of maps of casted values
func (*Dataset) RecordsString ¶
RecordsString returns rows of maps of string values
func (*Dataset) Sort ¶
Sort sorts by cols. Example: `data.Sort(0, 2, 3, false)` will sort col0, col2, col3 descending. Example: `data.Sort(0, 2, true)` will sort col0, col2 ascending.
func (*Dataset) Stream ¶
func (data *Dataset) Stream(Props ...map[string]string) *Datastream
Stream returns a datastream of the dataset
type Datastream ¶
type Datastream struct {
    Columns       Columns
    Buffer        [][]any
    BatchChan     chan *Batch
    Batches       []*Batch
    CurrentBatch  *Batch
    Count         uint64
    Context       *g.Context
    Ready         bool
    Bytes         atomic.Uint64
    Sp            *StreamProcessor
    SafeInference bool
    NoDebug       bool
    Inferred      bool
    ID            string
    Metadata      Metadata // map of column name to metadata type
    // contains filtered or unexported fields
}
Datastream is a stream of rows
func MergeDataflow ¶
func MergeDataflow(df *Dataflow) (dsN *Datastream)
MergeDataflow merges the dataflow streams into one
func NewDatastream ¶
func NewDatastream(columns Columns) (ds *Datastream)
NewDatastream returns a new datastream
func NewDatastreamContext ¶
func NewDatastreamContext(ctx context.Context, columns Columns) (ds *Datastream)
NewDatastreamContext returns a new datastream with the provided context
func NewDatastreamIt ¶
func NewDatastreamIt(ctx context.Context, columns Columns, nextFunc func(it *Iterator) bool) (ds *Datastream)
NewDatastreamIt returns a new datastream driven by the provided iterator nextFunc
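A hedged sketch of an iterator-driven datastream; the nextFunc contract (set it.Row, return false when done) is inferred from the Iterator type documented below and is an assumption:

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func main() {
    cols := iop.Columns{{Name: "n", Type: iop.IntegerType}}

    rows := [][]any{{1}, {2}, {3}}
    i := 0
    nextFunc := func(it *iop.Iterator) bool {
        if i >= len(rows) {
            return false // no more rows
        }
        it.Row = rows[i] // hand the next row to the iterator
        i++
        return true
    }

    ds := iop.NewDatastreamIt(context.Background(), cols, nextFunc)
    if err := ds.Start(); err != nil { // Start cycles the iterator func until done
        log.Fatal(err)
    }

    for row := range ds.Rows() {
        fmt.Println(row)
    }
}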
func ReadCsvStream ¶
func ReadCsvStream(path string) (ds *Datastream, err error)
ReadCsvStream reads a CSV file and returns a datastream
func (*Datastream) AddBytes ¶
func (ds *Datastream) AddBytes(b int64)
AddBytes adds bytes as processed
func (*Datastream) AddColumns ¶
func (ds *Datastream) AddColumns(newCols Columns, overwrite bool) (added Columns)
AddColumns adds new columns to the datastream
func (*Datastream) CastRowToString ¶
func (ds *Datastream) CastRowToString(row []any) []string
CastRowToString returns the row as string casted
func (*Datastream) CastRowToStringSafe ¶ added in v1.2.6
func (ds *Datastream) CastRowToStringSafe(row []any) []string
CastRowToStringSafe returns the row as string casted (safer)
func (*Datastream) ChangeColumn ¶
func (ds *Datastream) ChangeColumn(i int, newType ColumnType)
ChangeColumn applies a column type change
func (*Datastream) Chunk ¶
func (ds *Datastream) Chunk(limit uint64) (chDs chan *Datastream)
Chunk splits the datastream into chunk datastreams (in sequence)
func (*Datastream) Collect ¶
func (ds *Datastream) Collect(limit int) (Dataset, error)
Collect reads a stream and returns a dataset. A limit of 0 is unlimited.
func (*Datastream) ConsumeAvroReader ¶
func (ds *Datastream) ConsumeAvroReader(reader io.Reader) (err error)
ConsumeAvroReader uses the provided reader to stream rows
func (*Datastream) ConsumeAvroReaderSeeker ¶
func (ds *Datastream) ConsumeAvroReaderSeeker(reader io.ReadSeeker) (err error)
ConsumeAvroReaderSeeker uses the provided reader to stream rows
func (*Datastream) ConsumeCsvReader ¶
func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error)
ConsumeCsvReader uses the provided reader to stream rows
func (*Datastream) ConsumeCsvReaderChl ¶ added in v1.2.4
func (ds *Datastream) ConsumeCsvReaderChl(readerChn chan *ReaderReady) (err error)
ConsumeCsvReaderChl reads a channel of readers. Should be safe to use with header top row
func (*Datastream) ConsumeExcelReader ¶ added in v1.2.2
ConsumeExcelReader uses the provided reader to stream rows
func (*Datastream) ConsumeExcelReaderSeeker ¶ added in v1.2.2
func (ds *Datastream) ConsumeExcelReaderSeeker(reader io.ReadSeeker, props map[string]string) (err error)
ConsumeExcelReaderSeeker uses the provided reader to stream rows
func (*Datastream) ConsumeJsonReader ¶
func (ds *Datastream) ConsumeJsonReader(reader io.Reader) (err error)
ConsumeJsonReader uses the provided reader to stream JSON. This will put each JSON record as one string value so the payload can be processed downstream.
func (*Datastream) ConsumeJsonReaderChl ¶ added in v1.2.6
func (ds *Datastream) ConsumeJsonReaderChl(readerChn chan *ReaderReady, isXML bool) (err error)
func (*Datastream) ConsumeParquetReader ¶
func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error)
ConsumeParquetReader uses the provided reader to stream rows
func (*Datastream) ConsumeParquetReaderSeeker ¶
func (ds *Datastream) ConsumeParquetReaderSeeker(reader *os.File) (err error)
ConsumeParquetReaderSeeker uses the provided reader to stream rows
func (*Datastream) ConsumeSASReader ¶
func (ds *Datastream) ConsumeSASReader(reader io.Reader) (err error)
ConsumeSASReader uses the provided reader to stream rows
func (*Datastream) ConsumeSASReaderSeeker ¶
func (ds *Datastream) ConsumeSASReaderSeeker(reader io.ReadSeeker) (err error)
ConsumeSASReaderSeeker uses the provided reader to stream rows
func (*Datastream) ConsumeXmlReader ¶
func (ds *Datastream) ConsumeXmlReader(reader io.Reader) (err error)
ConsumeXmlReader uses the provided reader to stream XML. This will put each XML record as one string value so the payload can be processed downstream.
func (*Datastream) Defer ¶
func (ds *Datastream) Defer(f func())
Defer runs a given function when the Datastream closes
func (*Datastream) Df ¶
func (ds *Datastream) Df() *Dataflow
func (*Datastream) GetConfig ¶
func (ds *Datastream) GetConfig() (configMap map[string]string)
GetConfig get config
func (*Datastream) GetFields ¶
func (ds *Datastream) GetFields(args ...bool) []string
GetFields return the fields of the Data
func (*Datastream) IsClosed ¶
func (ds *Datastream) IsClosed() bool
IsClosed is true if ds is closed
func (*Datastream) LatestBatch ¶
func (ds *Datastream) LatestBatch() *Batch
func (*Datastream) Limited ¶ added in v1.2.4
func (ds *Datastream) Limited(limit ...int) bool
func (*Datastream) Map ¶
func (ds *Datastream) Map(newColumns Columns, transf func([]any) []any) (nDs *Datastream)
Map applies the provided function to every row and returns the result
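A hedged sketch of Map applied to a stream read from a hypothetical CSV file; the transform receives and returns a full row slice, per the signature above:

package main

import (
    "fmt"
    "log"
    "strings"

    "github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func main() {
    ds, err := iop.ReadCsvStream("input.csv") // hypothetical file, first column is a string
    if err != nil {
        log.Fatal(err)
    }

    // Keep the same columns; upper-case the first field of every row.
    upper := func(row []any) []any {
        if s, ok := row[0].(string); ok {
            row[0] = strings.ToUpper(s)
        }
        return row
    }

    nDs := ds.Map(ds.Columns, upper)
    for row := range nDs.Rows() {
        fmt.Println(row)
    }
}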
func (*Datastream) MapParallel ¶
func (ds *Datastream) MapParallel(transf func([]any) []any, numWorkers int) (nDs *Datastream)
MapParallel applies the provided function to every row in parallel and returns the result. Order is not maintained.
func (*Datastream) NewBatch ¶
func (ds *Datastream) NewBatch(columns Columns) *Batch
NewBatch creates a new batch with fixed columns. It should be used each time a column type changes or columns are added.
func (*Datastream) NewCsvBufferReader ¶
func (ds *Datastream) NewCsvBufferReader(limit int, bytesLimit int64) *bytes.Reader
NewCsvBufferReader creates a Reader with limit. If limit == 0, then read all rows.
func (*Datastream) NewCsvBufferReaderChnl ¶
func (ds *Datastream) NewCsvBufferReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *bytes.Reader)
NewCsvBufferReaderChnl provides a channel of readers as the limit is reached. Data is read into memory, whereas NewCsvReaderChnl does not hold data in memory.
func (*Datastream) NewCsvBytesChnl ¶
func (ds *Datastream) NewCsvBytesChnl(chunkRowSize int) (dataChn chan *[]byte)
NewCsvBytesChnl returns a channel yielding chunks of CSV bytes
func (*Datastream) NewCsvReader ¶
func (ds *Datastream) NewCsvReader(rowLimit int, bytesLimit int64) *io.PipeReader
NewCsvReader creates a Reader with limit. If limit == 0, then read all rows.
func (*Datastream) NewCsvReaderChnl ¶
func (ds *Datastream) NewCsvReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *BatchReader)
NewCsvReaderChnl provides a channel of readers as the limit is reached. Each reader flows as fast as the consumer consumes.
func (*Datastream) NewExcelReaderChnl ¶ added in v1.2.2
func (ds *Datastream) NewExcelReaderChnl(rowLimit int, bytesLimit int64, sheetName string) (readerChn chan *BatchReader)
func (*Datastream) NewIterator ¶
func (ds *Datastream) NewIterator(columns Columns, nextFunc func(it *Iterator) bool) *Iterator
func (*Datastream) NewJsonLinesReaderChnl ¶
func (ds *Datastream) NewJsonLinesReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)
NewJsonLinesReaderChnl provides a channel of readers as the limit is reached. Each reader flows as fast as the consumer consumes.
func (*Datastream) NewJsonReaderChnl ¶
func (ds *Datastream) NewJsonReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)
func (*Datastream) NewParquetArrowReaderChnl ¶ added in v1.1.7
func (ds *Datastream) NewParquetArrowReaderChnl(rowLimit int, bytesLimit int64, compression CompressorType) (readerChn chan *BatchReader)
NewParquetArrowReaderChnl provides a channel of readers as the limit is reached. Each reader flows as fast as the consumer consumes. WARN: not using this one since it doesn't write Decimals properly.
func (*Datastream) NewParquetReaderChnl ¶
func (ds *Datastream) NewParquetReaderChnl(rowLimit int, bytesLimit int64, compression CompressorType) (readerChn chan *BatchReader)
NewParquetReaderChnl provides a channel of readers as the limit is reached. Each reader flows as fast as the consumer consumes.
func (*Datastream) Pause ¶
func (ds *Datastream) Pause()
func (*Datastream) Records ¶
func (ds *Datastream) Records() <-chan map[string]any
Records returns rows of maps
func (*Datastream) Rows ¶
func (ds *Datastream) Rows() chan []any
func (*Datastream) SetConfig ¶
func (ds *Datastream) SetConfig(configMap map[string]string)
SetConfig sets the ds.config values
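A hedged sketch of configuring a stream before consuming a reader; the map keys are assumed to mirror the StreamConfig json tags listed further below (e.g. delimiter, header) and the file name is hypothetical:

package main

import (
    "log"
    "os"

    "github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func main() {
    ds := iop.NewDatastream(iop.Columns{})

    // Keys assumed to mirror the StreamConfig json tags.
    ds.SetConfig(map[string]string{
        "delimiter": "|",
        "header":    "true",
    })

    f, err := os.Open("pipe_delimited.txt") // hypothetical file
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    if err := ds.ConsumeCsvReader(f); err != nil {
        log.Fatal(err)
    }
    for row := range ds.Rows() {
        _ = row // process each row
    }
}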
func (*Datastream) SetEmpty ¶
func (ds *Datastream) SetEmpty()
SetEmpty sets the ds.Rows channel as empty
func (*Datastream) SetFields ¶
func (ds *Datastream) SetFields(fields []string)
SetFields sets the fields/columns of the Datastream
func (*Datastream) SetFileURI ¶ added in v1.1.15
func (ds *Datastream) SetFileURI()
SetFileURI sets the FileURI of the columns of the Datastream
func (*Datastream) SetIterator ¶ added in v1.1.14
func (ds *Datastream) SetIterator(it *Iterator)
func (*Datastream) SetMetadata ¶
func (ds *Datastream) SetMetadata(jsonStr string)
func (*Datastream) Shape ¶
func (ds *Datastream) Shape(columns Columns) (nDs *Datastream, err error)
Shape changes the column types as needed to match the provided columns. It will cast the already wrongly casted rows, and not recast the correctly casted rows.
func (*Datastream) Split ¶
func (ds *Datastream) Split(numStreams ...int) (dss []*Datastream)
Split splits the datastream into parallel datastreams
func (*Datastream) Start ¶
func (ds *Datastream) Start() (err error)
Start generates the stream. It should cycle the iterator func until done.
func (*Datastream) TryPause ¶
func (ds *Datastream) TryPause() bool
func (*Datastream) WaitClosed ¶
func (ds *Datastream) WaitClosed()
WaitClosed waits until the datastream is closed (a hack to make sure all streams are pushed)
func (*Datastream) WaitReady ¶
func (ds *Datastream) WaitReady() error
WaitReady waits until datastream is ready
type Excel ¶ added in v1.2.2
type Excel struct {
    File   *excelize.File
    Sheets []string
    Path   string
    // contains filtered or unexported fields
}
Excel represents an Excel object pointing to its file
func NewExcelFromFile ¶ added in v1.2.2
NewExcelFromFile returns a new Excel instance from a local file
func NewExcelFromReader ¶ added in v1.2.2
NewExcelFromReader returns a new Excel instance from a reader
func (*Excel) GetDataset ¶ added in v1.2.2
GetDataset returns a dataset of the provided sheet
func (*Excel) GetDatasetFromRange ¶ added in v1.2.2
GetDatasetFromRange returns a dataset of the provided sheet / range. cellRange example: `$AH$13:$AI$20`, `AH13:AI20` or `A:E`
func (*Excel) RefreshSheets ¶ added in v1.2.2
RefreshSheets refreshes the sheet index data
func (*Excel) WriteSheet ¶ added in v1.2.2
func (xls *Excel) WriteSheet(shtName string, ds *Datastream, mode string) (err error)
WriteSheet writes a datastream into a sheet. mode can be `new`, `append` or `overwrite`; the default is `new`.
func (*Excel) WriteToFile ¶ added in v1.2.2
WriteToFile writes to a file
type GoogleSheet ¶ added in v1.2.2
type GoogleSheet struct {
    Sheets        []string
    SpreadsheetID string
    // contains filtered or unexported fields
}
GoogleSheet represents a Google Sheet object
func NewGoogleSheet ¶ added in v1.2.2
func NewGoogleSheet(props ...string) (ggs *GoogleSheet, err error)
NewGoogleSheet returns a blank spreadsheet; title is the new spreadsheet title
func NewGoogleSheetFromURL ¶ added in v1.2.2
func NewGoogleSheetFromURL(urlStr string, props ...string) (ggs *GoogleSheet, err error)
NewGoogleSheetFromURL returns a new GoogleSheet instance from a provided url
func (*GoogleSheet) DeleteSheet ¶ added in v1.2.2
func (ggs *GoogleSheet) DeleteSheet(shtName string) (err error)
func (*GoogleSheet) GetDataset ¶ added in v1.2.2
func (ggs *GoogleSheet) GetDataset(shtName string) (data Dataset, err error)
GetDataset returns a dataset of the sheet
func (*GoogleSheet) GetDatasetFromRange ¶ added in v1.2.2
func (ggs *GoogleSheet) GetDatasetFromRange(shtName, cellRange string) (data Dataset, err error)
GetDatasetFromRange returns a dataset from the specified range
func (*GoogleSheet) RefreshSheets ¶ added in v1.2.2
func (ggs *GoogleSheet) RefreshSheets() (err error)
RefreshSheets refreshes sheets data
func (*GoogleSheet) URL ¶ added in v1.2.2
func (ggs *GoogleSheet) URL() string
func (*GoogleSheet) WriteSheet ¶ added in v1.2.2
func (ggs *GoogleSheet) WriteSheet(shtName string, ds *Datastream, mode string) (err error)
WriteSheet writes a datastream into a sheet. mode can be `new`, `append` or `overwrite`; the default is `new`.
type GzipCompressor ¶
type GzipCompressor struct {
    Compressor
    // contains filtered or unexported fields
}
func (*GzipCompressor) Compress ¶
func (cp *GzipCompressor) Compress(reader io.Reader) io.Reader
Compress uses gzip to compress
func (*GzipCompressor) Decompress ¶
Decompress uses gzip to decompress if the stream is gzip-compressed. Otherwise it returns the same reader.
func (*GzipCompressor) Suffix ¶
func (cp *GzipCompressor) Suffix() string
type Iterator ¶
type Iterator struct {
    Row          []any
    Reprocess    chan []any
    IsCasted     bool
    RowIsCasted  bool
    Counter      uint64
    StreamRowNum uint64
    Context      *g.Context
    Closed       bool
    // contains filtered or unexported fields
}
Iterator is the row provider for a datastream
func (*Iterator) Ds ¶
func (it *Iterator) Ds() *Datastream
type KeyType ¶
type KeyType string
const (
    AggregateKey    KeyType = "aggregate"
    ClusterKey      KeyType = "cluster"
    DistributionKey KeyType = "distribution"
    DuplicateKey    KeyType = "duplicate"
    HashKey         KeyType = "hash"
    IndexKey        KeyType = "index"
    PartitionKey    KeyType = "partition"
    PrimaryKey      KeyType = "primary"
    SortKey         KeyType = "sort"
    UniqueKey       KeyType = "unique"
    UpdateKey       KeyType = "update"
)
type Metadata ¶
type NoneCompressor ¶
type NoneCompressor struct {
    Compressor
    // contains filtered or unexported fields
}
func (*NoneCompressor) Decompress ¶
func (*NoneCompressor) Suffix ¶
func (cp *NoneCompressor) Suffix() string
type Parquet ¶
type Parquet struct {
    Path   string
    Reader *parquet.Reader
    Data   *Dataset
    // contains filtered or unexported fields
}
Parquet is a parquet object
func NewParquetReader ¶ added in v1.1.7
type ParquetArrowDumper ¶ added in v1.1.6
type ParquetArrowDumper struct {
// contains filtered or unexported fields
}
func NewParquetArrowDumper ¶ added in v1.1.6
func NewParquetArrowDumper(ccReader file.ColumnChunkReader) *ParquetArrowDumper
func (*ParquetArrowDumper) Next ¶ added in v1.1.6
func (pad *ParquetArrowDumper) Next() (interface{}, bool)
type ParquetArrowReader ¶ added in v1.1.6
type ParquetArrowReader struct {
    Path    string
    Reader  *file.Reader
    Data    *Dataset
    Context *g.Context
    // contains filtered or unexported fields
}
ParquetArrowReader is a parquet reader object
func NewParquetArrowReader ¶ added in v1.1.7
func NewParquetArrowReader(reader *os.File, selected []string) (p *ParquetArrowReader, err error)
func (*ParquetArrowReader) Columns ¶ added in v1.1.6
func (p *ParquetArrowReader) Columns() Columns
type ParquetArrowWriter ¶ added in v1.1.6
func NewParquetArrowWriter ¶ added in v1.1.6
func NewParquetArrowWriter(w io.Writer, columns Columns, codec compress.Compression) (p *ParquetArrowWriter, err error)
func (*ParquetArrowWriter) AppendNewRowGroup ¶ added in v1.1.6
func (p *ParquetArrowWriter) AppendNewRowGroup() (err error)
func (*ParquetArrowWriter) Close ¶ added in v1.1.6
func (p *ParquetArrowWriter) Close() (err error)
func (*ParquetArrowWriter) Columns ¶ added in v1.1.6
func (p *ParquetArrowWriter) Columns() Columns
func (*ParquetArrowWriter) WriteRow ¶ added in v1.1.6
func (p *ParquetArrowWriter) WriteRow(row []any) (err error)
type ParquetWriter ¶ added in v1.1.7
func NewParquetWriter ¶ added in v1.1.7
func (*ParquetWriter) Close ¶ added in v1.1.7
func (pw *ParquetWriter) Close() error
func (*ParquetWriter) WriteRow ¶ added in v1.1.7
func (pw *ParquetWriter) WriteRow(row []any) error
type ReaderReady ¶ added in v1.2.6
type RecNode ¶
type RecNode struct {
// contains filtered or unexported fields
}
func NewRecNode ¶
func (*RecNode) Compression ¶
type SAS ¶
type SAS struct {
    Path   string
    Reader *datareader.SAS7BDAT
    Data   *Dataset
    // contains filtered or unexported fields
}
SAS is a sas7bdat object
func NewSASStream ¶
func NewSASStream(reader io.ReadSeeker, columns Columns) (s *SAS, err error)
type SSHClient ¶
type SSHClient struct {
    Host       string
    Port       int
    User       string
    Password   string
    TgtHost    string
    TgtPort    int
    PrivateKey string
    Passphrase string
    Err        error
    // contains filtered or unexported fields
}
SSHClient is a client to connect to an SSH server, with the main goal of forwarding ports
func (*SSHClient) OpenPortForward ¶
OpenPortForward forwards the port as specified
func (*SSHClient) RunAsProcess ¶
RunAsProcess uses a separate process, which enables the use of public key auth. https://git-scm.com/book/pt-pt/v2/Git-no-Servidor-Generating-Your-SSH-Public-Key
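A hedged sketch of port forwarding with SSHClient; the host, ports and key value are hypothetical (whether PrivateKey takes a path or the key content is not specified here), and only the exported fields and methods listed above are used:

package main

import (
    "log"

    "github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func main() {
    client := iop.SSHClient{
        Host:       "bastion.example.com",       // hypothetical bastion host
        Port:       22,
        User:       "deploy",
        PrivateKey: "/home/deploy/.ssh/id_rsa",  // hypothetical; path or key content
        TgtHost:    "db.internal",               // remote target to reach
        TgtPort:    5432,
    }

    if err := client.Connect(); err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    localPort, err := client.OpenPortForward()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("tunnel open on localhost:%d -> db.internal:5432", localPort)
}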
type SnappyCompressor ¶
type SnappyCompressor struct {
    Compressor
    // contains filtered or unexported fields
}
func (*SnappyCompressor) Compress ¶
func (cp *SnappyCompressor) Compress(reader io.Reader) io.Reader
Compress uses snappy to compress
func (*SnappyCompressor) Decompress ¶
Decompress uses snappy to decompress if the stream is snappy-compressed. Otherwise it returns the same reader.
func (*SnappyCompressor) Suffix ¶
func (cp *SnappyCompressor) Suffix() string
type StreamConfig ¶ added in v1.1.15
type StreamConfig struct {
    TrimSpace      bool    `json:"trim_space"`
    EmptyAsNull    bool    `json:"empty_as_null"`
    Header         bool    `json:"header"`
    Compression    string  `json:"compression"` // AUTO | ZIP | GZIP | SNAPPY | NONE
    NullIf         string  `json:"null_if"`
    NullAs         string  `json:"null_as"`
    DatetimeFormat string  `json:"datetime_format"`
    SkipBlankLines bool    `json:"skip_blank_lines"`
    Delimiter      string  `json:"delimiter"`
    Escape         string  `json:"escape"`
    FileMaxRows    int64   `json:"file_max_rows"`
    MaxDecimals    int     `json:"max_decimals"`
    Flatten        bool    `json:"flatten"`
    FieldsPerRec   int     `json:"fields_per_rec"`
    Jmespath       string  `json:"jmespath"`
    BoolAsInt      bool    `json:"-"`
    Columns        Columns `json:"columns"` // list of column types. Can be partial list! likely is!
    Map            map[string]string `json:"-"`
    // contains filtered or unexported fields
}
func DefaultStreamConfig ¶ added in v1.1.15
func DefaultStreamConfig() *StreamConfig
type StreamProcessor ¶
type StreamProcessor struct {
    N      uint64
    Config *StreamConfig
    // contains filtered or unexported fields
}
StreamProcessor processes rows and values
func NewStreamProcessor ¶
func NewStreamProcessor() *StreamProcessor
NewStreamProcessor returns a new StreamProcessor
func (*StreamProcessor) CastRow ¶
func (sp *StreamProcessor) CastRow(row []interface{}, columns Columns) []interface{}
CastRow casts each value of a row. Slows down processing by about 40%?
func (*StreamProcessor) CastToString ¶
func (sp *StreamProcessor) CastToString(i int, val interface{}, valType ...ColumnType) string
CastToString casts a value to string; used for CSV writing. Slows processing down 5% with upstream CastRow, or 35% without upstream CastRow.
func (*StreamProcessor) CastToStringSafe ¶ added in v1.2.6
func (sp *StreamProcessor) CastToStringSafe(i int, val interface{}, valType ...ColumnType) string
CastToStringSafe casts a value to string (safer)
func (*StreamProcessor) CastToTime ¶
func (sp *StreamProcessor) CastToTime(i interface{}) (t time.Time, err error)
CastToTime converts interface to time
func (*StreamProcessor) CastType ¶
func (sp *StreamProcessor) CastType(val interface{}, typ ColumnType) interface{}
CastType casts the type of an interface. CastType is used to cast the interface placeholders.
func (*StreamProcessor) CastVal ¶
func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interface{}
CastVal casts values with stats collection, which degrades performance by ~10%. go test -benchmem -run='^$' -bench '^BenchmarkProcessVal' github.com/slingdata-io/sling-cli/core/dbio/iop
func (*StreamProcessor) CastValWithoutStats ¶
func (sp *StreamProcessor) CastValWithoutStats(i int, val interface{}, typ ColumnType) interface{}
CastValWithoutStats casts the value without counting stats
func (*StreamProcessor) EncodingTransform ¶ added in v1.2.3
func (sp *StreamProcessor) EncodingTransform(t Transform, val string) (newVal string, err error)
func (*StreamProcessor) GetType ¶
func (sp *StreamProcessor) GetType(val interface{}) (typ ColumnType)
GetType returns the type of an interface
func (*StreamProcessor) ParseString ¶
func (sp *StreamProcessor) ParseString(s string, jj ...int) interface{}
ParseString parses a string and returns an interface. string: "varchar", integer: "integer", decimal: "decimal", date: "date", datetime: "timestamp", timestamp: "timestamp", text: "text"
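A hedged sketch of value parsing with a StreamProcessor; the inputs are arbitrary, and ParseString/GetType are used with the signatures listed above:

package main

import (
    "fmt"

    "github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func main() {
    sp := iop.NewStreamProcessor()

    for _, s := range []string{"123", "1.50", "2021-01-01", "hello"} {
        v := sp.ParseString(s) // parse into an appropriate Go type
        fmt.Printf("%-12s -> %T (%v)\n", s, v, sp.GetType(v))
    }
}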
func (*StreamProcessor) ParseTime ¶
func (sp *StreamProcessor) ParseTime(i interface{}) (t time.Time, err error)
ParseTime parses a date string and returns time.Time
func (*StreamProcessor) ParseVal ¶
func (sp *StreamProcessor) ParseVal(val interface{}) interface{}
ParseVal parses the value into its appropriate type
func (*StreamProcessor) ProcessRow ¶
func (sp *StreamProcessor) ProcessRow(row []interface{}) []interface{}
ProcessRow processes a row
func (*StreamProcessor) ProcessVal ¶
func (sp *StreamProcessor) ProcessVal(val interface{}) interface{}
ProcessVal processes a value
func (*StreamProcessor) ResetConfig ¶ added in v1.1.15
func (sp *StreamProcessor) ResetConfig()
func (*StreamProcessor) SetConfig ¶
func (sp *StreamProcessor) SetConfig(configMap map[string]string)
SetConfig sets the data.Sp.config values
type Transform ¶ added in v1.2.2
type Transform string
const (
    TransformDecodeLatin1        Transform = "decode_latin1"
    TransformDecodeLatin5        Transform = "decode_latin5"
    TransformDecodeLatin9        Transform = "decode_latin9"
    TransformDecodeUtf8          Transform = "decode_utf8"
    TransformDecodeUtf8Bom       Transform = "decode_utf8_bom"
    TransformDecodeUtf16         Transform = "decode_utf16"
    TransformDecodeWindows1250   Transform = "decode_windows1250"
    TransformDecodeWindows1252   Transform = "decode_windows1252"
    TransformDuckdbListToText    Transform = "duckdb_list_to_text"
    TransformEncodeLatin1        Transform = "encode_latin1"
    TransformEncodeLatin5        Transform = "encode_latin5"
    TransformEncodeLatin9        Transform = "encode_latin9"
    TransformEncodeUtf8          Transform = "encode_utf8"
    TransformEncodeUtf8Bom       Transform = "encode_utf8_bom"
    TransformEncodeUtf16         Transform = "encode_utf16"
    TransformEncodeWindows1250   Transform = "encode_windows1250"
    TransformEncodeWindows1252   Transform = "encode_windows1252"
    TransformHashMd5             Transform = "hash_md5"
    TransformHashSha256          Transform = "hash_sha256"
    TransformHashSha512          Transform = "hash_sha512"
    TransformParseBit            Transform = "parse_bit"
    TransformParseFix            Transform = "parse_fix"
    TransformParseUuid           Transform = "parse_uuid"
    TransformReplace0x00         Transform = "replace_0x00"
    TransformReplaceAccents      Transform = "replace_accents"
    TransformReplaceNonPrintable Transform = "replace_non_printable"
    TransformTrimSpace           Transform = "trim_space"
)
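A hedged sketch of applying one of these transforms via the stream processor; the input string is arbitrary, the decoding behavior is assumed from the transform's name, and EncodingTransform is used with the signature listed above:

package main

import (
    "fmt"
    "log"

    "github.com/slingdata-io/sling-cli/core/dbio/iop"
)

func main() {
    sp := iop.NewStreamProcessor()

    // Decode a Windows-1252 value (0xE9 = é) into UTF-8, assuming that is
    // what this transform does based on its name.
    newVal, err := sp.EncodingTransform(iop.TransformDecodeWindows1252, "caf\xe9")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(newVal) // café
}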
type TransformFunc ¶
type TransformFunc func(*StreamProcessor, string) (string, error)
type Transformers ¶ added in v1.2.2
type Transformers struct {
    Accent            transform.Transformer
    DecodeUTF8        transform.Transformer
    DecodeUTF8BOM     transform.Transformer
    DecodeUTF16       transform.Transformer
    DecodeISO8859_1   transform.Transformer
    DecodeISO8859_5   transform.Transformer
    DecodeISO8859_15  transform.Transformer
    DecodeWindows1250 transform.Transformer
    DecodeWindows1252 transform.Transformer
    EncodeUTF8        transform.Transformer
    EncodeUTF8BOM     transform.Transformer
    EncodeUTF16       transform.Transformer
    EncodeISO8859_1   transform.Transformer
    EncodeISO8859_5   transform.Transformer
    EncodeISO8859_15  transform.Transformer
    EncodeWindows1250 transform.Transformer
    EncodeWindows1252 transform.Transformer
}
func NewTransformers ¶ added in v1.2.2
func NewTransformers() Transformers
type ZStandardCompressor ¶
type ZStandardCompressor struct {
    Compressor
    // contains filtered or unexported fields
}
func (*ZStandardCompressor) Compress ¶
func (cp *ZStandardCompressor) Compress(reader io.Reader) io.Reader
Compress uses zstandard to compress
func (*ZStandardCompressor) Decompress ¶
Decompress uses zstandard to decompress if the stream is zstandard-compressed. Otherwise it returns the same reader.
func (*ZStandardCompressor) Suffix ¶
func (cp *ZStandardCompressor) Suffix() string