Documentation ¶
Index ¶
- Constants
- func ArgsOf(args ...interface{}) []interface{}
- func IdentityKeyOfFunction(fun interface{}) string
- func ValueFromConetxt(ctx context.Context, key string) string
- func WithValues(ctx context.Context, kvs map[string]string) context.Context
- type Backend
- type Client
- type CronClient
- type DefaultClient
- func (c *DefaultClient) ListTasks(ctx context.Context, group, name string) ([]Task, error)
- func (c *DefaultClient) RemoveTask(ctx context.Context, group, name string, uid string) error
- func (c *DefaultClient) SubmitTask(ctx context.Context, task Task) error
- func (c *DefaultClient) WatchTasks(ctx context.Context, group, name string, ...) error
- type InmemoryBackend
- func (t *InmemoryBackend) Del(ctx context.Context, key string) error
- func (t *InmemoryBackend) Get(ctx context.Context, key string) ([]byte, error)
- func (t *InmemoryBackend) List(ctx context.Context, keyprefix string) (map[string][]byte, error)
- func (t *InmemoryBackend) Pub(ctx context.Context, name string, key string, val []byte) error
- func (t *InmemoryBackend) Put(ctx context.Context, key string, val []byte, ttl ...time.Duration) error
- func (t *InmemoryBackend) Sub(ctx context.Context, name string, onchange OnChangeFunc, opts ...SubOption) error
- func (t *InmemoryBackend) Watch(ctx context.Context, key string, onchange OnChangeFunc) error
- type OnChangeFunc
- type Options
- type RedisBackend
- func (b *RedisBackend) Del(ctx context.Context, key string) error
- func (b *RedisBackend) Get(ctx context.Context, key string) ([]byte, error)
- func (b *RedisBackend) List(ctx context.Context, keyprefix string) (map[string][]byte, error)
- func (b *RedisBackend) Pub(ctx context.Context, name string, key string, val []byte) error
- func (b *RedisBackend) Put(ctx context.Context, key string, val []byte, ttl ...time.Duration) error
- func (b *RedisBackend) Sub(ctx context.Context, name string, onchange OnChangeFunc, opts ...SubOption) error
- func (b *RedisBackend) Watch(ctx context.Context, key string, onchange OnChangeFunc) error
- type RemoteClient
- func (r *RemoteClient) ListTasks(ctx context.Context, group string, name string) ([]Task, error)
- func (r *RemoteClient) RemoveTask(ctx context.Context, group string, name string, uid string) error
- func (r *RemoteClient) SubmitTask(ctx context.Context, task Task) error
- func (r *RemoteClient) WatchTasks(ctx context.Context, group string, name string, ...) error
- type RemoteClientServer
- type RuntimeValuesContext
- type Server
- type Step
- type SubOption
- type SubOptions
- type Task
- type TaskStatus
- type TaskStatusCode
Constants ¶
View Source
const (
DefaultGroup = "workflow-group"
)
View Source
const (
DefaultTaskTimeout = 5 * time.Minute
)
Variables ¶
This section is empty.
Functions ¶
func IdentityKeyOfFunction ¶
func IdentityKeyOfFunction(fun interface{}) string
Types ¶
type Backend ¶
type Backend interface {
	// 队列
	Sub(ctx context.Context, name string, onchange OnChangeFunc, opts ...SubOption) error // 这里的sub要求多个消费者共享同一个topic下的数据,且无重复。
	Pub(ctx context.Context, name string, key string, val []byte) error
	// kv存储
	Get(ctx context.Context, key string) ([]byte, error)
	Put(ctx context.Context, key string, val []byte, ttl ...time.Duration) error
	Del(ctx context.Context, key string) error
	List(ctx context.Context, keyprefix string) (map[string][]byte, error)
	Watch(ctx context.Context, key string, onchange OnChangeFunc) error
}
type Client ¶
type Client interface {
	SubmitTask(ctx context.Context, task Task) error
	ListTasks(ctx context.Context, group, name string) ([]Task, error)
	RemoveTask(ctx context.Context, group, name string, uid string) error
	WatchTasks(ctx context.Context, group, name string, onchange func(ctx context.Context, task *Task) error) error
}
func NewClientFromBackend ¶
type CronClient ¶ added in v1.24.4
type CronClient struct {
	Client
	// contains filtered or unexported fields
}
func NewCronSubmiter ¶ added in v1.24.4
func NewCronSubmiter(client Client) *CronClient
func (*CronClient) SubmitCronTask ¶ added in v1.24.4
type DefaultClient ¶ added in v1.24.4
type DefaultClient struct {
// contains filtered or unexported fields
}
func (*DefaultClient) RemoveTask ¶ added in v1.24.4
func (*DefaultClient) SubmitTask ¶ added in v1.24.4
func (c *DefaultClient) SubmitTask(ctx context.Context, task Task) error
type InmemoryBackend ¶ added in v1.24.4
type InmemoryBackend struct {
// contains filtered or unexported fields
}
worker 仅允许一个实例启动,并且队列中的任务仅存在内存中,不支持持久化。
func NewInmemoryBackend ¶ added in v1.24.4
func NewInmemoryBackend(ctx context.Context) *InmemoryBackend
func (*InmemoryBackend) Del ¶ added in v1.24.4
func (t *InmemoryBackend) Del(ctx context.Context, key string) error
Del implements Backend.
func (*InmemoryBackend) Put ¶ added in v1.24.4
func (t *InmemoryBackend) Put(ctx context.Context, key string, val []byte, ttl ...time.Duration) error
Put implements Backend.
func (*InmemoryBackend) Sub ¶ added in v1.24.4
func (t *InmemoryBackend) Sub(ctx context.Context, name string, onchange OnChangeFunc, opts ...SubOption) error
这里的sub要求多个消费者共享同一个topic下的数据,且无重复。
func (*InmemoryBackend) Watch ¶ added in v1.24.4
func (t *InmemoryBackend) Watch(ctx context.Context, key string, onchange OnChangeFunc) error
Watch implements Backend.
type RedisBackend ¶
type RedisBackend struct {
// contains filtered or unexported fields
}
func NewRedisBackendFromClient ¶
func NewRedisBackendFromClient(c *redis.Client) *RedisBackend
func (*RedisBackend) Sub ¶
func (b *RedisBackend) Sub(ctx context.Context, name string, onchange OnChangeFunc, opts ...SubOption) error
队列 nolint: funlen,gocognit
func (*RedisBackend) Watch ¶
func (b *RedisBackend) Watch(ctx context.Context, key string, onchange OnChangeFunc) error
type RemoteClient ¶ added in v1.24.4
type RemoteClient struct {
	Address string
	// contains filtered or unexported fields
}
func NewDefaultRemoteClient ¶ added in v1.24.4
func NewDefaultRemoteClient() *RemoteClient
func NewRemoteClient ¶ added in v1.24.4
func NewRemoteClient(address string) *RemoteClient
func (*RemoteClient) RemoveTask ¶ added in v1.24.4
RemoveTask implements Client.
func (*RemoteClient) SubmitTask ¶ added in v1.24.4
func (r *RemoteClient) SubmitTask(ctx context.Context, task Task) error
SubmitTask implements Client.
type RemoteClientServer ¶ added in v1.24.4
type RemoteClientServer struct {
Client Client
}
func NewRemoteClientServer ¶ added in v1.24.4
func NewRemoteClientServer(client Client) *RemoteClientServer
func (*RemoteClientServer) Handler ¶ added in v1.24.4
func (s *RemoteClientServer) Handler() http.Handler
type RuntimeValuesContext ¶
type RuntimeValuesContext struct {
// contains filtered or unexported fields
}
func (*RuntimeValuesContext) Deadline ¶
func (c *RuntimeValuesContext) Deadline() (deadline time.Time, ok bool)
func (*RuntimeValuesContext) Done ¶
func (c *RuntimeValuesContext) Done() <-chan struct{}
func (*RuntimeValuesContext) Err ¶
func (c *RuntimeValuesContext) Err() error
func (*RuntimeValuesContext) Value ¶
func (c *RuntimeValuesContext) Value(key interface{}) interface{}
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
func NewServerFromBackend ¶
type Step ¶
type Step struct {
	Name     string        `json:"name,omitempty"`
	Function string        `json:"function,omitempty"` // 任务所使用的 函数/组件/插件
	Args     []interface{} `json:"args,omitempty"`     // 对应的参数
	SubSteps []Step        `json:"subSteps,omitempty"` // 子任务
	Status   *TaskStatus   `json:"status,omitempty"`
}
type SubOptions ¶
type Task ¶
type Task struct {
	UID               string            `json:"uid,omitempty"`
	Name              string            `json:"name,omitempty"`  // 任务名称,例如 更新镜像,同步数据等。
	Group             string            `json:"group,omitempty"` // 任务类型分组
	Steps             []Step            `json:"steps,omitempty"`
	CreationTimestamp metav1.Time       `json:"creationTimestamp,omitempty"`
	Addtionals        map[string]string `json:"addtionals,omitempty"` // 额外信息
	Status            *TaskStatus       `json:"status,omitempty"`
}
type TaskStatus ¶
type TaskStatus struct {
	StartTimestamp  metav1.Time    `json:"startTimestamp,omitempty"`
	FinishTimestamp metav1.Time    `json:"finishTimestamp,omitempty"`
	Status          TaskStatusCode `json:"status,omitempty"`
	Result          []interface{}  `json:"result,omitempty"`
	Executer        string         `json:"executer,omitempty"`
	Message         string         `json:"message,omitempty"`
}
type TaskStatusCode ¶
type TaskStatusCode string
const (
	TaskStatusPending TaskStatusCode = "Pending"
	TaskStatusRunning TaskStatusCode = "Running"
	TaskStatusSuccess TaskStatusCode = "Success"
	TaskStatusError   TaskStatusCode = "Error"
)
Click to show internal directories.
Click to hide internal directories.