enqueueit

package module
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2024 License: AGPL-3.0 Imports: 18 Imported by: 0

README

Enqueue It

Go Reference

Easy and scalable solution for managing and executing background tasks and microservices seamlessly in .NET applications. It allows you to schedule, queue, and process your jobs and microservices efficiently.

Designed to support distributed systems, enabling you to scale your background processes and microservices across multiple servers. With advanced features like performance monitoring, exception logging, and integration with various storage types, providing complete control and visibility over your workflow.

Provides a user-friendly web dashboard that allows you to monitor and manage your jobs and microservices from a centralized location. You can easily check the status of your tasks, troubleshoot issues, and optimize performance.

Benefits and Features

  • Schedule and queue microservices
  • Run multiple servers for increased performance and reliability
  • Monitor CPU and memory usage of microservices
  • Log exceptions to help find bugs and memory leaks
  • Connect to multiple storage types for optimal performance:
    • Main storage (Redis) for active jobs and services
    • Long-term storage (SQL databases such as SQL Server, PostgreSQL, MySQL, and more) for completed jobs and job history

Installation

You can install the EnqueueIt Go package by running the following command:

go get github.com/cybercloudsys/enqueueit-go

Usage

To use EnqueueIt go package you may need to create a configuration file named enqueueIt.json in your project and specify the connection strings and settings for your storage servers and queues. Here is an example of a configuration file:

{
  "StorageConfig": "localhost:6379",
  "StorageType": "Redis",
  "LongTermStorageConfig": "Server=localhost;Database=JobsDb;User ID=sa;Password=P@ssw0rd;",
  "LongTermStorageType": "SqlServer",
  "Servers": [
    {
      "Queues": [
        {
          "Name": "jobs",
          "WorkersCount": 50,
          "Retries": 0,
          "RetryDelay": 5
        },
        {
          "Name": "services",
          "WorkersCount": 50,
          "Retries": 0,
          "RetryDelay": 5
        }
      ]
    }
  ]
}

The package will automaticly load the config file from the working directory, but in some cases you may need to load the config by it is full path

configPath := "enqueueit.json" // full path goes here
enqueueit.LoadConfiguration(&configPath)
Start EnqueueIt Server

To start EnqueueIt server, you can use enqueueit.Start method to start new server instance that will connect to Redis and start processing queued microservices.

	err := enqueueit.StartServer(nil, nil)
	if err != nil {
		log.Panic(err)
	}
Reading Microservice Argument

To read microservice argument, you can import "github.com/cybercloudsys/enqueueit-go/microservice" and use GetJobArgument method to get the sent argument as string, if the argument was sent as an instance of the struct or object from .NET app then the value will be as json string that can be deserialized to instance of a struct.

import (
  "encoding/json"

  "github.com/cybercloudsys/enqueueit-go/microservice"
)
// read the microservice argument.
arg, err := microservice.GetJobArgument()
if err != nil {
  panic(err)
}
// deserialize json string to instance of a struct
data := &TestData{}
json.Unmarshal([]byte(*arg), data)
Running Microservices

To run a microservice, you can use Enqueue method and pass the name of the microservice and a value that represents the input for the microservice. The value will be passed as a command-line argument to the microservice executable file. For example, to run a microservice named microservice1 with an instance of struct that has a string field called Message, you can write:

// connect to redis storage.
  redis, err := enqueueit.Connect(enqueueit.LoadConfiguration(nil))
  if err != nil {
  	panic(err)
  }
  // enqueue a microservice
  _, err := enqueueit.Enqueue("microservice1", &TestData{ Message: "Hello World" }, "services", redis);
  if err != nil {
  	panic(err)
  }

This will add the microservice to the services queue and it will be executed as soon as possible by the EnqueueIt server.

Scheduling Microservices

EnqueueIt allows you to schedule microservices to run at a specific time or after another job has completed. There are three types of scheduled microservices you can create with EnqueueIt:

  • One-time microservice: This is a microservice that will run only once at a given time. You can use the Schedule method and pass the name of the microservice, the input object and the time as parameters. For example, to run a microservice after 5 minutes, named microservice1 and pass an instance of struct that has a field called Message, you can write:

    enqueueit.Schedule("microservice1", &TestData{ Message: "Hello World" }, time.Now().Add(time.Minute * 5), "services", redis)
    
  • Recurring microservice: This is a microservice that will run repeatedly according to a specified frequency. You can use the Subscribe method and pass the name of the microservice, the input object and the recurring pattern as parameters. The recurring pattern is an instance of the RecurringPattern struct from Recur package that defines how often the microservice should run. For example, to run a microservice named microservice1 with an object that has a property called Message every day at 06:00 AM, you can write:

    // you need to import recur to use RecurringPattren
    import "github.com/cybercloudsys/recur-go"
    
    enqueueit.Subscribe("microservice1", new { Message = "Run this later" }, recur.Daily(6))
    
  • Microservice dependent on another job: This is a microservice that will run only after another job has finished successfully. You can use the EnqueueAfter method and pass the name of the microservice, the input argument and the ID of the job that need to be finished first as parameters. For example, to run two microservices in sequence, you can write:

    //run the first microservice immediately
    jobId, err = enqueueit.Enqueue("microservice1", &TestData{ Message: "Hello World" }, "services", redis);
    if err != nil {
      panic(err)
    }
    
    //this is a microservice to be run after the previous microservice is being completed
    enqueueit.EnqueueAfter("microservice2", new { Message = "Run this after the first job!" }, jobId);
    

License

EnqueueIt
Copyright © 2023 Cyber Cloud Systems LLC

This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License along with this program. If not, see https://www.gnu.org/licenses/.

Any user of this program who modifies its source code and distributes the modified version must make the source code available to all recipients of the software, under the terms of the license.

If you have any questions about this agreement, You can contact us through this email info@cybercloudsys.com

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Enqueue

func Enqueue(appName string, argument interface{}, queue string, redisStorage *RedisStorage) (uuid.UUID, error)

func StartServer

func StartServer(sqlStorage SqlStorage, config *Configuration) error

Types

type Application

type Application struct {
	Name          string
	BaseDirectory string
	LauncherApp   *string
}

type Argument

type Argument struct {
	Name  string
	Value string
	Type  string
}

type BackgroundJob

type BackgroundJob struct {
	Id           uuid.UUID
	JobId        uuid.UUID
	Job          *Job `json:"-"`
	ProcessedBy  *uuid.UUID
	Server       string
	CreatedAt    time.Time
	Status       JobStatus
	JobError     *string    `json:"-"`
	Error        *JobError  `json:",omitempty"`
	StartedAt    *time.Time `json:",omitempty"`
	CompletedAt  *time.Time `json:",omitempty"`
	LastActivity *time.Time `json:",omitempty"`
	JobLogs      []*JobLog  `json:"-"`
	Logs         *string    `json:"-"`
}

type Configuration

type Configuration struct {
	StorageConfig           string
	LongTermStorageConfig   string
	Applications            []*Application
	Servers                 []*Server
	OffDays                 []time.Weekday
	StorageType             StorageType
	LongTermStorageType     LongTermStorageType
	EnableStopServers       bool
	EnableDeleteAll         bool
	ConnectionRetries       int
	ConnectionRetryInterval int
	JobHeartbeatInterval    int
	InactiveJobTimeout      int
	ServerHeartbeatInterval int
	InactiveServerTimeout   int
	LockHeartbeatInterval   int
	InactiveLockTimeout     int
	StorageExpirationInDays int
	StorageSyncInterval     int
	StorageSyncBatchSize    int
	CleanStorageInterval    int
}

func LoadConfiguration

func LoadConfiguration(configPath *string) *Configuration

type DistributedLock

type DistributedLock struct {
	Id           string
	Key          string
	StartedAt    time.Time
	LastActivity time.Time
	// contains filtered or unexported fields
}

func NewDistributedLock

func NewDistributedLock(key string, start bool, redisStorage *RedisStorage) *DistributedLock

func (*DistributedLock) Enter

func (distLock *DistributedLock) Enter()

func (*DistributedLock) Exit

func (distLock *DistributedLock) Exit()

func (*DistributedLock) TryEnter

func (distLock *DistributedLock) TryEnter() bool

func (*DistributedLock) TryEnterTill

func (distLock *DistributedLock) TryEnterTill(timeout time.Duration) bool

func (*DistributedLock) TryEnterTillMS

func (distLock *DistributedLock) TryEnterTillMS(millisecondsTimeout int) bool

type Job

type Job struct {
	Id                    uuid.UUID
	Name                  *string `json:",omitempty"`
	Queue                 string
	AppName               string
	JobArgument           *JobArgument
	Argument              *string `json:",omitempty"`
	CreatedAt             time.Time
	IsRecurring           bool
	StartAt               *time.Time `json:",omitempty"`
	Active                bool
	Recurring             *string                 `json:"-"`
	RecurringPattern      *recur.RecurringPattern `json:",omitempty"`
	Tries                 int
	Type                  JobType
	AfterBackgroundJobIds *string          `json:",omitempty"`
	BackgroundJobs        []*BackgroundJob `json:"-"`
}

func EnqueueAfter

func EnqueueAfter(appName string, argument interface{}, bgJobId uuid.UUID,
	queue string, redisStorage *RedisStorage) (*Job, error)

func NewJob

func NewJob(appName string, argument interface{}, startAt *time.Time, recurPattern *recur.RecurringPattern, queue string) *Job

func Schedule

func Schedule(appName string, argument interface{}, startAt time.Time, queue string, redisStorage *RedisStorage) (*Job, error)

func Subscribe

func Subscribe(appName string, argument interface{}, recurPattern recur.RecurringPattern,
	queue string, redisStorage *RedisStorage) (*Job, error)

func (*Job) Enqueue

func (job *Job) Enqueue(redisStorage *RedisStorage) (uuid.UUID, error)

type JobArgument

type JobArgument struct {
	Assembly      string
	ClassType     string
	MethodName    string
	MetadataToken int
	IsStatic      bool
	Arguments     []*Argument
}

type JobError

type JobError struct {
	Message    string
	StackTrace *string
	InnerError *JobError
}

func NewJobError

func NewJobError(message string) *JobError

type JobLog

type JobLog struct {
	Time        time.Time
	CpuUsage    float64
	MemoryUsage float64
}

type JobStatus

type JobStatus uint8
const (
	Scheduled JobStatus = iota
	Enqueued
	Processing
	Processed
	Canceled
	Interrupted
	Failed
)

func (JobStatus) String

func (status JobStatus) String() string

type JobType

type JobType uint8
const (
	Thread JobType = iota
	Microservice
)

type LongTermStorageType

type LongTermStorageType uint8
const (
	SqlServer LongTermStorageType = iota
	PostgresSQL
	MySql
	Oracle
)

type PassArgumentType

type PassArgumentType uint8
const (
	JobId PassArgumentType = iota
	Base64
)

type Queue

type Queue struct {
	Name          string
	WorkersCount  int
	Retries       int
	RetryInterval int
}

func (*Queue) StartJob

func (queue *Queue) StartJob(bgJobId uuid.UUID, server *Server, redisStorage *RedisStorage)

type RedisStorage

type RedisStorage struct {
	Config *Configuration
	// contains filtered or unexported fields
}

func Connect

func Connect(config *Configuration) (*RedisStorage, error)

func (*RedisStorage) AddJobLog

func (redisStorage *RedisStorage) AddJobLog(BackgroundJobId uuid.UUID, log *JobLog)

func (*RedisStorage) DeleteBackgroundJob

func (redisStorage *RedisStorage) DeleteBackgroundJob(bgJob *BackgroundJob)

func (*RedisStorage) DeleteBackgroundJobs

func (redisStorage *RedisStorage) DeleteBackgroundJobs(bgJobs []*BackgroundJob)

func (*RedisStorage) DeleteDistributedLock

func (redisStorage *RedisStorage) DeleteDistributedLock(key string, id string)

func (*RedisStorage) DeleteExpired

func (redisStorage *RedisStorage) DeleteExpired() error

func (*RedisStorage) DeleteJob

func (redisStorage *RedisStorage) DeleteJob(job *Job)

func (*RedisStorage) Dequeue

func (redisStorage *RedisStorage) Dequeue(queue string) uuid.UUID

func (*RedisStorage) DistributedLocksCount

func (redisStorage *RedisStorage) DistributedLocksCount(key string) int64

func (*RedisStorage) EnqueueAfter

func (redisStorage *RedisStorage) EnqueueAfter(jobId uuid.UUID, bgJobId uuid.UUID) error

func (*RedisStorage) GetAllDistributedLocks

func (redisStorage *RedisStorage) GetAllDistributedLocks() []*DistributedLock

func (*RedisStorage) GetAllJobs

func (redisStorage *RedisStorage) GetAllJobs(status JobStatus) ([]*BackgroundJob, int)

func (*RedisStorage) GetBackgroundJob

func (redisStorage *RedisStorage) GetBackgroundJob(backgroundJobId uuid.UUID,
	includeDetails bool) *BackgroundJob

func (*RedisStorage) GetJob

func (redisStorage *RedisStorage) GetJob(jobId uuid.UUID, loadLatest bool) *Job

func (*RedisStorage) GetJobLogs

func (redisStorage *RedisStorage) GetJobLogs(backgroundJobId uuid.UUID) []*JobLog

func (*RedisStorage) GetJobs

func (redisStorage *RedisStorage) GetJobs(serverId uuid.UUID, queue string) []*Job

func (*RedisStorage) GetServer

func (redisStorage *RedisStorage) GetServer(server *Server)

func (*RedisStorage) GetServerJobs

func (redisStorage *RedisStorage) GetServerJobs(serverId uuid.UUID, queue string) []*BackgroundJob

func (*RedisStorage) HasRunningJobs

func (redisStorage *RedisStorage) HasRunningJobs(serverId uuid.UUID) bool

func (*RedisStorage) IsDistributedLockEntered

func (redisStorage *RedisStorage) IsDistributedLockEntered(key string, id string) bool

func (*RedisStorage) JobEnqueued

func (redisStorage *RedisStorage) JobEnqueued(jobId uuid.UUID, queue string)

func (*RedisStorage) SaveBackgroundJob

func (redisStorage *RedisStorage) SaveBackgroundJob(bgJob *BackgroundJob) error

func (*RedisStorage) SaveDistributedLock

func (redisStorage *RedisStorage) SaveDistributedLock(distLock *DistributedLock)

func (*RedisStorage) SaveJob

func (redisStorage *RedisStorage) SaveJob(job *Job, forceUpdate bool) error

func (*RedisStorage) SaveServer

func (redisStorage *RedisStorage) SaveServer(server *Server)

func (*RedisStorage) ScheduleChanged

func (redisStorage *RedisStorage) ScheduleChanged(serverId uuid.UUID, queue string) bool

type Server

type Server struct {
	Id           uuid.UUID
	Hostname     string
	Queues       []*Queue
	Status       ServerStatus
	StartedAt    time.Time
	LastActivity time.Time
	HasDataSync  bool
	WorkersCount int
	// contains filtered or unexported fields
}

func (*Server) StartService

func (server *Server) StartService(redisStorage *RedisStorage, sqlStorage SqlStorage)

type ServerStatus

type ServerStatus uint8
const (
	Running ServerStatus = iota
	Stopped
)

type SqlStorage

type SqlStorage interface {
	Connect(config *Configuration) error
	SaveBackgroundJobs(bgJobs []*BackgroundJob) error
	DeleteExpired() error
}

type StorageType

type StorageType uint8
const (
	Redis StorageType = iota
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL