Documentation
¶
Index ¶
- Constants
- Variables
- func Close()
- func ReadMsgByGroup(ctx context.Context, topic, groupId string, value interface{}) (context.Context, error)
- func ReadMsgByPartition(ctx context.Context, topic string, partition int, value interface{}) (context.Context, error)
- func SetConfiger(ctx context.Context, configerType ConfigerType) error
- func WatchUpdate(ctx context.Context)
- func WriteMsg(ctx context.Context, topic string, key string, value interface{}) error
- func WriteMsgs(ctx context.Context, topic string, msgs ...Message) error
- type ApolloConfig
- func (m *ApolloConfig) GetConfig(ctx context.Context, topic string) (*Config, error)
- func (m *ApolloConfig) Init(ctx context.Context) (err error)
- func (m *ApolloConfig) ParseKey(ctx context.Context, key string) (*KeyParts, error)
- func (m *ApolloConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent
- type Config
- type Configer
- type ConfigerType
- type EtcdConfig
- type Handler
- type InstanceManager
- type KafkaHandler
- type KafkaReader
- func (m *KafkaReader) Close() error
- func (m *KafkaReader) FetchMsg(ctx context.Context, v interface{}, ov interface{}) (Handler, error)
- func (m *KafkaReader) ReadMsg(ctx context.Context, v interface{}, ov interface{}) error
- func (m *KafkaReader) SetOffset(ctx context.Context, offset int64) error
- func (m *KafkaReader) SetOffsetAt(ctx context.Context, t time.Time) error
- type KafkaWriter
- type KeyParts
- type MQRoleType
- type MQType
- type Message
- type Payload
- type Reader
- type SimpleConfig
- type Writer
Constants ¶
View Source
const ( LastOffset int64 = -1 // The most recent offset available for a partition. FirstOffset = -2 // The least recent offset available for a partition. )
Variables ¶
View Source
var (
InvalidMqRoleTypeStringErr = errors.New("invalid mq role type string")
)
Functions ¶
func ReadMsgByGroup ¶
func ReadMsgByGroup(ctx context.Context, topic, groupId string, value interface{}) (context.Context, error)
读完消息后会自动提交offset
func ReadMsgByPartition ¶
func SetConfiger ¶
func SetConfiger(ctx context.Context, configerType ConfigerType) error
func WatchUpdate ¶
Types ¶
type ApolloConfig ¶
type ApolloConfig struct {
// contains filtered or unexported fields
}
func NewApolloConfiger ¶
func NewApolloConfiger() *ApolloConfig
func (*ApolloConfig) Watch ¶
func (m *ApolloConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent
type Configer ¶
type Configer interface { Init(ctx context.Context) error GetConfig(ctx context.Context, topic string) (*Config, error) ParseKey(ctx context.Context, k string) (*KeyParts, error) Watch(ctx context.Context) <-chan *center.ChangeEvent }
var DefaultConfiger Configer
func NewConfiger ¶
func NewConfiger(configType ConfigerType) (Configer, error)
type ConfigerType ¶
type ConfigerType int
const ( ConfigerTypeSimple ConfigerType = iota ConfigerTypeEtcd ConfigerTypeApollo )
func (ConfigerType) String ¶
func (c ConfigerType) String() string
type EtcdConfig ¶
type EtcdConfig struct {
// contains filtered or unexported fields
}
func NewEtcdConfiger ¶
func NewEtcdConfiger() *EtcdConfig
func (*EtcdConfig) Watch ¶
func (m *EtcdConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent
type InstanceManager ¶
type InstanceManager struct {
// contains filtered or unexported fields
}
func NewInstanceManager ¶
func NewInstanceManager() *InstanceManager
func (*InstanceManager) Close ¶
func (m *InstanceManager) Close()
type KafkaHandler ¶
type KafkaHandler struct {
// contains filtered or unexported fields
}
func NewKafkaHandler ¶
func NewKafkaHandler(reader *kafka.Reader, msg kafka.Message) *KafkaHandler
type KafkaReader ¶
func NewKafkaReader ¶
func (*KafkaReader) Close ¶
func (m *KafkaReader) Close() error
func (*KafkaReader) FetchMsg ¶
func (m *KafkaReader) FetchMsg(ctx context.Context, v interface{}, ov interface{}) (Handler, error)
func (*KafkaReader) ReadMsg ¶
func (m *KafkaReader) ReadMsg(ctx context.Context, v interface{}, ov interface{}) error
func (*KafkaReader) SetOffset ¶
func (m *KafkaReader) SetOffset(ctx context.Context, offset int64) error
func (*KafkaReader) SetOffsetAt ¶
type KafkaWriter ¶
func NewKafkaWriter ¶
func NewKafkaWriter(brokers []string, topic string) *KafkaWriter
func (*KafkaWriter) Close ¶
func (m *KafkaWriter) Close() error
type MQRoleType ¶
type MQRoleType int
const ( RoleTypeReader MQRoleType = iota RoleTypeWriter )
func MQRoleTypeFromInt ¶
func MQRoleTypeFromInt(it int) (t MQRoleType, err error)
func (MQRoleType) String ¶
func (t MQRoleType) String() string
type Payload ¶
type Payload struct { Carrier opentracing.TextMapCarrier `json:"c"` Value string `json:"v"` Head interface{} `json:"h"` Control interface{} `json:"t"` }
type Reader ¶
type Reader interface { FetchMsg(ctx context.Context, value interface{}, ovalue interface{}) (Handler, error) ReadMsg(ctx context.Context, value interface{}, ovalue interface{}) error SetOffsetAt(ctx context.Context, t time.Time) error SetOffset(ctx context.Context, offset int64) error Close() error }
func NewGroupReader ¶
CommitInterval indicates the interval at which offsets are committed to the broker. If 0, commits will be handled synchronously.
type SimpleConfig ¶
type SimpleConfig struct {
// contains filtered or unexported fields
}
func NewSimpleConfiger ¶
func NewSimpleConfiger() *SimpleConfig
func (*SimpleConfig) Watch ¶
func (m *SimpleConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent
Source Files
¶
Click to show internal directories.
Click to hide internal directories.