export

package
v0.7.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2023 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const BatchReadRows = 4000

BatchReadRows ≈ 4000: a ~20 MB rawlog file has about 3700+ rows

View Source
const LoggerNameContentReader = "ETLContentReader"
View Source
const LoggerNameETLMerge = "ETLMerge"
View Source
const MergeTaskCronExprEvery05Min = "0 */5 * * * *"
View Source
const MergeTaskCronExprEvery15Min = "0 */15 * * * *"
View Source
const MergeTaskCronExprEvery15Sec = "*/15 * * * * *"
View Source
const MergeTaskCronExprEvery1Hour = "0 0 */1 * * *"
View Source
const MergeTaskCronExprEvery2Hour = "0 0 */2 * * *"
View Source
const MergeTaskCronExprEvery4Hour = "0 0 4,8,12,16,20 * * *"
View Source
const MergeTaskCronExprYesterday = "0 5 0 * * *"
View Source
const MergeTaskToday = "today"
View Source
const MergeTaskYesterday = "yesterday"
View Source
const ParamSeparator = " "

Variables

View Source
var MergeTaskCronExpr = MergeTaskCronExprEvery4Hour

MergeTaskCronExpr supports second-level precision

Functions

func CreateCronTask

func CreateCronTask(ctx context.Context, executorID task.TaskCode, taskService taskservice.TaskService) error

func GetWriterFactory added in v0.7.0

func GetWriterFactory(fs fileservice.FileService, nodeUUID, nodeType string, ext string) (factory table.WriterFactory)

func InitCronExpr

func InitCronExpr(ctx context.Context, duration time.Duration) error

InitCronExpr supports a minimum interval of 5 minutes and a maximum of 12 hours

func MergeTaskExecutorFactory

func MergeTaskExecutorFactory(opts ...MergeOption) func(ctx context.Context, task task.Task) error

func MergeTaskMetadata

func MergeTaskMetadata(id task.TaskCode, args ...string) task.TaskMetadata

MergeTaskMetadata handle args like: "{db_tbl_name} [date, default: today]"

func SubStringPrefixLimit added in v0.7.0

func SubStringPrefixLimit(str string, length int) string

Types

type Cache

type Cache interface {
	Put(*table.Row)
	Size() int64
	Flush(ETLWriter) error
	Reset()
	IsEmpty() bool
}

type ContentReader

type ContentReader struct {
	// contains filtered or unexported fields
}

func NewContentReader

func NewContentReader(ctx context.Context, path string, reader *simdcsv.Reader, raw io.ReadCloser) *ContentReader

func (*ContentReader) Close

func (s *ContentReader) Close()

func (*ContentReader) ReadLine

func (s *ContentReader) ReadLine() ([]string, error)

func (*ContentReader) ReadRow added in v0.7.0

func (s *ContentReader) ReadRow(row *table.Row) error

type ContentWriter

type ContentWriter struct {
	// contains filtered or unexported fields
}

func NewContentWriter

func NewContentWriter(writer io.StringWriter, buffer []byte) *ContentWriter

func (*ContentWriter) FlushAndClose

func (w *ContentWriter) FlushAndClose() (int, error)

func (*ContentWriter) WriteRow added in v0.7.0

func (w *ContentWriter) WriteRow(row *table.Row) error

func (*ContentWriter) WriteStrings

func (w *ContentWriter) WriteStrings(record []string) error

type ETLReader added in v0.7.0

type ETLReader interface {
	// ReadRow read one line as table.Row
	ReadRow(row *table.Row) error
	// ReadLine read raw data from file.
	ReadLine() ([]string, error)
	// Close files and release all content.
	Close()
}

func NewCSVReader

func NewCSVReader(ctx context.Context, fs fileservice.FileService, path string) (ETLReader, error)

NewCSVReader creates a new CSV reader. On success it returns: (reader, nil); on failure it returns: (nil, error).

type ETLWriter added in v0.7.0

type ETLWriter interface {
	// WriteRow writes a table.Row as one line into the file.
	WriteRow(row *table.Row) error
	// WriteStrings write record as one line into file.
	WriteStrings(record []string) error
	// FlushAndClose flush its buffer and close the writer.
	FlushAndClose() (int, error)
}

ETLWriter handle serialize logic, like csv file and tae file.

type FileMeta added in v0.7.0

type FileMeta struct {
	FilePath string
	FileSize int64
}

type MOCollector

type MOCollector struct {
	motrace.BatchProcessor
	// contains filtered or unexported fields
}

MOCollector handle all bufferPipe

func NewMOCollector

func NewMOCollector(ctx context.Context, opts ...MOCollectorOption) *MOCollector

func (*MOCollector) Collect

func (c *MOCollector) Collect(ctx context.Context, item batchpipe.HasName) error

Collect puts the item into the channel; if the collector is stopped, it returns an error

func (*MOCollector) Register added in v0.7.0

func (c *MOCollector) Register(name batchpipe.HasName, impl motrace.PipeImpl)

func (*MOCollector) Start

func (c *MOCollector) Start() bool

Start all goroutine worker, including collector, generator, and exporter

func (*MOCollector) Stop

func (c *MOCollector) Stop(graceful bool) error

type MOCollectorOption added in v0.7.0

type MOCollectorOption func(*MOCollector)

func WithCollectorCnt added in v0.7.0

func WithCollectorCnt(cnt int) MOCollectorOption

func WithExporterCnt added in v0.7.0

func WithExporterCnt(cnt int) MOCollectorOption

func WithGeneratorCnt added in v0.7.0

func WithGeneratorCnt(cnt int) MOCollectorOption

type MapCache

type MapCache struct {
	// contains filtered or unexported fields
}

func (*MapCache) Flush

func (c *MapCache) Flush(writer ETLWriter) error

Flush will do Reset

func (*MapCache) IsEmpty

func (c *MapCache) IsEmpty() bool

func (*MapCache) Put

func (c *MapCache) Put(r *table.Row)

func (*MapCache) Reset

func (c *MapCache) Reset()

func (*MapCache) Size

func (c *MapCache) Size() int64

type Merge

type Merge struct {
	Table  *table.Table            // WithTable
	FS     fileservice.FileService // WithFileService
	FSName string                  // WithFileServiceName, cooperate with FS

	// MaxFileSize controls the maximum size of a merged file, default: 128 MB
	MaxFileSize int64 // WithMaxFileSize
	// MaxMergeJobs limits the number of merge jobs allowed to run, default: 16
	MaxMergeJobs int64 // WithMaxMergeJobs
	// MinFilesMerge controls the minimum number of files to merge, default: 2
	//
	// Deprecated: useless in Merge all in one file
	MinFilesMerge int // WithMinFilesMerge
	// FileCacheSize controls the file size allowed to be cached during a merge, default: 32 MB
	FileCacheSize int64
	// contains filtered or unexported fields
}

Merge like a compaction, merge input files into one/two/... files.

  • `NewMergeService` init merge as service, with param `serviceInited` to avoid multi init.
  • `MergeTaskExecutorFactory` drive by Cron TaskService.
  • `NewMerge` handle merge obj init.
  • `Merge::Start` as service loop, trigger `Merge::Main` each cycle
  • `Merge::Main` handles the job: 1. for each account, build `rootPath` with the tuple {account, date, Table}; 2. call `Merge::doMergeFiles` with all files in `rootPath` to do the merge job
  • `Merge::doMergeFiles` handle one job flow: read each file, merge in cache, write into file.

func NewMerge

func NewMerge(ctx context.Context, opts ...MergeOption) (*Merge, error)

func NewMergeService

func NewMergeService(ctx context.Context, opts ...MergeOption) (*Merge, bool, error)

func (*Merge) Main

func (m *Merge) Main(ctx context.Context, ts time.Time) error

Main handles the cron job, iterating over all accounts

func (*Merge) Start

func (m *Merge) Start(ctx context.Context, interval time.Duration)

Start for service Loop

func (*Merge) Stop

func (m *Merge) Stop()

Stop should be called only once

type MergeOption

type MergeOption func(*Merge)

func WithFileService

func WithFileService(fs fileservice.FileService) MergeOption

func WithFileServiceName

func WithFileServiceName(name string) MergeOption

func WithMaxFileSize

func WithMaxFileSize(filesize int64) MergeOption

func WithMaxMergeJobs

func WithMaxMergeJobs(jobs int64) MergeOption

func WithMinFilesMerge

func WithMinFilesMerge(files int) MergeOption

func WithTable

func WithTable(tbl *table.Table) MergeOption

func (MergeOption) Apply

func (opt MergeOption) Apply(m *Merge)

type PipeImplHolder added in v0.7.0

type PipeImplHolder struct {
	// contains filtered or unexported fields
}

func (*PipeImplHolder) Get added in v0.7.0

func (h *PipeImplHolder) Get(name string) (motrace.PipeImpl, bool)

func (*PipeImplHolder) Put added in v0.7.0

func (h *PipeImplHolder) Put(name string, impl motrace.PipeImpl) bool

func (*PipeImplHolder) Size added in v0.7.0

func (h *PipeImplHolder) Size() int

type SliceCache

type SliceCache struct {
	// contains filtered or unexported fields
}

func (*SliceCache) Flush

func (c *SliceCache) Flush(writer ETLWriter) error

func (*SliceCache) IsEmpty

func (c *SliceCache) IsEmpty() bool

func (*SliceCache) Put

func (c *SliceCache) Put(r *table.Row)

func (*SliceCache) Reset

func (c *SliceCache) Reset()

func (*SliceCache) Size

func (c *SliceCache) Size() int64

Directories

Path Synopsis
Bin to show how Merge Task work.
Bin to show how Merge Task work.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL