gsb

package module
v0.0.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2021 License: BSD-3-Clause Imports: 20 Imported by: 0

README

Go Service Bus (GSB)

Principles

  1. Anything that fails during startup / initialisation should fail immediately with a descriptive message on STDOUT.
  2. Anything that fails during execution should return an error and be processed by the system, to allow continuous operation.
  3. All code execution is initiated by receiving a message.
  4. All external resources are initialised and validated on start up, then provided on execution.
  5. Handlers must provide the Handler interface.
  6. Handlers are named, and Handlers are run when Named Messages arrive.

Rationale

  1. Try to raise the level of infrastructure.

  2. The infrastructure:

     - takes care of starting, committing and rolling back transactions.

     - runs all code within the context of a message.

     - provides automatic retry if the code errors.

     - moves failed messages to the error queue.

Variables

  1. NAME
  2. RETRIES
  3. ERROR_Q
  4. AUDIT_Q
  5. MQ
  6. GSB_
  7. GSBMON_
Application Resources

GSB_[name]=type://[username:password@]host/path

All external resources are initialised and validated on start up, then provided on execution.

  1. DynamoDb
     - dynamodb
  2. FileSystem
     - s3
     - fs
  3. KeyValue
     - dir
     - redis
  4. Queue
     - beanstalk
     - sqs
     - inmemq
AWS Specific
  1. AWS_ENDPOINT
  2. AWS_REGION
Monitors

GSBMON_[name]=type://[username:password@]host/path

  1. dir

References

  1. 8 Fallacies of Distributed Computing

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AwsSetQueueTimeout added in v0.0.5

func AwsSetQueueTimeout(svc *sqs.Client, envUrl *url.URL, queueUrl *string) error

func GetTimeoutFromUrl added in v0.0.8

func GetTimeoutFromUrl(qUrl *url.URL) (string, error)

func NewAwsConfig added in v0.0.3

func NewAwsConfig() aws.Config

func ResetMqInMemory

func ResetMqInMemory()

func SendEnvelope

func SendEnvelope(q AppResQueue, env Envelope) error

func SendMessage

func SendMessage(q AppResQueue, name string, payload string) error

Types

type AppResDir

type AppResDir struct {
	// contains filtered or unexported fields
}

func NewAppResDir

func NewAppResDir(url *url.URL) *AppResDir

func (*AppResDir) Del

func (dir *AppResDir) Del(name string) error

func (*AppResDir) Exists added in v0.0.10

func (dir *AppResDir) Exists(name string) (bool, error)

func (*AppResDir) Get

func (dir *AppResDir) Get(name string) (interface{}, error)

func (*AppResDir) Init

func (dir *AppResDir) Init() error

func (*AppResDir) Put

func (dir *AppResDir) Put(name string, item interface{}) error

type AppResDynamoDb

type AppResDynamoDb struct {
	// contains filtered or unexported fields
}

func NewAppResDynamoDb

func NewAppResDynamoDb(url *url.URL) *AppResDynamoDb

func (*AppResDynamoDb) CreateTable added in v0.0.5

func (ddb *AppResDynamoDb) CreateTable(input *dynamodb.CreateTableInput) error

func (*AppResDynamoDb) Get

func (ddb *AppResDynamoDb) Get(tableName string, key map[string]string) (interface{}, error)

func (*AppResDynamoDb) Init

func (ddb *AppResDynamoDb) Init() error

func (*AppResDynamoDb) ListTables added in v0.0.7

func (ddb *AppResDynamoDb) ListTables() ([]string, error)

func (*AppResDynamoDb) Put

func (ddb *AppResDynamoDb) Put(tableName string, item interface{}) error

type AppResFS

type AppResFS struct {
	// contains filtered or unexported fields
}

func NewAppResFS

func NewAppResFS(url *url.URL) *AppResFS

func (*AppResFS) Del

func (s *AppResFS) Del(path string, name string) error

func (*AppResFS) Get

func (s *AppResFS) Get(path string, name string) ([]byte, error)

func (*AppResFS) Init

func (fs *AppResFS) Init() error

func (*AppResFS) Put

func (s *AppResFS) Put(path string, name string, payload []byte) error

type AppResFileSystem

type AppResFileSystem interface {
	Init() error
	Put(path string, name string, payload []byte) error
	Get(path string, name string) ([]byte, error)
	Del(path string, name string) error
}

type AppResKeyVal

type AppResKeyVal interface {
	Init() error
	Put(key string, item interface{}) error
	Exists(key string) (bool, error)
	Get(key string) (interface{}, error)
	Del(key string) error
}

type AppResQueue

type AppResQueue interface {
	Init() error
	Name() string
	Send(payload string) error
	Retrieve() (*string, error)
	Commit() error
}

type AppResQueueSqs added in v0.0.4

type AppResQueueSqs struct {
	// contains filtered or unexported fields
}

func NewAppResQueueSqs added in v0.0.4

func NewAppResQueueSqs(url *url.URL) *AppResQueueSqs

func (*AppResQueueSqs) Commit added in v0.0.4

func (q *AppResQueueSqs) Commit() error

func (*AppResQueueSqs) CreateQueue added in v0.0.5

func (q *AppResQueueSqs) CreateQueue()

func (*AppResQueueSqs) GetQueueUrl added in v0.0.4

func (q *AppResQueueSqs) GetQueueUrl(queueName string) (*sqs.GetQueueUrlOutput, error)

func (*AppResQueueSqs) Init added in v0.0.4

func (q *AppResQueueSqs) Init() error

func (*AppResQueueSqs) Name added in v0.0.4

func (q *AppResQueueSqs) Name() string

func (*AppResQueueSqs) Retrieve added in v0.0.4

func (q *AppResQueueSqs) Retrieve() (*string, error)

func (*AppResQueueSqs) Send added in v0.0.4

func (q *AppResQueueSqs) Send(Message string) error

type AppResRedis

type AppResRedis struct {
	// contains filtered or unexported fields
}

func NewAppResRedis

func NewAppResRedis(url *url.URL) *AppResRedis

func (*AppResRedis) Del

func (r *AppResRedis) Del(key string) error

func (*AppResRedis) Exists added in v0.0.10

func (r *AppResRedis) Exists(name string) (bool, error)

func (*AppResRedis) Get

func (r *AppResRedis) Get(key string) (interface{}, error)

func (*AppResRedis) Init

func (r *AppResRedis) Init() error

func (*AppResRedis) Put

func (r *AppResRedis) Put(key string, item interface{}) error

type AppResS3

type AppResS3 struct {
	// contains filtered or unexported fields
}

func NewAppResS3

func NewAppResS3(url *url.URL) *AppResS3

func (*AppResS3) Del

func (s *AppResS3) Del(path string, name string) error

func (*AppResS3) Get

func (s *AppResS3) Get(path string, name string) ([]byte, error)

func (*AppResS3) Init

func (s *AppResS3) Init() error

func (*AppResS3) Put

func (s *AppResS3) Put(path string, name string, payload []byte) error

type AppResources

type AppResources struct {
	// contains filtered or unexported fields
}

func NewAppResources

func NewAppResources() *AppResources

func (*AppResources) GetDDb added in v0.0.12

func (ar *AppResources) GetDDb(name string) (*AppResDynamoDb, error)

func (*AppResources) GetFileSystem added in v0.0.12

func (ar *AppResources) GetFileSystem(name string) (*AppResFileSystem, error)

func (*AppResources) GetKeyVal added in v0.0.12

func (ar *AppResources) GetKeyVal(name string) (*AppResKeyVal, error)

func (*AppResources) GetQueue added in v0.0.12

func (ar *AppResources) GetQueue(name string) (*AppResQueue, error)

func (*AppResources) Init

func (ar *AppResources) Init()

func (*AppResources) LoadAppResources added in v0.0.2

func (ar *AppResources) LoadAppResources(gsbEnvVars []GsbEnvVar)

type Envelope

type Envelope struct {
	Identifier        string
	Name              string
	Msg               string
	OriginalQueueName string
	Errors            []EnvelopeError
}

func NewEnvelope

func NewEnvelope(QueueName, Name, Message string) (Envelope, error)

func UnmarshalEnvelope

func UnmarshalEnvelope(payload []byte) (*Envelope, error)

func (*Envelope) AddError added in v0.0.5

func (env *Envelope) AddError(error string)

func (*Envelope) Marshal

func (env *Envelope) Marshal() ([]byte, error)

type EnvelopeError added in v0.0.5

type EnvelopeError struct {
	Timestamp time.Time
	Error     string
}

type GsbEnvVar

type GsbEnvVar struct {
	// contains filtered or unexported fields
}

func LoadGsbEnvVars added in v0.0.2

func LoadGsbEnvVars(prefix string) []GsbEnvVar

type Handler

type Handler struct {
	Name string

	Fn HandlerFn
	// contains filtered or unexported fields
}

type HandlerFn

type HandlerFn func(*HandlerState, string) error

type HandlerState

type HandlerState struct {
	AppResources *AppResources
	Keys         map[string]string
}

func NewHandlerState

func NewHandlerState(appResources *AppResources) *HandlerState

func (*HandlerState) Send

func (hs *HandlerState) Send(queueName string, name string, payload string) error

type Host

type Host struct {
	Name       string
	Mq         Mq
	MqUrl      *url.URL
	LocalSendQ Queue
	AuditQ     Queue
	ErrorQ     Queue
	Handlers   []Handler
	// contains filtered or unexported fields
}

func NewHost

func NewHost() *Host

func (*Host) AddHandler

func (h *Host) AddHandler(name string, f HandlerFn)

func (*Host) Init

func (h *Host) Init()

func (*Host) InitAuditQueue added in v0.0.5

func (h *Host) InitAuditQueue()

func (*Host) InitErrorQueue

func (h *Host) InitErrorQueue()

func (*Host) InitMq

func (h *Host) InitMq()

func (*Host) MqCommit

func (h *Host) MqCommit() error

func (*Host) MqRetrieve

func (h *Host) MqRetrieve() *Envelope

func (*Host) ProcessError

func (h *Host) ProcessError(env *Envelope, err error)

func (*Host) ProcessHandlers

func (h *Host) ProcessHandlers(env *Envelope) error

func (*Host) ProcessMessage

func (h *Host) ProcessMessage(env *Envelope)

func (*Host) Run

func (h *Host) Run()

func (*Host) SendMessage added in v0.0.2

func (h *Host) SendMessage(name string, payload string)

func (*Host) SendToAudit added in v0.0.5

func (h *Host) SendToAudit(env Envelope)

func (*Host) Tick

func (h *Host) Tick()

type MessageSender added in v0.0.2

type MessageSender interface {
	SendMessage(string, string)
}

type Monitor added in v0.0.2

type Monitor interface {
	Init() error
	Next() (*MonitorData, error)
	Commit() error
}

type MonitorData added in v0.0.2

type MonitorData struct {
	Name    string
	Payload string
}

type MonitorDir added in v0.0.2

type MonitorDir struct {
	// contains filtered or unexported fields
}

func NewMonitorDir added in v0.0.2

func NewMonitorDir(url *url.URL) *MonitorDir

func (*MonitorDir) Commit added in v0.0.2

func (dir *MonitorDir) Commit() error

func (*MonitorDir) Init added in v0.0.2

func (dir *MonitorDir) Init() error

func (*MonitorDir) Next added in v0.0.2

func (dir *MonitorDir) Next() (*MonitorData, error)

type Monitors added in v0.0.2

type Monitors struct {
	// contains filtered or unexported fields
}

func NewMonitors added in v0.0.2

func NewMonitors() *Monitors

func (*Monitors) Init added in v0.0.2

func (m *Monitors) Init(messageSender MessageSender)

func (*Monitors) LoadMonitors added in v0.0.2

func (m *Monitors) LoadMonitors(gsbEnvVars []GsbEnvVar)

func (*Monitors) Process added in v0.0.2

func (m *Monitors) Process()

type Mq

type Mq interface {
	Init() error
	Retrieve() (*string, error)
	Commit() error
}

type MqAws

type MqAws struct {
	// contains filtered or unexported fields
}

func NewMqAws

func NewMqAws(url *url.URL) *MqAws

func (*MqAws) Commit

func (mq *MqAws) Commit() error

func (*MqAws) Init

func (mq *MqAws) Init() error

func (*MqAws) Retrieve

func (mq *MqAws) Retrieve() (*string, error)

type MqBeanstalk

type MqBeanstalk struct {
	// contains filtered or unexported fields
}

func NewMqBeanstalk

func NewMqBeanstalk(url *url.URL) *MqBeanstalk

func (*MqBeanstalk) Commit

func (mq *MqBeanstalk) Commit() error

func (*MqBeanstalk) Init

func (mq *MqBeanstalk) Init() error

func (*MqBeanstalk) Retrieve

func (mq *MqBeanstalk) Retrieve() (*string, error)

type MqInMemory

type MqInMemory struct {
	List []string
	// contains filtered or unexported fields
}

func NewMqInMemory

func NewMqInMemory(url *url.URL) *MqInMemory

func NewMqInMemoryFromString

func NewMqInMemoryFromString(urlString string) *MqInMemory

func (*MqInMemory) Commit

func (mq *MqInMemory) Commit() error

func (*MqInMemory) Init

func (mq *MqInMemory) Init() error

func (*MqInMemory) Move

func (mq *MqInMemory) Move(fromS string, toS string)

func (*MqInMemory) Name

func (mq *MqInMemory) Name() string

func (*MqInMemory) Retrieve

func (mq *MqInMemory) Retrieve() (*string, error)

func (*MqInMemory) Send

func (mq *MqInMemory) Send(payload string) error

type Queue

type Queue interface {
	Init() error
	Name() string
	Send(payload string) error
	Retrieve() (*string, error)
	Commit() error
}

func NewQueue

func NewQueue(urlString string) Queue

type QueueBeanstalk

type QueueBeanstalk struct {
	// contains filtered or unexported fields
}

func NewAppResQueueBeanstalk

func NewAppResQueueBeanstalk(url *url.URL) *QueueBeanstalk

func (*QueueBeanstalk) Commit

func (q *QueueBeanstalk) Commit() error

func (*QueueBeanstalk) Init

func (q *QueueBeanstalk) Init() error

func (*QueueBeanstalk) Name

func (q *QueueBeanstalk) Name() string

func (*QueueBeanstalk) Retrieve

func (q *QueueBeanstalk) Retrieve() (*string, error)

func (*QueueBeanstalk) Send

func (q *QueueBeanstalk) Send(payload string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL