loafergo

Version: v1.3.0
Published: Jul 24, 2024 License: MIT Imports: 8 Imported by: 0

README

Loafer Go


A Go library for asynchronous polling of AWS SQS messages

Install

Manual install:

go get -u github.com/justcodes/loafer-go

Golang import:

import "github.com/justcodes/loafer-go"
Usage

package main

import (
    "context"
    "fmt"
    "log"
    
    loafergo "github.com/justcodes/loafer-go"
    "github.com/justcodes/loafer-go/sqs"
)

const (
    awsEndpoint = "http://localhost:4566"
    awsKey      = "dummy"
    awsSecret   = "dummy"
    awsRegion   = "us-east-1"
    awsProfile  = "test-profile"
    workPool    = 5
)

func main() {
    defer panicRecover()
    ctx := context.Background()
    awsConfig := &sqs.AWSConfig{
        Key:      awsKey,
        Secret:   awsSecret,
        Region:   awsRegion,
        Profile:  awsProfile,
        Hostname: awsEndpoint,
    }
    
    sqsClient, err := sqs.NewSQSClient(ctx, &sqs.ClientConfig{
        AwsConfig:  awsConfig,
        RetryCount: 4,
    })
    if err != nil {
        panic(err)
    }
    
    var routes = []loafergo.Router{
        sqs.NewRoute(
            &sqs.Config{
                SQSClient: sqsClient,
                Handler:   handler1,
                QueueName: "example-1",
            },
            sqs.RouteWithVisibilityTimeout(25),
            sqs.RouteWithMaxMessages(5),
            sqs.RouteWithWaitTimeSeconds(8),
            sqs.RouteWithWorkerPoolSize(workPool),
        ),
        sqs.NewRoute(
            &sqs.Config{
                SQSClient: sqsClient,
                Handler:   handler2,
                QueueName: "example-2",
            }),
    }
    
    c := &loafergo.Config{}
    manager := loafergo.NewManager(c)
    manager.RegisterRoutes(routes)
    
    log.Println("starting consumers")
    err = manager.Run(ctx)
    if err != nil {
        panic(err)
    }
}
    
func handler1(ctx context.Context, m loafergo.Message) error {
    fmt.Printf("Message received handler1: %s\n", string(m.Body()))
    return nil
}
    
func handler2(ctx context.Context, m loafergo.Message) error {
    fmt.Printf("Message received handler2: %s\n", string(m.Body()))
    return nil
}
	
func panicRecover() {
    if r := recover(); r != nil {
        log.Panicf("error: %v", r)
    }
    log.Println("example stopped")
}
TODO
  • Add more tests
  • Add support for sending messages to SQS
  • Add support for sending messages to SNS
Acknowledgments

This lib is inspired by loafer and gosqs.

Documentation

Index

Constants

This section is empty.

Variables

var ErrEmptyParam = newSQSErr("required parameter is missing")

ErrEmptyParam occurs when the required parameter is missing

var ErrEmptyRequiredField = newSQSErr("required field is missing")

ErrEmptyRequiredField occurs when the required field is missing

var ErrGetMessage = newSQSErr("unable to retrieve message")

ErrGetMessage occurs when a request to retrieve messages from SQS fails

var ErrInvalidCreds = newSQSErr("invalid aws credentials")

ErrInvalidCreds occurs when the AWS credentials are invalid

var ErrMarshal = newSQSErr("unable to marshal request")

ErrMarshal occurs when a request cannot be marshaled

var ErrMessageProcessing = newSQSErr("processing time exceeding limit")

ErrMessageProcessing occurs when processing a message exceeds the consumption time limit set by AWS SQS

var ErrNoHandler = newSQSErr("handler is nil")

ErrNoHandler occurs when the handler is nil

var ErrNoRoute = newSQSErr("message received without a route")

ErrNoRoute occurs when a message is received without a route

var ErrNoSQSClient = newSQSErr("sqs client is nil")

ErrNoSQSClient occurs when the sqs client is nil

Functions

This section is empty.

Types

type Config

type Config struct {
	Logger Logger

	// RetryTimeout is used when the Route GetMessages method returns error
	// By default the retry timeout is 5s
	RetryTimeout time.Duration
}

Config defines the loafer Manager configuration

type Handler

type Handler func(context.Context, Message) error

Handler represents the handler function

type Logger

type Logger interface {
	Log(...interface{})
}

A Logger is a minimalistic interface that the loafer logs messages to. It should be used to provide custom logging writers for the loafer to use.

type LoggerFunc added in v1.0.0

type LoggerFunc func(...interface{})

A LoggerFunc is a convenience type to convert a function taking a variadic list of arguments and wrap it so the Logger interface can be used.

Example:

loafergo.NewManager(&loafergo.Config{Logger: loafergo.LoggerFunc(func(args ...interface{}) {
    fmt.Fprintln(os.Stdout, args...)
})})

func (LoggerFunc) Log added in v1.0.0

func (f LoggerFunc) Log(args ...interface{})

Log calls the wrapped function with the arguments provided

type Manager added in v1.0.0

type Manager struct {
	// contains filtered or unexported fields
}

Manager holds the routes and config fields

func NewManager

func NewManager(config *Config) *Manager

NewManager creates a new Manager with the given configuration

func (*Manager) GetRoutes added in v1.0.0

func (m *Manager) GetRoutes() []Router

GetRoutes returns the available routes as a slice of Router type

func (*Manager) RegisterRoute added in v1.0.0

func (m *Manager) RegisterRoute(route Router)

RegisterRoute registers a new route with the Manager

func (*Manager) RegisterRoutes added in v1.0.0

func (m *Manager) RegisterRoutes(routes []Router)

RegisterRoutes registers more than one route with the Manager

func (*Manager) Run added in v1.0.0

func (m *Manager) Run(ctx context.Context) error

Run starts the Manager, distributing the worker pool across the registered routes. It returns an error if no routes are registered.

type Message

type Message interface {
	// Decode will unmarshal the message into a supplied output using json
	Decode(out interface{}) error
	// Attribute will return the custom attribute that was sent throughout the request.
	Attribute(key string) string
	// Metadata will return the metadata that was sent throughout the request.
	Metadata() map[string]string
	// Identifier will return a message identifier
	Identifier() string
	// Dispatch used to dispatch message if necessary
	Dispatch()
	// Body used to get the message Body
	Body() []byte
}

Message represents the message interface methods
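
To illustrate how a handler might use Decode, here is a self-contained sketch. It does not import loafergo: fakeMessage is a hypothetical stand-in that implements only Decode and Body, and orderCreated is an assumed payload shape chosen for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bodyDecoder is the subset of the Message interface used in this sketch.
type bodyDecoder interface {
	Decode(out interface{}) error
	Body() []byte
}

// fakeMessage is a hypothetical in-memory stand-in for an SQS message.
type fakeMessage struct{ body []byte }

func (m fakeMessage) Body() []byte { return m.body }

// Decode unmarshals the JSON body into out, as the interface comment describes.
func (m fakeMessage) Decode(out interface{}) error {
	return json.Unmarshal(m.body, out)
}

// orderCreated is an assumed payload shape, for illustration only.
type orderCreated struct {
	ID    string `json:"id"`
	Total int    `json:"total"`
}

// handleOrder decodes the payload and reports malformed JSON as an error.
func handleOrder(m bodyDecoder) (orderCreated, error) {
	var evt orderCreated
	if err := m.Decode(&evt); err != nil {
		return orderCreated{}, fmt.Errorf("decode order: %w", err)
	}
	return evt, nil
}

func main() {
	msg := fakeMessage{body: []byte(`{"id":"o-1","total":42}`)}
	evt, err := handleOrder(msg)
	fmt.Println(evt.ID, evt.Total, err) // o-1 42 <nil>
}
```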

type Router added in v1.0.0

type Router interface {
	Configure(ctx context.Context) error
	GetMessages(ctx context.Context) ([]Message, error)
	HandlerMessage(ctx context.Context, msg Message) error
	Commit(ctx context.Context, m Message) error
	WorkerPoolSize(ctx context.Context) int32
}

Router holds the Route methods to configure and run

type SQSClient added in v1.0.0

type SQSClient interface {
	ChangeMessageVisibility(
		ctx context.Context,
		params *sqs.ChangeMessageVisibilityInput,
		optFns ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error)
	GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
	ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
	DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
}

SQSClient represents the aws sqs client methods

type SQSError

type SQSError struct {
	Err string `json:"err"`
	// contains filtered or unexported fields
}

SQSError defines the error handler for the loafergo package. SQSError satisfies the error interface and can be used safely with other error handlers

func (*SQSError) Context

func (e *SQSError) Context(err error) *SQSError

Context is used for creating a new instance of the error with the contextual error attached

func (*SQSError) Error

func (e *SQSError) Error() string

Error is used for implementing the error interface, and for creating a proper error string
