Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	// ErrNotReady is the sentinel error carrying the message "connection not ready".
	ErrNotReady = errors.New("connection not ready")
	// ErrInvalidAddrs is the sentinel error carrying the message "invalid addrs".
	ErrInvalidAddrs = errors.New("invalid addrs")
)
View Source
// RandomConsumption builds a consumer name of the form "<topic>.<prefix>",
// where <prefix> is the part of clientid before its first "." (or the whole
// clientid when it contains no ".").
//
// Per the package description, this is the "random consumption" model in which
// only one service consumes a given message.
//
// An empty clientid yields "<topic>." — this matches the previous behavior.
var RandomConsumption = func(clientid string, topic string) string {
	// The earlier implementation guarded on len(strings.Split(clientid, ".")) != 0,
	// but Split with a non-empty separator always returns at least one element,
	// so the bare-topic fallback could never run. Taking the prefix directly
	// yields the same result without the dead branch or the slice allocation.
	prefix := clientid
	if i := strings.Index(clientid, "."); i >= 0 {
		prefix = clientid[:i]
	}
	return topic + "." + prefix
}
RandomConsumption 随机消费。只有一个服务会消费 (RandomConsumption: random consumption — only one service consumes each message.)
View Source
// SharedConsumption builds a consumer name by joining topic and the full
// clientid with ".", i.e. "<topic>.<clientid>".
//
// Per the package description, this is the "shared consumption" model in which
// multiple services consume together.
//
// NOTE(review): the declaration header was lost from this listing; the name is
// taken from the adjacent description and the parameter list mirrors
// RandomConsumption — confirm against the package source.
var SharedConsumption = func(clientid string, topic string) string {
	return strings.Join([]string{topic, clientid}, ".")
}
SharedConsumption 共享消费。多服务共同消费 (SharedConsumption: shared consumption — multiple services consume together.)
Functions ¶
Types ¶
type Client ¶
// Client is the messaging client interface: it publishes messages, subscribes
// to topics, and can be closed.
type Client interface {
	// Publish publishes a message to topic.
	// If the message format is JSON, the argument v is used as JsonMessage.Data.
	// If the message format is proto, the argument v is the complete message
	// body to publish; the method performs no additional wrapping.
	Publish(topic string, v interface{}, opts ...func(o *PublishOptions)) error
	// Subscribe subscribes handler to messages on topic.
	Subscribe(topic string, handler Handler, opts ...func(o *SubscribeOptions)) error
	// Close closes the client.
	Close()
}
Client .
// C is the default client.
var C Client
C default client
type ConsumerModel ¶
ConsumerModel 消费者模式
type Event ¶
// Event is a received message as exposed to subscribers.
type Event interface {
	// Topic returns the topic of the message.
	Topic() string
	// Reply returns the reply value of the message.
	// NOTE(review): presumably a reply subject/address — confirm against the implementation.
	Reply() string
	// Body returns the bytes of the message.
	Body() []byte
	// Unmarshal unmarshals the message into v.
	// If the message format is JSON, message.data is deserialized;
	// if the message format is proto, the whole message is deserialized.
	Unmarshal(v interface{}) error
}
Event .
type JsonMessage ¶
// JsonMessage is the envelope for JSON-formatted messages; field names on the
// wire are given by the json struct tags.
type JsonMessage struct {
	// ID is the unique identifier (the original note reads "topic unique
	// identifier" and references uuid.NewString()).
	ID string `json:"id"`
	// Producer identifies the producer of the message.
	Producer string `json:"producer"`
	// CreatedAt is the creation time, carried as a string.
	CreatedAt string `json:"created_at"`
	// Data is the message payload.
	Data interface{} `json:"data"`
}
JsonMessage .
type Options ¶
// Options holds the client connection settings.
type Options struct {
	// Address is the address to connect to.
	Address string
	// ReconnectWait is the time to wait before reconnecting.
	// NOTE(review): the original comment gives the unit as seconds, but the
	// field type is time.Duration — confirm how callers are expected to set it.
	ReconnectWait time.Duration
	// Version is the version string; the original comment references
	// sarama.KafkaVersion.
	Version string
}
Options .
type PublishOptions ¶
// PublishOptions holds the per-call options for Publish.
type PublishOptions struct {
	// Context is the context for the publish call.
	Context context.Context
	// Codec is the serialization method. The original comment states the
	// default is codec.MarshalerType_Json.
	Codec codec.Marshaler
	// Timeout is the timeout for pushing the message.
	Timeout time.Duration
	// CallerSkip is the caller skip count.
	// NOTE(review): presumably the number of stack frames to skip when
	// reporting the caller — confirm against the implementation.
	CallerSkip int
}
PublishOptions .
func ParsePublishOptions ¶
func ParsePublishOptions(opts ...func(o *PublishOptions)) *PublishOptions
ParsePublishOptions .
type SubscribeOptions ¶
// SubscribeOptions holds the per-call options for Subscribe.
type SubscribeOptions struct {
	// Context is the context for the subscription.
	Context context.Context
	// Codec is the serialization method. The original comment states the
	// default is codec.MarshalerType_Json.
	Codec codec.Marshaler
	// ConsumerModel is the consumer mode: with multiple replicas, it decides
	// whether consumers subscribed to the same topic process the data jointly.
	ConsumerModel ConsumerModel
}
SubscribeOptions .
func ParseSubscribeOptions ¶
func ParseSubscribeOptions(opts ...func(o *SubscribeOptions)) *SubscribeOptions
ParseSubscribeOptions .
Click to show internal directories.
Click to hide internal directories.