task

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 20, 2022 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Ring

type Ring struct {
	// contains filtered or unexported fields
}

func (*Ring) ForceBatchOrShard

func (ring *Ring) ForceBatchOrShard(_ interface{})

func (*Ring) MakeRoom

func (ring *Ring) MakeRoom(newMsg *model.InputMessage)

func (*Ring) PutElem

func (ring *Ring) PutElem(msgRow model.MsgRow)

func (*Ring) PutMsgNolock

func (ring *Ring) PutMsgNolock(msg *model.InputMessage)

PutMsgNolock assumes that ring.mux is locked and that msg.Offset is in the range [ring.ringGroundOff, ring.ringGroundOff+ring.ringCap).

func (*Ring) QuitIdle

func (ring *Ring) QuitIdle()

QuitIdle assumes that ring.mux is locked.

type Service

type Service struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Service holds the configuration for each task.

func NewTaskService

func NewTaskService(cfg *config.Config, taskCfg *config.TaskConfig) (service *Service)

NewTaskService creates a new task service instance with kafka, clickhouse and parser instances.

func (*Service) Flush

func (service *Service) Flush(batch *model.Batch) (err error)

func (*Service) Init

func (service *Service) Init() (err error)

Init initializes the kafka and clickhouse task associated with this service

func (*Service) Run

func (service *Service) Run()

Run starts the task

func (*Service) Stop

func (service *Service) Stop()

Stop stops the kafka and clickhouse clients. This call is blocking.

type Sharder

type Sharder struct {
	// contains filtered or unexported fields
}

func NewSharder

func NewSharder(service *Service) (sh *Sharder, err error)

func (*Sharder) Calc

func (sh *Sharder) Calc(row *model.Row) (int, error)

func (*Sharder) ForceFlush

func (sh *Sharder) ForceFlush(arg interface{})

func (*Sharder) PutElems

func (sh *Sharder) PutElems(partition int, ringBuf []model.MsgRow, begOff, endOff, ringCapMask int64)

type ShardingPolicy

type ShardingPolicy struct {
	// contains filtered or unexported fields
}

func NewShardingPolicy

func NewShardingPolicy(shardingKey string, shardingStripe uint64, dims []*model.ColumnWithType, shards int) (policy *ShardingPolicy, err error)

func (*ShardingPolicy) Calc

func (policy *ShardingPolicy) Calc(row *model.Row) (shard int, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL