iop

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 7, 2024 License: GPL-3.0 Imports: 52 Imported by: 5

README

Input-Process-Output (ipo)

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// RemoveTrailingDecZeros removes the trailing zeros in CastToString
	RemoveTrailingDecZeros = false
	SampleSize             = 900
)
View Source
var Transforms = map[string]TransformFunc{}

Functions

func AutoDecompress

func AutoDecompress(reader io.Reader) (gReader io.Reader, err error)

AutoDecompress auto-detects the compression in order to decompress. Otherwise, it returns the same reader.

func CleanHeaderRow

func CleanHeaderRow(header []string) []string

CleanHeaderRow cleans the header row from incompatible characters

func CleanName

func CleanName(name string) (newName string)

func CompareColumns

func CompareColumns(columns1 Columns, columns2 Columns) (reshape bool, err error)

CompareColumns compares two sets of columns to see if they are similar

func CreateDummyFields

func CreateDummyFields(numCols int) (cols []string)

CreateDummyFields creates dummy columns for csvs with no header row

func GetISO8601DateMap

func GetISO8601DateMap(t time.Time) map[string]interface{}

GetISO8601DateMap returns a map of date parts for string formatting

func IsDummy

func IsDummy(columns []Column) bool

IsDummy returns true if the columns are injected by CreateDummyFields

func MakeRowsChan

func MakeRowsChan() chan []any

MakeRowsChan returns a buffered channel with default size

func NewJSONStream

func NewJSONStream(ds *Datastream, decoder decoderLike, flatten bool, jmespath string) *jsonStream

func ReplaceAccents

func ReplaceAccents(sp *StreamProcessor, val string) (string, error)

func Row

func Row(vals ...any) []any

Row is a row

func ScanCarrRet

func ScanCarrRet(data []byte, atEOF bool) (advance int, token []byte, err error)

ScanCarrRet removes the \r runes that are not followed by \n right after

func Unzip

func Unzip(src string, dest string) ([]string, error)

Unzip will decompress a zip archive, moving all files and folders within the zip file (parameter 1) to an output directory (parameter 2).

Types

type Avro

type Avro struct {
	Path   string
	Reader *goavro.OCFReader
	Data   *Dataset
	// contains filtered or unexported fields
}

Avro is an avro object

func NewAvroStream

func NewAvroStream(reader io.ReadSeeker, columns Columns) (a *Avro, err error)

func (*Avro) Columns

func (a *Avro) Columns() Columns

type Batch

type Batch struct {
	Columns  Columns
	Rows     chan []any
	Previous *Batch
	Count    int64
	Limit    int64
	// contains filtered or unexported fields
}

func (*Batch) AddTransform

func (b *Batch) AddTransform(transf func(row []any) []any)

func (*Batch) Close

func (b *Batch) Close()

func (*Batch) ColumnsChanged

func (b *Batch) ColumnsChanged() bool

func (*Batch) Ds

func (b *Batch) Ds() *Datastream

func (*Batch) ID

func (b *Batch) ID() string

func (*Batch) IsFirst

func (b *Batch) IsFirst() bool

func (*Batch) Push

func (b *Batch) Push(row []any)

func (*Batch) Shape

func (b *Batch) Shape(tgtColumns Columns, pause ...bool) (err error)

type BatchReader

type BatchReader struct {
	Batch   *Batch
	Columns Columns
	Reader  io.Reader
	Counter int
}

type CSV

type CSV struct {
	Path            string
	NoHeader        bool
	Delimiter       rune
	FieldsPerRecord int
	Columns         []Column
	File            *os.File
	Data            Dataset
	Reader          io.Reader
	Config          map[string]string
	NoDebug         bool
	// contains filtered or unexported fields
}

CSV is a csv object

func (*CSV) InferSchema

func (c *CSV) InferSchema() error

InferSchema returns a sample of n rows

func (*CSV) NewReader

func (c *CSV) NewReader() (*io.PipeReader, error)

NewReader creates a Reader

func (*CSV) Read

func (c *CSV) Read() (data Dataset, err error)

Read returns the read CSV dataset with Line 1 as header

func (*CSV) ReadStream

func (c *CSV) ReadStream() (ds *Datastream, err error)

ReadStream returns the read CSV stream with Line 1 as header

func (*CSV) Sample

func (c *CSV) Sample(n int) (Dataset, error)

Sample returns a sample of n rows

func (*CSV) SetFields

func (c *CSV) SetFields(fields []string)

SetFields sets the fields

func (*CSV) WriteStream

func (c *CSV) WriteStream(ds *Datastream) (cnt uint64, err error)

WriteStream to CSV file

type Column

type Column struct {
	Position    int         `json:"position"`
	Name        string      `json:"name"`
	Type        ColumnType  `json:"type"`
	DbType      string      `json:"db_type,omitempty"`
	DbPrecision int         `json:"db_precision,omitempty"`
	DbScale     int         `json:"db_scale,omitempty"`
	Sourced     bool        `json:"-"` // whether is was sourced from a typed source
	Stats       ColumnStats `json:"stats,omitempty"`

	Table       string `json:"table,omitempty"`
	Schema      string `json:"schema,omitempty"`
	Database    string `json:"database,omitempty"`
	Description string `json:"description,omitempty"`

	Metadata map[string]string `json:"metadata,omitempty"`
	// contains filtered or unexported fields
}

Column represents a schemata column

func InferFromStats

func InferFromStats(columns []Column, safe bool, noDebug bool) []Column

InferFromStats using the stats to infer data types

func (*Column) GoType

func (col *Column) GoType() reflect.Type

func (*Column) IsBool

func (col *Column) IsBool() bool

IsBool returns whether the column is a boolean

func (*Column) IsDatetime

func (col *Column) IsDatetime() bool

IsDatetime returns whether the column is a datetime object

func (*Column) IsDecimal

func (col *Column) IsDecimal() bool

IsDecimal returns whether the column is a decimal

func (*Column) IsInteger

func (col *Column) IsInteger() bool

IsInteger returns whether the column is an integer

func (*Column) IsNumber

func (col *Column) IsNumber() bool

IsNumber returns whether the column is a decimal or an integer

func (*Column) IsString

func (col *Column) IsString() bool

IsString returns whether the column is a string

func (*Column) IsUnique

func (col *Column) IsUnique() bool

func (*Column) Key

func (col *Column) Key() string

func (*Column) SetLengthPrecisionScale

func (col *Column) SetLengthPrecisionScale()

SetLengthPrecisionScale parse length, precision, scale

func (*Column) SetMetadata

func (col *Column) SetMetadata(key string, value string)

type ColumnStats

type ColumnStats struct {
	MinLen    int    `json:"min_len,omitempty"`
	MaxLen    int    `json:"max_len,omitempty"`
	MaxDecLen int    `json:"max_dec_len,omitempty"`
	Min       int64  `json:"min"`
	Max       int64  `json:"max"`
	NullCnt   int64  `json:"null_cnt"`
	IntCnt    int64  `json:"int_cnt,omitempty"`
	DecCnt    int64  `json:"dec_cnt,omitempty"`
	BoolCnt   int64  `json:"bool_cnt,omitempty"`
	JsonCnt   int64  `json:"json_cnt,omitempty"`
	StringCnt int64  `json:"string_cnt,omitempty"`
	DateCnt   int64  `json:"date_cnt,omitempty"`
	TotalCnt  int64  `json:"total_cnt"`
	UniqCnt   int64  `json:"uniq_cnt"`
	Checksum  uint64 `json:"checksum"`
}

ColumnStats holds statistics for a column

func (*ColumnStats) DistinctPercent

func (cs *ColumnStats) DistinctPercent() float64

func (*ColumnStats) DuplicateCount

func (cs *ColumnStats) DuplicateCount() int64

func (*ColumnStats) DuplicatePercent

func (cs *ColumnStats) DuplicatePercent() float64

type ColumnType

type ColumnType string
const (
	BigIntType     ColumnType = "bigint"
	BinaryType     ColumnType = "binary"
	BoolType       ColumnType = "bool"
	DateType       ColumnType = "date"
	DatetimeType   ColumnType = "datetime"
	DecimalType    ColumnType = "decimal"
	IntegerType    ColumnType = "integer"
	JsonType       ColumnType = "json"
	SmallIntType   ColumnType = "smallint"
	StringType     ColumnType = "string"
	TextType       ColumnType = "text"
	TimestampType  ColumnType = "timestamp"
	TimestampzType ColumnType = "timestampz"
	FloatType      ColumnType = "float"
	TimeType       ColumnType = "time"
	TimezType      ColumnType = "timez"
)

func (ColumnType) IsBool

func (ct ColumnType) IsBool() bool

IsBool returns whether the column is a boolean

func (ColumnType) IsDatetime

func (ct ColumnType) IsDatetime() bool

IsDatetime returns whether the column is a datetime object

func (ColumnType) IsDecimal

func (ct ColumnType) IsDecimal() bool

IsDecimal returns whether the column is a decimal

func (ColumnType) IsInteger

func (ct ColumnType) IsInteger() bool

IsInteger returns whether the column is an integer

func (ColumnType) IsJSON

func (ct ColumnType) IsJSON() bool

IsJSON returns whether the column is a json

func (ColumnType) IsNumber

func (ct ColumnType) IsNumber() bool

IsNumber returns whether the column is a decimal or an integer

func (ColumnType) IsString

func (ct ColumnType) IsString() bool

IsString returns whether the column is a string

func (ColumnType) IsValid

func (ct ColumnType) IsValid() bool

IsValid returns whether the column has a valid type

type Columns

type Columns []Column

Columns represent many columns

func NewColumns

func NewColumns(cols ...Column) Columns

NewColumns creates Columns from the provided Column values

func NewColumnsFromFields

func NewColumnsFromFields(fields ...string) (cols Columns)

NewColumnsFromFields creates Columns from fields

func (Columns) Add

func (cols Columns) Add(newCols Columns, overwrite bool) (col2 Columns, added Columns)

func (Columns) Clone

func (cols Columns) Clone() (newCols Columns)

Clone returns a copy of the columns

func (Columns) Coerce

func (cols Columns) Coerce(castCols Columns, hasHeader bool) (newCols Columns)

Coerce casts columns into specified types

func (Columns) Dataset

func (cols Columns) Dataset() Dataset

Dataset return an empty inferred dataset

func (Columns) DbTypes

func (cols Columns) DbTypes(args ...bool) []string

DbTypes return the column names/db types args -> (lower bool, cleanUp bool)

func (Columns) FieldMap

func (cols Columns) FieldMap(toLower bool) map[string]int

FieldMap return the fields map of indexes when `toLower` is true, field keys are lower cased

func (Columns) GetColumn

func (cols Columns) GetColumn(name string) Column

GetColumn returns the matched Col

func (Columns) GetKeys

func (cols Columns) GetKeys(keyType KeyType) Columns

GetKeys gets key columns

func (Columns) IsDifferent

func (cols Columns) IsDifferent(newCols Columns) bool

func (Columns) IsDummy

func (cols Columns) IsDummy() bool

IsDummy returns true if the columns are injected by CreateDummyFields

func (Columns) IsSimilarTo

func (cols Columns) IsSimilarTo(otherCols Columns) bool

IsSimilarTo returns true if has same number of columns and contains the same columns, but may be in different order

func (Columns) Keys

func (cols Columns) Keys() []string

Names return the column names args -> (lower bool, cleanUp bool)

func (Columns) MakeRec

func (cols Columns) MakeRec(row []any) map[string]any

func (Columns) MakeShaper

func (cols Columns) MakeShaper(tgtColumns Columns) (shaper *Shaper, err error)

func (Columns) Names

func (cols Columns) Names(args ...bool) []string

Names return the column names args -> (lower bool, cleanUp bool)

func (Columns) SetKeys

func (cols Columns) SetKeys(keyType KeyType, names ...string)

SetKeys sets key columns

func (Columns) Types

func (cols Columns) Types(args ...bool) []string

Types return the column names/types args -> (lower bool, cleanUp bool)

type Compressor

type Compressor interface {
	Self() Compressor
	Compress(io.Reader) io.Reader
	Decompress(io.Reader) (io.Reader, error)
	Suffix() string
}

Compressor implements different kinds of compression

func NewCompressor

func NewCompressor(cpType CompressorType) Compressor

type CompressorType

type CompressorType string

CompressorType is a string type used as an enum for the Compressor type

const (
	// AutoCompressorType is for auto compression
	AutoCompressorType CompressorType = "AUTO"
	// NoneCompressorType is for no compression
	NoneCompressorType CompressorType = "NONE"
	// ZipCompressorType is for Zip compression
	ZipCompressorType CompressorType = "ZIP"
	// GzipCompressorType is for Gzip compression
	GzipCompressorType CompressorType = "GZIP"
	// SnappyCompressorType is for Snappy compression
	SnappyCompressorType CompressorType = "SNAPPY"
	// ZStandardCompressorType is for ZStandard
	ZStandardCompressorType CompressorType = "ZSTD"
)

func CompressorTypePtr

func CompressorTypePtr(v CompressorType) *CompressorType

CompressorTypePtr returns a pointer to the CompressorType value passed in.

type Dataflow

type Dataflow struct {
	Columns  Columns
	Buffer   [][]interface{}
	StreamCh chan *Datastream
	Streams  []*Datastream
	Context  *g.Context
	Limit    uint64
	InBytes  uint64
	OutBytes uint64

	Ready           bool
	Inferred        bool
	FsURL           string
	OnColumnChanged func(col Column) error
	OnColumnAdded   func(col Column) error

	StreamMap map[string]*Datastream

	SchemaVersion int // for column type version
	// contains filtered or unexported fields
}

Dataflow is a collection of concurrent Datastreams

func MakeDataFlow

func MakeDataFlow(dss ...*Datastream) (df *Dataflow, err error)

MakeDataFlow create a dataflow from datastreams

func NewDataflow

func NewDataflow(limit ...int) (df *Dataflow)

NewDataflow creates a new dataflow

func (*Dataflow) AddColumns

func (df *Dataflow) AddColumns(newCols Columns, overwrite bool, exceptDs ...string) (added Columns, processOk bool)

AddColumns adds the new columns

func (*Dataflow) AddInBytes

func (df *Dataflow) AddInBytes(bytes uint64)

AddInBytes add ingress bytes

func (*Dataflow) AddOutBytes

func (df *Dataflow) AddOutBytes(bytes uint64)

AddOutBytes add egress bytes

func (*Dataflow) Bytes

func (df *Dataflow) Bytes() (inBytes, outBytes uint64)

func (*Dataflow) ChangeColumn

func (df *Dataflow) ChangeColumn(i int, newType ColumnType, exceptDs ...string) bool

ChangeColumn applies a column type change

func (*Dataflow) CleanUp

func (df *Dataflow) CleanUp()

CleanUp runs the deferred functions

func (*Dataflow) Close

func (df *Dataflow) Close()

Close closes the df

func (*Dataflow) CloseCurrentBatches

func (df *Dataflow) CloseCurrentBatches()

func (*Dataflow) Collect

func (df *Dataflow) Collect() (data Dataset, err error)

Collect reads from one or more streams and return a dataset

func (*Dataflow) Count

func (df *Dataflow) Count() (cnt uint64)

Count returns the aggregate count

func (*Dataflow) Defer

func (df *Dataflow) Defer(f func())

Defer runs a given function at close of the Dataflow

func (*Dataflow) DsTotalBytes

func (df *Dataflow) DsTotalBytes() (bytes uint64)

func (*Dataflow) Err

func (df *Dataflow) Err() (err error)

Err return the error if any

func (*Dataflow) IsClosed

func (df *Dataflow) IsClosed() bool

IsClosed is true if df is closed

func (*Dataflow) IsEmpty

func (df *Dataflow) IsEmpty() bool

IsEmpty returns true if ds.Rows of all channels are empty

func (*Dataflow) MakeStreamCh

func (df *Dataflow) MakeStreamCh(forceMerge bool) (streamCh chan *Datastream)

MakeStreamCh determines whether to merge all the streams into one or keep them separate. If data is small per stream, it's best to merge. For example, BigQuery has limits on the number of operations that can be called within a time limit.

func (*Dataflow) Pause

func (df *Dataflow) Pause(exceptDs ...string) bool

Pause pauses all streams

func (*Dataflow) PushStreamChan

func (df *Dataflow) PushStreamChan(dsCh chan *Datastream)

func (*Dataflow) SetColumns

func (df *Dataflow) SetColumns(columns []Column)

SetColumns sets the columns

func (*Dataflow) SetEmpty

func (df *Dataflow) SetEmpty()

SetEmpty sets all underlying datastreams empty

func (*Dataflow) SetReady

func (df *Dataflow) SetReady()

SetReady sets the df.ready

func (*Dataflow) Size

func (df *Dataflow) Size() int

Size is the number of streams

func (*Dataflow) SyncColumns

func (df *Dataflow) SyncColumns()

SyncColumns is a workaround to sync the ds.Columns to the df.Columns

func (*Dataflow) SyncStats

func (df *Dataflow) SyncStats()

SyncStats sync stream processor stats aggregated to the df.Columns

func (*Dataflow) Unpause

func (df *Dataflow) Unpause(exceptDs ...string)

Unpause unpauses all streams

func (*Dataflow) WaitClosed

func (df *Dataflow) WaitClosed()

WaitClosed waits until dataflow is closed. It is a hack to make sure all streams are pushed.

func (*Dataflow) WaitReady

func (df *Dataflow) WaitReady() error

WaitReady waits until dataflow is ready

type Dataset

type Dataset struct {
	Result        *sqlx.Rows
	Columns       Columns
	Rows          [][]interface{}
	SQL           string
	Duration      float64
	Sp            *StreamProcessor
	Inferred      bool
	SafeInference bool
	NoDebug       bool
}

Dataset is a query returned dataset

func NewDataset

func NewDataset(columns Columns) (data Dataset)

NewDataset return a new dataset

func NewDatasetFromMap

func NewDatasetFromMap(m map[string]interface{}) (data Dataset)

NewDatasetFromMap return a new dataset

func ReadCsv

func ReadCsv(path string) (Dataset, error)

ReadCsv reads CSV and returns dataset

func (*Dataset) AddColumns

func (data *Dataset) AddColumns(newCols Columns, overwrite bool) (added Columns)

AddColumns adds the new columns

func (*Dataset) Append

func (data *Dataset) Append(row []interface{})

Append appends a new row

func (*Dataset) ColValues

func (data *Dataset) ColValues(col int) []interface{}

ColValues returns the values of one column as an array

func (*Dataset) ColValuesStr

func (data *Dataset) ColValuesStr(col int) []string

ColValuesStr returns the values of one column as an array of strings

func (*Dataset) FirstRow

func (data *Dataset) FirstRow() []interface{}

FirstRow returns the first row

func (*Dataset) FirstVal

func (data *Dataset) FirstVal() interface{}

FirstVal returns the first value from the first row

func (*Dataset) GetFields

func (data *Dataset) GetFields(lower ...bool) []string

GetFields return the fields of the Data

func (*Dataset) InferColumnTypes

func (data *Dataset) InferColumnTypes()

InferColumnTypes determines the columns types

func (*Dataset) Pick

func (data *Dataset) Pick(colNames ...string) (nData Dataset)

Pick returns a new dataset with specified columns

func (*Dataset) Print

func (data *Dataset) Print(limit int)

Print pretty prints the data with a limit; 0 is unlimited

func (*Dataset) Records

func (data *Dataset) Records(lower ...bool) []map[string]interface{}

Records return rows of maps

func (*Dataset) RecordsCasted

func (data *Dataset) RecordsCasted(lower ...bool) []map[string]interface{}

RecordsCasted returns rows of maps of casted values

func (*Dataset) RecordsString

func (data *Dataset) RecordsString(lower ...bool) []map[string]interface{}

RecordsString returns rows of maps of string values

func (*Dataset) SetFields

func (data *Dataset) SetFields(fields []string)

SetFields sets the fields/columns of the Dataset

func (*Dataset) Sort

func (data *Dataset) Sort(args ...any)

Sort sorts by cols. Example: `data.Sort(0, 2, 3, false)` will sort col0, col2, col3 descending. Example: `data.Sort(0, 2, true)` will sort col0, col2 ascending.

func (*Dataset) Stream

func (data *Dataset) Stream() *Datastream

Stream returns a datastream of the dataset

func (*Dataset) ToJSONMap

func (data *Dataset) ToJSONMap() map[string]interface{}

ToJSONMap converts to a JSON object

func (*Dataset) WriteCsv

func (data *Dataset) WriteCsv(dest io.Writer) (tbw int, err error)

WriteCsv writes to a writer

type Datastream

type Datastream struct {
	Columns       Columns
	Buffer        [][]any
	BatchChan     chan *Batch
	Batches       []*Batch
	CurrentBatch  *Batch
	Count         uint64
	Context       *g.Context
	Ready         bool
	Bytes         uint64
	Sp            *StreamProcessor
	SafeInference bool
	NoDebug       bool
	Inferred      bool

	ID       string
	Metadata Metadata // map of column name to metadata type
	// contains filtered or unexported fields
}

Datastream is a stream of rows

func MergeDataflow

func MergeDataflow(df *Dataflow) (dsN *Datastream)

MergeDataflow merges the dataflow streams into one

func NewDatastream

func NewDatastream(columns Columns) (ds *Datastream)

NewDatastream return a new datastream

func NewDatastreamContext

func NewDatastreamContext(ctx context.Context, columns Columns) (ds *Datastream)

NewDatastreamContext return a new datastream

func NewDatastreamIt

func NewDatastreamIt(ctx context.Context, columns Columns, nextFunc func(it *Iterator) bool) (ds *Datastream)

NewDatastreamIt with it

func ReadCsvStream

func ReadCsvStream(path string) (ds *Datastream, err error)

ReadCsvStream reads CSV and returns a datastream

func (*Datastream) AddBytes

func (ds *Datastream) AddBytes(b int64)

AddBytes add bytes as processed

func (*Datastream) AddColumns

func (ds *Datastream) AddColumns(newCols Columns, overwrite bool) (added Columns)

AddColumns adds the new columns

func (*Datastream) CastRowToString

func (ds *Datastream) CastRowToString(row []any) []string

CastRowToString returns the row as string casted

func (*Datastream) ChangeColumn

func (ds *Datastream) ChangeColumn(i int, newType ColumnType)

ChangeColumn applies a column type change

func (*Datastream) Chunk

func (ds *Datastream) Chunk(limit uint64) (chDs chan *Datastream)

Chunk splits the datastream into chunk datastreams (in sequence)

func (*Datastream) Close

func (ds *Datastream) Close()

Close closes the datastream

func (*Datastream) Collect

func (ds *Datastream) Collect(limit int) (Dataset, error)

Collect reads a stream and returns a dataset. A limit of 0 is unlimited.

func (*Datastream) ConsumeAvroReader

func (ds *Datastream) ConsumeAvroReader(reader io.Reader) (err error)

ConsumeAvroReader uses the provided reader to stream rows

func (*Datastream) ConsumeAvroReaderSeeker

func (ds *Datastream) ConsumeAvroReaderSeeker(reader io.ReadSeeker) (err error)

ConsumeAvroReaderSeeker uses the provided reader to stream rows

func (*Datastream) ConsumeCsvReader

func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error)

ConsumeCsvReader uses the provided reader to stream rows

func (*Datastream) ConsumeJsonReader

func (ds *Datastream) ConsumeJsonReader(reader io.Reader) (err error)

ConsumeJsonReader uses the provided reader to stream JSON. This will put each JSON rec as one string value so the payload can be processed downstream.

func (*Datastream) ConsumeParquetReader

func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error)

ConsumeParquetReader uses the provided reader to stream rows

func (*Datastream) ConsumeParquetReaderSeeker

func (ds *Datastream) ConsumeParquetReaderSeeker(reader io.ReaderAt) (err error)

ConsumeParquetReaderSeeker uses the provided reader to stream rows

func (*Datastream) ConsumeSASReader

func (ds *Datastream) ConsumeSASReader(reader io.Reader) (err error)

ConsumeSASReader uses the provided reader to stream rows

func (*Datastream) ConsumeSASReaderSeeker

func (ds *Datastream) ConsumeSASReaderSeeker(reader io.ReadSeeker) (err error)

ConsumeSASReaderSeeker uses the provided reader to stream rows

func (*Datastream) ConsumeXmlReader

func (ds *Datastream) ConsumeXmlReader(reader io.Reader) (err error)

ConsumeXmlReader uses the provided reader to stream XML. This will put each XML rec as one string value so the payload can be processed downstream.

func (*Datastream) Defer

func (ds *Datastream) Defer(f func())

Defer runs a given function at close of the Datastream

func (*Datastream) Df

func (ds *Datastream) Df() *Dataflow

func (*Datastream) Err

func (ds *Datastream) Err() (err error)

Err return the error if any

func (*Datastream) GetConfig

func (ds *Datastream) GetConfig() (configMap map[string]string)

GetConfig get config

func (*Datastream) GetFields

func (ds *Datastream) GetFields(args ...bool) []string

GetFields return the fields of the Data

func (*Datastream) IsClosed

func (ds *Datastream) IsClosed() bool

IsClosed is true if ds is closed

func (*Datastream) LatestBatch

func (ds *Datastream) LatestBatch() *Batch

func (*Datastream) Map

func (ds *Datastream) Map(newColumns Columns, transf func([]any) []any) (nDs *Datastream)

Map applies the provided function to every row and returns the result

func (*Datastream) MapParallel

func (ds *Datastream) MapParallel(transf func([]any) []any, numWorkers int) (nDs *Datastream)

MapParallel applies the provided function to every row in parallel and returns the result. Order is not maintained.

func (*Datastream) NewBatch

func (ds *Datastream) NewBatch(columns Columns) *Batch

NewBatch creates a new batch with fixed columns. It should be used each time a column type changes, or columns are added.

func (*Datastream) NewCsvBufferReader

func (ds *Datastream) NewCsvBufferReader(limit int, bytesLimit int64) *bytes.Reader

NewCsvBufferReader creates a Reader with limit. If limit == 0, then read all rows.

func (*Datastream) NewCsvBufferReaderChnl

func (ds *Datastream) NewCsvBufferReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *bytes.Reader)

NewCsvBufferReaderChnl provides a channel of readers as the limit is reached. Data is read in memory, whereas NewCsvReaderChnl does not hold in memory.

func (*Datastream) NewCsvBytesChnl

func (ds *Datastream) NewCsvBytesChnl(chunkRowSize int) (dataChn chan *[]byte)

NewCsvBytesChnl returns a channel yielding chunks of bytes of csv

func (*Datastream) NewCsvReader

func (ds *Datastream) NewCsvReader(rowLimit int, bytesLimit int64) *io.PipeReader

NewCsvReader creates a Reader with limit. If limit == 0, then read all rows.

func (*Datastream) NewCsvReaderChnl

func (ds *Datastream) NewCsvReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *BatchReader)

NewCsvReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes.

func (*Datastream) NewIterator

func (ds *Datastream) NewIterator(columns Columns, nextFunc func(it *Iterator) bool) *Iterator

func (*Datastream) NewJsonLinesReaderChnl

func (ds *Datastream) NewJsonLinesReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)

NewJsonLinesReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes.

func (*Datastream) NewJsonReaderChnl

func (ds *Datastream) NewJsonReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)

func (*Datastream) NewParquetReaderChnl

func (ds *Datastream) NewParquetReaderChnl(rowLimit int, bytesLimit int64, compression CompressorType) (readerChn chan *BatchReader)

NewParquetReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes.

func (*Datastream) Pause

func (ds *Datastream) Pause()

func (*Datastream) Push

func (ds *Datastream) Push(row []any)

Push pushes a row into the datastream

func (*Datastream) Records

func (ds *Datastream) Records() <-chan map[string]any

Records return rows of maps

func (*Datastream) Rows

func (ds *Datastream) Rows() chan []any

func (*Datastream) SetConfig

func (ds *Datastream) SetConfig(configMap map[string]string)

SetConfig sets the ds.config values

func (*Datastream) SetEmpty

func (ds *Datastream) SetEmpty()

SetEmpty sets the ds.Rows channel as empty

func (*Datastream) SetFields

func (ds *Datastream) SetFields(fields []string)

SetFields sets the fields/columns of the Datastream

func (*Datastream) SetMetadata

func (ds *Datastream) SetMetadata(jsonStr string)

func (*Datastream) SetReady

func (ds *Datastream) SetReady()

SetReady sets the ds.ready

func (*Datastream) Shape

func (ds *Datastream) Shape(columns Columns) (nDs *Datastream, err error)

Shape changes the column types as needed, to the provided columns var. It will cast the already wrongly casted rows, and not recast the correctly casted rows.

func (*Datastream) Split

func (ds *Datastream) Split(numStreams ...int) (dss []*Datastream)

Split splits the datastream into parallel datastreams

func (*Datastream) Start

func (ds *Datastream) Start() (err error)

Start generates the stream. Should cycle the Iter Func until done.

func (*Datastream) TryPause

func (ds *Datastream) TryPause() bool

func (*Datastream) Unpause

func (ds *Datastream) Unpause()

Unpause unpauses all streams

func (*Datastream) WaitClosed

func (ds *Datastream) WaitClosed()

WaitClosed waits until datastream is closed. It is a hack to make sure all streams are pushed.

func (*Datastream) WaitReady

func (ds *Datastream) WaitReady() error

WaitReady waits until datastream is ready

type GzipCompressor

type GzipCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*GzipCompressor) Compress

func (cp *GzipCompressor) Compress(reader io.Reader) io.Reader

Compress uses gzip to compress

func (*GzipCompressor) Decompress

func (cp *GzipCompressor) Decompress(reader io.Reader) (gReader io.Reader, err error)

Decompress uses gzip to decompress if it is gzip. Otherwise return same reader

func (*GzipCompressor) Suffix

func (cp *GzipCompressor) Suffix() string

type Iterator

type Iterator struct {
	Row       []any
	Reprocess chan []any
	IsCasted  bool
	Counter   uint64
	Context   *g.Context
	Closed    bool
	// contains filtered or unexported fields
}

Iterator is the row provider for a datastream

func (*Iterator) Ds

func (it *Iterator) Ds() *Datastream

type KeyType

type KeyType string
const (
	PrimaryKey   KeyType = "primary_key"
	AggregateKey KeyType = "aggregate_key"
	UpdateKey    KeyType = "update_key"
	SortKey      KeyType = "sort_key"
)

type KeyValue

type KeyValue struct {
	Key   string `json:"key"`
	Value any    `json:"value"`
}

type Metadata

type Metadata struct {
	StreamURL KeyValue `json:"stream_url"`
	LoadedAt  KeyValue `json:"loaded_at"`
}

func (*Metadata) AsMap

func (m *Metadata) AsMap() map[string]any

AsMap return as map

type NoneCompressor

type NoneCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*NoneCompressor) Compress

func (cp *NoneCompressor) Compress(reader io.Reader) io.Reader

func (*NoneCompressor) Decompress

func (cp *NoneCompressor) Decompress(reader io.Reader) (gReader io.Reader, err error)

func (*NoneCompressor) Suffix

func (cp *NoneCompressor) Suffix() string

type Parquet

type Parquet struct {
	Path   string
	Reader *parquet.Reader
	Data   *Dataset
	// contains filtered or unexported fields
}

Parquet is a parquet object

func NewParquetStream

func NewParquetStream(reader io.ReaderAt, columns Columns) (p *Parquet, err error)

func (*Parquet) Columns

func (p *Parquet) Columns() Columns

type RecNode

type RecNode struct {
	// contains filtered or unexported fields
}

func NewRecNode

func NewRecNode(cols Columns) *RecNode

func (*RecNode) Compression

func (rn *RecNode) Compression() compress.Codec

func (*RecNode) Encoding

func (rn *RecNode) Encoding() encoding.Encoding

func (*RecNode) Fields

func (rn *RecNode) Fields() []parquet.Field

func (*RecNode) GoType

func (rn *RecNode) GoType() reflect.Type

func (*RecNode) ID

func (rn *RecNode) ID() int

func (*RecNode) Leaf

func (rn *RecNode) Leaf() bool

func (*RecNode) Optional

func (rn *RecNode) Optional() bool

func (*RecNode) Repeated

func (rn *RecNode) Repeated() bool

func (*RecNode) Required

func (rn *RecNode) Required() bool

func (*RecNode) String

func (rn *RecNode) String() string

func (*RecNode) Type

func (rn *RecNode) Type() parquet.Type

type Record

type Record struct {
	Columns *Columns
	Values  []any
}

type SAS

type SAS struct {
	Path   string
	Reader *datareader.SAS7BDAT
	Data   *Dataset
	// contains filtered or unexported fields
}

SAS is a sas7bdat object

func NewSASStream

func NewSASStream(reader io.ReadSeeker, columns Columns) (s *SAS, err error)

func (*SAS) Columns

func (s *SAS) Columns() Columns

type SSHClient

type SSHClient struct {
	Host       string
	Port       int
	User       string
	Password   string
	TgtHost    string
	TgtPort    int
	PrivateKey string
	Passphrase string
	Err        error
	// contains filtered or unexported fields
}

SSHClient is a client to connect to a ssh server with the main goal of forwarding ports

func (*SSHClient) Close

func (s *SSHClient) Close()

Close stops the client connection

func (*SSHClient) Connect

func (s *SSHClient) Connect() (err error)

Connect connects to the server

func (*SSHClient) GetOutput

func (s *SSHClient) GetOutput() (stdout string, stderr string)

GetOutput returns the stdout & stderr outputs

func (*SSHClient) OpenPortForward

func (s *SSHClient) OpenPortForward() (localPort int, err error)

OpenPortForward forwards the port as specified

func (*SSHClient) RunAsProcess

func (s *SSHClient) RunAsProcess() (localPort int, err error)

RunAsProcess uses a separate process, which enables the use of public key auth. See https://git-scm.com/book/pt-pt/v2/Git-no-Servidor-Generating-Your-SSH-Public-Key

func (*SSHClient) SftpClient

func (s *SSHClient) SftpClient() (sftpClient *sftp.Client, err error)

SftpClient returns an SftpClient

type Shaper

type Shaper struct {
	Func       func([]any) []any
	SrcColumns Columns
	TgtColumns Columns
	ColMap     map[int]int
}

type SnappyCompressor

type SnappyCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*SnappyCompressor) Compress

func (cp *SnappyCompressor) Compress(reader io.Reader) io.Reader

Compress uses snappy to compress

func (*SnappyCompressor) Decompress

func (cp *SnappyCompressor) Decompress(reader io.Reader) (sReader io.Reader, err error)

Decompress uses snappy to decompress if it is snappy. Otherwise returns the same reader

func (*SnappyCompressor) Suffix

func (cp *SnappyCompressor) Suffix() string

type StreamProcessor

type StreamProcessor struct {
	N uint64
	// contains filtered or unexported fields
}

StreamProcessor processes rows and values

func NewStreamProcessor

func NewStreamProcessor() *StreamProcessor

NewStreamProcessor returns a new StreamProcessor

func (*StreamProcessor) CastRow

func (sp *StreamProcessor) CastRow(row []interface{}, columns Columns) []interface{}

CastRow casts each value of a row. Slows down processing by about 40%?

func (*StreamProcessor) CastToString

func (sp *StreamProcessor) CastToString(i int, val interface{}, valType ...ColumnType) string

CastToString casts a value to string. Used for csv writing. Slows processing down by 5% with upstream CastRow, or 35% without upstream CastRow

func (*StreamProcessor) CastToTime

func (sp *StreamProcessor) CastToTime(i interface{}) (t time.Time, err error)

CastToTime converts interface to time

func (*StreamProcessor) CastType

func (sp *StreamProcessor) CastType(val interface{}, typ ColumnType) interface{}

CastType casts the type of an interface. CastType is used to cast the interface place holders?

func (*StreamProcessor) CastVal

func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interface{}

CastVal casts values with stats collection, which degrades performance by ~10%. Benchmark with: go test -benchmem -run='^$ github.com/slingdata-io/sling-cli/core/dbio/iop' -bench '^BenchmarkProcessVal'

func (*StreamProcessor) CastValWithoutStats

func (sp *StreamProcessor) CastValWithoutStats(i int, val interface{}, typ ColumnType) interface{}

CastValWithoutStats casts the value without counting stats

func (*StreamProcessor) GetType

func (sp *StreamProcessor) GetType(val interface{}) (typ ColumnType)

GetType returns the type of an interface

func (*StreamProcessor) ParseString

func (sp *StreamProcessor) ParseString(s string, jj ...int) interface{}

ParseString returns an interface. string: "varchar", integer: "integer", decimal: "decimal", date: "date", datetime: "timestamp", timestamp: "timestamp", text: "text"

func (*StreamProcessor) ParseTime

func (sp *StreamProcessor) ParseTime(i interface{}) (t time.Time, err error)

ParseTime parses a date string and returns time.Time

func (*StreamProcessor) ParseVal

func (sp *StreamProcessor) ParseVal(val interface{}) interface{}

ParseVal parses the value into its appropriate type

func (*StreamProcessor) ProcessRow

func (sp *StreamProcessor) ProcessRow(row []interface{}) []interface{}

ProcessRow processes a row

func (*StreamProcessor) ProcessVal

func (sp *StreamProcessor) ProcessVal(val interface{}) interface{}

ProcessVal processes a value

func (*StreamProcessor) SetConfig

func (sp *StreamProcessor) SetConfig(configMap map[string]string)

SetConfig sets the data.Sp.config values

type TransformFunc

type TransformFunc func(*StreamProcessor, string) (string, error)

type ZStandardCompressor

type ZStandardCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*ZStandardCompressor) Compress

func (cp *ZStandardCompressor) Compress(reader io.Reader) io.Reader

Compress uses zstandard to compress

func (*ZStandardCompressor) Decompress

func (cp *ZStandardCompressor) Decompress(reader io.Reader) (sReader io.Reader, err error)

Decompress uses zstandard to decompress if it is zstandard. Otherwise returns the same reader

func (*ZStandardCompressor) Suffix

func (cp *ZStandardCompressor) Suffix() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL