Documentation ¶
Index ¶
- Variables
- type BaseConsumerGroupTopicOption
- func (b *BaseConsumerGroupTopicOption) ConsumerGroup() string
- func (b *BaseConsumerGroupTopicOption) DeregisterClient() DeregisterClient
- func (b *BaseConsumerGroupTopicOption) GRPCType() consts.GRPCType
- func (b *BaseConsumerGroupTopicOption) RegisterClient() RegisterClient
- func (b *BaseConsumerGroupTopicOption) SubscriptionMode() pb.Subscription_SubscriptionItem_SubscriptionMode
- func (b *BaseConsumerGroupTopicOption) Topic() string
- type ConsumerGroupConfig
- type ConsumerGroupMetadata
- type ConsumerGroupStateEvent
- type ConsumerGroupTopicConfChangeEvent
- type ConsumerGroupTopicConfig
- type ConsumerGroupTopicMetadata
- type ConsumerGroupTopicOption
- type ConsumerManager
- type ConsumerService
- func (c *ConsumerService) Subscribe(ctx context.Context, sub *pb.Subscription) (*pb.Response, error)
- func (c *ConsumerService) SubscribeStream(stream pb.ConsumerService_SubscribeStreamServer) error
- func (c *ConsumerService) Unsubscribe(ctx context.Context, sub *pb.Subscription) (*pb.Response, error)
- type DeregisterClient
- type EventMeshConsumer
- type GroupClient
- type MessageContext
- type MessageHandler
- type Processor
- type RegisterClient
- type Request
- type Response
- type StateAction
- type StreamGroupTopicOption
- func (b *StreamGroupTopicOption) AllEmiters() *set.Set
- func (b *StreamGroupTopicOption) AllURLs() *set.Set
- func (b *StreamGroupTopicOption) DeregisterClient() DeregisterClient
- func (b *StreamGroupTopicOption) IDCEmiters() *sync.Map
- func (b *StreamGroupTopicOption) IDCURLs() *sync.Map
- func (b *StreamGroupTopicOption) RegisterClient() RegisterClient
- func (b *StreamGroupTopicOption) Size() int
- type StreamRequest
- type WebhookGroupTopicOption
- type WebhookRequest
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrNoConnectorPlugin = errors.New("no connector plugin found") ErrNewConsumerConnector = errors.New("create consumer connector err") ErrNewProducerConnector = errors.New("create producer connector err") )
View Source
var ( ConsumerGroupWaitingRequestThreshold = 1000 ErrRequestReachMaxThreshold = errors.New("request reach the max threshold") )
View Source
var (
ErrNoConsumerClient = errors.New("no consumer group client")
)
View Source
var (
ErrNoProtocolFound = errors.New("no protocol type found in event message")
)
View Source
var (
ErrProtocolPluginNotFound = fmt.Errorf("protocol plugin not found")
)
Functions ¶
This section is empty.
Types ¶
type BaseConsumerGroupTopicOption ¶
type BaseConsumerGroupTopicOption struct {
// contains filtered or unexported fields
}
BaseConsumerGroupTopicOption refers to ConsumerGroupTopicConfig
func (*BaseConsumerGroupTopicOption) ConsumerGroup ¶
func (b *BaseConsumerGroupTopicOption) ConsumerGroup() string
func (*BaseConsumerGroupTopicOption) DeregisterClient ¶
func (b *BaseConsumerGroupTopicOption) DeregisterClient() DeregisterClient
func (*BaseConsumerGroupTopicOption) GRPCType ¶
func (b *BaseConsumerGroupTopicOption) GRPCType() consts.GRPCType
func (*BaseConsumerGroupTopicOption) RegisterClient ¶
func (b *BaseConsumerGroupTopicOption) RegisterClient() RegisterClient
func (*BaseConsumerGroupTopicOption) SubscriptionMode ¶
func (b *BaseConsumerGroupTopicOption) SubscriptionMode() pb.Subscription_SubscriptionItem_SubscriptionMode
func (*BaseConsumerGroupTopicOption) Topic ¶
func (b *BaseConsumerGroupTopicOption) Topic() string
type ConsumerGroupConfig ¶
type ConsumerGroupMetadata ¶
type ConsumerGroupStateEvent ¶
type ConsumerGroupStateEvent struct { ConsumerGroup string ConsumerGroupConfig *ConsumerGroupConfig ConsumerGroupStateAction StateAction }
type ConsumerGroupTopicConfChangeEvent ¶
type ConsumerGroupTopicConfChangeEvent struct { Action StateAction ConsumerGroup string ConsumerGroupTopicConfig *ConsumerGroupTopicConfig }
type ConsumerGroupTopicConfig ¶
type ConsumerGroupTopicConfig struct { ConsumerGroup string Topic string SubscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode GRPCType consts.GRPCType // IDCWebhookURLs webhook urls separated by IDC // key is IDC, value is vector.Vector IDCWebhookURLs *sync.Map // AllURLs all webhook urls, ignore idc AllURLs *set.Set }
type ConsumerGroupTopicOption ¶
type ConsumerGroupTopicOption interface { ConsumerGroup() string Topic() string SubscriptionMode() pb.Subscription_SubscriptionItem_SubscriptionMode GRPCType() consts.GRPCType RegisterClient() RegisterClient DeregisterClient() DeregisterClient IDCURLs() *sync.Map AllURLs() *set.Set AllEmiters() *set.Set IDCEmiters() *sync.Map Size() int }
func NewConsumerGroupTopicOption ¶
func NewConsumerGroupTopicOption( cg string, topic string, mode pb.Subscription_SubscriptionItem_SubscriptionMode, gtype consts.GRPCType) ConsumerGroupTopicOption
func NewWStreamGroupTopicOption ¶
func NewWStreamGroupTopicOption(cg string, topic string, mode pb.Subscription_SubscriptionItem_SubscriptionMode, gtype consts.GRPCType) ConsumerGroupTopicOption
func NewWebhookGroupTopicOption ¶
func NewWebhookGroupTopicOption(cg string, topic string, mode pb.Subscription_SubscriptionItem_SubscriptionMode, gtype consts.GRPCType) ConsumerGroupTopicOption
type ConsumerManager ¶
type ConsumerManager interface { GetConsumer(consumerGroup string) (EventMeshConsumer, error) RegisterClient(cli *GroupClient) error DeRegisterClient(cli *GroupClient) error UpdateClientTime(cli *GroupClient) RestartConsumer(consumerGroup string) error Start() error Stop() error }
func NewConsumerManager ¶
func NewConsumerManager() (ConsumerManager, error)
NewConsumerManager creates a new consumer manager.
type ConsumerService ¶
type ConsumerService struct { pb.UnimplementedConsumerServiceServer // contains filtered or unexported fields }
ConsumerService grpc service
func NewConsumerServiceServer ¶
func NewConsumerServiceServer(consumerManager ConsumerManager, producerManager producer.ProducerManager) (*ConsumerService, error)
func (*ConsumerService) Subscribe ¶
func (c *ConsumerService) Subscribe(ctx context.Context, sub *pb.Subscription) (*pb.Response, error)
func (*ConsumerService) SubscribeStream ¶
func (c *ConsumerService) SubscribeStream(stream pb.ConsumerService_SubscribeStreamServer) error
SubscribeStream handles a stream request with two goroutines, one for Recv() and one for Send() of messages. In the Recv() goroutine, err == io.EOF means the client has closed the stream. For Send(), refer to https://github.com/grpc/grpc-go/issues/444
func (*ConsumerService) Unsubscribe ¶
func (c *ConsumerService) Unsubscribe(ctx context.Context, sub *pb.Subscription) (*pb.Response, error)
type DeregisterClient ¶
type DeregisterClient func(*GroupClient)
type EventMeshConsumer ¶
type EventMeshConsumer interface { Init() error Start() error ServiceState() consts.ServiceState RegisterClient(cli *GroupClient) bool DeRegisterClient(cli *GroupClient) bool Shutdown() error }
func NewEventMeshConsumer ¶
func NewEventMeshConsumer(consumerGroup string) (EventMeshConsumer, error)
type GroupClient ¶
type GroupClient struct { ENV string IDC string ConsumerGroup string Topic string GRPCType consts.GRPCType URL string SubscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode SYS string IP string PID string Hostname string APIVersion string LastUPTime time.Time Emiter emitter.EventEmitter }
GroupClient holds the details of a consumer group client.
func DefaultStreamGroupClient ¶
func DefaultStreamGroupClient() *GroupClient
func DefaultWebhookGroupClient ¶
func DefaultWebhookGroupClient() *GroupClient
type MessageContext ¶
type MessageContext struct { MsgRandomNo string SubscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode GrpcType consts.GRPCType ConsumerGroup string Event *cloudv2.Event TopicConfig ConsumerGroupTopicOption }
type MessageHandler ¶
type MessageHandler interface {
Handler(mctx *MessageContext) error
}
func NewMessageHandler ¶
func NewMessageHandler(consumerGroup string) (MessageHandler, error)
type Processor ¶
type Processor interface { Subscribe(consumerMgr ConsumerManager, msg *pb.Subscription) (*pb.Response, error) UnSubscribe(consumerMgr ConsumerManager, msg *pb.Subscription) (*pb.Response, error) SubscribeStream(consumerMgr ConsumerManager, emiter emitter.EventEmitter, msg *pb.Subscription) error Heartbeat(consumerMgr ConsumerManager, msg *pb.Heartbeat) (*pb.Response, error) ReplyMessage(ctx context.Context, producerMgr producer.ProducerManager, emiter emitter.EventEmitter, msg *pb.SimpleMessage) error }
type RegisterClient ¶
type RegisterClient func(*GroupClient)
type Request ¶
type Request struct { *retry.Retry MessageContext *MessageContext CreateTime time.Time LastPushTime time.Time Complete *atomic.Bool SimpleMessage *pb.SimpleMessage Try func() error }
func NewRequest ¶
func NewRequest(mctx *MessageContext) (*Request, error)
type StateAction ¶
type StateAction string
const ( NEW StateAction = "NEW" CHANGE StateAction = "CHANGE" DELETE StateAction = "DELETE" )
type StreamGroupTopicOption ¶
type StreamGroupTopicOption struct { *BaseConsumerGroupTopicOption // contains filtered or unexported fields }
StreamGroupTopicOption is the topic option for subscribing with a stream.
func (*StreamGroupTopicOption) AllEmiters ¶
func (b *StreamGroupTopicOption) AllEmiters() *set.Set
func (*StreamGroupTopicOption) AllURLs ¶
func (b *StreamGroupTopicOption) AllURLs() *set.Set
func (*StreamGroupTopicOption) DeregisterClient ¶
func (b *StreamGroupTopicOption) DeregisterClient() DeregisterClient
func (*StreamGroupTopicOption) IDCEmiters ¶
func (b *StreamGroupTopicOption) IDCEmiters() *sync.Map
func (*StreamGroupTopicOption) IDCURLs ¶
func (b *StreamGroupTopicOption) IDCURLs() *sync.Map
func (*StreamGroupTopicOption) RegisterClient ¶
func (b *StreamGroupTopicOption) RegisterClient() RegisterClient
func (*StreamGroupTopicOption) Size ¶
func (b *StreamGroupTopicOption) Size() int
type StreamRequest ¶
type StreamRequest struct { *Request // contains filtered or unexported fields }
func NewStreamRequest ¶
func NewStreamRequest(mctx *MessageContext) (*StreamRequest, error)
type WebhookGroupTopicOption ¶
type WebhookGroupTopicOption struct { *BaseConsumerGroupTopicOption // contains filtered or unexported fields }
WebhookGroupTopicOption is the topic option for subscribing with a webhook.
func (*WebhookGroupTopicOption) AllEmiters ¶
func (b *WebhookGroupTopicOption) AllEmiters() *set.Set
func (*WebhookGroupTopicOption) AllURLs ¶
func (b *WebhookGroupTopicOption) AllURLs() *set.Set
func (*WebhookGroupTopicOption) IDCEmiters ¶
func (b *WebhookGroupTopicOption) IDCEmiters() *sync.Map
func (*WebhookGroupTopicOption) IDCURLs ¶
func (b *WebhookGroupTopicOption) IDCURLs() *sync.Map
func (*WebhookGroupTopicOption) Size ¶
func (b *WebhookGroupTopicOption) Size() int
type WebhookRequest ¶
type WebhookRequest struct { *Request // IDCWebhookURLs webhook urls separated by IDC // key is IDC, value is set.Set IDCWebhookURLs *sync.Map // AllURLs all webhook urls, ignore idc AllURLs *set.Set // contains filtered or unexported fields }
func NewWebhookRequest ¶
func NewWebhookRequest(mctx *MessageContext) (*WebhookRequest, error)
Source Files ¶
Click to show internal directories.
Click to hide internal directories.