Documentation
¶
Index ¶
- Variables
- type IManager
- type IStream
- type Manager
- func (s *Manager) Acknowledge(ctx context.Context, parsedReq *ParsedStreamingPullRequest) error
- func (s *Manager) CreateNewStream(server metrov1.Subscriber_StreamingPullServer, req *ParsedStreamingPullRequest, ...) error
- func (s *Manager) ModifyAcknowledgement(ctx context.Context, parsedReq *ParsedStreamingPullRequest) error
- type ParsedStreamingPullRequest
- func NewParsedAcknowledgeRequest(req *metrov1.AcknowledgeRequest) (*ParsedStreamingPullRequest, error)
- func NewParsedModifyAckDeadlineRequest(req *metrov1.ModifyAckDeadlineRequest) (*ParsedStreamingPullRequest, error)
- func NewParsedStreamingPullRequest(req *metrov1.StreamingPullRequest) (*ParsedStreamingPullRequest, error)
Constants ¶
This section is empty.
Variables ¶
View Source
var DefaultNumMessagesToReadOffStream int32 = 10
DefaultNumMessagesToReadOffStream ...
Functions ¶
This section is empty.
Types ¶
type IManager ¶
type IManager interface { CreateNewStream(server metrov1.Subscriber_StreamingPullServer, req *ParsedStreamingPullRequest, errGroup *errgroup.Group, ch cache.ICache) error Acknowledge(ctx context.Context, parsedReq *ParsedStreamingPullRequest) error ModifyAcknowledgement(ctx context.Context, req *ParsedStreamingPullRequest) error }
IManager ...
func NewStreamManager ¶
func NewStreamManager(ctx context.Context, subscriptionCore subscription.ICore, offsetCore offset.ICore, bs brokerstore.IBrokerStore, grpcServerAddr string, topicCore topic.ICore) IManager
NewStreamManager ...
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager ...
func (*Manager) Acknowledge ¶
func (s *Manager) Acknowledge(ctx context.Context, parsedReq *ParsedStreamingPullRequest) error
Acknowledge ...
func (*Manager) CreateNewStream ¶
func (s *Manager) CreateNewStream(server metrov1.Subscriber_StreamingPullServer, req *ParsedStreamingPullRequest, errGroup *errgroup.Group, ch cache.ICache) error
CreateNewStream ...
func (*Manager) ModifyAcknowledgement ¶
func (s *Manager) ModifyAcknowledgement(ctx context.Context, parsedReq *ParsedStreamingPullRequest) error
ModifyAcknowledgement ...
type ParsedStreamingPullRequest ¶
type ParsedStreamingPullRequest struct { ClientID string Subscription string AckIDs []string ModAckIDs []string AckMessages []*subscriber.AckMessage ModAckMessages []*subscriber.AckMessage ModifyDeadlineMsgIdsWithSecs map[string]int32 }
ParsedStreamingPullRequest ...
func NewParsedAcknowledgeRequest ¶
func NewParsedAcknowledgeRequest(req *metrov1.AcknowledgeRequest) (*ParsedStreamingPullRequest, error)
NewParsedAcknowledgeRequest ...
func NewParsedModifyAckDeadlineRequest ¶
func NewParsedModifyAckDeadlineRequest(req *metrov1.ModifyAckDeadlineRequest) (*ParsedStreamingPullRequest, error)
NewParsedModifyAckDeadlineRequest ...
func NewParsedStreamingPullRequest ¶
func NewParsedStreamingPullRequest(req *metrov1.StreamingPullRequest) (*ParsedStreamingPullRequest, error)
NewParsedStreamingPullRequest ...
func (*ParsedStreamingPullRequest) HasAcknowledgement ¶
func (r *ParsedStreamingPullRequest) HasAcknowledgement() bool
HasAcknowledgement ...
func (*ParsedStreamingPullRequest) HasModifyAcknowledgement ¶
func (r *ParsedStreamingPullRequest) HasModifyAcknowledgement() bool
HasModifyAcknowledgement ...
func (*ParsedStreamingPullRequest) HasSubscription ¶
func (r *ParsedStreamingPullRequest) HasSubscription() bool
HasSubscription ...
Click to show internal directories.
Click to hide internal directories.