watch

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2024 License: MIT Imports: 13 Imported by: 0

README

watch服务做什么

它主要用来监听某一类事件或者信息,然后能根据不同的信息类型完成对应的处理。

那么我们大概需要如下东西

  • 消息体
    • 用于维护消息的类型,内容等,如果需要重试,还可以维护消息的重试次数、重试逻辑等
  • 消息队列
    • 用于消息的发送接收,也用于消息削峰等,给watch提供一个持续的消息流
  • 消息处理器
    • 根据消息类型匹配具体的处理器完成逻辑处理
  • 消息存储器
    • 持久化存储消息,避免消息丢失,也可以用于缓存等
  • 消息编码器
    • 消息编码器,用于序列化消息,方便存储和消费

消息索引

定义不同消息的索引规则, 方便持久化存储和消费

// Indexer 索引器
type Indexer interface {
    Index() string
}

消息体

// Message watch 消息结构体
type Message struct {
	// 并发锁结构, 避免并发读写问题
    lock sync.Mutex

    // 传输的消息内容, 由用户自定义
    data Indexer

    // 消息类型, 如需要增加新的类型,去vobj包增加
    topic vobj.Topic

    // 注册编码器
    schema Schemer

    // 重试次数
    retry int

    // 最大消息重试次数
    retryMax int

    // 是否已经处理过
    handleCtx context.Context
}

消息队列

// Queue 消息队列
type Queue interface {
    // Next 获取下一个消息
    Next() (*Message, bool)
    
    // Push 添加消息
    Push(msg *Message) error
    
    // Close 关闭队列
    Close() error
    
    // Len 获取队列长度
    Len() int
    
    // Clear 清空队列
    Clear()
}

存储器

// Storage 存储器
type Storage interface {
    // Get 获取消息
    Get(index Indexer) *Message

    // Put 放入消息
    Put(msg *Message) error

    // Clear 清空消息
    Clear()

    // Remove 移除消息
    Remove(index Indexer)

    // Close 关闭存储器
    Close() error

    // Len 长度
    Len() int

    // Range 遍历
    //  f返回值为bool类型,如果返回false,则停止range
    Range(f func(index Indexer, msg *Message) bool)
}

消息处理器

// Handler 消息处理
type Handler interface {
    // Handle 处理消息
    //
    // 	ctx 上下文
    // 	msg 消息
    Handle(ctx context.Context, msg *Message) error
}

消息编码器

type Schemer interface {
    // Decode 解码
    Decode(in *Message, out any) error

    // Encode 编码
    Encode(in *Message, out any) error
}

watch服务定义

以上完成了基础数据结构定义,现在我们需要吧这些功能组合起来,完成我们的watch功能

type Watcher struct {
    // 停止监听的通道
    stopCh chan struct{}
    // 存储器
    storage Storage
    // 消息队列
    queue Queue
    // 消息处理器
    handler Handler
    // 超时时间
    timeout time.Duration
}


func (w *Watcher) Start(_ context.Context) error {
  go func() {
    defer after.RecoverX()
    for {
      select {
        case <-w.stopCh:
          log.Infow("method", "stop watcher")
          w.clear()
          return
        default:
          if types.IsNil(w.queue) {
            log.Warnw("method", "queue is empty")
            continue
          }
          w.reader()
      }
    }
  }()
  return nil
}

func (w *Watcher) Stop(_ context.Context) error {
  w.stopCh <- struct{}{}
  return nil
}

// clear 清理资源
func (w *Watcher) clear() {
  if !types.IsNil(w.queue) {
    if err := w.queue.Close(); err != nil {
      log.Errorw("method", "close queue error", "error", err)
    }
  }
  
  if !types.IsNil(w.storage) {
    if err := w.storage.Close(); err != nil {
        log.Errorw("method", "close storage error", "error", err)
    }
  }
  
  close(w.stopCh)
  log.Infow("method", "clear resources", "res", "done")
}

// retry 重试
func (w *Watcher) retry(msg *Message) {
  if msg.GetRetry() >= msg.GetRetryMax() {
    // 重试次数超过最大次数不再重试
    return
  }
  // 消息重试次数+1
  msg.RetryInc()
  if err := w.queue.Push(msg); err != nil {
    log.Errorw("method", "push message to queue error", "error", err)
  }
}

func (w *Watcher) reader() {
  msg, ok := w.queue.Next()
  if !ok {
    return
  }
  
  if !types.IsNil(w.handler) {
    // 递交消息给处理器,由处理器决定消息去留, 如果失败,会进入重试逻辑
    ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
    defer cancel()
    if err := w.handler.Handle(ctx, msg); err != nil {
      log.Errorw("method", "handle message error", "error", err)
      w.retry(msg)
      return
    }
  }
  
  if !types.IsNil(w.storage) {
    // 存储消息
    if err := w.storage.Put(msg); err != nil {
      log.Errorw("method", "put message to storage error", "error", err)
      w.retry(msg)
      return
    }
  }
}

watch调用start方法后启动watch监听,监听到消息后,会调用handler处理消息,处理完成后,如果失败会根据消息的配置,决定是否需要重试。处理成功会加入存储器,如果失败会进入重试逻辑。

在重试逻辑中,判断消息是否已经达到最大的重试次数, 如果没有达到,则重新入队, 重试次数+1.

watch提供了stop方法,可以由调用方去通过系统信号或者其他方式停止监听服务,watch停止前会先清理依赖的队列、存储等、然后退出监听协程。

watch的实现平实简单,但是需要考虑很多细节,比如消息重试、消息存储、消息队列等,这些细节需要根据具体的业务场景进行设计,比如消息重试的次数、消息存储的存储方式、消息队列的实现方式等。

测试

package watch_test

import (
	"context"
	"errors"
	"fmt"
	"testing"
	"time"

	"github.com/aide-family/moon/pkg/vobj"
	"github.com/aide-family/moon/pkg/watch"
	"github.com/go-kratos/kratos/v2/log"
)

type MyMsg struct {
	Data int
}

func (m *MyMsg) Index() string {
	return fmt.Sprintf("my-msg-%d", m.Data)
}

func msgHandle(ctx context.Context, msg *watch.Message) error {
	log.Debugw("default handler", msg.GetData())

	if err := msg.GetSchema().Encode(msg, msg.GetData()); err != nil {
		log.Errorw("method", "Encode", "err", err)
	}
	if err := msg.GetSchema().Decode(msg, msg.GetData()); err != nil {
		log.Errorw("method", "Decode", "err", err)
	}
	d := msg.GetData().(*MyMsg)
	if d.Data%3 == 0 {
		return errors.New("模拟错误, 检测重试")
	}
	return nil
}

func TestNewWatcher(t *testing.T) {
	defaultQueue := watch.NewDefaultQueue(100)
	defaultStorage := watch.NewDefaultStorage()

	opts := []watch.WatcherOption{
		watch.WithWatcherQueue(defaultQueue),
		watch.WithWatcherStorage(defaultStorage),
		watch.WithWatcherTimeout(3 * time.Second),
		watch.WithWatcherHandler(watch.NewDefaultHandler(
			watch.WithDefaultHandlerTopicHandle(vobj.TopicUnknown, msgHandle),
		)),
	}
	w := watch.NewWatcher(opts...)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	w.Start(ctx)

	msgCount := 100
	schema := watch.NewDefaultSchemer()
	msgOpts := []watch.MessageOption{
		watch.WithMessageSchema(schema),
		watch.WithMessageRetryMax(3),
	}
	go func() {
		for msgCount > 0 {
			time.Sleep(1 * time.Second) // 延时1秒发送
			value := msgCount
			msg := watch.NewMessage(&MyMsg{Data: value}, vobj.TopicUnknown, msgOpts...)
			msgCount--
			if err := w.GetQueue().Push(msg); err != nil {
				continue
			}
		}
	}()

	go func() {
		for {
			log.Infow("默认存储的数据长度", w.GetStorage().Len())
			time.Sleep(3 * time.Second)
		}
	}()

	time.Sleep(10 * time.Second)
	w.Stop(context.Background())
}

系统实现一些简单的消息队列、存储器等作为默认实现, 可以通过配置或者参数的方式来替换默认实现,比如替换队列实现为kafka、替换存储器实现为redis等。

在handle中,我们假定每3次就会失败, 依次来检测重试逻辑。

INFO 默认存储的数据长度=0
DEBUG default handler=&{100}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
DEBUG default handler=&{99}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
ERROR method=handle message error error=模拟错误, 检测重试
DEBUG default handler=&{99}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
ERROR method=handle message error error=模拟错误, 检测重试
DEBUG default handler=&{99}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
ERROR method=handle message error error=模拟错误, 检测重试
DEBUG default handler=&{99}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
ERROR method=handle message error error=模拟错误, 检测重试
INFO 默认存储的数据长度=1
DEBUG default handler=&{98}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
DEBUG default handler=&{97}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
DEBUG default handler=&{96}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
ERROR method=handle message error error=模拟错误, 检测重试
DEBUG default handler=&{96}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
ERROR method=handle message error error=模拟错误, 检测重试
DEBUG default handler=&{96}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
ERROR method=handle message error error=模拟错误, 检测重试
DEBUG default handler=&{96}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
ERROR method=handle message error error=模拟错误, 检测重试
INFO 默认存储的数据长度=3
DEBUG default handler=&{95}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
DEBUG default handler=&{94}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
DEBUG default handler=&{93}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
ERROR method=handle message error error=模拟错误, 检测重试
DEBUG default handler=&{93}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
ERROR method=handle message error error=模拟错误, 检测重试
DEBUG default handler=&{93}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
ERROR method=handle message error error=模拟错误, 检测重试
DEBUG default handler=&{93}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
ERROR method=handle message error error=模拟错误, 检测重试
INFO 默认存储的数据长度=5
DEBUG default handler=&{92}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
DEBUG default handler=&{91}
ERROR method=Encode err=rpc error: code = Unimplemented desc = encode unimplemented topic: 未知
ERROR method=Decode err=rpc error: code = Unimplemented desc = decode unimplemented topic: 未知
INFO method=stop watcher
INFO method=clear resources res=done

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CacheStorageMsg

type CacheStorageMsg string

CacheStorageMsg 缓存存储器消息

func (CacheStorageMsg) Index

func (c CacheStorageMsg) Index() string

Index 实现Indexer接口

func (CacheStorageMsg) String

func (c CacheStorageMsg) String() string

func (CacheStorageMsg) Unmarshal

func (c CacheStorageMsg) Unmarshal(v any) error

Unmarshal 反序列化

type DefaultHandlerOption

type DefaultHandlerOption func(d *defaultHandler)

DefaultHandlerOption 默认消息处理配置

func WithDefaultHandlerTopicHandle

func WithDefaultHandlerTopicHandle(topic vobj.Topic, handles ...HandleFun) DefaultHandlerOption

WithDefaultHandlerTopicHandle 设置默认消息处理

type HandleFun

type HandleFun func(ctx context.Context, msg *Message) error

HandleFun 消息处理函数

type Handler

type Handler interface {
	// Handle 处理消息
	//
	// 	ctx 上下文
	// 	msg 消息
	Handle(ctx context.Context, msg *Message) error
}

Handler 消息处理

func NewDefaultHandler

func NewDefaultHandler(opts ...DefaultHandlerOption) Handler

NewDefaultHandler 创建默认消息处理

type Indexer

type Indexer interface {
	fmt.Stringer
	// Index 索引生成器
	Index() string
}

Indexer 索引器

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message watch 消息结构体

func NewMessage

func NewMessage(data Indexer, topic vobj.Topic, opts ...MessageOption) *Message

NewMessage 创建消息

func (*Message) GetData

func (m *Message) GetData() Indexer

GetData 获取消息内容

func (*Message) GetRetry

func (m *Message) GetRetry() int

GetRetry 获取消息重试次数

func (*Message) GetRetryMax

func (m *Message) GetRetryMax() int

GetRetryMax 获取消息最大重试次数

func (*Message) GetSchema

func (m *Message) GetSchema() Schemer

GetSchema 获取消息编码器

func (*Message) GetTopic

func (m *Message) GetTopic() vobj.Topic

GetTopic 获取消息类型

func (*Message) IsHandled

func (m *Message) IsHandled(index int) bool

IsHandled 判断消息是否已经处理过

func (*Message) RetryInc

func (m *Message) RetryInc()

RetryInc 重试次数+1

func (*Message) WithHandledPath

func (m *Message) WithHandledPath(index int, handle HandleFun)

WithHandledPath 设置消息处理过的路径

type MessageOption

type MessageOption func(m *Message)

MessageOption 消息选项

func WithMessageRetryMax

func WithMessageRetryMax(retryMax int) MessageOption

WithMessageRetryMax 设置消息最大重试次数

func WithMessageSchema

func WithMessageSchema(schema Schemer) MessageOption

WithMessageSchema 设置消息编码器

type Queue

type Queue interface {
	// Next 获取下一个消息
	Next() <-chan *Message

	// Push 添加消息
	Push(msg *Message) error

	// Close 关闭队列
	Close() error

	// Len 获取队列长度
	Len() int

	// Clear 清空队列
	Clear()
}

Queue 消息队列

func NewDefaultQueue

func NewDefaultQueue(size int) Queue

NewDefaultQueue 定义接收消息和发送消息的消息队列

type Schemer

type Schemer interface {
	// Decode 解码
	Decode(in *Message, out any) error

	// Encode 编码
	Encode(in *Message, out any) error
}

Schemer 消息编解码器

func NewDefaultSchemer

func NewDefaultSchemer() Schemer

NewDefaultSchemer 默认编解码器

func NewEmptySchemer

func NewEmptySchemer() Schemer

NewEmptySchemer 空编解码器

type Storage

type Storage interface {
	// Get 获取消息
	Get(index Indexer) *Message

	// Put 放入消息
	Put(msg *Message) error

	// Clear 清空消息
	Clear()

	// Remove 移除消息
	Remove(index Indexer)

	// Close 关闭存储器
	Close() error

	// Len 长度
	Len() int

	// Range 遍历
	//  f返回值为bool类型,如果返回false,则停止range
	Range(f func(index Indexer, msg *Message) bool)
}

Storage 存储器

func NewCacheStorage

func NewCacheStorage(cacher conn.Cache) Storage

NewCacheStorage 定义缓存存储器

func NewDefaultStorage

func NewDefaultStorage() Storage

NewDefaultStorage 定义运行时存储

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher 监听器

func NewWatcher

func NewWatcher(name string, opts ...WatcherOption) *Watcher

NewWatcher 创建监听器

func (*Watcher) GetHandler

func (w *Watcher) GetHandler() Handler

GetHandler 获取消息处理器

func (*Watcher) GetQueue

func (w *Watcher) GetQueue() Queue

GetQueue 获取消息队列

func (*Watcher) GetStorage

func (w *Watcher) GetStorage() Storage

GetStorage 获取存储器

func (*Watcher) Start

func (w *Watcher) Start(_ context.Context) error

Start 启动监听

func (*Watcher) Stop

func (w *Watcher) Stop(_ context.Context) error

Stop 停止监听

type WatcherOption

type WatcherOption func(w *Watcher)

WatcherOption 监听器配置

func WithWatcherHandler

func WithWatcherHandler(handler Handler) WatcherOption

WithWatcherHandler 设置消息处理器

func WithWatcherQueue

func WithWatcherQueue(queue Queue) WatcherOption

WithWatcherQueue 设置消息队列

func WithWatcherStorage

func WithWatcherStorage(storage Storage) WatcherOption

WithWatcherStorage 设置存储器

func WithWatcherTimeout

func WithWatcherTimeout(timeout time.Duration) WatcherOption

WithWatcherTimeout 设置超时时间

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL