iop

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 15, 2021 License: GPL-3.0 Imports: 37 Imported by: 5

README

Input-Process-Output (ipo)

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ShowProgress uses the progress bar to show progress
	ShowProgress = false

	// RemoveTrailingDecZeros removes the trailing zeros in CastToString
	RemoveTrailingDecZeros = false
	SampleSize             = 900
)

Functions

func CleanHeaderRow

func CleanHeaderRow(header []string) []string

CleanHeaderRow cleans the header row from incompatible characters

func CompareColumns

func CompareColumns(columns1 []Column, columns2 []Column) (err error)

CompareColumns compares two sets of columns to see if they are similar

func Compress

func Compress(reader io.Reader) io.Reader

Compress uses gzip to compress

func CreateDummyFields

func CreateDummyFields(numCols int) (cols []string)

CreateDummyFields creates dummy columns for csvs with no header row

func Decompress

func Decompress(reader io.Reader) (gReader io.Reader, err error)

Decompress uses gzip to decompress if it is gzip. Otherwise return same reader

func GetISO8601DateMap

func GetISO8601DateMap(t time.Time) map[string]interface{}

GetISO8601DateMap returns a map of date parts for string formatting

func IsDummy

func IsDummy(columns []Column) bool

IsDummy returns true if the columns are injected by CreateDummyFields

func MakeRowsChan

func MakeRowsChan() chan []interface{}

MakeRowsChan returns a buffered channel with default size

func NewPBar

func NewPBar(d time.Duration) *pb.ProgressBar

NewPBar creates a new progress bar

func Row

func Row(vals ...interface{}) []interface{}

Row is a row

func ScanCarrRet

func ScanCarrRet(data []byte, atEOF bool) (advance int, token []byte, err error)

ScanCarrRet removes the \r runes that are not followed by \n right after

func Unzip

func Unzip(src string, dest string) ([]string, error)

Unzip will decompress a zip archive, moving all files and folders within the zip file (parameter 1) to an output directory (parameter 2).

Types

type BaseCompressor

type BaseCompressor struct {
	Compressor
	// contains filtered or unexported fields
}

func (*BaseCompressor) Context

func (cp *BaseCompressor) Context() (context *g.Context)

Context provides a pointer to context

type CSV

type CSV struct {
	Path      string
	NoHeader  bool
	Delimiter rune
	Columns   []Column
	File      *os.File
	Data      Dataset
	Reader    io.Reader
	NoTrace   bool
	// contains filtered or unexported fields
}

CSV is a csv object

func (*CSV) InferSchema

func (c *CSV) InferSchema() error

InferSchema infers the schema from a sample of rows

func (*CSV) NewReader

func (c *CSV) NewReader() (*io.PipeReader, error)

NewReader creates a Reader

func (*CSV) Read

func (c *CSV) Read() (data Dataset, err error)

Read reads the CSV and returns a dataset, with Line 1 as header

func (*CSV) ReadStream

func (c *CSV) ReadStream() (ds *Datastream, err error)

ReadStream returns the read CSV stream with Line 1 as header

func (*CSV) Sample

func (c *CSV) Sample(n int) (Dataset, error)

Sample returns a sample of n rows

func (*CSV) SetFields

func (c *CSV) SetFields(fields []string)

SetFields sets the fields

func (*CSV) WriteStream

func (c *CSV) WriteStream(ds *Datastream) (cnt uint64, err error)

WriteStream to CSV file

type Column

type Column struct {
	Position    int             `json:"position"`
	Name        string          `json:"name"`
	Type        string          `json:"type"`
	DbType      string          `json:"-"`
	DbPrecision int             `json:"-"`
	DbScale     int             `json:"-"`
	Sourced     bool            `json:"-"` // whether it was sourced from a typed source
	Stats       ColumnStats     `json:"-"`
	ColType     *sql.ColumnType `json:"-"`
	// contains filtered or unexported fields
}

Column represents a schemata column

func InferFromStats

func InferFromStats(columns []Column, safe bool, noTrace bool) []Column

InferFromStats using the stats to infer data types

func SyncColumns

func SyncColumns(columns1 []Column, columns2 []Column) (columns []Column, err error)

SyncColumns syncs two columns together

func (*Column) IsBool

func (col *Column) IsBool() bool

IsBool returns whether the column is a boolean

func (*Column) IsDatetime

func (col *Column) IsDatetime() bool

IsDatetime returns whether the column is a datetime object

func (*Column) IsDecimal

func (col *Column) IsDecimal() bool

IsDecimal returns whether the column is a decimal

func (*Column) IsInteger

func (col *Column) IsInteger() bool

IsInteger returns whether the column is an integer

func (*Column) IsNumber

func (col *Column) IsNumber() bool

IsNumber returns whether the column is a decimal or an integer

func (*Column) IsString

func (col *Column) IsString() bool

IsString returns whether the column is a string

type ColumnStats

type ColumnStats struct {
	MinLen    int
	MaxLen    int
	MaxDecLen int
	Min       int64
	Max       int64
	NullCnt   int64
	IntCnt    int64
	DecCnt    int64
	BoolCnt   int64
	StringCnt int64
	DateCnt   int64
	TotalCnt  int64
	Checksum  uint64
}

ColumnStats holds statistics for a column

type Columns

type Columns []Column

Columns represent many columns

func MakeColumns added in v0.0.5

func MakeColumns(obj interface{}, useTag string, typeMap ...map[string]string) Columns

MakeColumns makes columns from a struct

func NewColumnsFromFields

func NewColumnsFromFields(fields ...string) (cols Columns)

NewColumnsFromFields creates Columns from fields

func (Columns) Dataset added in v0.0.5

func (cols Columns) Dataset() Dataset

Dataset returns an empty inferred dataset

func (Columns) FieldMap added in v0.0.5

func (cols Columns) FieldMap(toLower bool) map[string]int

FieldMap returns the fields map of indexes. When `toLower` is true, field keys are lower cased

func (Columns) GetColumn

func (cols Columns) GetColumn(name string) Column

GetColumn returns the matched Col

func (Columns) IsDummy

func (cols Columns) IsDummy() bool

IsDummy returns true if the columns are injected by CreateDummyFields

func (Columns) Names added in v0.0.5

func (cols Columns) Names() []string

Names returns the column names

func (Columns) Normalize

func (cols Columns) Normalize(name string) string

Normalize returns the normalized field name

type Compressor

type Compressor interface {
	Self() Compressor
	Compress(io.Reader) io.Reader
	Decompress(io.Reader) (io.Reader, error)
}

Compressor implements different kinds of compression

type CompressorType

type CompressorType int

CompressorType is an int type for enum for the Compressor Type

const (
	// ZipCompressorType is for Zip compression
	ZipCompressorType CompressorType = iota
	// GzipCompressorType is for Gzip compression
	GzipCompressorType
	// SnappyCompressorType is for Snappy compression
	SnappyCompressorType
)

type Dataflow

type Dataflow struct {
	Columns  []Column
	Buffer   [][]interface{}
	StreamCh chan *Datastream
	Streams  []*Datastream
	Context  g.Context
	Limit    uint64

	Bytes    int64
	Ready    bool
	Inferred bool
	FsURL    string
	// contains filtered or unexported fields
}

Dataflow is a collection of concurrent Datastreams

func MakeDataFlow

func MakeDataFlow(dss ...*Datastream) (df *Dataflow, err error)

MakeDataFlow creates a dataflow from datastreams

func NewDataflow

func NewDataflow(limit ...int) (df *Dataflow)

NewDataflow creates a new dataflow

func (*Dataflow) AddBytes

func (df *Dataflow) AddBytes(b int64)

AddBytes adds bytes as processed

func (*Dataflow) Close

func (df *Dataflow) Close()

Close closes the df

func (*Dataflow) Count

func (df *Dataflow) Count() (cnt uint64)

Count returns the aggregate count

func (*Dataflow) Defer

func (df *Dataflow) Defer(f func())

Defer runs a given function at close of the Dataflow

func (*Dataflow) Err

func (df *Dataflow) Err() (err error)

Err returns the error if any

func (*Dataflow) IsClosed

func (df *Dataflow) IsClosed() bool

IsClosed is true if df is closed

func (*Dataflow) IsEmpty

func (df *Dataflow) IsEmpty() bool

IsEmpty returns true if ds.Rows of all channels are empty

func (*Dataflow) PushStreams

func (df *Dataflow) PushStreams(dss ...*Datastream)

PushStreams waits until each datastream is ready, then adds them to the dataflow. Should be launched as a goroutine

func (*Dataflow) ResetStats

func (df *Dataflow) ResetStats()

ResetStats resets the stats

func (*Dataflow) SetColumns

func (df *Dataflow) SetColumns(columns []Column)

SetColumns sets the columns

func (*Dataflow) SetEmpty

func (df *Dataflow) SetEmpty()

SetEmpty sets all underlying datastreams empty

func (*Dataflow) Size

func (df *Dataflow) Size() int

Size is the number of streams

func (*Dataflow) SyncStats

func (df *Dataflow) SyncStats()

SyncStats syncs the aggregated stream processor stats to df.Columns

func (*Dataflow) WaitReady

func (df *Dataflow) WaitReady() error

WaitReady waits until dataflow is ready

type Dataset

type Dataset struct {
	Result        *sqlx.Rows
	Columns       Columns
	Rows          [][]interface{}
	SQL           string
	Duration      float64
	Sp            *StreamProcessor
	Inferred      bool
	SafeInference bool
	NoTrace       bool
}

Dataset is a query returned dataset

func Collect

func Collect(dss ...*Datastream) (data Dataset, err error)

Collect reads from one or more streams and returns a dataset

func NewDataset

func NewDataset(columns Columns) (data Dataset)

NewDataset returns a new dataset

func NewDatasetFromMap

func NewDatasetFromMap(m map[string]interface{}) (data Dataset)

NewDatasetFromMap returns a new dataset

func ReadCsv

func ReadCsv(path string) (Dataset, error)

ReadCsv reads CSV and returns dataset

func (*Dataset) Append

func (data *Dataset) Append(row []interface{})

Append appends a new row

func (*Dataset) ColValues

func (data *Dataset) ColValues(col int) []interface{}

ColValues returns the values of one column as an array

func (*Dataset) ColValuesStr

func (data *Dataset) ColValuesStr(col int) []string

ColValuesStr returns the values of one column as an array of strings

func (*Dataset) FirstRow

func (data *Dataset) FirstRow() []interface{}

FirstRow returns the first row

func (*Dataset) FirstVal

func (data *Dataset) FirstVal() interface{}

FirstVal returns the first value from the first row

func (*Dataset) GetFields

func (data *Dataset) GetFields() []string

GetFields returns the fields of the Data

func (*Dataset) InferColumnTypes

func (data *Dataset) InferColumnTypes()

InferColumnTypes determines the columns types

func (*Dataset) Records

func (data *Dataset) Records() []map[string]interface{}

Records returns rows of maps

func (*Dataset) SetFields

func (data *Dataset) SetFields(fields []string)

SetFields sets the fields/columns of the Dataset

func (*Dataset) Stream

func (data *Dataset) Stream() *Datastream

Stream returns a datastream of the dataset

func (*Dataset) ToJSONMap

func (data *Dataset) ToJSONMap() map[string]interface{}

ToJSONMap converts to a JSON object

func (*Dataset) WriteCsv

func (data *Dataset) WriteCsv(path string) error

WriteCsv writes to a csv file

type Datastream

type Datastream struct {
	Columns       Columns
	Rows          chan []interface{}
	Buffer        [][]interface{}
	Count         uint64
	Context       g.Context
	Ready         bool
	Bytes         int64
	Sp            *StreamProcessor
	SafeInference bool
	NoTrace       bool
	Inferred      bool
	// contains filtered or unexported fields
}

Datastream is a stream of rows

func MergeDataflow

func MergeDataflow(df *Dataflow) (ds *Datastream)

MergeDataflow merges the dataflow streams into one

func NewDatastream

func NewDatastream(columns Columns) (ds *Datastream)

NewDatastream returns a new datastream

func NewDatastreamContext

func NewDatastreamContext(ctx context.Context, columns Columns) (ds *Datastream)

NewDatastreamContext returns a new datastream

func NewDatastreamIt

func NewDatastreamIt(ctx context.Context, columns Columns, nextFunc func(it *Iterator) bool) (ds *Datastream)

NewDatastreamIt returns a new datastream with the provided iterator function

func ReadCsvStream

func ReadCsvStream(path string) (ds *Datastream, err error)

ReadCsvStream reads CSV and returns a datastream

func (*Datastream) AddBytes

func (ds *Datastream) AddBytes(b int64)

AddBytes adds bytes as processed

func (*Datastream) Chunk

func (ds *Datastream) Chunk(limit uint64) (chDs chan *Datastream)

Chunk splits the datastream into chunk datastreams

func (*Datastream) Clone added in v0.0.5

func (ds *Datastream) Clone() *Datastream

Clone returns a new datastream of the same source

func (*Datastream) Close

func (ds *Datastream) Close()

Close closes the datastream

func (*Datastream) Collect

func (ds *Datastream) Collect(limit int) (Dataset, error)

Collect reads a stream and returns a dataset. A limit of 0 is unlimited

func (*Datastream) ConsumeCsvReader added in v0.1.0

func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error)

ConsumeCsvReader uses the provided reader to stream rows

func (*Datastream) ConsumeJsonReader added in v0.1.0

func (ds *Datastream) ConsumeJsonReader(reader io.Reader) (err error)

ConsumeJsonReader uses the provided reader to stream JSON. This will put each JSON rec as one string value so the payload can be processed downstream

func (*Datastream) Defer

func (ds *Datastream) Defer(f func())

Defer runs a given function at close of the Datastream

func (*Datastream) Err

func (ds *Datastream) Err() (err error)

Err returns the error if any

func (*Datastream) GetFields

func (ds *Datastream) GetFields(toLower ...bool) []string

GetFields returns the fields of the Data

func (*Datastream) IsClosed

func (ds *Datastream) IsClosed() bool

IsClosed is true if ds is closed

func (*Datastream) IsEmpty

func (ds *Datastream) IsEmpty() bool

IsEmpty returns true if the ds.Rows channel is empty

func (*Datastream) Map

func (ds *Datastream) Map(newColumns Columns, transf func([]interface{}) []interface{}) (nDs *Datastream)

Map applies the provided function to every row and returns the result

func (*Datastream) MapParallel

func (ds *Datastream) MapParallel(transf func([]interface{}) []interface{}, numWorkers int) (nDs *Datastream)

MapParallel applies the provided function to every row in parallel and returns the result. Order is not maintained.

func (*Datastream) NewCsvBufferReader

func (ds *Datastream) NewCsvBufferReader(limit int, bytesLimit int64) *bytes.Reader

NewCsvBufferReader creates a Reader with limit. If limit == 0, then read all rows.

func (*Datastream) NewCsvBufferReaderChnl

func (ds *Datastream) NewCsvBufferReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *bytes.Reader)

NewCsvBufferReaderChnl provides a channel of readers as the limit is reached. Data is read in memory, whereas NewCsvReaderChnl does not hold in memory

func (*Datastream) NewCsvBytesChnl

func (ds *Datastream) NewCsvBytesChnl(chunkRowSize int) (dataChn chan *[]byte)

NewCsvBytesChnl returns a channel yielding chunks of bytes of csv

func (*Datastream) NewCsvReader

func (ds *Datastream) NewCsvReader(rowLimit int, bytesLimit int64) *io.PipeReader

NewCsvReader creates a Reader with limit. If limit == 0, then read all rows.

func (*Datastream) NewCsvReaderChnl

func (ds *Datastream) NewCsvReaderChnl(rowLimit int, bytesLimit int64) (readerChn chan *io.PipeReader)

NewCsvReaderChnl provides a channel of readers as the limit is reached. Each channel flows as fast as the consumer consumes

func (*Datastream) Push

func (ds *Datastream) Push(row []interface{})

Push pushes a row into the datastream

func (*Datastream) Records

func (ds *Datastream) Records() <-chan map[string]interface{}

Records return rows of maps

func (*Datastream) ResetStats

func (ds *Datastream) ResetStats()

ResetStats resets the stats

func (*Datastream) SetConfig

func (ds *Datastream) SetConfig(configMap map[string]string)

SetConfig sets the ds.config values

func (*Datastream) SetEmpty

func (ds *Datastream) SetEmpty()

SetEmpty sets the ds.Rows channel as empty

func (*Datastream) SetFields

func (ds *Datastream) SetFields(fields []string)

SetFields sets the fields/columns of the Datastream

func (*Datastream) Shape

func (ds *Datastream) Shape(columns Columns) (nDs *Datastream, err error)

Shape changes the column types as needed, to the provided columns var. It will cast the already wrongly casted rows, and not recast the correctly casted rows

func (*Datastream) Start

func (ds *Datastream) Start() (err error)

Start generates the stream. Should cycle the Iter Func until done

func (*Datastream) WaitReady

func (ds *Datastream) WaitReady() error

WaitReady waits until datastream is ready

type Iterator

type Iterator struct {
	Row     []interface{}
	Counter uint64
	Context g.Context
	Closed  bool
	// contains filtered or unexported fields
}

Iterator is the row provider for a datastream

type Parquet

type Parquet struct {
	Path    string
	Columns []Column
	File    *os.File
	PFile   source.ParquetFile
	Data    *Dataset
}

Parquet is a parquet object

func (*Parquet) ReadStream

func (p *Parquet) ReadStream() (*Datastream, error)

ReadStream returns the read Parquet stream into a Datastream https://github.com/xitongsys/parquet-go/blob/master/example/read_partial.go

func (*Parquet) WriteStream

func (p *Parquet) WriteStream(ds *Datastream) error

WriteStream to Parquet file from datastream

type SSHClient

type SSHClient struct {
	Host       string
	Port       int
	User       string
	Password   string
	TgtHost    string
	TgtPort    int
	PrivateKey string
	Err        error
	// contains filtered or unexported fields
}

SSHClient is a client to connect to a ssh server with the main goal of forwarding ports

func (*SSHClient) Close

func (s *SSHClient) Close()

Close stops the client connection

func (*SSHClient) Connect

func (s *SSHClient) Connect() (err error)

Connect connects to the server

func (*SSHClient) GetOutput

func (s *SSHClient) GetOutput() (stdout string, stderr string)

GetOutput returns stdout & stderr outputs

func (*SSHClient) OpenPortForward

func (s *SSHClient) OpenPortForward() (localPort int, err error)

OpenPortForward forwards the port as specified

func (*SSHClient) RunAsProcess

func (s *SSHClient) RunAsProcess() (localPort int, err error)

RunAsProcess uses a separate process, which enables the use of public key auth. See https://git-scm.com/book/pt-pt/v2/Git-no-Servidor-Generating-Your-SSH-Public-Key

func (*SSHClient) SftpClient

func (s *SSHClient) SftpClient() (sftpClient *sftp.Client, err error)

SftpClient returns an SftpClient

type StreamProcessor

type StreamProcessor struct {
	N uint64
	// contains filtered or unexported fields
}

StreamProcessor processes rows and values

func NewStreamProcessor

func NewStreamProcessor() *StreamProcessor

NewStreamProcessor returns a new StreamProcessor

func (*StreamProcessor) CastRow

func (sp *StreamProcessor) CastRow(row []interface{}, columns []Column) []interface{}

CastRow casts each value of a row

func (*StreamProcessor) CastToString

func (sp *StreamProcessor) CastToString(i int, val interface{}, valType ...string) string

CastToString to string. used for csv writing

func (*StreamProcessor) CastToTime

func (sp *StreamProcessor) CastToTime(i interface{}) (t time.Time, err error)

CastToTime converts interface to time

func (*StreamProcessor) CastType

func (sp *StreamProcessor) CastType(val interface{}, typ string) interface{}

CastType casts the type of an interface. CastType is used to cast the interface place holders?

func (*StreamProcessor) CastVal

func (sp *StreamProcessor) CastVal(i int, val interface{}, typ string) interface{}

CastVal casts values with stats collection, which degrades performance by ~10%. Benchmark with: go test -benchmem -run='^$ github.com/flarco/dbio/iop' -bench '^BenchmarkProcessVal'

func (*StreamProcessor) CastValWithoutStats

func (sp *StreamProcessor) CastValWithoutStats(i int, val interface{}, typ string) interface{}

CastValWithoutStats casts the value without counting stats

func (*StreamProcessor) GetType

func (sp *StreamProcessor) GetType(val interface{}) (typ string)

GetType returns the type of an interface

func (*StreamProcessor) ParseString

func (sp *StreamProcessor) ParseString(s string, jj ...int) interface{}

ParseString returns an interface. string: "varchar"; integer: "integer"; decimal: "decimal"; date: "date"; datetime: "timestamp"; timestamp: "timestamp"; text: "text"

func (*StreamProcessor) ParseTime

func (sp *StreamProcessor) ParseTime(i interface{}) (t time.Time, err error)

ParseTime parses a date string and returns time.Time

func (*StreamProcessor) ParseVal

func (sp *StreamProcessor) ParseVal(val interface{}) interface{}

ParseVal parses the value into its appropriate type

func (*StreamProcessor) ProcessRow

func (sp *StreamProcessor) ProcessRow(row []interface{}) []interface{}

ProcessRow processes a row

func (*StreamProcessor) ProcessVal

func (sp *StreamProcessor) ProcessVal(val interface{}) interface{}

ProcessVal processes a value

func (*StreamProcessor) SetConfig

func (sp *StreamProcessor) SetConfig(configMap map[string]string)

SetConfig sets the data.Sp.config values

type ZipCompressor

type ZipCompressor struct {
	BaseCompressor
	// contains filtered or unexported fields
}

ZipCompressor is for Zip Compression

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL