stream

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 29, 2024 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	EmptyGroupKey = ""
	EmptyTagValue = ""
)

Variables

View Source
var (
	FlushParallelMinRowNum = 10000
	ErrEmptyCache          = errors.New("empty window cache")
)

Functions

This section is empty.

Types

type BaseTask added in v1.2.0

type BaseTask struct {
	Logger Logger
	// contains filtered or unexported fields
}

type CacheRow

type CacheRow struct {
	// contains filtered or unexported fields
}

type CacheRowPool

type CacheRowPool struct {
	// contains filtered or unexported fields
}

func NewCacheRowPool

func NewCacheRowPool() *CacheRowPool

func (*CacheRowPool) Get

func (p *CacheRowPool) Get() *CacheRow

func (*CacheRowPool) Len

func (p *CacheRowPool) Len() int64

func (*CacheRowPool) Put

func (p *CacheRowPool) Put(r *CacheRow)

func (*CacheRowPool) Size

func (p *CacheRowPool) Size() int64

type Engine

type Engine interface {
	WriteRows(db, rp string, ptId uint32, shardID uint64, streamIdDstShardIdMap map[uint64]uint64, ww WritePointsWorkIF)
	RegisterTask(info *meta2.StreamInfo, fieldCalls []*streamLib.FieldCall) error
	Drain()
	DeleteTask(id uint64)
	Run()
	Close()
}

func NewStream

func NewStream(store Storage, Logger Logger, cli MetaClient, conf stream.Config) (Engine, error)

type Logger

type Logger interface {
	Error(msg string, fields ...zap.Field)
	Info(msg string, fields ...zap.Field)
	Debug(msg string, fields ...zap.Field)
}

type MetaClient

type MetaClient interface {
	GetStreamInfosStore() map[string]*meta2.StreamInfo
	GetMeasurementInfoStore(dbName string, rpName string, mstName string) (*meta2.MeasurementInfo, error)
}

type RowsPool

type RowsPool struct {
	// contains filtered or unexported fields
}

func NewRowsPool

func NewRowsPool() *RowsPool

func (*RowsPool) Get

func (p *RowsPool) Get() *[]influx.Row

func (*RowsPool) Put

func (p *RowsPool) Put(r *[]influx.Row)

type Storage

type Storage interface {
	WriteRows(db, rp string, ptId uint32, shardID uint64, rows []influx.Row, binaryRows []byte) error
}

type Stream

type Stream struct {
	Logger Logger
	// contains filtered or unexported fields
}

func (*Stream) Close

func (s *Stream) Close()

Close shuts down the Stream.

func (*Stream) DeleteTask

func (s *Stream) DeleteTask(id uint64)

func (*Stream) Drain

func (s *Stream) Drain()

Drain is intended for test cases; it is used to check whether any resources have leaked.

func (*Stream) RegisterTask

func (s *Stream) RegisterTask(info *meta2.StreamInfo, fieldCalls []*streamLib.FieldCall) error

func (*Stream) Run

func (s *Stream) Run()

func (*Stream) WriteRows

func (s *Stream) WriteRows(db, rp string, ptId uint32, shardID uint64, streamIdDstShardIdMap map[uint64]uint64,
	ww WritePointsWorkIF,
)

type TagTask added in v1.2.0

type TagTask struct {
	*WindowDataPool

	*BaseTask
	// contains filtered or unexported fields
}

func (*TagTask) Drain added in v1.2.0

func (s *TagTask) Drain()

func (*TagTask) Put added in v1.2.0

func (s *TagTask) Put(r *WindowCache)

func (*TagTask) UnCompressDictKeyUint added in v1.2.0

func (s *TagTask) UnCompressDictKeyUint(key uint64) (string, error)

UnCompressDictKeyUint: the corpusIndexes array does not need a lock.

func (*TagTask) WriteRowsToShard added in v1.2.0

func (s *TagTask) WriteRowsToShard(start, end int) error

type Task

type Task interface {
	Drain()

	Put(r *WindowCache)
	// contains filtered or unexported methods
}

type TimeTask added in v1.2.0

type TimeTask struct {
	*WindowDataPool

	*BaseTask
	// contains filtered or unexported fields
}

func (*TimeTask) Drain added in v1.2.0

func (s *TimeTask) Drain()

func (*TimeTask) Put added in v1.2.0

func (s *TimeTask) Put(r *WindowCache)

func (*TimeTask) WriteRowsToShard added in v1.2.0

func (s *TimeTask) WriteRowsToShard() error

type WindowCache

type WindowCache struct {
	// contains filtered or unexported fields
}

type WindowCachePool

type WindowCachePool struct {
	// contains filtered or unexported fields
}

func NewWindowCachePool

func NewWindowCachePool() *WindowCachePool

func (*WindowCachePool) Count

func (p *WindowCachePool) Count() int64

func (*WindowCachePool) Get

func (p *WindowCachePool) Get() *WindowCache

func (*WindowCachePool) Put

func (p *WindowCachePool) Put(r *WindowCache)

type WindowDataPool

type WindowDataPool struct {
	// contains filtered or unexported fields
}

func NewWindowDataPool

func NewWindowDataPool() *WindowDataPool

func (*WindowDataPool) Get

func (p *WindowDataPool) Get() *WindowCache

func (*WindowDataPool) IncreaseChan

func (p *WindowDataPool) IncreaseChan()

func (*WindowDataPool) Len

func (p *WindowDataPool) Len() int64

func (*WindowDataPool) Put

func (p *WindowDataPool) Put(cache *WindowCache)

type WritePointsWorkIF added in v1.1.0

type WritePointsWorkIF interface {
	GetRows() []influx.Row
	PutWritePointsWork()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL