taskqueueworker

package
v1.6.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2021 License: Apache-2.0 Imports: 31 Imported by: 0

README

Example

Create delivery handler

package workerhandler

import (
	"context"
	"time"

	taskqueueworker "github.com/golangid/candi/codebase/app/task_queue_worker"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/logger"
)

// TaskQueueHandler struct
type TaskQueueHandler struct {
}

// NewTaskQueueHandler constructor
func NewTaskQueueHandler() *TaskQueueHandler {
	return &TaskQueueHandler{}
}

// MountHandlers return map topic to handler func
func (h *TaskQueueHandler) MountHandlers(group *types.WorkerHandlerGroup) {

	group.Add("task-one", h.taskOne)
	group.Add("task-two", h.taskTwo)
}

func (h *TaskQueueHandler) taskOne(ctx context.Context, message []byte) error {
	logger.LogRed("task-one: " + string(message))
	return &taskqueueworker.ErrorRetrier{
		Delay:   10 * time.Second,
		Message: "Error",
	}
}

func (h *TaskQueueHandler) taskTwo(ctx context.Context, message []byte) error {
	logger.LogYellow("task-two: " + string(message))
	return &taskqueueworker.ErrorRetrier{
		Delay:   3 * time.Second,
		Message: "Error",
	}
}

Register in module

package examplemodule

import (

	"example.service/internal/modules/examplemodule/delivery/workerhandler"

	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

type Module struct {
	// ...another delivery handler
	workerHandlers map[types.Worker]interfaces.WorkerHandler
}

func NewModules(deps dependency.Dependency) *Module {
	return &Module{
		workerHandlers: map[types.Worker]interfaces.WorkerHandler{
			// ...another worker handler
			// ...
			types.TaskQueue: workerhandler.NewTaskQueueHandler(),
		},
	}
}

// ...another method

Add task in each usecase module

  • From internal service (same runtime)
package usecase

import (
	"context"
	"log"

	taskqueueworker "github.com/golangid/candi/codebase/app/task_queue_worker"
)

func someUsecase() {
	// add task queue for `task-one` with 5 retry
	if err := taskqueueworker.AddJob("task-one", 5, `{"params": "test-one"}`); err != nil {
		log.Println(err)
	}

	// add task queue for `task-two` with 5 retry
	if err := taskqueueworker.AddJob("task-two", 5, `{"params": "test-two"}`); err != nil {
		log.Println(err)
	}
}
  • Or via GraphQL API

POST {{task-queue-worker-host}}/graphql

mutation addJob {
  add_job(
    task_name: "task-one"
    max_retry: 5
    args: "{\"params\": \"test-one\"}"
  )
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddJob

func AddJob(taskName string, maxRetry int, args []byte) (err error)

AddJob public function

func NewTaskQueueWorker added in v1.6.8

func NewTaskQueueWorker(service factory.ServiceFactory, q QueueStorage, db *mongo.Database, opts ...OptionFunc) factory.AppServerFactory

NewTaskQueueWorker create new task queue worker

Types

type Filter

type Filter struct {
	Page, Limit int
	TaskName    string
	Search      *string
	Status      []string
}

Filter type

type Job

type Job struct {
	ID          string    `bson:"_id" json:"_id"`
	TaskName    string    `bson:"task_name" json:"task_name"`
	Arguments   string    `bson:"arguments" json:"arguments"`
	Retries     int       `bson:"retries" json:"retries"`
	MaxRetry    int       `bson:"max_retry" json:"max_retry"`
	Interval    string    `bson:"interval" json:"interval"`
	CreatedAt   time.Time `bson:"created_at" json:"created_at"`
	FinishedAt  time.Time `bson:"finished_at" json:"finished_at"`
	Status      string    `bson:"status" json:"status"`
	Error       string    `bson:"error" json:"error"`
	TraceID     string    `bson:"traceId" json:"traceId"`
	NextRetryAt string    `bson:"-" json:"-"`
}

Job model

type JobListResolver

type JobListResolver struct {
	Meta MetaJobList
	Data []Job
}

JobListResolver resolver

type MetaJobList added in v1.6.8

type MetaJobList struct {
	Page           int
	Limit          int
	TotalRecords   int
	TotalPages     int
	IsCloseSession bool
	Detail         struct {
		GiveUp, Retrying, Success, Queueing, Stopped int
	}
}

MetaJobList resolver

type MetaTaskResolver added in v1.6.8

type MetaTaskResolver struct {
	Page           int
	Limit          int
	TotalRecords   int
	TotalPages     int
	IsCloseSession bool
}

MetaTaskResolver meta resolver

type OptionFunc added in v1.6.8

type OptionFunc func(*option)

OptionFunc type

func SetAutoRemoveClientInterval added in v1.6.8

func SetAutoRemoveClientInterval(d time.Duration) OptionFunc

SetAutoRemoveClientInterval option func

func SetJaegerTracingDashboard added in v1.6.8

func SetJaegerTracingDashboard(host string) OptionFunc

SetJaegerTracingDashboard option func

func SetMaxClientSubscriber added in v1.6.8

func SetMaxClientSubscriber(max int) OptionFunc

SetMaxClientSubscriber option func

type QueueStorage

type QueueStorage interface {
	GetAllJobs(taskName string) []*Job
	PushJob(job *Job)
	PopJob(taskName string) Job
	NextJob(taskName string) *Job
	Clear(taskName string)
}

QueueStorage abstraction for queue storage backend

func NewInMemQueue

func NewInMemQueue() QueueStorage

NewInMemQueue init inmem queue

func NewRedisQueue

func NewRedisQueue(redisPool *redis.Pool) QueueStorage

NewRedisQueue init inmem queue

type TaglineResolver

type TaglineResolver struct {
	Tagline                   string
	TaskListClientSubscribers []string
	JobListClientSubscribers  []string
}

TaglineResolver resolver

type TaskListResolver added in v1.6.8

type TaskListResolver struct {
	Meta MetaTaskResolver
	Data []TaskResolver
}

TaskListResolver resolver

type TaskResolver

type TaskResolver struct {
	Name      string
	TotalJobs int
	Detail    struct {
		GiveUp, Retrying, Success, Queueing, Stopped int
	}
}

TaskResolver resolver

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL