Documentation ¶
Index ¶
- Variables
- func AutoDecompress(reader io.Reader) (gReader io.Reader, err error)
- func CleanHeaderRow(header []string) []string
- func CompareColumns(columns1 Columns, columns2 Columns) (reshape bool, err error)
- func CreateDummyFields(numCols int) (cols []string)
- func GetISO8601DateMap(t time.Time) map[string]interface{}
- func IsDummy(columns []Column) bool
- func Iso8601ToGoLayout(dateFormat string) (goDateFormat string)
- func MakeRowsChan() chan []any
- func NewJSONStream(ds *Datastream, decoder decoderLike, flatten bool, jmespath string) *jsonStream
- func ParseUUID(sp *StreamProcessor, val string) (string, error)
- func ReplaceAccents(sp *StreamProcessor, val string) (string, error)
- func Row(vals ...any) []any
- func ScanCarrRet(data []byte, atEOF bool) (advance int, token []byte, err error)
- func Unzip(src string, dest string) ([]string, error)
- type Batch
- func (b *Batch) AddTransform(transf func(row []any) []any)
- func (b *Batch) Close()
- func (b *Batch) ColumnsChanged() bool
- func (b *Batch) Ds() *Datastream
- func (b *Batch) ID() string
- func (b *Batch) IsFirst() bool
- func (b *Batch) Push(row []any)
- func (b *Batch) Shape(tgtColumns Columns, pause ...bool) (err error)
- type BatchReader
- type CSV
- func (c *CSV) InferSchema() error
- func (c *CSV) NewReader() (*io.PipeReader, error)
- func (c *CSV) Read() (data Dataset, err error)
- func (c *CSV) ReadStream() (ds *Datastream, err error)
- func (c *CSV) Sample(n int) (Dataset, error)
- func (c *CSV) SetFields(fields []string)
- func (c *CSV) WriteStream(ds *Datastream) (cnt uint64, err error)
- type Column
- type ColumnStats
- type ColumnType
- type Columns
- func (cols Columns) Add(newCols Columns, overwrite bool) (col2 Columns, added Columns)
- func (cols Columns) Clone() (newCols Columns)
- func (cols Columns) Dataset() Dataset
- func (cols Columns) DbTypes(args ...bool) []string
- func (cols Columns) FieldMap(toLower bool) map[string]int
- func (cols Columns) GetColumn(name string) Column
- func (cols Columns) IsDifferent(newCols Columns) bool
- func (cols Columns) IsDummy() bool
- func (cols Columns) IsSimilarTo(otherCols Columns) bool
- func (cols Columns) Keys() []string
- func (cols Columns) MakeRec(row []any) map[string]any
- func (cols Columns) MakeShaper(tgtColumns Columns) (shaper *Shaper, err error)
- func (cols Columns) Names(args ...bool) []string
- func (cols Columns) Types(args ...bool) []string
- type Compressor
- type CompressorType
- type Dataflow
- func (df *Dataflow) AddColumns(newCols Columns, overwrite bool, exceptDs ...string) (added Columns, processOk bool)
- func (df *Dataflow) AddInBytes(bytes uint64)
- func (df *Dataflow) AddOutBytes(bytes uint64)
- func (df *Dataflow) Bytes() (inBytes, outBytes uint64)
- func (df *Dataflow) ChangeColumn(i int, newType ColumnType, exceptDs ...string) bool
- func (df *Dataflow) CleanUp()
- func (df *Dataflow) Close()
- func (df *Dataflow) CloseCurrentBatches()
- func (df *Dataflow) Collect() (data Dataset, err error)
- func (df *Dataflow) Count() (cnt uint64)
- func (df *Dataflow) Defer(f func())
- func (df *Dataflow) DsTotalBytes() (bytes uint64)
- func (df *Dataflow) Err() (err error)
- func (df *Dataflow) IsClosed() bool
- func (df *Dataflow) IsEmpty() bool
- func (df *Dataflow) MakeStreamCh(forceMerge bool) (streamCh chan *Datastream)
- func (df *Dataflow) Pause(exceptDs ...string) bool
- func (df *Dataflow) PushStreamChan(dsCh chan *Datastream)
- func (df *Dataflow) SetColumns(columns []Column)
- func (df *Dataflow) SetEmpty()
- func (df *Dataflow) SetReady()
- func (df *Dataflow) Size() int
- func (df *Dataflow) SyncColumns()
- func (df *Dataflow) SyncStats()
- func (df *Dataflow) Unpause(exceptDs ...string)
- func (df *Dataflow) WaitClosed()
- func (df *Dataflow) WaitReady() error
- type Dataset
- func (data *Dataset) AddColumns(newCols Columns, overwrite bool) (added Columns)
- func (data *Dataset) Append(row []interface{})
- func (data *Dataset) ColValues(col int) []interface{}
- func (data *Dataset) ColValuesStr(col int) []string
- func (data *Dataset) FirstRow() []interface{}
- func (data *Dataset) FirstVal() interface{}
- func (data *Dataset) GetFields(lower ...bool) []string
- func (data *Dataset) InferColumnTypes()
- func (data *Dataset) Pick(colNames ...string) (nData Dataset)
- func (data *Dataset) Print(limit int)
- func (data *Dataset) Records(lower ...bool) []map[string]interface{}
- func (data *Dataset) SetFields(fields []string)
- func (data *Dataset) Sort(args ...any)
- func (data *Dataset) Stream() *Datastream
- func (data *Dataset) StringRecords(lower ...bool) []map[string]interface{}
- func (data *Dataset) ToJSONMap() map[string]interface{}
- func (data *Dataset) WriteCsv(dest io.Writer) (tbw int, err error)
- type Datastream
- func MergeDataflow(df *Dataflow) (dsN *Datastream)
- func NewDatastream(columns Columns) (ds *Datastream)
- func NewDatastreamContext(ctx context.Context, columns Columns) (ds *Datastream)
- func NewDatastreamIt(ctx context.Context, columns Columns, nextFunc func(it *Iterator) bool) (ds *Datastream)
- func ReadCsvStream(path string) (ds *Datastream, err error)
- func (ds *Datastream) AddBytes(b int64)
- func (ds *Datastream) AddColumns(newCols Columns, overwrite bool) (added Columns)
- func (ds *Datastream) CastRowToString(row []any) []string
- func (ds *Datastream) ChangeColumn(i int, newType ColumnType)
- func (ds *Datastream) Chunk(limit uint64) (chDs chan *Datastream)
- func (ds *Datastream) Close()
- func (ds *Datastream) Collect(limit int) (Dataset, error)
- func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeJsonReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error)
- func (ds *Datastream) ConsumeParquetReaderSeeker(reader io.ReadSeeker) (err error)
- func (ds *Datastream) ConsumeXmlReader(reader io.Reader) (err error)
- func (ds *Datastream) Defer(f func())
- func (ds *Datastream) Df() *Dataflow
- func (ds *Datastream) Err() (err error)
- func (ds *Datastream) GetConfig() (configMap map[string]string)
- func (ds *Datastream) GetFields(args ...bool) []string
- func (ds *Datastream) IsClosed() bool
- func (ds *Datastream) LatestBatch() *Batch
- func (ds *Datastream) Map(newColumns Columns, transf func([]any) []any) (nDs *Datastream)
- func (ds *Datastream) MapParallel(transf func([]any) []any, numWorkers int) (nDs *Datastream)
- func (ds *Datastream) NewBatch(columns Columns) *Batch
- func (ds *Datastream) NewCsvBufferReader(limit int, bytesLimit int64) *bytes.Reader
- func (ds *Datastream) NewCsvBufferReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *bytes.Reader)
- func (ds *Datastream) NewCsvBytesChnl(chunkRowSize int) (dataChn chan *[]byte)
- func (ds *Datastream) NewCsvReader(rowLimit int, bytesLimit int64) *io.PipeReader
- func (ds *Datastream) NewCsvReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *BatchReader)
- func (ds *Datastream) NewIterator(columns Columns, nextFunc func(it *Iterator) bool) *Iterator
- func (ds *Datastream) NewJsonLinesReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)
- func (ds *Datastream) NewJsonReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)
- func (ds *Datastream) NewParquetReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *BatchReader)
- func (ds *Datastream) Pause()
- func (ds *Datastream) Push(row []any)
- func (ds *Datastream) Records() <-chan map[string]any
- func (ds *Datastream) Rows() chan []any
- func (ds *Datastream) SetConfig(configMap map[string]string)
- func (ds *Datastream) SetEmpty()
- func (ds *Datastream) SetFields(fields []string)
- func (ds *Datastream) SetMetadata(jsonStr string)
- func (ds *Datastream) SetReady()
- func (ds *Datastream) Shape(columns Columns) (nDs *Datastream, err error)
- func (ds *Datastream) Split(numStreams ...int) (dss []*Datastream)
- func (ds *Datastream) Start() (err error)
- func (ds *Datastream) TryPause() bool
- func (ds *Datastream) Unpause()
- func (ds *Datastream) WaitReady() error
- type GzipCompressor
- type Iterator
- type KeyValue
- type Metadata
- type NoneCompressor
- type Parquet
- type Record
- type SSHClient
- func (s *SSHClient) Close()
- func (s *SSHClient) Connect() (err error)
- func (s *SSHClient) GetOutput() (stdout string, stderr string)
- func (s *SSHClient) OpenPortForward() (localPort int, err error)
- func (s *SSHClient) RunAsProcess() (localPort int, err error)
- func (s *SSHClient) SftpClient() (sftpClient *sftp.Client, err error)
- type Shaper
- type SnappyCompressor
- type StreamProcessor
- func (sp *StreamProcessor) CastRow(row []interface{}, columns Columns) []interface{}
- func (sp *StreamProcessor) CastToString(i int, val interface{}, valType ...ColumnType) string
- func (sp *StreamProcessor) CastToTime(i interface{}) (t time.Time, err error)
- func (sp *StreamProcessor) CastType(val interface{}, typ ColumnType) interface{}
- func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interface{}
- func (sp *StreamProcessor) CastValWithoutStats(i int, val interface{}, typ ColumnType) interface{}
- func (sp *StreamProcessor) GetType(val interface{}) (typ ColumnType)
- func (sp *StreamProcessor) ParseString(s string, jj ...int) interface{}
- func (sp *StreamProcessor) ParseTime(i interface{}) (t time.Time, err error)
- func (sp *StreamProcessor) ParseVal(val interface{}) interface{}
- func (sp *StreamProcessor) ProcessRow(row []interface{}) []interface{}
- func (sp *StreamProcessor) ProcessVal(val interface{}) interface{}
- func (sp *StreamProcessor) SetConfig(configMap map[string]string)
- type ZStandardCompressor
Constants ¶
This section is empty.
Variables ¶
var (
    // RemoveTrailingDecZeros removes the trailing zeros in CastToString
    RemoveTrailingDecZeros = false

    SampleSize = 900
)
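For instance, these package-level variables can be tuned before streaming. A minimal sketch (the exact effect of SampleSize is an inference from its name and the sampling APIs above):

// assumes: import "github.com/flarco/dbio/iop"

// Sample more rows (presumably used for type inference buffering).
iop.SampleSize = 2000

// Trim trailing decimal zeros when casting to string.
iop.RemoveTrailingDecZeros = true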
Functions ¶
func AutoDecompress ¶ added in v0.2.3
AutoDecompress auto-detects the compression and decompresses. Otherwise, it returns the same reader.
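A minimal sketch of decompressing a possibly-gzipped file (the file name is hypothetical; the import path is taken from the CastVal benchmark below):

package main

import (
    "io"
    "os"

    "github.com/flarco/dbio/iop"
)

func main() {
    // hypothetical gzip-compressed file
    file, err := os.Open("data.csv.gz")
    if err != nil {
        panic(err)
    }
    defer file.Close()

    // AutoDecompress detects the compression and wraps the reader;
    // an uncompressed file would pass through unchanged.
    reader, err := iop.AutoDecompress(file)
    if err != nil {
        panic(err)
    }
    _, _ = io.Copy(os.Stdout, reader)
}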
func CleanHeaderRow ¶
CleanHeaderRow cleans the header row from incompatible characters
func CompareColumns ¶
CompareColumns compares two sets of columns to see if they are similar
func CreateDummyFields ¶
CreateDummyFields creates dummy columns for csvs with no header row
func GetISO8601DateMap ¶
GetISO8601DateMap returns a map of date parts for string formatting
func Iso8601ToGoLayout ¶ added in v0.3.49
Iso8601ToGoLayout converts an ISO 8601 date format into its Go time layout equivalent. See https://www.w3.org/QA/Tips/iso-date, https://www.w3.org/TR/NOTE-datetime, https://www.iso.org/iso-8601-date-and-time-format.html
func MakeRowsChan ¶
func MakeRowsChan() chan []any
MakeRowsChan returns a buffered channel with default size
func NewJSONStream ¶ added in v0.3.1
func NewJSONStream(ds *Datastream, decoder decoderLike, flatten bool, jmespath string) *jsonStream
func ReplaceAccents ¶ added in v0.3.257
func ReplaceAccents(sp *StreamProcessor, val string) (string, error)
func ScanCarrRet ¶
ScanCarrRet removes the \r runes that do not have a \n right after
Types ¶
type Batch ¶ added in v0.3.185
type Batch struct {
    Columns  Columns
    Rows     chan []any
    Previous *Batch
    Count    int64
    Limit    int64
    // contains filtered or unexported fields
}
func (*Batch) AddTransform ¶ added in v0.3.215
func (*Batch) ColumnsChanged ¶ added in v0.3.185
func (*Batch) Ds ¶ added in v0.3.215
func (b *Batch) Ds() *Datastream
type BatchReader ¶ added in v0.3.188
type CSV ¶
type CSV struct {
    Path            string
    NoHeader        bool
    Delimiter       rune
    FieldsPerRecord int
    Columns         []Column
    File            *os.File
    Data            Dataset
    Reader          io.Reader
    Config          map[string]string
    NoDebug         bool
    // contains filtered or unexported fields
}
CSV is a csv object
func (*CSV) NewReader ¶
func (c *CSV) NewReader() (*io.PipeReader, error)
NewReader creates a Reader
func (*CSV) ReadStream ¶
func (c *CSV) ReadStream() (ds *Datastream, err error)
ReadStream returns the read CSV stream, with line 1 as the header
func (*CSV) WriteStream ¶
func (c *CSV) WriteStream(ds *Datastream) (cnt uint64, err error)
WriteStream to CSV file
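A sketch of piping one CSV file into another through a Datastream (file names are hypothetical):

package main

import (
    "fmt"

    "github.com/flarco/dbio/iop"
)

func main() {
    src := iop.CSV{Path: "in.csv"}  // hypothetical input
    dst := iop.CSV{Path: "out.csv"} // hypothetical output

    // ReadStream treats line 1 as the header.
    ds, err := src.ReadStream()
    if err != nil {
        panic(err)
    }

    // WriteStream drains the datastream into the target file.
    cnt, err := dst.WriteStream(ds)
    if err != nil {
        panic(err)
    }
    fmt.Println("rows written:", cnt)
}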
type Column ¶
type Column struct {
    Position    int         `json:"position"`
    Name        string      `json:"name"`
    Type        ColumnType  `json:"type"`
    DbType      string      `json:"db_type,omitempty"`
    DbPrecision int         `json:"-"`
    DbScale     int         `json:"-"`
    Sourced     bool        `json:"-"` // whether it was sourced from a typed source
    Stats       ColumnStats `json:"stats,omitempty"`
    Table       string      `json:"table,omitempty"`
    Schema      string      `json:"schema,omitempty"`
    Database    string      `json:"database,omitempty"`
    // contains filtered or unexported fields
}
Column represents a schemata column
func InferFromStats ¶
InferFromStats uses the stats to infer data types
func (*Column) IsDatetime ¶
IsDatetime returns whether the column is a datetime object
type ColumnStats ¶
type ColumnStats struct {
    MinLen    int    `json:"min_len,omitempty"`
    MaxLen    int    `json:"max_len,omitempty"`
    MaxDecLen int    `json:"max_dec_len,omitempty"`
    Min       int64  `json:"min"`
    Max       int64  `json:"max"`
    NullCnt   int64  `json:"null_cnt"`
    IntCnt    int64  `json:"int_cnt,omitempty"`
    DecCnt    int64  `json:"dec_cnt,omitempty"`
    BoolCnt   int64  `json:"bool_cnt,omitempty"`
    JsonCnt   int64  `json:"json_cnt,omitempty"`
    StringCnt int64  `json:"string_cnt,omitempty"`
    DateCnt   int64  `json:"date_cnt,omitempty"`
    TotalCnt  int64  `json:"total_cnt"`
    UniqCnt   int64  `json:"uniq_cnt"`
    Checksum  uint64 `json:"checksum"`
}
ColumnStats holds statistics for a column
func (*ColumnStats) DistinctPercent ¶ added in v0.3.66
func (cs *ColumnStats) DistinctPercent() float64
func (*ColumnStats) DuplicateCount ¶ added in v0.3.66
func (cs *ColumnStats) DuplicateCount() int64
func (*ColumnStats) DuplicatePercent ¶ added in v0.3.66
func (cs *ColumnStats) DuplicatePercent() float64
type ColumnType ¶ added in v0.3.66
type ColumnType string
const (
    BigIntType     ColumnType = "bigint"
    BinaryType     ColumnType = "binary"
    BoolType       ColumnType = "bool"
    DateType       ColumnType = "date"
    DatetimeType   ColumnType = "datetime"
    DecimalType    ColumnType = "decimal"
    IntegerType    ColumnType = "integer"
    JsonType       ColumnType = "json"
    SmallIntType   ColumnType = "smallint"
    StringType     ColumnType = "string"
    TextType       ColumnType = "text"
    TimestampType  ColumnType = "timestamp"
    TimestampzType ColumnType = "timestampz"
    FloatType      ColumnType = "float"
    TimeType       ColumnType = "time"
    TimezType      ColumnType = "timez"
)
func (ColumnType) IsBool ¶ added in v0.3.66
func (ct ColumnType) IsBool() bool
IsBool returns whether the column is a boolean
func (ColumnType) IsDatetime ¶ added in v0.3.66
func (ct ColumnType) IsDatetime() bool
IsDatetime returns whether the column is a datetime object
func (ColumnType) IsDecimal ¶ added in v0.3.66
func (ct ColumnType) IsDecimal() bool
IsDecimal returns whether the column is a decimal
func (ColumnType) IsInteger ¶ added in v0.3.66
func (ct ColumnType) IsInteger() bool
IsInteger returns whether the column is an integer
func (ColumnType) IsJSON ¶ added in v0.3.130
func (ct ColumnType) IsJSON() bool
IsJSON returns whether the column is a json
func (ColumnType) IsNumber ¶ added in v0.3.66
func (ct ColumnType) IsNumber() bool
IsNumber returns whether the column is a decimal or an integer
func (ColumnType) IsString ¶ added in v0.3.66
func (ct ColumnType) IsString() bool
IsString returns whether the column is a string
func (ColumnType) IsValid ¶ added in v0.3.252
func (ct ColumnType) IsValid() bool
IsValid returns whether the column has a valid type
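For example, the predicate helpers compose for quick type checks; a minimal sketch:

package main

import (
    "fmt"

    "github.com/flarco/dbio/iop"
)

func main() {
    ct := iop.DecimalType
    fmt.Println(ct.IsDecimal()) // true
    fmt.Println(ct.IsNumber())  // true: decimal or integer
    fmt.Println(ct.IsString())  // false

    // An unknown type string should presumably not validate.
    fmt.Println(iop.ColumnType("bogus").IsValid())
}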
type Columns ¶
type Columns []Column
Columns represents multiple columns
func NewColumns ¶ added in v0.3.168
NewColumns creates Columns
func NewColumnsFromFields ¶
NewColumnsFromFields creates Columns from fields
func (Columns) DbTypes ¶ added in v0.3.185
DbTypes returns the column db types. args -> (lower bool, cleanUp bool)
func (Columns) FieldMap ¶ added in v0.0.5
FieldMap returns a map of field names to their indexes. When `toLower` is true, the field keys are lower-cased.
func (Columns) IsDifferent ¶ added in v0.3.185
func (Columns) IsSimilarTo ¶ added in v0.3.202
IsSimilarTo returns true if the set has the same number of columns and contains the same columns, possibly in a different order
func (Columns) Keys ¶ added in v0.3.244
Keys returns the column keys
func (Columns) MakeShaper ¶ added in v0.3.202
type Compressor ¶
type Compressor interface {
    Self() Compressor
    Compress(io.Reader) io.Reader
    Decompress(io.Reader) (io.Reader, error)
    Suffix() string
}
Compressor implements different kinds of compression
func NewCompressor ¶ added in v0.2.3
func NewCompressor(cpType CompressorType) Compressor
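A round-trip sketch with the gzip compressor:

package main

import (
    "io"
    "os"
    "strings"

    "github.com/flarco/dbio/iop"
)

func main() {
    cp := iop.NewCompressor(iop.GzipCompressorType)

    // Compress wraps the reader; Decompress detects the
    // compression and unwraps it again.
    compressed := cp.Compress(strings.NewReader("hello world"))
    plain, err := cp.Decompress(compressed)
    if err != nil {
        panic(err)
    }
    _, _ = io.Copy(os.Stdout, plain) // hello world
}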
type CompressorType ¶
type CompressorType string
CompressorType is a string enum type for the compressor type
const (
    // AutoCompressorType is for auto compression
    AutoCompressorType CompressorType = "AUTO"
    // NoneCompressorType is for no compression
    NoneCompressorType CompressorType = "NONE"
    // ZipCompressorType is for Zip compression
    ZipCompressorType CompressorType = "ZIP"
    // GzipCompressorType is for Gzip compression
    GzipCompressorType CompressorType = "GZIP"
    // SnappyCompressorType is for Snappy compression
    SnappyCompressorType CompressorType = "SNAPPY"
    // ZStandardCompressorType is for ZStandard
    ZStandardCompressorType CompressorType = "ZSTD"
)
func CompressorTypePtr ¶ added in v0.3.56
func CompressorTypePtr(v CompressorType) *CompressorType
CompressorTypePtr returns a pointer to the CompressorType value passed in.
type Dataflow ¶
type Dataflow struct {
    Columns         Columns
    Buffer          [][]interface{}
    StreamCh        chan *Datastream
    Streams         []*Datastream
    Context         *g.Context
    Limit           uint64
    InBytes         uint64
    OutBytes        uint64
    Ready           bool
    Inferred        bool
    FsURL           string
    OnColumnChanged func(col Column) error
    OnColumnAdded   func(col Column) error
    StreamMap       map[string]*Datastream
    SchemaVersion   int // for column type version
    // contains filtered or unexported fields
}
Dataflow is a collection of concurrent Datastreams
func MakeDataFlow ¶
func MakeDataFlow(dss ...*Datastream) (df *Dataflow, err error)
MakeDataFlow creates a dataflow from datastreams
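A sketch of combining streams and collecting them (ds1 and ds2 are assumed to be live *iop.Datastream values, e.g. obtained from CSV.ReadStream):

df, err := iop.MakeDataFlow(ds1, ds2)
if err != nil {
    return err
}

// Collect drains all streams into a single Dataset.
data, err := df.Collect()
if err != nil {
    return err
}
fmt.Println(len(data.Rows), "rows collected")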
func (*Dataflow) AddColumns ¶ added in v0.3.185
func (df *Dataflow) AddColumns(newCols Columns, overwrite bool, exceptDs ...string) (added Columns, processOk bool)
AddColumns adds the new columns
func (*Dataflow) AddInBytes ¶ added in v0.3.0
AddInBytes adds ingress bytes
func (*Dataflow) AddOutBytes ¶ added in v0.3.0
AddOutBytes adds egress bytes
func (*Dataflow) ChangeColumn ¶ added in v0.3.185
func (df *Dataflow) ChangeColumn(i int, newType ColumnType, exceptDs ...string) bool
ChangeColumn applies a column type change
func (*Dataflow) CleanUp ¶ added in v0.3.0
func (df *Dataflow) CleanUp()
CleanUp runs the deferred functions
func (*Dataflow) CloseCurrentBatches ¶ added in v0.3.185
func (df *Dataflow) CloseCurrentBatches()
func (*Dataflow) Collect ¶ added in v0.3.104
Collect reads from one or more streams and returns a dataset
func (*Dataflow) Defer ¶
func (df *Dataflow) Defer(f func())
Defer runs a given function at close of the Dataflow
func (*Dataflow) DsTotalBytes ¶ added in v0.3.0
func (*Dataflow) MakeStreamCh ¶ added in v0.3.136
func (df *Dataflow) MakeStreamCh(forceMerge bool) (streamCh chan *Datastream)
MakeStreamCh determines whether to merge all the streams into one or keep them separate. If the data per stream is small, it's best to merge. For example, BigQuery limits how many operations can be called within a time window.
func (*Dataflow) PushStreamChan ¶ added in v0.3.102
func (df *Dataflow) PushStreamChan(dsCh chan *Datastream)
func (*Dataflow) SetColumns ¶
SetColumns sets the columns
func (*Dataflow) SetEmpty ¶
func (df *Dataflow) SetEmpty()
SetEmpty sets all underlying datastreams empty
func (*Dataflow) SetReady ¶ added in v0.3.102
func (df *Dataflow) SetReady()
SetReady sets the df.ready
func (*Dataflow) SyncColumns ¶ added in v0.3.185
func (df *Dataflow) SyncColumns()
SyncColumns is a workaround to sync ds.Columns to df.Columns
func (*Dataflow) SyncStats ¶
func (df *Dataflow) SyncStats()
SyncStats syncs the aggregated stream-processor stats to df.Columns
func (*Dataflow) WaitClosed ¶ added in v0.3.185
func (df *Dataflow) WaitClosed()
WaitClosed waits until the dataflow is closed. This is a hack to make sure all streams are pushed.
type Dataset ¶
type Dataset struct {
    Result        *sqlx.Rows
    Columns       Columns
    Rows          [][]interface{}
    SQL           string
    Duration      float64
    Sp            *StreamProcessor
    Inferred      bool
    SafeInference bool
    NoDebug       bool
}
Dataset is a dataset returned from a query
func NewDatasetFromMap ¶
NewDatasetFromMap returns a new dataset
func (*Dataset) AddColumns ¶ added in v0.3.185
AddColumns adds the new columns
func (*Dataset) ColValuesStr ¶
ColValuesStr returns the values of one column as a string array
func (*Dataset) FirstRow ¶
func (data *Dataset) FirstRow() []interface{}
FirstRow returns the first row
func (*Dataset) FirstVal ¶
func (data *Dataset) FirstVal() interface{}
FirstVal returns the first value from the first row
func (*Dataset) InferColumnTypes ¶
func (data *Dataset) InferColumnTypes()
InferColumnTypes determines the columns types
func (*Dataset) Sort ¶ added in v0.3.168
Sort sorts by cols. Example: `data.Sort(0, 2, 3, false)` will sort col0, col2, col3 descending. Example: `data.Sort(0, 2, true)` will sort col0, col2 ascending.
func (*Dataset) Stream ¶
func (data *Dataset) Stream() *Datastream
Stream returns a datastream of the dataset
func (*Dataset) StringRecords ¶ added in v0.3.252
StringRecords returns rows of maps with string values
type Datastream ¶
type Datastream struct {
    Columns       Columns
    Buffer        [][]any
    BatchChan     chan *Batch
    Batches       []*Batch
    CurrentBatch  *Batch
    Count         uint64
    Context       *g.Context
    Ready         bool
    Bytes         uint64
    Sp            *StreamProcessor
    SafeInference bool
    NoDebug       bool
    Inferred      bool
    ID            string
    Metadata      Metadata // map of column name to metadata type
    // contains filtered or unexported fields
}
Datastream is a stream of rows
func MergeDataflow ¶
func MergeDataflow(df *Dataflow) (dsN *Datastream)
MergeDataflow merges the dataflow streams into one
func NewDatastream ¶
func NewDatastream(columns Columns) (ds *Datastream)
NewDatastream returns a new datastream
func NewDatastreamContext ¶
func NewDatastreamContext(ctx context.Context, columns Columns) (ds *Datastream)
NewDatastreamContext returns a new datastream with the given context
func NewDatastreamIt ¶
func NewDatastreamIt(ctx context.Context, columns Columns, nextFunc func(it *Iterator) bool) (ds *Datastream)
NewDatastreamIt returns a new datastream driven by the provided iterator function
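A sketch that drives a datastream from an in-memory slice (assumptions: NewColumnsFromFields takes field names variadically, and Start begins cycling the iterator, per its doc below):

package main

import (
    "context"
    "fmt"

    "github.com/flarco/dbio/iop"
)

func main() {
    rows := [][]any{{1, "a"}, {2, "b"}}
    i := 0

    // nextFunc sets it.Row and reports whether a row was produced.
    nextFunc := func(it *iop.Iterator) bool {
        if i >= len(rows) {
            return false
        }
        it.Row = rows[i]
        i++
        return true
    }

    cols := iop.NewColumnsFromFields("id", "name") // assumed variadic signature
    ds := iop.NewDatastreamIt(context.Background(), cols, nextFunc)
    if err := ds.Start(); err != nil {
        panic(err)
    }

    for row := range ds.Rows() {
        fmt.Println(row)
    }
}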
func ReadCsvStream ¶
func ReadCsvStream(path string) (ds *Datastream, err error)
ReadCsvStream reads a CSV file and returns a datastream
func (*Datastream) AddBytes ¶
func (ds *Datastream) AddBytes(b int64)
AddBytes adds bytes as processed
func (*Datastream) AddColumns ¶ added in v0.3.185
func (ds *Datastream) AddColumns(newCols Columns, overwrite bool) (added Columns)
AddColumns adds the new columns
func (*Datastream) CastRowToString ¶ added in v0.3.168
func (ds *Datastream) CastRowToString(row []any) []string
CastRowToString returns the row cast as strings
func (*Datastream) ChangeColumn ¶ added in v0.3.185
func (ds *Datastream) ChangeColumn(i int, newType ColumnType)
ChangeColumn applies a column type change
func (*Datastream) Chunk ¶
func (ds *Datastream) Chunk(limit uint64) (chDs chan *Datastream)
Chunk splits the datastream into chunk datastreams (in sequence)
func (*Datastream) Collect ¶
func (ds *Datastream) Collect(limit int) (Dataset, error)
Collect reads a stream and returns a dataset. A limit of 0 is unlimited.
func (*Datastream) ConsumeCsvReader ¶ added in v0.1.0
func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error)
ConsumeCsvReader uses the provided reader to stream rows
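A sketch that feeds an in-memory CSV payload to a datastream (it assumes ConsumeCsvReader starts the stream, so Collect can drain it):

package main

import (
    "fmt"
    "strings"

    "github.com/flarco/dbio/iop"
)

func main() {
    ds := iop.NewDatastream(iop.Columns{})

    csv := "id,name\n1,a\n2,b\n"
    if err := ds.ConsumeCsvReader(strings.NewReader(csv)); err != nil {
        panic(err)
    }

    // a limit of 0 is unlimited
    data, err := ds.Collect(0)
    if err != nil {
        panic(err)
    }
    fmt.Println(data.Columns.Names(), len(data.Rows))
}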
func (*Datastream) ConsumeJsonReader ¶ added in v0.1.0
func (ds *Datastream) ConsumeJsonReader(reader io.Reader) (err error)
ConsumeJsonReader uses the provided reader to stream JSON. This will put each JSON record as one string value so the payload can be processed downstream.
func (*Datastream) ConsumeParquetReader ¶ added in v0.3.220
func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error)
ConsumeParquetReader uses the provided reader to stream rows
func (*Datastream) ConsumeParquetReaderSeeker ¶ added in v0.3.220
func (ds *Datastream) ConsumeParquetReaderSeeker(reader io.ReadSeeker) (err error)
ConsumeParquetReaderSeeker uses the provided reader to stream rows
func (*Datastream) ConsumeXmlReader ¶ added in v0.3.1
func (ds *Datastream) ConsumeXmlReader(reader io.Reader) (err error)
ConsumeXmlReader uses the provided reader to stream XML. This will put each XML record as one string value so the payload can be processed downstream.
func (*Datastream) Defer ¶
func (ds *Datastream) Defer(f func())
Defer runs a given function at close of the Datastream
func (*Datastream) Df ¶ added in v0.3.111
func (ds *Datastream) Df() *Dataflow
func (*Datastream) GetConfig ¶ added in v0.3.202
func (ds *Datastream) GetConfig() (configMap map[string]string)
GetConfig gets the config
func (*Datastream) GetFields ¶
func (ds *Datastream) GetFields(args ...bool) []string
GetFields returns the fields of the data
func (*Datastream) IsClosed ¶
func (ds *Datastream) IsClosed() bool
IsClosed is true if ds is closed
func (*Datastream) LatestBatch ¶ added in v0.3.202
func (ds *Datastream) LatestBatch() *Batch
func (*Datastream) Map ¶
func (ds *Datastream) Map(newColumns Columns, transf func([]any) []any) (nDs *Datastream)
Map applies the provided function to every row and returns the result
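A sketch of a row transform (assumes ds is a live *iop.Datastream whose first column holds strings):

// Upper-case the first column of every row, keeping the
// same column layout.
nDs := ds.Map(ds.Columns, func(row []any) []any {
    if s, ok := row[0].(string); ok {
        row[0] = strings.ToUpper(s)
    }
    return row
})

for row := range nDs.Rows() {
    fmt.Println(row)
}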
func (*Datastream) MapParallel ¶
func (ds *Datastream) MapParallel(transf func([]any) []any, numWorkers int) (nDs *Datastream)
MapParallel applies the provided function to every row in parallel and returns the result. Order is not maintained.
func (*Datastream) NewBatch ¶ added in v0.3.185
func (ds *Datastream) NewBatch(columns Columns) *Batch
NewBatch creates a new batch with fixed columns. It should be used each time a column type changes or columns are added.
func (*Datastream) NewCsvBufferReader ¶
func (ds *Datastream) NewCsvBufferReader(limit int, bytesLimit int64) *bytes.Reader
NewCsvBufferReader creates a reader with a limit. If limit == 0, all rows are read.
func (*Datastream) NewCsvBufferReaderChnl ¶
func (ds *Datastream) NewCsvBufferReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *bytes.Reader)
NewCsvBufferReaderChnl provides a channel of readers as the limit is reached. Data is read into memory, whereas NewCsvReaderChnl does not hold data in memory.
func (*Datastream) NewCsvBytesChnl ¶
func (ds *Datastream) NewCsvBytesChnl(chunkRowSize int) (dataChn chan *[]byte)
NewCsvBytesChnl returns a channel that yields chunks of CSV bytes
func (*Datastream) NewCsvReader ¶
func (ds *Datastream) NewCsvReader(rowLimit int, bytesLimit int64) *io.PipeReader
NewCsvReader creates a reader with a limit. If limit == 0, all rows are read.
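A sketch that streams a datastream out to a CSV file (assumes ds is a live *iop.Datastream; the destination path is hypothetical):

// rowLimit and bytesLimit of 0 mean stream everything.
reader := ds.NewCsvReader(0, 0)

out, err := os.Create("out.csv") // hypothetical destination
if err != nil {
    return err
}
defer out.Close()

if _, err = io.Copy(out, reader); err != nil {
    return err
}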
func (*Datastream) NewCsvReaderChnl ¶
func (ds *Datastream) NewCsvReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *BatchReader)
NewCsvReaderChnl provides a channel of readers as the limit is reached. Each reader flows as fast as the consumer consumes.
func (*Datastream) NewIterator ¶ added in v0.3.185
func (ds *Datastream) NewIterator(columns Columns, nextFunc func(it *Iterator) bool) *Iterator
func (*Datastream) NewJsonLinesReaderChnl ¶ added in v0.3.16
func (ds *Datastream) NewJsonLinesReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)
NewJsonLinesReaderChnl provides a channel of readers as the limit is reached. Each reader flows as fast as the consumer consumes.
func (*Datastream) NewJsonReaderChnl ¶ added in v0.3.16
func (ds *Datastream) NewJsonReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)
func (*Datastream) NewParquetReaderChnl ¶ added in v0.3.220
func (ds *Datastream) NewParquetReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *BatchReader)
NewParquetReaderChnl provides a channel of readers as the limit is reached. Each reader flows as fast as the consumer consumes.
func (*Datastream) Pause ¶ added in v0.3.185
func (ds *Datastream) Pause()
func (*Datastream) Records ¶
func (ds *Datastream) Records() <-chan map[string]any
Records returns rows of maps
func (*Datastream) Rows ¶
func (ds *Datastream) Rows() chan []any
func (*Datastream) SetConfig ¶
func (ds *Datastream) SetConfig(configMap map[string]string)
SetConfig sets the ds.config values
func (*Datastream) SetEmpty ¶
func (ds *Datastream) SetEmpty()
SetEmpty sets the ds.Rows channel as empty
func (*Datastream) SetFields ¶
func (ds *Datastream) SetFields(fields []string)
SetFields sets the fields/columns of the Datastream
func (*Datastream) SetMetadata ¶ added in v0.3.109
func (ds *Datastream) SetMetadata(jsonStr string)
func (*Datastream) SetReady ¶ added in v0.3.102
func (ds *Datastream) SetReady()
SetReady sets the ds.ready
func (*Datastream) Shape ¶
func (ds *Datastream) Shape(columns Columns) (nDs *Datastream, err error)
Shape changes the column types as needed to match the provided columns. It will recast rows that were wrongly cast, without recasting the correctly cast rows.
func (*Datastream) Split ¶ added in v0.3.0
func (ds *Datastream) Split(numStreams ...int) (dss []*Datastream)
Split splits the datastream into parallel datastreams
func (*Datastream) Start ¶
func (ds *Datastream) Start() (err error)
Start generates the stream. It should cycle the iterator function until done.
func (*Datastream) TryPause ¶ added in v0.3.185
func (ds *Datastream) TryPause() bool
func (*Datastream) Unpause ¶ added in v0.3.185
func (ds *Datastream) Unpause()
Unpause unpauses all streams
func (*Datastream) WaitReady ¶
func (ds *Datastream) WaitReady() error
WaitReady waits until datastream is ready
type GzipCompressor ¶ added in v0.2.3
type GzipCompressor struct {
    Compressor
    // contains filtered or unexported fields
}
func (*GzipCompressor) Compress ¶ added in v0.2.3
func (cp *GzipCompressor) Compress(reader io.Reader) io.Reader
Compress uses gzip to compress
func (*GzipCompressor) Decompress ¶ added in v0.2.3
Decompress uses gzip to decompress if the input is gzip. Otherwise, it returns the same reader.
func (*GzipCompressor) Suffix ¶ added in v0.2.3
func (cp *GzipCompressor) Suffix() string
type Iterator ¶
type Iterator struct {
    Row       []any
    Reprocess chan []any
    IsCasted  bool
    Counter   uint64
    Context   *g.Context
    Closed    bool
    // contains filtered or unexported fields
}
Iterator is the row provider for a datastream
func (*Iterator) Ds ¶ added in v0.3.126
func (it *Iterator) Ds() *Datastream
type Metadata ¶ added in v0.3.109
type NoneCompressor ¶ added in v0.2.3
type NoneCompressor struct {
    Compressor
    // contains filtered or unexported fields
}
func (*NoneCompressor) Compress ¶ added in v0.2.3
func (cp *NoneCompressor) Compress(reader io.Reader) io.Reader
func (*NoneCompressor) Decompress ¶ added in v0.2.3
func (*NoneCompressor) Suffix ¶ added in v0.2.3
func (cp *NoneCompressor) Suffix() string
type Parquet ¶
type Parquet struct {
    Path   string
    Reader *goparquet.FileReader
    Data   *Dataset
    // contains filtered or unexported fields
}
Parquet is a parquet object
func NewParquetStream ¶ added in v0.3.220
func NewParquetStream(reader io.ReadSeeker, columns Columns) (p *Parquet, err error)
type SSHClient ¶
type SSHClient struct {
    Host       string
    Port       int
    User       string
    Password   string
    TgtHost    string
    TgtPort    int
    PrivateKey string
    Err        error
    // contains filtered or unexported fields
}
SSHClient is a client to connect to an SSH server, with the main goal of forwarding ports
func (*SSHClient) OpenPortForward ¶
OpenPortForward forwards the port as specified
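A sketch of opening a tunnel (host, credentials, and targets are placeholders):

client := &iop.SSHClient{
    Host:     "bastion.example.com", // placeholder
    Port:     22,
    User:     "deploy", // placeholder
    Password: "secret", // placeholder
    TgtHost:  "db.internal", // placeholder
    TgtPort:  5432,
}
if err := client.Connect(); err != nil {
    return err
}
defer client.Close()

localPort, err := client.OpenPortForward()
if err != nil {
    return err
}
fmt.Println("tunnel listening on local port", localPort)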
func (*SSHClient) RunAsProcess ¶
RunAsProcess uses a separate process, which enables the use of public-key auth. https://git-scm.com/book/pt-pt/v2/Git-no-Servidor-Generating-Your-SSH-Public-Key
type SnappyCompressor ¶ added in v0.2.3
type SnappyCompressor struct {
    Compressor
    // contains filtered or unexported fields
}
func (*SnappyCompressor) Compress ¶ added in v0.2.3
func (cp *SnappyCompressor) Compress(reader io.Reader) io.Reader
Compress uses snappy to compress
func (*SnappyCompressor) Decompress ¶ added in v0.2.3
Decompress uses snappy to decompress if the input is snappy. Otherwise, it returns the same reader.
func (*SnappyCompressor) Suffix ¶ added in v0.2.3
func (cp *SnappyCompressor) Suffix() string
type StreamProcessor ¶
type StreamProcessor struct {
    N uint64
    // contains filtered or unexported fields
}
StreamProcessor processes rows and values
func NewStreamProcessor ¶
func NewStreamProcessor() *StreamProcessor
NewStreamProcessor returns a new StreamProcessor
func (*StreamProcessor) CastRow ¶
func (sp *StreamProcessor) CastRow(row []interface{}, columns Columns) []interface{}
CastRow casts each value of a row. This slows down processing by roughly 40%.
func (*StreamProcessor) CastToString ¶
func (sp *StreamProcessor) CastToString(i int, val interface{}, valType ...ColumnType) string
CastToString casts a value to a string; used for CSV writing. It slows processing down by 5% with an upstream CastRow, or 35% without an upstream CastRow.
func (*StreamProcessor) CastToTime ¶
func (sp *StreamProcessor) CastToTime(i interface{}) (t time.Time, err error)
CastToTime converts interface to time
func (*StreamProcessor) CastType ¶
func (sp *StreamProcessor) CastType(val interface{}, typ ColumnType) interface{}
CastType casts the type of an interface value. It is used to cast interface placeholders.
func (*StreamProcessor) CastVal ¶
func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interface{}
CastVal casts values with stats collection, which degrades performance by ~10%.

go test -benchmem -run='^$' github.com/flarco/dbio/iop -bench '^BenchmarkProcessVal'
func (*StreamProcessor) CastValWithoutStats ¶
func (sp *StreamProcessor) CastValWithoutStats(i int, val interface{}, typ ColumnType) interface{}
CastValWithoutStats casts the value without counting stats
func (*StreamProcessor) GetType ¶
func (sp *StreamProcessor) GetType(val interface{}) (typ ColumnType)
GetType returns the type of an interface
func (*StreamProcessor) ParseString ¶
func (sp *StreamProcessor) ParseString(s string, jj ...int) interface{}
ParseString parses a string and returns an interface of the inferred type:

    string: "varchar"
    integer: "integer"
    decimal: "decimal"
    date: "date"
    datetime: "timestamp"
    timestamp: "timestamp"
    text: "text"
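A sketch of type detection with a stream processor (the concrete returned Go types are assumptions):

sp := iop.NewStreamProcessor()

// ParseString detects the most specific type for each string.
fmt.Printf("%T\n", sp.ParseString("123"))        // likely an integer type
fmt.Printf("%T\n", sp.ParseString("123.45"))     // likely a decimal/float type
fmt.Printf("%T\n", sp.ParseString("2021-01-01")) // likely time.Time
fmt.Printf("%T\n", sp.ParseString("hello"))      // string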
func (*StreamProcessor) ParseTime ¶
func (sp *StreamProcessor) ParseTime(i interface{}) (t time.Time, err error)
ParseTime parses a date string and returns time.Time
func (*StreamProcessor) ParseVal ¶
func (sp *StreamProcessor) ParseVal(val interface{}) interface{}
ParseVal parses the value into its appropriate type
func (*StreamProcessor) ProcessRow ¶
func (sp *StreamProcessor) ProcessRow(row []interface{}) []interface{}
ProcessRow processes a row
func (*StreamProcessor) ProcessVal ¶
func (sp *StreamProcessor) ProcessVal(val interface{}) interface{}
ProcessVal processes a value
func (*StreamProcessor) SetConfig ¶
func (sp *StreamProcessor) SetConfig(configMap map[string]string)
SetConfig sets the data.Sp.config values
type ZStandardCompressor ¶ added in v0.2.3
type ZStandardCompressor struct {
    Compressor
    // contains filtered or unexported fields
}
func (*ZStandardCompressor) Compress ¶ added in v0.2.3
func (cp *ZStandardCompressor) Compress(reader io.Reader) io.Reader
Compress uses zstandard to compress
func (*ZStandardCompressor) Decompress ¶ added in v0.2.3
Decompress uses zstandard to decompress if the input is zstandard. Otherwise, it returns the same reader.
func (*ZStandardCompressor) Suffix ¶ added in v0.2.3
func (cp *ZStandardCompressor) Suffix() string