GoBroke

Version: v0.0.3
Published: Feb 16, 2025 License: GPL-2.0 Imports: 7 Imported by: 0

README

GoBroke

GoBroke is a lightweight internal message routing system designed for modular logic processing in Go applications. It provides a clean architecture for handling messages between different components (clients and logic modules) within your project.

Overview

GoBroke acts as a message router that:

  • Routes messages between clients and logic modules
  • Supports different types of logic processing (Dispatched, Worker, Passive)
  • Allows custom endpoint implementations (HTTP, UDP, TCP, gRPC, etc.)
  • Provides clean separation between message handling and business logic

Architecture

Core Components

  1. Broke: The main router that manages message flow between clients and logic modules
  2. Endpoint: Interface for implementing custom network protocols
  3. Client: Represents connected clients and manages their state
  4. Logic: Interface for implementing business logic modules
  5. Message: Structure for passing data between components
  6. LogicBase: Base implementation providing common logic functionality

Message Flow

[Client] <-> [Endpoint] <-> [Broke] <-> [Logic Modules]

Messages can flow:

  • From clients to logic modules
  • From logic modules to specific clients
  • From logic modules to all clients (broadcast)

Logic Implementation

All logic modules in GoBroke embed the LogicBase struct, which provides common functionality:

type LogicBase struct {
    name      string
    logicType types.LogicType
    Ctx       context.Context
    *Broke
}

To create a new logic module, embed LogicBase and initialize it using NewLogicBase:

type customLogic struct {
    GoBroke.LogicBase
    // Additional fields specific to your logic
}

func CreateCustomLogic(broke *GoBroke.Broke) types.Logic {
    logic := customLogic{
        LogicBase: GoBroke.NewLogicBase("customlogic", types.DISPATCHED, broke),
        // Initialize additional fields
    }
    return &logic
}
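
Embedding is what lets a module call router methods such as GetAllClients directly on itself. The self-contained sketch below illustrates that mechanism with stand-in types: router, logicBase, and echoLogic are hypothetical names for this example and are not GoBroke's own types.

```go
package main

import "fmt"

// router is a hypothetical stand-in for the broker; it is not GoBroke's Broke type.
type router struct{ clients []string }

func (r *router) GetAllClients() []string { return r.clients }

// logicBase mimics the shape of a base struct: it carries a name and embeds *router.
type logicBase struct {
	name string
	*router
}

func (b logicBase) Name() string { return b.name }

// echoLogic embeds logicBase, so Name and GetAllClients are promoted onto it.
type echoLogic struct {
	logicBase
	handled int // module-specific state
}

func newEchoLogic(r *router) *echoLogic {
	return &echoLogic{logicBase: logicBase{name: "echo", router: r}}
}

func main() {
	l := newEchoLogic(&router{clients: []string{"a", "b"}})
	fmt.Println(l.Name(), len(l.GetAllClients())) // prints "echo 2"
}
```

Because the methods are promoted, the module satisfies any interface that only requires them without writing forwarding code.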

Logic Types

GoBroke supports three types of logic modules:

1. DISPATCHED Logic
  • Processes messages immediately in a new goroutine
  • Best for quick, non-blocking operations
  • Suitable for broadcasting or simple transformations
  • Example: Message broadcaster

type broadcasterDispatched struct {
    GoBroke.LogicBase
}

func CreateDispatched(broke *GoBroke.Broke) types.Logic {
    worker := broadcasterDispatched{
        LogicBase: GoBroke.NewLogicBase("broadcaster", types.DISPATCHED, broke),
    }
    return &worker
}

func (w *broadcasterDispatched) RunLogic(msg types.Message) error {
    clients := w.GetAllClients()
    sMsg := types.Message{
        ToClient:   clients,
        FromLogic:  w,
        MessageRaw: msg.MessageRaw,
    }
    w.SendMessage(sMsg)
    return nil
}
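
To make "a new goroutine per message" concrete, here is a minimal, self-contained sketch of that dispatch pattern. It is an assumption about the general technique, not GoBroke's internal code; dispatch and countDispatched are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatch runs handle for every message in its own goroutine
// and waits until all handlers have finished.
func dispatch(msgs []string, handle func(string)) {
	var wg sync.WaitGroup
	for _, m := range msgs {
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			handle(m)
		}(m)
	}
	wg.Wait()
}

// countDispatched reports how many messages were handled.
// The mutex is needed because handlers run concurrently.
func countDispatched(msgs []string) int {
	var mu sync.Mutex
	handled := 0
	dispatch(msgs, func(string) {
		mu.Lock()
		handled++
		mu.Unlock()
	})
	return handled
}

func main() {
	fmt.Println(countDispatched([]string{"a", "b", "c"})) // prints 3
}
```

Handlers started this way may finish in any order, which is why this style suits work that does not depend on message ordering.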

2. WORKER Logic
  • Processes messages in a dedicated worker goroutine
  • Maintains its own message queue
  • Best for sequential processing or rate-limited operations
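  • Example: Sequential message processor

type broadcasterWorker struct {
    GoBroke.LogicBase
    receive chan types.Message
}

func CreateWorker(broke *GoBroke.Broke, ctx context.Context) types.Logic {
    // ctx is not used in this example; the worker loop listens on the Ctx provided by LogicBase.
    worker := broadcasterWorker{
        LogicBase: GoBroke.NewLogicBase("broadcaster", types.WORKER, broke),
        receive:   make(chan types.Message),
    }
    // Run the loop in its own goroutine; calling it directly would block
    // CreateWorker until the context is cancelled.
    go worker.startWorker()
    return &worker
}

func (w *broadcasterWorker) startWorker() {
    for {
        select {
        case <-w.Ctx.Done():
            return
        case msg := <-w.receive:
            w.work(msg)
        }
    }
}

// work handles one queued message; as an illustration it rebroadcasts it to all clients.
func (w *broadcasterWorker) work(msg types.Message) {
    w.SendMessage(types.Message{
        ToClient:   w.GetAllClients(),
        FromLogic:  w,
        MessageRaw: msg.MessageRaw,
    })
}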

func (w *broadcasterWorker) RunLogic(message types.Message) error {
    w.receive <- message
    return nil
}
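
The ordering guarantee of a single worker goroutine can be seen in isolation with the sketch below. It uses plain strings instead of GoBroke messages; runWorker and processInOrder are hypothetical helpers written for this illustration.

```go
package main

import (
	"context"
	"fmt"
)

// runWorker handles queued messages one at a time until ctx is
// cancelled or the queue is closed.
func runWorker(ctx context.Context, queue <-chan string, handle func(string)) {
	for {
		select {
		case <-ctx.Done():
			return
		case m, ok := <-queue:
			if !ok {
				return
			}
			handle(m)
		}
	}
}

// processInOrder queues msgs and returns them in the order the worker handled them.
func processInOrder(msgs []string) []string {
	queue := make(chan string, len(msgs))
	for _, m := range msgs {
		queue <- m
	}
	close(queue)

	var handled []string
	runWorker(context.Background(), queue, func(m string) {
		handled = append(handled, m)
	})
	return handled
}

func main() {
	fmt.Println(processInOrder([]string{"1", "2", "3"})) // prints [1 2 3]
}
```

Since only one goroutine reads from the queue, messages are handled in exactly the order they were enqueued.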

3. PASSIVE Logic
  • Runs independently of message flow
  • Never receives messages directly
  • Best for background tasks or monitoring
  • Example: Inactivity monitor

type inactivityMonitor struct {
    GoBroke.LogicBase
    inactivityMinutes int
}

func CreateWorker(broke *GoBroke.Broke, inactivityMinutes int) types.Logic {
    worker := inactivityMonitor{
        LogicBase:         GoBroke.NewLogicBase("inactivitymonitor", types.PASSIVE, broke),
        inactivityMinutes: inactivityMinutes,
    }
    // Run the monitor loop in its own goroutine so that CreateWorker returns.
    go worker.startWorker()
    return &worker
}

func (w *inactivityMonitor) startWorker() {
    // Check every 10 seconds; a ticker lets the loop stop promptly on cancellation.
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    limit := time.Duration(w.inactivityMinutes) * time.Minute
    for {
        select {
        case <-w.Ctx.Done():
            return
        case <-ticker.C:
            for _, client := range w.GetAllClients() {
                // Remove clients whose last message is older than the limit.
                if time.Since(client.GetLastMessage()) > limit {
                    _ = w.RemoveClient(client)
                }
            }
        }
    }
}

func (w *inactivityMonitor) RunLogic(message types.Message) error {
    return fmt.Errorf("this logic does not support invocation")
}
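
The inactivity check itself is easier to reason about, and to test, when separated from the loop. The sketch below is one possible way to do that with fixed timestamps; inactive and example are hypothetical helpers that do not exist in GoBroke.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// inactive returns the sorted IDs of clients whose last message is older than limit.
func inactive(lastSeen map[string]time.Time, now time.Time, limit time.Duration) []string {
	var ids []string
	for id, seen := range lastSeen {
		if now.Sub(seen) > limit {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids) // map iteration order is random, so sort for stable output
	return ids
}

// example evaluates a fixed snapshot: "a" has been idle for 20 minutes, "b" for 5.
func example() []string {
	now := time.Date(2025, 2, 16, 12, 0, 0, 0, time.UTC)
	lastSeen := map[string]time.Time{
		"a": now.Add(-20 * time.Minute),
		"b": now.Add(-5 * time.Minute),
	}
	return inactive(lastSeen, now, 15*time.Minute)
}

func main() {
	fmt.Println(example()) // prints [a]
}
```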

Getting Started

  1. Create a new GoBroke instance:

ctx := context.Background()
gb, err := GoBroke.New(yourendpoint, GoBroke.WithContext(ctx))
if err != nil {
    panic(err)
}

  2. Implement your logic modules:

// Create and add logic modules
broadcasterLogic := broadcaster.CreateDispatched(gb)
_ = gb.AddLogic(broadcasterLogic)

inactivityMonitor := inactivitymonitor.CreateWorker(gb, 15)
_ = gb.AddLogic(inactivityMonitor)

  3. Implement an endpoint:

// Implement the endpoint.Endpoint interface
type Endpoint interface {
    Sender(chan types.Message) error
    Receiver(chan types.Message) error
    Disconnect(*clients.Client) error
}

  4. Start the router:

gb.Start()

Custom Endpoints

To implement a custom endpoint:

  1. Create a struct that implements the endpoint.Endpoint interface
  2. Implement the required methods:
    • Sender: Handle outgoing messages
    • Receiver: Handle incoming messages
    • Disconnect: Handle client disconnection

Example WebSocket endpoint structure:

type WSEndpoint struct {
    upgrader websocket.Upgrader
    clients  map[string]*websocket.Conn
}

func (e *WSEndpoint) Sender(ch chan types.Message) error {
    // Implement message sending logic
    return nil
}

func (e *WSEndpoint) Receiver(ch chan types.Message) error {
    // Implement message receiving logic
    return nil
}

func (e *WSEndpoint) Disconnect(client *clients.Client) error {
    // Implement client disconnection logic
    return nil
}

Message Structure

Messages in GoBroke contain:

  • Target clients (ToClient)
  • Target logic modules (ToLogic)
  • Source client (FromClient)
  • Source logic module (FromLogic)
  • Raw message data (MessageRaw)
  • Metadata for additional context (Metadata)
  • Unique identifier (UUID)
  • Message state (State)
  • Tags for middleware processing (Tags)

Message State and Control

Messages can be in one of two states:

  • ACCEPTED (default): Message continues through the processing pipeline
  • REJECTED: Message is dropped from the processing pipeline

Control methods:

// Accept the message for further processing
message.Accept()

// Reject the message to prevent further processing
message.Reject()

Message Tags

Tags provide a way to attach and retrieve arbitrary data during message processing:

// Add a tag to the message
message.AddTag("priority", "high")

// Retrieve a tag value
value, err := message.GetTag("priority", nil)

Middleware

GoBroke supports middleware functions for both receiving and sending messages. Middleware can modify messages, add tags, or control message flow through accept/reject states.

Adding Middleware

// Middleware function type
type middlewareFunc func(types.Message) types.Message

// Add receive middleware (executed when messages are received)
gb.AttachReceiveMiddleware(func(msg types.Message) types.Message {
    // Process incoming message
    return msg
})

// Add send middleware (executed before messages are sent)
gb.AttachSendMiddleware(func(msg types.Message) types.Message {
    // Process outgoing message
    return msg
})

Example middleware for message filtering:

gb.AttachReceiveMiddleware(func(msg types.Message) types.Message {
    // Reject messages larger than 1MB
    if len(msg.MessageRaw) > 1024*1024 {
        msg.Reject()
    }
    return msg
})
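
How a chain of such functions might be applied can be sketched without the library. The example below uses a hypothetical message struct and assumes the pipeline stops at the first rejection; GoBroke's real pipeline may behave differently.

```go
package main

import "fmt"

// message is a simplified stand-in, not GoBroke's types.Message.
type message struct {
	Raw      []byte
	Rejected bool
	Tags     map[string]string
}

type middleware func(message) message

// runChain applies each middleware in attachment order and stops
// at the first one that rejects the message (an assumption of this sketch).
func runChain(chain []middleware, msg message) message {
	for _, mw := range chain {
		msg = mw(msg)
		if msg.Rejected {
			break
		}
	}
	return msg
}

// limitSize rejects messages whose payload exceeds max bytes.
func limitSize(max int) middleware {
	return func(m message) message {
		if len(m.Raw) > max {
			m.Rejected = true
		}
		return m
	}
}

// tag returns a middleware that attaches key=value, copying the tag map
// so the caller's message is not modified.
func tag(key, value string) middleware {
	return func(m message) message {
		tags := make(map[string]string, len(m.Tags)+1)
		for k, v := range m.Tags {
			tags[k] = v
		}
		tags[key] = value
		m.Tags = tags
		return m
	}
}

func main() {
	chain := []middleware{limitSize(4), tag("checked", "yes")}
	small := runChain(chain, message{Raw: []byte("hi")})
	large := runChain(chain, message{Raw: []byte("too long")})
	fmt.Println(small.Rejected, small.Tags["checked"]) // prints "false yes"
	fmt.Println(large.Rejected, len(large.Tags))       // prints "true 0"
}
```

Placing cheap filters such as the size limit first keeps later middleware from doing work on messages that will be dropped anyway.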

Best Practices

  1. Logic Type Selection:

    • Use DISPATCHED for simple, non-blocking operations
    • Use WORKER for sequential or rate-limited processing
    • Use PASSIVE for background tasks and monitoring
  2. Context Usage:

    • Use the context provided by LogicBase for cancellation
    • Add timeouts where appropriate
    • Handle context cancellation in worker loops
  3. Message Processing:

    • Keep message processing logic concise
    • Use appropriate goroutines for concurrent processing
    • Consider message ordering requirements when choosing logic types
  4. LogicBase Usage:

    • Embed LogicBase in all logic implementations
    • Use the provided context for cancellation handling
    • Access common functionality through LogicBase methods

License

This project is licensed under the terms specified in the LICENSE file.

Note

This is primarily a personal project focused on clean architecture and modular design in Go. While it's functional and can be used in other projects, it's primarily meant as a learning tool and reference implementation.

Documentation

Overview

Package GoBroke provides a flexible message broker implementation for handling client-to-client and client-to-logic communication patterns. It supports different types of message routing, client management, and custom logic handlers.

Package GoBroke provides configuration options for the GoBroke message broker system.

Package GoBroke provides the base implementation for logic handlers in the GoBroke message broker system.

Functions

func WithChannelSize

func WithChannelSize(size int) brokeOptsFunc

WithChannelSize returns a brokeOptsFunc that sets the message channel buffer size. This affects how many messages can be queued before blocking occurs.
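
The functional-options pattern and the effect of the buffer size can be illustrated with a standalone sketch. The names config, option, withChannelSize, and tryEnqueue, as well as the default of 16, are assumptions made for this example and do not reflect GoBroke's internals.

```go
package main

import "fmt"

// config and option are hypothetical stand-ins for the broker's option types.
type config struct{ channelSize int }

type option func(*config)

func withChannelSize(n int) option {
	return func(c *config) { c.channelSize = n }
}

// newConfig starts from an assumed default and applies the options in order.
func newConfig(opts ...option) config {
	c := config{channelSize: 16}
	for _, o := range opts {
		o(&c)
	}
	return c
}

// tryEnqueue reports whether msg fit into the queue without blocking.
func tryEnqueue(queue chan string, msg string) bool {
	select {
	case queue <- msg:
		return true
	default:
		return false
	}
}

func main() {
	cfg := newConfig(withChannelSize(2))
	queue := make(chan string, cfg.channelSize)
	// With a buffer of 2, the third send would block a plain `queue <- msg`.
	fmt.Println(tryEnqueue(queue, "a"), tryEnqueue(queue, "b"), tryEnqueue(queue, "c")) // prints "true true false"
}
```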

func WithContext

func WithContext(ctx context.Context) brokeOptsFunc

WithContext returns a brokeOptsFunc that sets a custom context for the broker. The context can be used to control the broker's lifecycle and pass values.

Types

type Broke

type Broke struct {
	// contains filtered or unexported fields
}

Broke represents a message broker instance that manages client connections, message routing, and custom logic handlers.

func New

func New(endpoint endpoint.Endpoint, opts ...brokeOptsFunc) (*Broke, error)

New creates a new GoBroke instance with the specified endpoint and optional configuration. It returns an error if the endpoint is nil or if there are issues setting up message queues.

func (*Broke) AddLogic

func (broke *Broke) AddLogic(logic types.Logic) error

AddLogic adds a new logic handler to the GoBroke instance. It returns an error if a logic handler with the same name already exists.

func (*Broke) AttachReceiveMiddleware added in v0.0.2

func (broke *Broke) AttachReceiveMiddleware(mFunc middlewareFunc)

AttachReceiveMiddleware adds a middleware function to the receive message pipeline. Middleware functions are executed in the order they are attached and can modify or filter messages before they are processed by the broker.

The middleware function receives a Message and returns a modified Message or nil if the message should be dropped from the pipeline.

func (*Broke) AttachSendMiddleware added in v0.0.2

func (broke *Broke) AttachSendMiddleware(mFunc middlewareFunc)

AttachSendMiddleware adds a middleware function to the send message pipeline. Middleware functions are executed in the order they are attached and can modify or filter messages before they are sent to clients.

The middleware function receives a Message and returns a modified Message or nil if the message should be dropped from the pipeline.

func (*Broke) GetAllClients

func (broke *Broke) GetAllClients() []*clients.Client

GetAllClients returns a slice containing all currently connected clients.

func (*Broke) GetClient

func (broke *Broke) GetClient(uuid string) (*clients.Client, error)

GetClient retrieves a client by their UUID. It returns the client instance and nil if found, or nil and an error if not found.

func (*Broke) RegisterClient

func (broke *Broke) RegisterClient(client *clients.Client) error

RegisterClient registers a new client in the GoBroke instance. This method should be called from the endpoint implementation. It returns an error if the client is already registered.

func (*Broke) RemoveClient

func (broke *Broke) RemoveClient(client *clients.Client) error

RemoveClient removes a client from the GoBroke instance and disconnects them from the endpoint. It returns an error if the client doesn't exist or if the disconnection fails.

func (*Broke) RemoveLogic

func (broke *Broke) RemoveLogic(name types.LogicName) error

RemoveLogic removes a logic handler from the GoBroke instance by its name. It returns nil even if the logic handler doesn't exist.
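
The asymmetry between AddLogic (error on duplicate names) and RemoveLogic (nil even when nothing was removed) can be modelled with a small registry. This is a sketch of the documented behaviour only; registry, add, remove, and errDuplicate are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errDuplicate = errors.New("logic already registered")

// registry is a hypothetical name-to-handler map guarded by a mutex.
type registry struct {
	mu     sync.Mutex
	logics map[string]func(string)
}

func newRegistry() *registry {
	return &registry{logics: map[string]func(string){}}
}

// add fails if a handler with the same name is already present.
func (r *registry) add(name string, handler func(string)) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.logics[name]; exists {
		return fmt.Errorf("%q: %w", name, errDuplicate)
	}
	r.logics[name] = handler
	return nil
}

// remove always returns nil; deleting a missing key is a no-op.
func (r *registry) remove(name string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.logics, name)
	return nil
}

func main() {
	r := newRegistry()
	fmt.Println(r.add("broadcaster", func(string) {}))        // <nil>
	fmt.Println(r.add("broadcaster", func(string) {}) != nil) // true
	fmt.Println(r.remove("missing"))                          // <nil>
}
```

Callers that need to know whether a handler was actually removed would have to check for its presence first, since the return value alone does not say.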

func (*Broke) SendMessage

func (broke *Broke) SendMessage(message types.Message)

SendMessage queues a message for processing by GoBroke. This method can be used to send messages to both logic handlers and clients. If the message is from a client, their last message timestamp is updated.

func (*Broke) SendMessageQuickly

func (broke *Broke) SendMessageQuickly(message types.Message)

SendMessageQuickly sends a message directly to the endpoint for processing. This method should only be used for client-to-client communication as it bypasses logic handlers.

func (*Broke) Start

func (broke *Broke) Start()

Start begins processing messages in the GoBroke instance. It runs until the context is cancelled, at which point it closes all message queues and stops processing.
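
Because Start runs until the context is cancelled, callers typically run it alongside code that eventually cancels that context. The sketch below shows the general shape with a stand-in loop; run and startAndStop are hypothetical and only mimic the described lifecycle.

```go
package main

import (
	"context"
	"fmt"
)

// run counts received items until ctx is cancelled, then returns the count.
// It stands in for a blocking Start-style loop.
func run(ctx context.Context, in <-chan int) int {
	processed := 0
	for {
		select {
		case <-ctx.Done():
			return processed
		case <-in:
			processed++
		}
	}
}

// startAndStop launches run in a goroutine, feeds it items, cancels the
// context, and returns how many items were processed before shutdown.
func startAndStop(items int) int {
	ctx, cancel := context.WithCancel(context.Background())
	in := make(chan int) // unbuffered: each send completes only once run has received it
	done := make(chan int)

	go func() { done <- run(ctx, in) }()

	for i := 0; i < items; i++ {
		in <- i
	}
	cancel()
	return <-done
}

func main() {
	fmt.Println(startAndStop(3)) // prints 3
}
```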

type LogicBase

type LogicBase struct {
	Ctx    context.Context // Context for cancellation and value propagation
	*Broke                 // Embedded broker instance for accessing broker functionality
	// contains filtered or unexported fields
}

LogicBase provides a base implementation of the types.Logic interface. It implements common functionality that can be embedded in specific logic handlers.

func NewLogicBase

func NewLogicBase(name types.LogicName, logicType types.LogicType, broke *Broke) LogicBase

NewLogicBase creates a new LogicBase instance with the specified configuration. Parameters:

  • name: Unique identifier for the logic handler
  • logicType: Determines how messages are processed (WORKER, DISPATCHED, or PASSIVE)
  • broke: Reference to the broker instance

Returns a LogicBase configured with the provided parameters and a derived context.

func (LogicBase) Name

func (w LogicBase) Name() types.LogicName

Name returns the unique identifier of this logic handler. This method satisfies part of the types.Logic interface.

func (LogicBase) Type

func (w LogicBase) Type() types.LogicType

Type returns the LogicType of this handler (WORKER, DISPATCHED, or PASSIVE). This method satisfies part of the types.Logic interface.

Directories

Path: Synopsis
Package clients provides client management functionality for the GoBroke system.
Package endpoint defines the interface for communication endpoints in the GoBroke system.
Package brokeerrors provides error definitions for the GoBroke message broker system.
examples
cmd
logic/broadcaster: Package broadcaster provides example implementations of logic handlers for broadcasting messages.
logic/inactivitymonitor: Package inactivitymonitor provides a passive logic handler that monitors client activity and automatically removes clients that have been inactive for a specified duration.
Package message provides message creation and management functionality for the GoBroke system.
Package types provides core type definitions for the GoBroke message broker system.
