broker

package
v0.0.0-...-714bc2e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// ErrNotReady .
	ErrNotReady = errors.New("connection not ready")
	// ErrInvalidAddrs .
	ErrInvalidAddrs = errors.New("invalid addrs")
)
View Source
var RandomConsumption = func(clientid string, topic string) string {
	if args := strings.Split(clientid, "."); len(args) != 0 {
		return strings.Join([]string{topic, args[0]}, ".")
	}
	return topic
}

RandomConsumption 随机消费。只有一个服务会消费

View Source
var SharedConsumption = func(clientid string, topic string) string {
	return strings.Join([]string{topic, clientid}, ".")
}

SharedConsumption 共享消费。多服务共同消费

Functions

func CheckSubject

func CheckSubject(t string) error

CheckSubject .

func Init

func Init(c Client, e error) error

Init .

Types

type Client

type Client interface {
	// Publish 消息发布
	// 若发布消息格式为 json, 则参数 'v' 为 JsonMessage.Data
	// 若发布消息格式为 proto, 则参数 'v' 为发布的完整消息体, 方法内不做额外的封装
	Publish(topic string, v interface{}, opts ...func(o *PublishOptions)) error
	// Subscribe 消息订阅
	Subscribe(topic string, handler Handler, opts ...func(o *SubscribeOptions)) error
	// Close .
	Close()
}

Client .

var C Client

C default client

type ConsumerModel

type ConsumerModel func(clientid string, topic string) string

ConsumerModel 消费者模式

type Event

type Event interface {
	// Topic .
	Topic() string
	// Reply .
	Reply() string
	// Body return bytes of message
	Body() []byte
	// Unmarshal unmarshal message
	// 若消息格式为 json, 则反序列化 message.data; 若消息格式为 proto, 则反序列化 message
	Unmarshal(v interface{}) error
}

Event .

func NewEvent

func NewEvent(topic string, reply string, body []byte, codec codec.Marshaler) Event

NewEvent .

type Handler

type Handler func(event Event) error

Handler with Subscribe

type JsonMessage

type JsonMessage struct {
	// ID topic 唯一标识符. uuid.NewString()
	ID string `json:"id"`
	// Producer .
	Producer string `json:"producer"`
	// CreatedAt 创建时间
	CreatedAt string `json:"created_at"`
	// Data .
	Data interface{} `json:"data"`
}

JsonMessage .

type Options

type Options struct {
	// Address adress
	Address string
	// ReconnectWait 重连等待时间。单位:秒
	ReconnectWait time.Duration
	// Version sarama.KafkaVersion
	Version string
}

Options .

func ParseOptions

func ParseOptions(opts ...func(o *Options)) *Options

ParseOptions .

type PublishOptions

type PublishOptions struct {
	// Context ctx
	Context context.Context
	// Codec 序列化方式. default codec.MarshalerType_Json
	Codec codec.Marshaler
	// Timeout 消息推送超时时间
	Timeout time.Duration
	// caller skip
	CallerSkip int
}

PublishOptions .

func ParsePublishOptions

func ParsePublishOptions(opts ...func(o *PublishOptions)) *PublishOptions

ParsePublishOptions .

type SubscribeOptions

type SubscribeOptions struct {
	// Context ctx
	Context context.Context
	// Codec 序列化方式. default codec.MarshalerType_Json
	Codec codec.Marshaler
	// ConsumerModel 消费者模式。多副本情况下,订阅相同 topic 的消费者是否共同处理数据
	ConsumerModel ConsumerModel
}

SubscribeOptions .

func ParseSubscribeOptions

func ParseSubscribeOptions(opts ...func(o *SubscribeOptions)) *SubscribeOptions

ParseSubscribeOptions .

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL