Documentation ¶
Index ¶
- func ParseSubscriptionRequest(req *pb.Subscribe) []stan.SubscriptionOption
- func UnmarshalToEventReceive(data []byte) (*pb.EventReceive, error)
- func ValidateEventMessage(msg *pb.Event) error
- func ValidatePollRequest(subRequest *pb.QueuesDownstreamRequest) error
- func ValidateQueueMessage(msg *pb.QueueMessage) error
- func ValidateReceiveQueueMessageRequest(subRequest *pb.ReceiveQueueMessagesRequest) error
- func ValidateRequest(req *pb.Request) error
- func ValidateResponse(res *pb.Response) error
- func ValidateSubscriptionToEvents(subReq *pb.Subscribe, kindID entities.KindType) error
- func ValidateSubscriptionToRequests(subReq *pb.Subscribe) error
- type AsyncDispatcher
- type Client
- func (c *Client) Disconnect() error
- func (c *Client) SendCommand(ctx context.Context, req *pb.Request) (*pb.Response, error)
- func (c *Client) SendEvents(ctx context.Context, event *pb.Event) (*pb.Result, error)
- func (c *Client) SendEventsStore(ctx context.Context, event *pb.Event) (*pb.Result, error)
- func (c *Client) SendQuery(ctx context.Context, req *pb.Request) (*pb.Response, error)
- func (c *Client) SendResponse(ctx context.Context, res *pb.Response) error
- func (c *Client) SubscribeToCommands(ctx context.Context, subReq *pb.Subscribe, reqCh chan *pb.Request) error
- func (c *Client) SubscribeToEvents(ctx context.Context, subReq *pb.Subscribe, eventsCh chan *pb.EventReceive) error
- func (c *Client) SubscribeToEventsStore(ctx context.Context, subReq *pb.Subscribe, eventsCh chan *pb.EventReceive) error
- func (c *Client) SubscribeToQueries(ctx context.Context, subReq *pb.Subscribe, reqCh chan *pb.Request) error
- func (c *Client) UnSubscribe() error
- type DispatchItem
- type Options
- type QueueClient
- func (qc *QueueClient) AckAllQueueMessages(ctx context.Context, req *pb.AckAllQueueMessagesRequest) (*pb.AckAllQueueMessagesResponse, error)
- func (qc *QueueClient) Disconnect() error
- func (qc *QueueClient) MonitorQueueMessages(ctx context.Context, channel string, msgCh chan *pb.QueueMessage, ...)
- func (qc *QueueClient) Poll(ctx context.Context, request *QueueDownstreamRequest) (*QueueDownstreamResponse, error)
- func (qc *QueueClient) ReceiveQueueMessages(ctx context.Context, request *pb.ReceiveQueueMessagesRequest) (*pb.ReceiveQueueMessagesResponse, error)
- func (qc *QueueClient) SendQueueMessage(ctx context.Context, msg *pb.QueueMessage) *pb.SendQueueMessageResult
- func (qc *QueueClient) SendQueueMessagesBatch(ctx context.Context, batch *pb.QueueMessagesBatchRequest) (*pb.QueueMessagesBatchResponse, error)
- func (qc *QueueClient) SetDelayMessagesProcessor(ctx context.Context)
- func (qc *QueueClient) SetQueueStreamMiddlewareFunc(fn func(qc *QueueClient, msg *pb.StreamQueueMessagesRequest) error)
- func (qc *QueueClient) SetQueuesDownstreamMiddlewareFunc(fn func(qc *QueueClient, req *pb.QueuesDownstreamRequest) error)
- func (qc *QueueClient) ShutdownDelayedMessagesProcessor()
- func (qc *QueueClient) StreamQueueMessage(parentCtx context.Context, requests chan *pb.StreamQueueMessagesRequest, ...)
- type QueueDownstreamRequest
- func (p *QueueDownstreamRequest) SetAutoAck(autoAck bool) *QueueDownstreamRequest
- func (p *QueueDownstreamRequest) SetChannel(channel string) *QueueDownstreamRequest
- func (p *QueueDownstreamRequest) SetMaxItems(maxItems int) *QueueDownstreamRequest
- func (p *QueueDownstreamRequest) SetWaitTimeout(waitTimeout int) *QueueDownstreamRequest
- type QueueDownstreamResponse
- func (p *QueueDownstreamResponse) AckAll() error
- func (p *QueueDownstreamResponse) AckRange(seqRange []int64) error
- func (p *QueueDownstreamResponse) ActiveOffsets() ([]int64, error)
- func (p *QueueDownstreamResponse) Close()
- func (p *QueueDownstreamResponse) IsActive() bool
- func (p *QueueDownstreamResponse) NackAll() error
- func (p *QueueDownstreamResponse) NackRange(seqRange []int64) error
- func (p *QueueDownstreamResponse) ReQueueAll(channel string) error
- func (p *QueueDownstreamResponse) ReQueueRange(seqRange []int64, channel string) error
- func (p *QueueDownstreamResponse) TransactionId() string
- type QueuePool
- type QueuePoolClient
- type QueuePoolOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ParseSubscriptionRequest ¶
func ParseSubscriptionRequest(req *pb.Subscribe) []stan.SubscriptionOption
func UnmarshalToEventReceive ¶
func UnmarshalToEventReceive(data []byte) (*pb.EventReceive, error)
func ValidateEventMessage ¶
func ValidateEventMessage(msg *pb.Event) error
func ValidatePollRequest ¶
func ValidatePollRequest(subRequest *pb.QueuesDownstreamRequest) error
func ValidateQueueMessage ¶
func ValidateQueueMessage(msg *pb.QueueMessage) error
func ValidateReceiveQueueMessageRequest ¶
func ValidateReceiveQueueMessageRequest(subRequest *pb.ReceiveQueueMessagesRequest) error
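func ValidateRequest ¶
func ValidateRequest(req *pb.Request) error
func ValidateResponse ¶
func ValidateResponse(res *pb.Response) error
func ValidateSubscriptionToEvents ¶
func ValidateSubscriptionToEvents(subReq *pb.Subscribe, kindID entities.KindType) error
func ValidateSubscriptionToRequests ¶
func ValidateSubscriptionToRequests(subReq *pb.Subscribe) error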
Types ¶
type AsyncDispatcher ¶
func NewDispatcher ¶
func NewDispatcher(conn stan.Conn) *AsyncDispatcher
func (*AsyncDispatcher) DispatchAsync ¶
func (ad *AsyncDispatcher) DispatchAsync(items []*DispatchItem) map[int]error
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewStoreClient ¶
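func (*Client) Disconnect ¶
func (c *Client) Disconnect() error
func (*Client) SendCommand ¶
func (c *Client) SendCommand(ctx context.Context, req *pb.Request) (*pb.Response, error)
func (*Client) SendEvents ¶
func (c *Client) SendEvents(ctx context.Context, event *pb.Event) (*pb.Result, error)
func (*Client) SendEventsStore ¶
func (c *Client) SendEventsStore(ctx context.Context, event *pb.Event) (*pb.Result, error)
func (*Client) SendQuery ¶
func (c *Client) SendQuery(ctx context.Context, req *pb.Request) (*pb.Response, error)
func (*Client) SendResponse ¶
func (c *Client) SendResponse(ctx context.Context, res *pb.Response) error
func (*Client) SubscribeToCommands ¶
func (c *Client) SubscribeToCommands(ctx context.Context, subReq *pb.Subscribe, reqCh chan *pb.Request) error
func (*Client) SubscribeToEvents ¶
func (c *Client) SubscribeToEvents(ctx context.Context, subReq *pb.Subscribe, eventsCh chan *pb.EventReceive) error
func (*Client) SubscribeToEventsStore ¶
func (c *Client) SubscribeToEventsStore(ctx context.Context, subReq *pb.Subscribe, eventsCh chan *pb.EventReceive) error
func (*Client) SubscribeToQueries ¶
func (c *Client) SubscribeToQueries(ctx context.Context, subReq *pb.Subscribe, reqCh chan *pb.Request) error
func (*Client) UnSubscribe ¶
func (c *Client) UnSubscribe() error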
type DispatchItem ¶
type DispatchItem struct {
// contains filtered or unexported fields
}
type Options ¶
type Options struct {
	ClientID          string
	Deadline          time.Duration
	MemoryPipe        *pipe.Pipe
	AutoReconnect     bool
	MaxInflight       int
	PubAckWaitSeconds int
}
func NewClientOptions ¶
func (*Options) SetAutoReconnect ¶
func (*Options) SetMaxInflight ¶
func (*Options) SetPubAckWaitSeconds ¶
type QueueClient ¶
type QueueClient struct {
// contains filtered or unexported fields
}
func NewQueueClient ¶
func NewQueueClient(opts *Options, policyCfg *config.QueueConfig) (*QueueClient, error)
func (*QueueClient) AckAllQueueMessages ¶
func (qc *QueueClient) AckAllQueueMessages(ctx context.Context, req *pb.AckAllQueueMessagesRequest) (*pb.AckAllQueueMessagesResponse, error)
func (*QueueClient) Disconnect ¶
func (qc *QueueClient) Disconnect() error
func (*QueueClient) MonitorQueueMessages ¶
func (qc *QueueClient) MonitorQueueMessages(ctx context.Context, channel string, msgCh chan *pb.QueueMessage, errCh chan error, done chan bool)
func (*QueueClient) Poll ¶
func (qc *QueueClient) Poll(ctx context.Context, request *QueueDownstreamRequest) (*QueueDownstreamResponse, error)
func (*QueueClient) ReceiveQueueMessages ¶
func (qc *QueueClient) ReceiveQueueMessages(ctx context.Context, request *pb.ReceiveQueueMessagesRequest) (*pb.ReceiveQueueMessagesResponse, error)
func (*QueueClient) SendQueueMessage ¶
func (qc *QueueClient) SendQueueMessage(ctx context.Context, msg *pb.QueueMessage) *pb.SendQueueMessageResult
func (*QueueClient) SendQueueMessagesBatch ¶
func (qc *QueueClient) SendQueueMessagesBatch(ctx context.Context, batch *pb.QueueMessagesBatchRequest) (*pb.QueueMessagesBatchResponse, error)
func (*QueueClient) SetDelayMessagesProcessor ¶
func (qc *QueueClient) SetDelayMessagesProcessor(ctx context.Context)
func (*QueueClient) SetQueueStreamMiddlewareFunc ¶
func (qc *QueueClient) SetQueueStreamMiddlewareFunc(fn func(qc *QueueClient, msg *pb.StreamQueueMessagesRequest) error)
func (*QueueClient) SetQueuesDownstreamMiddlewareFunc ¶
func (qc *QueueClient) SetQueuesDownstreamMiddlewareFunc(fn func(qc *QueueClient, req *pb.QueuesDownstreamRequest) error)
func (*QueueClient) ShutdownDelayedMessagesProcessor ¶
func (qc *QueueClient) ShutdownDelayedMessagesProcessor()
func (*QueueClient) StreamQueueMessage ¶
func (qc *QueueClient) StreamQueueMessage(parentCtx context.Context, requests chan *pb.StreamQueueMessagesRequest, response chan *pb.StreamQueueMessagesResponse, done chan bool)
type QueueDownstreamRequest ¶
type QueueDownstreamRequest struct {
	*pb.QueuesDownstreamRequest
	TransactionId string
}
func NewPollRequest ¶
func NewPollRequest(pb *pb.QueuesDownstreamRequest, transactionId string) *QueueDownstreamRequest
func (*QueueDownstreamRequest) SetAutoAck ¶
func (p *QueueDownstreamRequest) SetAutoAck(autoAck bool) *QueueDownstreamRequest
func (*QueueDownstreamRequest) SetChannel ¶
func (p *QueueDownstreamRequest) SetChannel(channel string) *QueueDownstreamRequest
func (*QueueDownstreamRequest) SetMaxItems ¶
func (p *QueueDownstreamRequest) SetMaxItems(maxItems int) *QueueDownstreamRequest
func (*QueueDownstreamRequest) SetWaitTimeout ¶
func (p *QueueDownstreamRequest) SetWaitTimeout(waitTimeout int) *QueueDownstreamRequest
type QueueDownstreamResponse ¶
type QueueDownstreamResponse struct {
	sync.Mutex
	*pb.QueuesDownstreamResponse
	// contains filtered or unexported fields
}
func NewPollResponse ¶
func NewPollResponse(request *QueueDownstreamRequest) *QueueDownstreamResponse
func (*QueueDownstreamResponse) AckAll ¶
func (p *QueueDownstreamResponse) AckAll() error
func (*QueueDownstreamResponse) AckRange ¶
func (p *QueueDownstreamResponse) AckRange(seqRange []int64) error
func (*QueueDownstreamResponse) ActiveOffsets ¶
func (p *QueueDownstreamResponse) ActiveOffsets() ([]int64, error)
func (*QueueDownstreamResponse) Close ¶
func (p *QueueDownstreamResponse) Close()
func (*QueueDownstreamResponse) IsActive ¶
func (p *QueueDownstreamResponse) IsActive() bool
func (*QueueDownstreamResponse) NackAll ¶
func (p *QueueDownstreamResponse) NackAll() error
func (*QueueDownstreamResponse) NackRange ¶
func (p *QueueDownstreamResponse) NackRange(seqRange []int64) error
func (*QueueDownstreamResponse) ReQueueAll ¶
func (p *QueueDownstreamResponse) ReQueueAll(channel string) error
func (*QueueDownstreamResponse) ReQueueRange ¶
func (p *QueueDownstreamResponse) ReQueueRange(seqRange []int64, channel string) error
func (*QueueDownstreamResponse) TransactionId ¶
func (p *QueueDownstreamResponse) TransactionId() string
type QueuePool ¶
type QueuePool struct {
// contains filtered or unexported fields
}
func NewQueuePool ¶
func (*QueuePool) ClientsCount ¶
func (*QueuePool) ReleaseClient ¶
type QueuePoolClient ¶
type QueuePoolClient struct {
	Channel     string
	Client      *QueueClient
	LastConnect time.Time
	// contains filtered or unexported fields
}
func NewQueuePoolClient ¶
func NewQueuePoolClient(channel string, opts *Options, policyCfg *config.QueueConfig) (*QueuePoolClient, error)
func (*QueuePoolClient) GetClient ¶
func (qpc *QueuePoolClient) GetClient() *QueueClient
func (*QueuePoolClient) ReleaseClient ¶
func (qpc *QueuePoolClient) ReleaseClient()
type QueuePoolOptions ¶
Click to show internal directories.
Click to hide internal directories.