pipeline
import "github.com/ccheers/xpkg/sync/pipeline"
Index
Variables
ErrFull is the error returned when the channel is full.
var ErrFull = errors.New("channel full")
type Aggregation
Aggregation is the pipeline struct.
type Aggregation struct {
Do func(c context.Context, index int, values map[string][]interface{})
Split func(key string) int
// contains filtered or unexported fields
}
func NewPipeline
func NewPipeline(config *Config) (res *Aggregation)
NewPipeline creates a new pipeline.
func (*Aggregation) Add
func (p *Aggregation) Add(c context.Context, key string, value interface{}) (err error)
Add asynchronously adds a value to a channel; the channel shard is selected by the Split method.
func (*Aggregation) Close
func (p *Aggregation) Close() (err error)
Close closes all goroutines.
func (*Aggregation) Start
func (p *Aggregation) Start()
Start starts all mergeproc goroutines.
func (*Aggregation) SyncAdd
func (p *Aggregation) SyncAdd(c context.Context, key string, value interface{}) (err error)
SyncAdd synchronously adds a value to a channel; the channel shard is selected by the Split method.
type Config
Config is the Aggregation config.
type Config struct {
// MaxSize merge size
MaxSize int
// Interval merge interval
Interval xtime.Duration
// Buffer channel size
Buffer int
// Worker channel number
Worker int
// Name use for metrics
Name string
}
Generated by gomarkdoc