taskqueueworker

package
v1.11.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2022 License: Apache-2.0 Imports: 36 Imported by: 0

README

Example

Create delivery handler

package workerhandler

import (
	"fmt"
	"time"

	"github.com/golangid/candi/candishared"
	taskqueueworker "github.com/golangid/candi/codebase/app/task_queue_worker"
	"github.com/golangid/candi/codebase/factory/types"
)

// TaskQueueHandler is the delivery handler for task queue jobs.
// It carries no state of its own.
type TaskQueueHandler struct{}

// NewTaskQueueHandler returns a freshly allocated TaskQueueHandler.
func NewTaskQueueHandler() *TaskQueueHandler {
	handler := new(TaskQueueHandler)
	return handler
}

// MountHandlers registers this handler's task functions on the given worker
// handler group. It returns nothing; each group.Add call binds a task name
// ("task-one", "task-two") to the method that handles it.
func (h *TaskQueueHandler) MountHandlers(group *types.WorkerHandlerGroup) {

	group.Add("task-one", h.taskOne)
	group.Add("task-two", h.taskTwo)
}

// taskOne prints the handler route, the "retries" header and the message of
// the incoming event, then always returns an ErrorRetrier carrying a 10 second
// delay (presumably so the worker schedules another attempt; confirm against
// the worker implementation).
//
// NOTE(review): ErrorRetrier does not appear in this version's type index for
// taskqueueworker; confirm which package exports it.
func (h *TaskQueueHandler) taskOne(eventContext *candishared.EventContext) error {
	route := eventContext.HandlerRoute()
	retries := eventContext.Header()["retries"]
	message := eventContext.Message()

	const logFormat = "executing task '%s' has been %s retry, with message: %s\n"
	fmt.Printf(logFormat, route, retries, message)

	retrier := &taskqueueworker.ErrorRetrier{
		Delay:   10 * time.Second,
		Message: "Error",
	}
	return retrier
}

// taskTwo prints the handler route, the "retries" header and the message of
// the incoming event, then always returns an ErrorRetrier carrying a 3 second
// delay (presumably so the worker schedules another attempt; confirm against
// the worker implementation).
//
// NOTE(review): ErrorRetrier does not appear in this version's type index for
// taskqueueworker; confirm which package exports it.
func (h *TaskQueueHandler) taskTwo(eventContext *candishared.EventContext) error {
	route := eventContext.HandlerRoute()
	retries := eventContext.Header()["retries"]
	message := eventContext.Message()

	const logFormat = "executing task '%s' has been %s retry, with message: %s\n"
	fmt.Printf(logFormat, route, retries, message)

	retrier := &taskqueueworker.ErrorRetrier{
		Delay:   3 * time.Second,
		Message: "Error",
	}
	return retrier
}

Register in module

package examplemodule

import (

	"example.service/internal/modules/examplemodule/delivery/workerhandler"

	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

// Module holds the delivery handlers of this example module.
type Module struct {
	// ...another delivery handler

	// workerHandlers maps each worker type to the handler registered for it.
	workerHandlers map[types.Worker]interfaces.WorkerHandler
}

// NewModules builds the example Module and registers the task queue handler
// under the types.TaskQueue worker key. The deps argument is accepted but not
// used by this example.
func NewModules(deps dependency.Dependency) *Module {
	handlers := make(map[types.Worker]interfaces.WorkerHandler)
	// ...another worker handler
	// ...
	handlers[types.TaskQueue] = workerhandler.NewTaskQueueHandler()

	mod := new(Module)
	mod.workerHandlers = handlers
	return mod
}

// ...another method

Add task in each usecase module

  • From internal service (same runtime)
package usecase

import (
	"context"
	"log"

	taskqueueworker "github.com/golangid/candi/codebase/app/task_queue_worker"
)

// someUsecase enqueues a task queue job from inside the same runtime as the
// worker.
//
// AddJob takes only the context and the job request, and returns the queued
// job ID together with an error, so no worker host is passed here and both
// results are received. A worker host is only required when going through
// AddJobViaHTTPRequest.
func someUsecase(ctx context.Context) {
	// add task queue for `{{task_name}}` with 5 retry
	req := &taskqueueworker.AddJobRequest{
		TaskName: "{{task_name}}",
		MaxRetry: 5,
		Args:     []byte(`{{arguments/message}}`),
	}

	// The job ID is not needed in this example, so it is discarded.
	if _, err := taskqueueworker.AddJob(ctx, req); err != nil {
		log.Println(err)
	}
}
  • Or if running on a separate server

Via GraphQL API

POST {{task-queue-worker-host}}/graphql

# addJob calls add_job with an inline param object: the task name, max_retry
# set to 5, and args given as a JSON-encoded string.
mutation addJob {
  add_job(
	  param: {
		task_name: "{{task_name}}"
		max_retry: 5
		args: "{\"params\": \"test-one\"}"
	  }
  )
}

cURL:

# Send the addJob mutation to the worker's GraphQL endpoint. The mutation is
# carried in a JSON request body, so the request method is POST.
curl --location --request POST '{{task-queue-worker-host}}/graphql' \
--header 'Content-Type: application/json' \
--data-raw '{
    "operationName": "addJob",
    "variables": {
        "param": {
            "task_name": "{{task_name}}",
            "max_retry": 1,
            "args": "{{arguments/message}}"
        }
    },
    "query": "mutation addJob($param: AddJobInputResolver!) {\n  add_job(param: $param)\n}\n"
}'

Direct call function

// add task queue for `{{task_name}}` via HTTP request to the worker host;
// the call returns the ID of the queued job
jobID, err := taskqueueworker.AddJobViaHTTPRequest(context.Background(), "{{task-queue-worker-host}}", &taskqueueworker.AddJobRequest{
	TaskName: "{{task_name}}", MaxRetry: 1, Args: []byte(`{{arguments/message}}`),
})
if err != nil {
	log.Println(err)
}
fmt.Println("Queued job id is ", jobID)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddJob

func AddJob(ctx context.Context, job *AddJobRequest) (jobID string, err error)

AddJob is the public function for adding a new job from within the same runtime.

func AddJobViaHTTPRequest added in v1.7.0

func AddJobViaHTTPRequest(ctx context.Context, workerHost string, req *AddJobRequest) (jobID string, err error)

AddJobViaHTTPRequest is the public function for adding a new job via an HTTP request.

func NewTaskQueueWorker added in v1.6.8

func NewTaskQueueWorker(service factory.ServiceFactory, q QueueStorage, perst Persistent, opts ...OptionFunc) factory.AppServerFactory

NewTaskQueueWorker create new task queue worker

func RetryJob added in v1.11.0

func RetryJob(ctx context.Context, jobID string) error

RetryJob is the API for retrying a job by its ID.

func StopJob added in v1.11.0

func StopJob(ctx context.Context, jobID string) error

StopJob is the API for stopping a job by its ID.

Types

type AddJobInputResolver added in v1.11.0

type AddJobInputResolver struct {
	TaskName      string
	MaxRetry      int
	Args          string
	RetryInterval *string
}

AddJobInputResolver model

type AddJobRequest added in v1.11.0

type AddJobRequest struct {
	TaskName      string        `json:"task_name"`
	MaxRetry      int           `json:"max_retry"`
	Args          []byte        `json:"args"`
	RetryInterval time.Duration `json:"retry_interval"`
}

AddJobRequest request model

func (*AddJobRequest) Validate added in v1.11.2

func (a *AddJobRequest) Validate() error

Validate method

type ClientSubscriber added in v1.10.16

type ClientSubscriber struct {
	ClientID      string
	SubscribeList struct {
		TaskDashboard bool
		JobDetailID   string
		JobList       Filter
	}
}

ClientSubscriber model

type Filter

type Filter struct {
	Page, Limit        int
	TaskName           string
	TaskNameList       []string
	Search, JobID      *string
	Status             []string
	ShowAll            bool
	ShowHistories      *bool
	StartDate, EndDate time.Time
}

Filter type

type Job

type Job struct {
	ID             string         `bson:"_id" json:"_id"`
	TaskName       string         `bson:"task_name" json:"task_name"`
	Arguments      string         `bson:"arguments" json:"arguments"`
	Retries        int            `bson:"retries" json:"retries"`
	MaxRetry       int            `bson:"max_retry" json:"max_retry"`
	Interval       string         `bson:"interval" json:"interval"`
	CreatedAt      time.Time      `bson:"created_at" json:"created_at"`
	FinishedAt     time.Time      `bson:"finished_at" json:"finished_at"`
	Status         string         `bson:"status" json:"status"`
	Error          string         `bson:"error" json:"error"`
	ErrorStack     string         `bson:"-" json:"error_stack"`
	TraceID        string         `bson:"trace_id" json:"trace_id"`
	RetryHistories []RetryHistory `bson:"retry_histories" json:"retry_histories"`
	NextRetryAt    string         `bson:"-" json:"-"`
}

Job model

func GetDetailJob added in v1.11.1

func GetDetailJob(ctx context.Context, jobID string) (*Job, error)

GetDetailJob api for get detail job by id

type JobListResolver

type JobListResolver struct {
	Meta MetaJobList
	Data []Job
}

JobListResolver resolver

type JobStatusEnum added in v1.7.0

type JobStatusEnum string

JobStatusEnum enum status

type MemstatsResolver added in v1.6.12

type MemstatsResolver struct {
	Alloc         string
	TotalAlloc    string
	NumGC         int
	NumGoroutines int
}

MemstatsResolver resolver

type MetaJobList added in v1.6.8

type MetaJobList struct {
	Page           int
	Limit          int
	TotalRecords   int
	TotalPages     int
	IsCloseSession bool
	Detail         struct {
		Failure, Retrying, Success, Queueing, Stopped int
	}
}

MetaJobList resolver

type MetaTaskResolver added in v1.6.8

type MetaTaskResolver struct {
	Page                  int
	Limit                 int
	TotalRecords          int
	TotalPages            int
	IsCloseSession        bool
	TotalClientSubscriber int
}

MetaTaskResolver meta resolver

type OptionFunc added in v1.6.8

type OptionFunc func(*option)

OptionFunc type

func SetAutoCreatePersistentIndex added in v1.11.0

func SetAutoCreatePersistentIndex(autoCreateIndex bool) OptionFunc

SetAutoCreatePersistentIndex option func

func SetAutoRemoveClientInterval added in v1.6.8

func SetAutoRemoveClientInterval(d time.Duration) OptionFunc

SetAutoRemoveClientInterval option func

func SetDashboardBanner added in v1.7.0

func SetDashboardBanner(banner string) OptionFunc

SetDashboardBanner option func

func SetDashboardHTTPPort added in v1.7.4

func SetDashboardHTTPPort(port uint16) OptionFunc

SetDashboardHTTPPort option func

func SetDebugMode added in v1.7.4

func SetDebugMode(debugMode bool) OptionFunc

SetDebugMode option func

func SetLocker added in v1.8.8

func SetLocker(locker candiutils.Locker) OptionFunc

SetLocker option func

func SetMaxClientSubscriber added in v1.6.8

func SetMaxClientSubscriber(max int) OptionFunc

SetMaxClientSubscriber option func

func SetTracingDashboard added in v1.10.11

func SetTracingDashboard(host string) OptionFunc

SetTracingDashboard option func

type Persistent added in v1.7.0

type Persistent interface {
	FindAllJob(ctx context.Context, filter Filter) (jobs []Job)
	FindJobByID(ctx context.Context, id string, excludeFields ...string) (job *Job, err error)
	CountAllJob(ctx context.Context, filter Filter) int
	AggregateAllTaskJob(ctx context.Context, filter Filter) (result []TaskResolver)
	SaveJob(ctx context.Context, job *Job, retryHistories ...RetryHistory)
	UpdateJob(ctx context.Context, filter Filter, updated map[string]interface{}) error
	CleanJob(ctx context.Context, taskName string)
	DeleteJob(ctx context.Context, id string) error
}

Persistent abstraction

func NewMongoPersistent added in v1.7.0

func NewMongoPersistent(db *mongo.Database) Persistent

NewMongoPersistent create mongodb persistent

type QueueStorage

type QueueStorage interface {
	PushJob(ctx context.Context, job *Job)
	PopJob(ctx context.Context, taskName string) (jobID string)
	NextJob(ctx context.Context, taskName string) (jobID string)
	Clear(ctx context.Context, taskName string)
}

QueueStorage abstraction for queue storage backend

func NewInMemQueue

func NewInMemQueue() QueueStorage

NewInMemQueue init inmem queue

func NewRedisQueue

func NewRedisQueue(redisPool *redis.Pool) QueueStorage

NewRedisQueue init redis queue

type RetryHistory added in v1.10.0

type RetryHistory struct {
	ErrorStack string    `bson:"error_stack" json:"error_stack"`
	Status     string    `bson:"status" json:"status"`
	Error      string    `bson:"error" json:"error"`
	TraceID    string    `bson:"trace_id" json:"trace_id"`
	StartAt    time.Time `bson:"start_at" json:"start_at"`
	EndAt      time.Time `bson:"end_at" json:"end_at"`
}

RetryHistory model

type TaglineResolver

type TaglineResolver struct {
	Banner                    string
	Tagline                   string
	Version                   string
	StartAt                   string
	BuildNumber               string
	TaskListClientSubscribers []string
	JobListClientSubscribers  []string
	MemoryStatistics          MemstatsResolver
}

TaglineResolver resolver

type Task added in v1.9.0

type Task struct {
	// contains filtered or unexported fields
}

Task model

type TaskListResolver added in v1.6.8

type TaskListResolver struct {
	Meta MetaTaskResolver
	Data []TaskResolver
}

TaskListResolver resolver

type TaskResolver

type TaskResolver struct {
	Name       string
	ModuleName string
	TotalJobs  int
	Detail     struct {
		Failure, Retrying, Success, Queueing, Stopped int
	}
}

TaskResolver resolver

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL