taskqueueworker

package
v1.4.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2021 License: Apache-2.0 Imports: 31 Imported by: 0

README

Example

Create delivery handler

package workerhandler

import (
	"context"
	"time"

	taskqueueworker "pkg.agungdwiprasetyo.com/candi/codebase/app/task_queue_worker"
	"pkg.agungdwiprasetyo.com/candi/codebase/factory/types"
	"pkg.agungdwiprasetyo.com/candi/logger"
)

// TaskQueueHandler is the delivery handler for task queue worker tasks.
// It holds no fields; its task functions are registered by MountHandlers.
type TaskQueueHandler struct{}

// NewTaskQueueHandler returns a pointer to a freshly allocated, empty
// TaskQueueHandler.
func NewTaskQueueHandler() *TaskQueueHandler {
	return new(TaskQueueHandler)
}

// MountHandlers registers this handler's task functions on group, keyed by
// task name: "task-one" is bound to taskOne and "task-two" to taskTwo.
// It returns nothing; registration happens through group.Add.
func (h *TaskQueueHandler) MountHandlers(group *types.WorkerHandlerGroup) {

	group.Add("task-one", h.taskOne)
	group.Add("task-two", h.taskTwo)
}

// taskOne handles "task-one" jobs. It logs the raw payload in red and then
// always reports failure through an ErrorRetrier carrying a 10-second delay,
// which makes it a demonstration of the retry path rather than real work.
func (h *TaskQueueHandler) taskOne(ctx context.Context, message []byte) error {
	payload := string(message)
	logger.LogRed("task-one: " + payload)

	retry := new(taskqueueworker.ErrorRetrier)
	retry.Delay = 10 * time.Second
	retry.Message = "Error"
	return retry
}

// taskTwo handles "task-two" jobs. It logs the raw payload in yellow and then
// always reports failure through an ErrorRetrier carrying a 3-second delay,
// which makes it a demonstration of the retry path rather than real work.
func (h *TaskQueueHandler) taskTwo(ctx context.Context, message []byte) error {
	payload := string(message)
	logger.LogYellow("task-two: " + payload)

	retry := new(taskqueueworker.ErrorRetrier)
	retry.Delay = 3 * time.Second
	retry.Message = "Error"
	return retry
}

Register in module

package examplemodule

import (

	"example.service/internal/modules/examplemodule/delivery/workerhandler"

	"pkg.agungdwiprasetyo.com/candi/codebase/factory/dependency"
	"pkg.agungdwiprasetyo.com/candi/codebase/factory/types"
	"pkg.agungdwiprasetyo.com/candi/codebase/interfaces"
)

// Module groups the delivery handlers of the example module.
type Module struct {
	// ...another delivery handler
	// workerHandlers maps each worker type to the handler registered for it.
	workerHandlers map[types.Worker]interfaces.WorkerHandler
}

// NewModules builds the Module and binds the task queue worker type to a new
// TaskQueueHandler. The deps argument is accepted but not used in this example.
func NewModules(deps dependency.Dependency) *Module {
	// ...another worker handler
	// ...
	handlers := make(map[types.Worker]interfaces.WorkerHandler)
	handlers[types.TaskQueue] = workerhandler.NewTaskQueueHandler()

	return &Module{workerHandlers: handlers}
}

// ...another method

Add task in each usecase module

package usecase

import (
	"context"
	"log"

	taskqueueworker "pkg.agungdwiprasetyo.com/candi/codebase/app/task_queue_worker"
)

// someUsecase enqueues one job for each of the "task-one" and "task-two"
// tasks, allowing up to 5 retries per job. A failure to enqueue is logged and
// does not prevent the second job from being added.
func someUsecase() {
	// AddJob takes its arguments as []byte, so each JSON literal is converted
	// explicitly: an untyped string constant is not assignable to []byte.

	// add a `task-one` job with up to 5 retries
	if err := taskqueueworker.AddJob("task-one", 5, []byte(`{"params": "test-one"}`)); err != nil {
		log.Println(err)
	}

	// add a `task-two` job with up to 5 retries
	if err := taskqueueworker.AddJob("task-two", 5, []byte(`{"params": "test-two"}`)); err != nil {
		log.Println(err)
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddJob

func AddJob(taskName string, maxRetry int, args []byte) (err error)

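AddJob adds a job for the task named taskName, with maxRetry as the maximum number of retries and args as the job's argument payload.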

func NewWorker

NewWorker creates a new task queue worker.

Types

type ErrorRetrier

// ErrorRetrier is an error value (see its Error method) that carries a delay
// and a message.
type ErrorRetrier struct {
	// Delay is presumably how long to wait before the job is retried —
	// TODO confirm against the worker implementation.
	Delay   time.Duration
	// Message is a human-readable description of the failure; presumably
	// what Error returns — confirm.
	Message string
}

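ErrorRetrier is an error type that carries a Delay and a Message; the README example returns it from task handlers to report a failure.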

func (*ErrorRetrier) Error

func (e *ErrorRetrier) Error() string

Error implements the error interface.

type Filter

// Filter holds the criteria for a job listing: paging values, a task name,
// an optional search term and a list of statuses.
type Filter struct {
	// Page and Limit are the paging values.
	Page  int
	Limit int
	// TaskName names the task whose jobs are requested.
	TaskName string
	// Search is optional; a nil pointer means no search term was given.
	Search *string
	// Status lists the job statuses to match.
	Status []string
}

Filter type

type Job

// Job is the record of a single task queue job. The struct tags give the key
// used for each field in BSON and JSON.
type Job struct {
	// ID is the job identifier, stored under the "_id" key.
	ID          string `bson:"_id" json:"_id"`
	// TaskName is the name of the task this job belongs to.
	TaskName    string `bson:"task_name" json:"task_name"`
	// Arguments is the job payload kept as a string.
	Arguments   string `bson:"arguments" json:"arguments"`
	// Retries and MaxRetry are presumably the attempts made so far and the
	// allowed maximum — confirm against the worker implementation.
	Retries     int    `bson:"retries" json:"retries"`
	MaxRetry    int    `bson:"max_retry" json:"max_retry"`
	// Interval, CreatedAt and FinishedAt are kept as strings; their exact
	// format is not visible here — TODO confirm.
	Interval    string `bson:"interval" json:"interval"`
	CreatedAt   string `bson:"created_at" json:"created_at"`
	FinishedAt  string `bson:"finished_at" json:"finished_at"`
	// Status is the job state and Error the failure text, both as strings.
	Status      string `bson:"status" json:"status"`
	Error       string `bson:"error" json:"error"`
	// NOTE(review): the "traceId" key is camelCase while every other key is
	// snake_case; renaming it would change the stored and serialized key.
	TraceID     string `bson:"traceId" json:"traceId"`
	// NextRetryAt is tagged "-" for both bson and json, so encoding/json
	// skips it (and presumably the BSON codec does too).
	NextRetryAt string `bson:"-" json:"-"`
}

Job model

type JobListResolver

// JobListResolver pairs a list of jobs with the metadata describing that list.
type JobListResolver struct {
	// Meta holds the paging values and counters for the listing.
	Meta Meta
	// Data holds the jobs themselves.
	Data []Job
}

JobListResolver resolver

type Meta

// Meta carries the paging values and totals of a job listing, together with
// a set of named counters.
type Meta struct {
	Page         int
	Limit        int
	TotalRecords int
	TotalPages   int
	// Detail holds one counter per name; presumably the number of jobs in
	// each status — confirm against the code that fills it.
	Detail       struct {
		GiveUp   int
		Retrying int
		Success  int
		Queueing int
		Stopped  int
	}
}

Meta resolver

type QueueStorage

// QueueStorage is the abstraction over the queue storage backend. Every
// method except PushJob is scoped by task name.
// NOTE(review): the method semantics below are inferred from names only —
// confirm against the in-memory and Redis implementations.
type QueueStorage interface {
	// GetAllJobs presumably returns every job queued for taskName.
	GetAllJobs(taskName string) []*Job
	// PushJob presumably adds job to the queue.
	PushJob(job *Job)
	// PopJob presumably removes and returns the next job for taskName.
	// It returns a Job value, whereas NextJob returns a pointer.
	PopJob(taskName string) Job
	// NextJob presumably returns the next job for taskName without removing it.
	NextJob(taskName string) *Job
	// Clear presumably removes all jobs queued for taskName.
	Clear(taskName string)
}

QueueStorage abstraction for queue storage backend

func NewInMemQueue

func NewInMemQueue() QueueStorage

NewInMemQueue initializes an in-memory queue.

func NewRedisQueue

func NewRedisQueue(redisPool *redis.Pool) QueueStorage

NewRedisQueue initializes a Redis-backed queue that uses the given connection pool.

type TaglineResolver

// TaglineResolver carries a tagline string and two lists of client
// subscribers.
type TaglineResolver struct {
	// Tagline is the tagline text.
	Tagline                   string
	// TaskListClientSubscribers and JobListClientSubscribers presumably
	// identify the clients subscribed to task-list and job-list updates —
	// TODO confirm what the strings contain.
	TaskListClientSubscribers []string
	JobListClientSubscribers  []string
}

TaglineResolver resolver

type TaskResolver

// TaskResolver describes one task: its name, a total job count and a set of
// named counters.
type TaskResolver struct {
	Name      string
	TotalJobs int
	// Detail holds one counter per name; presumably the number of the task's
	// jobs in each status — confirm against the code that fills it.
	Detail    struct {
		GiveUp   int
		Retrying int
		Success  int
		Queueing int
		Stopped  int
	}
}

TaskResolver resolver

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL