Documentation
¶
Index ¶
- type AckAllRequest
- type AckAllResponse
- type GrpcClient
- type Option
- func WithAddress(host string, port int) Option
- func WithAuthToken(token string) Option
- func WithAutoReconnect(value bool) Option
- func WithCertificate(certData, serverOverrideDomain string) Option
- func WithCheckConnection(value bool) Option
- func WithClientId(id string) Option
- func WithConnectionNotificationFunc(fn func(msg string)) Option
- func WithCredentials(certFile, serverOverrideDomain string) Option
- func WithDefaultCacheTTL(ttl time.Duration) Option
- func WithDefaultChannel(channel string) Option
- func WithMaxReconnects(value int) Option
- func WithReceiveBufferSize(size int) Option
- func WithReconnectInterval(duration time.Duration) Option
- type Options
- type PollRequest
- func (p *PollRequest) SetAutoAck(autoAck bool) *PollRequest
- func (p *PollRequest) SetChannel(channel string) *PollRequest
- func (p *PollRequest) SetMaxItems(maxItems int) *PollRequest
- func (p *PollRequest) SetOnComplete(onComplete func()) *PollRequest
- func (p *PollRequest) SetOnErrorFunc(onErrorFunc func(err error)) *PollRequest
- func (p *PollRequest) SetWaitTimeout(waitTimeout int) *PollRequest
- type PollResponse
- func (r PollResponse) AckAll() error
- func (r PollResponse) AckOffsets(offsets ...int64) error
- func (r PollResponse) Close() error
- func (p *PollResponse) HasMessages() bool
- func (r PollResponse) NAckAll() error
- func (r PollResponse) NAckOffsets(offsets ...int64) error
- func (r PollResponse) ReQueueAll(channel string) error
- func (r PollResponse) ReQueueOffsets(channel string, offsets ...int64) error
- type QueueInfo
- type QueueMessage
- func (qm *QueueMessage) Ack() error
- func (r QueueMessage) AckAll() error
- func (r QueueMessage) AckOffsets(offsets ...int64) error
- func (qm *QueueMessage) AddTag(key, value string) *QueueMessage
- func (r QueueMessage) Close() error
- func (qm *QueueMessage) NAck() error
- func (r QueueMessage) NAckAll() error
- func (r QueueMessage) NAckOffsets(offsets ...int64) error
- func (qm *QueueMessage) ReQueue(channel string) error
- func (r QueueMessage) ReQueueAll(channel string) error
- func (r QueueMessage) ReQueueOffsets(channel string, offsets ...int64) error
- func (qm *QueueMessage) SetBody(body []byte) *QueueMessage
- func (qm *QueueMessage) SetChannel(channel string) *QueueMessage
- func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage
- func (qm *QueueMessage) SetId(id string) *QueueMessage
- func (qm *QueueMessage) SetMetadata(metadata string) *QueueMessage
- func (qm *QueueMessage) SetPolicyDelaySeconds(sec int) *QueueMessage
- func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage
- func (qm *QueueMessage) SetPolicyMaxReceiveCount(max int) *QueueMessage
- func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage
- func (qm *QueueMessage) SetTags(tags map[string]string) *QueueMessage
- type QueuesInfo
- type QueuesStreamClient
- func (q *QueuesStreamClient) AckAll(ctx context.Context, request *AckAllRequest) (*AckAllResponse, error)
- func (q *QueuesStreamClient) Close() error
- func (q *QueuesStreamClient) Poll(ctx context.Context, request *PollRequest) (*PollResponse, error)
- func (q *QueuesStreamClient) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)
- func (q *QueuesStreamClient) Send(ctx context.Context, messages ...*QueueMessage) (*SendResult, error)
- type SendResult
- type SubscribeRequest
- type SubscribeResponse
- func (r SubscribeResponse) AckAll() error
- func (r SubscribeResponse) AckOffsets(offsets ...int64) error
- func (r SubscribeResponse) Close() error
- func (r SubscribeResponse) NAckAll() error
- func (r SubscribeResponse) NAckOffsets(offsets ...int64) error
- func (r SubscribeResponse) ReQueueAll(channel string) error
- func (r SubscribeResponse) ReQueueOffsets(channel string, offsets ...int64) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AckAllRequest ¶
type AckAllRequest struct { ClientID string Channel string WaitTimeSeconds int32 // contains filtered or unexported fields }
func NewAckAllRequest ¶
func NewAckAllRequest() *AckAllRequest
func (*AckAllRequest) SetChannel ¶
func (req *AckAllRequest) SetChannel(channel string) *AckAllRequest
SetChannel - set ack all queue message request channel - mandatory if default channel was not set
func (*AckAllRequest) SetClientId ¶
func (req *AckAllRequest) SetClientId(clientId string) *AckAllRequest
func (*AckAllRequest) SetWaitTimeSeconds ¶
func (req *AckAllRequest) SetWaitTimeSeconds(wait int) *AckAllRequest
SetWaitTimeSeconds - set ack all queue message request wait timeout
type AckAllResponse ¶
type GrpcClient ¶
type GrpcClient struct { pb.KubemqClient // contains filtered or unexported fields }
func NewGrpcClient ¶
func NewGrpcClient(ctx context.Context, op ...Option) (*GrpcClient, error)
func (*GrpcClient) Close ¶
func (g *GrpcClient) Close() error
func (*GrpcClient) GlobalClientId ¶
func (g *GrpcClient) GlobalClientId() string
func (*GrpcClient) Ping ¶
func (g *GrpcClient) Ping(ctx context.Context) (*pb.PingResult, error)
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
func WithAddress ¶
WithAddress - set host and port address of KubeMQ server
func WithAuthToken ¶
WithAuthToken - set KubeMQ JWT Auth token to be used for KubeMQ connection
func WithAutoReconnect ¶
WithAutoReconnect - set automatic reconnection in case of lost connectivity to server
func WithCertificate ¶
WithCertificate - set secured TLS credentials from the input certificate data for grpcClient. serverOverrideDomain is for testing only. If set to a non-empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.
func WithCheckConnection ¶
WithCheckConnection - set whether to check server connectivity when the grpcClient is created
func WithClientId ¶
WithClientId - set grpcClient id to be used in all function calls with this grpcClient - mandatory
func WithConnectionNotificationFunc ¶ added in v1.7.2
WithConnectionNotificationFunc - set a callback function that receives connection activity messages
func WithCredentials ¶
WithCredentials - set secured TLS credentials from the input certificate file for grpcClient. serverOverrideDomain is for testing only. If set to a non-empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.
func WithDefaultCacheTTL ¶
WithDefaultCacheTTL - set default cache time to live for any query request that has a CacheKey value set
func WithDefaultChannel ¶
WithDefaultChannel - set default channel for any outbound requests
func WithMaxReconnects ¶
WithMaxReconnects - set the maximum number of reconnect attempts before an error is returned; the default is 0, which means never.
func WithReceiveBufferSize ¶
WithReceiveBufferSize - set length of buffered channel to be set in all subscriptions
func WithReconnectInterval ¶
WithReconnectInterval - set reconnection interval duration, default is 5 seconds
type Options ¶
type Options struct {
// contains filtered or unexported fields
}
func GetDefaultOptions ¶
func GetDefaultOptions() *Options
type PollRequest ¶
type PollRequest struct { Channel string `json:"Channel"` MaxItems int `json:"max_items"` WaitTimeout int `json:"wait_timeout"` AutoAck bool `json:"auto_ack"` OnErrorFunc func(err error) OnComplete func() }
PollRequest - Request parameters for Poll function
func NewPollRequest ¶
func NewPollRequest() *PollRequest
func (*PollRequest) SetAutoAck ¶
func (p *PollRequest) SetAutoAck(autoAck bool) *PollRequest
func (*PollRequest) SetChannel ¶
func (p *PollRequest) SetChannel(channel string) *PollRequest
func (*PollRequest) SetMaxItems ¶
func (p *PollRequest) SetMaxItems(maxItems int) *PollRequest
func (*PollRequest) SetOnComplete ¶
func (p *PollRequest) SetOnComplete(onComplete func()) *PollRequest
func (*PollRequest) SetOnErrorFunc ¶
func (p *PollRequest) SetOnErrorFunc(onErrorFunc func(err error)) *PollRequest
func (*PollRequest) SetWaitTimeout ¶
func (p *PollRequest) SetWaitTimeout(waitTimeout int) *PollRequest
type PollResponse ¶
type PollResponse struct { Messages []*QueueMessage // contains filtered or unexported fields }
func (PollResponse) AckOffsets ¶
func (*PollResponse) HasMessages ¶
func (p *PollResponse) HasMessages() bool
func (PollResponse) NAckOffsets ¶
func (PollResponse) ReQueueAll ¶
func (PollResponse) ReQueueOffsets ¶
type QueueInfo ¶ added in v1.7.0
type QueueInfo struct { Name string `json:"name"` Messages int64 `json:"messages"` Bytes int64 `json:"bytes"` FirstSequence int64 `json:"first_sequence"` LastSequence int64 `json:"last_sequence"` Sent int64 `json:"sent"` Subscribers int `json:"subscribers"` Waiting int64 `json:"waiting"` Delivered int64 `json:"delivered"` }
type QueueMessage ¶
type QueueMessage struct { *pb.QueueMessage // contains filtered or unexported fields }
func NewQueueMessage ¶
func NewQueueMessage() *QueueMessage
func (*QueueMessage) Ack ¶
func (qm *QueueMessage) Ack() error
func (QueueMessage) AckOffsets ¶
func (*QueueMessage) AddTag ¶
func (qm *QueueMessage) AddTag(key, value string) *QueueMessage
AddTag - add a key-value tag to the queue message
func (*QueueMessage) NAck ¶
func (qm *QueueMessage) NAck() error
func (QueueMessage) NAckOffsets ¶
func (*QueueMessage) ReQueue ¶
func (qm *QueueMessage) ReQueue(channel string) error
func (QueueMessage) ReQueueAll ¶
func (QueueMessage) ReQueueOffsets ¶
func (*QueueMessage) SetBody ¶
func (qm *QueueMessage) SetBody(body []byte) *QueueMessage
SetBody - set queue message body - mandatory if metadata field is empty
func (*QueueMessage) SetChannel ¶
func (qm *QueueMessage) SetChannel(channel string) *QueueMessage
SetChannel - set queue message Channel - mandatory if default Channel was not set
func (*QueueMessage) SetClientId ¶
func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage
SetClientId - set queue message ClientId - mandatory if default grpcClient was not set
func (*QueueMessage) SetId ¶
func (qm *QueueMessage) SetId(id string) *QueueMessage
SetId - set queue message id, otherwise new random uuid will be set
func (*QueueMessage) SetMetadata ¶
func (qm *QueueMessage) SetMetadata(metadata string) *QueueMessage
SetMetadata - set queue message metadata - mandatory if body field is empty
func (*QueueMessage) SetPolicyDelaySeconds ¶
func (qm *QueueMessage) SetPolicyDelaySeconds(sec int) *QueueMessage
SetPolicyDelaySeconds - set queue message delivery delay in seconds; 0 means immediate delivery
func (*QueueMessage) SetPolicyExpirationSeconds ¶
func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage
SetPolicyExpirationSeconds - set queue message expiration in seconds; 0 means the message never expires
func (*QueueMessage) SetPolicyMaxReceiveCount ¶
func (qm *QueueMessage) SetPolicyMaxReceiveCount(max int) *QueueMessage
SetPolicyMaxReceiveCount - set max delivery attempts before the message is discarded or re-routed to a new queue
func (*QueueMessage) SetPolicyMaxReceiveQueue ¶
func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage
SetPolicyMaxReceiveQueue - set the name of the queue the message is routed to once MaxReceiveCount is reached; an empty name discards the message
func (*QueueMessage) SetTags ¶
func (qm *QueueMessage) SetTags(tags map[string]string) *QueueMessage
SetTags - set key value tags to queue message
type QueuesInfo ¶ added in v1.7.0
type QueuesStreamClient ¶
type QueuesStreamClient struct {
// contains filtered or unexported fields
}
func NewQueuesStreamClient ¶
func NewQueuesStreamClient(ctx context.Context, op ...Option) (*QueuesStreamClient, error)
func (*QueuesStreamClient) AckAll ¶
func (q *QueuesStreamClient) AckAll(ctx context.Context, request *AckAllRequest) (*AckAllResponse, error)
func (*QueuesStreamClient) Close ¶
func (q *QueuesStreamClient) Close() error
func (*QueuesStreamClient) Poll ¶
func (q *QueuesStreamClient) Poll(ctx context.Context, request *PollRequest) (*PollResponse, error)
func (*QueuesStreamClient) QueuesInfo ¶ added in v1.7.0
func (q *QueuesStreamClient) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)
func (*QueuesStreamClient) Send ¶
func (q *QueuesStreamClient) Send(ctx context.Context, messages ...*QueueMessage) (*SendResult, error)
type SendResult ¶
type SendResult struct {
Results []*pb.SendQueueMessageResult
}
type SubscribeRequest ¶
type SubscribeRequest struct { Channels []string MaxItems int `json:"max_items"` WaitTimeout int `json:"wait_timeout"` AutoAck bool `json:"auto_ack"` }
func NewSubscribeRequest ¶
func NewSubscribeRequest() *SubscribeRequest
func (*SubscribeRequest) SetAutoAck ¶
func (s *SubscribeRequest) SetAutoAck(autoAck bool) *SubscribeRequest
func (*SubscribeRequest) SetChannels ¶
func (s *SubscribeRequest) SetChannels(channels ...string) *SubscribeRequest
func (*SubscribeRequest) SetMaxItems ¶
func (s *SubscribeRequest) SetMaxItems(maxItems int) *SubscribeRequest
func (*SubscribeRequest) SetWaitTimeout ¶
func (s *SubscribeRequest) SetWaitTimeout(waitTimeout int) *SubscribeRequest
type SubscribeResponse ¶
type SubscribeResponse struct { Messages []*QueueMessage // contains filtered or unexported fields }
func NewSubscribeResponse ¶
func NewSubscribeResponse() *SubscribeResponse