taskqueueworker

package
v1.17.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2024 License: Apache-2.0 Imports: 39 Imported by: 0

README

Example

Create delivery handler

package workerhandler

import (
	"fmt"
	"time"

	"github.com/golangid/candi/candishared"
	taskqueueworker "github.com/golangid/candi/codebase/app/task_queue_worker"
	"github.com/golangid/candi/codebase/factory/types"
)

// TaskQueueHandler is the delivery handler for the task queue worker example.
// It has no fields.
type TaskQueueHandler struct{}

// NewTaskQueueHandler allocates a zero-value TaskQueueHandler and returns a
// pointer to it.
func NewTaskQueueHandler() *TaskQueueHandler {
	return new(TaskQueueHandler)
}

// MountHandlers registers this handler's task routes on the given worker
// handler group: "task-one" is bound to taskOne and "task-two" to taskTwo.
// It returns nothing; the registration on group is its only effect.
func (h *TaskQueueHandler) MountHandlers(group *types.WorkerHandlerGroup) {

	group.Add("task-one", h.taskOne)
	group.Add("task-two", h.taskTwo)
}

// taskOne is the handler mounted on the "task-one" route. It prints the
// handler route, the "retries" header and the message of the event, then
// always returns an ErrorRetrier carrying a 10-second delay.
//
// NOTE(review): presumably the returned ErrorRetrier makes the worker retry
// the job after Delay — confirm. ErrorRetrier is not listed among the types
// visible in this package's documentation index; verify where it is declared.
func (h *TaskQueueHandler) taskOne(eventContext *candishared.EventContext) error {
	route := eventContext.HandlerRoute()
	// HeaderRetries is the package constant for the "retries" header key.
	retries := eventContext.Header()[taskqueueworker.HeaderRetries]
	message := eventContext.Message()
	fmt.Printf("executing task '%s' has been %s retry, with message: %s\n", route, retries, message)

	retrier := &taskqueueworker.ErrorRetrier{
		Message: "Error",
		Delay:   10 * time.Second,
	}
	return retrier
}

// taskTwo is the handler mounted on the "task-two" route. It prints the
// handler route, the "retries" header and the message of the event, then
// always returns an ErrorRetrier carrying a 3-second delay.
//
// NOTE(review): presumably the returned ErrorRetrier makes the worker retry
// the job after Delay — confirm. ErrorRetrier is not listed among the types
// visible in this package's documentation index; verify where it is declared.
func (h *TaskQueueHandler) taskTwo(eventContext *candishared.EventContext) error {
	route := eventContext.HandlerRoute()
	// HeaderRetries is the package constant for the "retries" header key.
	retries := eventContext.Header()[taskqueueworker.HeaderRetries]
	message := eventContext.Message()
	fmt.Printf("executing task '%s' has been %s retry, with message: %s\n", route, retries, message)

	retrier := &taskqueueworker.ErrorRetrier{
		Message: "Error",
		Delay:   3 * time.Second,
	}
	return retrier
}

Register in module

package examplemodule

import (

	"example.service/internal/modules/examplemodule/delivery/workerhandler"

	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

// Module is the example module container. It keeps the module's worker
// handlers in a map keyed by worker type (types.Worker).
type Module struct {
	// ...another delivery handler
	// workerHandlers maps each worker type to the handler registered for it.
	workerHandlers map[types.Worker]interfaces.WorkerHandler
}

// NewModules builds the example Module and registers the task queue handler
// under the types.TaskQueue worker key. The deps argument is not used by the
// code shown here.
func NewModules(deps dependency.Dependency) *Module {
	handlers := make(map[types.Worker]interfaces.WorkerHandler)
	// ...another worker handler
	// ...
	handlers[types.TaskQueue] = workerhandler.NewTaskQueueHandler()

	module := new(Module)
	module.workerHandlers = handlers
	return module
}

// ...another method

Add task in each usecase module

From internal service (same runtime)
package usecase

import (
	"context"
	"log"

	taskqueueworker "github.com/golangid/candi/codebase/app/task_queue_worker"
)

// someUsecase shows how to enqueue a job from a service running in the same
// runtime as the task queue worker. It adds a job for `{{task_name}}` with a
// maximum of 5 retries and logs the error if the job cannot be added.
func someUsecase(ctx context.Context) {
	// add task queue for `{{task_name}}` with 5 retry
	req := &taskqueueworker.AddJobRequest{
		TaskName: "{{task_name}}",
		MaxRetry: 5,
		Args:     []byte(`{{arguments/message}}`),
	}

	// AddJob takes only the context and the request (no worker host) and
	// returns the queued job ID together with the error; the ID is not needed
	// here, so it is discarded.
	if _, err := taskqueueworker.AddJob(ctx, req); err != nil {
		log.Println(err)
	}
}
Or if running on a separate server
  • Via GraphQL API:

POST {{task-queue-worker-host}}/graphql

# addJob mutation: calls add_job with an inline param object holding the task
# name, the maximum retry count and the job arguments as an escaped JSON string.
mutation addJob {
  add_job(
	  param: {
		task_name: "{{task_name}}"
		max_retry: 5
		args: "{\"params\": \"test-one\"}"
	  }
  )
}
  • cURL:
# POSTs the addJob GraphQL mutation to the worker's /graphql endpoint as a JSON
# body; the job parameters are passed through the "param" variable.
curl --location --request POST '{{task-queue-worker-host}}/graphql' \
--header 'Content-Type: application/json' \
--data-raw '{
    "operationName": "addJob",
    "variables": {
        "param": {
            "task_name": "{{task_name}}",
            "max_retry": 1,
            "args": "{{arguments/message}}"
        }
    },
    "query": "mutation addJob($param: AddJobInputResolver!) {\n  add_job(param: $param)\n}\n"
}'
  • Direct call function:
// Add a job for `{{task_name}}` on a worker running at
// {{task-queue-worker-host}} by sending the request over HTTP.
jobID, err := taskqueueworker.AddJobViaHTTPRequest(context.Background(), "{{task-queue-worker-host}}", &taskqueueworker.AddJobRequest{
	TaskName: "{{task_name}}", MaxRetry: 1, Args: []byte(`{{arguments/message}}`),
})
if err != nil {
	log.Println(err)
} else {
	// Only report the job ID when the request succeeded; on error the ID is
	// not meaningful.
	fmt.Println("Queued job id is ", jobID)
}
  • Automatic call to an external worker via HTTP request using the basic AddJob() function; set this configuration first:
// The host is a string argument, so the placeholder must be quoted.
// NOTE(review): SetExternalWorkerHost is documented as returning an
// OptionFunc, so calling it on its own presumably configures nothing; it
// likely has to be passed to NewTaskQueueWorker — confirm.
taskqueueworker.SetExternalWorkerHost("{{task-queue-worker-host}}")
jobID, err := taskqueueworker.AddJob(context.Background(), &taskqueueworker.AddJobRequest{
	TaskName: "{{task_name}}", MaxRetry: 1, Args: []byte(`{{arguments/message}}`),
})
if err != nil {
	log.Println(err)
} else {
	// Only report the job ID when the job was actually queued.
	fmt.Println("Queued job id is ", jobID)
}

Documentation

Index

Constants

View Source
const (

	// Job status values, typed as JobStatusEnum.

	// StatusRetrying is the "RETRYING" job status.
	StatusRetrying JobStatusEnum = "RETRYING"
	// StatusFailure is the "FAILURE" job status.
	StatusFailure JobStatusEnum = "FAILURE"
	// StatusSuccess is the "SUCCESS" job status.
	StatusSuccess JobStatusEnum = "SUCCESS"
	// StatusQueueing is the "QUEUEING" job status.
	StatusQueueing JobStatusEnum = "QUEUEING"
	// StatusStopped is the "STOPPED" job status.
	StatusStopped JobStatusEnum = "STOPPED"

	// Header key names (untyped string constants).

	// HeaderRetries is the "retries" header key.
	HeaderRetries = "retries"
	// HeaderMaxRetries is the "max_retry" header key.
	HeaderMaxRetries = "max_retry"
	// HeaderInterval is the "interval" header key.
	HeaderInterval = "interval"
	// HeaderCurrentProgress is the "current_progress" header key.
	HeaderCurrentProgress = "current_progress"
	// HeaderMaxProgress is the "max_progress" header key.
	HeaderMaxProgress = "max_progress"

	// TaskOptionDeleteJobAfterSuccess is the "delAfterSuccess" task option name.
	// NOTE(review): presumably marks a job for deletion once it succeeds — confirm.
	TaskOptionDeleteJobAfterSuccess = "delAfterSuccess"
)

Variables

This section is empty.

Functions

func AddJob

func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error)

AddJob is the public function for adding a new job within the same runtime.

func AddJobViaHTTPRequest added in v1.7.0

func AddJobViaHTTPRequest(ctx context.Context, workerHost string, req *AddJobRequest) (jobID string, err error)

AddJobViaHTTPRequest is the public function for adding a new job via an HTTP request.

func AddJobWorkerHandler added in v1.17.0

func AddJobWorkerHandler(taskName string) types.WorkerHandlerFunc

AddJobWorkerHandler returns a worker handler that bridges a request from another worker to add a job; the default max retry is 5.

func NewTaskQueueWorker added in v1.6.8

func NewTaskQueueWorker(service factory.ServiceFactory, opts ...OptionFunc) factory.AppServerFactory

NewTaskQueueWorker create new task queue worker

func RecalculateSummary added in v1.11.4

func RecalculateSummary(ctx context.Context)

RecalculateSummary func

func RetryJob added in v1.11.0

func RetryJob(ctx context.Context, jobID string) error

RetryJob api for retry job by id

func StopJob added in v1.11.0

func StopJob(ctx context.Context, jobID string) error

StopJob api for stop job by id

func StreamAllJob added in v1.11.4

func StreamAllJob(ctx context.Context, filter *Filter, streamFunc func(job *Job)) (count int)

StreamAllJob api func for stream fetch all job

func UpdateProgressJob added in v1.13.13

func UpdateProgressJob(ctx context.Context, jobID string, numProcessed, maxProcess int) error

UpdateProgressJob api for update progress job

Types

type AddJobInputResolver added in v1.11.0

// AddJobInputResolver is the input model for adding a job: task name, maximum
// retry count and the job arguments as a string. The README's GraphQL example
// declares its $param variable with this type name.
// RetryInterval is a pointer and may therefore be nil.
// NOTE(review): presumably nil means no custom retry interval — confirm.
type AddJobInputResolver struct {
	TaskName      string
	MaxRetry      int
	Args          string
	RetryInterval *string
}

AddJobInputResolver model

type AddJobRequest added in v1.11.0

// AddJobRequest is the request model accepted by AddJob and
// AddJobViaHTTPRequest. Its JSON tags give the fields snake_case names.
// Args carries the job arguments/message as raw bytes and RetryInterval is a
// time.Duration. The type also has fields that are not shown here.
type AddJobRequest struct {
	TaskName      string        `json:"task_name"`
	MaxRetry      int           `json:"max_retry"`
	Args          []byte        `json:"args"`
	RetryInterval time.Duration `json:"retry_interval"`
	// contains filtered or unexported fields
}

AddJobRequest request model

func (*AddJobRequest) Validate added in v1.11.2

func (a *AddJobRequest) Validate() error

Validate method

type ClientSubscriber added in v1.10.16

// ClientSubscriber groups a client ID with a page name and a page filter, all
// stored as strings.
// NOTE(review): presumably describes one subscribed dashboard client — confirm.
type ClientSubscriber struct {
	ClientID   string
	PageName   string
	PageFilter string
}

ClientSubscriber model

type ConfigResolver added in v1.11.6

// ConfigResolver is the resolver type of DashboardResolver.Config. It exposes
// a single boolean flag, WithPersistent.
type ConfigResolver struct {
	WithPersistent bool
}

ConfigResolver resolver

type Configuration added in v1.11.26

// Configuration is a single configuration entry with a key, a name, a string
// value and an active flag. Each field uses the same snake_case name for both
// its BSON and JSON tags. It is the type read and written by the
// GetConfiguration, GetAllConfiguration and SetConfiguration methods of
// Persistent.
type Configuration struct {
	Key      string `bson:"key" json:"key"`
	Name     string `bson:"name" json:"name"`
	Value    string `bson:"value" json:"value"`
	IsActive bool   `bson:"is_active" json:"is_active"`
}

Configuration model

type ConfigurationResolver added in v1.11.26

// ConfigurationResolver is the resolver form of a configuration entry. It has
// the same four fields as Configuration but carries no struct tags.
type ConfigurationResolver struct {
	Key      string
	Name     string
	Value    string
	IsActive bool
}

ConfigurationResolver resolver

type Configurations added in v1.11.26

// Configurations is a slice of Configuration; its ToMap method returns a
// map[string]string.
type Configurations []Configuration

func (Configurations) ToMap added in v1.11.26

func (c Configurations) ToMap() map[string]string

ToMap method

type DashboardResolver added in v1.11.21

// DashboardResolver is the resolver for dashboard information. It holds
// display and build strings (banner, tagline, version, Go version, start time,
// build number), the ConfigResolver and MemstatsResolver values, and two
// anonymous structs describing the dependencies.
type DashboardResolver struct {
	Banner           string
	Tagline          string
	Version          string
	GoVersion        string
	StartAt          string
	BuildNumber      string
	Config           ConfigResolver
	MemoryStatistics MemstatsResolver
	// DependencyHealth holds nullable strings for the persistent and queue
	// dependencies.
	// NOTE(review): presumably nil means healthy and a non-nil value carries
	// the error text — confirm.
	DependencyHealth struct {
		Persistent *string
		Queue      *string
	}
	// DependencyDetail names the persistent and queue types in use and whether
	// a secondary persistent is used.
	DependencyDetail struct {
		PersistentType         string
		QueueType              string
		UseSecondaryPersistent bool
	}
}

DashboardResolver resolver

type Filter

// Filter describes the paging, sorting and selection criteria passed to the
// job and summary query methods (for example Persistent.FindAllJob and
// StreamAllJob). JSON names are camelCase; every field except Page and Limit
// is tagged omitempty. Search, JobID, Status, ShowHistories and
// BeforeCreatedAt are pointers and may be nil. StartDate and EndDate are
// strings; ParseStartEndDate returns them as time.Time values. The type also
// has fields that are not shown here.
type Filter struct {
	Page                int        `json:"page"`
	Limit               int        `json:"limit"`
	Sort                string     `json:"sort,omitempty"`
	TaskName            string     `json:"taskName,omitempty"`
	TaskNameList        []string   `json:"taskNameList,omitempty"`
	ExcludeTaskNameList []string   `json:"excludeTaskNameList,omitempty"`
	Search              *string    `json:"search,omitempty"`
	JobID               *string    `json:"jobID,omitempty"`
	Status              *string    `json:"status,omitempty"`
	Statuses            []string   `json:"statuses,omitempty"`
	ExcludeStatus       []string   `json:"excludeStatus,omitempty"`
	ShowAll             bool       `json:"showAll,omitempty"`
	ShowHistories       *bool      `json:"showHistories,omitempty"`
	StartDate           string     `json:"startDate,omitempty"`
	EndDate             string     `json:"endDate,omitempty"`
	BeforeCreatedAt     *time.Time `json:"beforeCreatedAt,omitempty"`
	Count               int        `json:"count,omitempty"`
	// contains filtered or unexported fields
}

Filter type

func (*Filter) CalculateOffset added in v1.11.22

func (f *Filter) CalculateOffset() int

CalculateOffset method

func (*Filter) ParseStartEndDate added in v1.12.0

func (f *Filter) ParseStartEndDate() (startDate, endDate time.Time)

ParseStartEndDate method

type FilterMutateJobInputResolver added in v1.13.13

// FilterMutateJobInputResolver is a resolver input selecting jobs by task
// name, search text, job ID, statuses and a start/end date. The pointer
// fields may be nil. Its ToFilter method returns a Filter.
type FilterMutateJobInputResolver struct {
	TaskName  string
	Search    *string
	JobID     *string
	Statuses  []string
	StartDate *string
	EndDate   *string
}

FilterMutateJobInputResolver resolver

func (*FilterMutateJobInputResolver) ToFilter added in v1.13.13

func (i *FilterMutateJobInputResolver) ToFilter() (filter Filter)

ToFilter method

type GetAllJobHistoryInputResolver added in v1.11.22

// GetAllJobHistoryInputResolver is a resolver input holding a job ID together
// with nullable page, limit, start date and end date values. Its ToFilter
// method returns a Filter.
type GetAllJobHistoryInputResolver struct {
	Page      *int
	Limit     *int
	StartDate *string
	EndDate   *string
	JobID     string
}

GetAllJobHistoryInputResolver resolver

func (*GetAllJobHistoryInputResolver) ToFilter added in v1.11.22

func (i *GetAllJobHistoryInputResolver) ToFilter() (filter Filter)

ToFilter method

type GetAllJobInputResolver added in v1.11.21

// GetAllJobInputResolver is a resolver input for listing jobs. Every field is
// a pointer and may be nil, including Statuses, which is a pointer to a
// slice. Its ToFilter method returns a Filter.
type GetAllJobInputResolver struct {
	TaskName  *string
	Page      *int
	Limit     *int
	Search    *string
	JobID     *string
	Statuses  *[]string
	StartDate *string
	EndDate   *string
}

GetAllJobInputResolver resolver

func (*GetAllJobInputResolver) ToFilter added in v1.11.21

func (i *GetAllJobInputResolver) ToFilter() (filter Filter)

ToFilter method

type Job

// Job is the job model. Most fields use the same snake_case name for their
// BSON and JSON tags. Two fields differ: ErrorStack is excluded from BSON
// (bson:"-") but present in JSON as "error_stack", and NextRetryAt is
// excluded from both encodings. Interval and Status are stored as strings,
// and RetryHistories holds the job's RetryHistory entries. The type also has
// fields that are not shown here.
type Job struct {
	ID              string         `bson:"_id" json:"_id"`
	TaskName        string         `bson:"task_name" json:"task_name"`
	Arguments       string         `bson:"arguments" json:"arguments"`
	Retries         int            `bson:"retries" json:"retries"`
	MaxRetry        int            `bson:"max_retry" json:"max_retry"`
	Interval        string         `bson:"interval" json:"interval"`
	CreatedAt       time.Time      `bson:"created_at" json:"created_at"`
	UpdatedAt       time.Time      `bson:"updated_at" json:"updated_at"`
	FinishedAt      time.Time      `bson:"finished_at" json:"finished_at"`
	Status          string         `bson:"status" json:"status"`
	Error           string         `bson:"error" json:"error"`
	ErrorStack      string         `bson:"-" json:"error_stack"`
	Result          string         `bson:"result" json:"result"`
	TraceID         string         `bson:"trace_id" json:"trace_id"`
	CurrentProgress int            `bson:"current_progress" json:"current_progress"`
	MaxProgress     int            `bson:"max_progress" json:"max_progress"`
	RetryHistories  []RetryHistory `bson:"retry_histories" json:"retry_histories"`
	NextRetryAt     string         `bson:"-" json:"-"`
	// contains filtered or unexported fields
}

Job model

func GetDetailJob added in v1.11.1

func GetDetailJob(ctx context.Context, jobID string) (Job, error)

GetDetailJob api for get detail job by id

type JobListResolver

// JobListResolver is the resolver for a list of jobs: Meta carries the list
// metadata (MetaJobList) and Data the job entries (JobResolver).
type JobListResolver struct {
	Meta MetaJobList
	Data []JobResolver
}

JobListResolver resolver

func (*JobListResolver) GetAllJob added in v1.11.21

func (j *JobListResolver) GetAllJob(ctx context.Context, filter *Filter)

type JobResolver added in v1.11.21

// JobResolver is the resolver form of a job. Its fields mirror those of Job,
// with CreatedAt and FinishedAt held as strings rather than time.Time, plus
// a Meta struct of presentation flags and paging values. ParseFromJob takes
// a *Job and a maximum argument length.
// NOTE(review): the IsShowMore* flags presumably indicate truncated args,
// error and result text — confirm.
type JobResolver struct {
	ID              string
	TaskName        string
	Arguments       string
	Retries         int
	MaxRetry        int
	Interval        string
	CreatedAt       string
	FinishedAt      string
	Status          string
	Error           string
	Result          string
	ErrorStack      string
	TraceID         string
	RetryHistories  []RetryHistory
	NextRetryAt     string
	CurrentProgress int
	MaxProgress     int
	Meta            struct {
		IsCloseSession   bool
		Page             int
		TotalHistory     int
		IsShowMoreArgs   bool
		IsShowMoreError  bool
		IsShowMoreResult bool
	}
}

JobResolver resolver

func (*JobResolver) ParseFromJob added in v1.11.21

func (j *JobResolver) ParseFromJob(job *Job, maxArgsLength int)

type JobStatusEnum added in v1.7.0

// JobStatusEnum is the string type of the job status constants
// (StatusRetrying, StatusFailure, StatusSuccess, StatusQueueing,
// StatusStopped).
type JobStatusEnum string

JobStatusEnum enum status

func (JobStatusEnum) String added in v1.11.23

func (j JobStatusEnum) String() string

String method

type MemstatsResolver added in v1.6.12

// MemstatsResolver is the resolver type of DashboardResolver.MemoryStatistics.
// It holds two uint64 allocation figures, a GC count and a goroutine count.
// NOTE(review): Alloc, TotalAlloc and NumGC presumably come from
// runtime.MemStats — confirm.
type MemstatsResolver struct {
	Alloc         uint64
	TotalAlloc    uint64
	NumGC         int
	NumGoroutines int
}

MemstatsResolver resolver

type MetaJobList added in v1.6.8

// MetaJobList is the metadata part of JobListResolver: paging values (page,
// limit, total records, total pages), three boolean state flags and a
// SummaryDetail with per-status counts.
type MetaJobList struct {
	Page              int
	Limit             int
	TotalRecords      int
	TotalPages        int
	IsCloseSession    bool
	IsLoading         bool
	IsFreezeBroadcast bool
	Detail            SummaryDetail
}

MetaJobList resolver

type MetaTaskResolver added in v1.6.8

// MetaTaskResolver is the metadata part of TaskListResolver: paging values, a
// close-session flag, the number of client subscribers and a client ID. It
// has a CalculatePage method that takes no arguments and returns nothing.
type MetaTaskResolver struct {
	Page                  int
	Limit                 int
	TotalRecords          int
	TotalPages            int
	IsCloseSession        bool
	TotalClientSubscriber int
	ClientID              string
}

MetaTaskResolver meta resolver

func (*MetaTaskResolver) CalculatePage added in v1.13.9

func (m *MetaTaskResolver) CalculatePage()

type MongoPersistent added in v1.11.22

// MongoPersistent is created by NewMongoPersistent from a *mongo.Database. Its
// documented methods include every method of the Persistent and Summary
// interfaces. All of its fields are unexported.
type MongoPersistent struct {
	// contains filtered or unexported fields
}

func NewMongoPersistent added in v1.7.0

func NewMongoPersistent(db *mongo.Database) *MongoPersistent

NewMongoPersistent create mongodb persistent

func (*MongoPersistent) AggregateAllTaskJob added in v1.11.22

func (s *MongoPersistent) AggregateAllTaskJob(ctx context.Context, filter *Filter) (results []TaskSummary)

func (*MongoPersistent) CleanJob added in v1.11.22

func (s *MongoPersistent) CleanJob(ctx context.Context, filter *Filter) (affectedRow int64)

func (*MongoPersistent) CountAllJob added in v1.11.22

func (s *MongoPersistent) CountAllJob(ctx context.Context, filter *Filter) int

func (*MongoPersistent) DeleteAllSummary added in v1.11.24

func (s *MongoPersistent) DeleteAllSummary(ctx context.Context, filter *Filter)

func (*MongoPersistent) DeleteJob added in v1.11.22

func (s *MongoPersistent) DeleteJob(ctx context.Context, id string) (job Job, err error)

func (*MongoPersistent) FindAllJob added in v1.11.22

func (s *MongoPersistent) FindAllJob(ctx context.Context, filter *Filter) (jobs []Job)

func (*MongoPersistent) FindAllSummary added in v1.11.22

func (s *MongoPersistent) FindAllSummary(ctx context.Context, filter *Filter) (result []TaskSummary)

func (*MongoPersistent) FindDetailSummary added in v1.11.22

func (s *MongoPersistent) FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary)

func (*MongoPersistent) FindJobByID added in v1.11.22

func (s *MongoPersistent) FindJobByID(ctx context.Context, id string, filterHistory *Filter) (job Job, err error)

func (*MongoPersistent) GetAllConfiguration added in v1.11.26

func (s *MongoPersistent) GetAllConfiguration(ctx context.Context) (cfg []Configuration, err error)

func (*MongoPersistent) GetConfiguration added in v1.11.26

func (s *MongoPersistent) GetConfiguration(key string) (cfg Configuration, err error)

func (*MongoPersistent) IncrementSummary added in v1.11.22

func (s *MongoPersistent) IncrementSummary(ctx context.Context, taskName string, incr map[string]int64)

func (*MongoPersistent) Ping added in v1.11.22

func (s *MongoPersistent) Ping(ctx context.Context) error

func (*MongoPersistent) SaveJob added in v1.11.22

func (s *MongoPersistent) SaveJob(ctx context.Context, job *Job, retryHistories ...RetryHistory) (err error)

func (*MongoPersistent) SetConfiguration added in v1.11.26

func (s *MongoPersistent) SetConfiguration(cfg *Configuration) (err error)

func (*MongoPersistent) SetSummary added in v1.11.22

func (s *MongoPersistent) SetSummary(summary Summary)

func (*MongoPersistent) Summary added in v1.11.22

func (s *MongoPersistent) Summary() Summary

func (*MongoPersistent) Type added in v1.11.23

func (s *MongoPersistent) Type() string

func (*MongoPersistent) UpdateJob added in v1.11.22

func (s *MongoPersistent) UpdateJob(ctx context.Context, filter *Filter, updated map[string]interface{}, retryHistories ...RetryHistory) (matchedCount, affectedRow int64, err error)

func (*MongoPersistent) UpdateSummary added in v1.11.22

func (s *MongoPersistent) UpdateSummary(ctx context.Context, taskName string, updated map[string]interface{})

type OptionFunc added in v1.6.8

// OptionFunc is a function that receives the package's unexported option
// value. NewTaskQueueWorker accepts any number of them, and the Set*
// functions of this package each return one.
type OptionFunc func(*option)

OptionFunc type

func SetAutoRemoveClientInterval added in v1.6.8

func SetAutoRemoveClientInterval(d time.Duration) OptionFunc

SetAutoRemoveClientInterval option func

func SetDashboardBanner added in v1.7.0

func SetDashboardBanner(banner string) OptionFunc

SetDashboardBanner option func

func SetDashboardBasicAuth added in v1.13.9

func SetDashboardBasicAuth(username, password string) OptionFunc

SetDashboardBasicAuth option func

func SetDashboardHTTPPort added in v1.7.4

func SetDashboardHTTPPort(port uint16) OptionFunc

SetDashboardHTTPPort option func

func SetDebugMode added in v1.7.4

func SetDebugMode(debugMode bool) OptionFunc

SetDebugMode option func

func SetExternalWorkerHost added in v1.11.6

func SetExternalWorkerHost(host string) OptionFunc

SetExternalWorkerHost is an option func that sets the worker host used for adding jobs; if the host is not empty, adding a job uses an HTTP request by default.

func SetLocker added in v1.8.8

func SetLocker(locker interfaces.Locker) OptionFunc

SetLocker option func

func SetMaxClientSubscriber added in v1.6.8

func SetMaxClientSubscriber(max int) OptionFunc

SetMaxClientSubscriber option func

func SetPersistent added in v1.11.6

func SetPersistent(p Persistent) OptionFunc

SetPersistent option func

func SetQueue added in v1.11.6

func SetQueue(q QueueStorage) OptionFunc

SetQueue option func

func SetSecondaryPersistent added in v1.12.8

func SetSecondaryPersistent(p Persistent) OptionFunc

SetSecondaryPersistent option func

func SetTLSConfig added in v1.17.0

func SetTLSConfig(tlsConfig *tls.Config) OptionFunc

SetTLSConfig option func

type Persistent added in v1.7.0

// Persistent is the abstraction over the job storage backend. It covers a
// health check (Ping), access to the Summary store, job queries and
// mutations, a backend type name, and configuration entries.
// MongoPersistent and SQLPersistent both list every method declared here,
// NewNoopPersistent returns a Persistent, and SetPersistent and
// SetSecondaryPersistent accept one.
type Persistent interface {
	Ping(ctx context.Context) error
	SetSummary(Summary)
	Summary() Summary
	FindAllJob(ctx context.Context, filter *Filter) (jobs []Job)
	FindJobByID(ctx context.Context, id string, filterHistory *Filter) (job Job, err error)
	CountAllJob(ctx context.Context, filter *Filter) int
	AggregateAllTaskJob(ctx context.Context, filter *Filter) (result []TaskSummary)
	SaveJob(ctx context.Context, job *Job, retryHistories ...RetryHistory) (err error)
	UpdateJob(ctx context.Context, filter *Filter, updated map[string]interface{}, retryHistories ...RetryHistory) (matchedCount, affectedRow int64, err error)
	CleanJob(ctx context.Context, filter *Filter) (affectedRow int64)
	DeleteJob(ctx context.Context, id string) (job Job, err error)
	Type() string
	GetAllConfiguration(ctx context.Context) (cfg []Configuration, err error)
	GetConfiguration(key string) (Configuration, error)
	SetConfiguration(cfg *Configuration) (err error)
}

Persistent abstraction

func NewNoopPersistent added in v1.11.6

func NewNoopPersistent() Persistent

NewNoopPersistent constructor

type QueueStorage

// QueueStorage is the abstraction over the queue storage backend. Jobs are
// pushed as *Job values, while PopJob and NextJob take a task name and return
// a job ID. NewInMemQueue and NewRedisQueue return implementations, and
// SetQueue accepts one.
// NOTE(review): NextJob presumably returns the next ID without removing it,
// unlike PopJob — confirm.
type QueueStorage interface {
	PushJob(ctx context.Context, job *Job) (n int64)
	PopJob(ctx context.Context, taskName string) (jobID string)
	NextJob(ctx context.Context, taskName string) (jobID string)
	Clear(ctx context.Context, taskName string)
	Ping() error
	Type() string
}

QueueStorage abstraction for queue storage backend

func NewInMemQueue

func NewInMemQueue() QueueStorage

NewInMemQueue init inmem queue

func NewRedisQueue

func NewRedisQueue(redisPool *redis.Pool) QueueStorage

NewRedisQueue initializes a Redis-backed queue.

type RestoreSecondaryResolver added in v1.12.8

// RestoreSecondaryResolver is a resolver holding a total data count and a
// message.
// NOTE(review): presumably the result of restoring jobs from the secondary
// persistent — confirm.
type RestoreSecondaryResolver struct {
	TotalData int
	Message   string
}

RestoreSecondaryResolver resolver

type RetryHistory added in v1.10.0

// RetryHistory is one history entry of a job (see Job.RetryHistories): the
// status, error text, error stack, result and trace ID, plus the start and
// end times. Each field uses the same snake_case name for its BSON and JSON
// tags.
type RetryHistory struct {
	ErrorStack string    `bson:"error_stack" json:"error_stack"`
	Status     string    `bson:"status" json:"status"`
	Error      string    `bson:"error" json:"error"`
	Result     string    `bson:"result" json:"result"`
	TraceID    string    `bson:"trace_id" json:"trace_id"`
	StartAt    time.Time `bson:"start_at" json:"start_at"`
	EndAt      time.Time `bson:"end_at" json:"end_at"`
}

RetryHistory model

type SQLPersistent added in v1.11.22

// SQLPersistent is created by NewSQLPersistent from a *sql.DB. Its documented
// methods include every method of the Persistent and Summary interfaces. All
// of its fields are unexported.
type SQLPersistent struct {
	// contains filtered or unexported fields
}

func NewSQLPersistent added in v1.11.22

func NewSQLPersistent(db *sql.DB) *SQLPersistent

NewSQLPersistent init new persistent SQL

func (*SQLPersistent) AggregateAllTaskJob added in v1.11.22

func (s *SQLPersistent) AggregateAllTaskJob(ctx context.Context, filter *Filter) (result []TaskSummary)

func (*SQLPersistent) CleanJob added in v1.11.22

func (s *SQLPersistent) CleanJob(ctx context.Context, filter *Filter) (affectedRow int64)

func (*SQLPersistent) CountAllJob added in v1.11.22

func (s *SQLPersistent) CountAllJob(ctx context.Context, filter *Filter) (count int)

func (*SQLPersistent) DeleteAllSummary added in v1.11.24

func (s *SQLPersistent) DeleteAllSummary(ctx context.Context, filter *Filter)

func (*SQLPersistent) DeleteJob added in v1.11.22

func (s *SQLPersistent) DeleteJob(ctx context.Context, id string) (job Job, err error)

func (*SQLPersistent) FindAllJob added in v1.11.22

func (s *SQLPersistent) FindAllJob(ctx context.Context, filter *Filter) (jobs []Job)

func (*SQLPersistent) FindAllSummary added in v1.11.22

func (s *SQLPersistent) FindAllSummary(ctx context.Context, filter *Filter) (result []TaskSummary)

summary

func (*SQLPersistent) FindDetailSummary added in v1.11.22

func (s *SQLPersistent) FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary)

func (*SQLPersistent) FindJobByID added in v1.11.22

func (s *SQLPersistent) FindJobByID(ctx context.Context, id string, filterHistory *Filter) (job Job, err error)

func (*SQLPersistent) GetAllConfiguration added in v1.11.26

func (s *SQLPersistent) GetAllConfiguration(ctx context.Context) (cfg []Configuration, err error)

func (*SQLPersistent) GetConfiguration added in v1.11.26

func (s *SQLPersistent) GetConfiguration(key string) (cfg Configuration, err error)

func (*SQLPersistent) IncrementSummary added in v1.11.22

func (s *SQLPersistent) IncrementSummary(ctx context.Context, taskName string, incr map[string]int64)

func (*SQLPersistent) Ping added in v1.11.22

func (s *SQLPersistent) Ping(ctx context.Context) error

func (*SQLPersistent) SaveJob added in v1.11.22

func (s *SQLPersistent) SaveJob(ctx context.Context, job *Job, retryHistories ...RetryHistory) (err error)

func (*SQLPersistent) SetConfiguration added in v1.11.26

func (s *SQLPersistent) SetConfiguration(cfg *Configuration) (err error)

func (*SQLPersistent) SetSummary added in v1.11.22

func (s *SQLPersistent) SetSummary(summary Summary)

func (*SQLPersistent) Summary added in v1.11.22

func (s *SQLPersistent) Summary() Summary

func (*SQLPersistent) Type added in v1.11.23

func (s *SQLPersistent) Type() string

func (*SQLPersistent) UpdateJob added in v1.11.22

func (s *SQLPersistent) UpdateJob(ctx context.Context, filter *Filter, updated map[string]interface{}, retryHistories ...RetryHistory) (matchedCount, affectedRow int64, err error)

func (*SQLPersistent) UpdateSummary added in v1.11.22

func (s *SQLPersistent) UpdateSummary(ctx context.Context, taskName string, updated map[string]interface{})

type Summary added in v1.11.6

// Summary is the abstraction over the per-task summary store. Summaries are
// read as TaskSummary values, either for all tasks matching a filter or for a
// single task name. Updates and increments are applied per task name from a
// map. NewInMemSummary returns an in-memory implementation, and
// MongoPersistent and SQLPersistent also list these methods.
type Summary interface {
	FindAllSummary(ctx context.Context, filter *Filter) (result []TaskSummary)
	FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary)
	UpdateSummary(ctx context.Context, taskName string, updated map[string]interface{})
	IncrementSummary(ctx context.Context, taskName string, incr map[string]int64)
	DeleteAllSummary(ctx context.Context, filter *Filter)
}

Summary abstraction

func NewInMemSummary added in v1.11.6

func NewInMemSummary() Summary

NewInMemSummary constructor, store & read summary from in memory

type SummaryDetail added in v1.11.4

// SummaryDetail holds five integer counters whose names match the job
// statuses (failure, retrying, success, queueing, stopped). It is used as the
// Detail field of MetaJobList and TaskResolver.
type SummaryDetail struct {
	Failure, Retrying, Success, Queueing, Stopped int
}

SummaryDetail type

type Task added in v1.9.0

// Task is the task model. All of its fields are unexported, so nothing about
// its contents is visible in this documentation.
type Task struct {
	// contains filtered or unexported fields
}

Task model

type TaskListResolver added in v1.6.8

// TaskListResolver is the resolver for a list of tasks: Meta carries the list
// metadata (MetaTaskResolver) and Data the task entries (TaskResolver).
type TaskListResolver struct {
	Meta MetaTaskResolver
	Data []TaskResolver
}

TaskListResolver resolver

type TaskResolver

// TaskResolver is the resolver form of one task: its name and module name,
// the total number of jobs, a loading flag with its message, and the
// per-status counts in Detail. TaskSummary.ToTaskResolver returns this type.
type TaskResolver struct {
	Name           string
	ModuleName     string
	TotalJobs      int
	IsLoading      bool
	LoadingMessage string
	Detail         SummaryDetail
}

TaskResolver resolver

type TaskSummary added in v1.11.4

// TaskSummary is the stored summary of one task: an ID, the task name, one
// integer counter per job status, and a loading flag with its message. The
// fields carry BSON tags only. It is the type returned by the Summary
// interface's query methods and by Persistent.AggregateAllTaskJob.
type TaskSummary struct {
	ID             string `bson:"_id"`
	TaskName       string `bson:"task_name"`
	Success        int    `bson:"success"`
	Queueing       int    `bson:"queueing"`
	Retrying       int    `bson:"retrying"`
	Failure        int    `bson:"failure"`
	Stopped        int    `bson:"stopped"`
	IsLoading      bool   `bson:"is_loading"`
	LoadingMessage string `bson:"loading_message"`
}

TaskSummary model

func (*TaskSummary) CountTotalJob added in v1.11.4

func (s *TaskSummary) CountTotalJob() int

CountTotalJob method

func (*TaskSummary) SetValue added in v1.11.4

func (s *TaskSummary) SetValue(source map[string]int)

SetValue method

func (*TaskSummary) ToMapResult added in v1.11.4

func (s *TaskSummary) ToMapResult() map[string]int

ToMapResult method

func (*TaskSummary) ToSummaryDetail added in v1.11.4

func (s *TaskSummary) ToSummaryDetail() (detail SummaryDetail)

ToSummaryDetail method

func (*TaskSummary) ToTaskResolver added in v1.13.0

func (s *TaskSummary) ToTaskResolver() (res TaskResolver)

ToTaskResolver method

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL