bus

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2024 License: GPL-3.0 Imports: 8 Imported by: 11

README

Bus Library: Implementing the Publish/Subscribe Paradigm in Golang

Go Report Card codecov

Branch Status
master Build Status
dev Build Status
Overview

The Bus library is a tool designed for convenient implementation of the publish/subscribe paradigm in the Go programming language. The publish/subscribe paradigm is widely used to facilitate efficient message exchange between system components, distinguishing message producers (publishers) and message consumers (subscribers).

Features
  1. The Bus library supports a powerful topic system. Topics can contain + and # symbols, allowing for flexible topic hierarchies. For example, the topic "home/+/temperature" can subscribe to all temperature changes in different rooms.

  2. One of the key advantages of the Bus library is its ability to transmit any data type based on structures. This provides flexibility and allows for the transmission of complex data between system components.

Library Interface
type Bus interface {
    Publish(topic string, args ...interface{})
    CloseTopic(topic string)
    Subscribe(topic string, fn interface{}, options ...interface{}) error
    Unsubscribe(topic string, fn interface{}) error
    Stat(ctx context.Context, limit, offset int64, orderBy, sort string) (stats Stats, total int64, err error)
    Purge()
}
Core Methods
  • Publish(topic string, args ...interface{}): Publishes a message to the specified topic with arbitrary arguments.

  • CloseTopic(topic string): Closes a topic, causing all subscribers to unsubscribe from that topic.

  • Subscribe(topic string, fn interface{}, options ...interface{}) error: Subscribes to a topic with a specified handler function and additional options.

  • Unsubscribe(topic string, fn interface{}) error: Unsubscribes from a topic for the specified handler function.

Additional Methods
  • Stat(ctx context.Context, limit, offset int64, orderBy, sort string) (stats Stats, total int64, err error): Retrieves statistics on the usage of topics with pagination and sorting options.

  • Purge(): Clears all topics and unsubscribes from all subscriptions.

Example
type TemperatureReading struct {
	RoomID      string
	Temperature float64
}

type HumidityReading struct {
	RoomID   string
	Humidity float64
}

func main() {

	var messageBus = bus.NewBus()

	var eventHandler = func(topic string, msg interface{}) {
		switch v := msg.(type) {
		case TemperatureReading:
			fmt.Printf("topic: \"%s\", message: %v\n", topic, v)
		case HumidityReading:
			fmt.Printf("topic: \"%s\", message: %v\n", topic, v)
		}
	}

	messageBus.Subscribe("home/#", eventHandler)
	defer messageBus.Unsubscribe("home/#", eventHandler)

	go func() {
		for i := 0; i < 5; i++ {
			messageBus.Publish("home/living_room/temperature", TemperatureReading{
				RoomID:      "living_room",
				Temperature: 20.0 + float64(i),
			})
			time.Sleep(time.Second * 2)
		}
	}()

	go func() {
		for i := 0; i < 5; i++ {
			messageBus.Publish("home/kitchen/humidity", HumidityReading{
				RoomID:   "kitchen",
				Humidity: 40.0 + float64(i),
			})
			time.Sleep(time.Second * 3)
		}
	}()

	// ctrl + C
	var gracefulStop = make(chan os.Signal, 10)
	signal.Notify(gracefulStop, syscall.SIGINT, syscall.SIGTERM)

	<-gracefulStop

}
Conclusion

The Bus library provides efficient tools for implementing the publish/subscribe paradigm in the Go programming language. Its use simplifies communication between system components, providing flexibility and ease of use.

Contributors

All contributors are welcome. If you would like to contribute, please adhere to the following rules.

  • Pull requests will be accepted only in the "develop" branch.
  • All modifications or additions should be tested.

Thank you for your understanding!

LICENSE

GPLv3 Public License

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func TopicMatch

func TopicMatch(topic []byte, topicFilter []byte) bool

TopicMatch returns whether the topic and topic filter is matched.

Types

type Bus

type Bus interface {
	// Publish publishes arguments to the given topic subscribers
	// Publish block only when the buffer of one of the subscribers is full.
	Publish(topic string, args ...interface{})
	// CloseTopic unsubscribe all subscribers from given topic
	CloseTopic(topic string)
	// Subscribe subscribes to the given topic
	Subscribe(topic string, fn interface{}, options ...interface{}) error
	// Unsubscribe handler from the given topic
	Unsubscribe(topic string, fn interface{}) error
	// Stat ...
	Stat(ctx context.Context, limit, offset int64, orderBy, sort string) (stats Stats, total int64, err error)
	// Purge ...
	Purge()
}

Bus implements publish/subscribe messaging paradigm

func NewBus

func NewBus() Bus

NewBus ...

type RPSCounter

type RPSCounter struct {
	// contains filtered or unexported fields
}

func (*RPSCounter) Inc

func (c *RPSCounter) Inc()

func (*RPSCounter) Stop

func (c *RPSCounter) Stop()

func (*RPSCounter) Value

func (c *RPSCounter) Value() float64

type StatItem

type StatItem struct {
	Topic       string
	Subscribers int
	Min         time.Duration
	Max         time.Duration
	Avg         time.Duration
	Rps         float64
}

StatItem ...

type Statistic

type Statistic struct {
	// contains filtered or unexported fields
}

func NewStatistic

func NewStatistic() *Statistic

type Stats

type Stats []*StatItem

Stats ...

func (Stats) Len

func (s Stats) Len() int

func (Stats) Less

func (s Stats) Less(i, j int) bool

func (Stats) Swap

func (s Stats) Swap(i, j int)

type Topic

type Topic struct {
	*Statistic
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(name string) *Topic

func (*Topic) Close

func (t *Topic) Close()

func (*Topic) Publish

func (t *Topic) Publish(args ...interface{})

func (*Topic) Stat

func (t *Topic) Stat() *StatItem

func (*Topic) Subscribe

func (t *Topic) Subscribe(fn interface{}, options ...interface{}) error

func (*Topic) Unsubscribe

func (t *Topic) Unsubscribe(fn interface{}) (empty bool, err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL