README
¶
fanout
import "github.com/ccheers/xpkg/sync/pipeline/fanout"
Example
package main
import "context"
// addCache 加缓存的例子
func addCache(c context.Context, id, value int) {
// some thing...
}
func main() {
// 这里只是举个例子 真正使用的时候 应该用bm/rpc 传过来的context
c := context.Background()
// 新建一个fanout 对象 名称为cache 名称主要用来上报监控和打日志使用 最好不要重复
// (可选参数) worker数量为1 表示后台只有1个线程在工作
// (可选参数) buffer 为1024 表示缓存chan长度为1024 如果chan慢了 再调用Do方法就会报错 设定长度主要为了防止OOM
cache := New("cache", Worker(1), Buffer(1024))
// 需要异步执行的方法
// 这里传进来的c里面的meta信息会被复制 超时会忽略 addCache拿到的context已经没有超时信息了
cache.Do(c, func(c context.Context) { addCache(c, 0, 0) })
// 程序结束的时候关闭fanout 会等待后台线程完成后返回
cache.Close()
}
Index
Variables
ErrFull chan full.
var ErrFull = errors.New("fanout: chan full")
type Fanout
Fanout async consume data from chan.
type Fanout struct {
// contains filtered or unexported fields
}
func New
func New(name string, opts ...Option) *Fanout
New new a fanout struct.
func (*Fanout) Close
func (c *Fanout) Close() error
Close close fanout
func (*Fanout) Do
func (c *Fanout) Do(ctx context.Context, f func(ctx context.Context)) (err error)
Do save a callback func.
type Option
Option fanout option
type Option func(*options)
func Buffer
func Buffer(n int) Option
Buffer specifies the buffer of fanout
func Worker
func Worker(n int) Option
Worker specifies the worker of fanout
Generated by gomarkdoc
Documentation
¶
Overview ¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrFull = errors.New("fanout: chan full")
ErrFull chan full.
Functions ¶
This section is empty.
Types ¶
Click to show internal directories.
Click to hide internal directories.