mq

package
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultHandlerQueueSize = 100

	TagCreate        = "create"
	TagReply         = "reply"
	TagDelete        = "delete"
	TagSync          = "sync"
	TagListMissed    = "list_missed"
	TagSubListMissed = "sub_list_missed"
	TagLike          = "like"
	TagHate          = "hate"
)

Variables

View Source
var (
	ErrInvalidLengthMessage        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowMessage          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupMessage = fmt.Errorf("proto: unexpected end of group")
)
View Source
var (
	ApiLogger *log.Logger
	ExcLogger *log.Logger

	MetricEventHandlerMsgDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "event_handler_msg_duration",
		Help:    "消息处理延时",
		Buckets: prometheus.DefBuckets,
	}, []string{"name", "topic", "status", "reason"})
)

Functions

func ConsumerContext added in v0.0.3

func ConsumerContext(traceID string, d time.Duration) (ctx context.Context, cancel context.CancelFunc)

func ObserveMsg added in v0.0.3

func ObserveMsg(topic, name, status, reason string, start time.Time)

func WarpMessage added in v0.0.3

func WarpMessage(ctx context.Context, tag string, msg proto.Message) ([]byte, error)

Types

type Consumer added in v0.0.3

type Consumer interface {
	Start() error
	Stop() error
}

type ConsumerMeta added in v0.0.3

type ConsumerMeta struct {
	Name        string
	Fn          HandleFunc
	NPoll       int
	NProc       int
	ProcTimeout time.Duration
	Done        chan struct{}
}

type HandleFunc added in v0.0.3

type HandleFunc func(ctx context.Context, msg *MqMessage) error

type MqMessage added in v0.0.3

type MqMessage struct {
	TxId                 string   `protobuf:"bytes,1,opt,name=tx_id,json=txId,proto3" json:"tx_id,omitempty"`
	TraceId              string   `protobuf:"bytes,2,opt,name=trace_id,json=traceId,proto3" json:"trace_id,omitempty"`
	Tag                  string   `protobuf:"bytes,3,opt,name=tag,proto3" json:"tag,omitempty"`
	Message              []byte   `protobuf:"bytes,4,opt,name=message,proto3" json:"message,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func Unmarshal added in v0.0.3

func Unmarshal(message []byte) (*MqMessage, error)

func (*MqMessage) Descriptor added in v0.0.3

func (*MqMessage) Descriptor() ([]byte, []int)

func (*MqMessage) GetMessage added in v0.0.3

func (m *MqMessage) GetMessage() []byte

func (*MqMessage) GetTag added in v0.0.3

func (m *MqMessage) GetTag() string

func (*MqMessage) GetTraceId added in v0.0.3

func (m *MqMessage) GetTraceId() string

func (*MqMessage) GetTxId added in v0.0.3

func (m *MqMessage) GetTxId() string

func (*MqMessage) Marshal added in v0.0.3

func (m *MqMessage) Marshal() (dAtA []byte, err error)

func (*MqMessage) MarshalTo added in v0.0.3

func (m *MqMessage) MarshalTo(dAtA []byte) (int, error)

func (*MqMessage) MarshalToSizedBuffer added in v0.0.3

func (m *MqMessage) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*MqMessage) ProtoMessage added in v0.0.3

func (*MqMessage) ProtoMessage()

func (*MqMessage) Reset added in v0.0.3

func (m *MqMessage) Reset()

func (*MqMessage) Size added in v0.0.3

func (m *MqMessage) Size() (n int)

func (*MqMessage) String added in v0.0.3

func (m *MqMessage) String() string

func (*MqMessage) Unmarshal added in v0.0.3

func (m *MqMessage) Unmarshal(dAtA []byte) error

func (*MqMessage) XXX_DiscardUnknown added in v0.0.3

func (m *MqMessage) XXX_DiscardUnknown()

func (*MqMessage) XXX_Marshal added in v0.0.3

func (m *MqMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*MqMessage) XXX_Merge added in v0.0.3

func (m *MqMessage) XXX_Merge(src proto.Message)

func (*MqMessage) XXX_Size added in v0.0.3

func (m *MqMessage) XXX_Size() int

func (*MqMessage) XXX_Unmarshal added in v0.0.3

func (m *MqMessage) XXX_Unmarshal(b []byte) error

type Option added in v0.0.3

type Option func(*ConsumerMeta)

func WithNPoll added in v0.0.3

func WithNPoll(n int) Option

func WithNProc added in v0.0.3

func WithNProc(n int) Option

func WithName added in v0.0.3

func WithName(name string) Option

func WithProcTimeout added in v0.0.3

func WithProcTimeout(d time.Duration) Option

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL