pcron

package module
v0.0.0-...-6a1fb7b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2024 License: MIT Imports: 2 Imported by: 0

README

Built with GoLang Version License

PCron

PCron is a Go package designed to simplify the integration of cron scheduling with Kafka message production. It provides a structured approach for running scheduled tasks and publishing messages to Kafka topics based on predefined schedules.

Features

  • Integration with Kafka: PCron seamlessly integrates with Kafka through the use of the sarama package, allowing easy production of messages to Kafka topics.

  • Error Handling: PCron provides an error channel (errChan) to capture and handle errors that occur during job execution. This allows for robust error logging or alerting mechanisms to be implemented.

  • Flexible Scheduling: With PCron, you can define custom schedules using cron expressions, providing flexibility in scheduling tasks according to specific time patterns.

Installation

Install it in the usual way:

go get -u github.com/thegeorgenikhil/pcron

Usage

Below is a basic example demonstrating how to use PCron to schedule a job that produces messages to a Kafka topic:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/IBM/sarama"
	"github.com/thegeorgenikhil/pcron"
)

const (
	Name = "hello-world-cron"
	Schedule = "* * * * *" // Run every minute
	TopicName = "hello-world"
)

var (
	BrokerURLs = []string{"localhost:9092"}
)

type HelloWorldJob struct{}

// This method is called by the producer cron to run the job
// It returns a slice of producer messages to be sent to Kafka
func (hwj *HelloWorldJob) Run() ([]*sarama.ProducerMessage, error) {
	messages := []*sarama.ProducerMessage{
		{
			Topic: TopicName,
			Key:   sarama.StringEncoder("key"),
			Value: sarama.StringEncoder("Hello World!"),
		},
	}

	return messages, nil
}

func main() {
	job := &HelloWorldJob{}

    // Before running the producer cron, make sure you create the topic in Kafka.
    // An example for creating a topic is given in the `examples` folder.
	createTopic()

	// Create producer cron config
	config := pcron.NewConfig(Name, Schedule, job, BrokerURLs)

	// Create producer cron
	producerCron, err := pcron.New(config)
	if err != nil {
		log.Fatalf("Error creating producer cron: %v", err)
	}

	// Start the cron
	err = producerCron.StartCron()
	if err != nil {
		log.Fatalf("Error starting producer cron: %v", err)
	}
	defer producerCron.StopCron()

	// Handle errors from the job in a separate goroutine
	go func() {
		for err := range producerCron.GetErrorChan() {
			log.Printf("[ERROR]: %v", err)
		}
	}()

	select {}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Name is the name of the cron job.
	Name string
	// Schedule is the cron schedule for the job.
	Schedule string
	// Job is the job to be executed. It has a Run method where the actual job logic is implemented and returns a list of messages to be published.
	Job Job
	// BrokerURLs is the list of Kafka broker URLs.
	BrokerURLs []string
	// ProducerConfig is the configuration for the Kafka producer.
	ProducerConfig *sarama.Config
}

Config holds the configuration for a cron job.

func NewConfig

func NewConfig(name, schedule string, job Job, brokerURls []string, producerConfig ...*sarama.Config) *Config

NewConfig creates a new Config instance.

type Job

type Job interface {
	Run() ([]*sarama.ProducerMessage, error)
}

Job defines the interface for jobs to be executed.

type ProducerCron

type ProducerCron struct {
	// contains filtered or unexported fields
}

ProducerCron contains the configuration and associated cron meth

func New

func New(cfg *Config) (*ProducerCron, error)

NewProducerCron creates a new ProducerCron instance.

func (*ProducerCron) GetErrorChan

func (cp *ProducerCron) GetErrorChan() <-chan error

GetErrorChan returns the error channel.

func (*ProducerCron) StartCron

func (cp *ProducerCron) StartCron() error

StartCron starts the cron scheduler and runs the job at the specified schedule.

func (*ProducerCron) StopCron

func (cp *ProducerCron) StopCron()

StopCron stops the cron scheduler.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL