Documentation
¶
Index ¶
- Constants
- Variables
- type EventManager
- func (m *EventManager) Backup() (string, error)
- func (m *EventManager) CountRunningTasks(tenantId string) (n int, err error)
- func (m *EventManager) CreateTenantBucket(tenantId string)
- func (m *EventManager) Delete(tenantId, taskId string) error
- func (m *EventManager) Insert(e *TaskEvent) error
- func (m *EventManager) Iterate(tenantId, taskId string, fn func(e *TaskEvent) bool) error
- func (m *EventManager) Latest(tenantId, taskId string) (ev *TaskEvent, err error)
- func (m *EventManager) Tasks(tenantId string) (ids []string, err error)
- type Options
- type Scheduler
- type Server
- func (s *Server) CreateTask(ctx context.Context, req *pb.CreateTaskRequest) (*pb.Response, error)
- func (s *Server) CreateTenant(ctx context.Context, req *pb.CreateTenantRequest) (*pb.Response, error)
- func (s *Server) PauseTask(ctx context.Context, req *pb.PauseTaskRequest) (*pb.Response, error)
- func (s *Server) QueryTaskStatus(ctx context.Context, req *pb.QueryTaskStatusRequest) (*pb.QueryTaskStatusResponse, error)
- func (s *Server) QueryTenantTaskConcurrency(ctx context.Context, req *pb.QueryTenantTaskConcurrencyRequest) (resp *pb.QueryTenantTaskConcurrencyResponse, err error)
- func (s *Server) RestartTask(ctx context.Context, req *pb.RestartTaskRequest) (*pb.Response, error)
- func (s *Server) Start()
- func (s *Server) StopTask(ctx context.Context, req *pb.StopTaskRequest) (*pb.Response, error)
- type TaskEvent
Constants ¶
View Source
const ( MilestoneKeyPrefix = "milestone-" MilestoneLatest = MilestoneKeyPrefix + "latest" MilestoneDispatched = MilestoneKeyPrefix + "dispatched" MilestoneStarted = MilestoneKeyPrefix + "started" )
View Source
const (
TaskDispatched = "TaskDispatched"
)
Variables ¶
View Source
var ( DefaultDBPath = "./events.db" DefaultDBSnapshotPrefix = "events-db-snapshot" DefaultEventCompactDuration = 60 // 60 seconds DefaultTaskCounterTTL = 30 // 30 seconds )
View Source
var BoltDBOption = &bbolt.Options{ Timeout: time.Second, NoGrowSync: false, FreelistType: bbolt.FreelistArrayType, }
Functions ¶
This section is empty.
Types ¶
type EventManager ¶
type EventManager struct {
// contains filtered or unexported fields
}
func NewEventManager ¶
func NewEventManager(sc config.SnapshotConfig, schedulerId string, lg types.Logger) (*EventManager, error)
func (*EventManager) Backup ¶
func (m *EventManager) Backup() (string, error)
func (*EventManager) CountRunningTasks ¶
func (m *EventManager) CountRunningTasks(tenantId string) (n int, err error)
func (*EventManager) CreateTenantBucket ¶
func (m *EventManager) CreateTenantBucket(tenantId string)
func (*EventManager) Delete ¶
func (m *EventManager) Delete(tenantId, taskId string) error
func (*EventManager) Insert ¶
func (m *EventManager) Insert(e *TaskEvent) error
func (*EventManager) Iterate ¶
func (m *EventManager) Iterate(tenantId, taskId string, fn func(e *TaskEvent) bool) error
type Options ¶
type Options struct { Name string // Scheduler name, also used as partition name Zone string // Zone name ScheduleInterval int64 // Interval in seconds for checking active tenants & new tasks StaleCheckDelay int64 // Time in seconds for checking stale tasks TaskEventUpdateDeadline int64 // Deadline in seconds for the scheduler to receive task update events Snapshot config.SnapshotConfig // Scheduler state snapshot configurations Transport config.TransportConfig // Transport config ServerConfig config.ServerConfig // http and grpc config }
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func (*Scheduler) SchedulerId ¶
SchedulerId returns scheduler's id in the form of <zone>-<name>
type Server ¶
type Server struct { pb.UnimplementedScheduleServiceServer // contains filtered or unexported fields }
func (*Server) CreateTask ¶
func (*Server) CreateTenant ¶
func (*Server) QueryTaskStatus ¶
func (s *Server) QueryTaskStatus(ctx context.Context, req *pb.QueryTaskStatusRequest) (*pb.QueryTaskStatusResponse, error)
func (*Server) QueryTenantTaskConcurrency ¶
func (s *Server) QueryTenantTaskConcurrency(ctx context.Context, req *pb.QueryTenantTaskConcurrencyRequest) (resp *pb.QueryTenantTaskConcurrencyResponse, err error)
func (*Server) RestartTask ¶
type TaskEvent ¶
type TaskEvent struct { EventType string `json:"eventType"` WorkerId string `json:"workerId"` TenantId string `json:"tenantId"` TaskId string `json:"taskId"` // TaskType enum.TaskType `json:"taskType"` Timestamp time.Time `json:"timestamp"` Value json.RawMessage `json:"value"` }
func NewEventFromMessage ¶
func NewEventFromMessage(m *types.TaskMessage) *TaskEvent
func NewEventFromTask ¶ added in v0.1.6
Click to show internal directories.
Click to hide internal directories.