pipeline

package
v1.0.2

Warning: This package is not in the latest version of its module.
Published: Jul 1, 2022 License: Apache-2.0 Imports: 7 Imported by: 0

README

pkg/sync/pipeline

Provides an in-memory batch aggregation utility.

Documentation

Constants

This section is empty.

Variables

var ErrFull = errors.New("channel full")

ErrFull is the error reported when a channel is full.

Functions

This section is empty.

Types

type Aggregation

type Aggregation struct {
	Do    func(c context.Context, index int, values map[string][]interface{})
	Split func(key string) int
	// contains filtered or unexported fields
}

Aggregation is the pipeline struct. Do receives the values aggregated per key, and Split maps a key to a channel shard.
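Split has the signature `func(key string) int`, so any deterministic mapping from key to shard fits. The following is a minimal sketch of one possible choice, assuming the caller wants an index in `[0, workers)`. The helper name `splitByHash` and the use of FNV-1a are assumptions for illustration, not part of this package.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// splitByHash is a hypothetical helper. It returns a function with the same
// shape as Aggregation.Split that maps a key to a shard in [0, workers).
// workers must be greater than zero.
func splitByHash(workers int) func(key string) int {
	return func(key string) int {
		h := fnv.New32a()
		h.Write([]byte(key)) // Write on an FNV hash never returns an error
		return int(h.Sum32() % uint32(workers))
	}
}

func main() {
	split := splitByHash(4)
	for _, k := range []string{"user:1", "user:2", "user:1"} {
		fmt.Printf("%s -> shard %d\n", k, split(k))
	}
}
```

Because the mapping is deterministic, every value added under the same key lands on the same shard, which is what lets values for one key be merged together.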

func NewPipeline

func NewPipeline(config *Config) (res *Aggregation)

NewPipeline creates a new pipeline from the given config.

func (*Aggregation) Add

func (p *Aggregation) Add(c context.Context, key string, value interface{}) (err error)

Add asynchronously adds a value to a channel. The channel shard is chosen by the Split method.
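An asynchronous add that can report a full channel is commonly written as a non-blocking send. The sketch below shows that general Go pattern under the assumption that Add behaves this way; `tryAdd` and `errFull` are local stand-ins, not the package's internals.

```go
package main

import (
	"errors"
	"fmt"
)

// errFull is a local stand-in that plays the role of ErrFull.
var errFull = errors.New("channel full")

// tryAdd attempts a non-blocking send and reports errFull when the
// buffer has no room left.
func tryAdd(ch chan<- interface{}, v interface{}) error {
	select {
	case ch <- v:
		return nil
	default:
		return errFull
	}
}

func main() {
	ch := make(chan interface{}, 1)
	fmt.Println(tryAdd(ch, "a")) // prints <nil>
	fmt.Println(tryAdd(ch, "b")) // prints channel full
}
```

With this shape the caller never waits on a slow consumer; it has to decide whether to drop, retry, or log the value when the error comes back.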

func (*Aggregation) Close

func (p *Aggregation) Close() (err error)

Close stops all goroutines.

func (*Aggregation) Start

func (p *Aggregation) Start()

Start starts all mergeproc goroutines.

func (*Aggregation) SyncAdd

func (p *Aggregation) SyncAdd(c context.Context, key string, value interface{}) (err error)

SyncAdd synchronously adds a value to a channel. The channel shard is chosen by the Split method.
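A synchronous add typically blocks until the channel accepts the value. Since SyncAdd takes a context, one plausible shape is a send that gives up when the context is done. This is a sketch under that assumption; `blockingAdd` and `demo` are hypothetical names, and the package's real behavior on cancellation is not stated in this page.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// blockingAdd waits until ch accepts v or ctx is done, whichever comes first.
func blockingAdd(ctx context.Context, ch chan<- interface{}, v interface{}) error {
	select {
	case ch <- v:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo sends twice into a one-slot buffer that nobody drains.
func demo() (first, second error) {
	ch := make(chan interface{}, 1)
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	first = blockingAdd(ctx, ch, "a")  // buffer has room, returns at once
	second = blockingAdd(ctx, ch, "b") // buffer is full, waits for the deadline
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first)  // prints <nil>
	fmt.Println(second) // prints context deadline exceeded
}
```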

type Config

type Config struct {
	// MaxSize is the merge size.
	MaxSize int
	// Interval is the merge interval.
	Interval xtime.Duration
	// Buffer is the channel size.
	Buffer int
	// Worker is the number of channels.
	Worker int
	// Name is used for metrics.
	Name string
}

Config is the Aggregation configuration.
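To make the roles of MaxSize and Interval concrete, here is a minimal sketch of a size-or-time batching loop. It assumes a batch is flushed when it reaches a size limit or when a timer fires, whichever happens first. `item`, `mergeLoop`, and the `flush` callback are hypothetical and do not reproduce the package's mergeproc.

```go
package main

import (
	"fmt"
	"time"
)

// item is a hypothetical key/value pair travelling through one shard.
type item struct {
	key   string
	value interface{}
}

// mergeLoop groups items by key and calls flush when maxSize items are
// pending, when interval elapses, or when ch is closed.
func mergeLoop(ch <-chan item, maxSize int, interval time.Duration, flush func(map[string][]interface{})) {
	batch := make(map[string][]interface{})
	count := 0
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// emit hands the pending batch to flush and starts a fresh one.
	emit := func() {
		if count == 0 {
			return
		}
		flush(batch)
		batch = make(map[string][]interface{})
		count = 0
	}

	for {
		select {
		case it, ok := <-ch:
			if !ok {
				emit() // flush whatever is left before exiting
				return
			}
			batch[it.key] = append(batch[it.key], it.value)
			count++
			if count >= maxSize {
				emit()
			}
		case <-ticker.C:
			emit()
		}
	}
}

func main() {
	ch := make(chan item, 8)
	for i := 0; i < 5; i++ {
		ch <- item{key: "k", value: i}
	}
	close(ch)

	mergeLoop(ch, 2, time.Second, func(values map[string][]interface{}) {
		fmt.Println(values)
	})
}
```

In this sketch a larger size limit means fewer, bigger calls to the flush callback, while a shorter interval bounds how long a value can sit in memory before it is handed over.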
