schedulejob

package module
v0.0.0-...-77393a6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 20, 2019 License: Apache-2.0 Imports: 3 Imported by: 0

README

go-schedule-job

Simple cron like job scheduler

package main

import (
	"os"
	"time"
    "runtime"
    "github.com/etcd-io/bbolt"
    
    schedulejob "github.com/9glt/go-schedule-job"
)

var (
	dbPath     = "./test-database.db-test"
	bucketJobs = []byte("jobs")
)

type DB struct {
	db *bbolt.DB
}

func (db *DB) First() ([]byte, []byte) {
	var key []byte
	var payload []byte

	db.db.View(func(tx *bbolt.Tx) error {
		bucket := tx.Bucket(bucketJobs)
		c := bucket.Cursor()
		k, v := c.First()
		key, payload = clone(k), clone(v)
		return nil
	})
	return key, payload
}
func (db *DB) Save(key, value []byte) {
	db.db.Update(func(tx *bbolt.Tx) error {
		bucket, _ := tx.CreateBucketIfNotExists(bucketJobs)
		bucket.Put(key, value)
		return nil
	})
}
func (db *DB) Delete(key []byte) {
	db.db.Update(func(tx *bbolt.Tx) error {
		bucket := tx.Bucket(bucketJobs)
		bucket.Delete(key)
		return nil
	})
}

var _ = (schedulejob.Store)(&DB{})

func TestMain(t *testing.T) {
	db, err := bbolt.Open(dbPath, 0755, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		db.Close()
		os.Remove(dbPath)
	}()

    sc := schedulejob.New(&DB{db})
    
    // schedule job after 5seconds
    sc.Schedule(5, []byte("http://test1"))
    // true for auto delete items after send to channel
	ch, _ := sc.Worker(true)
	go func() {
	for1:
		for {
			select {
			case item, ok := <-ch:
				if ok {
                    // do you job with item.Payload
					continue
				}
				break for1
			}
		}
	}()
    
    runtime.Goexit()
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Item

type Item struct {
	Payload []byte
	// contains filtered or unexported fields
}

Item struct

func (*Item) Delete

func (i *Item) Delete()

Delete from schedule key after done

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler main truct

func New

func New(store Store) *Scheduler

New creates new schedule instance

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(after int64, payload []byte)

Schedule takes seconds to delay job and payload

func (*Scheduler) Worker

func (s *Scheduler) Worker(autodelete bool) (chan Item, func())

Worker periodicaly reads store and found jobs pushes to channel. returns chan Item + cancel func

type Store

type Store interface {
	First() ([]byte, []byte)
	Save(key, value []byte)
	Delete(key []byte)
}

Store interface for data persists and read data from database

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL