eventBus

package module
v1.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2020 License: MIT Imports: 8 Imported by: 0

README

Go事件总线

介绍

事件驱动golang event总线,适用于单进程代码快速解耦。

跨进程/集群(跨设备/容器)时应选择rabbit, rocket等开源MQ,而不是选择使用本项目。

软件架构

使用sync.mapgithub.com/deckarep/golang-set底层完成

小范围读写锁确保发布时的性能

使用说明
  1. 初始化实例
    import "github.com/kataras/golog"
    
    EventBus := eventBus.NewBus(golog.Default)
    
  2. 实现callback接口
    type subCallback struct {}
    
    var callback subCallback
    
    func (sub *subCallback) Callback(topic string, events ...interface{}) error {
    	config.Logger.Infof("event:%s%v", topic, events)
    	return nil
    }
    
    func initCallback() {
      callback = subCallback{}
    }
    
    
  3. 订阅主题
    func sub() {
      // 同步订阅(收到事件后,按注册顺序同步执行callback)
      err := EventBus.Subscribe("topic:test", &callback)
      if err != nil {
        golog.Default.Error(err.Error())
      }
      // 异步订阅(收到事件后,为每个callback创建Goroutine,异步执行)
      err := EventBus.SubscribeAsync("topic:test", &callback)
      if err != nil {
        golog.Default.Error(err.Error())
      }
    }
    
  4. 发布主题
    func pub() {
      // 异步发布,发布完成后不等待处理结果,立即返回
      EventBus.Publish("topic:test", "event:async")
      // 同步发布, 发布完成后等待同步与异步订阅全部完成后,返回同步订阅执行结果
      err := EventBus.PublishSync("topic:test", "event:sync")
      if err != nil {
        golog.Default.Error(err.Error())
      }
      // 同PublishSync,但不等待异步订阅完成
      err := EventBus.PublishSyncNoWait("topic:test", "event:syncNoWait")
      if err != nil {
        golog.Default.Error(err.Error())
      }
    }
    
  5. 取消订阅
    func unSub() {
      // 取消单个主题的订阅
      EventBus.UnSubscribe("topic:test", &callback)
      // 取消全部订阅
      EventBus.UnSubscribeAll(&callback)
    }
    
  6. 关闭主题

    注意: 关闭主题只清空目标主题的订阅信息,再次发布与重新订阅该主题不受影响。

    func closeTopic() {
      EventBus.CloseTopic("topic:test")
    }
    
demo

请参考unitTest/pub_sub_test.go

参与贡献
  1. Fork 本仓库
  2. 新建 Feat_xxx 分支
  3. 提交代码
  4. 新建 Pull Request

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BeforeCallback added in v1.0.6

type BeforeCallback = func(topic string, ctx *memstore.Store, events ...interface{}) error

type Callback

type Callback interface {
	Callback(topic string, ctx *memstore.Store, events ...interface{}) error
}

因为会导致重复订阅,所以必须用interface的形式 type Callback = func(string, context.Context, ...interface{}) error

type CallbackFunc

type CallbackFunc = func(topic string, events ...interface{}) error

type Controller

type Controller interface {
	// 等待异步执行完成
	WaitAsync()
	// 设置同步订阅事务标记
	SetTransaction(topic string, tr bool)
	// 关闭主题
	CloseTopic(topic string)
}

type Cycle added in v1.0.4

type Cycle interface {
	// 设置发布时回调
	SetCycleBefore(topic string, callback BeforeCallback)
	// 设置同步完成时回调
	SetCycleAfterSync(topic string, callback CycleCallback)
	// 设置全部完成时回调
	SetCycleAfterAll(topic string, callback CycleCallback)
	// 设置错误时回调
	SetCycleError(topic string, onError ErrorCallback)
}

type CycleCallback added in v1.0.4

type CycleCallback = func(topic string, ctx *memstore.Store, events ...interface{})

type ErrorCallback added in v1.0.4

type ErrorCallback = func(topic string, ctx *memstore.Store, err error, events ...interface{})

type EventBus

type EventBus interface {
	Publisher
	Subscriber
	Controller
	Cycle
}

func NewBus

func NewBus(logger *golog.Logger) EventBus

type Publisher

type Publisher interface {
	// 发布
	Publish(topic string, events ...interface{})
	// 同步发布
	PublishSync(topic string, events ...interface{}) error
	// 同步发布, 不等待异步调用完成
	PublishSyncNoWait(topic string, events ...interface{}) error
}

type Subscriber

type Subscriber interface {
	// 同步订阅主题
	Subscribe(topic string, callback Callback) error
	// 异步订阅主题
	SubscribeAsync(topic string, callback Callback) error
	// 取消已订阅的主题
	UnSubscribe(topic string, callback Callback)
	// 取消所有已订阅的主题
	UnSubscribeAll(callback Callback)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL