rivulet

Version: v0.0.0-...-375bd00
Published: May 5, 2024 License: MIT Imports: 19 Imported by: 0

README

Rivulet

Rivulet is a simple and flexible messaging framework for Go applications, offering seamless integration with different transport mechanisms.

Overview

Rivulet provides a convenient way for Go developers to publish and subscribe to messages.

import "github.com/mr-joshcrane/rivulet"

Its modular design allows different transports, such as in-memory, network-based, and AWS EventBridge, to be swapped in easily. Rivulet exposes a small, stable interface that hides the details of message handling.

Features

  • Modular Design: Transports are interchangeable, so a Publisher can deliver messages in memory, over the network, or via AWS EventBridge.
  • Easy Integration: The framework is designed to drop into existing Go projects through a small publish and subscribe API.
  • Extensible: Custom transports (see WithTransport) and message transforms (see WithTransform) let Rivulet be extended to fit specific use cases.

Usage

Rivulet provides a simple API for publishing and subscribing to messages. Below is an example of how to use Rivulet to publish messages using AWS EventBridge:

package main

import (
        "context"
        "fmt"

        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/service/eventbridge"
        "github.com/mr-joshcrane/rivulet"
)

func main() {
        // Load AWS credentials and region from the default provider chain.
        cfg, err := config.LoadDefaultConfig(context.Background())
        if err != nil {
                fmt.Println(err)
                return
        }
        eb := eventbridge.NewFromConfig(cfg)
        // NewEventBridgePublisher returns a Publisher that delivers via EventBridge.
        p := rivulet.NewEventBridgePublisher("myPublisherId", eb)
        err = p.Publish("a line")
        if err != nil {
                fmt.Println(err)
        }
}

License

Rivulet is licensed under the MIT License. For more details, see the LICENSE file.

Documentation

Index

Constants

This section is empty.

Variables

var DefaultTransform = func(m Message) (string, error) {
	data, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(data), nil
}
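
As an illustration of what DefaultTransform produces, the sketch below redeclares Message and the transform locally so it runs without the package. The names Message and defaultTransform here are local copies made for the example, not imports from rivulet, and the field values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message mirrors the documented fields of rivulet.Message.
// It is redeclared here only to keep the sketch self-contained.
type Message struct {
	Publisher string
	Order     int
	Content   string
}

// defaultTransform mirrors DefaultTransform: marshal the Message to JSON.
func defaultTransform(m Message) (string, error) {
	data, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

func main() {
	out, err := defaultTransform(Message{Publisher: "myPublisherId", Order: 1, Content: "a line"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(out)
	// prints {"Publisher":"myPublisherId","Order":1,"Content":"a line"}
}
```

Because the struct carries no JSON tags, the keys in the output are the Go field names.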

Functions

func NewMemoryPublisher

func NewMemoryPublisher(name string, options ...PublisherOptions) (*Publisher, *Subscriber)

NewMemoryPublisher creates a new Publisher with the given name and options, and returns it together with a Subscriber. By default, the Publisher uses an in-memory Transport.

func ParseQueryResults

func ParseQueryResults(query *dynamodb.QueryOutput) ([]string, error)

func Query

func Query(publisherName string) *dynamodb.QueryInput

func Read

func Read(ctx context.Context, publisherName string) ([]string, error)

func SetupEventBridgeReceiverInfrastructure

func SetupEventBridgeReceiverInfrastructure(cfg aws.Config, p *Publisher) error

Types

type EventBridgeClient

type EventBridgeClient interface {
	PutEvents(ctx context.Context, events *eventbridge.PutEventsInput, opts ...func(*eventbridge.Options)) (*eventbridge.PutEventsOutput, error)
}

EventBridgeClient is the interface an EventBridgeTransport uses to talk to AWS EventBridge. It requires only the PutEvents method.

type EventBridgeReceiver

type EventBridgeReceiver struct {
	// contains filtered or unexported fields
}

EventBridgeReceiver is a Receiver for messages arriving from AWS EventBridge.

func (EventBridgeReceiver) Receive

func (r EventBridgeReceiver) Receive(ctx context.Context) ([]Message, error)

type EventBridgeTransport

type EventBridgeTransport struct {
	EventBridge EventBridgeClient
	// contains filtered or unexported fields
}

EventBridgeTransport is a Transport that ships messages via AWS EventBridge

func (*EventBridgeTransport) Publish

func (t *EventBridgeTransport) Publish(message Message) error

Publish sends a message to the specified AWS EventBus via the EventBridgeTransport. The message is transformed before being sent; the default transform simply marshals the message to JSON, and it can be overridden by providing a custom transform function via the WithTransform functional option. It is assumed that the corresponding Rule in AWS EventBridge is configured to match this event and route it to the appropriate target. Successful delivery of the event to the EventBus is no indication that the event will be routed to the target.

type EventBridgeTransportOptions

type EventBridgeTransportOptions func(*EventBridgeTransport)

EventBridgeTransportOptions are functional options for configuring an EventBridgeTransport. Such options include setting the DetailType, Source, EventBusName, and a custom Transform function, if one is needed, to modify the message before sending it.

func WithDetailType

func WithDetailType(detailType string) EventBridgeTransportOptions

WithDetailType is a functional option specifying the DetailType of the EventBridgeTransport

func WithEventBusName

func WithEventBusName(eventBusName string) EventBridgeTransportOptions

WithEventBusName is a functional option specifying the EventBusName of the EventBridgeTransport

func WithSource

func WithSource(source string) EventBridgeTransportOptions

WithSource is a functional option specifying the Source of the EventBridgeTransport

func WithTransform

func WithTransform(transform Transform) EventBridgeTransportOptions

WithTransform is a functional option specifying the Transform function of the EventBridgeTransport. It's the user's responsibility to ensure that any errors are handled correctly.
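
The functional options above all follow the same pattern: each returns a function that mutates the transport's configuration. The sketch below shows that pattern in isolation. transportConfig, option, newConfig, the lowercase with* helpers, and the default values are all hypothetical stand-ins; the real EventBridgeTransport keeps its settings in unexported fields and its defaults are not documented here.

```go
package main

import "fmt"

// transportConfig is a hypothetical stand-in for EventBridgeTransport.
type transportConfig struct {
	detailType   string
	source       string
	eventBusName string
}

// option mirrors the shape of EventBridgeTransportOptions.
type option func(*transportConfig)

func withDetailType(d string) option {
	return func(c *transportConfig) { c.detailType = d }
}

func withSource(s string) option {
	return func(c *transportConfig) { c.source = s }
}

func withEventBusName(n string) option {
	return func(c *transportConfig) { c.eventBusName = n }
}

// newConfig starts from placeholder defaults (assumed for this sketch)
// and applies each option in the order given.
func newConfig(opts ...option) *transportConfig {
	c := &transportConfig{
		detailType:   "example-detail",
		source:       "example-source",
		eventBusName: "default",
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(withSource("orders"), withEventBusName("my-bus"))
	fmt.Println(c.detailType, c.source, c.eventBusName)
	// prints example-detail orders my-bus
}
```

Options that are not passed leave the corresponding default untouched, which is why only the settings that differ need to be named at the call site.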

type InMemoryReceiver

type InMemoryReceiver struct {
	// contains filtered or unexported fields
}

InMemoryReceiver is a Receiver that receives messages from an InMemoryTransport

func (*InMemoryReceiver) Receive

func (r *InMemoryReceiver) Receive(ctx context.Context) ([]Message, error)

Receive blocks until a message is available or the context is done. It then returns all messages received up to that point. Ideally, cancel the context when you're done receiving messages, rather than closing the channel.
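
One way to read the Receive behaviour described above is sketched below with a plain buffered channel. chanReceiver and demo are hypothetical names, and the drain-without-blocking step is an assumption about what "all messages received up to that point" means; the real InMemoryReceiver may be implemented differently.

```go
package main

import (
	"context"
	"fmt"
)

// Message and Receiver mirror the documented rivulet types.
type Message struct {
	Publisher string
	Order     int
	Content   string
}

type Receiver interface {
	Receive(context.Context) ([]Message, error)
}

// chanReceiver is a hypothetical Receiver backed by a channel.
type chanReceiver struct {
	ch chan Message
}

var _ Receiver = chanReceiver{}

// Receive waits for the first message or for ctx to be done, then drains
// whatever else is already queued without blocking again.
func (r chanReceiver) Receive(ctx context.Context) ([]Message, error) {
	var batch []Message
	select {
	case m := <-r.ch:
		batch = append(batch, m)
	case <-ctx.Done():
		return batch, nil
	}
	for {
		select {
		case m := <-r.ch:
			batch = append(batch, m)
		default:
			return batch, nil
		}
	}
}

// demo queues two messages and receives them, then shows that a
// cancelled context unblocks Receive when nothing is queued.
func demo() (first, second int) {
	r := chanReceiver{ch: make(chan Message, 8)}
	r.ch <- Message{Publisher: "p", Order: 1, Content: "first"}
	r.ch <- Message{Publisher: "p", Order: 2, Content: "second"}

	msgs, _ := r.Receive(context.Background())
	first = len(msgs)

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	msgs, _ = r.Receive(ctx)
	second = len(msgs)
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first, second) // prints 2 0
}
```

Cancelling the context, rather than closing the channel, leaves the channel usable for later calls.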

type InMemoryTransport

type InMemoryTransport struct {
	// contains filtered or unexported fields
}

InMemoryTransport is a Transport that deals with messages within process

func NewMemoryTransport

func NewMemoryTransport() *InMemoryTransport

NewMemoryTransport creates a new InMemoryTransport. This is primarily useful in testing both this package and any dependent package.

func (*InMemoryTransport) GetReceiver

func (t *InMemoryTransport) GetReceiver() *InMemoryReceiver

GetReceiver returns a Receiver that can be used to receive messages from the InMemoryTransport. It's convenient to be able to get the associated Receiver from the Transport.

func (*InMemoryTransport) Publish

func (t *InMemoryTransport) Publish(m Message) error

Publish sends a message to the InMemoryTransport by sending a message on the channel

type Message

type Message struct {
	Publisher string
	Order     int
	Content   string
}

Message is a unit of data that can be published by a Publisher. Message contains the name of the Publisher that published it, the order in which it was published, and the content of the message.

type MessageStore

type MessageStore interface {
	All() []Message
}

type NetworkTransport

type NetworkTransport struct {
	// contains filtered or unexported fields
}

NetworkTransport is a Transport that ships messages over the Network

func (*NetworkTransport) Publish

func (t *NetworkTransport) Publish(m Message) error

Publish sends a message to the NetworkTransport by sending an HTTP POST Request
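
The sketch below shows what sending a message in an HTTP POST request can look like, using only the standard library. The JSON body, the Content-Type header, and the 2xx status check are assumptions of this example, since the wire format NetworkTransport actually uses is not documented here; postMessage and roundTrip are hypothetical helpers.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// Message mirrors the documented fields of rivulet.Message.
type Message struct {
	Publisher string
	Order     int
	Content   string
}

// postMessage JSON-encodes m and sends it to endpoint in an HTTP POST.
func postMessage(endpoint string, m Message) error {
	body, err := json.Marshal(m)
	if err != nil {
		return err
	}
	resp, err := http.Post(endpoint, "application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return nil
}

// roundTrip starts a throwaway local server, posts one message to it,
// and returns the request body the server saw.
func roundTrip(m Message) (string, error) {
	bodies := make(chan string, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		data, _ := io.ReadAll(r.Body)
		bodies <- string(data)
	}))
	defer srv.Close()

	if err := postMessage(srv.URL, m); err != nil {
		return "", err
	}
	return <-bodies, nil
}

func main() {
	body, err := roundTrip(Message{Publisher: "myPublisherId", Order: 1, Content: "a line"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(body)
}
```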

type Publisher

type Publisher struct {
	Transport Transport
	// contains filtered or unexported fields
}

Publishers are producers of messages. They deliver a Message to a Receiver via a Transport.

func NewEventBridgePublisher

func NewEventBridgePublisher(name string, eventBridge EventBridgeClient, opts ...EventBridgeTransportOptions) *Publisher

func (*Publisher) Counter

func (p *Publisher) Counter() int64

Counter returns the Publisher's message count, which provides a way to order messages when a transport is not guaranteed to deliver them in order.
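
On the consuming side, out-of-order delivery can be undone by sorting on the Order field of each Message. The sketch below assumes that Order reflects the publish sequence, as the Message documentation suggests; sortByOrder is a hypothetical helper and Message is a local copy of the documented struct.

```go
package main

import (
	"fmt"
	"sort"
)

// Message mirrors the documented fields of rivulet.Message.
type Message struct {
	Publisher string
	Order     int
	Content   string
}

// sortByOrder sorts messages in place by ascending Order.
// The sort is stable, so messages with equal Order keep their arrival order.
func sortByOrder(msgs []Message) {
	sort.SliceStable(msgs, func(i, j int) bool {
		return msgs[i].Order < msgs[j].Order
	})
}

func main() {
	// Messages as they might arrive from a transport that reorders them.
	msgs := []Message{
		{Publisher: "p", Order: 3, Content: "c"},
		{Publisher: "p", Order: 1, Content: "a"},
		{Publisher: "p", Order: 2, Content: "b"},
	}
	sortByOrder(msgs)
	for _, m := range msgs {
		fmt.Println(m.Order, m.Content)
	}
	// prints:
	// 1 a
	// 2 b
	// 3 c
}
```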

func (*Publisher) Publish

func (p *Publisher) Publish(str string) error

Publish sends a message via a Transport. A Publisher is responsible for various metadata about the message.

type PublisherOptions

type PublisherOptions func(*Publisher)

PublisherOptions are functional options for configuring a Publisher. Pass them to NewMemoryPublisher at construction time.

func WithEventBridgeTransport

func WithEventBridgeTransport(eventBridge EventBridgeClient, opts ...EventBridgeTransportOptions) PublisherOptions

WithEventBridgeTransport is a functional option specifying that a Publisher should use an EventBridgeTransport to deliver messages

func WithNetworkTransport

func WithNetworkTransport(endpoint string) PublisherOptions

WithNetworkTransport is a functional option specifying that a Publisher should use the given endpoint to deliver messages over the network

func WithTransport

func WithTransport(t Transport) PublisherOptions

WithTransport is a functional option specifying that a Publisher should use the given Transport to deliver messages.

type Reader

type Reader struct {
	PublisherName  string
	CurrentMessage int
	Store          MessageStore
}

func NewReader

func NewReader(publsherName string) *Reader

func (*Reader) NewMessages

func (r *Reader) NewMessages() []string

type Receiver

type Receiver interface {
	Receive(context.Context) ([]Message, error)
}

Receiver is a mechanism for receiving messages.

type Subscriber

type Subscriber struct {
	Store store.Store
	// contains filtered or unexported fields
}

Subscriber is a consumer of messages. It expects to receive messages from its Receiver and save them to its Store.

func NewEventBridgeSubscriber

func NewEventBridgeSubscriber(event events.EventBridgeEvent, store store.Store) *Subscriber

func (*Subscriber) Receive

func (s *Subscriber) Receive(ctx context.Context) error

Receive receives messages from the Receiver and saves them to the Store. It returns an error if there was a problem saving the messages.

type Transform

type Transform func(Message) (string, error)

Transform is a way to modify a message before sending it to the EventBridgeClient. It can be considered a pre-processing step for the message. It is useful when you have specific requirements to adhere to in order for downstream AWS EventBridge Rules to target your event correctly.
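
A custom Transform might look like the sketch below, which wraps the message in an envelope carrying a fixed "kind" field that a downstream rule could match on. The envelope shape, the "rivulet.line" value, and the name envelopeTransform are invented for illustration; Message and Transform are local copies of the documented types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message and Transform mirror the documented rivulet types.
type Message struct {
	Publisher string
	Order     int
	Content   string
}

type Transform func(Message) (string, error)

// envelope is a hypothetical payload shape chosen for this sketch.
type envelope struct {
	Kind      string `json:"kind"`
	Publisher string `json:"publisher"`
	Order     int    `json:"order"`
	Content   string `json:"content"`
}

// envelopeTransform wraps a Message in an envelope before it is sent.
func envelopeTransform(m Message) (string, error) {
	data, err := json.Marshal(envelope{
		Kind:      "rivulet.line",
		Publisher: m.Publisher,
		Order:     m.Order,
		Content:   m.Content,
	})
	if err != nil {
		return "", err
	}
	return string(data), nil
}

// Compile-time check that envelopeTransform has the Transform signature.
var _ Transform = envelopeTransform

func main() {
	out, err := envelopeTransform(Message{Publisher: "p", Order: 1, Content: "a line"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(out)
	// prints {"kind":"rivulet.line","publisher":"p","order":1,"content":"a line"}
}
```

With the real package, a function of this shape would be passed to WithTransform when constructing the EventBridge transport.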

type Transport

type Transport interface {
	Publish(Message) error
}

Transport is a generic mechanism for delivering messages.
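
Because Transport has a single method, a custom implementation can be very small. The sketch below defines a hypothetical recordingTransport that keeps published messages in memory, which can be handy in tests; Message and Transport are redeclared locally so the example runs on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Message and Transport mirror the documented rivulet types.
type Message struct {
	Publisher string
	Order     int
	Content   string
}

type Transport interface {
	Publish(Message) error
}

// recordingTransport is a hypothetical Transport that stores every
// message it is asked to publish.
type recordingTransport struct {
	mu   sync.Mutex
	sent []Message
}

// Compile-time check that *recordingTransport satisfies Transport.
var _ Transport = (*recordingTransport)(nil)

// Publish appends the message to the in-memory record.
func (rt *recordingTransport) Publish(m Message) error {
	rt.mu.Lock()
	defer rt.mu.Unlock()
	rt.sent = append(rt.sent, m)
	return nil
}

func main() {
	rt := &recordingTransport{}
	for i, line := range []string{"first", "second"} {
		if err := rt.Publish(Message{Publisher: "p", Order: i + 1, Content: line}); err != nil {
			fmt.Println(err)
			return
		}
	}
	fmt.Println(len(rt.sent)) // prints 2
}
```

With the real package, a value like this would be handed to a Publisher through the WithTransport option.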

Directories

Path Synopsis
cmd
