hedwig

v0.9.1
Published: May 4, 2022 License: Apache-2.0 Imports: 11 Imported by: 2

README

Hedwig Library for Go

Hedwig is an inter-service communication bus that works on AWS and GCP, while keeping things simple and straightforward.

It allows validation of the message payloads before they are sent, helping to catch cross-component incompatibilities early.

Hedwig allows separation of concerns between consumers and publishers so your services are loosely coupled, and the contract is enforced by the message payload validation. Hedwig may also be used to build asynchronous APIs.

For intra-service messaging, see Taskhawk.

To learn more, read the docs.

Fan Out

Hedwig utilizes SNS for fan-out configuration. A publisher publishes messages on a topic. This message may be received by zero or more consumers. The publisher need not be aware of the consuming application. There are a variety of messages that may be published as such, but they generally fall into two buckets:

  • Asynchronous API Requests: Hedwig may be used to call APIs asynchronously. The contract is enforced by your infrastructure by connecting SNS topics to SQS queues, and the payload is validated using the schema you define. The response is delivered using a separate message if required.
  • Notifications: The most common use case is to notify other services/apps that may be interested in events. For example, your User Management app can publish a user.created message notification to all your apps. As publishers and consumers are loosely coupled, this separation of concerns is very effective in ensuring a stable ecosystem.
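
The fan-out behaviour described above can be pictured with a small in-memory sketch. It does not use SNS, SQS, or any Hedwig API; the `bus` type and its `subscribe` and `publish` methods are hypothetical names invented for illustration, and the fixed queue buffer is an assumption that keeps the example short.

```go
package main

import "fmt"

// bus is a hypothetical in-memory stand-in for a fan-out topic service.
// It maps a topic name to the queues subscribed to that topic.
type bus struct {
	subscribers map[string][]chan string
}

func newBus() *bus {
	return &bus{subscribers: make(map[string][]chan string)}
}

// subscribe attaches a new buffered queue to a topic and returns it.
// The buffer size of 8 is arbitrary; publish blocks once a queue is full.
func (b *bus) subscribe(topic string) <-chan string {
	q := make(chan string, 8)
	b.subscribers[topic] = append(b.subscribers[topic], q)
	return q
}

// publish copies the payload to every queue subscribed to the topic and
// reports how many queues received it. Zero subscribers is not an error:
// the publisher does not need to know who is listening.
func (b *bus) publish(topic, payload string) int {
	for _, q := range b.subscribers[topic] {
		q <- payload
	}
	return len(b.subscribers[topic])
}

func main() {
	b := newBus()
	emailQueue := b.subscribe("user-created")
	auditQueue := b.subscribe("user-created")

	delivered := b.publish("user-created", `{"user_id": "42"}`)
	fmt.Println("delivered to", delivered, "queues")
	fmt.Println("email service got:", <-emailQueue)
	fmt.Println("audit service got:", <-auditQueue)
}
```

Each subscriber gets its own copy of the message, which is what lets a new consumer be added without touching the publisher.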

Provisioning

Hedwig works on SQS and SNS as backing queues. Before you can publish/consume messages, you need to provision the required infra. This may be done manually, or, preferably, using Terraform. Hedwig provides tools to make infra configuration easier: see Terraform Google for further details.

Quick Start

First, install the library:

go get github.com/cloudchacho/hedwig-go

Create a protobuf schema and save as schema.proto:

syntax = "proto2";

package main;

import "hedwig/protobuf/options.proto";

option go_package = "example.com/hedwig;main";

message SendEmailV1 {
    option (hedwig.message_options).major_version = 1;
    option (hedwig.message_options).minor_version = 0;
    option (hedwig.message_options).message_type = "email.send";

    optional string to = 1;
    optional string message = 2;
}

Clone the Hedwig options definition file and compile your schema:

git clone https://github.com/cloudchacho/hedwig /usr/local/lib/protobuf/include/hedwig/
protoc -I/usr/local/lib/protobuf/include -I. --go_out=. schema.proto

In the publisher application, initialize the publisher:

    settings := aws.Settings {
        AWSAccessKey:    <YOUR AWS KEY>,
        AWSAccountID:    <YOUR AWS ACCOUNT ID>,
        AWSRegion:       <YOUR AWS REGION>,
        AWSSecretKey:    <YOUR AWS SECRET KEY>,
        AWSSessionToken: <YOUR AWS SESSION TOKEN>,
    }
    backend := aws.NewBackend(settings, nil)
    encoderDecoder := protobuf.NewMessageEncoderDecoder([]proto.Message{&SendEmailV1{}})
    // Route every email.send v1.x message to the send_email topic.
    routing := hedwig.MessageRouting{
        {
            MessageType:  "email.send",
            MajorVersion: 1,
        }: "send_email",
    }
    publisher := hedwig.NewPublisher(backend, encoderDecoder, routing)

And finally, send a message:

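    // data is the message payload, for example a populated *SendEmailV1.
    headers := map[string]string{}
    msg, err := hedwig.NewMessage("email.send", "1.0", headers, data, "myapp")
    if err != nil {
        return err
    }
    // Publish returns an identifier for the published message and an error.
    _, err = publisher.Publish(context.Background(), msg)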

In the consumer application, define your callback:

    // Handler
    func HandleSendEmail(ctx context.Context, msg *hedwig.Message) error {
        to := msg.Data.(*SendEmailV1).GetTo()
        // actually send the email to `to`
        _ = to
        return nil
    }

And start the consumer:

    settings := aws.Settings {
        AWSAccessKey:    <YOUR AWS KEY>,
        AWSAccountID:    <YOUR AWS ACCOUNT ID>,
        AWSRegion:       <YOUR AWS REGION>,
        AWSSecretKey:    <YOUR AWS SECRET KEY>,
        AWSSessionToken: <YOUR AWS SESSION TOKEN>, 
    }
    backend := aws.NewBackend(settings, nil)
    encoderDecoder := protobuf.NewMessageEncoderDecoder([]proto.Message{&SendEmailV1{}})
    registry := hedwig.CallbackRegistry{{"email.send", 1}: HandleSendEmail}
    consumer := hedwig.NewQueueConsumer(backend, encoderDecoder, nil, registry)
    err := consumer.ListenForMessages(context.Background(), hedwig.ListenRequest{})

For more complete code, see examples.

Development

Prerequisites

Install go1.11.x

Getting Started

$ cd ${GOPATH}/src/github.com/cloudchacho/hedwig-go
$ go build

Running Tests

$ make test

Getting Help

We use GitHub issues for tracking bugs and feature requests.

  • If you think you have found a bug, please open an issue.

Release notes

v0.8
  • Cleaned up interfaces to be more idiomatic Go
v0.7
  • Initial version

Documentation

Index

Constants

This section is empty.

Variables

var ErrRetry = errors.New("Retry error")

ErrRetry should cause the task to retry, but not treat the retry as an error
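
A minimal sketch of how a callback might use this sentinel follows. To stay self-contained it declares its own `ErrRetry` mirroring the documented variable; real code would return `hedwig.ErrRetry`. The `sendEmail`, `handle`, and `classify` functions and the `errRateLimited` error are hypothetical, and `classify` is only an assumption about how a consumer could tell the cases apart, not Hedwig's actual logic.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrRetry mirrors the documented hedwig.ErrRetry so the sketch compiles
// without the library.
var ErrRetry = errors.New("Retry error")

// errRateLimited is a hypothetical transient failure.
var errRateLimited = errors.New("rate limited")

// sendEmail is a hypothetical downstream call that fails transiently on
// the first attempt and succeeds afterwards.
func sendEmail(attempt int) error {
	if attempt == 0 {
		return errRateLimited
	}
	return nil
}

// handle shows the callback-side pattern: translate a transient failure
// into ErrRetry and pass every other result through unchanged.
func handle(attempt int) error {
	err := sendEmail(attempt)
	if errors.Is(err, errRateLimited) {
		return ErrRetry
	}
	return err
}

// classify is a hypothetical consumer-side check (an assumption, not the
// library's code): retries are expected, other errors are real failures.
func classify(err error) string {
	switch {
	case err == nil:
		return "ack"
	case errors.Is(err, ErrRetry):
		return "retry"
	default:
		return "error"
	}
}

func main() {
	fmt.Println(classify(handle(0)))
	fmt.Println(classify(handle(1)))
}
```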

Functions

This section is empty.

Types

type CallbackFunction

type CallbackFunction func(context.Context, *Message) error

CallbackFunction is the function signature for a hedwig callback function

type CallbackRegistry

type CallbackRegistry map[MessageTypeMajorVersion]CallbackFunction

CallbackRegistry is a map of message type and major versions to callback functions
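
The lookup this map enables can be sketched as below. The types are simplified stand-ins for the documented ones so the example compiles on its own: in particular the real `Message` carries a `*semver.Version`, whereas this sketch uses a plain `MajorVersion` field. `dispatch`, `handleSendEmail`, and `newRegistry` are hypothetical helpers, not part of the library.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the documented types (see the note above).
type Message struct {
	Type         string
	MajorVersion uint
	Data         interface{}
}

type MessageTypeMajorVersion struct {
	MessageType  string
	MajorVersion uint
}

type CallbackFunction func(context.Context, *Message) error

type CallbackRegistry map[MessageTypeMajorVersion]CallbackFunction

// handleSendEmail is a hypothetical callback.
func handleSendEmail(ctx context.Context, msg *Message) error {
	fmt.Println("sending email to", msg.Data)
	return nil
}

// newRegistry registers one callback per message type and major version.
func newRegistry() CallbackRegistry {
	return CallbackRegistry{
		{MessageType: "email.send", MajorVersion: 1}: handleSendEmail,
	}
}

// dispatch is a hypothetical helper: it builds the registry key from the
// message and invokes the matching callback, if one is registered.
func dispatch(ctx context.Context, registry CallbackRegistry, msg *Message) error {
	key := MessageTypeMajorVersion{MessageType: msg.Type, MajorVersion: msg.MajorVersion}
	callback, ok := registry[key]
	if !ok {
		return fmt.Errorf("no callback registered for %s v%d", msg.Type, msg.MajorVersion)
	}
	return callback(ctx, msg)
}

func main() {
	registry := newRegistry()
	known := &Message{Type: "email.send", MajorVersion: 1, Data: "user@example.com"}
	unknown := &Message{Type: "email.send", MajorVersion: 2}

	fmt.Println(dispatch(context.Background(), registry, known))
	fmt.Println(dispatch(context.Background(), registry, unknown))
}
```

Keying on the major version only means a callback written for 1.0 also receives 1.1 messages, while a breaking 2.0 needs its own entry.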

type Consumer added in v0.8.0

type Consumer struct {
	// contains filtered or unexported fields
}

type ConsumerBackend added in v0.8.0

type ConsumerBackend interface {
	// Receive messages from configured queue(s) and provide it through the channel. This should run indefinitely
	// until the context is canceled. Provider metadata should include all info necessary to ack/nack a message.
	// The channel must not be closed by the backend.
	Receive(ctx context.Context, numMessages uint32, visibilityTimeout time.Duration, messageCh chan<- ReceivedMessage) error

	// NackMessage nacks a message on the queue
	NackMessage(ctx context.Context, providerMetadata interface{}) error

	// AckMessage acknowledges a message on the queue
	AckMessage(ctx context.Context, providerMetadata interface{}) error

	// RequeueDLQ re-queues everything in the Hedwig DLQ back into the Hedwig queue
	RequeueDLQ(ctx context.Context, numMessages uint32, visibilityTimeout time.Duration) error
}

ConsumerBackend is used for consuming messages from a transport

type Decoder added in v0.8.0

type Decoder interface {
	// DecodeData validates and decodes data
	DecodeData(messageType string, version *semver.Version, data interface{}) (interface{}, error)

	// ExtractData extracts data from the on-the-wire payload when not using message transport
	ExtractData(messagePayload []byte, attributes map[string]string) (MetaAttributes, interface{}, error)

	// DecodeMessageType decodes message type from meta attributes
	DecodeMessageType(schema string) (string, *semver.Version, error)
}

Decoder is responsible for decoding the message payload in appropriate format from over the wire transport format

type Encoder added in v0.8.0

type Encoder interface {
	// EncodeData encodes the message with appropriate format for transport over the wire
	EncodeData(data interface{}, useMessageTransport bool, metaAttrs MetaAttributes) ([]byte, error)

	// EncodeMessageType encodes the message type with appropriate format for transport over the wire
	EncodeMessageType(messageType string, version *semver.Version) string

	// VerifyKnownMinorVersion checks that message version is known to us
	VerifyKnownMinorVersion(messageType string, version *semver.Version) error
}

Encoder is responsible for encoding the message payload in appropriate format for over the wire transport

type EncoderDecoder added in v0.9.0

type EncoderDecoder interface {
	Encoder
	Decoder
}

EncoderDecoder can both encode and decode messages

type GetLoggerFunc

type GetLoggerFunc func(ctx context.Context) Logger

GetLoggerFunc returns the logger object

func LogrusGetLoggerFunc

func LogrusGetLoggerFunc(fn func(ctx context.Context) *logrus.Entry) GetLoggerFunc

type Instrumenter added in v0.7.0

type Instrumenter interface {
	// OnReceive is called as soon as possible after a message is received from the backend. Caller must call
	// the returned finalized function when processing for the message is finished (typically done via defer).
	// The context must be replaced with the returned context for the remainder of the operation.
	// This is where a new span must be started.
	OnReceive(ctx context.Context, attributes map[string]string) (context.Context, func())

	// OnMessageDeserialized is called when a message has been received from the backend and decoded
	// This is where span attributes, such as name, may be updated.
	OnMessageDeserialized(ctx context.Context, message *Message)

	// OnPublish is called right before a message is published. Caller must call
	// the returned finalized function when publishing for the message is finished (typically done via defer).
	// The attributes may be updated to include trace id for downstream consumers.
	OnPublish(ctx context.Context, message *Message, attributes map[string]string) (context.Context, map[string]string, func())
}

Instrumenter defines the interface for Hedwig's instrumentation
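
As an illustration of the calling convention described in the comments above, here is a sketch of an implementation that records event names instead of emitting spans. The interface is copied from the declaration above with a stand-in `Message` type; `recorder`, `process`, `traceKey`, and the `trace_id` attribute name are all hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a stand-in for the documented type.
type Message struct{ Type string }

type Instrumenter interface {
	OnReceive(ctx context.Context, attributes map[string]string) (context.Context, func())
	OnMessageDeserialized(ctx context.Context, message *Message)
	OnPublish(ctx context.Context, message *Message, attributes map[string]string) (context.Context, map[string]string, func())
}

type traceKey struct{}

// recorder is a hypothetical Instrumenter that appends event names to a slice.
type recorder struct{ events []string }

var _ Instrumenter = (*recorder)(nil) // compile-time interface check

func (r *recorder) OnReceive(ctx context.Context, attributes map[string]string) (context.Context, func()) {
	r.events = append(r.events, "receive:start")
	// Stash the incoming trace id in the context for later hooks.
	ctx = context.WithValue(ctx, traceKey{}, attributes["trace_id"])
	return ctx, func() { r.events = append(r.events, "receive:end") }
}

func (r *recorder) OnMessageDeserialized(ctx context.Context, message *Message) {
	r.events = append(r.events, "deserialized:"+message.Type)
}

func (r *recorder) OnPublish(ctx context.Context, message *Message, attributes map[string]string) (context.Context, map[string]string, func()) {
	// Copy the attributes so the caller's map is not mutated.
	out := make(map[string]string, len(attributes)+1)
	for k, v := range attributes {
		out[k] = v
	}
	if id, ok := ctx.Value(traceKey{}).(string); ok && id != "" {
		out["trace_id"] = id
	}
	r.events = append(r.events, "publish:start")
	return ctx, out, func() { r.events = append(r.events, "publish:end") }
}

// process mimics how a caller might drive the hooks: replace the context,
// defer the finalizer, then publish a follow-up message downstream.
func process(r *recorder, attributes map[string]string) map[string]string {
	ctx, done := r.OnReceive(context.Background(), attributes)
	defer done()
	msg := &Message{Type: "email.send"}
	r.OnMessageDeserialized(ctx, msg)
	_, outAttrs, finish := r.OnPublish(ctx, msg, map[string]string{})
	finish()
	return outAttrs
}

func main() {
	r := &recorder{}
	attrs := process(r, map[string]string{"trace_id": "abc123"})
	fmt.Println(r.events)
	fmt.Println(attrs["trace_id"])
}
```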

type ListenRequest

type ListenRequest struct {
	// How many messages to fetch at one time
	NumMessages uint32 // default 1

	// How long should the message be hidden from other consumers?
	VisibilityTimeout time.Duration // defaults to queue configuration

	// How many goroutines to spin for processing messages concurrently
	NumConcurrency uint32 // default 1
}

ListenRequest represents a request to listen for messages
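
The defaults noted in the field comments can be sketched as follows. The struct is copied from the declaration above; `withDefaults` is a hypothetical helper, and treating a zero value as "use the default" is an assumption about the library's behaviour rather than something this page states.

```go
package main

import (
	"fmt"
	"time"
)

// ListenRequest mirrors the documented struct.
type ListenRequest struct {
	NumMessages       uint32
	VisibilityTimeout time.Duration
	NumConcurrency    uint32
}

// withDefaults is a hypothetical helper that fills in the documented
// defaults for fields left at their zero value. VisibilityTimeout is left
// untouched because its default comes from the queue configuration.
func withDefaults(r ListenRequest) ListenRequest {
	if r.NumMessages == 0 {
		r.NumMessages = 1
	}
	if r.NumConcurrency == 0 {
		r.NumConcurrency = 1
	}
	return r
}

func main() {
	// Fetch ten messages per call, hide each for 30 seconds, and leave
	// concurrency at its default.
	request := withDefaults(ListenRequest{
		NumMessages:       10,
		VisibilityTimeout: 30 * time.Second,
	})
	fmt.Println(request.NumMessages, request.VisibilityTimeout, request.NumConcurrency)
}
```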

type Logger added in v0.8.0

type Logger interface {
	// Error logs an error with a message. `fields` can be used as additional metadata for structured logging.
	// You can generally expect one of these fields to be available: message_sqs_id, message_sns_id.
	// By default fields are logged as a map using fmt.Sprintf
	Error(err error, message string, fields LoggingFields)

	// Warn logs a warn level log with a message. `fields` param works the same as `Error`.
	Warn(err error, message string, fields LoggingFields)

	// Info logs an info level log with a message. `fields` param works the same as `Error`.
	Info(message string, fields LoggingFields)

	// Debug logs a debug level log with a message. `fields` param works the same as `Error`.
	Debug(message string, fields LoggingFields)
}

Logger represents a logging interface that this library expects
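
A sketch of a custom implementation backed by the standard library's `log` package is shown below. The `Logger` and `LoggingFields` declarations are copied from this page so the example compiles alone; `levelLogger` and `render` are hypothetical names, and the output format is an arbitrary choice.

```go
package main

import (
	"fmt"
	"log"
	"os"
)

// Copied from the documented declarations.
type LoggingFields map[string]interface{}

type Logger interface {
	Error(err error, message string, fields LoggingFields)
	Warn(err error, message string, fields LoggingFields)
	Info(message string, fields LoggingFields)
	Debug(message string, fields LoggingFields)
}

// render formats one log line; err may be nil for Info and Debug.
func render(level string, err error, message string, fields LoggingFields) string {
	if err != nil {
		return fmt.Sprintf("[%s] %s: %v %v", level, message, err, fields)
	}
	return fmt.Sprintf("[%s] %s %v", level, message, fields)
}

// levelLogger is a hypothetical adapter from Logger to a *log.Logger.
type levelLogger struct {
	out *log.Logger
}

var _ Logger = levelLogger{} // compile-time interface check

func (l levelLogger) Error(err error, message string, fields LoggingFields) {
	l.out.Print(render("ERROR", err, message, fields))
}

func (l levelLogger) Warn(err error, message string, fields LoggingFields) {
	l.out.Print(render("WARN", err, message, fields))
}

func (l levelLogger) Info(message string, fields LoggingFields) {
	l.out.Print(render("INFO", nil, message, fields))
}

func (l levelLogger) Debug(message string, fields LoggingFields) {
	l.out.Print(render("DEBUG", nil, message, fields))
}

func main() {
	logger := levelLogger{out: log.New(os.Stdout, "", 0)}
	logger.Info("message received", LoggingFields{"message_sqs_id": "abc"})
	logger.Error(fmt.Errorf("boom"), "callback failed", LoggingFields{"message_sqs_id": "abc"})
}
```

An adapter like this could then be returned from a function of the `GetLoggerFunc` shape.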

type LoggingFields

type LoggingFields map[string]interface{}

type Message

type Message struct {
	Data              interface{}
	Type              string
	DataSchemaVersion *semver.Version
	ID                string
	Metadata          metadata
}

Message model for hedwig messages.

func NewMessage

func NewMessage(dataType string, dataSchemaVersion string, headers map[string]string, data interface{}, publisherName string) (*Message, error)

NewMessage creates new Hedwig messages based off of message type and Schema version

type MessageRouting added in v0.2.0

type MessageRouting map[MessageTypeMajorVersion]string

MessageRouting is a map of message type and major versions to Hedwig topics

type MessageTypeMajorVersion added in v0.2.0

type MessageTypeMajorVersion struct {
	// Message type
	MessageType string
	// Message major version
	MajorVersion uint
}

MessageTypeMajorVersion is a tuple of message type and major version

type MetaAttributes

type MetaAttributes struct {
	Timestamp     time.Time
	Publisher     string
	Headers       map[string]string
	ID            string
	Schema        string
	FormatVersion *semver.Version
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher handles hedwig publishing

func NewPublisher

func NewPublisher(backend PublisherBackend, encoderDecoder EncoderDecoder, routing MessageRouting) *Publisher

NewPublisher creates a new Publisher.

`routing`: Maps message type and major version to topic names

<message type>, <major version> => topic name

An entry is required for every message type that the app wants to publish. It is recommended that major versions of a message be published on separate topics.
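
The recommendation to keep major versions on separate topics can be sketched like this. The two types mirror the documented declarations; `majorVersion`, `topicFor`, `exampleRouting`, and the `send_email_v2` topic name are hypothetical, and parsing the version string by hand is a simplification (the library itself works with `*semver.Version`).

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Mirrors of the documented types.
type MessageTypeMajorVersion struct {
	MessageType  string
	MajorVersion uint
}

type MessageRouting map[MessageTypeMajorVersion]string

// exampleRouting publishes each major version of email.send on its own topic.
func exampleRouting() MessageRouting {
	return MessageRouting{
		{MessageType: "email.send", MajorVersion: 1}: "send_email",
		{MessageType: "email.send", MajorVersion: 2}: "send_email_v2",
	}
}

// majorVersion extracts the major component from a "major.minor" string.
func majorVersion(version string) (uint, error) {
	parts := strings.SplitN(version, ".", 2)
	if len(parts) != 2 {
		return 0, fmt.Errorf("invalid version %q, expected major.minor", version)
	}
	major, err := strconv.ParseUint(parts[0], 10, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid major version in %q: %w", version, err)
	}
	return uint(major), nil
}

// topicFor is a hypothetical lookup: minor versions share the topic of
// their major version, and a missing entry is an error.
func topicFor(routing MessageRouting, messageType, version string) (string, error) {
	major, err := majorVersion(version)
	if err != nil {
		return "", err
	}
	topic, ok := routing[MessageTypeMajorVersion{MessageType: messageType, MajorVersion: major}]
	if !ok {
		return "", fmt.Errorf("no topic configured for %s v%d", messageType, major)
	}
	return topic, nil
}

func main() {
	routing := exampleRouting()
	for _, version := range []string{"1.0", "1.4", "2.0", "3.0"} {
		topic, err := topicFor(routing, "email.send", version)
		fmt.Println(version, topic, err)
	}
}
```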

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, message *Message) (string, error)

Publish a message on Hedwig

func (*Publisher) WithInstrumenter added in v0.8.0

func (p *Publisher) WithInstrumenter(instrumenter Instrumenter)

func (*Publisher) WithUseTransportMessageAttributes added in v0.8.0

func (p *Publisher) WithUseTransportMessageAttributes(useTransportMessageAttributes bool)

type PublisherBackend added in v0.8.0

type PublisherBackend interface {
	// Publish a message represented by the payload, with specified attributes to the specific topic
	Publish(ctx context.Context, message *Message, payload []byte, attributes map[string]string, topic string) (string, error)
}

PublisherBackend is used to publish messages to a transport
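
For tests, an in-memory backend satisfying this interface could look like the sketch below. The interface is copied from the declaration above with a stand-in `Message` type; `memoryBackend`, `published`, `publishOne`, the `example_attribute` key, and the `mem-N` identifier format are hypothetical. The sketch is not safe for concurrent use.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a stand-in for the documented type.
type Message struct {
	ID   string
	Type string
}

type PublisherBackend interface {
	Publish(ctx context.Context, message *Message, payload []byte, attributes map[string]string, topic string) (string, error)
}

// published records one call to Publish.
type published struct {
	topic      string
	payload    []byte
	attributes map[string]string
}

// memoryBackend is a hypothetical backend that stores messages in a slice
// instead of sending them to a transport.
type memoryBackend struct {
	sent []published
}

var _ PublisherBackend = (*memoryBackend)(nil) // compile-time interface check

func (m *memoryBackend) Publish(ctx context.Context, message *Message, payload []byte, attributes map[string]string, topic string) (string, error) {
	// Honour cancellation the way a network-backed implementation would.
	if err := ctx.Err(); err != nil {
		return "", err
	}
	m.sent = append(m.sent, published{topic: topic, payload: payload, attributes: attributes})
	// Return a made-up identifier standing in for the transport's message id.
	return fmt.Sprintf("mem-%d", len(m.sent)), nil
}

// publishOne is a hypothetical helper that publishes a single payload.
func publishOne(b PublisherBackend, topic, payload string) (string, error) {
	msg := &Message{ID: "msg-1", Type: "email.send"}
	attrs := map[string]string{"example_attribute": "value"}
	return b.Publish(context.Background(), msg, []byte(payload), attrs, topic)
}

func main() {
	backend := &memoryBackend{}
	id, err := publishOne(backend, "send_email", `{"to": "user@example.com"}`)
	fmt.Println(id, err, len(backend.sent))
}
```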

type QueueConsumer added in v0.8.0

type QueueConsumer struct {
	Consumer
}

func NewQueueConsumer

func NewQueueConsumer(backend ConsumerBackend, decoder Decoder, getLogger GetLoggerFunc, registry CallbackRegistry) *QueueConsumer

func (*QueueConsumer) ListenForMessages added in v0.8.0

func (c *QueueConsumer) ListenForMessages(ctx context.Context, request ListenRequest) error

ListenForMessages starts a hedwig listener for the provided message types

func (*QueueConsumer) RequeueDLQ added in v0.8.0

func (c *QueueConsumer) RequeueDLQ(ctx context.Context, request ListenRequest) error

RequeueDLQ re-queues everything in the Hedwig DLQ back into the Hedwig queue

func (*QueueConsumer) WithInstrumenter added in v0.8.0

func (c *QueueConsumer) WithInstrumenter(instrumenter Instrumenter) *QueueConsumer

func (*QueueConsumer) WithUseTransportMessageAttributes added in v0.8.0

func (c *QueueConsumer) WithUseTransportMessageAttributes(useTransportMessageAttributes bool)

type ReceivedMessage added in v0.9.0

type ReceivedMessage struct {
	Payload          []byte
	Attributes       map[string]string
	ProviderMetadata interface{}
}

ReceivedMessage is the message as received by a transport backend.

type StdLogger added in v0.8.0

type StdLogger struct{}

func (*StdLogger) Debug added in v0.8.0

func (s *StdLogger) Debug(message string, fields LoggingFields)

func (*StdLogger) Error added in v0.8.0

func (s *StdLogger) Error(err error, message string, fields LoggingFields)

func (*StdLogger) Info added in v0.8.0

func (s *StdLogger) Info(message string, fields LoggingFields)

func (*StdLogger) Warn added in v0.8.0

func (s *StdLogger) Warn(err error, message string, fields LoggingFields)

Directories

Path Synopsis
internal
