loafergo

package module
v2.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 4, 2024 License: MIT Imports: 9 Imported by: 0

README

Loafer Go


Lib for Go with:

  • Async pooling of AWS/SQS messages
  • Producer of AWS/SNS messages
Install

Manual install:

go get -u github.com/justcodes/loafer-go/v2

Golang import:

import "github.com/justcodes/loafer-go/v2"
Usage

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	loafergo "github.com/justcodes/loafer-go/v2"
	"github.com/justcodes/loafer-go/v2/aws"
	"github.com/justcodes/loafer-go/v2/aws/sns"
	"github.com/justcodes/loafer-go/v2/aws/sqs"
)

const (
	awsEndpoint  = "http://localhost:4566"
	awsKey       = "dummy"
	awsSecret    = "dummy"
	awsAccountID = "000000000000"
	awsRegion    = "us-east-1"
	awsProfile   = "test-profile"
	workPool     = 5
	topicOne     = "my_topic__test"
	topicTwo     = "my_topic__test2"
	queueOne     = "example-1"
	queueTwo     = "example-2"
)

func main() {
	defer panicRecover()
	ctx := context.Background()
	awsConfig := &aws.Config{
		Key:      awsKey,
		Secret:   awsSecret,
		Region:   awsRegion,
		Profile:  awsProfile,
		Hostname: awsEndpoint,
	}

	snsClient, err := sns.NewClient(ctx, &aws.ClientConfig{
		Config:     awsConfig,
		RetryCount: 4,
	})

	producer, err := sns.NewProducer(&sns.Config{
		SNSClient: snsClient,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Produce message async
	wg := &sync.WaitGroup{}
	wg.Add(2)
	go produceMessage(ctx, wg, producer, topicOne)
	go produceMessage(ctx, wg, producer, topicTwo)
	wg.Wait()
	log.Println("all messages was published")

	log.Printf("\n\n******** Start queues consumers ********\n\n")
	time.Sleep(2 * time.Second)

	sqsClient, err := sqs.NewClient(ctx, &aws.ClientConfig{
		Config:     awsConfig,
		RetryCount: 4,
	})

	var routes = []loafergo.Router{
		sqs.NewRoute(
			&sqs.Config{
				SQSClient: sqsClient,
				Handler:   handler1,
				QueueName: queueOne,
			},
			sqs.RouteWithVisibilityTimeout(25),
			sqs.RouteWithMaxMessages(5),
			sqs.RouteWithWaitTimeSeconds(8),
			sqs.RouteWithWorkerPoolSize(workPool),
		),
		sqs.NewRoute(&sqs.Config{
			SQSClient: sqsClient,
			Handler:   handler2,
			QueueName: queueTwo,
		}),
		sqs.NewRoute(&sqs.Config{
			SQSClient: sqsClient,
			Handler:   handler3,
			QueueName: queueTwo,
		}),
	}

	c := &loafergo.Config{}
	manager := loafergo.NewManager(c)
	manager.RegisterRoutes(routes)

	// Run manager
	err = manager.Run(ctx)
	if err != nil {
		panic(err)
	}
}

func handler1(ctx context.Context, m loafergo.Message) error {
	fmt.Printf("Message received handler1:  %s\n ", string(m.Body()))
	return nil
}

func handler2(ctx context.Context, m loafergo.Message) error {
	fmt.Printf("Message received handler2: %s\n ", string(m.Body()))
	return nil
}

func handler2(ctx context.Context, m loafergo.Message) error {
	fmt.Printf("Message received handler2: %s\n ", string(m.Body()))
	m.Backoff(2 * time.Hour) // visibility timeout will be extended in 2h
	return nil
}

func produceMessage(ctx context.Context, wg *sync.WaitGroup, producer sns.Producer, topic string) {
	for i := 0; i < 20; i++ {
		topicARN, err := sns.BuildTopicARN(awsRegion, awsAccountID, topic)
		if err != nil {
			log.Fatal(err)
		}

		id, err := producer.Produce(ctx, &sns.PublishInput{
			Message:  fmt.Sprintf("{\"message\": \"Hello world!\", \"topic\": \"%s\", \"id\": %d}", topic, i),
			TopicARN: topicARN,
		})
		if err != nil {
			log.Println("error to produce message: ", err)
			continue
		}
		fmt.Printf("Message produced to topic %s; id: %s \n", topic, id)
	}
	wg.Done()
}

func panicRecover() {
	if r := recover(); r != nil {
		log.Panicf("error: %v", r)
	}
	log.Println("example stopped")
}

TODO
  • Add more tests
  • Add support for sending messages to SQS
  • Add support for sending messages to SNS
Acknowledgments

This lib is inspired by loafer and gosqs.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrEmptyInput = newError("input must be filled")

ErrEmptyInput occurs when the producer received an empty or nil input

View Source
var ErrEmptyParam = newError("required parameter is missing")

ErrEmptyParam occurs when the required parameter is missing

View Source
var ErrEmptyRequiredField = newError("required field is missing")

ErrEmptyRequiredField occurs when the required field is missing

View Source
var ErrGetMessage = newError("unable to retrieve message")

ErrGetMessage fires when a request to retrieve messages from sqs fails

View Source
var ErrInvalidCreds = newError("invalid aws credentials")

ErrInvalidCreds invalid credentials

View Source
var ErrMarshal = newError("unable to marshal request")

ErrMarshal unable to marshal request

View Source
var ErrMessageProcessing = newError("processing time exceeding limit")

ErrMessageProcessing occurs when a message has exceeded the consumption time limit set by aws SQS

View Source
var ErrNoHandler = newError("handler is nil")

ErrNoHandler occurs when the handler is nil

View Source
var ErrNoRoute = newError("message received without a route")

ErrNoRoute message received without a route

View Source
var ErrNoSQSClient = newError("sqs client is nil")

ErrNoSQSClient occurs when the sqs client is nil

Functions

This section is empty.

Types

type Config

type Config struct {
	Logger Logger

	// RetryTimeout is used when the Route GetMessages method returns error
	// By default the retry timeout is 5s
	RetryTimeout time.Duration
}

Config defines the loafer Manager configuration

type Error

type Error struct {
	Err string `json:"err"`
	// contains filtered or unexported fields
}

Error defines the error handler for the loafergo package. Error satisfies the error interface and can be used safely with other error handlers

func (*Error) Context

func (e *Error) Context(err error) *Error

Context is used for creating a new instance of the error with the contextual error attached

func (*Error) Error

func (e *Error) Error() string

Error is used for implementing the error interface, and for creating a proper error string

type Handler

type Handler func(context.Context, Message) error

Handler represents the handler function

type Logger

type Logger interface {
	Log(...interface{})
}

A Logger is a minimalistic interface for the loafer to log messages to. Should be used to provide custom logging writers for the loafer to use.

type LoggerFunc

type LoggerFunc func(...interface{})

A LoggerFunc is a convenience type to convert a function taking a variadic list of arguments and wrap it so the Logger interface can be used.

Example:

loafergo.NewManager(context.Background(), loafergo.Config{Logger: loafergo.LoggerFunc(func(args ...interface{}) {
    fmt.Fprintln(os.Stdout, args...)
})})

func (LoggerFunc) Log

func (f LoggerFunc) Log(args ...interface{})

Log calls the wrapped function with the arguments provided

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager holds the routes and config fields

func NewManager

func NewManager(config *Config) *Manager

NewManager creates a new Manager with the given configuration

func (*Manager) GetRoutes

func (m *Manager) GetRoutes() []Router

GetRoutes returns the available routes as a slice of Router type

func (*Manager) RegisterRoute

func (m *Manager) RegisterRoute(route Router)

RegisterRoute register a new route to the Manager

func (*Manager) RegisterRoutes

func (m *Manager) RegisterRoutes(routes []Router)

RegisterRoutes register more than one route to the Manager

func (*Manager) Run

func (m *Manager) Run(ctx context.Context) error

Run the Manager distributing the worker pool by the number of routes returns errors if no routes

type Message

type Message interface {
	// Decode will unmarshal the body message into a supplied output using json
	Decode(out interface{}) error
	// Attribute will return the custom attribute that was sent throughout the request.
	Attribute(key string) string
	// Attributes will return the custom attributes that were sent with the request.
	Attributes() map[string]string
	// SystemAttributeByKey will return the system attributes by key.
	SystemAttributeByKey(key string) string
	// SystemAttributes will return the system attributes.
	SystemAttributes() map[string]string
	// Metadata will return the metadata that was sent throughout the request.
	Metadata() map[string]string
	// Identifier will return an identifier associated with the message ReceiptHandle.
	Identifier() string
	// Dispatch used to dispatch message if necessary
	Dispatch()
	// Backoff used to change the visibilityTimeout of the message
	// when a message is backedoff it will not be removed from the queue
	// instead it will extend the visibility timeout of the message
	Backoff(delay time.Duration)
	// BackedOff used to check if the message was backedOff by the handler
	BackedOff() bool
	// Body used to get the message Body
	Body() []byte
	// Message returns the body message
	Message() string
	// TimeStamp returns the message timestamp
	TimeStamp() time.Time
	// DecodeMessage will unmarshal the message into a supplied output using json
	DecodeMessage(out any) error
}

Message represents the message interface methods

type Router

type Router interface {
	Configure(ctx context.Context) error
	GetMessages(ctx context.Context) ([]Message, error)
	HandlerMessage(ctx context.Context, msg Message) error
	Commit(ctx context.Context, m Message) error
	WorkerPoolSize(ctx context.Context) int32
	VisibilityTimeout(ctx context.Context) int32
}

Router holds the Route methods to configure and run

type SNSClient

type SNSClient interface {
	Publish(ctx context.Context, params *sns.PublishInput, optFns ...func(*sns.Options)) (*sns.PublishOutput, error)
	PublishBatch(ctx context.Context, params *sns.PublishBatchInput, optFns ...func(*sns.Options)) (*sns.PublishBatchOutput, error)
}

SNSClient represents the aws sns client methods

type SQSClient

type SQSClient interface {
	ChangeMessageVisibility(
		ctx context.Context,
		params *sqs.ChangeMessageVisibilityInput,
		optFns ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error)
	GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
	ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
	DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
}

SQSClient represents the aws sqs client methods

Directories

Path Synopsis
aws
sns
sqs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL