client

package
v0.0.0-...-7febb3a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2023 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ParseSubscriptionRequest

func ParseSubscriptionRequest(req *pb.Subscribe) []stan.SubscriptionOption

func UnmarshalToEventReceive

func UnmarshalToEventReceive(data []byte) (*pb.EventReceive, error)

func ValidateEventMessage

func ValidateEventMessage(msg *pb.Event) error

func ValidatePollRequest

func ValidatePollRequest(subRequest *pb.QueuesDownstreamRequest) error

func ValidateQueueMessage

func ValidateQueueMessage(msg *pb.QueueMessage) error

func ValidateReceiveQueueMessageRequest

func ValidateReceiveQueueMessageRequest(subRequest *pb.ReceiveQueueMessagesRequest) error

func ValidateRequest

func ValidateRequest(req *pb.Request) error

func ValidateResponse

func ValidateResponse(res *pb.Response) error

func ValidateSubscriptionToEvents

func ValidateSubscriptionToEvents(subReq *pb.Subscribe, kindID entities.KindType) error

func ValidateSubscriptionToRequests

func ValidateSubscriptionToRequests(subReq *pb.Subscribe) error

Types

type AsyncDispatcher

type AsyncDispatcher struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewDispatcher

func NewDispatcher(conn stan.Conn) *AsyncDispatcher

func (*AsyncDispatcher) DispatchAsync

func (ad *AsyncDispatcher) DispatchAsync(items []*DispatchItem) map[int]error

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(opts *Options) (*Client, error)

func NewStoreClient

func NewStoreClient(opts *Options) (*Client, error)

func (*Client) Disconnect

func (c *Client) Disconnect() error

func (*Client) SendCommand

func (c *Client) SendCommand(ctx context.Context, req *pb.Request) (*pb.Response, error)

func (*Client) SendEvents

func (c *Client) SendEvents(ctx context.Context, event *pb.Event) (*pb.Result, error)

func (*Client) SendEventsStore

func (c *Client) SendEventsStore(ctx context.Context, event *pb.Event) (*pb.Result, error)

func (*Client) SendQuery

func (c *Client) SendQuery(ctx context.Context, req *pb.Request) (*pb.Response, error)

func (*Client) SendResponse

func (c *Client) SendResponse(ctx context.Context, res *pb.Response) error

func (*Client) SubscribeToCommands

func (c *Client) SubscribeToCommands(ctx context.Context, subReq *pb.Subscribe, reqCh chan *pb.Request) error

func (*Client) SubscribeToEvents

func (c *Client) SubscribeToEvents(ctx context.Context, subReq *pb.Subscribe, eventsCh chan *pb.EventReceive) error

func (*Client) SubscribeToEventsStore

func (c *Client) SubscribeToEventsStore(ctx context.Context, subReq *pb.Subscribe, eventsCh chan *pb.EventReceive) error

func (*Client) SubscribeToQueries

func (c *Client) SubscribeToQueries(ctx context.Context, subReq *pb.Subscribe, reqCh chan *pb.Request) error

func (*Client) UnSubscribe

func (c *Client) UnSubscribe() error

type DispatchItem

type DispatchItem struct {
	// contains filtered or unexported fields
}

type Options

type Options struct {
	ClientID          string
	Deadline          time.Duration
	MemoryPipe        *pipe.Pipe
	AutoReconnect     bool
	MaxInflight       int
	PubAckWaitSeconds int
}

func NewClientOptions

func NewClientOptions(clientId string) *Options

func (*Options) SetAutoReconnect

func (o *Options) SetAutoReconnect(value bool) *Options

func (*Options) SetDeadline

func (o *Options) SetDeadline(value time.Duration) *Options

func (*Options) SetMaxInflight

func (o *Options) SetMaxInflight(value int) *Options

func (*Options) SetMemoryPipe

func (o *Options) SetMemoryPipe(value *pipe.Pipe) *Options

func (*Options) SetPubAckWaitSeconds

func (o *Options) SetPubAckWaitSeconds(value int) *Options

type QueueClient

type QueueClient struct {
	// contains filtered or unexported fields
}

func NewQueueClient

func NewQueueClient(opts *Options, policyCfg *config.QueueConfig) (*QueueClient, error)

func (*QueueClient) AckAllQueueMessages

func (*QueueClient) Disconnect

func (qc *QueueClient) Disconnect() error

func (*QueueClient) MonitorQueueMessages

func (qc *QueueClient) MonitorQueueMessages(ctx context.Context, channel string, msgCh chan *pb.QueueMessage, errCh chan error, done chan bool)

func (*QueueClient) Poll

func (*QueueClient) ReceiveQueueMessages

func (qc *QueueClient) ReceiveQueueMessages(ctx context.Context, request *pb.ReceiveQueueMessagesRequest) (*pb.ReceiveQueueMessagesResponse, error)

func (*QueueClient) SendQueueMessage

func (qc *QueueClient) SendQueueMessage(ctx context.Context, msg *pb.QueueMessage) *pb.SendQueueMessageResult

func (*QueueClient) SendQueueMessagesBatch

func (qc *QueueClient) SendQueueMessagesBatch(ctx context.Context, batch *pb.QueueMessagesBatchRequest) (*pb.QueueMessagesBatchResponse, error)

func (*QueueClient) SetDelayMessagesProcessor

func (qc *QueueClient) SetDelayMessagesProcessor(ctx context.Context)

func (*QueueClient) SetQueueStreamMiddlewareFunc

func (qc *QueueClient) SetQueueStreamMiddlewareFunc(fn func(qc *QueueClient, msg *pb.StreamQueueMessagesRequest) error)

func (*QueueClient) SetQueuesDownstreamMiddlewareFunc

func (qc *QueueClient) SetQueuesDownstreamMiddlewareFunc(fn func(qc *QueueClient, req *pb.QueuesDownstreamRequest) error)

func (*QueueClient) ShutdownDelayedMessagesProcessor

func (qc *QueueClient) ShutdownDelayedMessagesProcessor()

func (*QueueClient) StreamQueueMessage

func (qc *QueueClient) StreamQueueMessage(parentCtx context.Context, requests chan *pb.StreamQueueMessagesRequest, response chan *pb.StreamQueueMessagesResponse, done chan bool)

type QueueDownstreamRequest

type QueueDownstreamRequest struct {
	*pb.QueuesDownstreamRequest
	TransactionId string
}

func NewPollRequest

func NewPollRequest(pb *pb.QueuesDownstreamRequest, transactionId string) *QueueDownstreamRequest

func (*QueueDownstreamRequest) SetAutoAck

func (p *QueueDownstreamRequest) SetAutoAck(autoAck bool) *QueueDownstreamRequest

func (*QueueDownstreamRequest) SetChannel

func (p *QueueDownstreamRequest) SetChannel(channel string) *QueueDownstreamRequest

func (*QueueDownstreamRequest) SetMaxItems

func (p *QueueDownstreamRequest) SetMaxItems(maxItems int) *QueueDownstreamRequest

func (*QueueDownstreamRequest) SetWaitTimeout

func (p *QueueDownstreamRequest) SetWaitTimeout(waitTimeout int) *QueueDownstreamRequest

type QueueDownstreamResponse

type QueueDownstreamResponse struct {
	sync.Mutex
	*pb.QueuesDownstreamResponse
	// contains filtered or unexported fields
}

func NewPollResponse

func NewPollResponse(request *QueueDownstreamRequest) *QueueDownstreamResponse

func (*QueueDownstreamResponse) AckAll

func (p *QueueDownstreamResponse) AckAll() error

func (*QueueDownstreamResponse) AckRange

func (p *QueueDownstreamResponse) AckRange(seqRange []int64) error

func (*QueueDownstreamResponse) ActiveOffsets

func (p *QueueDownstreamResponse) ActiveOffsets() ([]int64, error)

func (*QueueDownstreamResponse) Close

func (p *QueueDownstreamResponse) Close()

func (*QueueDownstreamResponse) IsActive

func (p *QueueDownstreamResponse) IsActive() bool

func (*QueueDownstreamResponse) NackAll

func (p *QueueDownstreamResponse) NackAll() error

func (*QueueDownstreamResponse) NackRange

func (p *QueueDownstreamResponse) NackRange(seqRange []int64) error

func (*QueueDownstreamResponse) ReQueueAll

func (p *QueueDownstreamResponse) ReQueueAll(channel string) error

func (*QueueDownstreamResponse) ReQueueRange

func (p *QueueDownstreamResponse) ReQueueRange(seqRange []int64, channel string) error

func (*QueueDownstreamResponse) TransactionId

func (p *QueueDownstreamResponse) TransactionId() string

type QueuePool

type QueuePool struct {
	// contains filtered or unexported fields
}

func NewQueuePool

func NewQueuePool(ctx context.Context, opts *QueuePoolOptions, appConfig *config.Config) *QueuePool

func (*QueuePool) ClientsCount

func (qp *QueuePool) ClientsCount() int

func (*QueuePool) Close

func (qp *QueuePool) Close()

func (*QueuePool) GetClient

func (qp *QueuePool) GetClient(channel string) (*QueueClient, error)

func (*QueuePool) ReleaseClient

func (qp *QueuePool) ReleaseClient(channel string)

type QueuePoolClient

type QueuePoolClient struct {
	Channel     string
	Client      *QueueClient
	LastConnect time.Time
	// contains filtered or unexported fields
}

func NewQueuePoolClient

func NewQueuePoolClient(channel string, opts *Options, policyCfg *config.QueueConfig) (*QueuePoolClient, error)

func (*QueuePoolClient) GetClient

func (qpc *QueuePoolClient) GetClient() *QueueClient

func (*QueuePoolClient) InUsed

func (qpc *QueuePoolClient) InUsed(d time.Duration) bool

func (*QueuePoolClient) ReleaseClient

func (qpc *QueuePoolClient) ReleaseClient()

type QueuePoolOptions

type QueuePoolOptions struct {
	KillAfter time.Duration
	MaxUsage  uint32
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL