DAGFlow
a DAG task engine based on asynq
QuickStart
server side
- prepare asynq server mux
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{
// Specify how many concurrent workers to use
Concurrency: 10,
// Optionally specify multiple queues with different priority.
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
// See the godoc for other configuration options
},
)
mux := asynq.NewServeMux()
please refer to asynq's doc to figure out more config options.
- create dagflow service
svc, err := service.New(&types.Config{
Redis: types.RedisConfig{
Addr: "127.0.0.1:6379",
Expire: 120,
},
Store: types.StoreConfig{
Type: "redis",
},
}, nil)
if err != nil {
log.Fatal("failed to create service", err)
}
- create a flow object and register it to dagflow service
f, err := svc.NewFlow("f1")
if err != nil {
log.Fatal("failed to create flow", err)
}
if err = f.Node("n1", incOp); err != nil {
log.Fatal("failed to create node", err)
}
// for complex dag, you can use RegisterFlowsWithDefinitor
svc.RegisterFlows(mux, f)
- start asynq server
if err := srv.Run(mux); err != nil {
log.Fatalf("could not run server: %v", err)
}
client side
- create dagflow service, same as step 2 in server side
svc, err := service.New(&types.Config{
Redis: types.RedisConfig{
Addr: "127.0.0.1:6379",
Expire: 120,
},
Store: types.StoreConfig{
Type: "redis",
},
}, nil)
if err != nil {
log.Fatal("failed to create service", err)
}
- create a flow object and register it to dagflow service, same as step 3 in server side except
mux
should be set to nil
f, err := svc.NewFlow("f1")
if err != nil {
log.Fatal("failed to create flow", err)
}
if err = f.Node("n1", incOp); err != nil {
log.Fatal("failed to create node", err)
}
// for complex dag, you can use RegisterFlowsWithDefinitor
svc.RegisterFlows(nil, f)
- submit dagflow tasks
svc.Submit("f1", []byte(`1`))
DAG
single node DAG
f, err := svc.NewFlow("f1")
if err != nil {
log.Fatal("failed to create flow", err)
}
if err = f.Node("n1", incOp); err != nil {
log.Fatal("failed to create node", err)
}
complex DAG
func prepareFlow(f *flow.Flow) error {
if err := f.Node("l1n1", incOp); err != nil {
return err
}
if err := f.Node("l2n1", incOp); err != nil {
return err
}
if err := f.Node("l2n2", decOp); err != nil {
return err
}
if err := f.Node("l3n1", mulOp, flow.WithAggregator(func (dataMap map[string][]byte) ([]byte, error) {
l2n1Result := dataMap["l2n1"]
l2n2Result := dataMap["l2n2"]
// do anything you want to construct input data for node l3n1
})); err != nil {
return err
}
if err := f.Edge("l1n1", "l2n1"); err != nil {
return err
}
if err := f.Edge("l1n1", "l2n2"); err != nil {
return err
}
if err := f.Edge("l2n1", "l3n1"); err != nil {
return err
}
if err := f.Edge("l2n2", "l3n1"); err != nil {
return err
}
return nil
}
SwitchNode
SwitchNode is a special type node which works like switch case statment in golang
f, err := svc.NewFlow("f1")
if err != nil {
log.Fatal("failed to create flow", err)
}
if err = f.SwitchNode("n1", func(data []byte) string {
return "+"
}, map[string]flow.NodeFunc{
"+": incOp,
"-": decOp,
}); err != nil {
log.Fatal("failed to create node", err)
}