export

package
v1.0.0-rc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2023 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const BatchReadRows = 4000

BatchReadRows is sized to roughly one rawlog file: a 20MB rawlog file has about 3700+ rows.

View Source
const LoggerNameContentReader = "ETLContentReader"
View Source
const LoggerNameETLMerge = "ETLMerge"
View Source
const LoggerNameMOCollector = "MOCollector"
View Source
const MAX_MERGE_INSERT_TIME = 10 * time.Second
View Source
const MergeTaskCronExprEvery05Min = "0 */5 * * * *"
View Source
const MergeTaskCronExprEvery15Min = "0 */15 * * * *"
View Source
const MergeTaskCronExprEvery15Sec = "*/15 * * * * *"
View Source
const MergeTaskCronExprEvery1Hour = "0 0 */1 * * *"
View Source
const MergeTaskCronExprEvery2Hour = "0 0 */2 * * *"
View Source
const MergeTaskCronExprEvery4Hour = "0 0 4,8,12,16,20 * * *"
View Source
const MergeTaskCronExprEveryMin = "0 * * * * *"
View Source
const MergeTaskCronExprYesterday = "0 5 0 * * *"
View Source
const MergeTaskToday = "today"
View Source
const MergeTaskYesterday = "yesterday"
View Source
const ParamSeparator = " "

Variables

View Source
var ETLMergeTaskPool *mpool.MPool
View Source
var MergeTaskCronExpr = MergeTaskCronExprEvery4Hour

MergeTaskCronExpr supports second-level granularity.

Functions

func CreateCronTask

func CreateCronTask(ctx context.Context, executorID task.TaskCode, taskService taskservice.TaskService) error

func GetWriterFactory added in v0.7.0

func GetWriterFactory(fs fileservice.FileService, nodeUUID, nodeType string, enableSqlWriter bool) table.WriterFactory

func InitCronExpr

func InitCronExpr(ctx context.Context, duration time.Duration) error

InitCronExpr supports a minimum interval of 5 minutes and a maximum interval of 12 hours.

func LongRunETLMerge added in v0.8.0

func LongRunETLMerge(ctx context.Context, task task.Task, logger *log.MOLogger, opts ...MergeOption) error

func MergeTaskExecutorFactory

func MergeTaskExecutorFactory(opts ...MergeOption) func(ctx context.Context, task task.Task) error

func MergeTaskMetadata

func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata

MergeTaskMetadata handles args like: "{db_tbl_name} [date, default: today]"

func SubStringPrefixLimit added in v0.7.0

func SubStringPrefixLimit(str string, length int) string

Types

type Cache

type Cache interface {
	Put(*table.Row)
	Size() int64
	Flush(*table.Table) error
	Reset()
	IsEmpty() bool
}

type ContentReader

type ContentReader struct {
	// contains filtered or unexported fields
}

func NewContentReader

func NewContentReader(ctx context.Context, path string, reader *simdcsv.Reader, raw io.ReadCloser) *ContentReader

func (*ContentReader) Close

func (s *ContentReader) Close()

func (*ContentReader) ReadLine

func (s *ContentReader) ReadLine() ([]string, error)

func (*ContentReader) ReadRow added in v0.7.0

func (s *ContentReader) ReadRow(row *table.Row) error

type ETLReader added in v0.7.0

type ETLReader interface {
	// ReadRow read one line as table.Row
	ReadRow(row *table.Row) error
	// ReadLine read raw data from file.
	ReadLine() ([]string, error)
	// Close files and release all content.
	Close()
}

func NewCSVReader

func NewCSVReader(ctx context.Context, fs fileservice.FileService, path string) (ETLReader, error)

NewCSVReader creates a new CSV reader. On success it returns (ok_reader, nil); on failure it returns (nil_reader, error).

type ETLWriter added in v0.7.0

type ETLWriter interface {
	// WriteRow writes a table.Row as one line into the file.
	WriteRow(row *table.Row) error
	// WriteStrings write record as one line into file.
	WriteStrings(record []string) error
	// FlushAndClose flush its buffer and close the writer.
	FlushAndClose() (int, error)
}

ETLWriter handles serialization logic, e.g. for CSV files and TAE files.

type FileMeta added in v0.7.0

type FileMeta struct {
	FilePath string
	FileSize int64
}

type MOCollector

type MOCollector struct {
	motrace.BatchProcessor
	// contains filtered or unexported fields
}

MOCollector handles all bufferPipe instances.

func NewMOCollector

func NewMOCollector(ctx context.Context, opts ...MOCollectorOption) *MOCollector

func (*MOCollector) Collect

func (c *MOCollector) Collect(ctx context.Context, item batchpipe.HasName) error

Collect puts the item into the channel; if the collector is stopped, it returns an error.

func (*MOCollector) DiscardableCollect added in v1.0.0

func (c *MOCollector) DiscardableCollect(ctx context.Context, item batchpipe.HasName) error

DiscardableCollect implements motrace.DiscardableCollector; it cooperates with the logutil.Discardable() field.

func (*MOCollector) Register added in v0.7.0

func (c *MOCollector) Register(name batchpipe.HasName, impl motrace.PipeImpl)

func (*MOCollector) Start

func (c *MOCollector) Start() bool

Start starts all goroutine workers, including the collectors, generators, and exporters.

func (*MOCollector) Stop

func (c *MOCollector) Stop(graceful bool) error

type MOCollectorOption added in v0.7.0

type MOCollectorOption func(*MOCollector)

func WithCollectorCnt added in v0.7.0

func WithCollectorCnt(cnt int) MOCollectorOption

func WithExporterCnt added in v0.7.0

func WithExporterCnt(cnt int) MOCollectorOption

func WithGeneratorCnt added in v0.7.0

func WithGeneratorCnt(cnt int) MOCollectorOption

func WithOBCollectorConfig added in v0.8.0

func WithOBCollectorConfig(cfg *config.OBCollectorConfig) MOCollectorOption

type Merge

type Merge struct {

	// MaxFileSize the total filesize to trigger doMergeFiles(),default: 32 MB
	// Deprecated
	MaxFileSize int64 // set by WithMaxFileSize
	// MaxMergeJobs is the number of Merge jobs allowed to run, default: 1
	MaxMergeJobs int64 // set by WithMaxMergeJobs
	// contains filtered or unexported fields
}

Merge works like a compaction: it merges input files into one/two/... files.

- NewMergeService inits merge as a service, using serviceInited to avoid multiple inits.
- MergeTaskExecutorFactory is driven by the Cron TaskService.
- NewMerge handles merge object init.
- Merge.Start() runs as the service loop and triggers Merge.Main().
- Merge.Main() handles the main job:

  1. foreach account, build `rootPath` with tuple {account, date, Table }
  2. call Merge.doMergeFiles() with all files in `rootPath`, do merge job

- Merge.doMergeFiles handles one job flow: read each file, merge in cache, write into a file.

func NewMerge

func NewMerge(ctx context.Context, opts ...MergeOption) (*Merge, error)

func NewMergeService

func NewMergeService(ctx context.Context, opts ...MergeOption) (*Merge, bool, error)

func (*Merge) Main

func (m *Merge) Main(ctx context.Context) error

Main lists all accounts and all dates that belong to m.table.GetName().

func (*Merge) Start

func (m *Merge) Start(ctx context.Context, interval time.Duration)

Start runs the service loop.

func (*Merge) Stop

func (m *Merge) Stop()

Stop should be called only once.

type MergeOption

type MergeOption func(*Merge)

func WithFileService

func WithFileService(fs fileservice.FileService) MergeOption

func WithMaxFileSize

func WithMaxFileSize(filesize int64) MergeOption

func WithMaxMergeJobs

func WithMaxMergeJobs(jobs int64) MergeOption

func WithTable

func WithTable(tbl *table.Table) MergeOption

func WithTask added in v1.0.0

func WithTask(task task.Task) MergeOption

func (MergeOption) Apply

func (opt MergeOption) Apply(m *Merge)

type PipeImplHolder added in v0.7.0

type PipeImplHolder struct {
	// contains filtered or unexported fields
}

func (*PipeImplHolder) Get added in v0.7.0

func (h *PipeImplHolder) Get(name string) (motrace.PipeImpl, bool)

func (*PipeImplHolder) Put added in v0.7.0

func (h *PipeImplHolder) Put(name string, impl motrace.PipeImpl) bool

func (*PipeImplHolder) Size added in v0.7.0

func (h *PipeImplHolder) Size() int

type SliceCache

type SliceCache struct {
	// contains filtered or unexported fields
}

func (*SliceCache) Flush

func (c *SliceCache) Flush(tbl *table.Table) error

func (*SliceCache) IsEmpty

func (c *SliceCache) IsEmpty() bool

func (*SliceCache) Put

func (c *SliceCache) Put(r *table.Row)

func (*SliceCache) Reset

func (c *SliceCache) Reset()

func (*SliceCache) Size

func (c *SliceCache) Size() int64

Directories

Path Synopsis
etl
db
Bin to show how Merge Task works.
Bin to show how Merge Task works.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL