workflow

package
v1.24.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultGroup is the group name "workflow-group".
	// NOTE(review): presumably applied when a Task does not set Group — confirm against usage.
	DefaultGroup = "workflow-group"
)
View Source
const (
	// DefaultTaskTimeout is a duration of five minutes.
	// NOTE(review): presumably the per-task execution timeout used when none is configured — confirm.
	DefaultTaskTimeout = 5 * time.Minute
)

Variables

This section is empty.

Functions

func ArgsOf

func ArgsOf(args ...interface{}) []interface{}

func IdentityKeyOfFunction

func IdentityKeyOfFunction(fun interface{}) string

func ValueFromConetxt

func ValueFromConetxt(ctx context.Context, key string) string

func WithValues

func WithValues(ctx context.Context, kvs map[string]string) context.Context

Types

type Backend

// Backend groups the two sets of operations an implementation must provide:
// a queue (Sub/Pub) and a key-value store (Get/Put/Del/List/Watch).
type Backend interface {
	// Queue.
	//
	// Sub requires that multiple consumers share the data under the same
	// topic, without duplicates.
	Sub(ctx context.Context, name string, onchange OnChangeFunc, opts ...SubOption) error
	Pub(ctx context.Context, name string, key string, val []byte) error

	// Key-value storage.
	Get(ctx context.Context, key string) ([]byte, error)
	// ttl is variadic, so callers may omit it.
	Put(ctx context.Context, key string, val []byte, ttl ...time.Duration) error
	Del(ctx context.Context, key string) error
	// List returns a map from key to raw value for the given key prefix.
	List(ctx context.Context, keyprefix string) (map[string][]byte, error)
	Watch(ctx context.Context, key string, onchange OnChangeFunc) error
}

type Client

// Client is the task-facing API: it submits, lists, removes and watches tasks.
// ListTasks, RemoveTask and WatchTasks address tasks by group and name;
// RemoveTask additionally takes the task uid.
type Client interface {
	SubmitTask(ctx context.Context, task Task) error
	ListTasks(ctx context.Context, group, name string) ([]Task, error)
	RemoveTask(ctx context.Context, group, name string, uid string) error
	// onchange receives a pointer to the task.
	// NOTE(review): presumably invoked once per task change — confirm.
	WatchTasks(ctx context.Context, group, name string, onchange func(ctx context.Context, task *Task) error) error
}

func NewClientFromBackend

func NewClientFromBackend(backend Backend) Client

type CronClient added in v1.24.4

// CronClient embeds a Client and adds SubmitCronTask, which takes a Task
// together with a crontab expression string.
type CronClient struct {
	Client
	// contains filtered or unexported fields
}

func NewCronSubmiter added in v1.24.4

func NewCronSubmiter(client Client) *CronClient

func (*CronClient) SubmitCronTask added in v1.24.4

func (s *CronClient) SubmitCronTask(ctx context.Context, task Task, crontabexp string) error

type DefaultClient added in v1.24.4

// DefaultClient declares ListTasks, RemoveTask, SubmitTask and WatchTasks with
// the same signatures as the Client interface.
type DefaultClient struct {
	// contains filtered or unexported fields
}

func (*DefaultClient) ListTasks added in v1.24.4

func (c *DefaultClient) ListTasks(ctx context.Context, group, name string) ([]Task, error)

func (*DefaultClient) RemoveTask added in v1.24.4

func (c *DefaultClient) RemoveTask(ctx context.Context, group, name string, uid string) error

func (*DefaultClient) SubmitTask added in v1.24.4

func (c *DefaultClient) SubmitTask(ctx context.Context, task Task) error

func (*DefaultClient) WatchTasks added in v1.24.4

func (c *DefaultClient) WatchTasks(ctx context.Context, group, name string, onchange func(ctx context.Context, task *Task) error) error

type InmemoryBackend added in v1.24.4

// InmemoryBackend declares the Backend methods (Sub, Pub, Get, Put, Del, List,
// Watch). Only one worker instance may be started with it, and queued tasks
// live only in memory; persistence is not supported.
type InmemoryBackend struct {
	// contains filtered or unexported fields
}

Only a single worker instance may be started, and tasks in the queue exist only in memory; persistence is not supported.

func NewInmemoryBackend added in v1.24.4

func NewInmemoryBackend(ctx context.Context) *InmemoryBackend

func (*InmemoryBackend) Del added in v1.24.4

func (t *InmemoryBackend) Del(ctx context.Context, key string) error

Del implements Backend.

func (*InmemoryBackend) Get added in v1.24.4

func (t *InmemoryBackend) Get(ctx context.Context, key string) ([]byte, error)

Get implements Backend.

func (*InmemoryBackend) List added in v1.24.4

func (t *InmemoryBackend) List(ctx context.Context, keyprefix string) (map[string][]byte, error)

List implements Backend.

func (*InmemoryBackend) Pub added in v1.24.4

func (t *InmemoryBackend) Pub(ctx context.Context, name string, key string, val []byte) error

Pub implements Backend.

func (*InmemoryBackend) Put added in v1.24.4

func (t *InmemoryBackend) Put(ctx context.Context, key string, val []byte, ttl ...time.Duration) error

Put implements Backend.

func (*InmemoryBackend) Sub added in v1.24.4

func (t *InmemoryBackend) Sub(ctx context.Context, name string, onchange OnChangeFunc, opts ...SubOption) error

Sub requires that multiple consumers share the data under the same topic, without duplicates.

func (*InmemoryBackend) Watch added in v1.24.4

func (t *InmemoryBackend) Watch(ctx context.Context, key string, onchange OnChangeFunc) error

Watch implements Backend.

type OnChangeFunc

// OnChangeFunc is the callback type accepted by Backend.Sub and Backend.Watch.
// It receives a context, a key and the raw value bytes, and returns an error.
type OnChangeFunc func(ctx context.Context, key string, val []byte) error

type Options

// Options holds an address and username/password credentials, each serialized
// to JSON and omitted when empty.
// NOTE(review): presumably the connection settings for the Redis backend — confirm.
type Options struct {
	Addr     string `json:"addr,omitempty"`
	Username string `json:"username,omitempty"`
	Password string `json:"password,omitempty"`
}

type RedisBackend

// RedisBackend declares the Backend methods (Sub, Pub, Get, Put, Del, List,
// Watch) and is constructed from a *redis.Client via NewRedisBackendFromClient.
type RedisBackend struct {
	// contains filtered or unexported fields
}

func NewRedisBackendFromClient

func NewRedisBackendFromClient(c *redis.Client) *RedisBackend

func (*RedisBackend) Del

func (b *RedisBackend) Del(ctx context.Context, key string) error

func (*RedisBackend) Get

func (b *RedisBackend) Get(ctx context.Context, key string) ([]byte, error)

func (*RedisBackend) List

func (b *RedisBackend) List(ctx context.Context, keyprefix string) (map[string][]byte, error)

func (*RedisBackend) Pub

func (b *RedisBackend) Pub(ctx context.Context, name string, key string, val []byte) error

func (*RedisBackend) Put

func (b *RedisBackend) Put(ctx context.Context, key string, val []byte, ttl ...time.Duration) error

KV storage.

func (*RedisBackend) Sub

func (b *RedisBackend) Sub(ctx context.Context, name string, onchange OnChangeFunc, opts ...SubOption) error

Queue. nolint: funlen,gocognit

func (*RedisBackend) Watch

func (b *RedisBackend) Watch(ctx context.Context, key string, onchange OnChangeFunc) error

type RemoteClient added in v1.24.4

// RemoteClient declares the four Client methods (ListTasks, RemoveTask,
// SubmitTask, WatchTasks).
type RemoteClient struct {
	// NOTE(review): presumably the address of the remote server, as passed to
	// NewRemoteClient — confirm.
	Address string
	// contains filtered or unexported fields
}

func NewDefaultRemoteClient added in v1.24.4

func NewDefaultRemoteClient() *RemoteClient

func NewRemoteClient added in v1.24.4

func NewRemoteClient(address string) *RemoteClient

func (*RemoteClient) ListTasks added in v1.24.4

func (r *RemoteClient) ListTasks(ctx context.Context, group string, name string) ([]Task, error)

ListTasks implements Client.

func (*RemoteClient) RemoveTask added in v1.24.4

func (r *RemoteClient) RemoveTask(ctx context.Context, group string, name string, uid string) error

RemoveTask implements Client.

func (*RemoteClient) SubmitTask added in v1.24.4

func (r *RemoteClient) SubmitTask(ctx context.Context, task Task) error

SubmitTask implements Client.

func (*RemoteClient) WatchTasks added in v1.24.4

func (r *RemoteClient) WatchTasks(ctx context.Context, group string, name string, onchange func(ctx context.Context, task *Task) error) error

WatchTasks implements Client.

type RemoteClientServer added in v1.24.4

// RemoteClientServer holds a Client and exposes an http.Handler through its
// Handler method.
// NOTE(review): presumably the server-side counterpart of RemoteClient — confirm.
type RemoteClientServer struct {
	Client Client
}

func NewRemoteClientServer added in v1.24.4

func NewRemoteClientServer(client Client) *RemoteClientServer

func (*RemoteClientServer) Handler added in v1.24.4

func (s *RemoteClientServer) Handler() http.Handler

type RuntimeValuesContext

// RuntimeValuesContext declares Deadline, Done, Err and Value, i.e. the method
// set of context.Context.
type RuntimeValuesContext struct {
	// contains filtered or unexported fields
}

func (*RuntimeValuesContext) Deadline

func (c *RuntimeValuesContext) Deadline() (deadline time.Time, ok bool)

func (*RuntimeValuesContext) Done

func (c *RuntimeValuesContext) Done() <-chan struct{}

func (*RuntimeValuesContext) Err

func (c *RuntimeValuesContext) Err() error

func (*RuntimeValuesContext) Value

func (c *RuntimeValuesContext) Value(key interface{}) interface{}

type Server

// Server is created from a Backend with NewServerFromBackend. Its methods
// register named functions (Register), run the server (Run) and create a
// Client (NewClient).
type Server struct {
	// contains filtered or unexported fields
}

func NewServerFromBackend

func NewServerFromBackend(backend Backend) *Server

func (*Server) NewClient

func (s *Server) NewClient(ctx context.Context) Client

func (*Server) Register

func (n *Server) Register(name string, fun interface{}) error

func (*Server) Run

func (s *Server) Run(ctx context.Context) error

type Step

// Step is one step of a Task. A step names a function, carries its arguments,
// may contain nested sub-steps, and has an optional status.
type Step struct {
	Name     string        `json:"name,omitempty"`
	Function string        `json:"function,omitempty"` // function/component/plugin used by the task
	Args     []interface{} `json:"args,omitempty"`     // the corresponding arguments
	SubSteps []Step        `json:"subSteps,omitempty"` // sub-tasks
	Status   *TaskStatus   `json:"status,omitempty"`
}

type SubOption

// SubOption is a function that modifies a SubOptions value. WithAutoACK and
// WithConcurrency return values of this type, and Backend.Sub accepts them
// as variadic arguments.
type SubOption func(o *SubOptions)

func WithAutoACK

func WithAutoACK(ack bool) SubOption

func WithConcurrency

func WithConcurrency(con int) SubOption

type SubOptions

// SubOptions is the set of subscription settings that SubOption functions modify.
type SubOptions struct {
	AutoACK     bool // acknowledge automatically, whether or not the result is an error
	Concurrency int  // supported number of concurrent executions
}

type Task

// Task describes a unit of work: its identity (UID, Name, Group), a list of
// Steps, a creation timestamp, free-form additional key/value information and
// an optional status.
type Task struct {
	UID               string            `json:"uid,omitempty"`
	Name              string            `json:"name,omitempty"`  // task name, e.g. update image, sync data, etc.
	Group             string            `json:"group,omitempty"` // task type group
	Steps             []Step            `json:"steps,omitempty"`
	CreationTimestamp metav1.Time       `json:"creationTimestamp,omitempty"`
	Addtionals        map[string]string `json:"addtionals,omitempty"` // additional information
	Status            *TaskStatus       `json:"status,omitempty"`
}

type TaskStatus

// TaskStatus carries start and finish timestamps, a status code, a list of
// result values, an executer string and a message. Both Task and Step refer
// to it through an optional pointer field.
type TaskStatus struct {
	StartTimestamp  metav1.Time    `json:"startTimestamp,omitempty"`
	FinishTimestamp metav1.Time    `json:"finishTimestamp,omitempty"`
	Status          TaskStatusCode `json:"status,omitempty"`
	Result          []interface{}  `json:"result,omitempty"`
	// NOTE(review): presumably identifies the worker that executed the task — confirm.
	Executer string `json:"executer,omitempty"`
	Message  string `json:"message,omitempty"`
}

type TaskStatusCode

// TaskStatusCode is the string-typed status stored in TaskStatus.Status.
type TaskStatusCode string

// The TaskStatusCode values declared by this package.
const (
	TaskStatusPending TaskStatusCode = "Pending"
	TaskStatusRunning TaskStatusCode = "Running"
	TaskStatusSuccess TaskStatusCode = "Success"
	TaskStatusError   TaskStatusCode = "Error"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL