maestro

package module
v0.0.0-...-0172ac5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2024 License: MIT Imports: 13 Imported by: 0

README

Maestro

Not totally sure where this project is going to go yet, but the idea is for it to be a message broker that uses database watch streams to manage its queues.

Things to do now:

  • [x] Basic Container Implementation
    • The goal is to eventually have more containers like a heap, but for now just a slice is fine
  • [x] Mongo Change Stream Watcher
  • [x] Protocol parsing
    • Decided to get really fancy here with struct tags. Probably overkill
  • [ ] Protocol Buffer Implementation
    • Initial thought is to have the protocol send some version number, content length, and then the data as a protobuf.
  • [ ] Peer Subscribing/Unsubscribing
  • [ ] Queues sending data and receiving acknowledgements (probably some more protobuf work)
  • [ ] Message type that will probably be some fixed length so I know if someone is subscribing, acknowledging, etc.

Building

There really is no building yet...

The docker-compose file will start up a Mongo instance that has a replica set so that change streams work. That is all for now.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrQueueEmpty   = errors.New("queue is empty")
	ErrItemNotFound = errors.New("item not found")
)
View Source
var ErrInvalidTerminator = errors.New("invalid terminator")
View Source
var ErrUnauthorized = errors.New("unauthorized")

Functions

func IntToBytes

func IntToBytes(n int, byteCount int) []byte

func MongoAuthURL

func MongoAuthURL() (string, error)

MongoAuthURL constructs a MongoDB connection string from environment variables.

Types

type ActionType

type ActionType string
const (
	ActionTypeAcknowledge ActionType = "acknowledge"
	ActionTypeSubscribe   ActionType = "subscribe"
)

type AuthInfo

type AuthInfo struct {
	Claims map[string]any
	ConnID string
}

type AuthParserProtocol

type AuthParserProtocol struct {
	Authenticator Authenticator
	Parser        Parser
}

type Authenticator

type Authenticator interface {
	Authenticate(auth any) (AuthInfo, error)
}

type BinaryAuthContentMessage

type BinaryAuthContentMessage struct {
	Auth        []byte
	Content     []byte
	Version     int
	AuthSize    int
	ContentSize int
}

type BinaryAuthContentProtocol

type BinaryAuthContentProtocol struct {
	Authenticator Authenticator
	Parser        Parser
}

func (*BinaryAuthContentProtocol) Authenticate

func (au *BinaryAuthContentProtocol) Authenticate(m AuthInfo) (AuthInfo, error)

func (*BinaryAuthContentProtocol) Parse

func (au *BinaryAuthContentProtocol) Parse(data any) (Message, error)

func (*BinaryAuthContentProtocol) ParseIncoming

func (au *BinaryAuthContentProtocol) ParseIncoming(data any) (Message, error)

type Config

type Config struct {
	Logger slog.Logger
}

type Container

type Container interface {
	Push(item QueueItem)
	Pop() (QueueItem, error)
	Len() int
	Items() []QueueItem
	Find(id string) (QueueItem, error)
}

type ContainerWriter

type ContainerWriter interface {
	Write(item QueueItem) error
}

type JWTAuthenticator

type JWTAuthenticator struct {
	Opts JWTAuthenticatorOpts
}

func NewJWTAuthenticator

func NewJWTAuthenticator(opts JWTAuthenticatorOpts) *JWTAuthenticator

func (*JWTAuthenticator) Authenticate

func (j *JWTAuthenticator) Authenticate(data any) (AuthInfo, error)

Authenticate takes a token string and returns the claims if the token is valid

type JWTAuthenticatorOpts

type JWTAuthenticatorOpts struct {
	SigningMethod string
	Secret        string
}

type Maestro

type Maestro struct {
	Config Config
	Queues []Queue
}

type Message

type Message struct {
	Content    interface{}
	ConnID     string
	ActionType ActionType
}

type MongoChangeEvent

type MongoChangeEvent struct {
	FullDocument  bson.M             `bson:"fullDocument"`
	OperationType string             `bson:"operationType"`
	DocumentKey   primitive.ObjectID `bson:"documentKey"`
}

type MongoWatcher

type MongoWatcher struct {
	// contains filtered or unexported fields
}

func NewMongoWatcher

func NewMongoWatcher(client *mongo.Client, opts MongoWatcherOpts) (*MongoWatcher, error)

func (*MongoWatcher) Watch

func (mw *MongoWatcher) Watch(ctx context.Context, c chan QueueUpdateMessage) error

type MongoWatcherOpts

type MongoWatcherOpts struct {
	DatabaseName   string
	CollectionName string
}

type NilAuthenticator

type NilAuthenticator struct{}

func NewNilAuthenticator

func NewNilAuthenticator() *NilAuthenticator

func (*NilAuthenticator) Authenticate

func (na *NilAuthenticator) Authenticate(any) (AuthInfo, error)

type OpType

type OpType int
const (
	OpTypeInsert OpType = iota
	OpTypeUpdate
	OpTypeDelete
)

type Parser

type Parser interface {
	Parse(data any) (Message, error)
}

type Peer

type Peer struct{}

type PeerMap

type PeerMap struct {
	// contains filtered or unexported fields
}

func NewPeerMap

func NewPeerMap() *PeerMap

func (*PeerMap) AddPeer

func (pm *PeerMap) AddPeer(connID string, peer *Peer)

func (*PeerMap) GetPeer

func (pm *PeerMap) GetPeer(connID string) *Peer

func (*PeerMap) RemovePeer

func (pm *PeerMap) RemovePeer(connID string)

type Protocol

type Protocol interface {
	Authenticator
	Parser
	ParseIncoming(data any) (Message, error)
}

type Queue

type Queue struct {
	Container Container
	Writer    ContainerWriter
	Cfg       QueueConfig
	Name      string
}

type QueueConfig

type QueueConfig struct{}

type QueueItem

type QueueItem interface {
	ID() string
	Data() interface{}
}

type QueueUpdateMessage

type QueueUpdateMessage struct {
	Data   interface{}
	ID     string
	OpType OpType
}

type Server

type Server struct {
	Logger   slog.Logger
	Listener net.Listener
	Opts     ServerOpts
}

func NewServer

func NewServer(l net.Listener, opts ServerOpts) *Server

func (*Server) Start

func (s *Server) Start(ctx context.Context)

type ServerOpts

type ServerOpts struct {
	Addr string
	Port int
}

type SliceContainer

type SliceContainer struct {
	Elements []QueueItem
}

func NewSliceContainer

func NewSliceContainer() *SliceContainer

func (*SliceContainer) Find

func (sc *SliceContainer) Find(id string) (QueueItem, error)

func (*SliceContainer) Items

func (sc *SliceContainer) Items() []QueueItem

func (*SliceContainer) Len

func (sc *SliceContainer) Len() int

func (*SliceContainer) Pop

func (sc *SliceContainer) Pop() (QueueItem, error)

func (*SliceContainer) Push

func (sc *SliceContainer) Push(item QueueItem)

type Watcher

type Watcher interface {
	Watch(ctx context.Context, c chan QueueUpdateMessage) error
}

Directories

Path Synopsis
experimental

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL