mq

package
v1.3.26
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2020 License: BSD-3-Clause Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LastOffset  int64 = -1 // The most recent offset available for a partition.
	FirstOffset       = -2 // The least recent offset available for a partition.
)

Variables

View Source
var (
	InvalidMqRoleTypeStringErr = errors.New("invalid mq role type string")
)

Functions

func Close

func Close()

func ReadMsgByGroup

func ReadMsgByGroup(ctx context.Context, topic, groupId string, value interface{}) (context.Context, error)

The offset is committed automatically after the message has been read.

func ReadMsgByPartition

func ReadMsgByPartition(ctx context.Context, topic string, partition int, value interface{}) (context.Context, error)

func SetConfiger

func SetConfiger(ctx context.Context, configerType ConfigerType) error

func WatchUpdate

func WatchUpdate(ctx context.Context)

func WriteMsg

func WriteMsg(ctx context.Context, topic string, key string, value interface{}) error

func WriteMsgs

func WriteMsgs(ctx context.Context, topic string, msgs ...Message) error

Types

type ApolloConfig

type ApolloConfig struct {
	// contains filtered or unexported fields
}

func NewApolloConfiger

func NewApolloConfiger() *ApolloConfig

func (*ApolloConfig) GetConfig

func (m *ApolloConfig) GetConfig(ctx context.Context, topic string) (*Config, error)

func (*ApolloConfig) Init

func (m *ApolloConfig) Init(ctx context.Context) (err error)

func (*ApolloConfig) ParseKey

func (m *ApolloConfig) ParseKey(ctx context.Context, key string) (*KeyParts, error)

func (*ApolloConfig) Watch

func (m *ApolloConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent

type Config

type Config struct {
	MQType         MQType
	MQAddr         []string
	Topic          string
	TimeOut        time.Duration
	CommitInterval time.Duration
	Offset         int64
	OffsetAt       string
}

type Configer

type Configer interface {
	Init(ctx context.Context) error
	GetConfig(ctx context.Context, topic string) (*Config, error)
	ParseKey(ctx context.Context, k string) (*KeyParts, error)
	Watch(ctx context.Context) <-chan *center.ChangeEvent
}
var DefaultConfiger Configer

func NewConfiger

func NewConfiger(configType ConfigerType) (Configer, error)

type ConfigerType

type ConfigerType int
const (
	ConfigerTypeSimple ConfigerType = iota
	ConfigerTypeEtcd
	ConfigerTypeApollo
)

func (ConfigerType) String

func (c ConfigerType) String() string

type EtcdConfig

type EtcdConfig struct {
	// contains filtered or unexported fields
}

func NewEtcdConfiger

func NewEtcdConfiger() *EtcdConfig

func (*EtcdConfig) GetConfig

func (m *EtcdConfig) GetConfig(ctx context.Context, topic string) (*Config, error)

func (*EtcdConfig) Init

func (m *EtcdConfig) Init(ctx context.Context) error

func (*EtcdConfig) ParseKey

func (m *EtcdConfig) ParseKey(ctx context.Context, k string) (*KeyParts, error)

func (*EtcdConfig) Watch

func (m *EtcdConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent

type Handler

type Handler interface {
	CommitMsg(ctx context.Context) error
}

func FetchMsgByGroup

func FetchMsgByGroup(ctx context.Context, topic, groupId string, value interface{}) (context.Context, Handler, error)

The offset is not committed automatically after the message has been read; call the returned Handler's CommitMsg method to commit the offset manually.

type InstanceManager

type InstanceManager struct {
	// contains filtered or unexported fields
}

func NewInstanceManager

func NewInstanceManager() *InstanceManager

func (*InstanceManager) Close

func (m *InstanceManager) Close()

type KafkaHandler

type KafkaHandler struct {
	// contains filtered or unexported fields
}

func NewKafkaHandler

func NewKafkaHandler(reader *kafka.Reader, msg kafka.Message) *KafkaHandler

func (*KafkaHandler) CommitMsg

func (m *KafkaHandler) CommitMsg(ctx context.Context) error

type KafkaReader

type KafkaReader struct {
	*kafka.Reader
}

func NewKafkaReader

func NewKafkaReader(brokers []string, topic, groupId string, partition, minBytes, maxBytes int, commitInterval time.Duration) *KafkaReader

func (*KafkaReader) Close

func (m *KafkaReader) Close() error

func (*KafkaReader) FetchMsg

func (m *KafkaReader) FetchMsg(ctx context.Context, v interface{}, ov interface{}) (Handler, error)

func (*KafkaReader) ReadMsg

func (m *KafkaReader) ReadMsg(ctx context.Context, v interface{}, ov interface{}) error

func (*KafkaReader) SetOffset

func (m *KafkaReader) SetOffset(ctx context.Context, offset int64) error

func (*KafkaReader) SetOffsetAt

func (m *KafkaReader) SetOffsetAt(ctx context.Context, t time.Time) error

type KafkaWriter

type KafkaWriter struct {
	*kafka.Writer
	// contains filtered or unexported fields
}

func NewKafkaWriter

func NewKafkaWriter(brokers []string, topic string) *KafkaWriter

func (*KafkaWriter) Close

func (m *KafkaWriter) Close() error

func (*KafkaWriter) WriteMsg

func (m *KafkaWriter) WriteMsg(ctx context.Context, k string, v interface{}) error

func (*KafkaWriter) WriteMsgs

func (m *KafkaWriter) WriteMsgs(ctx context.Context, msgs ...Message) error

type KeyParts

type KeyParts struct {
	Topic string
	Group string
}

type MQRoleType

type MQRoleType int
const (
	RoleTypeReader MQRoleType = iota
	RoleTypeWriter
)

func MQRoleTypeFromInt

func MQRoleTypeFromInt(it int) (t MQRoleType, err error)

func (MQRoleType) String

func (t MQRoleType) String() string

type MQType

type MQType int
const (
	MQTypeKafka MQType = iota
)

func (MQType) String

func (t MQType) String() string

type Message

type Message struct {
	Key   string
	Value interface{}
}

type Payload

type Payload struct {
	Carrier opentracing.TextMapCarrier `json:"c"`
	Value   string                     `json:"v"`
	Head    interface{}                `json:"h"`
	Control interface{}                `json:"t"`
}

type Reader

type Reader interface {
	FetchMsg(ctx context.Context, value interface{}, ovalue interface{}) (Handler, error)
	ReadMsg(ctx context.Context, value interface{}, ovalue interface{}) error
	SetOffsetAt(ctx context.Context, t time.Time) error
	SetOffset(ctx context.Context, offset int64) error
	Close() error
}

func NewGroupReader

func NewGroupReader(ctx context.Context, topic, groupId string) (Reader, error)

CommitInterval indicates the interval at which offsets are committed to the broker. If 0, commits will be handled synchronously.

func NewPartitionReader

func NewPartitionReader(ctx context.Context, topic string, partition int) (Reader, error)

type SimpleConfig

type SimpleConfig struct {
	// contains filtered or unexported fields
}

func NewSimpleConfiger

func NewSimpleConfiger() *SimpleConfig

func (*SimpleConfig) GetConfig

func (m *SimpleConfig) GetConfig(ctx context.Context, topic string) (*Config, error)

func (*SimpleConfig) Init

func (m *SimpleConfig) Init(ctx context.Context) error

func (*SimpleConfig) ParseKey

func (m *SimpleConfig) ParseKey(ctx context.Context, k string) (*KeyParts, error)

func (*SimpleConfig) Watch

func (m *SimpleConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent

type Writer

type Writer interface {
	WriteMsg(ctx context.Context, key string, value interface{}) error
	WriteMsgs(ctx context.Context, msgs ...Message) error
	Close() error
}

func NewWriter

func NewWriter(ctx context.Context, topic string) (Writer, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL