Documentation ¶
Index ¶
- Constants
- func KeepRegister(ctx context.Context, cli *clientv3.Client, key, value string) (*etcdRegistry, error)
- func NewEtcdRegistry(cli *clientv3.Client) *etcdRegistry
- func NewLeaderDAO(client *clientv3.Client) *leaderDAO
- func NewLeaderDataDAO(client *clientv3.Client) *leaderDataDAO
- func NewNodeDAO(client *clientv3.Client) *nodeDAO
- func NewRunningTaskDAO(client *clientv3.Client) *runningTaskDAO
- func NewShardDAO(client *clientv3.Client) *shardDAO
- func NewSystemConfigDAO(client *clientv3.Client) *systemConfigDAO
- func NewTaskConfigDAO(client *clientv3.Client) *taskConfigDAO
- type ILeaderDAO
- type ILeaderDataDAO
- type INodeDAO
- type IRunningTaskDAO
- type IShardDAO
- type ITaskConfigDAO
- type InternalTaskType
- type NodeWithShards
- type Nodes
- type Shards
- type SystemConfig
- type TaskConfig
- type TaskConfigStore
- type TaskNodeShardStore
- type TaskWithNodesWithShards
Constants ¶
View Source
// EtcdCampaignTimeout is the untyped constant 30. The trailing note marks the
// intended unit as seconds (time.Second).
// NOTE(review): presumably the leader-campaign timeout; confirm call sites
// scale it by time.Second.
const EtcdCampaignTimeout = 30 // time.Second
View Source
// EtcdKeepAliveRetryInterval is a time.Duration of 5 seconds.
// NOTE(review): presumably the wait between keep-alive retries; confirm at the use site.
const EtcdKeepAliveRetryInterval = 5 * time.Second
View Source
// EtcdKeyTTL is the untyped constant 10.
// NOTE(review): presumably a key/lease TTL expressed in seconds; confirm at the use site.
const EtcdKeyTTL = 10
View Source
// EtcdQueryTimeout is a time.Duration of 3 seconds.
// NOTE(review): presumably the deadline applied to individual etcd queries; confirm at the use site.
const EtcdQueryTimeout = 3 * time.Second
Variables ¶
This section is empty.
Functions ¶
func KeepRegister ¶
func NewEtcdRegistry ¶
func NewLeaderDAO ¶
func NewLeaderDataDAO ¶
func NewNodeDAO ¶
func NewRunningTaskDAO ¶
func NewShardDAO ¶
func NewSystemConfigDAO ¶
func NewTaskConfigDAO ¶
Types ¶
type ILeaderDAO ¶
type ILeaderDataDAO ¶
type IRunningTaskDAO ¶
type IRunningTaskDAO interface { Put(ctx context.Context, taskName, node, shard string) error Get(ctx context.Context, taskName, node, shard string) (string, error) Delete(ctx context.Context, taskName, node, shard string) error DeleteTaskNode(ctx context.Context, taskName, node string) error DeleteTask(ctx context.Context, taskName string) error DeleteAll(ctx context.Context) error ListByTaskNode(ctx context.Context, taskName, node string) (Shards, error) ListByTask(ctx context.Context, task string) ([]*NodeWithShards, error) List(ctx context.Context) ([]*TaskWithNodesWithShards, error) }
type IShardDAO ¶
type IShardDAO interface { Put(ctx context.Context, taskName, node string, shards Shards) error Get(ctx context.Context, taskName, node string) (Shards, error) GetByTask(ctx context.Context, taskName string) ([]*NodeWithShards, error) GetAll(ctx context.Context) ([]*TaskWithNodesWithShards, error) Delete(ctx context.Context, taskName, node string) error DeleteTask(ctx context.Context, taskName string) error DeleteAll(ctx context.Context) error WatchShards(ctx context.Context, task, node string) (<-chan Shards, error) WatchForNewTask(ctx context.Context) (<-chan TaskNodeShardStore, error) }
type ITaskConfigDAO ¶
type ITaskConfigDAO interface { Put(ctx context.Context, value *TaskConfig) error Get(ctx context.Context, taskName string) (*TaskConfig, error) List(ctx context.Context) ([]*TaskConfig, error) Watch(ctx context.Context) (<-chan []*TaskConfig, error) WatchTaskConfig(ctx context.Context, taskName string) (<-chan *TaskConfig, error) }
type InternalTaskType ¶
// InternalTaskType enumerates the kinds of task known to the package. It is
// the type of the TaskConfig.TaskType field.
type InternalTaskType uint32

// Known InternalTaskType values. The numeric values are explicit, so they must
// not be renumbered or reordered.
const (
	InternalTaskType_Task                        InternalTaskType = 1 // task
	InternalTaskType_Schedule                    InternalTaskType = 2 // timer task
	InternalTaskType_SummaryMapTask              InternalTaskType = 3 // summary task - MapTask
	InternalTaskType_SummaryReduce1Task          InternalTaskType = 4 // summary task - Reduce1Task
	InternalTaskType_SummaryReduce2Task          InternalTaskType = 5 // summary task - Reduce2Task
	InternalTaskType_SummarySaveReduceResultTask InternalTaskType = 6 // summary task - SaveReduceResultTask
	InternalTaskType_SummaryCleanTask            InternalTaskType = 7 // summary task - CleanTask
)
type NodeWithShards ¶
type SystemConfig ¶
// SystemConfig holds the system-wide check intervals. Its fields carry JSON
// tags, and the type has Encode and Decode methods declared elsewhere.
type SystemConfig struct {
	// LeaderChangeCheckInterval is how often leader changes are checked.
	// The original note says "every 10 seconds".
	// NOTE(review): unit is presumably seconds; confirm at the use site.
	LeaderChangeCheckInterval int64 `json:"leader_change_check_interval"`

	// NodeChangeCheckInterval is how often node changes are checked.
	// The original note says "every 30 seconds".
	// NOTE(review): unit is presumably seconds; confirm at the use site.
	NodeChangeCheckInterval int64 `json:"node_change_check_interval"`
}
func (*SystemConfig) Decode ¶
func (c *SystemConfig) Decode(value []byte) error
func (*SystemConfig) Encode ¶
func (c *SystemConfig) Encode() string
type TaskConfig ¶
type TaskConfig struct { // unique name Name string `json:"name"` // only for timer task - Full crontab specs, e.g. "* * * * * ?" Spec string `json:"spec"` // total shards among all nodes ShardNumber uint32 `json:"shard_number"` // when fetch data,take limit size TakeLimit uint32 `json:"take_limit"` // if no data, pause milliseconds NoDataPause uint64 `json:"no_data_pause"` // custom parameter Params string `json:"params"` // turn on or off Alive bool `json:"alive"` // task type TaskType InternalTaskType `json:"task_type"` }
func (*TaskConfig) Decode ¶
func (c *TaskConfig) Decode(value []byte) error
func (*TaskConfig) Encode ¶
func (c *TaskConfig) Encode() string
type TaskConfigStore ¶
// TaskConfigStore is a map from a string key to a *TaskConfig.
// NOTE(review): the key is presumably the task name (TaskConfig.Name); confirm against callers.
type TaskConfigStore map[string]*TaskConfig
type TaskNodeShardStore ¶
type TaskWithNodesWithShards ¶
type TaskWithNodesWithShards struct { TaskName string NodeWithShards []*NodeWithShards }
Click to show internal directories.
Click to hide internal directories.