eventually

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2021 License: MIT Imports: 10 Imported by: 0

README

Eventually CircleCI

license release Travis CI Coverage Status codecov GolangCI Go Report Card Codacy Badge GoDoc DepShield Badge FOSSA Status

 _____                 _               _ _
| ____|_   _____ _ __ | |_ _   _  __ _| | |_   _
|  _| \ \ / / _ \ '_ \| __| | | |/ _` | | | | | |
| |___ \ V /  __/ | | | |_| |_| | (_| | | | |_| |
|_____| \_/ \___|_| |_|\__|\__,_|\__,_|_|_|\__, |
                                           |___/

is a library that helps to implement CQRS by having an event bus that facilitates the Pub-Sub operations.

Table of Contents

✨ Features

  • Pub/Sub model.
  • Delayed publish.
  • Scheduled publish.

🏎️ Getting Started

Prerequisites
Installation
go get -u github.com/ahmedkamals/eventually
Examples
Main
// main.go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"time"

	"github.com/ahmedkamals/colorize"
	"github.com/ahmedkamals/eventually"
)

type (
	eventLogger struct {
		logChan chan string
	}

	errorQueue struct {
		errChan chan error
	}

	universalPayload struct{}
)

const (
	// MailboxDefaultSize defines size for the consumer mail box.
	MailboxDefaultSize = 10
	universal          = "universal"
)

var (
	colorized = colorize.NewColorable(os.Stdout)
)

func newEventLogger(logChan chan string) eventually.Logger {
	return &eventLogger{
		logChan: logChan,
	}
}

func (e *eventLogger) Log(message string) {
	select {
	case e.logChan <- message:
	// Drop any log message that exceeds the log queue size.
	default:
	}
}

func newErrorQueue(errChan chan error) eventually.ErrorQueue {
	return &errorQueue{
		errChan: errChan,
	}
}

func (e *errorQueue) Report(err error) {
	select {
	case e.errChan <- err:
	// Drop any error message that exceeds the error queue size.
	default:
	}
}

func main() {
	logChan := make(chan string, 10)
	errChan := make(chan error, 10)

	go monitorLogMessages(logChan)
	go monitorErrors(errChan)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	eventBus := eventually.NewBus(eventually.NewEventStore(), newEventLogger(logChan), newErrorQueue(errChan), 100)
	eventBus.Run(ctx)

	topics := map[string]eventually.Descriptor{
		universal:         eventually.NewDescriptor(universal, new(universalPayload), 1),
		"example":         eventually.NewDescriptor("example", "examplePayload", 1),
		"anotherExample":  eventually.NewDescriptor("anotherExample", "anotherExamplePayload", 2),
		"publishAfter":    eventually.NewDescriptor("publishAfter", "publishAfterPayload", 1),
		"schedulePublish": eventually.NewDescriptor("schedulePublish", "schedulePublishPayload", 1),
	}

	topicsConsumersMap := map[eventually.Descriptor][]eventually.Consumer{
		topics[universal]: {
			newExampleConsumer(eventually.NewUUID(), eventually.NewInbox(MailboxDefaultSize)),
		},
		topics["example"]: {
			newExampleConsumer(eventually.NewUUID(), eventually.NewInbox(MailboxDefaultSize)),
			newExampleConsumer(eventually.NewUUID(), eventually.NewInbox(MailboxDefaultSize)),
			newExampleConsumer(eventually.NewUUID(), eventually.NewInbox(MailboxDefaultSize)),
		},
		topics["anotherExample"]: {
			newExampleConsumer(eventually.NewUUID(), eventually.NewInbox(MailboxDefaultSize)),
		},
		topics["publishAfter"]: {
			newExampleConsumer(eventually.NewUUID(), eventually.NewInbox(MailboxDefaultSize)),
		},
		topics["schedulePublish"]: {
			newExampleConsumer(eventually.NewUUID(), eventually.NewInbox(MailboxDefaultSize)),
		},
	}

	subscribe(eventBus, topicsConsumersMap, errChan)

	eventBus.Publish(topics["example"], topics["anotherExample"])
	eventBus.PublishAfter(ctx, time.Second, topics["publishAfter"])
	eventBus.SchedulePublish(ctx, time.NewTicker(800*time.Millisecond), topics["schedulePublish"])

	<-time.After(88 * time.Millisecond)
	fmt.Printf("%s, Subscribed events[%d]: %#v\n", colorized.White("After publishing"), eventBus.Length(), eventBus.Topics())

	eventBus.Unsubscribe(topicsConsumersMap[topics["example"]][0], topics["example"])
	<-time.After(88 * time.Millisecond)
	fmt.Printf("%s, Subscribed events[%d]: %#v\n", colorized.Magenta("After unsubscribing"), eventBus.Length(), eventBus.Topics())

	eventBus.Unregister(topics["anotherExample"])
	<-time.After(888 * time.Millisecond)
	fmt.Printf("%s, Subscribed events[%d]: %#v\n", colorized.Yellow("After unregistering"), eventBus.Length(), eventBus.Topics())

	// Delay till all topics are published.
	<-time.After(3 * time.Second)
}

func subscribe(eventBus eventually.Bus, topicsConsumersMap map[eventually.Descriptor][]eventually.Consumer, errChan chan error) {

	for topic, consumers := range topicsConsumersMap {
		for _, consumer := range consumers {
			go func(consumer eventually.Consumer) {
				for {
					message := consumer.ReadMessage()
					if message == nil {
						continue
					}
					fmt.Printf(
						"%s[%s] Inbox: got message %s\n",
						colorized.Green(consumer.String()),
						consumer.AggregateID(),
						colorized.Orange(message.Payload().(string)),
					)
				}
			}(consumer)

			switch topic.Name() {
			case universal:
				errChan <- eventBus.SubscribeToAll(consumer)

			default:
				errChan <- eventBus.Subscribe(consumer, topic)
			}
		}
	}
	// Delay till subscription happens.
	<-time.After(8 * time.Millisecond)
}

func monitorLogMessages(logChan chan string) {
	for message := range logChan {
		fmt.Println(colorized.Cyan(message))
	}
}

func monitorErrors(errChan chan error) {
	for err := range errChan {
		if err != nil && !errors.Is(err, io.EOF) {
			fmt.Println(colorized.Red(err.Error()))
		}
	}
}
Consumer
// consumer.go
package main

import (
	"fmt"
	"github.com/ahmedkamals/eventually"
	"reflect"
)

type (
	exampleConsumer struct {
		id      eventually.UUID
		mailBox eventually.Inbox
	}
)

func newExampleConsumer(ID eventually.UUID, mailBox eventually.Inbox) eventually.Consumer {
	return &exampleConsumer{
		id:      ID,
		mailBox: mailBox,
	}
}

func (ec *exampleConsumer) AggregateID() eventually.UUID {
	return ec.id
}

func (ec *exampleConsumer) MatchCriteria() eventually.Match {
	return func(topic eventually.Descriptor) bool {
		return true
	}
}

func (ec *exampleConsumer) Drop(descriptor eventually.Descriptor) {
	ec.mailBox.Receive(descriptor)
}

func (ec *exampleConsumer) ReadMessage() eventually.Descriptor {
	return ec.mailBox.Read()
}

func (ec *exampleConsumer) OnSubscribe(topic eventually.Descriptor, callback eventually.Notification) {
	callback(topic, ec)
}

func (ec *exampleConsumer) OnUnsubscribe(topic eventually.Descriptor, callback eventually.Notification) {
	callback(topic, ec)
}

func (ec *exampleConsumer) Signout() {
	ec.mailBox.Signout()
}

func (ec *exampleConsumer) GoString() string {
	return fmt.Sprintf("%s[%s]", ec.String(), ec.id)
}

func (ec *exampleConsumer) String() string {
	return reflect.TypeOf(ec).Elem().Name()
}

Sample output

🕸️ Tests

make test
Benchmarks

Benchmarks Flamegraph

🔥 Todo:
  • Stats collection about published messages.
  • Storing published messages on a persistence medium (memory, disk) for a defined period of time.

🤝 Contribution

Please refer to the CONTRIBUTING.md file.

Git Hooks

In order to set up tests running on each commit do the following steps:

ln -sf ../../assets/git/hooks/pre-commit.sh .git/hooks/pre-commit && \
ln -sf ../../assets/git/hooks/pre-push.sh .git/hooks/pre-push     && \
ln -sf ../../assets/git/hooks/commit-msg.sh .git/hooks/commit-msg

👨‍💻 Credits

🆓 LICENSE

Eventually is released under MIT license, please refer to the LICENSE.md file.

FOSSA Status

Happy Coding 🙂

Analytics

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus

type Bus interface {

	// Topics returns the current registered topicsMap.
	Topics() []Descriptor
	// Length returns the number of subscribed topicsMap.
	Length() int
	// Subscribe a certain consumer to a single or a group of topicsMap.
	Subscribe(Consumer, ...Descriptor) error
	// SubscribeToAll topicsMap.
	SubscribeToAll(Consumer) error
	// Unsubscribe removes a consumer for a group of topicsMap.
	Unsubscribe(Consumer, ...Descriptor) error
	// Unregister removes all consumers for the specified topicsMap.
	Unregister(...Descriptor) error
	// contains filtered or unexported methods
}

Bus implements publish/subscribe pattern: https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern

func NewBus

func NewBus(eventStore EventStore, logger Logger, errorQueue ErrorQueue, bufferSize int) Bus

NewBus creates new Bus

type Category

type Category string

Category of the topic.

type Consumer

type Consumer interface {
	fmt.GoStringer
	fmt.Stringer
	// MatchCriteria returns a match criteria callback.
	MatchCriteria() Match
	// Drop puts the message in the receiver mail box.
	Drop(Descriptor)
	// ReadMessage reads a message from the receiver mail box.
	ReadMessage() Descriptor
	// OnSubscribe called on new subscription.
	OnSubscribe(Descriptor, Notification)
	// OnUnsubscribe called on subscription cancellation.
	OnUnsubscribe(Descriptor, Notification)
	// Signout closes the mailbox.
	Signout()
	// contains filtered or unexported methods
}

Consumer interface defines consumers operations.

type ConsumerCollection

type ConsumerCollection interface {
	// Append a consumer to the ConsumerCollection.
	Append(...Consumer)
	// Delete a consumer from the ConsumerCollection.
	Delete(...Consumer)
	// Length returns the length of the ConsumerCollection.
	Length() int
	// Exists check if consumers do exist in the ConsumerCollection.
	Exists(Consumer) bool
	// Iterator iterates over the consumers in a ConsumerCollection.
	Iterator() <-chan Consumer
}

ConsumerCollection defines consumers collection operations.

type Descriptor

type Descriptor interface {
	fmt.GoStringer
	fmt.Stringer

	// Name returns the topic category.
	Name() Category
	// contains filtered or unexported methods
}

Descriptor is the interface that a topic must implement.

func NewDescriptor

func NewDescriptor(category Category, payload Payload, version Version) Descriptor

NewDescriptor returns a new topic descriptor.

type Dispatcher

type Dispatcher interface {
	// contains filtered or unexported methods
}

Dispatcher is responsible for dispatching topics.

func NewDispatcher

func NewDispatcher(eventStore EventStore, logger Logger, errorQueue ErrorQueue, bufferSize int) Dispatcher

NewDispatcher creates a new Dispatcher.

type ErrorQueue

type ErrorQueue interface {
	Report(error)
}

ErrorQueue interface for error reporting.

type EventStore

type EventStore interface {
	Load(UUID) (ConsumerCollection, bool)
	Store(Descriptor, ...Consumer) bool
	DeleteTopic(...Descriptor)
	Length() int
	Remove(Descriptor, ...Consumer) bool
	IsSubscribed(Descriptor, Consumer) bool
	Topics() []Descriptor
}

EventStore for the topicsMap.

func NewEventStore

func NewEventStore() EventStore

NewEventStore creates a new EventStore.

type Inbox

type Inbox interface {
	Receive(Descriptor)
	Read() Descriptor
	Signout()
}

Inbox interface for consumers to receive messages on.

func NewInbox

func NewInbox(space uint) Inbox

NewInbox creates a new Inbox.

type Logger

type Logger interface {
	Log(string)
}

Logger interface for logging operations.

type Match

type Match func(Descriptor) bool

Match callback that can be used for matching event.

type Metadata

type Metadata map[string]interface{}

Metadata about the topic.

type Notification

type Notification func(Descriptor, Consumer)

Notification callback when a subscription on topic happens (e.g. OnSubscribe, On.Unsubscribe).

type Payload

type Payload interface{}

Payload of the topic.

type Serializer

type Serializer interface {
	// contains filtered or unexported methods
}

Serializer interface

func NewSerializer

func NewSerializer() Serializer

NewSerializer creates a new Serializer

type UUID

type UUID string

UUID of the topic.

func NewUUID

func NewUUID() UUID

NewUUID creates new UUID.

type Version

type Version int

Version of the topic.

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL