export

package
v0.6.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2022 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const AccountAll = ETLParamAccountAll
View Source
const BatchReadRows = 4000

BatchReadRows is sized so that a ~20MB rawlog file (about 3700+ rows) is read in a few batches.

View Source
const CsvExtension = ".csv"
View Source
const ETLParamAccountAll = "*"
View Source
const ETLParamTypeAll = MergeLogTypeALL
View Source
const FilenameElems = 3
View Source
const FilenameIdxType = 2
View Source
const FilenameSeparator = "_"
View Source
const MergeTaskCronExprEvery05Min = "0 */5 * * * *"
View Source
const MergeTaskCronExprEvery15Min = "0 */15 * * * *"
View Source
const MergeTaskCronExprEvery15Sec = "*/15 * * * * *"
View Source
const MergeTaskCronExprEvery1Hour = "0 0 */1 * * *"
View Source
const MergeTaskCronExprEvery2Hour = "0 0 */2 * * *"
View Source
const MergeTaskCronExprEvery4Hour = "0 0 4,8,12,16,20 * * *"
View Source
const MergeTaskCronExprYesterday = "0 5 0 * * *"
View Source
const MergeTaskToday = "today"
View Source
const MergeTaskYesterday = "yesterday"
View Source
const ParamSeparator = " "
View Source
const PathElems = 7
View Source
const PathIdxAccount = 0
View Source
const PathIdxFilename = 6
View Source
const PathIdxTable = 5
View Source
const READONLY = 1
View Source
const READWRITE = 0

Variables

View Source
var CommonCsvOptions = &CsvOptions{
	FieldTerminator: ',',
	EncloseRune:     '"',
	Terminator:      '\n',
}
View Source
var ETLParamTSAll = time.Time{}
View Source
var ExternalTableEngine = "EXTERNAL"
View Source
var MergeTaskCronExpr = MergeTaskCronExprEvery4Hour

MergeTaskCronExpr supports second-level granularity.

View Source
var NormalTableEngine = "TABLE"

Functions

func CreateCronTask

func CreateCronTask(ctx context.Context, executorID task.TaskCode, taskService taskservice.TaskService) error

func DefaultContext

func DefaultContext() context.Context

func GetOptionFactory

func GetOptionFactory(engine string) func(db, tbl, account string) TableOptions

func InitCronExpr

func InitCronExpr(duration time.Duration) error

InitCronExpr supports a minimum interval of 5 minutes and a maximum of 12 hours.

func InitMerge

func InitMerge(mergeCycle time.Duration, filesize int) error

func MergeTaskExecutorFactory

func MergeTaskExecutorFactory(opts ...MergeOption) func(ctx context.Context, task task.Task) error

func MergeTaskMetadata

func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata

MergeTaskMetadata handles args like: "{db_tbl_name} [date, default: today]"

func Register

func Register(name batchpipe.HasName, impl batchpipe.PipeImpl[batchpipe.HasName, any])

func ResetGlobalBatchProcessor

func ResetGlobalBatchProcessor()

func SetDefaultContextFunc

func SetDefaultContextFunc(f getContextFunc)

func SetGlobalBatchProcessor

func SetGlobalBatchProcessor(p BatchProcessor)

func SetPathBuilder

func SetPathBuilder(pathBuilder string) error

func String2Bytes

func String2Bytes(s string) (ret []byte)

Types

type AccountDatePathBuilder

type AccountDatePathBuilder struct{}

func NewAccountDatePathBuilder

func NewAccountDatePathBuilder() *AccountDatePathBuilder

func (*AccountDatePathBuilder) Build

func (b *AccountDatePathBuilder) Build(account string, typ MergeLogType, ts time.Time, db string, name string) string

func (*AccountDatePathBuilder) BuildETLPath

func (b *AccountDatePathBuilder) BuildETLPath(db, name, account string) string

BuildETLPath implements PathBuilder.

# path elements: account | typ | ts | table | filename — e.g.: */*/*/*/*/metric/*.csv

func (*AccountDatePathBuilder) GetName

func (b *AccountDatePathBuilder) GetName() string

func (*AccountDatePathBuilder) NewLogFilename

func (b *AccountDatePathBuilder) NewLogFilename(name, nodeUUID, nodeType string, ts time.Time) string

func (*AccountDatePathBuilder) NewMergeFilename

func (b *AccountDatePathBuilder) NewMergeFilename(timestampStart, timestampEnd string) string

func (*AccountDatePathBuilder) ParsePath

func (b *AccountDatePathBuilder) ParsePath(path string) (CSVPath, error)

func (*AccountDatePathBuilder) SupportAccountStrategy

func (b *AccountDatePathBuilder) SupportAccountStrategy() bool

func (*AccountDatePathBuilder) SupportMergeSplit

func (b *AccountDatePathBuilder) SupportMergeSplit() bool

type BatchProcessor

type BatchProcessor interface {
	Collect(context.Context, batchpipe.HasName) error
	Start() bool
	Stop(graceful bool) error
}

func GetGlobalBatchProcessor

func GetGlobalBatchProcessor() BatchProcessor

type CSVPath

type CSVPath interface {
	Table() string
	Timestamp() []string
}

type CSVReader

type CSVReader interface {
	ReadLine() ([]string, error)
	Close()
}

func NewCSVReader

func NewCSVReader(ctx context.Context, fs fileservice.FileService, path string) (CSVReader, error)

type CSVWriter

type CSVWriter interface {
	// WriteStrings write record as one line into csv file
	WriteStrings(record []string) error
	// FlushAndClose flush its buffer and close.
	FlushAndClose() error
}

func NewCSVWriter

func NewCSVWriter(ctx context.Context, fs fileservice.FileService, path string, buf []byte) (CSVWriter, error)

type Cache

type Cache interface {
	Put(*Row)
	Size() int64
	Flush(CSVWriter) error
	Reset()
	IsEmpty() bool
}

type Column

type Column struct {
	Name    string
	Type    string
	Default string
	Comment string
	Alias   string // only use in view
}

func (*Column) ToCreateSql

func (col *Column) ToCreateSql() string

ToCreateSql returns the column's schema fragment for the CREATE SQL statement.

  • case 1: `column_name` varchar(36) DEFAULT "def_val" COMMENT "what am I, with default."
  • case 2: `column_name` varchar(36) NOT NULL COMMENT "what am I. Without default, SO NOT NULL."

type ContentReader

type ContentReader struct {
	// contains filtered or unexported fields
}

func NewContentReader

func NewContentReader(ctx context.Context, reader *simdcsv.Reader, raw io.ReadCloser) *ContentReader

func (*ContentReader) Close

func (s *ContentReader) Close()

func (*ContentReader) ReadLine

func (s *ContentReader) ReadLine() ([]string, error)

type ContentWriter

type ContentWriter struct {
	// contains filtered or unexported fields
}

func NewContentWriter

func NewContentWriter(writer io.StringWriter, buffer []byte) *ContentWriter

func (*ContentWriter) FlushAndClose

func (w *ContentWriter) FlushAndClose() error

func (*ContentWriter) WriteStrings

func (w *ContentWriter) WriteStrings(record []string) error

type CsvOptions

type CsvOptions struct {
	FieldTerminator rune // like: ','
	EncloseRune     rune // like: '"'
	Terminator      rune // like: '\n'
}

type CsvTableOptions

type CsvTableOptions struct {
	Formatter string
	DbName    string
	TblName   string
	Account   string
}

func (*CsvTableOptions) FormatDdl

func (o *CsvTableOptions) FormatDdl(ddl string) string

func (*CsvTableOptions) GetCreateOptions

func (o *CsvTableOptions) GetCreateOptions() string

func (*CsvTableOptions) GetTableOptions

func (o *CsvTableOptions) GetTableOptions(builder PathBuilder) string

type DBTablePathBuilder

type DBTablePathBuilder struct{}

func NewDBTablePathBuilder

func NewDBTablePathBuilder() *DBTablePathBuilder

func (*DBTablePathBuilder) Build

func (m *DBTablePathBuilder) Build(account string, typ MergeLogType, ts time.Time, db string, name string) string

func (*DBTablePathBuilder) BuildETLPath

func (m *DBTablePathBuilder) BuildETLPath(db, name, account string) string

BuildETLPath implements PathBuilder.

like: system/metric_*.csv

func (*DBTablePathBuilder) GetName

func (m *DBTablePathBuilder) GetName() string

func (*DBTablePathBuilder) NewLogFilename

func (m *DBTablePathBuilder) NewLogFilename(name, nodeUUID, nodeType string, ts time.Time) string

func (*DBTablePathBuilder) NewMergeFilename

func (m *DBTablePathBuilder) NewMergeFilename(timestampStart, timestampEnd string) string

func (*DBTablePathBuilder) ParsePath

func (m *DBTablePathBuilder) ParsePath(path string) (CSVPath, error)

func (*DBTablePathBuilder) SupportAccountStrategy

func (m *DBTablePathBuilder) SupportAccountStrategy() bool

func (*DBTablePathBuilder) SupportMergeSplit

func (m *DBTablePathBuilder) SupportMergeSplit() bool

type FSWriter

type FSWriter struct {
	// contains filtered or unexported fields
}

func NewFSWriter

func NewFSWriter(ctx context.Context, fs fileservice.FileService, opts ...FSWriterOption) *FSWriter

func (*FSWriter) Write

func (w *FSWriter) Write(p []byte) (n int, err error)

Write implements io.Writer. Please execute serially (not concurrently).

func (*FSWriter) WriteString

func (w *FSWriter) WriteString(s string) (n int, err error)

WriteString implements io.StringWriter.

type FSWriterFactory

type FSWriterFactory func(ctx context.Context, db string, name batchpipe.HasName, options ...FSWriterOption) io.StringWriter

func GetFSWriterFactory

func GetFSWriterFactory(fs fileservice.FileService, nodeUUID, nodeType string) FSWriterFactory

type FSWriterOption

type FSWriterOption func(*FSWriter)

func WithAccount

func WithAccount(a string) FSWriterOption

func WithDatabase

func WithDatabase(dir string) FSWriterOption

func WithFilePath

func WithFilePath(filepath string) FSWriterOption

func WithName

func WithName(item batchpipe.HasName) FSWriterOption

func WithNode

func WithNode(uuid, nodeType string) FSWriterOption

func WithPathBuilder

func WithPathBuilder(builder PathBuilder) FSWriterOption

func WithTimestamp

func WithTimestamp(ts time.Time) FSWriterOption

func (FSWriterOption) Apply

func (f FSWriterOption) Apply(w *FSWriter)

type MOCollector

type MOCollector struct {
	// contains filtered or unexported fields
}

MOCollector handle all bufferPipe

func NewMOCollector

func NewMOCollector() *MOCollector

func (*MOCollector) Collect

func (c *MOCollector) Collect(ctx context.Context, i batchpipe.HasName) error

func (*MOCollector) Start

func (c *MOCollector) Start() bool

Start all goroutine workers, including collector, generator, and exporter.

func (*MOCollector) Stop

func (c *MOCollector) Stop(graceful bool) error

type MapCache

type MapCache struct {
	// contains filtered or unexported fields
}

func (*MapCache) Flush

func (c *MapCache) Flush(writer CSVWriter) error

func (*MapCache) IsEmpty

func (c *MapCache) IsEmpty() bool

func (*MapCache) Put

func (c *MapCache) Put(r *Row)

func (*MapCache) Reset

func (c *MapCache) Reset()

func (*MapCache) Size

func (c *MapCache) Size() int64

type Merge

type Merge struct {
	Table  *Table                  // WithTable
	FS     fileservice.FileService // WithFileService
	FSName string                  // WithFileServiceName, cooperate with FS

	// MaxFileSize controls the maximum merged file size, default: 128 MB
	MaxFileSize int64 // WithMaxFileSize
	// MaxMergeJobs is the number of merge jobs allowed to run, default: 16
	MaxMergeJobs int64 // WithMaxMergeJobs
	// MinFilesMerge controls the minimum number of files to merge, default: 2
	//
	// Deprecated: useless in Merge all in one file
	MinFilesMerge int // WithMinFilesMerge
	// FileCacheSize controls the file cache size allowed during merging, default: 32 MB
	FileCacheSize int64
	// contains filtered or unexported fields
}

Merge like a compaction, merge input files into one/two/... files.

  • `NewMergeService` init merge as service, with param `serviceInited` to avoid multi init.
  • `MergeTaskExecutorFactory` drive by Cron TaskService.
  • `NewMerge` handle merge obj init.
  • `Merge::Start` as service loop, trigger `Merge::Main` each cycle
  • `Merge::Main` handles the job: 1. for each account, build `rootPath` with the tuple {account, date, Table}; 2. call `Merge::doMergeFiles` with all files in `rootPath` to do the merge job
  • `Merge::doMergeFiles` handle one job flow: read each file, merge in cache, write into file.

func NewMerge

func NewMerge(ctx context.Context, opts ...MergeOption) *Merge

func NewMergeService

func NewMergeService(ctx context.Context, opts ...MergeOption) (*Merge, bool)

func (*Merge) Main

func (m *Merge) Main(ts time.Time) error

Main handles the cron job, iterating over all accounts.

func (*Merge) Start

func (m *Merge) Start(interval time.Duration)

Start runs the service loop.

func (*Merge) Stop

func (m *Merge) Stop()

Stop should be called only once.

type MergeLogType

type MergeLogType string
const MergeLogTypeALL MergeLogType = "*"
const MergeLogTypeLogs MergeLogType = "logs"
const MergeLogTypeMerged MergeLogType = "merged"

func (MergeLogType) String

func (t MergeLogType) String() string

type MergeOption

type MergeOption func(*Merge)

func WithFileService

func WithFileService(fs fileservice.FileService) MergeOption

func WithFileServiceName

func WithFileServiceName(name string) MergeOption

func WithMaxFileSize

func WithMaxFileSize(filesize int64) MergeOption

func WithMaxMergeJobs

func WithMaxMergeJobs(jobs int64) MergeOption

func WithMinFilesMerge

func WithMinFilesMerge(files int) MergeOption

func WithTable

func WithTable(tbl *Table) MergeOption

func (MergeOption) Apply

func (opt MergeOption) Apply(m *Merge)

type MetricLogPath

type MetricLogPath struct {
	// contains filtered or unexported fields
}

func NewMetricLogPath

func NewMetricLogPath(path string) *MetricLogPath

NewMetricLogPath

path like: sys/[log|merged]/yyyy/mm/dd/table/***.csv (path-element indexes 0-6); filename like: {timestamp}_{node_uuid}_{node_type}.csv, or for merged files: {timestamp_start}_{timestamp_end}_merged.csv

func (*MetricLogPath) Parse

func (p *MetricLogPath) Parse() error

func (*MetricLogPath) Table

func (p *MetricLogPath) Table() string

func (*MetricLogPath) Timestamp

func (p *MetricLogPath) Timestamp() []string

type NoopTableOptions

type NoopTableOptions struct{}

func (NoopTableOptions) FormatDdl

func (o NoopTableOptions) FormatDdl(ddl string) string

func (NoopTableOptions) GetCreateOptions

func (o NoopTableOptions) GetCreateOptions() string

func (NoopTableOptions) GetTableOptions

func (o NoopTableOptions) GetTableOptions(PathBuilder) string

type PathBuilder

type PathBuilder interface {
	// Build directory path
	Build(account string, typ MergeLogType, ts time.Time, db string, name string) string
	// BuildETLPath return path for EXTERNAL table 'infile' options
	//
	// like: {account}/merged/*/*/*/{name}/*.csv
	BuildETLPath(db, name, account string) string
	// ParsePath
	//
	// switch path {
	// case "{timestamp_writedown}_{node_uuid}_{node_type}.csv":
	// case "{timestamp_start}_{timestamp_end}_merged.csv"
	// }
	ParsePath(path string) (CSVPath, error)
	NewMergeFilename(timestampStart, timestampEnd string) string
	NewLogFilename(name, nodeUUID, nodeType string, ts time.Time) string
	// SupportMergeSplit const. If false, the CSV merge|split task is not supported.
	SupportMergeSplit() bool
	// SupportAccountStrategy const
	SupportAccountStrategy() bool
	// GetName const
	GetName() string
}

PathBuilder holds the strategy used to build file paths.

func PathBuilderFactory

func PathBuilderFactory(pathBuilder string) PathBuilder

type Row

type Row struct {
	Table          *Table
	AccountIdx     int
	Columns        []string
	Name2ColumnIdx map[string]int
}

func (*Row) GetAccount

func (r *Row) GetAccount() string

GetAccount returns r.Columns[r.AccountIdx] if r.AccountIdx >= 0 and r.Table.PathBuilder.SupportAccountStrategy is true; otherwise it returns "sys".

func (*Row) ParseRow

func (r *Row) ParseRow(cols []string) error

func (*Row) PrimaryKey

func (r *Row) PrimaryKey() string

func (*Row) Reset

func (r *Row) Reset()

func (*Row) SetColumnVal

func (r *Row) SetColumnVal(col Column, val string)

func (*Row) SetFloat64

func (r *Row) SetFloat64(col string, val float64)

func (*Row) SetInt64

func (r *Row) SetInt64(col string, val int64)

func (*Row) SetVal

func (r *Row) SetVal(col string, val string)

func (*Row) Size

func (r *Row) Size() (size int64)

func (*Row) ToRawStrings

func (r *Row) ToRawStrings() []string

ToRawStrings returns the columns without formatting.

func (*Row) ToStrings

func (r *Row) ToStrings() []string

ToStrings outputs all columns as strings.

type SliceCache

type SliceCache struct {
	// contains filtered or unexported fields
}

func (*SliceCache) Flush

func (c *SliceCache) Flush(writer CSVWriter) error

func (*SliceCache) IsEmpty

func (c *SliceCache) IsEmpty() bool

func (*SliceCache) Put

func (c *SliceCache) Put(r *Row)

func (*SliceCache) Reset

func (c *SliceCache) Reset()

func (*SliceCache) Size

func (c *SliceCache) Size() int64

type Table

type Table struct {
	Account          string
	Database         string
	Table            string
	Columns          []Column
	PrimaryKeyColumn []Column
	Engine           string
	Comment          string
	// PathBuilder help to desc param 'infile'
	PathBuilder PathBuilder
	// AccountColumn help to split data in account's filepath
	AccountColumn *Column
	// TableOptions default is nil, see GetTableOptions
	TableOptions TableOptions
	// SupportUserAccess default false. if true, user account can access.
	SupportUserAccess bool
}

func GetAllTable

func GetAllTable() []*Table

func RegisterTableDefine

func RegisterTableDefine(table *Table) *Table

RegisterTableDefine returns the old one, if already registered.

func (*Table) Clone

func (tbl *Table) Clone() *Table

func (*Table) GetDatabase

func (tbl *Table) GetDatabase() string

func (*Table) GetIdentify

func (tbl *Table) GetIdentify() string

func (*Table) GetName

func (tbl *Table) GetName() string

func (*Table) GetRow

func (tbl *Table) GetRow() *Row

func (*Table) GetTableOptions

func (tbl *Table) GetTableOptions() TableOptions

func (*Table) NewRowCache

func (tbl *Table) NewRowCache() Cache

func (*Table) ToCreateSql

func (tbl *Table) ToCreateSql(ifNotExists bool) string

type TableOptions

type TableOptions interface {
	FormatDdl(ddl string) string
	// GetCreateOptions return option for `create {option}table`, which should end with ' '
	GetCreateOptions() string
	GetTableOptions(PathBuilder) string
}

type View

type View struct {
	Database    string
	Table       string
	OriginTable *Table
	Columns     []Column
	Condition   WhereCondition
	// SupportUserAccess default false. if true, user account can access.
	SupportUserAccess bool
}

func (*View) ToCreateSql

func (tbl *View) ToCreateSql(ifNotExists bool) string

type ViewOption

type ViewOption func(view *View)

func SupportUserAccess

func SupportUserAccess(support bool) ViewOption

func WithColumn

func WithColumn(c Column) ViewOption

func (ViewOption) Apply

func (opt ViewOption) Apply(view *View)

type ViewSingleCondition

type ViewSingleCondition struct {
	Column Column
	Table  string
}

func (*ViewSingleCondition) String

func (tbl *ViewSingleCondition) String() string

type WhereCondition

type WhereCondition interface {
	String() string
}

Directories

Path Synopsis
Bin to show how Merge Task works.
Bin to show how Merge Task works.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL