model

package
v0.0.0-...-d828794 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2023 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const EtcdCampaignTimeout = 30 // time.Second
View Source
const EtcdKeepAliveRetryInterval = 5 * time.Second
View Source
const EtcdKeyTTL = 10
View Source
const EtcdQueryTimeout = 3 * time.Second

Variables

This section is empty.

Functions

func KeepRegister

func KeepRegister(ctx context.Context, cli *clientv3.Client, key, value string) (*etcdRegistry, error)

func NewEtcdRegistry

func NewEtcdRegistry(cli *clientv3.Client) *etcdRegistry

func NewLeaderDAO

func NewLeaderDAO(client *clientv3.Client) *leaderDAO

func NewLeaderDataDAO

func NewLeaderDataDAO(client *clientv3.Client) *leaderDataDAO

func NewNodeDAO

func NewNodeDAO(client *clientv3.Client) *nodeDAO

func NewRunningTaskDAO

func NewRunningTaskDAO(client *clientv3.Client) *runningTaskDAO

func NewShardDAO

func NewShardDAO(client *clientv3.Client) *shardDAO

func NewSystemConfigDAO

func NewSystemConfigDAO(client *clientv3.Client) *systemConfigDAO

func NewTaskConfigDAO

func NewTaskConfigDAO(client *clientv3.Client) *taskConfigDAO

Types

type ILeaderDAO

type ILeaderDAO interface {
	Get(ctx context.Context) (string, error)
	Campaign(parentCtx context.Context, node string)
}

type ILeaderDataDAO

type ILeaderDataDAO interface {
	Put(ctx context.Context, nodes Nodes) error
	Get(ctx context.Context) (Nodes, error)
}

type INodeDAO

type INodeDAO interface {
	Register(ctx context.Context, addr string) (*etcdRegistry, error)
	List(ctx context.Context) ([]string, error)
	Watch(ctx context.Context) (<-chan []string, error)
}

type IRunningTaskDAO

type IRunningTaskDAO interface {
	Put(ctx context.Context, taskName, node, shard string) error
	Get(ctx context.Context, taskName, node, shard string) (string, error)
	Delete(ctx context.Context, taskName, node, shard string) error
	DeleteTaskNode(ctx context.Context, taskName, node string) error
	DeleteTask(ctx context.Context, taskName string) error
	DeleteAll(ctx context.Context) error
	ListByTaskNode(ctx context.Context, taskName, node string) (Shards, error)
	ListByTask(ctx context.Context, task string) ([]*NodeWithShards, error)
	List(ctx context.Context) ([]*TaskWithNodesWithShards, error)
}

type IShardDAO

type IShardDAO interface {
	Put(ctx context.Context, taskName, node string, shards Shards) error
	Get(ctx context.Context, taskName, node string) (Shards, error)
	GetByTask(ctx context.Context, taskName string) ([]*NodeWithShards, error)
	GetAll(ctx context.Context) ([]*TaskWithNodesWithShards, error)
	Delete(ctx context.Context, taskName, node string) error
	DeleteTask(ctx context.Context, taskName string) error
	DeleteAll(ctx context.Context) error
	WatchShards(ctx context.Context, task, node string) (<-chan Shards, error)
	WatchForNewTask(ctx context.Context) (<-chan TaskNodeShardStore, error)
}

type ITaskConfigDAO

type ITaskConfigDAO interface {
	Put(ctx context.Context, value *TaskConfig) error
	Get(ctx context.Context, taskName string) (*TaskConfig, error)
	List(ctx context.Context) ([]*TaskConfig, error)
	Watch(ctx context.Context) (<-chan []*TaskConfig, error)
	WatchTaskConfig(ctx context.Context, taskName string) (<-chan *TaskConfig, error)
}

type InternalTaskType

type InternalTaskType uint32
const (
	InternalTaskType_Task                        InternalTaskType = 1 //task
	InternalTaskType_Schedule                    InternalTaskType = 2 //timer task
	InternalTaskType_SummaryMapTask              InternalTaskType = 3 //summary task - MapTask
	InternalTaskType_SummaryReduce1Task          InternalTaskType = 4 //summary task - Reduce1Task
	InternalTaskType_SummaryReduce2Task          InternalTaskType = 5 //summary task - Reduce2Task
	InternalTaskType_SummarySaveReduceResultTask InternalTaskType = 6 //summary task - SaveReduceResultTask
	InternalTaskType_SummaryCleanTask            InternalTaskType = 7 //summary task - CleanTask
)

type NodeWithShards

type NodeWithShards struct {
	Node   string
	Shards Shards
}

type Nodes

type Nodes []string

func (*Nodes) Decode

func (s *Nodes) Decode(value []byte) error

func (*Nodes) Encode

func (s *Nodes) Encode() string

type Shards

type Shards []string

func (*Shards) Decode

func (s *Shards) Decode(value []byte) error

func (*Shards) Encode

func (s *Shards) Encode() string

type SystemConfig

type SystemConfig struct {
	// Check leader changing every 10 seconds
	LeaderChangeCheckInterval int64 `json:"leader_change_check_interval"`
	// Check node changing every 30 seconds
	NodeChangeCheckInterval int64 `json:"node_change_check_interval"`
}

func (*SystemConfig) Decode

func (c *SystemConfig) Decode(value []byte) error

func (*SystemConfig) Encode

func (c *SystemConfig) Encode() string

type TaskConfig

type TaskConfig struct {
	// unique name
	Name string `json:"name"`
	// only for timer task - Full crontab specs, e.g. "* * * * * ?"
	Spec string `json:"spec"`
	// total shards among all nodes
	ShardNumber uint32 `json:"shard_number"`
	// when fetch data,take limit size
	TakeLimit uint32 `json:"take_limit"`
	// if no data, pause milliseconds
	NoDataPause uint64 `json:"no_data_pause"`
	// custom parameter
	Params string `json:"params"`
	// turn on or off
	Alive bool `json:"alive"`
	// task type
	TaskType InternalTaskType `json:"task_type"`
}

func (*TaskConfig) Decode

func (c *TaskConfig) Decode(value []byte) error

func (*TaskConfig) Encode

func (c *TaskConfig) Encode() string

type TaskConfigStore

type TaskConfigStore map[string]*TaskConfig

type TaskNodeShardStore

type TaskNodeShardStore map[string]map[string]Shards

type TaskWithNodesWithShards

type TaskWithNodesWithShards struct {
	TaskName       string
	NodeWithShards []*NodeWithShards
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL