Documentation
¶
Index ¶
- type AckAllRequest
- type AckAllResponse
- type GrpcClient
- type Option
- func WithAddress(host string, port int) Option
- func WithAuthToken(token string) Option
- func WithAutoReconnect(value bool) Option
- func WithCertificate(certData, serverOverrideDomain string) Option
- func WithCheckConnection(value bool) Option
- func WithClientId(id string) Option
- func WithConnectionNotificationFunc(fn func(msg string)) Option
- func WithCredentials(certFile, serverOverrideDomain string) Option
- func WithDefaultCacheTTL(ttl time.Duration) Option
- func WithDefaultChannel(channel string) Option
- func WithMaxReconnects(value int) Option
- func WithReceiveBufferSize(size int) Option
- func WithReconnectInterval(duration time.Duration) Option
- type Options
- type PollRequest
- func (p *PollRequest) SetAutoAck(autoAck bool) *PollRequest
- func (p *PollRequest) SetChannel(channel string) *PollRequest
- func (p *PollRequest) SetMaxItems(maxItems int) *PollRequest
- func (p *PollRequest) SetOnComplete(onComplete func()) *PollRequest
- func (p *PollRequest) SetOnErrorFunc(onErrorFunc func(err error)) *PollRequest
- func (p *PollRequest) SetWaitTimeout(waitTimeout int) *PollRequest
- type PollResponse
- func (r PollResponse) AckAll() error
- func (r PollResponse) AckOffsets(offsets ...int64) error
- func (r PollResponse) Close() error
- func (p *PollResponse) HasMessages() bool
- func (r PollResponse) NAckAll() error
- func (r PollResponse) NAckOffsets(offsets ...int64) error
- func (r PollResponse) ReQueueAll(channel string) error
- func (r PollResponse) ReQueueOffsets(channel string, offsets ...int64) error
- type QueueInfo
- type QueueMessage
- func (qm *QueueMessage) Ack() error
- func (r QueueMessage) AckAll() error
- func (r QueueMessage) AckOffsets(offsets ...int64) error
- func (qm *QueueMessage) AddTag(key, value string) *QueueMessage
- func (r QueueMessage) Close() error
- func (qm *QueueMessage) NAck() error
- func (r QueueMessage) NAckAll() error
- func (r QueueMessage) NAckOffsets(offsets ...int64) error
- func (qm *QueueMessage) ReQueue(channel string) error
- func (r QueueMessage) ReQueueAll(channel string) error
- func (r QueueMessage) ReQueueOffsets(channel string, offsets ...int64) error
- func (qm *QueueMessage) SetBody(body []byte) *QueueMessage
- func (qm *QueueMessage) SetChannel(channel string) *QueueMessage
- func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage
- func (qm *QueueMessage) SetId(id string) *QueueMessage
- func (qm *QueueMessage) SetMetadata(metadata string) *QueueMessage
- func (qm *QueueMessage) SetPolicyDelaySeconds(sec int) *QueueMessage
- func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage
- func (qm *QueueMessage) SetPolicyMaxReceiveCount(max int) *QueueMessage
- func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage
- func (qm *QueueMessage) SetTags(tags map[string]string) *QueueMessage
- type QueuesInfo
- type QueuesStreamClient
- func (q *QueuesStreamClient) AckAll(ctx context.Context, request *AckAllRequest) (*AckAllResponse, error)
- func (q *QueuesStreamClient) Close() error
- func (q *QueuesStreamClient) Create(ctx context.Context, channel string) error
- func (q *QueuesStreamClient) Delete(ctx context.Context, channel string) error
- func (q *QueuesStreamClient) List(ctx context.Context, search string) ([]*common.QueuesChannel, error)
- func (q *QueuesStreamClient) Poll(ctx context.Context, request *PollRequest) (*PollResponse, error)
- func (q *QueuesStreamClient) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)
- func (q *QueuesStreamClient) Send(ctx context.Context, messages ...*QueueMessage) (*SendResult, error)
- type SendResult
- type SubscribeRequest
- type SubscribeResponse
- func (r SubscribeResponse) AckAll() error
- func (r SubscribeResponse) AckOffsets(offsets ...int64) error
- func (r SubscribeResponse) Close() error
- func (r SubscribeResponse) NAckAll() error
- func (r SubscribeResponse) NAckOffsets(offsets ...int64) error
- func (r SubscribeResponse) ReQueueAll(channel string) error
- func (r SubscribeResponse) ReQueueOffsets(channel string, offsets ...int64) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AckAllRequest ¶
type AckAllRequest struct { ClientID string Channel string WaitTimeSeconds int32 // contains filtered or unexported fields }
func NewAckAllRequest ¶
func NewAckAllRequest() *AckAllRequest
func (*AckAllRequest) SetChannel ¶
func (req *AckAllRequest) SetChannel(channel string) *AckAllRequest
SetChannel - set ack all queue message request channel - mandatory if default channel was not set
func (*AckAllRequest) SetClientId ¶
func (req *AckAllRequest) SetClientId(clientId string) *AckAllRequest
func (*AckAllRequest) SetWaitTimeSeconds ¶
func (req *AckAllRequest) SetWaitTimeSeconds(wait int) *AckAllRequest
SetWaitTimeSeconds - set ack all queue message request wait timout
type AckAllResponse ¶
type GrpcClient ¶
type GrpcClient struct { pb.KubemqClient // contains filtered or unexported fields }
func NewGrpcClient ¶
func NewGrpcClient(ctx context.Context, op ...Option) (*GrpcClient, error)
func (*GrpcClient) Close ¶
func (g *GrpcClient) Close() error
func (*GrpcClient) GlobalClientId ¶
func (g *GrpcClient) GlobalClientId() string
func (*GrpcClient) Ping ¶
func (g *GrpcClient) Ping(ctx context.Context) (*pb.PingResult, error)
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
func WithAddress ¶
WithAddress - set host and port address of KubeMQ server
func WithAuthToken ¶
WithAuthToken - set KubeMQ JWT Auth token to be used for KubeMQ connection
func WithAutoReconnect ¶
WithAutoReconnect - set automatic reconnection in case of lost connectivity to server
func WithCertificate ¶
WithCertificate - set secured TLS credentials from the input certificate data for grpcClient. serverNameOverride is for testing only. If set to a non empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.
func WithCheckConnection ¶
WithCheckConnection - set server connectivity on grpcClient create
func WithClientId ¶
WithClientId - set grpcClient id to be used in all functions call with this grpcClient - mandatory
func WithConnectionNotificationFunc ¶ added in v1.7.2
WithConnectionNotificationFunc - set on connection activity messages
func WithCredentials ¶
WithCredentials - set secured TLS credentials from the input certificate file for grpcClient. serverNameOverride is for testing only. If set to a non empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.
func WithDefaultCacheTTL ¶
WithDefaultCacheTTL - set default cache time to live for any query requests with any CacheKey set value
func WithDefaultChannel ¶
WithDefaultChannel - set default channel for any outbound requests
func WithMaxReconnects ¶
WithMaxReconnects - set max reconnects before return error, default 0, never.
func WithReceiveBufferSize ¶
WithReceiveBufferSize - set length of buffered channel to be set in all subscriptions
func WithReconnectInterval ¶
WithReconnectInterval - set reconnection interval duration, default is 5 seconds
type Options ¶
type Options struct {
// contains filtered or unexported fields
}
func GetDefaultOptions ¶
func GetDefaultOptions() *Options
type PollRequest ¶
type PollRequest struct { Channel string `json:"Channel"` MaxItems int `json:"max_items"` WaitTimeout int `json:"wait_timeout"` AutoAck bool `json:"auto_ack"` OnErrorFunc func(err error) OnComplete func() }
PollRequest - Request parameters for Poll function
func NewPollRequest ¶
func NewPollRequest() *PollRequest
func (*PollRequest) SetAutoAck ¶
func (p *PollRequest) SetAutoAck(autoAck bool) *PollRequest
func (*PollRequest) SetChannel ¶
func (p *PollRequest) SetChannel(channel string) *PollRequest
func (*PollRequest) SetMaxItems ¶
func (p *PollRequest) SetMaxItems(maxItems int) *PollRequest
func (*PollRequest) SetOnComplete ¶
func (p *PollRequest) SetOnComplete(onComplete func()) *PollRequest
func (*PollRequest) SetOnErrorFunc ¶
func (p *PollRequest) SetOnErrorFunc(onErrorFunc func(err error)) *PollRequest
func (*PollRequest) SetWaitTimeout ¶
func (p *PollRequest) SetWaitTimeout(waitTimeout int) *PollRequest
type PollResponse ¶
type PollResponse struct { Messages []*QueueMessage // contains filtered or unexported fields }
func (PollResponse) AckOffsets ¶
func (*PollResponse) HasMessages ¶
func (p *PollResponse) HasMessages() bool
func (PollResponse) NAckOffsets ¶
func (PollResponse) ReQueueAll ¶
func (PollResponse) ReQueueOffsets ¶
type QueueInfo ¶ added in v1.7.0
type QueueInfo struct { Name string `json:"name"` Messages int64 `json:"messages"` Bytes int64 `json:"bytes"` FirstSequence int64 `json:"first_sequence"` LastSequence int64 `json:"last_sequence"` Sent int64 `json:"sent"` Subscribers int `json:"subscribers"` Waiting int64 `json:"waiting"` Delivered int64 `json:"delivered"` }
type QueueMessage ¶
type QueueMessage struct { *pb.QueueMessage // contains filtered or unexported fields }
func NewQueueMessage ¶
func NewQueueMessage() *QueueMessage
func (*QueueMessage) Ack ¶
func (qm *QueueMessage) Ack() error
func (QueueMessage) AckOffsets ¶
func (*QueueMessage) AddTag ¶
func (qm *QueueMessage) AddTag(key, value string) *QueueMessage
AddTag - add key value tags to query message
func (*QueueMessage) NAck ¶
func (qm *QueueMessage) NAck() error
func (QueueMessage) NAckOffsets ¶
func (*QueueMessage) ReQueue ¶
func (qm *QueueMessage) ReQueue(channel string) error
func (QueueMessage) ReQueueAll ¶
func (QueueMessage) ReQueueOffsets ¶
func (*QueueMessage) SetBody ¶
func (qm *QueueMessage) SetBody(body []byte) *QueueMessage
SetBody - set queue message body - mandatory if metadata field is empty
func (*QueueMessage) SetChannel ¶
func (qm *QueueMessage) SetChannel(channel string) *QueueMessage
SetChannel - set queue message Channel - mandatory if default Channel was not set
func (*QueueMessage) SetClientId ¶
func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage
SetClientId - set queue message ClientId - mandatory if default grpcClient was not set
func (*QueueMessage) SetId ¶
func (qm *QueueMessage) SetId(id string) *QueueMessage
SetId - set queue message id, otherwise new random uuid will be set
func (*QueueMessage) SetMetadata ¶
func (qm *QueueMessage) SetMetadata(metadata string) *QueueMessage
SetMetadata - set queue message metadata - mandatory if body field is empty
func (*QueueMessage) SetPolicyDelaySeconds ¶
func (qm *QueueMessage) SetPolicyDelaySeconds(sec int) *QueueMessage
SetPolicyDelaySeconds - set queue message delivery delay in seconds, 0 , immediate delivery
func (*QueueMessage) SetPolicyExpirationSeconds ¶
func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage
SetPolicyExpirationSeconds - set queue message expiration seconds, 0 never expires
func (*QueueMessage) SetPolicyMaxReceiveCount ¶
func (qm *QueueMessage) SetPolicyMaxReceiveCount(max int) *QueueMessage
SetPolicyMaxReceiveCount - set max delivery attempts before message will discard or re-route to a new queue
func (*QueueMessage) SetPolicyMaxReceiveQueue ¶
func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage
SetPolicyMaxReceiveQueue - set queue name to be routed once MaxReceiveCount is triggered, empty will discard the message
func (*QueueMessage) SetTags ¶
func (qm *QueueMessage) SetTags(tags map[string]string) *QueueMessage
SetTags - set key value tags to queue message
type QueuesInfo ¶ added in v1.7.0
type QueuesStreamClient ¶
QueuesStreamClient is a client for streaming queues operations. It manages the connection to the Kubemq server and provides methods for interacting with queues.
func NewQueuesStreamClient ¶
func NewQueuesStreamClient(ctx context.Context, op ...Option) (*QueuesStreamClient, error)
NewQueuesStreamClient is a function that creates a new QueuesStreamClient instance. It takes a context and an optional list of options. It returns a QueuesStreamClient and an error. The function creates a new GrpcClient using the provided context and options. If the creation of the GrpcClient fails, an error is returned. Otherwise, a new QueuesStreamClient is created with the client context and client. The QueuesStreamClient is then returned along with a nil error.
func (*QueuesStreamClient) AckAll ¶
func (q *QueuesStreamClient) AckAll(ctx context.Context, request *AckAllRequest) (*AckAllResponse, error)
AckAll sends an acknowledgment for all messages in the specified channel. It validates the request, creates a new AckAllQueueMessagesRequest, and sends it to the client's AckAllQueueMessages method. If successful, it creates a new AckAllResponse with the response data and returns it.
func (*QueuesStreamClient) Close ¶
func (q *QueuesStreamClient) Close() error
Close closes the QueuesStreamClient by closing the upstream and downstream connections and then closing the underlying GrpcClient connection. It also sleeps for 100 milliseconds before closing the connections. Returns an error if any of the close operations encounter an error, or if closing the underlying GrpcClient encounters an error.
func (*QueuesStreamClient) Create ¶ added in v1.8.0
func (q *QueuesStreamClient) Create(ctx context.Context, channel string) error
Create sends a create channel request to the Kubemq server. It creates a new channel of type "queues" with the specified channel name. It returns an error if the request fails or if there is an error creating the channel.
func (*QueuesStreamClient) Delete ¶ added in v1.8.0
func (q *QueuesStreamClient) Delete(ctx context.Context, channel string) error
Delete deletes a channel in the QueuesStreamClient. It sends a request to the server to delete the specified channel. If the request encounters an error while sending or if the response contains an error message, an error is returned.
Parameters: - ctx: the context.Context for the request - channel: the name of the channel to delete
Returns: - error: an error if the delete channel request fails, otherwise nil
func (*QueuesStreamClient) List ¶ added in v1.8.0
func (q *QueuesStreamClient) List(ctx context.Context, search string) ([]*common.QueuesChannel, error)
List returns a list of QueuesChannels based on the given search string. It sends a request to the Kubemq server to retrieve the list of channels. The search string is used to filter the channels. It returns the list of QueuesChannels and any error encountered.
func (*QueuesStreamClient) Poll ¶
func (q *QueuesStreamClient) Poll(ctx context.Context, request *PollRequest) (*PollResponse, error)
Poll retrieves messages from the QueuesStreamClient. It checks if the downstream connection is ready and then calls downstream.poll()
func (*QueuesStreamClient) QueuesInfo ¶ added in v1.7.0
func (q *QueuesStreamClient) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)
QueuesInfo returns information about queues based on the provided filter. It sends a request to the gRPC client to retrieve QueuesInfoResponse. The response is then transformed into a QueuesInfo struct and returned.
func (*QueuesStreamClient) Send ¶
func (q *QueuesStreamClient) Send(ctx context.Context, messages ...*QueueMessage) (*SendResult, error)
Send sends one or more QueueMessages to the QueuesStreamClient. It acquires a lock to ensure thread safety. If there is no upstream connection, a new upstream connection is created. If the upstream connection is not ready, an error is returned. If there are no messages to send, an error is returned. The messages are converted to a list of pb.QueueMessage structs. A new QueuesUpstreamRequest is created with a unique RequestID and the list of messages. The request is sent to the upstream server using the send method of the upstream connection. The response is received through a select statement, handling both the response and the cancellation of the request. If the response indicates an error, an error is returned. Otherwise, a new SendResult is created from the response and returned along with no error. If the context is canceled before receiving the response, the transaction is canceled and an error is returned. If any other error occurs during the execution, it is returned.
type SendResult ¶
type SendResult struct {
Results []*pb.SendQueueMessageResult
}
type SubscribeRequest ¶
type SubscribeRequest struct { Channels []string MaxItems int `json:"max_items"` WaitTimeout int `json:"wait_timeout"` AutoAck bool `json:"auto_ack"` }
func NewSubscribeRequest ¶
func NewSubscribeRequest() *SubscribeRequest
func (*SubscribeRequest) SetAutoAck ¶
func (s *SubscribeRequest) SetAutoAck(autoAck bool) *SubscribeRequest
func (*SubscribeRequest) SetChannels ¶
func (s *SubscribeRequest) SetChannels(channels ...string) *SubscribeRequest
func (*SubscribeRequest) SetMaxItems ¶
func (s *SubscribeRequest) SetMaxItems(maxItems int) *SubscribeRequest
func (*SubscribeRequest) SetWaitTimeout ¶
func (s *SubscribeRequest) SetWaitTimeout(waitTimeout int) *SubscribeRequest
type SubscribeResponse ¶
type SubscribeResponse struct { Messages []*QueueMessage // contains filtered or unexported fields }
func NewSubscribeResponse ¶
func NewSubscribeResponse() *SubscribeResponse