gsb

package module
v0.0.19 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 25, 2021 License: BSD-3-Clause Imports: 22 Imported by: 0

README

Go Service Bus (GSB)

Vision

  1. More impact, More quickly, Less code.

Principles

  1. Keep it Simple
  2. You aren't gonna need it
  3. Design for Evolution
  4. Least Astonishment
  5. Avoid Premature Optimisation

Front of Mind

  1. What are you doing
  2. What can go wrong

Rules

  1. Anything that fails during startup / initialisation should fail immediately with descriptive STDOUT.
  2. Anything that fails during execution should return an error and be processed by the system, to allow continuous operation.
  3. Handlers must implement the Handler interface.
  4. Handlers are named, and Handlers are run when Named Messages arrive.
  5. All code execution is initiated by receiving a message.
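
Rules 3 to 5 can be pictured with a small dispatch sketch. The types below are local stand-ins whose shapes follow the Handler and HandlerFn declarations listed in the Documentation section; the simplified HandlerState, the dispatch function, and the choice to treat an unmatched name as an error are assumptions made for illustration, not gsb's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// HandlerState is a simplified stand-in; the documented type also carries AppResources.
type HandlerState struct {
	Keys map[string]string
}

// HandlerFn follows the documented signature func(*HandlerState, string) error.
type HandlerFn func(*HandlerState, string) error

// Handler pairs a message name with the function to run for it.
type Handler struct {
	Name string
	Fn   HandlerFn
}

// dispatch (hypothetical) runs every handler whose Name matches the message
// name and returns the first error, so the caller can retry or park the message.
// Treating an unmatched name as an error is an assumption of this sketch.
func dispatch(handlers []Handler, state *HandlerState, name, payload string) error {
	matched := false
	for _, h := range handlers {
		if h.Name != name {
			continue
		}
		matched = true
		if err := h.Fn(state, payload); err != nil {
			return fmt.Errorf("handler %q: %w", h.Name, err)
		}
	}
	if !matched {
		return errors.New("no handler registered for " + name)
	}
	return nil
}

func main() {
	handlers := []Handler{{
		Name: "OrderPlaced",
		Fn: func(hs *HandlerState, payload string) error {
			hs.Keys["last"] = payload // record the payload so the effect is visible
			return nil
		},
	}}
	state := &HandlerState{Keys: map[string]string{}}
	fmt.Println(dispatch(handlers, state, "OrderPlaced", "order-1"))
	fmt.Println(dispatch(handlers, state, "Unknown", "x"))
}
```

With the real package, the equivalent registration would presumably go through Host.AddHandler(name, fn) before calling Host.Run.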

Guidelines

  1. All external resources are initialised and validated on start up, then provided on execution as Application Resources.

Rationale

  1. Try to raise the level of infrastructure.

  2. The infrastructure:

     1. takes care of starting, committing and rolling back transactions.
     2. runs all code within the context of a message.
     3. provides automatic retry if the code errors.
     4. moves failed messages to the error queue.

Description

The backbone of GSB is a message queue.

All work is initiated by pulling messages from this message queue.

If all external dependencies (e.g. queues, databases) are provided as Application Resources, gsb will manage all transactions, so the message itself provides the transaction boundary.

In this model, there are two options when an error occurs while processing a message. One, the message can be retried. For example, there could be a network glitch which has corrected itself by the time the message is retried. Two, for persistent errors, the message can be moved to the error queue. For example, a database may be down. In this case, the database can be restarted and the message sent back for processing.
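
The two options can be sketched as a retry loop that falls back to an error queue. This is only an illustration of the idea under simple assumptions: process, its parameters, and the slice standing in for the error queue are hypothetical names, and the real retry count is presumably taken from the RETRIES variable.

```go
package main

import (
	"errors"
	"fmt"
)

// Example failures used by the demonstration below.
var (
	errGlitch = errors.New("network glitch")
	errDown   = errors.New("database down")
)

// process (hypothetical) runs work once and then up to `retries` more times.
// If every attempt fails, the payload is appended to errorQ and the last
// error is returned.
func process(payload string, retries int, work func(string) error, errorQ *[]string) error {
	var err error
	for attempt := 0; attempt <= retries; attempt++ {
		if err = work(payload); err == nil {
			return nil // first-time success, or a transient failure that cleared
		}
	}
	*errorQ = append(*errorQ, payload) // persistent failure: park the message
	return err
}

func main() {
	var errorQ []string

	// A transient fault: fails once, then succeeds on the retry.
	calls := 0
	flaky := func(string) error {
		calls++
		if calls == 1 {
			return errGlitch
		}
		return nil
	}
	fmt.Println(process("m1", 2, flaky, &errorQ), len(errorQ))

	// A persistent fault: every attempt fails, so the message is parked.
	down := func(string) error { return errDown }
	fmt.Println(process("m2", 2, down, &errorQ), errorQ)
}
```

Once the underlying fault is fixed, a parked message can be sent back to the main queue, which is the recovery path described above.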

Variables

  1. NAME
  2. RETRIES
  3. ERROR_Q
  4. AUDIT_Q
  5. MQ
  6. GSB_
  7. GSBMON_
  8. VERBOSE

Application Resources

GSB_[name]=type://[username:password@]host/path

All external resources are initialised and validated on start up, then provided on execution.

  1. DynamoDb
     1. dynamodb
  2. FileSystem
     1. s3
     2. fs
  3. KeyValue
     1. dir
     2. redis
  4. Queue
     1. beanstalk
     2. sqs
     3. inmemq
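
As a rough illustration of the GSB_[name]=type://[username:password@]host/path convention, the sketch below scans environment-style pairs for a prefix and parses each value with the standard net/url package. The resource type and parseResources function are hypothetical names; gsb's own loader (LoadGsbEnvVars) may behave differently, and the example values are made up.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// resource is a hypothetical holder for one parsed GSB_[name] variable.
type resource struct {
	Name string
	URL  *url.URL
}

// parseResources scans KEY=VALUE pairs (the shape returned by os.Environ) for
// the given prefix and parses each value as a URL. Any bad value is an error,
// in line with the rule that start-up problems should fail immediately.
func parseResources(environ []string, prefix string) ([]resource, error) {
	var out []resource
	for _, kv := range environ {
		key, val, ok := strings.Cut(kv, "=")
		if !ok || !strings.HasPrefix(key, prefix) {
			continue
		}
		u, err := url.Parse(val)
		if err != nil {
			return nil, fmt.Errorf("%s: %w", key, err)
		}
		if u.Scheme == "" {
			return nil, fmt.Errorf("%s: missing resource type in %q", key, val)
		}
		out = append(out, resource{Name: strings.TrimPrefix(key, prefix), URL: u})
	}
	return out, nil
}

func main() {
	// Made-up values for illustration only.
	env := []string{
		"GSB_CACHE=redis://user:secret@localhost:6379/0",
		"GSB_FILES=s3://my-bucket/incoming",
		"PATH=/usr/bin",
	}
	res, err := parseResources(env, "GSB_")
	if err != nil {
		fmt.Println("startup failed:", err)
		return
	}
	for _, r := range res {
		// The URL scheme carries the resource type (redis, s3, ...).
		fmt.Println(r.Name, r.URL.Scheme, r.URL.Host, r.URL.Path)
	}
}
```

The same parsing would apply to the GSBMON_ prefix used for monitors.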

AWS Specific

  1. AWS_ENDPOINT
  2. AWS_REGION

Monitors

GSBMON_[name]=type://[username:password@]host/path

  1. dir
  2. queue

Commands

To allow the running system to be reconfigured, gsb processes commands that arrive via the backbone MQ:

  1. GSB_VERBOSE_ON
  2. GSB_VERBOSE_OFF
  3. GSB_AUDIT_ON
  4. GSB_AUDIT_OFF
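
A minimal sketch of how such commands might toggle runtime settings follows. The settings struct and applyCommand function are hypothetical; how gsb actually recognises a command on the queue (Host.ProcessCommand) is not described here, so only the four command names are taken from the list above.

```go
package main

import "fmt"

// settings is a hypothetical stand-in for the host's runtime switches.
type settings struct {
	Verbose bool
	Audit   bool
}

// applyCommand updates s when cmd is one of the four listed commands and
// reports whether it was recognised; anything else is left untouched so it
// can be handled as a normal message.
func applyCommand(s *settings, cmd string) bool {
	switch cmd {
	case "GSB_VERBOSE_ON":
		s.Verbose = true
	case "GSB_VERBOSE_OFF":
		s.Verbose = false
	case "GSB_AUDIT_ON":
		s.Audit = true
	case "GSB_AUDIT_OFF":
		s.Audit = false
	default:
		return false
	}
	return true
}

func main() {
	s := &settings{}
	for _, cmd := range []string{"GSB_VERBOSE_ON", "GSB_AUDIT_ON", "GSB_VERBOSE_OFF", "OrderPlaced"} {
		handled := applyCommand(s, cmd)
		fmt.Printf("%-16s handled=%-5v verbose=%v audit=%v\n", cmd, handled, s.Verbose, s.Audit)
	}
}
```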

References

  1. 8 Fallacies of Distributed Computing
  2. 12 Factor App

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AwsSetQueueTimeout added in v0.0.5

func AwsSetQueueTimeout(svc *sqs.Client, envUrl *url.URL, queueUrl *string) error

func GetTimeoutFromUrl added in v0.0.8

func GetTimeoutFromUrl(qUrl *url.URL) (string, error)

func NewAwsConfig added in v0.0.3

func NewAwsConfig() aws.Config

func ResetMqInMemory

func ResetMqInMemory()

Types

type AppResBucket added in v0.0.15

type AppResBucket struct {
	// contains filtered or unexported fields
}

func NewAppResBucket added in v0.0.15

func NewAppResBucket(url *url.URL) *AppResBucket

func (*AppResBucket) Del added in v0.0.15

func (b *AppResBucket) Del(name string) error

func (*AppResBucket) Exists added in v0.0.15

func (b *AppResBucket) Exists(name string) (bool, error)

func (*AppResBucket) Get added in v0.0.15

func (b *AppResBucket) Get(name string) (interface{}, error)

func (*AppResBucket) Init added in v0.0.15

func (b *AppResBucket) Init() error

func (*AppResBucket) Put added in v0.0.15

func (b *AppResBucket) Put(name string, item interface{}) error

type AppResDir

type AppResDir struct {
	// contains filtered or unexported fields
}

func NewAppResDir

func NewAppResDir(url *url.URL) *AppResDir

func (*AppResDir) Del

func (dir *AppResDir) Del(name string) error

func (*AppResDir) Exists added in v0.0.10

func (dir *AppResDir) Exists(name string) (bool, error)

func (*AppResDir) Get

func (dir *AppResDir) Get(name string) (interface{}, error)

func (*AppResDir) Init

func (dir *AppResDir) Init() error

func (*AppResDir) Put

func (dir *AppResDir) Put(name string, item interface{}) error

type AppResDynamoDb

type AppResDynamoDb struct {
	// contains filtered or unexported fields
}

func NewAppResDynamoDb

func NewAppResDynamoDb(url *url.URL) *AppResDynamoDb

func (*AppResDynamoDb) CreateTable added in v0.0.5

func (ddb *AppResDynamoDb) CreateTable(input *dynamodb.CreateTableInput) error

func (*AppResDynamoDb) Get

func (ddb *AppResDynamoDb) Get(tableName string, key map[string]string) (interface{}, error)

func (*AppResDynamoDb) Init

func (ddb *AppResDynamoDb) Init() error

func (*AppResDynamoDb) ListTables added in v0.0.7

func (ddb *AppResDynamoDb) ListTables() ([]string, error)

func (*AppResDynamoDb) Put

func (ddb *AppResDynamoDb) Put(tableName string, item interface{}) error

type AppResFS

type AppResFS struct {
	// contains filtered or unexported fields
}

func NewAppResFS

func NewAppResFS(url *url.URL) *AppResFS

func (*AppResFS) Del

func (s *AppResFS) Del(path string, name string) error

func (*AppResFS) Get

func (s *AppResFS) Get(path string, name string) ([]byte, error)

func (*AppResFS) Init

func (fs *AppResFS) Init() error

func (*AppResFS) Put

func (s *AppResFS) Put(path string, name string, payload []byte) error

type AppResFileSystem

type AppResFileSystem interface {
	Init() error
	Put(path string, name string, payload []byte) error
	Get(path string, name string) ([]byte, error)
	Del(path string, name string) error
}

type AppResKeyVal

type AppResKeyVal interface {
	Init() error
	Put(key string, item interface{}) error
	Exists(key string) (bool, error)
	Get(key string) (interface{}, error)
	Del(key string) error
}

type AppResQueue

type AppResQueue interface {
	Init() error
	Name() string
	Send(payload string) error
	Retrieve() (*string, error)
	Commit() error
}

type AppResQueueSqs added in v0.0.4

type AppResQueueSqs struct {
	// contains filtered or unexported fields
}

func NewAppResQueueSqs added in v0.0.4

func NewAppResQueueSqs(url *url.URL) *AppResQueueSqs

func (*AppResQueueSqs) Commit added in v0.0.4

func (q *AppResQueueSqs) Commit() error

func (*AppResQueueSqs) CreateQueue added in v0.0.5

func (q *AppResQueueSqs) CreateQueue()

func (*AppResQueueSqs) GetQueueUrl added in v0.0.4

func (q *AppResQueueSqs) GetQueueUrl(queueName string) (*sqs.GetQueueUrlOutput, error)

func (*AppResQueueSqs) Init added in v0.0.4

func (q *AppResQueueSqs) Init() error

func (*AppResQueueSqs) Name added in v0.0.4

func (q *AppResQueueSqs) Name() string

func (*AppResQueueSqs) Retrieve added in v0.0.4

func (q *AppResQueueSqs) Retrieve() (*string, error)

func (*AppResQueueSqs) Send added in v0.0.4

func (q *AppResQueueSqs) Send(Message string) error

type AppResRedis

type AppResRedis struct {
	// contains filtered or unexported fields
}

func NewAppResRedis

func NewAppResRedis(url *url.URL) *AppResRedis

func (*AppResRedis) Del

func (r *AppResRedis) Del(key string) error

func (*AppResRedis) Exists added in v0.0.10

func (r *AppResRedis) Exists(name string) (bool, error)

func (*AppResRedis) Get

func (r *AppResRedis) Get(key string) (interface{}, error)

func (*AppResRedis) Init

func (r *AppResRedis) Init() error

func (*AppResRedis) Put

func (r *AppResRedis) Put(key string, item interface{}) error

type AppResS3

type AppResS3 struct {
	// contains filtered or unexported fields
}

func NewAppResS3

func NewAppResS3(url *url.URL) *AppResS3

func (*AppResS3) Del

func (s *AppResS3) Del(path string, name string) error

func (*AppResS3) Get

func (s *AppResS3) Get(path string, name string) ([]byte, error)

func (*AppResS3) Init

func (s *AppResS3) Init() error

func (*AppResS3) Put

func (s *AppResS3) Put(path string, name string, payload []byte) error

type AppResources

type AppResources struct {
	// contains filtered or unexported fields
}

func NewAppResources

func NewAppResources() *AppResources

func (*AppResources) GetDDb added in v0.0.12

func (ar *AppResources) GetDDb(name string) (*AppResDynamoDb, error)

func (*AppResources) GetFileSystem added in v0.0.12

func (ar *AppResources) GetFileSystem(name string) (*AppResFileSystem, error)

func (*AppResources) GetKeyVal added in v0.0.12

func (ar *AppResources) GetKeyVal(name string) (*AppResKeyVal, error)

func (*AppResources) GetQueue added in v0.0.12

func (ar *AppResources) GetQueue(name string) (*AppResQueue, error)

func (*AppResources) Init

func (ar *AppResources) Init()

func (*AppResources) LoadAppResources added in v0.0.2

func (ar *AppResources) LoadAppResources(gsbEnvVars []GsbEnvVar)

type Envelope

type Envelope struct {
	Identifier        string
	Name              string
	Msg               string
	OriginalQueueName string
	Errors            []EnvelopeError
}

func NewEnvelope

func NewEnvelope(QueueName, Name, Message string) (Envelope, error)

func UnmarshalEnvelope

func UnmarshalEnvelope(payload []byte) (*Envelope, error)

func (*Envelope) AddError added in v0.0.5

func (env *Envelope) AddError(error string)

func (*Envelope) Marshal

func (env *Envelope) Marshal() ([]byte, error)

type EnvelopeError added in v0.0.5

type EnvelopeError struct {
	Timestamp time.Time
	Error     string
}
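
To show how an envelope might travel through a queue as a string payload, here is a round-trip sketch using local copies of the two structs above. The JSON encoding is an assumption (the wire format used by Marshal and UnmarshalEnvelope is not stated here), and addError and roundTrip are hypothetical helpers rather than the package's own methods.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Local copies of the documented structs, so the sketch is self-contained.
type EnvelopeError struct {
	Timestamp time.Time
	Error     string
}

type Envelope struct {
	Identifier        string
	Name              string
	Msg               string
	OriginalQueueName string
	Errors            []EnvelopeError
}

// addError records a failure with the current time (a local analogue of the
// documented AddError method).
func (env *Envelope) addError(msg string) {
	env.Errors = append(env.Errors, EnvelopeError{Timestamp: time.Now().UTC(), Error: msg})
}

// roundTrip encodes the envelope as JSON (an assumed format) and decodes it
// again, as a send followed by a receive would.
func roundTrip(env Envelope) (Envelope, error) {
	raw, err := json.Marshal(env)
	if err != nil {
		return Envelope{}, err
	}
	var out Envelope
	err = json.Unmarshal(raw, &out)
	return out, err
}

func main() {
	env := Envelope{Identifier: "id-1", Name: "OrderPlaced", Msg: `{"order":1}`, OriginalQueueName: "orders"}
	env.addError("database down")

	out, err := roundTrip(env)
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	// The error history travels with the message, which is what lets an
	// operator see why a message ended up on the error queue.
	fmt.Println(out.Name, out.OriginalQueueName, len(out.Errors), out.Errors[0].Error)
}
```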

type GsbEnvVar

type GsbEnvVar struct {
	// contains filtered or unexported fields
}

func LoadGsbEnvVars added in v0.0.2

func LoadGsbEnvVars(prefix string) []GsbEnvVar

type Handler

type Handler struct {
	Name string

	Fn HandlerFn
	// contains filtered or unexported fields
}

type HandlerFn

type HandlerFn func(*HandlerState, string) error

type HandlerState

type HandlerState struct {
	AppResources *AppResources
	Keys         map[string]string
	// contains filtered or unexported fields
}

func NewHandlerState

func NewHandlerState(host *Host, appResources *AppResources) *HandlerState

func (*HandlerState) Send

func (hs *HandlerState) Send(name string, payload string) error

type Host

type Host struct {
	Name       string
	Mq         Mq
	MqUrl      *url.URL
	LocalSendQ Queue
	AuditQ     Queue
	ErrorQ     Queue
	Handlers   []Handler
	// contains filtered or unexported fields
}

func NewHost

func NewHost() *Host

func (*Host) AddHandler

func (h *Host) AddHandler(name string, f HandlerFn)

func (*Host) Init

func (h *Host) Init()

func (*Host) InitMq

func (h *Host) InitMq()

func (*Host) MqCommit

func (h *Host) MqCommit() error

func (*Host) MqRetrieve

func (h *Host) MqRetrieve() *string

func (*Host) ProcessCommand added in v0.0.17

func (h *Host) ProcessCommand(cmd *string)

func (*Host) ProcessError

func (h *Host) ProcessError(env *Envelope, err error)

func (*Host) ProcessHandlers

func (h *Host) ProcessHandlers(env *Envelope) error

func (*Host) ProcessMessage

func (h *Host) ProcessMessage(env *Envelope)

func (*Host) Run

func (h *Host) Run()

func (*Host) SendCommandToAudit added in v0.0.18

func (h *Host) SendCommandToAudit(payload string)

func (*Host) SendEnvelope added in v0.0.16

func (h *Host) SendEnvelope(q AppResQueue, env Envelope) error

func (*Host) SendMessage added in v0.0.2

func (h *Host) SendMessage(name string, payload string) error

func (*Host) SendMessage2 added in v0.0.16

func (h *Host) SendMessage2(q AppResQueue, name string, payload string) error

func (*Host) SendToAudit added in v0.0.5

func (h *Host) SendToAudit(env Envelope)

func (*Host) Tick

func (h *Host) Tick()

func (*Host) VerboseLog added in v0.0.17

func (h *Host) VerboseLog(msg string)

type MessageSender added in v0.0.2

type MessageSender interface {
	SendMessage(string, string) error
}

type Monitor added in v0.0.2

type Monitor interface {
	Init() error
	Next() (*MonitorData, error)
	Commit() error
}

type MonitorData added in v0.0.2

type MonitorData struct {
	Name    string
	Payload string
}

type MonitorDir added in v0.0.2

type MonitorDir struct {
	// contains filtered or unexported fields
}

func NewMonitorDir added in v0.0.2

func NewMonitorDir(url *url.URL) *MonitorDir

func (*MonitorDir) Commit added in v0.0.2

func (dir *MonitorDir) Commit() error

func (*MonitorDir) Init added in v0.0.2

func (dir *MonitorDir) Init() error

func (*MonitorDir) Next added in v0.0.2

func (dir *MonitorDir) Next() (*MonitorData, error)

type MonitorQueue added in v0.0.13

type MonitorQueue struct {
	Q Queue
	// contains filtered or unexported fields
}

func NewMonitorQueue added in v0.0.13

func NewMonitorQueue(url *url.URL) *MonitorQueue

func (*MonitorQueue) Commit added in v0.0.13

func (q *MonitorQueue) Commit() error

func (*MonitorQueue) Init added in v0.0.13

func (q *MonitorQueue) Init() error

func (*MonitorQueue) Next added in v0.0.13

func (q *MonitorQueue) Next() (*MonitorData, error)

type Monitors added in v0.0.2

type Monitors struct {
	// contains filtered or unexported fields
}

func NewMonitors added in v0.0.2

func NewMonitors() *Monitors

func (*Monitors) Init added in v0.0.2

func (m *Monitors) Init(messageSender MessageSender)

func (*Monitors) LoadMonitors added in v0.0.2

func (m *Monitors) LoadMonitors(gsbEnvVars []GsbEnvVar)

func (*Monitors) Process added in v0.0.2

func (m *Monitors) Process()

type Mq

type Mq interface {
	Init() error
	Retrieve() (*string, error)
	Commit() error
}

type MqAws

type MqAws struct {
	// contains filtered or unexported fields
}

func NewMqAws

func NewMqAws(url *url.URL) *MqAws

func (*MqAws) Commit

func (mq *MqAws) Commit() error

func (*MqAws) Init

func (mq *MqAws) Init() error

func (*MqAws) Retrieve

func (mq *MqAws) Retrieve() (*string, error)

type MqBeanstalk

type MqBeanstalk struct {
	// contains filtered or unexported fields
}

func NewMqBeanstalk

func NewMqBeanstalk(url *url.URL) *MqBeanstalk

func (*MqBeanstalk) Commit

func (mq *MqBeanstalk) Commit() error

func (*MqBeanstalk) Init

func (mq *MqBeanstalk) Init() error

func (*MqBeanstalk) Retrieve

func (mq *MqBeanstalk) Retrieve() (*string, error)

type MqInMemory

type MqInMemory struct {
	List []string
	// contains filtered or unexported fields
}

func NewMqInMemory

func NewMqInMemory(url *url.URL) *MqInMemory

func NewMqInMemoryFromString

func NewMqInMemoryFromString(urlString string) *MqInMemory

func (*MqInMemory) Commit

func (mq *MqInMemory) Commit() error

func (*MqInMemory) Init

func (mq *MqInMemory) Init() error

func (*MqInMemory) Move

func (mq *MqInMemory) Move(fromS string, toS string)

func (*MqInMemory) Name

func (mq *MqInMemory) Name() string

func (*MqInMemory) Retrieve

func (mq *MqInMemory) Retrieve() (*string, error)

func (*MqInMemory) Send

func (mq *MqInMemory) Send(payload string) error

type Queue

type Queue interface {
	Init() error
	Name() string
	Send(payload string) error
	Retrieve() (*string, error)
	Commit() error
}

func NewQueue

func NewQueue(urlString string) Queue
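
The Queue interface can be satisfied by a very small in-memory type, which is handy for tests. The sketch below restates the interface locally and assumes particular semantics that the documentation does not spell out: Retrieve returns the head message without removing it (or nil when the queue is empty), and Commit removes the message last retrieved. memQueue is a hypothetical type, not the package's MqInMemory.

```go
package main

import (
	"errors"
	"fmt"
)

// Queue restates the documented interface so the sketch compiles on its own.
type Queue interface {
	Init() error
	Name() string
	Send(payload string) error
	Retrieve() (*string, error)
	Commit() error
}

// memQueue is a hypothetical slice-backed queue. It is not safe for
// concurrent use.
type memQueue struct {
	name     string
	items    []string
	inFlight bool // true between a successful Retrieve and its Commit
}

func (q *memQueue) Init() error  { q.items, q.inFlight = nil, false; return nil }
func (q *memQueue) Name() string { return q.name }

func (q *memQueue) Send(payload string) error {
	q.items = append(q.items, payload)
	return nil
}

// Retrieve returns the head message but leaves it queued until Commit, so an
// uncommitted message is seen again on the next Retrieve (assumed semantics).
func (q *memQueue) Retrieve() (*string, error) {
	if len(q.items) == 0 {
		return nil, nil
	}
	q.inFlight = true
	head := q.items[0]
	return &head, nil
}

// Commit removes the message returned by the last Retrieve.
func (q *memQueue) Commit() error {
	if !q.inFlight {
		return errors.New("commit without a retrieved message")
	}
	q.items = q.items[1:]
	q.inFlight = false
	return nil
}

// Compile-time check that memQueue satisfies Queue.
var _ Queue = (*memQueue)(nil)

func main() {
	var q Queue = &memQueue{name: "orders"}
	_ = q.Init()
	_ = q.Send("a")
	_ = q.Send("b")

	first, _ := q.Retrieve()
	again, _ := q.Retrieve() // not committed yet, so the same message comes back
	fmt.Println(*first, *again)

	_ = q.Commit()
	next, _ := q.Retrieve()
	fmt.Println(*next)
}
```

Keeping the message in place until Commit is what lets a failed handler leave the message available for a retry.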

type QueueBeanstalk

type QueueBeanstalk struct {
	// contains filtered or unexported fields
}

func NewAppResQueueBeanstalk

func NewAppResQueueBeanstalk(url *url.URL) *QueueBeanstalk

func (*QueueBeanstalk) Commit

func (q *QueueBeanstalk) Commit() error

func (*QueueBeanstalk) Init

func (q *QueueBeanstalk) Init() error

func (*QueueBeanstalk) Name

func (q *QueueBeanstalk) Name() string

func (*QueueBeanstalk) Retrieve

func (q *QueueBeanstalk) Retrieve() (*string, error)

func (*QueueBeanstalk) Send

func (q *QueueBeanstalk) Send(payload string) error
