goscheduler

package module
v0.5.1 Latest
Warning: This package is not in the latest version of its module.
Published: Apr 22, 2019 | License: MIT | Imports: 15 | Imported by: 0

README

GoScheduler

Inspired by https://github.com/gocraft/work and https://github.com/nsqio/go-nsq, GoScheduler is a simple job scheduler for Go. The scheduler and its clients communicate over a TCP connection.

Scheduler

The Scheduler service is the main gateway for receiving and pushing jobs. You can optionally attach a Storage to persist your jobs.


	// Set your logger
	myLogger := log.New(os.Stderr, "", log.LstdFlags|log.Llongfile)

	// Scheduler config
	config := goscheduler.SchedulerConfig{
		Address: ":7000",

		Logger: myLogger,
		LogLvl: goscheduler.LogLevelDebug,
	}

	// Create scheduler instance
	sc, err := goscheduler.NewScheduler(config)
	if err != nil {
		log.Fatalln(err)
	}
	defer sc.Stop()

Client

The Client service acts as a job producer or a job consumer.


	// Set your logger
	myLogger := log.New(os.Stderr, "", log.LstdFlags|log.Llongfile)

	// Client config
	config := goscheduler.ClientConfig{
		Address: "localhost:7000",

		Logger: myLogger,
		LogLvl: goscheduler.LogLevelDebug,
	}

	// Create client instance
	client, err := goscheduler.NewClient(config)
	if err != nil {
		log.Fatalln(err)
	}
	defer client.Close()

	// -- As Producer

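	// Add a new job that will run in 5 seconds.
	// AddJob returns a string and an error, so check both.
	id, err := client.AddJob("channel_name", time.Now().Add(5*time.Second), map[string]interface{}{
		"name": "foo",
		"age":  28,
	})
	if err != nil {
		log.Fatalln(err)
	}
	log.Println("Added job", id)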


	// -- As Consumer

	// Register a callback that listens on a channel
	if err := client.Listen("channel_name", callback); err != nil {
		log.Fatalln(err)
	}


	// Callback function; in a real program, declare it at package level
	func callback(job *goscheduler.Job) error {
		log.Println("Running job", job.Channel, job.ID, job.Args)
		return nil
	}

Storage

Storage is an optional backup system for the scheduler. You can implement your own and save jobs anywhere (database, memory, file, etc.).

Documentation

Constants

const (

	// ErrorJobHasExpired : Requested job has expired and scheduler won't run it
	ErrorJobHasExpired = "Job has expired"

	// ErrorNewConnection : Error occurred during net.Listen for new TCP connection
	ErrorNewConnection = "Failed to open TCP connection"

	// ErrorClosedConnection : Error occurred because the connection is already closed
	ErrorClosedConnection = "Using closed connection"

	// ErrorInvalidChannelName : A valid channel name must contain only letters, numbers, or underscores
	ErrorInvalidChannelName = "Invalid channel name"

	// ErrorInvalidJobID : A valid job ID must contain only letters or numbers
	ErrorInvalidJobID = "Invalid Job ID"
)
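
The comments on ErrorInvalidChannelName and ErrorInvalidJobID describe which characters are accepted. A minimal sketch of a client-side pre-check is shown here. The regular expressions and the helper names validChannelName and validJobID are assumptions derived from those comments, not the package's actual validation code.

```go
package main

import (
	"fmt"
	"regexp"
)

// Hypothetical patterns derived from the constant comments;
// the package's real validation rules may differ.
var (
	channelNameRe = regexp.MustCompile(`^[A-Za-z0-9_]+$`)
	jobIDRe       = regexp.MustCompile(`^[A-Za-z0-9]+$`)
)

// validChannelName reports whether name consists only of letters, digits, or underscores.
func validChannelName(name string) bool { return channelNameRe.MatchString(name) }

// validJobID reports whether id consists only of letters or digits.
func validJobID(id string) bool { return jobIDRe.MatchString(id) }

func main() {
	fmt.Println(validChannelName("channel_name"))
	fmt.Println(validChannelName("bad channel!"))
	fmt.Println(validJobID("abc123"))
	fmt.Println(validJobID("abc_123"))
}
```
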
const (
	JobStatusInactive = 0
	JobStatusActive   = 1
)

All job statuses

Types

type Client

type Client interface {
	AddJob(channel string, runAt time.Time, args map[string]interface{}) (string, error)
	Listen(channel string, f ClientCallbackFunc) error
	RemoveJob(channel, id string) error
	Close()
}

The Client interface provides the functions for adding and consuming jobs
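
Because Client is an interface, code that depends on it can be exercised without a running scheduler. The sketch below mirrors the documented types locally and adds a hypothetical in-memory fakeClient. It assumes that the string returned by AddJob is the job ID accepted by RemoveJob, which the signatures suggest but the documentation does not state.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Job, ClientCallbackFunc and Client mirror the documented types
// (Job is trimmed to the fields used here).
type Job struct {
	Channel string
	ID      string
	Args    map[string]interface{}
	RunAt   time.Time
}

type ClientCallbackFunc func(job *Job) error

type Client interface {
	AddJob(channel string, runAt time.Time, args map[string]interface{}) (string, error)
	Listen(channel string, f ClientCallbackFunc) error
	RemoveJob(channel, id string) error
	Close()
}

// fakeClient is a hypothetical in-memory stand-in for tests.
// It is not goroutine-safe and does no real scheduling.
type fakeClient struct {
	nextID int
	jobs   map[string]*Job // keyed by channel + "/" + id
}

func newFakeClient() *fakeClient { return &fakeClient{jobs: map[string]*Job{}} }

func (c *fakeClient) AddJob(channel string, runAt time.Time, args map[string]interface{}) (string, error) {
	c.nextID++
	id := fmt.Sprintf("job%d", c.nextID)
	c.jobs[channel+"/"+id] = &Job{Channel: channel, ID: id, Args: args, RunAt: runAt}
	return id, nil
}

// Listen immediately runs f for every stored job on the channel.
func (c *fakeClient) Listen(channel string, f ClientCallbackFunc) error {
	for _, j := range c.jobs {
		if j.Channel != channel {
			continue
		}
		if err := f(j); err != nil {
			return err
		}
	}
	return nil
}

func (c *fakeClient) RemoveJob(channel, id string) error {
	key := channel + "/" + id
	if _, ok := c.jobs[key]; !ok {
		return errors.New("job not found")
	}
	delete(c.jobs, key)
	return nil
}

func (c *fakeClient) Close() {}

func main() {
	var client Client = newFakeClient()
	defer client.Close()

	id, err := client.AddJob("channel_name", time.Now().Add(5*time.Second), map[string]interface{}{"name": "foo"})
	if err != nil {
		fmt.Println("add failed:", err)
		return
	}
	// Cancel the job before it runs, using the returned ID.
	if err := client.RemoveJob("channel_name", id); err != nil {
		fmt.Println("remove failed:", err)
		return
	}
	fmt.Println("removed", id)
}
```
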

func NewClient

func NewClient(config ClientConfig) (Client, error)

NewClient creates a new Client

type ClientCallbackFunc

type ClientCallbackFunc func(job *Job) error

ClientCallbackFunc represents the callback function for a specific channel

type ClientConfig

type ClientConfig struct {
	Address string

	Logger logger
	LogLvl LogLevel

	ReadDeadline  time.Duration
	WriteDeadline time.Duration
}

ClientConfig contains the configuration needed to create a new Client

type Delegator

type Delegator interface {
	OnIOError(index int)
	OnRequestReceived(index int, data []byte, addr string)
	OnJobReceived(data []byte)
}

Delegator is an interface of methods that are used as callbacks in Protocol
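
As an illustration of the callback shape, the sketch below mirrors the Delegator interface locally and implements it with a hypothetical recorder type that stores each event as a string. Treating index as a connection index is an assumption; the documentation does not define it.

```go
package main

import "fmt"

// Delegator mirrors the documented interface.
type Delegator interface {
	OnIOError(index int)
	OnRequestReceived(index int, data []byte, addr string)
	OnJobReceived(data []byte)
}

// recorder is a hypothetical Delegator that records every callback.
// It is not goroutine-safe; add a mutex if callbacks may run concurrently.
type recorder struct {
	events []string
}

func (r *recorder) OnIOError(index int) {
	r.events = append(r.events, fmt.Sprintf("io error on connection %d", index))
}

func (r *recorder) OnRequestReceived(index int, data []byte, addr string) {
	r.events = append(r.events, fmt.Sprintf("request %q from %s on connection %d", data, addr, index))
}

func (r *recorder) OnJobReceived(data []byte) {
	r.events = append(r.events, fmt.Sprintf("job %q", data))
}

func main() {
	r := &recorder{}
	var d Delegator = r // compile-time check that recorder satisfies Delegator

	d.OnRequestReceived(0, []byte("PING"), "127.0.0.1:7000")
	d.OnJobReceived([]byte(`{"id":"abc123"}`))
	d.OnIOError(0)

	for _, e := range r.events {
		fmt.Println(e)
	}
}
```
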

type Job

type Job struct {
	Channel string                 `json:"channel"`
	ID      string                 `json:"id"`
	Args    map[string]interface{} `json:"args"`
	RunAt   time.Time              `json:"run_at"`
	Status  int                    `json:"status"`
}

Job represents a scheduled job that will run at a specific time
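
The struct tags suggest that a Job is encoded as JSON. The sketch below mirrors the struct locally and round-trips it through encoding/json; whether the package uses encoding/json on the wire is an assumption. It also shows a detail worth knowing when writing callbacks: numbers stored in Args come back as float64 after decoding.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Job mirrors the documented struct.
type Job struct {
	Channel string                 `json:"channel"`
	ID      string                 `json:"id"`
	Args    map[string]interface{} `json:"args"`
	RunAt   time.Time              `json:"run_at"`
	Status  int                    `json:"status"`
}

// roundTrip encodes a Job to JSON and decodes it again.
func roundTrip(in Job) (Job, error) {
	data, err := json.Marshal(in)
	if err != nil {
		return Job{}, err
	}
	var out Job
	err = json.Unmarshal(data, &out)
	return out, err
}

func main() {
	in := Job{
		Channel: "channel_name",
		ID:      "abc123",
		Args:    map[string]interface{}{"name": "foo", "age": 28},
		RunAt:   time.Date(2019, 4, 22, 12, 0, 0, 0, time.UTC),
		Status:  1,
	}

	data, err := json.Marshal(in)
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(data))

	out, err := roundTrip(in)
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	// The int 28 went in, but encoding/json decodes JSON numbers
	// into float64 when the target is interface{}.
	age, ok := out.Args["age"].(float64)
	fmt.Println(age, ok) // prints "28 true"
}
```
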

type JobBatch

type JobBatch struct {
	RunAt time.Time `json:"run_at"`
	Jobs  []*Job
}

JobBatch represents a batch of jobs with the same scheduled time
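
One way to picture a JobBatch is as the result of grouping jobs by their RunAt value. The sketch below mirrors the two structs locally and uses a hypothetical groupByRunAt helper; it illustrates the data structure and is not the scheduler's actual batching code.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Job and JobBatch mirror the documented structs (Job is trimmed).
type Job struct {
	Channel string
	ID      string
	RunAt   time.Time
}

type JobBatch struct {
	RunAt time.Time
	Jobs  []*Job
}

// groupByRunAt is a hypothetical helper: it sorts jobs by RunAt and
// collects jobs with equal run times into one batch, earliest first.
func groupByRunAt(jobs []*Job) []*JobBatch {
	sorted := append([]*Job(nil), jobs...) // copy so the caller's slice keeps its order
	sort.SliceStable(sorted, func(i, k int) bool {
		return sorted[i].RunAt.Before(sorted[k].RunAt)
	})

	var batches []*JobBatch
	for _, j := range sorted {
		if n := len(batches); n > 0 && batches[n-1].RunAt.Equal(j.RunAt) {
			batches[n-1].Jobs = append(batches[n-1].Jobs, j)
			continue
		}
		batches = append(batches, &JobBatch{RunAt: j.RunAt, Jobs: []*Job{j}})
	}
	return batches
}

func main() {
	t := time.Date(2019, 4, 22, 12, 0, 0, 0, time.UTC)
	jobs := []*Job{
		{Channel: "a", ID: "1", RunAt: t.Add(time.Minute)},
		{Channel: "b", ID: "2", RunAt: t},
		{Channel: "a", ID: "3", RunAt: t.Add(time.Minute)},
	}
	for _, b := range groupByRunAt(jobs) {
		fmt.Println(b.RunAt.Format(time.RFC3339), len(b.Jobs))
	}
}
```
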

type LogLevel

type LogLevel int

LogLevel specifies the severity of a given log message

const (
	LogLevelDebug LogLevel = iota
	LogLevelInfo
	LogLevelWarning
	LogLevelError
)

Log levels

func (LogLevel) String

func (lvl LogLevel) String() string

String returns the string form for a given LogLevel
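
Since the levels are declared with iota, LogLevelDebug is the zero value, so a config that leaves LogLvl unset ends up at debug level. The sketch below mirrors the type locally to show the usual pattern for such a type. The label strings and the shouldLog helper are assumptions; the strings the package actually returns are not documented here.

```go
package main

import "fmt"

// LogLevel and its constants mirror the documented declarations.
type LogLevel int

const (
	LogLevelDebug LogLevel = iota
	LogLevelInfo
	LogLevelWarning
	LogLevelError
)

// String returns a label for the level. These labels are hypothetical.
func (lvl LogLevel) String() string {
	switch lvl {
	case LogLevelDebug:
		return "DEBUG"
	case LogLevelInfo:
		return "INFO"
	case LogLevelWarning:
		return "WARNING"
	case LogLevelError:
		return "ERROR"
	}
	return "UNKNOWN"
}

// shouldLog is a hypothetical filter: a message is emitted when its
// level is at least as severe as the configured minimum.
func shouldLog(min, lvl LogLevel) bool { return lvl >= min }

func main() {
	var unset LogLevel // zero value
	fmt.Println(unset == LogLevelDebug)
	fmt.Println(LogLevelWarning, shouldLog(LogLevelWarning, LogLevelInfo))
	fmt.Println(LogLevelError, shouldLog(LogLevelWarning, LogLevelError))
}
```
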

type Protocol

type Protocol struct {
	// contains filtered or unexported fields
}

func (*Protocol) Close

func (p *Protocol) Close()

Close closes the TCP connection

func (*Protocol) Flush

func (p *Protocol) Flush() error

Flush writes all buffered data to the underlying TCP connection

func (*Protocol) Read

func (p *Protocol) Read(d []byte) (int, error)

Read performs a deadlined read on the underlying TCP connection

func (*Protocol) SetLogger

func (p *Protocol) SetLogger(l logger, lvl LogLevel)

SetLogger assigns the logger to use as well as a level.

func (*Protocol) Write

func (p *Protocol) Write(d []byte) (int, error)

Write performs a deadlined write on the underlying TCP connection
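
A deadlined read or write usually means setting a fresh deadline on the connection right before the I/O call. The sketch below shows the read side with the standard net package, using an in-memory net.Pipe so it runs without a server. The helper names are hypothetical, and treating a non-positive duration as "no deadline" is an assumption of this sketch, not documented behavior of Protocol.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// deadlinedRead sets a fresh read deadline before reading, so a silent
// peer cannot block the caller forever. A duration <= 0 sets no deadline
// (an assumption of this sketch).
func deadlinedRead(conn net.Conn, buf []byte, d time.Duration) (int, error) {
	if d > 0 {
		if err := conn.SetReadDeadline(time.Now().Add(d)); err != nil {
			return 0, err
		}
	}
	return conn.Read(buf)
}

// isTimeout reports whether err is a network timeout.
func isTimeout(err error) bool {
	var ne net.Error
	return errors.As(err, &ne) && ne.Timeout()
}

// readFromSilentPeer reads from one end of an in-memory pipe whose
// other end never writes, and returns the resulting error.
func readFromSilentPeer(d time.Duration) error {
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()

	_, err := deadlinedRead(a, make([]byte, 16), d)
	return err
}

func main() {
	err := readFromSilentPeer(50 * time.Millisecond)
	fmt.Println("timed out:", isTimeout(err))
}
```

The write side follows the same shape with SetWriteDeadline and conn.Write.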

func (*Protocol) WriteCommand

func (p *Protocol) WriteCommand(cmd *command.Command) error

WriteCommand is a goroutine-safe method that writes a Command to this connection and flushes it.
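
A common way to make "write, then flush" safe for concurrent callers is to hold one mutex across both steps. The sketch below shows that pattern with a hypothetical safeWriter type that writes plain strings into a bytes.Buffer. It does not use the package's command.Command type, whose wire format is not documented here.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"strings"
	"sync"
)

// safeWriter is a hypothetical wrapper that serializes writers.
type safeWriter struct {
	mu sync.Mutex
	w  *bufio.Writer
}

// writeCommand writes one line and flushes it while holding the lock,
// so output from concurrent callers never interleaves.
func (s *safeWriter) writeCommand(cmd string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, err := s.w.WriteString(cmd + "\n"); err != nil {
		return err
	}
	return s.w.Flush()
}

// runWriters starts n goroutines that each write one command and
// returns everything that reached the underlying buffer.
func runWriters(n int) string {
	var out bytes.Buffer
	sw := &safeWriter{w: bufio.NewWriter(&out)}

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			_ = sw.writeCommand(fmt.Sprintf("CMD %d", id))
		}(i)
	}
	wg.Wait()
	return out.String()
}

func main() {
	out := runWriters(10)
	fmt.Println(strings.Count(out, "\n"), "complete lines")
}
```
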

type ProtocolConfig

type ProtocolConfig struct {
	Index         int
	Conn          net.Conn
	Delegator     Delegator
	ReadDeadline  time.Duration
	WriteDeadline time.Duration
}

type Scheduler

type Scheduler interface {
	Stop()
}

Scheduler is the core scheduling system that receives and processes jobs

func NewScheduler

func NewScheduler(config SchedulerConfig) (Scheduler, error)

NewScheduler creates a new Scheduler

type SchedulerConfig

type SchedulerConfig struct {
	Storage Storage

	Address string

	Logger logger
	LogLvl LogLevel

	ReadDeadline  time.Duration
	WriteDeadline time.Duration
}

SchedulerConfig contains the configuration needed to create a new Scheduler

type Storage

type Storage interface {
	GetActiveJobs() (jobs []*Job, err error)
	InsertJob(job *Job) error
	RemoveJob(channel, id string) error
}

Storage provides initial data for the scheduler during initialization
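
A minimal in-memory implementation of the three Storage methods might look like the sketch below. The types are mirrored locally so the example compiles on its own, and memoryStorage is a hypothetical name. Exactly when the scheduler calls each method is not documented beyond the sentence above, so the comments describe this sketch only.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Job, the status constants and Storage mirror the documented declarations.
type Job struct {
	Channel string
	ID      string
	Args    map[string]interface{}
	RunAt   time.Time
	Status  int
}

const (
	JobStatusInactive = 0
	JobStatusActive   = 1
)

type Storage interface {
	GetActiveJobs() (jobs []*Job, err error)
	InsertJob(job *Job) error
	RemoveJob(channel, id string) error
}

// memoryStorage is a hypothetical Storage that keeps jobs in a map.
type memoryStorage struct {
	mu   sync.Mutex
	jobs map[string]*Job // keyed by channel + "/" + id
}

func newMemoryStorage() *memoryStorage {
	return &memoryStorage{jobs: make(map[string]*Job)}
}

// GetActiveJobs returns copies of all jobs whose status is active.
func (s *memoryStorage) GetActiveJobs() ([]*Job, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	var active []*Job
	for _, j := range s.jobs {
		if j.Status == JobStatusActive {
			c := *j
			active = append(active, &c)
		}
	}
	return active, nil
}

// InsertJob stores a copy of job, replacing any job with the same channel and ID.
func (s *memoryStorage) InsertJob(job *Job) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	c := *job
	s.jobs[job.Channel+"/"+job.ID] = &c
	return nil
}

// RemoveJob deletes the job if present; removing a missing job is not an error here.
func (s *memoryStorage) RemoveJob(channel, id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	delete(s.jobs, channel+"/"+id)
	return nil
}

func main() {
	var store Storage = newMemoryStorage()

	_ = store.InsertJob(&Job{Channel: "channel_name", ID: "abc123", RunAt: time.Now().Add(5 * time.Second), Status: JobStatusActive})
	jobs, _ := store.GetActiveJobs()
	fmt.Println("active jobs:", len(jobs))

	_ = store.RemoveJob("channel_name", "abc123")
	jobs, _ = store.GetActiveJobs()
	fmt.Println("active jobs:", len(jobs))
}
```

With the real package, a value like this would be assigned to the Storage field of SchedulerConfig before calling NewScheduler.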
