Documentation
¶
Index ¶
- Constants
- Variables
- func Close()
- func ReadDelayMsg(ctx context.Context, topic string, value interface{}) (context.Context, error)
- func ReadMsgByGroup(ctx context.Context, topic, groupId string, value interface{}) (context.Context, error)
- func ReadMsgByPartition(ctx context.Context, topic string, partition int, value interface{}) (context.Context, error)
- func SetConfiger(ctx context.Context, configerType ConfigerType) error
- func WatchUpdate(ctx context.Context)
- func WriteDelayMsg(ctx context.Context, topic string, value interface{}, delaySeconds uint32) (jobID string, err error)
- func WriteMsg(ctx context.Context, topic string, key string, value interface{}) error
- func WriteMsgs(ctx context.Context, topic string, msgs ...Message) error
- type AckHandler
- type ApolloConfig
- func (m *ApolloConfig) GetConfig(ctx context.Context, topic string, mqType MQType) (*Config, error)
- func (m *ApolloConfig) Init(ctx context.Context) (err error)
- func (m *ApolloConfig) ParseKey(ctx context.Context, key string) (*KeyParts, error)
- func (m *ApolloConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent
- type Config
- type Configer
- type ConfigerType
- type DelayClient
- type DelayHandler
- type EtcdConfig
- func (m *EtcdConfig) GetConfig(ctx context.Context, topic string, mqType MQType) (*Config, error)
- func (m *EtcdConfig) Init(ctx context.Context) error
- func (m *EtcdConfig) ParseKey(ctx context.Context, k string) (*KeyParts, error)
- func (m *EtcdConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent
- type Handler
- type InstanceManager
- type Job
- type KafkaHandler
- type KafkaReader
- func (m *KafkaReader) Close() error
- func (m *KafkaReader) FetchMsg(ctx context.Context, v interface{}, ov interface{}) (Handler, error)
- func (m *KafkaReader) ReadMsg(ctx context.Context, v interface{}, ov interface{}) error
- func (m *KafkaReader) SetOffset(ctx context.Context, offset int64) error
- func (m *KafkaReader) SetOffsetAt(ctx context.Context, t time.Time) error
- type KafkaWriter
- type KeyParts
- type MQRoleType
- type MQType
- type Message
- type Payload
- type Reader
- type SimpleConfig
- func (m *SimpleConfig) GetConfig(ctx context.Context, topic string, mqType MQType) (*Config, error)
- func (m *SimpleConfig) Init(ctx context.Context) error
- func (m *SimpleConfig) ParseKey(ctx context.Context, k string) (*KeyParts, error)
- func (m *SimpleConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent
- type Writer
Constants ¶
View Source
const (
	LastOffset  int64 = -1 // The most recent offset available for a partition.
	FirstOffset       = -2 // The least recent offset available for a partition.
)
Variables ¶
View Source
var (
InvalidMqRoleTypeStringErr = errors.New("invalid mq role type string")
)
Functions ¶
func ReadDelayMsg ¶
ReadDelayMsg 读完自动确认
func ReadMsgByGroup ¶
func ReadMsgByGroup(ctx context.Context, topic, groupId string, value interface{}) (context.Context, error)
读完消息后会自动提交offset
func ReadMsgByPartition ¶
func SetConfiger ¶
func SetConfiger(ctx context.Context, configerType ConfigerType) error
func WatchUpdate ¶
func WriteDelayMsg ¶
Types ¶
type AckHandler ¶
func FetchDelayMsg ¶
func FetchDelayMsg(ctx context.Context, topic string, value interface{}) (context.Context, AckHandler, error)
FetchDelayMsg 读完消息后不会自动确认
type ApolloConfig ¶
type ApolloConfig struct {
// contains filtered or unexported fields
}
func NewApolloConfiger ¶
func NewApolloConfiger() *ApolloConfig
func (*ApolloConfig) Watch ¶
func (m *ApolloConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent
type Config ¶
type Config struct {
	MQType          MQType
	MQAddr          []string
	Topic           string
	TimeOut         time.Duration
	CommitInterval  time.Duration
	// time interval to flush msg to broker default is 1 second
	BatchTimeout    time.Duration
	Offset          int64
	OffsetAt        string
	TTR             uint32 // time to run
	TTL             uint32 // time to live
	Tries           uint16 // delay tries
	BatchSize       int
	RequestInterval time.Duration
}
type Configer ¶
type Configer interface {
	Init(ctx context.Context) error
	GetConfig(ctx context.Context, topic string, mqType MQType) (*Config, error)
	ParseKey(ctx context.Context, k string) (*KeyParts, error)
	Watch(ctx context.Context) <-chan *center.ChangeEvent
}
var DefaultConfiger Configer
func NewConfiger ¶
func NewConfiger(configType ConfigerType) (Configer, error)
type ConfigerType ¶
type ConfigerType int
const (
	ConfigerTypeSimple ConfigerType = iota
	ConfigerTypeEtcd
	ConfigerTypeApollo
)
func (ConfigerType) String ¶
func (c ConfigerType) String() string
type DelayClient ¶
type DelayClient struct {
// contains filtered or unexported fields
}
延迟队列客户端
func NewDefaultDelayClient ¶
func NewDefaultDelayClient(ctx context.Context, topic string) (*DelayClient, error)
NewDefaultDelayClient 通过topic创建默认客户端
func NewDelayClient ¶
func (*DelayClient) Ack ¶
func (p *DelayClient) Ack(ctx context.Context, jobID string) error
Ack 确认消费
type DelayHandler ¶
type DelayHandler struct {
// contains filtered or unexported fields
}
func NewDelayHandler ¶
func NewDelayHandler(cli *DelayClient, jobID string) *DelayHandler
type EtcdConfig ¶
type EtcdConfig struct {
// contains filtered or unexported fields
}
func NewEtcdConfiger ¶
func NewEtcdConfiger() *EtcdConfig
func (*EtcdConfig) Watch ¶
func (m *EtcdConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent
type InstanceManager ¶
type InstanceManager struct {
// contains filtered or unexported fields
}
func NewInstanceManager ¶
func NewInstanceManager() *InstanceManager
func (*InstanceManager) Close ¶
func (m *InstanceManager) Close()
type Job ¶
type Job struct {
	Namespace string `json:"namespace"`
	Queue     string `json:"queue"`
	Body      []byte `json:"body"` // 任务具体实体
	ID        string `json:"id"`
	TTL       uint32 `json:"ttl"`        // 任务过期时间 单位:s
	Delay     uint32 `json:"delay"`      // 任务延迟时间 单位:s
	ElapsedMS int64  `json:"elapsed_ms"` // 任务从产生到消费时间 单位:ms
}
延迟队列任务
type KafkaHandler ¶
type KafkaHandler struct {
// contains filtered or unexported fields
}
func NewKafkaHandler ¶
func NewKafkaHandler(reader *kafka.Reader, msg kafka.Message) *KafkaHandler
type KafkaReader ¶
func NewKafkaReader ¶
func (*KafkaReader) Close ¶
func (m *KafkaReader) Close() error
func (*KafkaReader) FetchMsg ¶
func (m *KafkaReader) FetchMsg(ctx context.Context, v interface{}, ov interface{}) (Handler, error)
func (*KafkaReader) ReadMsg ¶
func (m *KafkaReader) ReadMsg(ctx context.Context, v interface{}, ov interface{}) error
func (*KafkaReader) SetOffset ¶
func (m *KafkaReader) SetOffset(ctx context.Context, offset int64) error
func (*KafkaReader) SetOffsetAt ¶
type KafkaWriter ¶
func NewKafkaWriter ¶
func NewKafkaWriter(brokers []string, topic string) *KafkaWriter
func (*KafkaWriter) Close ¶
func (m *KafkaWriter) Close() error
type MQRoleType ¶
type MQRoleType int
const (
	RoleTypeReader MQRoleType = iota
	RoleTypeWriter
	RoleTypeDelayClient
)
func MQRoleTypeFromInt ¶
func MQRoleTypeFromInt(it int) (t MQRoleType, err error)
func (MQRoleType) String ¶
func (t MQRoleType) String() string
type Payload ¶
type Payload struct {
	Carrier opentracing.TextMapCarrier `json:"c"`
	Value   string                     `json:"v"`
	Head    interface{}                `json:"h"`
	Control *mqPayloadControl          `json:"t"`
}
type Reader ¶
type Reader interface {
	FetchMsg(ctx context.Context, value interface{}, ovalue interface{}) (Handler, error)
	ReadMsg(ctx context.Context, value interface{}, ovalue interface{}) error
	SetOffsetAt(ctx context.Context, t time.Time) error
	SetOffset(ctx context.Context, offset int64) error
	Close() error
}
func NewGroupReader ¶
CommitInterval indicates the interval at which offsets are committed to the broker. If 0, commits will be handled synchronously.
type SimpleConfig ¶
type SimpleConfig struct {
// contains filtered or unexported fields
}
func NewSimpleConfiger ¶
func NewSimpleConfiger() *SimpleConfig
func (*SimpleConfig) Watch ¶
func (m *SimpleConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent
Source Files
¶
Click to show internal directories.
Click to hide internal directories.