taskqueueworker

package
v1.11.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 15, 2022 License: Apache-2.0 Imports: 35 Imported by: 0

README

Example

Create delivery handler

package workerhandler

import (
	"fmt"
	"time"

	"github.com/golangid/candi/candishared"
	taskqueueworker "github.com/golangid/candi/codebase/app/task_queue_worker"
	"github.com/golangid/candi/codebase/factory/types"
)

// TaskQueueHandler is the delivery handler for task queue worker jobs.
// It carries no fields.
type TaskQueueHandler struct{}

// NewTaskQueueHandler returns a pointer to a newly allocated, empty
// TaskQueueHandler.
func NewTaskQueueHandler() *TaskQueueHandler {
	return new(TaskQueueHandler)
}

// MountHandlers registers this handler's task functions on the given worker
// handler group, one Add call per task name. It returns nothing; the group is
// modified in place.
func (h *TaskQueueHandler) MountHandlers(group *types.WorkerHandlerGroup) {
	group.Add("task-one", h.taskOne) // "task-one" is served by taskOne
	group.Add("task-two", h.taskTwo) // "task-two" is served by taskTwo
}

// taskOne is the handler mounted for "task-one". It prints the handler route,
// the "retries" header value and the message of the event, then always returns
// an ErrorRetrier carrying a 10-second Delay.
// NOTE(review): presumably the returned ErrorRetrier asks the worker to retry
// the job after Delay — confirm against the taskqueueworker package.
func (h *TaskQueueHandler) taskOne(eventContext *candishared.EventContext) error {
	route := eventContext.HandlerRoute()
	retries := eventContext.Header()["retries"]
	message := eventContext.Message()
	fmt.Printf("executing task '%s' has been %s retry, with message: %s\n", route, retries, message)

	retrier := &taskqueueworker.ErrorRetrier{
		Message: "Error",
		Delay:   10 * time.Second,
	}
	return retrier
}

// taskTwo is the handler mounted for "task-two". It prints the handler route,
// the "retries" header value and the message of the event, then always returns
// an ErrorRetrier carrying a 3-second Delay.
// NOTE(review): presumably the returned ErrorRetrier asks the worker to retry
// the job after Delay — confirm against the taskqueueworker package.
func (h *TaskQueueHandler) taskTwo(eventContext *candishared.EventContext) error {
	route := eventContext.HandlerRoute()
	retries := eventContext.Header()["retries"]
	message := eventContext.Message()
	fmt.Printf("executing task '%s' has been %s retry, with message: %s\n", route, retries, message)

	retrier := &taskqueueworker.ErrorRetrier{
		Message: "Error",
		Delay:   3 * time.Second,
	}
	return retrier
}

Register in module

package examplemodule

import (

	"example.service/internal/modules/examplemodule/delivery/workerhandler"

	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

// Module groups the delivery handlers of the example module.
type Module struct {
	// ...another delivery handler
	// workerHandlers maps each worker type to the handler registered for it.
	workerHandlers map[types.Worker]interfaces.WorkerHandler
}

// NewModules builds the example Module and registers the task queue handler
// under types.TaskQueue. The deps argument is not used by this example.
func NewModules(deps dependency.Dependency) *Module {
	handlers := map[types.Worker]interfaces.WorkerHandler{
		// ...another worker handler
		// ...
		types.TaskQueue: workerhandler.NewTaskQueueHandler(),
	}

	return &Module{workerHandlers: handlers}
}

// ...another method

Add task in each usecase module

  • From internal service (same runtime)
package usecase

import (
	"context"
	"log"

	taskqueueworker "github.com/golangid/candi/codebase/app/task_queue_worker"
)

// someUsecase shows how to enqueue a job from inside the service that runs the
// task queue worker (same runtime). A failure to add the job is logged.
func someUsecase(ctx context.Context) {
	// Add a job for `{{task_name}}` with a maximum of 5 retries. AddJob takes
	// only the context and the request (no worker host) and returns the job ID
	// together with the error; the ID is not needed here, so it is discarded.
	_, err := taskqueueworker.AddJob(ctx, &taskqueueworker.AddJobRequest{
		TaskName: "{{task_name}}",
		MaxRetry: 5,
		Args:     []byte(`{{arguments/message}}`),
	})
	if err != nil {
		log.Println(err)
	}
}
  • Or if running on a separate server

Via GraphQL API

POST {{task-queue-worker-host}}/graphql

mutation addJob {
  add_job(
	  param: {
		task_name: "{{task_name}}"
		max_retry: 5
		args: "{\"params\": \"test-one\"}"
	  }
  )
}

cURL:

curl --location --request POST '{{task-queue-worker-host}}/graphql' \
--header 'Content-Type: application/json' \
--data-raw '{
    "operationName": "addJob",
    "variables": {
        "param": {
            "task_name": "{{task_name}}",
            "max_retry": 1,
            "args": "{{arguments/message}}"
        }
    },
    "query": "mutation addJob($param: AddJobInputResolver!) {\n  add_job(param: $param)\n}\n"
}'

Direct call function

// Add a job for `{{task_name}}` by sending an HTTP request to the task queue
// worker host; use this when the worker runs on a separate server.
jobID, err := taskqueueworker.AddJobViaHTTPRequest(context.Background(), "{{task-queue-worker-host}}", &taskqueueworker.AddJobRequest{
	TaskName: "{{task_name}}", MaxRetry: 1, Args: []byte(`{{arguments/message}}`),
})
if err != nil {
	log.Println(err)
} else {
	// Only report the ID when the job was queued. Println already puts a
	// space between operands, so the literal has no trailing space.
	fmt.Println("Queued job id is", jobID)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddJob

func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error)

AddJob is a public function for adding a new job in the same runtime

func AddJobViaHTTPRequest added in v1.7.0

func AddJobViaHTTPRequest(ctx context.Context, workerHost string, req *AddJobRequest) (jobID string, err error)

AddJobViaHTTPRequest is a public function for adding a new job via an HTTP request

func NewTaskQueueWorker added in v1.6.8

func NewTaskQueueWorker(service factory.ServiceFactory, opts ...OptionFunc) factory.AppServerFactory

NewTaskQueueWorker create new task queue worker

func RecalculateSummary added in v1.11.4

func RecalculateSummary(ctx context.Context)

RecalculateSummary func

func RetryJob added in v1.11.0

func RetryJob(ctx context.Context, jobID string) error

RetryJob api for retry job by id

func StopJob added in v1.11.0

func StopJob(ctx context.Context, jobID string) error

StopJob api for stop job by id

func StreamAllJob added in v1.11.4

func StreamAllJob(ctx context.Context, filter *Filter, streamFunc func(job *Job))

StreamAllJob is an API function for streaming all jobs that match the filter; streamFunc is called for each job

Types

type AddJobInputResolver added in v1.11.0

type AddJobInputResolver struct {
	TaskName      string
	MaxRetry      int
	Args          string
	RetryInterval *string
}

AddJobInputResolver model

type AddJobRequest added in v1.11.0

type AddJobRequest struct {
	TaskName      string        `json:"task_name"`
	MaxRetry      int           `json:"max_retry"`
	Args          []byte        `json:"args"`
	RetryInterval time.Duration `json:"retry_interval"`
	// contains filtered or unexported fields
}

AddJobRequest request model

func (*AddJobRequest) Validate added in v1.11.2

func (a *AddJobRequest) Validate() error

Validate method

type ClientSubscriber added in v1.10.16

type ClientSubscriber struct {
	ClientID      string
	SubscribeList struct {
		TaskDashboard bool
		JobDetailID   string
		JobList       *Filter
	}
}

ClientSubscriber model

type ConfigResolver added in v1.11.6

type ConfigResolver struct {
	WithPersistent bool
}

ConfigResolver resolver

type Filter

type Filter struct {
	Page, Limit        int
	Sort               string
	TaskName           string
	TaskNameList       []string
	Search, JobID      *string
	Status             *string
	Statuses           []string
	ExcludeStatus      []string
	ShowAll            bool
	ShowHistories      *bool
	StartDate, EndDate time.Time
}

Filter type

type Job

type Job struct {
	ID             string         `bson:"_id" json:"_id"`
	TaskName       string         `bson:"task_name" json:"task_name"`
	Arguments      string         `bson:"arguments" json:"arguments"`
	Retries        int            `bson:"retries" json:"retries"`
	MaxRetry       int            `bson:"max_retry" json:"max_retry"`
	Interval       string         `bson:"interval" json:"interval"`
	CreatedAt      time.Time      `bson:"created_at" json:"created_at"`
	FinishedAt     time.Time      `bson:"finished_at" json:"finished_at"`
	Status         string         `bson:"status" json:"status"`
	Error          string         `bson:"error" json:"error"`
	ErrorStack     string         `bson:"-" json:"error_stack"`
	TraceID        string         `bson:"trace_id" json:"trace_id"`
	RetryHistories []RetryHistory `bson:"retry_histories" json:"retry_histories"`
	NextRetryAt    string         `bson:"-" json:"-"`
	// contains filtered or unexported fields
}

Job model

func GetDetailJob added in v1.11.1

func GetDetailJob(ctx context.Context, jobID string) (Job, error)

GetDetailJob api for get detail job by id

type JobListResolver

type JobListResolver struct {
	Meta MetaJobList
	Data []Job
}

JobListResolver resolver

type JobStatusEnum added in v1.7.0

type JobStatusEnum string

JobStatusEnum enum status

type MemstatsResolver added in v1.6.12

type MemstatsResolver struct {
	Alloc         string
	TotalAlloc    string
	NumGC         int
	NumGoroutines int
}

MemstatsResolver resolver

type MetaJobList added in v1.6.8

type MetaJobList struct {
	Page           int
	Limit          int
	TotalRecords   int
	TotalPages     int
	IsCloseSession bool
	IsLoading      bool
	Detail         SummaryDetail
}

MetaJobList resolver

type MetaTaskResolver added in v1.6.8

type MetaTaskResolver struct {
	Page                  int
	Limit                 int
	TotalRecords          int
	TotalPages            int
	IsCloseSession        bool
	TotalClientSubscriber int
}

MetaTaskResolver meta resolver

type OptionFunc added in v1.6.8

type OptionFunc func(*option)

OptionFunc type

func SetAutoRemoveClientInterval added in v1.6.8

func SetAutoRemoveClientInterval(d time.Duration) OptionFunc

SetAutoRemoveClientInterval option func

func SetDashboardBanner added in v1.7.0

func SetDashboardBanner(banner string) OptionFunc

SetDashboardBanner option func

func SetDashboardHTTPPort added in v1.7.4

func SetDashboardHTTPPort(port uint16) OptionFunc

SetDashboardHTTPPort option func

func SetDebugMode added in v1.7.4

func SetDebugMode(debugMode bool) OptionFunc

SetDebugMode option func

func SetExternalWorkerHost added in v1.11.6

func SetExternalWorkerHost(host string) OptionFunc

SetExternalWorkerHost option func sets the worker host used when adding a job; if the host is not empty, jobs are added via HTTP request by default

func SetLocker added in v1.8.8

func SetLocker(locker candiutils.Locker) OptionFunc

SetLocker option func

func SetMaxClientSubscriber added in v1.6.8

func SetMaxClientSubscriber(max int) OptionFunc

SetMaxClientSubscriber option func

func SetPersistent added in v1.11.6

func SetPersistent(p Persistent) OptionFunc

SetPersistent option func

func SetQueue added in v1.11.6

func SetQueue(q QueueStorage) OptionFunc

SetQueue option func

func SetTracingDashboard added in v1.10.11

func SetTracingDashboard(host string) OptionFunc

SetTracingDashboard option func

type Persistent added in v1.7.0

type Persistent interface {
	SetSummary(Summary)
	Summary() Summary
	FindAllJob(ctx context.Context, filter *Filter) (jobs []Job)
	FindJobByID(ctx context.Context, id string, excludeFields ...string) (job Job, err error)
	CountAllJob(ctx context.Context, filter *Filter) int
	AggregateAllTaskJob(ctx context.Context, filter *Filter) (result []TaskSummary)
	SaveJob(ctx context.Context, job *Job, retryHistories ...RetryHistory)
	UpdateJob(ctx context.Context, filter *Filter, updated map[string]interface{}, retryHistories ...RetryHistory) (matchedCount, affectedRow int64, err error)
	CleanJob(ctx context.Context, filter *Filter) (affectedRow int64)
	DeleteJob(ctx context.Context, id string) (job Job, err error)
}

Persistent abstraction

func NewMongoPersistent added in v1.7.0

func NewMongoPersistent(db *mongo.Database) Persistent

NewMongoPersistent create mongodb persistent

func NewNoopPersistent added in v1.11.6

func NewNoopPersistent() Persistent

NewNoopPersistent constructor

type QueueStorage

type QueueStorage interface {
	PushJob(ctx context.Context, job *Job)
	PopJob(ctx context.Context, taskName string) (jobID string)
	NextJob(ctx context.Context, taskName string) (jobID string)
	Clear(ctx context.Context, taskName string)
}

QueueStorage abstraction for queue storage backend

func NewInMemQueue

func NewInMemQueue() QueueStorage

NewInMemQueue init inmem queue

func NewRedisQueue

func NewRedisQueue(redisPool *redis.Pool) QueueStorage

NewRedisQueue init redis queue

type RetryHistory added in v1.10.0

type RetryHistory struct {
	ErrorStack string    `bson:"error_stack" json:"error_stack"`
	Status     string    `bson:"status" json:"status"`
	Error      string    `bson:"error" json:"error"`
	TraceID    string    `bson:"trace_id" json:"trace_id"`
	StartAt    time.Time `bson:"start_at" json:"start_at"`
	EndAt      time.Time `bson:"end_at" json:"end_at"`
}

RetryHistory model

type Summary added in v1.11.6

type Summary interface {
	FindAllSummary(ctx context.Context, filter *Filter) (result []TaskSummary)
	FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary)
	UpdateSummary(ctx context.Context, taskName string, updated map[string]interface{})
	IncrementSummary(ctx context.Context, taskName string, incr map[string]interface{})
}

Summary abstraction

func NewInMemSummary added in v1.11.6

func NewInMemSummary() Summary

NewInMemSummary constructor, store & read summary from in memory

type SummaryDetail added in v1.11.4

type SummaryDetail struct {
	Failure, Retrying, Success, Queueing, Stopped int
}

SummaryDetail type

type TaglineResolver

type TaglineResolver struct {
	Banner                    string
	Tagline                   string
	Version                   string
	GoVersion                 string
	StartAt                   string
	BuildNumber               string
	Config                    ConfigResolver
	TaskListClientSubscribers []string
	JobListClientSubscribers  []string
	MemoryStatistics          MemstatsResolver
}

TaglineResolver resolver

type Task added in v1.9.0

type Task struct {
	// contains filtered or unexported fields
}

Task model

type TaskListResolver added in v1.6.8

type TaskListResolver struct {
	Meta MetaTaskResolver
	Data []TaskResolver
}

TaskListResolver resolver

type TaskResolver

type TaskResolver struct {
	Name       string
	ModuleName string
	TotalJobs  int
	IsLoading  bool
	Detail     SummaryDetail
}

TaskResolver resolver

type TaskSummary added in v1.11.4

type TaskSummary struct {
	ID        string `bson:"_id"`
	TaskName  string `bson:"task_name"`
	Success   int    `bson:"success"`
	Queueing  int    `bson:"queueing"`
	Retrying  int    `bson:"retrying"`
	Failure   int    `bson:"failure"`
	Stopped   int    `bson:"stopped"`
	IsLoading bool   `bson:"is_loading"`
}

TaskSummary model

func (*TaskSummary) CountTotalJob added in v1.11.4

func (s *TaskSummary) CountTotalJob() int

CountTotalJob method

func (*TaskSummary) SetValue added in v1.11.4

func (s *TaskSummary) SetValue(source map[string]int)

SetValue method

func (*TaskSummary) ToMapResult added in v1.11.4

func (s *TaskSummary) ToMapResult() map[string]int

ToMapResult method

func (*TaskSummary) ToSummaryDetail added in v1.11.4

func (s *TaskSummary) ToSummaryDetail() (detail SummaryDetail)

ToSummaryDetail method

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL