taskqueueworker

package
v1.11.22 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2022 License: Apache-2.0 Imports: 37 Imported by: 0

README

Example

Create delivery handler

package workerhandler

import (
	"fmt"
	"time"

	"github.com/golangid/candi/candishared"
	taskqueueworker "github.com/golangid/candi/codebase/app/task_queue_worker"
	"github.com/golangid/candi/codebase/factory/types"
)

// TaskQueueHandler is the delivery handler for the task queue worker. It has
// no fields; its methods are the task functions mounted by MountHandlers.
type TaskQueueHandler struct{}

// NewTaskQueueHandler returns a pointer to a new, zero-valued
// TaskQueueHandler.
func NewTaskQueueHandler() *TaskQueueHandler {
	return new(TaskQueueHandler)
}

// MountHandlers registers this handler's task functions on the given worker
// handler group. It returns nothing; each Add call binds one route name to
// the method that serves it.
func (h *TaskQueueHandler) MountHandlers(group *types.WorkerHandlerGroup) {
	group.Add("task-one", h.taskOne) // route "task-one" -> taskOne
	group.Add("task-two", h.taskTwo) // route "task-two" -> taskTwo
}

// taskOne prints the handler route, the "retries" header and the message of
// the incoming event, then always returns an ErrorRetrier carrying a
// 10-second delay.
// NOTE(review): presumably the returned ErrorRetrier makes the worker retry
// the job after Delay — confirm against the ErrorRetrier documentation.
func (h *TaskQueueHandler) taskOne(eventContext *candishared.EventContext) error {
	route := eventContext.HandlerRoute()
	retries := eventContext.Header()["retries"]
	message := eventContext.Message()
	fmt.Printf("executing task '%s' has been %s retry, with message: %s\n", route, retries, message)

	retry := &taskqueueworker.ErrorRetrier{
		Message: "Error",
		Delay:   10 * time.Second,
	}
	return retry
}

// taskTwo prints the handler route, the "retries" header and the message of
// the incoming event, then always returns an ErrorRetrier carrying a
// 3-second delay.
// NOTE(review): presumably the returned ErrorRetrier makes the worker retry
// the job after Delay — confirm against the ErrorRetrier documentation.
func (h *TaskQueueHandler) taskTwo(eventContext *candishared.EventContext) error {
	route := eventContext.HandlerRoute()
	retries := eventContext.Header()["retries"]
	message := eventContext.Message()
	fmt.Printf("executing task '%s' has been %s retry, with message: %s\n", route, retries, message)

	retry := &taskqueueworker.ErrorRetrier{
		Message: "Error",
		Delay:   3 * time.Second,
	}
	return retry
}

Register in module

package examplemodule

import (

	"example.service/internal/modules/examplemodule/delivery/workerhandler"

	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

// Module holds the delivery handlers of examplemodule.
type Module struct {
	// ...another delivery handler
	// workerHandlers maps each worker type to the handler that serves it.
	workerHandlers map[types.Worker]interfaces.WorkerHandler
}

// NewModules builds the Module and registers the task queue handler under
// types.TaskQueue. The deps argument is accepted but not used by the code
// shown in this example.
func NewModules(deps dependency.Dependency) *Module {
	handlers := map[types.Worker]interfaces.WorkerHandler{
		// ...another worker handler
		// ...
		types.TaskQueue: workerhandler.NewTaskQueueHandler(),
	}

	return &Module{workerHandlers: handlers}
}

// ...another method

Add task in each usecase module

  • From internal service (same runtime)
package usecase

import (
	"context"
	"log"

	taskqueueworker "github.com/golangid/candi/codebase/app/task_queue_worker"
)

// someUsecase enqueues a job for `{{task_name}}` from inside the same runtime
// as the task queue worker and logs the error if the job cannot be added.
func someUsecase(ctx context.Context) {
	// add task queue for `{{task_name}}` with 5 retry
	//
	// AddJob takes only the context and the request and returns
	// (jobID, err); a worker host is passed only to AddJobViaHTTPRequest.
	// The job ID is not needed here, so it is discarded.
	if _, err := taskqueueworker.AddJob(ctx, &taskqueueworker.AddJobRequest{
		TaskName: "{{task_name}}", MaxRetry: 5, Args: []byte(`{{arguments/message}}`),
	}); err != nil {
		log.Println(err)
	}
}
  • Or if running on a separate server

Via GraphQL API

POST {{task-queue-worker-host}}/graphql

# addJob calls the add_job mutation with a single `param` input object.
# `args` carries the job payload as a JSON document encoded in a string.
mutation addJob {
  add_job(
    param: {
      task_name: "{{task_name}}"
      max_retry: 5
      args: "{\"params\": \"test-one\"}"
    }
  )
}

cURL:

# Send the addJob mutation to the GraphQL endpoint as a POST request with a
# JSON body (operation name, variables and the mutation document).
curl --location --request POST '{{task-queue-worker-host}}/graphql' \
--header 'Content-Type: application/json' \
--data-raw '{
    "operationName": "addJob",
    "variables": {
        "param": {
            "task_name": "{{task_name}}",
            "max_retry": 1,
            "args": "{{arguments/message}}"
        }
    },
    "query": "mutation addJob($param: AddJobInputResolver!) {\n  add_job(param: $param)\n}\n"
}'

Direct call function

// add task queue for `{{task_name}}` via HTTP request
jobID, err := taskqueueworker.AddJobViaHTTPRequest(context.Background(), "{{task-queue-worker-host}}", &taskqueueworker.AddJobRequest{
	TaskName: "{{task_name}}", MaxRetry: 1, Args: []byte(`{{arguments/message}}`),
})
if err != nil {
	log.Println(err)
} else {
	// jobID is only meaningful when the request succeeded, so report it
	// on the success path only.
	fmt.Println("Queued job id is ", jobID)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddJob

func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error)

AddJob is the public function for adding a new job in the same runtime.

func AddJobViaHTTPRequest added in v1.7.0

func AddJobViaHTTPRequest(ctx context.Context, workerHost string, req *AddJobRequest) (jobID string, err error)

AddJobViaHTTPRequest is the public function for adding a new job via HTTP request.

func NewTaskQueueWorker added in v1.6.8

func NewTaskQueueWorker(service factory.ServiceFactory, opts ...OptionFunc) factory.AppServerFactory

NewTaskQueueWorker create new task queue worker

func RecalculateSummary added in v1.11.4

func RecalculateSummary(ctx context.Context)

RecalculateSummary func

func RetryJob added in v1.11.0

func RetryJob(ctx context.Context, jobID string) error

RetryJob api for retry job by id

func StopJob added in v1.11.0

func StopJob(ctx context.Context, jobID string) error

StopJob api for stop job by id

func StreamAllJob added in v1.11.4

func StreamAllJob(ctx context.Context, filter *Filter, streamFunc func(job *Job))

StreamAllJob api func for stream fetch all job

Types

type AddJobInputResolver added in v1.11.0

type AddJobInputResolver struct {
	TaskName      string
	MaxRetry      int
	Args          string
	RetryInterval *string
}

AddJobInputResolver model

type AddJobRequest added in v1.11.0

type AddJobRequest struct {
	TaskName      string        `json:"task_name"`
	MaxRetry      int           `json:"max_retry"`
	Args          []byte        `json:"args"`
	RetryInterval time.Duration `json:"retry_interval"`
	// contains filtered or unexported fields
}

AddJobRequest request model

func (*AddJobRequest) Validate added in v1.11.2

func (a *AddJobRequest) Validate() error

Validate method

type ClientSubscriber added in v1.10.16

type ClientSubscriber struct {
	ClientID      string
	SubscribeList struct {
		TaskDashboard bool
		JobDetailID   string
		JobList       *Filter
	}
}

ClientSubscriber model

type ConfigResolver added in v1.11.6

type ConfigResolver struct {
	WithPersistent bool
}

ConfigResolver resolver

type DashboardResolver added in v1.11.21

type DashboardResolver struct {
	Banner                    string
	Tagline                   string
	Version                   string
	GoVersion                 string
	StartAt                   string
	BuildNumber               string
	Config                    ConfigResolver
	TaskListClientSubscribers []string
	JobListClientSubscribers  []string
	MemoryStatistics          MemstatsResolver
	DependencyHealth          struct {
		Persistent *string
		Queue      *string
	}
}

DashboardResolver resolver

type Filter

type Filter struct {
	Page, Limit        int
	Sort               string
	TaskName           string
	TaskNameList       []string
	Search, JobID      *string
	Status             *string
	Statuses           []string
	ExcludeStatus      []string
	ShowAll            bool
	ShowHistories      *bool
	StartDate, EndDate time.Time
	Count              int
}

Filter type

func (*Filter) CalculateOffset added in v1.11.22

func (f *Filter) CalculateOffset() int

CalculateOffset method

type GetAllJobHistoryInputResolver added in v1.11.22

type GetAllJobHistoryInputResolver struct {
	Page      *int
	Limit     *int
	StartDate *string
	EndDate   *string
	JobID     string
}

GetAllJobHistoryInputResolver resolver

func (*GetAllJobHistoryInputResolver) ToFilter added in v1.11.22

func (i *GetAllJobHistoryInputResolver) ToFilter() (filter Filter)

ToFilter method

type GetAllJobInputResolver added in v1.11.21

type GetAllJobInputResolver struct {
	TaskName  *string
	Page      *int
	Limit     *int
	Search    *string
	JobID     *string
	Statuses  *[]string
	StartDate *string
	EndDate   *string
}

GetAllJobInputResolver resolver

func (*GetAllJobInputResolver) ToFilter added in v1.11.21

func (i *GetAllJobInputResolver) ToFilter() (filter Filter)

ToFilter method

type Job

type Job struct {
	ID             string         `bson:"_id" json:"_id"`
	TaskName       string         `bson:"task_name" json:"task_name"`
	Arguments      string         `bson:"arguments" json:"arguments"`
	Retries        int            `bson:"retries" json:"retries"`
	MaxRetry       int            `bson:"max_retry" json:"max_retry"`
	Interval       string         `bson:"interval" json:"interval"`
	CreatedAt      time.Time      `bson:"created_at" json:"created_at"`
	FinishedAt     time.Time      `bson:"finished_at" json:"finished_at"`
	Status         string         `bson:"status" json:"status"`
	Error          string         `bson:"error" json:"error"`
	ErrorStack     string         `bson:"-" json:"error_stack"`
	TraceID        string         `bson:"trace_id" json:"trace_id"`
	RetryHistories []RetryHistory `bson:"retry_histories" json:"retry_histories"`
	NextRetryAt    string         `bson:"-" json:"-"`
	// contains filtered or unexported fields
}

Job model

func GetDetailJob added in v1.11.1

func GetDetailJob(ctx context.Context, jobID string) (Job, error)

GetDetailJob api for get detail job by id

type JobListResolver

type JobListResolver struct {
	Meta MetaJobList
	Data []JobResolver
}

JobListResolver resolver

func (*JobListResolver) GetAllJob added in v1.11.21

func (j *JobListResolver) GetAllJob(ctx context.Context, filter *Filter)

type JobResolver added in v1.11.21

type JobResolver struct {
	ID             string
	TaskName       string
	Arguments      string
	Retries        int
	MaxRetry       int
	Interval       string
	CreatedAt      string
	FinishedAt     string
	Status         string
	Error          string
	ErrorStack     string
	TraceID        string
	RetryHistories []RetryHistory
	NextRetryAt    string
	Meta           struct {
		IsCloseSession bool
		Page           int
		TotalHistory   int
	}
}

JobResolver resolver

func (*JobResolver) ParseFromJob added in v1.11.21

func (j *JobResolver) ParseFromJob(job *Job)

type JobStatusEnum added in v1.7.0

type JobStatusEnum string

JobStatusEnum enum status

type MemstatsResolver added in v1.6.12

type MemstatsResolver struct {
	Alloc         string
	TotalAlloc    string
	NumGC         int
	NumGoroutines int
}

MemstatsResolver resolver

type MetaJobList added in v1.6.8

type MetaJobList struct {
	Page              int
	Limit             int
	TotalRecords      int
	TotalPages        int
	IsCloseSession    bool
	IsLoading         bool
	IsFreezeBroadcast bool
	Detail            SummaryDetail
}

MetaJobList resolver

type MetaTaskResolver added in v1.6.8

type MetaTaskResolver struct {
	Page                  int
	Limit                 int
	TotalRecords          int
	TotalPages            int
	IsCloseSession        bool
	TotalClientSubscriber int
}

MetaTaskResolver meta resolver

type MongoPersistent added in v1.11.22

type MongoPersistent struct {
	// contains filtered or unexported fields
}

func NewMongoPersistent added in v1.7.0

func NewMongoPersistent(db *mongo.Database) *MongoPersistent

NewMongoPersistent create mongodb persistent

func (*MongoPersistent) AggregateAllTaskJob added in v1.11.22

func (s *MongoPersistent) AggregateAllTaskJob(ctx context.Context, filter *Filter) (results []TaskSummary)

func (*MongoPersistent) CleanJob added in v1.11.22

func (s *MongoPersistent) CleanJob(ctx context.Context, filter *Filter) (affectedRow int64)

func (*MongoPersistent) CountAllJob added in v1.11.22

func (s *MongoPersistent) CountAllJob(ctx context.Context, filter *Filter) int

func (*MongoPersistent) DeleteJob added in v1.11.22

func (s *MongoPersistent) DeleteJob(ctx context.Context, id string) (job Job, err error)

func (*MongoPersistent) FindAllJob added in v1.11.22

func (s *MongoPersistent) FindAllJob(ctx context.Context, filter *Filter) (jobs []Job)

func (*MongoPersistent) FindAllSummary added in v1.11.22

func (s *MongoPersistent) FindAllSummary(ctx context.Context, filter *Filter) (result []TaskSummary)

func (*MongoPersistent) FindDetailSummary added in v1.11.22

func (s *MongoPersistent) FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary)

func (*MongoPersistent) FindJobByID added in v1.11.22

func (s *MongoPersistent) FindJobByID(ctx context.Context, id string, filterHistory *Filter) (job Job, err error)

func (*MongoPersistent) IncrementSummary added in v1.11.22

func (s *MongoPersistent) IncrementSummary(ctx context.Context, taskName string, incr map[string]int64)

func (*MongoPersistent) Ping added in v1.11.22

func (s *MongoPersistent) Ping(ctx context.Context) error

func (*MongoPersistent) SaveJob added in v1.11.22

func (s *MongoPersistent) SaveJob(ctx context.Context, job *Job, retryHistories ...RetryHistory)

func (*MongoPersistent) SetSummary added in v1.11.22

func (s *MongoPersistent) SetSummary(summary Summary)

func (*MongoPersistent) Summary added in v1.11.22

func (s *MongoPersistent) Summary() Summary

func (*MongoPersistent) UpdateJob added in v1.11.22

func (s *MongoPersistent) UpdateJob(ctx context.Context, filter *Filter, updated map[string]interface{}, retryHistories ...RetryHistory) (matchedCount, affectedRow int64, err error)

func (*MongoPersistent) UpdateSummary added in v1.11.22

func (s *MongoPersistent) UpdateSummary(ctx context.Context, taskName string, updated map[string]interface{})

type OptionFunc added in v1.6.8

type OptionFunc func(*option)

OptionFunc type

func SetAutoRemoveClientInterval added in v1.6.8

func SetAutoRemoveClientInterval(d time.Duration) OptionFunc

SetAutoRemoveClientInterval option func

func SetDashboardBanner added in v1.7.0

func SetDashboardBanner(banner string) OptionFunc

SetDashboardBanner option func

func SetDashboardHTTPPort added in v1.7.4

func SetDashboardHTTPPort(port uint16) OptionFunc

SetDashboardHTTPPort option func

func SetDebugMode added in v1.7.4

func SetDebugMode(debugMode bool) OptionFunc

SetDebugMode option func

func SetExternalWorkerHost added in v1.11.6

func SetExternalWorkerHost(host string) OptionFunc

SetExternalWorkerHost option func. Sets the worker host used when adding a job; if the host is not empty, jobs are added via HTTP request by default.

func SetLocker added in v1.8.8

func SetLocker(locker candiutils.Locker) OptionFunc

SetLocker option func

func SetMaxClientSubscriber added in v1.6.8

func SetMaxClientSubscriber(max int) OptionFunc

SetMaxClientSubscriber option func

func SetPersistent added in v1.11.6

func SetPersistent(p Persistent) OptionFunc

SetPersistent option func

func SetQueue added in v1.11.6

func SetQueue(q QueueStorage) OptionFunc

SetQueue option func

func SetTracingDashboard added in v1.10.11

func SetTracingDashboard(host string) OptionFunc

SetTracingDashboard option func

type Persistent added in v1.7.0

type Persistent interface {
	Ping(ctx context.Context) error
	SetSummary(Summary)
	Summary() Summary
	FindAllJob(ctx context.Context, filter *Filter) (jobs []Job)
	FindJobByID(ctx context.Context, id string, filterHistory *Filter) (job Job, err error)
	CountAllJob(ctx context.Context, filter *Filter) int
	AggregateAllTaskJob(ctx context.Context, filter *Filter) (result []TaskSummary)
	SaveJob(ctx context.Context, job *Job, retryHistories ...RetryHistory)
	UpdateJob(ctx context.Context, filter *Filter, updated map[string]interface{}, retryHistories ...RetryHistory) (matchedCount, affectedRow int64, err error)
	CleanJob(ctx context.Context, filter *Filter) (affectedRow int64)
	DeleteJob(ctx context.Context, id string) (job Job, err error)
}

Persistent abstraction

func NewNoopPersistent added in v1.11.6

func NewNoopPersistent() Persistent

NewNoopPersistent constructor

type QueueStorage

type QueueStorage interface {
	PushJob(ctx context.Context, job *Job) (n int64)
	PopJob(ctx context.Context, taskName string) (jobID string)
	NextJob(ctx context.Context, taskName string) (jobID string)
	Clear(ctx context.Context, taskName string)
	Ping() error
}

QueueStorage abstraction for queue storage backend

func NewInMemQueue

func NewInMemQueue() QueueStorage

NewInMemQueue init inmem queue

func NewRedisQueue

func NewRedisQueue(redisPool *redis.Pool) QueueStorage

NewRedisQueue init redis queue

type RetryHistory added in v1.10.0

type RetryHistory struct {
	ErrorStack string    `bson:"error_stack" json:"error_stack"`
	Status     string    `bson:"status" json:"status"`
	Error      string    `bson:"error" json:"error"`
	TraceID    string    `bson:"trace_id" json:"trace_id"`
	StartAt    time.Time `bson:"start_at" json:"start_at"`
	EndAt      time.Time `bson:"end_at" json:"end_at"`
}

RetryHistory model

type SQLPersistent added in v1.11.22

type SQLPersistent struct {
	// contains filtered or unexported fields
}

func NewSQLPersistent added in v1.11.22

func NewSQLPersistent(db *sql.DB) *SQLPersistent

NewSQLPersistent init new persistent SQL

func (*SQLPersistent) AggregateAllTaskJob added in v1.11.22

func (s *SQLPersistent) AggregateAllTaskJob(ctx context.Context, filter *Filter) (result []TaskSummary)

func (*SQLPersistent) CleanJob added in v1.11.22

func (s *SQLPersistent) CleanJob(ctx context.Context, filter *Filter) (affectedRow int64)

func (*SQLPersistent) CountAllJob added in v1.11.22

func (s *SQLPersistent) CountAllJob(ctx context.Context, filter *Filter) (count int)

func (*SQLPersistent) DeleteJob added in v1.11.22

func (s *SQLPersistent) DeleteJob(ctx context.Context, id string) (job Job, err error)

func (*SQLPersistent) FindAllJob added in v1.11.22

func (s *SQLPersistent) FindAllJob(ctx context.Context, filter *Filter) (jobs []Job)

func (*SQLPersistent) FindAllSummary added in v1.11.22

func (s *SQLPersistent) FindAllSummary(ctx context.Context, filter *Filter) (result []TaskSummary)

summary

func (*SQLPersistent) FindDetailSummary added in v1.11.22

func (s *SQLPersistent) FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary)

func (*SQLPersistent) FindJobByID added in v1.11.22

func (s *SQLPersistent) FindJobByID(ctx context.Context, id string, filterHistory *Filter) (job Job, err error)

func (*SQLPersistent) IncrementSummary added in v1.11.22

func (s *SQLPersistent) IncrementSummary(ctx context.Context, taskName string, incr map[string]int64)

func (*SQLPersistent) Ping added in v1.11.22

func (s *SQLPersistent) Ping(ctx context.Context) error

func (*SQLPersistent) SaveJob added in v1.11.22

func (s *SQLPersistent) SaveJob(ctx context.Context, job *Job, retryHistories ...RetryHistory)

func (*SQLPersistent) SetSummary added in v1.11.22

func (s *SQLPersistent) SetSummary(summary Summary)

func (*SQLPersistent) Summary added in v1.11.22

func (s *SQLPersistent) Summary() Summary

func (*SQLPersistent) UpdateJob added in v1.11.22

func (s *SQLPersistent) UpdateJob(ctx context.Context, filter *Filter, updated map[string]interface{}, retryHistories ...RetryHistory) (matchedCount, affectedRow int64, err error)

func (*SQLPersistent) UpdateSummary added in v1.11.22

func (s *SQLPersistent) UpdateSummary(ctx context.Context, taskName string, updated map[string]interface{})

type Summary added in v1.11.6

type Summary interface {
	FindAllSummary(ctx context.Context, filter *Filter) (result []TaskSummary)
	FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary)
	UpdateSummary(ctx context.Context, taskName string, updated map[string]interface{})
	IncrementSummary(ctx context.Context, taskName string, incr map[string]int64)
}

Summary abstraction

func NewInMemSummary added in v1.11.6

func NewInMemSummary() Summary

NewInMemSummary constructor, store & read summary from in memory

type SummaryDetail added in v1.11.4

type SummaryDetail struct {
	Failure, Retrying, Success, Queueing, Stopped int
}

SummaryDetail type

type Task added in v1.9.0

type Task struct {
	// contains filtered or unexported fields
}

Task model

type TaskListResolver added in v1.6.8

type TaskListResolver struct {
	Meta MetaTaskResolver
	Data []TaskResolver
}

TaskListResolver resolver

type TaskResolver

type TaskResolver struct {
	Name       string
	ModuleName string
	TotalJobs  int
	IsLoading  bool
	Detail     SummaryDetail
}

TaskResolver resolver

type TaskSummary added in v1.11.4

type TaskSummary struct {
	ID        string `bson:"_id"`
	TaskName  string `bson:"task_name"`
	Success   int    `bson:"success"`
	Queueing  int    `bson:"queueing"`
	Retrying  int    `bson:"retrying"`
	Failure   int    `bson:"failure"`
	Stopped   int    `bson:"stopped"`
	IsLoading bool   `bson:"is_loading"`
}

TaskSummary model

func (*TaskSummary) CountTotalJob added in v1.11.4

func (s *TaskSummary) CountTotalJob() int

CountTotalJob method

func (*TaskSummary) SetValue added in v1.11.4

func (s *TaskSummary) SetValue(source map[string]int)

SetValue method

func (*TaskSummary) ToMapResult added in v1.11.4

func (s *TaskSummary) ToMapResult() map[string]int

ToMapResult method

func (*TaskSummary) ToSummaryDetail added in v1.11.4

func (s *TaskSummary) ToSummaryDetail() (detail SummaryDetail)

ToSummaryDetail method

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL