workflow

package
v1.23.0-rc.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultGroup = "workflow-group"
)
View Source
const (
	DefaultTaskTimeout = 5 * time.Minute
)

Variables

This section is empty.

Functions

func ArgsOf

func ArgsOf(args ...interface{}) []interface{}

func IdentityKeyOfFunction

func IdentityKeyOfFunction(fun interface{}) string

func ValueFromConetxt

func ValueFromConetxt(ctx context.Context, key string) string

func WithValues

func WithValues(ctx context.Context, kvs map[string]string) context.Context

Types

type Backend

type Backend interface {
	// 队列
	Sub(ctx context.Context, name string, onchange OnChangeFunc, opts ...SubOption) error
	// 这里的sub要求多个消费者共享同一个topic下的数据,且无重复。
	Pub(ctx context.Context, name string, key string, val []byte) error

	// kv存储
	Get(ctx context.Context, key string) ([]byte, error)
	Put(ctx context.Context, key string, val []byte, ttl ...time.Duration) error
	Del(ctx context.Context, key string) error
	List(ctx context.Context, keyprefix string) (map[string][]byte, error)
	Watch(ctx context.Context, key string, onchange OnChangeFunc) error
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(options *Options) *Client

func NewClientFromBackend

func NewClientFromBackend(backend Backend) *Client

func NewClientFromRedisClient

func NewClientFromRedisClient(cli *redis.Client) *Client

func (*Client) ListTasks

func (c *Client) ListTasks(ctx context.Context, group, name string) ([]Task, error)

func (*Client) RemoveTask

func (c *Client) RemoveTask(ctx context.Context, group, name string, uid string) error

func (*Client) SubmitCronTask

func (c *Client) SubmitCronTask(ctx context.Context, task Task, crontabexp string) error

func (*Client) SubmitTask

func (c *Client) SubmitTask(ctx context.Context, task Task) error

func (*Client) WatchTasks

func (c *Client) WatchTasks(ctx context.Context, group, name string, onchange func(ctx context.Context, task *Task) error) error

type OnChangeFunc

type OnChangeFunc func(ctx context.Context, key string, val []byte) error

type Options

type Options struct {
	Addr     string `json:"addr,omitempty"`
	Username string `json:"username,omitempty"`
	Password string `json:"password,omitempty"`
}

type RedisBackend

type RedisBackend struct {
	// contains filtered or unexported fields
}

func NewRedisBackend

func NewRedisBackend(addr, username, password string) *RedisBackend

func NewRedisBackendFromClient

func NewRedisBackendFromClient(c *redis.Client) *RedisBackend

func (*RedisBackend) Del

func (b *RedisBackend) Del(ctx context.Context, key string) error

func (*RedisBackend) Get

func (b *RedisBackend) Get(ctx context.Context, key string) ([]byte, error)

func (*RedisBackend) List

func (b *RedisBackend) List(ctx context.Context, keyprefix string) (map[string][]byte, error)

func (*RedisBackend) Pub

func (b *RedisBackend) Pub(ctx context.Context, name string, key string, val []byte) error

func (*RedisBackend) Put

func (b *RedisBackend) Put(ctx context.Context, key string, val []byte, ttl ...time.Duration) error

kv存储

func (*RedisBackend) Sub

func (b *RedisBackend) Sub(ctx context.Context, name string, onchange OnChangeFunc, opts ...SubOption) error

队列

func (*RedisBackend) Watch

func (b *RedisBackend) Watch(ctx context.Context, key string, onchange OnChangeFunc) error

type RuntimeValuesContext

type RuntimeValuesContext struct {
	// contains filtered or unexported fields
}

func (*RuntimeValuesContext) Deadline

func (c *RuntimeValuesContext) Deadline() (deadline time.Time, ok bool)

func (*RuntimeValuesContext) Done

func (c *RuntimeValuesContext) Done() <-chan struct{}

func (*RuntimeValuesContext) Err

func (c *RuntimeValuesContext) Err() error

func (*RuntimeValuesContext) Value

func (c *RuntimeValuesContext) Value(key interface{}) interface{}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(options *Options) (*Server, error)

func NewServerFromBackend

func NewServerFromBackend(backend Backend) *Server

func NewServerFromRedisClient

func NewServerFromRedisClient(cli *redis.Client) *Server

func (*Server) NewClient

func (s *Server) NewClient(ctx context.Context) *Client

func (*Server) Register

func (n *Server) Register(name string, fun interface{}) error

func (*Server) Run

func (s *Server) Run(ctx context.Context) error

type Step

type Step struct {
	Name     string        `json:"name,omitempty"`
	Function string        `json:"function,omitempty"` // 任务所使用的 函数/组件/插件
	Args     []interface{} `json:"args,omitempty"`     // 对应的参数
	SubSteps []Step        `json:"subSteps,omitempty"` // 子任务
	Status   *TaskStatus   `json:"status,omitempty"`
}

type SubOption

type SubOption func(o *SubOptions)

func WithAutoACK

func WithAutoACK(ack bool) SubOption

func WithConcurrency

func WithConcurrency(con int) SubOption

type SubOptions

type SubOptions struct {
	AutoACK     bool // 自动确认,无论结果是否为 error
	Concurrency int  // 支持的并发数量
}

type Task

type Task struct {
	UID               string            `json:"uid,omitempty"`
	Name              string            `json:"name,omitempty"`  // 任务名称,例如 更新镜像,同步数据等。
	Group             string            `json:"group,omitempty"` // 任务类型分组
	Steps             []Step            `json:"steps,omitempty"`
	CreationTimestamp metav1.Time       `json:"creationTimestamp,omitempty"`
	Addtionals        map[string]string `json:"addtionals,omitempty"` // 额外信息
	Status            *TaskStatus       `json:"status,omitempty"`
}

type TaskStatus

type TaskStatus struct {
	StartTimestamp  metav1.Time    `json:"startTimestamp,omitempty"`
	FinishTimestamp metav1.Time    `json:"finishTimestamp,omitempty"`
	Status          TaskStatusCode `json:"status,omitempty"`
	Result          []interface{}  `json:"result,omitempty"`
	Executer        string         `json:"executer,omitempty"`
	Message         string         `json:"message,omitempty"`
}

type TaskStatusCode

type TaskStatusCode string
const (
	TaskStatusPending TaskStatusCode = "Pending"
	TaskStatusRunning TaskStatusCode = "Running"
	TaskStatusSuccess TaskStatusCode = "Success"
	TaskStatusError   TaskStatusCode = "Error"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL