Documentation
¶
Index ¶
- Variables
- func CreateChannel(ctx context.Context, client *Client, clientId string, channel string, ...) error
- func DecodeCQChannelList(dataBytes []byte) ([]*common.CQChannel, error)
- func DecodePubSubChannelList(dataBytes []byte) ([]*common.PubSubChannel, error)
- func DecodeQueuesChannelList(dataBytes []byte) ([]*common.QueuesChannel, error)
- func DeleteChannel(ctx context.Context, client *Client, clientId string, channel string, ...) error
- func ListCQChannels(ctx context.Context, client *Client, clientId string, channelType string, ...) ([]*common.CQChannel, error)
- func ListPubSubChannels(ctx context.Context, client *Client, clientId string, channelType string, ...) ([]*common.PubSubChannel, error)
- func ListQueuesChannels(ctx context.Context, client *Client, clientId string, search string) ([]*common.QueuesChannel, error)
- type AckAllQueueMessagesRequest
- func (req *AckAllQueueMessagesRequest) AddTrace(name string) *Trace
- func (req *AckAllQueueMessagesRequest) Complete(opts *Options) *AckAllQueueMessagesRequest
- func (req *AckAllQueueMessagesRequest) Send(ctx context.Context) (*AckAllQueueMessagesResponse, error)
- func (req *AckAllQueueMessagesRequest) SetChannel(channel string) *AckAllQueueMessagesRequest
- func (req *AckAllQueueMessagesRequest) SetClientId(clientId string) *AckAllQueueMessagesRequest
- func (req *AckAllQueueMessagesRequest) SetId(id string) *AckAllQueueMessagesRequest
- func (req *AckAllQueueMessagesRequest) SetWaitTimeSeconds(wait int) *AckAllQueueMessagesRequest
- func (req *AckAllQueueMessagesRequest) Validate() error
- type AckAllQueueMessagesResponse
- type Client
- func (c *Client) AQM() *AckAllQueueMessagesRequest
- func (c *Client) AckAllQueueMessages(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error)
- func (c *Client) C() *Command
- func (c *Client) Close() error
- func (c *Client) E() *Event
- func (c *Client) ES() *EventStore
- func (c *Client) NewAckAllQueueMessagesRequest() *AckAllQueueMessagesRequest
- func (c *Client) NewCommand() *Command
- func (c *Client) NewEvent() *Event
- func (c *Client) NewEventStore() *EventStore
- func (c *Client) NewQuery() *Query
- func (c *Client) NewQueueMessage() *QueueMessage
- func (c *Client) NewQueueMessages() *QueueMessages
- func (c *Client) NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest
- func (c *Client) NewResponse() *Response
- func (c *Client) NewStreamQueueMessage() *StreamQueueMessage
- func (c *Client) Ping(ctx context.Context) (*ServerInfo, error)
- func (c *Client) Q() *Query
- func (c *Client) QM() *QueueMessage
- func (c *Client) QMB() *QueueMessages
- func (c *Client) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)
- func (c *Client) R() *Response
- func (c *Client) RQM() *ReceiveQueueMessagesRequest
- func (c *Client) ReceiveQueueMessages(ctx context.Context, req *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error)
- func (c *Client) SQM() *StreamQueueMessage
- func (c *Client) SendQueueMessage(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error)
- func (c *Client) SendQueueMessages(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error)
- func (c *Client) SetCommand(cmd *Command) *Command
- func (c *Client) SetEvent(e *Event) *Event
- func (c *Client) SetEventStore(es *EventStore) *EventStore
- func (c *Client) SetQuery(query *Query) *Query
- func (c *Client) SetQueueMessage(qm *QueueMessage) *QueueMessage
- func (c *Client) SetResponse(response *Response) *Response
- func (c *Client) StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error)
- func (c *Client) StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, ...)
- func (c *Client) SubscribeToCommands(ctx context.Context, channel, group string, errCh chan error) (<-chan *CommandReceive, error)
- func (c *Client) SubscribeToCommandsWithRequest(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error)
- func (c *Client) SubscribeToEvents(ctx context.Context, channel, group string, errCh chan error) (<-chan *Event, error)
- func (c *Client) SubscribeToEventsStore(ctx context.Context, channel, group string, errCh chan error, ...) (<-chan *EventStoreReceive, error)
- func (c *Client) SubscribeToEventsStoreWithRequest(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error)
- func (c *Client) SubscribeToEventsWithRequest(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error)
- func (c *Client) SubscribeToQueries(ctx context.Context, channel, group string, errCh chan error) (<-chan *QueryReceive, error)
- func (c *Client) SubscribeToQueriesWithRequest(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error)
- type Command
- func (c *Command) AddTag(key, value string) *Command
- func (c *Command) AddTrace(name string) *Trace
- func (c *Command) Send(ctx context.Context) (*CommandResponse, error)
- func (c *Command) SetBody(body []byte) *Command
- func (c *Command) SetChannel(channel string) *Command
- func (c *Command) SetClientId(clientId string) *Command
- func (c *Command) SetId(id string) *Command
- func (c *Command) SetMetadata(metadata string) *Command
- func (c *Command) SetTags(tags map[string]string) *Command
- func (c *Command) SetTimeout(timeout time.Duration) *Command
- type CommandReceive
- type CommandResponse
- type CommandsClient
- func (c *CommandsClient) Close() error
- func (c *CommandsClient) Create(ctx context.Context, channel string) error
- func (c *CommandsClient) Delete(ctx context.Context, channel string) error
- func (c *CommandsClient) List(ctx context.Context, search string) ([]*common.CQChannel, error)
- func (c *CommandsClient) Response(ctx context.Context, response *Response) error
- func (c *CommandsClient) Send(ctx context.Context, request *Command) (*CommandResponse, error)
- func (c *CommandsClient) Subscribe(ctx context.Context, request *CommandsSubscription, ...) error
- type CommandsSubscription
- type Event
- func (e *Event) AddTag(key, value string) *Event
- func (e *Event) Send(ctx context.Context) error
- func (e *Event) SetBody(body []byte) *Event
- func (e *Event) SetChannel(channel string) *Event
- func (e *Event) SetClientId(clientId string) *Event
- func (e *Event) SetId(id string) *Event
- func (e *Event) SetMetadata(metadata string) *Event
- func (e *Event) SetTags(tags map[string]string) *Event
- func (e *Event) String() string
- type EventStore
- func (es *EventStore) AddTag(key, value string) *EventStore
- func (es *EventStore) Send(ctx context.Context) (*EventStoreResult, error)
- func (es *EventStore) SetBody(body []byte) *EventStore
- func (es *EventStore) SetChannel(channel string) *EventStore
- func (es *EventStore) SetClientId(clientId string) *EventStore
- func (es *EventStore) SetId(id string) *EventStore
- func (es *EventStore) SetMetadata(metadata string) *EventStore
- func (es *EventStore) SetTags(tags map[string]string) *EventStore
- type EventStoreReceive
- type EventStoreResult
- type EventsClient
- func (e *EventsClient) Close() error
- func (e *EventsClient) Create(ctx context.Context, channel string) error
- func (e *EventsClient) Delete(ctx context.Context, channel string) error
- func (e *EventsClient) List(ctx context.Context, search string) ([]*common.PubSubChannel, error)
- func (e *EventsClient) Send(ctx context.Context, message *Event) error
- func (e *EventsClient) Stream(ctx context.Context, onError func(err error)) (func(msg *Event) error, error)
- func (e *EventsClient) Subscribe(ctx context.Context, request *EventsSubscription, ...) error
- type EventsErrorsHandler
- type EventsMessageHandler
- type EventsStoreClient
- func (es *EventsStoreClient) Close() error
- func (es *EventsStoreClient) Create(ctx context.Context, channel string) error
- func (es *EventsStoreClient) Delete(ctx context.Context, channel string) error
- func (es *EventsStoreClient) List(ctx context.Context, search string) ([]*common.PubSubChannel, error)
- func (es *EventsStoreClient) Send(ctx context.Context, message *EventStore) (*EventStoreResult, error)
- func (es *EventsStoreClient) Stream(ctx context.Context, onResult func(result *EventStoreResult, err error)) (func(msg *EventStore) error, error)
- func (es *EventsStoreClient) Subscribe(ctx context.Context, request *EventsStoreSubscription, ...) error
- type EventsStoreSubscription
- type EventsSubscription
- type Option
- func WithAddress(host string, port int) Option
- func WithAuthToken(token string) Option
- func WithAutoReconnect(value bool) Option
- func WithCertificate(certData, serverOverrideDomain string) Option
- func WithCheckConnection(value bool) Option
- func WithClientId(id string) Option
- func WithCredentials(certFile, serverOverrideDomain string) Option
- func WithDefaultCacheTTL(ttl time.Duration) Option
- func WithDefaultChannel(channel string) Option
- func WithMaxReconnects(value int) Option
- func WithReceiveBufferSize(size int) Option
- func WithReconnectInterval(duration time.Duration) Option
- func WithTransportType(transportType TransportType) Option
- func WithUri(uri string) Option
- type Options
- type QueriesClient
- func (q *QueriesClient) Close() error
- func (q *QueriesClient) Create(ctx context.Context, channel string) error
- func (q *QueriesClient) Delete(ctx context.Context, channel string) error
- func (q *QueriesClient) List(ctx context.Context, search string) ([]*common.CQChannel, error)
- func (q *QueriesClient) Response(ctx context.Context, response *Response) error
- func (q *QueriesClient) Send(ctx context.Context, request *Query) (*QueryResponse, error)
- func (q *QueriesClient) Subscribe(ctx context.Context, request *QueriesSubscription, ...) error
- type QueriesSubscription
- type Query
- func (q *Query) AddTag(key, value string) *Query
- func (q *Query) AddTrace(name string) *Trace
- func (q *Query) Send(ctx context.Context) (*QueryResponse, error)
- func (q *Query) SetBody(body []byte) *Query
- func (q *Query) SetCacheKey(cacheKey string) *Query
- func (q *Query) SetCacheTTL(ttl time.Duration) *Query
- func (q *Query) SetChannel(channel string) *Query
- func (q *Query) SetClientId(clientId string) *Query
- func (q *Query) SetId(id string) *Query
- func (q *Query) SetMetadata(metadata string) *Query
- func (q *Query) SetTags(tags map[string]string) *Query
- func (q *Query) SetTimeout(timeout time.Duration) *Query
- type QueryReceive
- type QueryResponse
- type QueueInfo
- type QueueMessage
- func (qm *QueueMessage) Ack() error
- func (qm *QueueMessage) AddTag(key, value string) *QueueMessage
- func (qm *QueueMessage) AddTrace(name string) *Trace
- func (qm *QueueMessage) ExtendVisibility(value int32) error
- func (qm *QueueMessage) Reject() error
- func (qm *QueueMessage) Resend(channel string) error
- func (qm *QueueMessage) Send(ctx context.Context) (*SendQueueMessageResult, error)
- func (qm *QueueMessage) SetBody(body []byte) *QueueMessage
- func (qm *QueueMessage) SetChannel(channel string) *QueueMessage
- func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage
- func (qm *QueueMessage) SetId(id string) *QueueMessage
- func (qm *QueueMessage) SetMetadata(metadata string) *QueueMessage
- func (qm *QueueMessage) SetPolicyDelaySeconds(sec int) *QueueMessage
- func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage
- func (qm *QueueMessage) SetPolicyMaxReceiveCount(max int) *QueueMessage
- func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage
- func (qm *QueueMessage) SetTags(tags map[string]string) *QueueMessage
- type QueueMessageAttributes
- type QueueMessagePolicy
- type QueueMessages
- type QueueTransactionMessageRequest
- func (req *QueueTransactionMessageRequest) Complete(opts *Options) *QueueTransactionMessageRequest
- func (req *QueueTransactionMessageRequest) SetChannel(channel string) *QueueTransactionMessageRequest
- func (req *QueueTransactionMessageRequest) SetClientId(clientId string) *QueueTransactionMessageRequest
- func (req *QueueTransactionMessageRequest) SetId(id string) *QueueTransactionMessageRequest
- func (req *QueueTransactionMessageRequest) SetVisibilitySeconds(visibility int) *QueueTransactionMessageRequest
- func (req *QueueTransactionMessageRequest) SetWaitTimeSeconds(wait int) *QueueTransactionMessageRequest
- func (req *QueueTransactionMessageRequest) Validate() error
- type QueueTransactionMessageResponse
- func (qt *QueueTransactionMessageResponse) Ack() error
- func (qt *QueueTransactionMessageResponse) ExtendVisibilitySeconds(value int) error
- func (qt *QueueTransactionMessageResponse) Reject() error
- func (qt *QueueTransactionMessageResponse) Resend(channel string) error
- func (qt *QueueTransactionMessageResponse) ResendNewMessage(msg *QueueMessage) error
- type QueuesClient
- func (q *QueuesClient) AckAll(ctx context.Context, request *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error)
- func (q *QueuesClient) Batch(ctx context.Context, messages []*QueueMessage) ([]*SendQueueMessageResult, error)
- func (q *QueuesClient) Close() error
- func (q *QueuesClient) Create(ctx context.Context, channel string) error
- func (q *QueuesClient) Delete(ctx context.Context, channel string) error
- func (q *QueuesClient) List(ctx context.Context, search string) ([]*common.QueuesChannel, error)
- func (q *QueuesClient) Peek(ctx context.Context, request *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error)
- func (q *QueuesClient) Pull(ctx context.Context, request *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error)
- func (q *QueuesClient) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)
- func (q *QueuesClient) Send(ctx context.Context, message *QueueMessage) (*SendQueueMessageResult, error)
- func (q *QueuesClient) Subscribe(ctx context.Context, request *ReceiveQueueMessagesRequest, ...) (chan struct{}, error)
- func (q *QueuesClient) Transaction(ctx context.Context, request *QueueTransactionMessageRequest) (*QueueTransactionMessageResponse, error)
- func (q *QueuesClient) TransactionStream(ctx context.Context, request *QueueTransactionMessageRequest, ...) (chan struct{}, error)
- type QueuesInfo
- type ReceiveQueueMessagesRequest
- func (req *ReceiveQueueMessagesRequest) AddTrace(name string) *Trace
- func (req *ReceiveQueueMessagesRequest) Complete(opts *Options) *ReceiveQueueMessagesRequest
- func (req *ReceiveQueueMessagesRequest) Send(ctx context.Context) (*ReceiveQueueMessagesResponse, error)
- func (req *ReceiveQueueMessagesRequest) SetChannel(channel string) *ReceiveQueueMessagesRequest
- func (req *ReceiveQueueMessagesRequest) SetClientId(clientId string) *ReceiveQueueMessagesRequest
- func (req *ReceiveQueueMessagesRequest) SetId(id string) *ReceiveQueueMessagesRequest
- func (req *ReceiveQueueMessagesRequest) SetIsPeak(value bool) *ReceiveQueueMessagesRequest
- func (req *ReceiveQueueMessagesRequest) SetMaxNumberOfMessages(max int) *ReceiveQueueMessagesRequest
- func (req *ReceiveQueueMessagesRequest) SetWaitTimeSeconds(wait int) *ReceiveQueueMessagesRequest
- func (req *ReceiveQueueMessagesRequest) Validate() error
- type ReceiveQueueMessagesResponse
- type Response
- func (r *Response) AddTrace(name string) *Trace
- func (r *Response) Send(ctx context.Context) error
- func (r *Response) SetBody(body []byte) *Response
- func (r *Response) SetClientId(clientId string) *Response
- func (r *Response) SetError(err error) *Response
- func (r *Response) SetExecutedAt(executedAt time.Time) *Response
- func (r *Response) SetMetadata(metadata string) *Response
- func (r *Response) SetRequestId(id string) *Response
- func (r *Response) SetResponseTo(channel string) *Response
- func (r *Response) SetTags(tags map[string]string) *Response
- func (r *Response) String() string
- type SendQueueMessageResult
- type ServerInfo
- type StreamQueueMessage
- func (req *StreamQueueMessage) AddTrace(name string) *Trace
- func (req *StreamQueueMessage) Close()
- func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) (*QueueMessage, error)
- func (req *StreamQueueMessage) ResendWithNewMessage(msg *QueueMessage) error
- func (req *StreamQueueMessage) SetChannel(channel string) *StreamQueueMessage
- func (req *StreamQueueMessage) SetClientId(clientId string) *StreamQueueMessage
- func (req *StreamQueueMessage) SetId(id string) *StreamQueueMessage
- type SubscriptionOption
- func StartFromFirstEvent() SubscriptionOption
- func StartFromLastEvent() SubscriptionOption
- func StartFromNewEvents() SubscriptionOption
- func StartFromSequence(sequence int) SubscriptionOption
- func StartFromTime(since time.Time) SubscriptionOption
- func StartFromTimeDelta(delta time.Duration) SubscriptionOption
- type Trace
- type Transport
- type TransportType
Constants ¶
This section is empty.
Variables ¶
var ( ErrNoTransportDefined = errors.New("no transport layer defined, create object with client instance") ErrNoTransportConnection = errors.New("no transport layer established, aborting") )
Functions ¶
func CreateChannel ¶ added in v1.8.0
func DecodeCQChannelList ¶ added in v1.8.0
func DecodePubSubChannelList ¶ added in v1.8.0
func DecodePubSubChannelList(dataBytes []byte) ([]*common.PubSubChannel, error)
func DecodeQueuesChannelList ¶ added in v1.8.0
func DecodeQueuesChannelList(dataBytes []byte) ([]*common.QueuesChannel, error)
func DeleteChannel ¶ added in v1.8.0
func ListCQChannels ¶ added in v1.8.0
func ListPubSubChannels ¶ added in v1.8.0
func ListQueuesChannels ¶ added in v1.8.0
Types ¶
type AckAllQueueMessagesRequest ¶ added in v1.2.0
type AckAllQueueMessagesRequest struct { RequestID string ClientID string Channel string WaitTimeSeconds int32 // contains filtered or unexported fields }
func (*AckAllQueueMessagesRequest) AddTrace ¶ added in v1.2.0
func (req *AckAllQueueMessagesRequest) AddTrace(name string) *Trace
AddTrace - add tracing support to ack all receive queue message request
func (*AckAllQueueMessagesRequest) Complete ¶ added in v1.5.0
func (req *AckAllQueueMessagesRequest) Complete(opts *Options) *AckAllQueueMessagesRequest
func (*AckAllQueueMessagesRequest) Send ¶ added in v1.2.0
func (req *AckAllQueueMessagesRequest) Send(ctx context.Context) (*AckAllQueueMessagesResponse, error)
Send - sending ack all queue messages request, waiting for response or timeout
func (*AckAllQueueMessagesRequest) SetChannel ¶ added in v1.2.0
func (req *AckAllQueueMessagesRequest) SetChannel(channel string) *AckAllQueueMessagesRequest
SetChannel - set ack all queue message request channel - mandatory if default channel was not set
func (*AckAllQueueMessagesRequest) SetClientId ¶ added in v1.2.0
func (req *AckAllQueueMessagesRequest) SetClientId(clientId string) *AckAllQueueMessagesRequest
SetClientId - set ack all queue message request ClientId - mandatory if default client was not set
func (*AckAllQueueMessagesRequest) SetId ¶ added in v1.2.0
func (req *AckAllQueueMessagesRequest) SetId(id string) *AckAllQueueMessagesRequest
SetId - set ack all queue message request id, otherwise new random uuid will be set
func (*AckAllQueueMessagesRequest) SetWaitTimeSeconds ¶ added in v1.2.0
func (req *AckAllQueueMessagesRequest) SetWaitTimeSeconds(wait int) *AckAllQueueMessagesRequest
SetWaitTimeSeconds - set ack all queue message request wait timeout
func (*AckAllQueueMessagesRequest) Validate ¶ added in v1.5.0
func (req *AckAllQueueMessagesRequest) Validate() error
type AckAllQueueMessagesResponse ¶ added in v1.2.0
type Client ¶
type Client struct { ServerInfo *ServerInfo // contains filtered or unexported fields }
func (*Client) AQM ¶ added in v1.2.0
func (c *Client) AQM() *AckAllQueueMessagesRequest
AQM - create an empty ack all receive queue messages request object
func (*Client) AckAllQueueMessages ¶ added in v1.2.0
func (c *Client) AckAllQueueMessages(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error)
AckAllQueueMessages - send ack all messages in queue
func (*Client) NewAckAllQueueMessagesRequest ¶ added in v1.2.0
func (c *Client) NewAckAllQueueMessagesRequest() *AckAllQueueMessagesRequest
NewAckAllQueueMessagesRequest - create an empty ack all receive queue messages request object
func (*Client) NewCommand ¶ added in v1.2.0
NewCommand - create an empty command
func (*Client) NewEventStore ¶ added in v1.2.0
func (c *Client) NewEventStore() *EventStore
NewEventStore - create an empty event store
func (*Client) NewQueueMessage ¶ added in v1.2.0
func (c *Client) NewQueueMessage() *QueueMessage
NewQueueMessage - create an empty queue message
func (*Client) NewQueueMessages ¶ added in v1.2.0
func (c *Client) NewQueueMessages() *QueueMessages
NewQueueMessages - create an empty queue messages array
func (*Client) NewReceiveQueueMessagesRequest ¶ added in v1.2.0
func (c *Client) NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest
NewReceiveQueueMessagesRequest - create an empty receive queue message request object
func (*Client) NewResponse ¶ added in v1.2.0
NewResponse - create an empty response
func (*Client) NewStreamQueueMessage ¶ added in v1.2.0
func (c *Client) NewStreamQueueMessage() *StreamQueueMessage
NewStreamQueueMessage - create an empty stream receive queue message object
func (*Client) Ping ¶ added in v1.3.5
func (c *Client) Ping(ctx context.Context) (*ServerInfo, error)
Ping - get status of current connection
func (*Client) QM ¶ added in v1.2.0
func (c *Client) QM() *QueueMessage
QM - create an empty queue message object
func (*Client) QMB ¶ added in v1.2.0
func (c *Client) QMB() *QueueMessages
QMB - create an empty queue message array object
func (*Client) QueuesInfo ¶ added in v1.7.0
QueuesInfo - get queues detailed information
func (*Client) RQM ¶ added in v1.2.0
func (c *Client) RQM() *ReceiveQueueMessagesRequest
RQM - create an empty receive queue message request object
func (*Client) ReceiveQueueMessages ¶ added in v1.2.0
func (c *Client) ReceiveQueueMessages(ctx context.Context, req *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error)
ReceiveQueueMessages - call to receive messages from a queue
func (*Client) SQM ¶ added in v1.2.0
func (c *Client) SQM() *StreamQueueMessage
SQM - create an empty stream receive queue message object
func (*Client) SendQueueMessage ¶ added in v1.2.0
func (c *Client) SendQueueMessage(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error)
SendQueueMessage - send single queue message
func (*Client) SendQueueMessages ¶ added in v1.2.0
func (c *Client) SendQueueMessages(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error)
SendQueueMessages - send multiple queue messages
func (*Client) SetCommand ¶ added in v1.4.0
func (*Client) SetEventStore ¶ added in v1.4.0
func (c *Client) SetEventStore(es *EventStore) *EventStore
func (*Client) SetQueueMessage ¶ added in v1.4.0
func (c *Client) SetQueueMessage(qm *QueueMessage) *QueueMessage
func (*Client) SetResponse ¶ added in v1.4.0
func (*Client) StreamEvents ¶
StreamEvents - send stream of events in a single call
func (*Client) StreamEventsStore ¶
func (c *Client) StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, eventsResultCh chan *EventStoreResult, errCh chan error)
StreamEventsStore - send stream of events store in a single call
func (*Client) SubscribeToCommands ¶
func (c *Client) SubscribeToCommands(ctx context.Context, channel, group string, errCh chan error) (<-chan *CommandReceive, error)
SubscribeToCommands - subscribe to commands requests by channel and group. return channel of CommandReceive or an error
func (*Client) SubscribeToCommandsWithRequest ¶ added in v1.5.0
func (c *Client) SubscribeToCommandsWithRequest(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error)
SubscribeToCommandsWithRequest - subscribe to commands requests using a CommandsSubscription request. return channel of CommandReceive or an error
func (*Client) SubscribeToEvents ¶
func (c *Client) SubscribeToEvents(ctx context.Context, channel, group string, errCh chan error) (<-chan *Event, error)
SubscribeToEvents - subscribe to events by channel and group. return channel of events or an error
func (*Client) SubscribeToEventsStore ¶
func (c *Client) SubscribeToEventsStore(ctx context.Context, channel, group string, errCh chan error, opt SubscriptionOption) (<-chan *EventStoreReceive, error)
SubscribeToEventsStore - subscribe to events store by channel and group with subscription option. return channel of events or an error
func (*Client) SubscribeToEventsStoreWithRequest ¶ added in v1.5.0
func (c *Client) SubscribeToEventsStoreWithRequest(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error)
SubscribeToEventsStoreWithRequest - subscribe to events store by channel and group with subscription option. return channel of events or an error
func (*Client) SubscribeToEventsWithRequest ¶ added in v1.5.0
func (c *Client) SubscribeToEventsWithRequest(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error)
SubscribeToEventsWithRequest - subscribe to events using an EventsSubscription request. return channel of events or an error
func (*Client) SubscribeToQueries ¶
func (c *Client) SubscribeToQueries(ctx context.Context, channel, group string, errCh chan error) (<-chan *QueryReceive, error)
SubscribeToQueries - subscribe to queries requests by channel and group. return channel of QueryReceive or an error
func (*Client) SubscribeToQueriesWithRequest ¶ added in v1.5.0
func (c *Client) SubscribeToQueriesWithRequest(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error)
SubscribeToQueriesWithRequest - subscribe to queries requests using a QueriesSubscription request. return channel of QueryReceive or an error
type Command ¶
type Command struct { Id string Channel string Metadata string Body []byte Timeout time.Duration ClientId string Tags map[string]string // contains filtered or unexported fields }
func NewCommand ¶ added in v1.4.0
func NewCommand() *Command
func (*Command) Send ¶
func (c *Command) Send(ctx context.Context) (*CommandResponse, error)
Send - sending command, waiting for response or timeout
func (*Command) SetChannel ¶
SetChannel - set command channel - mandatory if default channel was not set
func (*Command) SetClientId ¶
SetClientId - set command ClientId - mandatory if default client was not set
func (*Command) SetMetadata ¶
SetMetadata - set command metadata - mandatory if body field is empty
type CommandReceive ¶
type CommandReceive struct { Id string ClientId string Channel string Metadata string Body []byte ResponseTo string Tags map[string]string }
func (*CommandReceive) String ¶ added in v1.9.0
func (cr *CommandReceive) String() string
type CommandResponse ¶
type CommandResponse struct { CommandId string ResponseClientId string Executed bool ExecutedAt time.Time Error string Tags map[string]string }
func (*CommandResponse) String ¶ added in v1.9.0
func (cr *CommandResponse) String() string
type CommandsClient ¶ added in v1.5.0
type CommandsClient struct {
// contains filtered or unexported fields
}
CommandsClient represents a client that can be used to send commands to a server. It contains a reference to the underlying client that handles the communication.
func NewCommandsClient ¶ added in v1.5.0
func NewCommandsClient(ctx context.Context, op ...Option) (*CommandsClient, error)
NewCommandsClient creates a new instance of CommandsClient with the provided context and options. It returns the created CommandsClient instance and an error if any.
func (*CommandsClient) Close ¶ added in v1.5.0
func (c *CommandsClient) Close() error
Close closes the connection to the server by invoking the Close method of the underlying client. It returns an error if the close operation fails.
func (*CommandsClient) Create ¶ added in v1.8.0
func (c *CommandsClient) Create(ctx context.Context, channel string) error
Create sends a request to create a channel of type "commands" with the given channel name.
It returns an error if there was a failure in sending the create channel request, or if there was an error creating the channel.
func (*CommandsClient) Delete ¶ added in v1.8.0
func (c *CommandsClient) Delete(ctx context.Context, channel string) error
Delete deletes a channel from the commands client.
It sends a delete channel request to the KubeMQ server and returns an error if there is any. The function constructs a delete channel query, sets the required metadata and timeout, and makes the request through the client's query service. If the response contains an error message, it returns an error.
ctx: The context.Context object for the request. channel: The name of the channel to be deleted.
Returns: - nil if the channel was deleted successfully. - An error if the channel deletion failed.
func (*CommandsClient) List ¶ added in v1.8.0
List returns a list of CQChannels that match the given search criteria. It uses the ListCQChannels function to retrieve the data and decode it into CQChannel objects. The search parameter is optional and can be used to filter the results. List itself takes only a context and a search string; ListCQChannels additionally takes a client, a client ID, and a channel type. It returns a slice of CQChannel objects and an error if any occurred.
func (*CommandsClient) Response ¶ added in v1.5.0
func (c *CommandsClient) Response(ctx context.Context, response *Response) error
Response sets the response object in the CommandsClient and sends the response using the client's transport.
This method requires the client to be initialized.
Parameters:
ctx: The context.Context object for the request. response: The Response object to set and send.
Returns:
- error: An error if the client is not ready or if sending the response fails.
func (*CommandsClient) Send ¶ added in v1.5.0
func (c *CommandsClient) Send(ctx context.Context, request *Command) (*CommandResponse, error)
Send sends a command using the provided context and command request. It checks if the client is ready to send the command. It sets the transport for the request and calls the Send method on the client to send the command. It returns the command response and any error that occurred during the process.
func (*CommandsClient) Subscribe ¶ added in v1.5.0
func (c *CommandsClient) Subscribe(ctx context.Context, request *CommandsSubscription, onCommandReceive func(cmd *CommandReceive, err error)) error
Subscribe starts a subscription to receive commands from the server. It takes a context and a CommandsSubscription request as input, along with a callback function onCommandReceive that will be invoked whenever a command is received or an error occurs. It returns an error if the client is not ready, if the callback function is nil, or if the request fails validation. The Subscribe method launches a goroutine that listens for incoming commands and triggers the callback function accordingly.
type CommandsSubscription ¶ added in v1.5.0
CommandsSubscription represents a subscription to commands requests by channel and group. It contains the following fields:
- Channel: the channel to subscribe to
- Group: the group to subscribe to
- ClientId: the ID of the client subscribing to the commands
Usage example:
commandsSubscription := &CommandsSubscription{ Channel: "channel", Group: "group", ClientId: "clientID", } err := commandsSubscription.Validate() if err != nil { // handle validation error } commandsCh, err := client.SubscribeToCommandsWithRequest(context.Background(), commandsSubscription, errCh) if err != nil { // handle subscribe error } for command := range commandsCh { // handle received command }
It also has the following methods:
Complete(opts *Options) *CommandsSubscription: completes the commands subscription with the given options Validate() error: validates the commands subscription, ensuring that it has a channel and client ID
func (*CommandsSubscription) Complete ¶ added in v1.5.0
func (cs *CommandsSubscription) Complete(opts *Options) *CommandsSubscription
Complete method sets the `ClientId` field of the `CommandsSubscription` struct if it is empty. It takes an `Options` object as a parameter, and uses the `clientId` field of the `Options` object to set the `ClientId` field of `CommandsSubscription` if it is empty. It returns a pointer to the modified `CommandsSubscription` object.
Example usage:
request := &CommandsSubscription{ Channel: "my-channel", Group: "my-group", } options := &Options{ clientId: "my-client-id", } request.Complete(options) // Now the `ClientId` field of `request` will be set as "my-client-id" if it was empty.
func (*CommandsSubscription) Validate ¶ added in v1.5.0
func (cs *CommandsSubscription) Validate() error
Validate checks if a CommandsSubscription object has valid channel and clientId values. It returns an error if any of the required fields is empty. Otherwise, it returns nil.
type Event ¶
type Event struct { Id string Channel string Metadata string Body []byte ClientId string Tags map[string]string // contains filtered or unexported fields }
func (*Event) SetChannel ¶
SetChannel - set event channel - mandatory if default channel was not set
func (*Event) SetClientId ¶
SetClientId - set event ClientId - mandatory if default client was not set
func (*Event) SetMetadata ¶
SetMetadata - set event metadata - mandatory if body field was not set
type EventStore ¶
type EventStore struct { Id string Channel string Metadata string Body []byte ClientId string Tags map[string]string // contains filtered or unexported fields }
func NewEventStore ¶ added in v1.4.0
func NewEventStore() *EventStore
func (*EventStore) AddTag ¶ added in v1.2.0
func (es *EventStore) AddTag(key, value string) *EventStore
AddTag - add key value tags to event store message
func (*EventStore) Send ¶
func (es *EventStore) Send(ctx context.Context) (*EventStoreResult, error)
Send - sending events store message
func (*EventStore) SetBody ¶
func (es *EventStore) SetBody(body []byte) *EventStore
SetBody - set event store body - mandatory if metadata field was not set
func (*EventStore) SetChannel ¶
func (es *EventStore) SetChannel(channel string) *EventStore
SetChannel - set event store channel - mandatory if default channel was not set
func (*EventStore) SetClientId ¶
func (es *EventStore) SetClientId(clientId string) *EventStore
SetClientId - set event store ClientId - mandatory if default client was not set
func (*EventStore) SetId ¶
func (es *EventStore) SetId(id string) *EventStore
SetId - set event store id otherwise new random uuid will be set
func (*EventStore) SetMetadata ¶
func (es *EventStore) SetMetadata(metadata string) *EventStore
SetMetadata - set event store metadata - mandatory if body field was not set
func (*EventStore) SetTags ¶ added in v1.4.1
func (es *EventStore) SetTags(tags map[string]string) *EventStore
SetTags - set key value tags to event store message
type EventStoreReceive ¶
type EventStoreReceive struct { Id string Sequence uint64 Timestamp time.Time Channel string Metadata string Body []byte ClientId string Tags map[string]string }
func (*EventStoreReceive) String ¶ added in v1.9.0
func (es *EventStoreReceive) String() string
type EventStoreResult ¶
type EventsClient ¶ added in v1.5.0
type EventsClient struct {
// contains filtered or unexported fields
}
EventsClient is a client for interacting with events. It encapsulates a client for making API requests.
func NewEventsClient ¶ added in v1.5.0
func NewEventsClient(ctx context.Context, op ...Option) (*EventsClient, error)
NewEventsClient creates an instance of EventsClient by calling NewClient and returning EventsClient{client}
Parameters: - ctx: The context.Context to be used in NewClient call. - op: Optional parameters of type Option to be passed to NewClient.
Returns a pointer to EventsClient and an error if NewClient call fails.
func (*EventsClient) Close ¶ added in v1.5.0
func (e *EventsClient) Close() error
Close closes the EventsClient by invoking the Close method on its underlying Client. It returns an error if there was a problem closing the client.
func (*EventsClient) Create ¶ added in v1.8.0
func (e *EventsClient) Create(ctx context.Context, channel string) error
Create creates a new event channel with the specified channel name. It sends a create-channel request to the KubeMQ server using the provided context and client, with the channel type set to 'events'. It returns an error if an error occurs during the creation of the channel.
func (*EventsClient) Delete ¶ added in v1.8.0
func (e *EventsClient) Delete(ctx context.Context, channel string) error
Delete deletes a channel from the events client. It sends a delete channel request with the specified channel ID and type to the client. Returns an error if the delete channel request fails or if there is an error deleting the channel.
Example usage:
err := eventsClient.Delete(ctx, "events.A") if err != nil { log.Fatal(err) }
func (*EventsClient) List ¶ added in v1.8.0
func (e *EventsClient) List(ctx context.Context, search string) ([]*common.PubSubChannel, error)
List retrieves a list of PubSubChannels based on the provided search string. It calls ListPubSubChannels function with the given context, EventsClient's client, client ID, channel type "events", and the search string. It returns a slice of PubSubChannel pointers and an error.
func (*EventsClient) Send ¶ added in v1.5.0
func (e *EventsClient) Send(ctx context.Context, message *Event) error
Send sends the given event message using the provided context. It first checks if the client is ready.
func (*EventsClient) Stream ¶ added in v1.5.0
func (e *EventsClient) Stream(ctx context.Context, onError func(err error)) (func(msg *Event) error, error)
Stream sends events from client to server and receives events from server to client. It takes a context as input, which can be used to cancel the streaming process. It also takes an onError function callback, which will be called when an error occurs during the streaming process. The method returns a sendFunc function, which can be used to send events to the server, and an error, which will be non-nil if the client is not ready or if the onError callback is not provided. The sendFunc function takes an event message as input and returns an error. It sends the event to the server through a channel, and if the context is cancelled before the event is sent, it returns an error indicating that the context was cancelled during event message sending. The method starts two goroutines, one for sending events to the server and one for receiving events from the server. The sending goroutine sends events to the server by accepting events from the eventsCh channel. The receiving goroutine receives errors from the errCh channel and calls the onError callback for each error received. It also checks if the context is cancelled and stops the receiving goroutine if it is. The method returns the sendFunc function and a nil error.
func (*EventsClient) Subscribe ¶ added in v1.5.0
func (e *EventsClient) Subscribe(ctx context.Context, request *EventsSubscription, onEvent func(msg *Event, err error)) error
Subscribe subscribes to events using the provided EventsSubscription and callback function. It checks if the client is ready and if the callback function is provided. It validates the subscription request. It creates a channel for errors, subscribes to events with the request and initializes an events channel. It starts a goroutine to listen for events or errors and calls the callback function accordingly. If the context is canceled, it returns. It returns an error if any.
type EventsErrorsHandler ¶ added in v1.5.0
type EventsErrorsHandler func(error)
EventsErrorsHandler is a type representing a function that handles errors for events.
type EventsMessageHandler ¶ added in v1.5.0
type EventsMessageHandler func(*Event)
EventsMessageHandler is a function type that takes in a pointer to an Event object and does not return anything.
type EventsStoreClient ¶ added in v1.5.0
type EventsStoreClient struct {
// contains filtered or unexported fields
}
EventsStoreClient is a struct that holds a client instance.
func NewEventsStoreClient ¶ added in v1.5.0
func NewEventsStoreClient(ctx context.Context, op ...Option) (*EventsStoreClient, error)
NewEventsStoreClient is a function that creates a new EventsStoreClient.
func (*EventsStoreClient) Close ¶ added in v1.5.0
func (es *EventsStoreClient) Close() error
Close is a method that closes the client connection.
func (*EventsStoreClient) Create ¶ added in v1.8.0
func (es *EventsStoreClient) Create(ctx context.Context, channel string) error
Create is a method that creates a new channel in the events store.
func (*EventsStoreClient) Delete ¶ added in v1.8.0
func (es *EventsStoreClient) Delete(ctx context.Context, channel string) error
Delete is a method that deletes a channel from the events store.
func (*EventsStoreClient) List ¶ added in v1.8.0
func (es *EventsStoreClient) List(ctx context.Context, search string) ([]*common.PubSubChannel, error)
List is a method that lists all channels in the events store.
func (*EventsStoreClient) Send ¶ added in v1.5.0
func (es *EventsStoreClient) Send(ctx context.Context, message *EventStore) (*EventStoreResult, error)
Send is a method that sends an event to the store.
func (*EventsStoreClient) Stream ¶ added in v1.5.0
func (es *EventsStoreClient) Stream(ctx context.Context, onResult func(result *EventStoreResult, err error)) (func(msg *EventStore) error, error)
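Stream is a method that opens a stream for sending events to the store. It returns a send function for submitting EventStore messages; results and errors are reported through the onResult callback.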
func (*EventsStoreClient) Subscribe ¶ added in v1.5.0
func (es *EventsStoreClient) Subscribe(ctx context.Context, request *EventsStoreSubscription, onEvent func(msg *EventStoreReceive, err error)) error
Subscribe is a method that subscribes to events from the store.
type EventsStoreSubscription ¶ added in v1.5.0
type EventsStoreSubscription struct { Channel string Group string ClientId string SubscriptionType SubscriptionOption }
EventsStoreSubscription is a struct that holds the subscription details.
func (*EventsStoreSubscription) Complete ¶ added in v1.5.0
func (es *EventsStoreSubscription) Complete(opts *Options) *EventsStoreSubscription
Complete is a method that completes the subscription with the provided options.
func (*EventsStoreSubscription) Validate ¶ added in v1.5.0
func (es *EventsStoreSubscription) Validate() error
Validate is a method that validates the subscription details.
type EventsSubscription ¶ added in v1.5.0
EventsSubscription represents a subscription to events by channel and group.
func (*EventsSubscription) Complete ¶ added in v1.5.0
func (es *EventsSubscription) Complete(opts *Options) *EventsSubscription
Complete sets the ClientId of the EventsSubscription if it is empty. It takes an *Options argument to retrieve the clientId value. If the ClientId is already set in the EventsSubscription, it will not be overridden. It returns a pointer to the modified EventsSubscription.
func (*EventsSubscription) Validate ¶ added in v1.5.0
func (es *EventsSubscription) Validate() error
Validate checks if the EventsSubscription has a non-empty Channel and ClientId. If either of them is empty, it returns an error. Otherwise, it returns nil.
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
func WithAddress ¶
WithAddress - set host and port address of KubeMQ server
func WithAuthToken ¶ added in v1.3.2
WithAuthToken - set KubeMQ JWT Auth token to be used for KubeMQ connection
func WithAutoReconnect ¶ added in v1.4.0
WithAutoReconnect - set automatic reconnection in case of lost connectivity to server
func WithCertificate ¶ added in v1.3.1
WithCertificate - set secured TLS credentials from the input certificate data for client. serverNameOverride is for testing only. If set to a non empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.
func WithCheckConnection ¶ added in v1.4.0
WithCheckConnection - set whether to check server connectivity when the client is created
func WithClientId ¶
WithClientId - set client id to be used in all functions call with this client - mandatory
func WithCredentials ¶
WithCredentials - set secured TLS credentials from the input certificate file for client. serverNameOverride is for testing only. If set to a non empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.
func WithDefaultCacheTTL ¶
WithDefaultCacheTTL - set default cache time to live for any query requests with any CacheKey set value
func WithDefaultChannel ¶ added in v1.3.1
WithDefaultChannel - set default channel for any outbound requests
func WithMaxReconnects ¶ added in v1.4.0
WithMaxReconnects - set the maximum number of reconnect attempts before returning an error; default is 0 (never return an error).
func WithReceiveBufferSize ¶
WithReceiveBufferSize - set length of buffered channel to be set in all subscriptions
func WithReconnectInterval ¶ added in v1.4.0
WithReconnectInterval - set reconnection interval duration, default is 5 seconds
func WithTransportType ¶
func WithTransportType(transportType TransportType) Option
WithTransportType - set client transport type, currently GRPC or Rest
type Options ¶
type Options struct {
// contains filtered or unexported fields
}
func GetDefaultOptions ¶
func GetDefaultOptions() *Options
type QueriesClient ¶ added in v1.5.0
type QueriesClient struct {
// contains filtered or unexported fields
}
QueriesClient represents a client for making queries to a server. It contains a reference to the underlying client that handles the communication with the server.
func NewQueriesClient ¶ added in v1.5.0
func NewQueriesClient(ctx context.Context, op ...Option) (*QueriesClient, error)
NewQueriesClient creates a new instance of QueriesClient by calling NewClient function and returning QueriesClient with the newly created Client instance. It receives a context and an optional list of options as arguments and returns a pointer to QueriesClient and an error.
func (*QueriesClient) Close ¶ added in v1.5.0
func (q *QueriesClient) Close() error
Close closes the QueriesClient's underlying client connection. This method returns an error if the client is not initialized.
func (*QueriesClient) Create ¶ added in v1.8.0
func (q *QueriesClient) Create(ctx context.Context, channel string) error
Create creates a new channel in the QueriesClient with the given channel name.
Parameters:
- ctx (context.Context): The context for the request.
- channel (string): The name of the channel to create.
Returns:
- error: An error if the channel creation fails.
func (*QueriesClient) Delete ¶ added in v1.8.0
func (q *QueriesClient) Delete(ctx context.Context, channel string) error
Delete deletes a channel.
The method receives a context and the channel name to be deleted. It invokes the DeleteChannel function passing the received channel name as well as the clientId and channelType. DeleteChannel creates a new Query instance and sets the necessary properties such as the channel, clientId, metadata, tags, and timeout. It then calls the Send method of the client to send the delete channel request. If an error occurs during the request execution, it returns an error. If the response contains an error message, it returns an error. Otherwise, it returns nil, indicating the channel was successfully deleted.
func (*QueriesClient) List ¶ added in v1.8.0
List retrieves a list of channels with the specified search criteria. It returns a slice of *common.CQChannel and an error. The search criteria is passed as a string parameter. The Channels are retrieved using the ListCQChannels function, passing the context, client, client ID, channel type, and search criteria. If an error occurs during the retrieval, it is returned. If the retrieval is successful, the data is decoded into a slice of *common.CQChannel using the DecodeCQChannelList function. The decoded data and any error are returned.
func (*QueriesClient) Response ¶ added in v1.5.0
func (q *QueriesClient) Response(ctx context.Context, response *Response) error
Response sends a response to a command or query request.
The response must have a corresponding requestId and response channel, which are set using SetRequestId and SetResponseTo methods, respectively. The requestId is mandatory, while the response channel is received from either CommandReceive or QueryReceive objects.
Additional optional attributes that can be set for the response include:
- Metadata: SetMetadata should be used to set metadata for a query response only.
- Body: SetBody should be used to set the body for a query response only.
- Tags: SetTags can be used to set tags for the response.
- ClientId: SetClientId can be used to set the clientId for the response. If not set, the default clientId will be used.
- Error: SetError can be used to set an error when executing a command or query.
- ExecutedAt: SetExecutedAt can be used to set the execution time for a command or query.
- Trace: AddTrace can be used to add tracing support to the response.
Once all the necessary attributes are set, call the Send method to send the response.
Example:
resp := &Response{} resp.SetRequestId("12345") resp.SetResponseTo("channel-name") resp.SetMetadata("metadata") resp.SetBody([]byte("response-body")) resp.SetTags(map[string]string{"tag1": "value1", "tag2": "value2"}) resp.SetClientId("client-id") resp.SetError(errors.New("some error")) resp.SetExecutedAt(time.Now()) resp.AddTrace("trace-name") err := resp.Send(ctx) if err != nil { log.Println("Failed to send response:", err) }
func (*QueriesClient) Send ¶ added in v1.5.0
func (q *QueriesClient) Send(ctx context.Context, request *Query) (*QueryResponse, error)
Send sends a query request using the provided context. It checks if the client is ready, sets the transport from the client, and calls the Send method on the client with the request. It returns the query response and an error, if any.
The following fields in the request are required: - transport (set from the client)
Example usage:
request := &Query{ Id: "123", Channel: "channel1", Metadata: "metadata", Body: []byte("query body"), Timeout: time.Second, ClientId: "client1", CacheKey: "cacheKey", CacheTTL: time.Minute, Tags: map[string]string{"tag1": "value1"}, } response, err := client.Send(ctx, request)
func (*QueriesClient) Subscribe ¶ added in v1.5.0
func (q *QueriesClient) Subscribe(ctx context.Context, request *QueriesSubscription, onQueryReceive func(query *QueryReceive, err error)) error
Subscribe is a method of QueriesClient that allows a client to subscribe to queries. It takes a context, a QueriesSubscription request, and a callback function onQueryReceive. The context is used for cancellation, timing out, and passing values between middleware. The QueriesSubscription defines the channel, group, and clientId for the subscription. The onQueryReceive callback function will be called when a query is received or an error occurs. The method returns an error if the client is not ready, if the onQueryReceive function is nil, or if the QueriesSubscription request is invalid. The method creates a channel for receiving errors, subscribes to queries with the given request, and starts a goroutine that continuously listens for new queries or errors on the channel. When a query is received, it is passed to the onQueryReceive function with a nil error. When an error is received, it is passed to the onQueryReceive function with a nil query. If the context is canceled, the goroutine returns. The method returns with nil if the subscription is successfully set up.
type QueriesSubscription ¶ added in v1.5.0
QueriesSubscription represents a subscription to queries requests by channel and group
func (*QueriesSubscription) Complete ¶ added in v1.5.0
func (qs *QueriesSubscription) Complete(opts *Options) *QueriesSubscription
Complete updates the ClientId of the QueriesSubscription if it is empty, using the clientId value from the provided Options. It returns a pointer to the modified QueriesSubscription.
func (*QueriesSubscription) Validate ¶ added in v1.5.0
func (qs *QueriesSubscription) Validate() error
Validate checks if a queries subscription is valid. It returns an error if any of the required fields are empty.
type Query ¶
type Query struct { Id string Channel string Metadata string Body []byte Timeout time.Duration ClientId string CacheKey string CacheTTL time.Duration Tags map[string]string // contains filtered or unexported fields }
func (*Query) Send ¶
func (q *Query) Send(ctx context.Context) (*QueryResponse, error)
Send - sending query request, waiting for response or timeout
func (*Query) SetCacheKey ¶
SetCacheKey - set cache key to retrieve already stored query response, otherwise the response for this query will be stored in cache for future query requests
func (*Query) SetCacheTTL ¶
SetCacheTTL - set cache time to live for this query's cache key, i.e. how long the stored query response can be retrieved; if not set, the default cacheTTL will be used
func (*Query) SetChannel ¶
SetChannel - set query channel - mandatory if default channel was not set
func (*Query) SetClientId ¶
SetClientId - set query ClientId - mandatory if default client was not set
func (*Query) SetMetadata ¶
SetMetadata - set query metadata - mandatory if body field is empty
type QueryReceive ¶
type QueryReceive struct { Id string Channel string ClientId string Metadata string Body []byte ResponseTo string Tags map[string]string }
func (*QueryReceive) String ¶ added in v1.9.0
func (qr *QueryReceive) String() string
type QueryResponse ¶
type QueryResponse struct { QueryId string Executed bool ExecutedAt time.Time Metadata string ResponseClientId string Body []byte CacheHit bool Error string Tags map[string]string }
func (*QueryResponse) String ¶ added in v1.9.0
func (qr *QueryResponse) String() string
type QueueInfo ¶ added in v1.7.0
type QueueInfo struct { Name string `json:"name"` Messages int64 `json:"messages"` Bytes int64 `json:"bytes"` FirstSequence int64 `json:"first_sequence"` LastSequence int64 `json:"last_sequence"` Sent int64 `json:"sent"` Subscribers int `json:"subscribers"` Waiting int64 `json:"waiting"` Delivered int64 `json:"delivered"` }
type QueueMessage ¶ added in v1.2.0
type QueueMessage struct { *pb.QueueMessage // contains filtered or unexported fields }
func NewQueueMessage ¶ added in v1.4.0
func NewQueueMessage() *QueueMessage
func (*QueueMessage) Ack ¶ added in v1.2.0
func (qm *QueueMessage) Ack() error
Ack - sending ack queue message in stream queue message mode
func (*QueueMessage) AddTag ¶ added in v1.2.0
func (qm *QueueMessage) AddTag(key, value string) *QueueMessage
AddTag - add key value tags to queue message
func (*QueueMessage) AddTrace ¶ added in v1.2.0
func (qm *QueueMessage) AddTrace(name string) *Trace
AddTrace - add tracing support to queue message
func (*QueueMessage) ExtendVisibility ¶ added in v1.2.0
func (qm *QueueMessage) ExtendVisibility(value int32) error
ExtendVisibility - extend the visibility time for the current receive message
func (*QueueMessage) Reject ¶ added in v1.2.0
func (qm *QueueMessage) Reject() error
Reject - sending reject queue message in stream queue message mode
func (*QueueMessage) Resend ¶ added in v1.2.0
func (qm *QueueMessage) Resend(channel string) error
Resend - resending the received queue message to the given channel
func (*QueueMessage) Send ¶ added in v1.2.0
func (qm *QueueMessage) Send(ctx context.Context) (*SendQueueMessageResult, error)
Send - sending queue message request, waiting for response or timeout
func (*QueueMessage) SetBody ¶ added in v1.2.0
func (qm *QueueMessage) SetBody(body []byte) *QueueMessage
SetBody - set queue message body - mandatory if metadata field is empty
func (*QueueMessage) SetChannel ¶ added in v1.2.0
func (qm *QueueMessage) SetChannel(channel string) *QueueMessage
SetChannel - set queue message channel - mandatory if default channel was not set
func (*QueueMessage) SetClientId ¶ added in v1.2.0
func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage
SetClientId - set queue message ClientId - mandatory if default client was not set
func (*QueueMessage) SetId ¶ added in v1.2.0
func (qm *QueueMessage) SetId(id string) *QueueMessage
SetId - set queue message id, otherwise new random uuid will be set
func (*QueueMessage) SetMetadata ¶ added in v1.2.0
func (qm *QueueMessage) SetMetadata(metadata string) *QueueMessage
SetMetadata - set queue message metadata - mandatory if body field is empty
func (*QueueMessage) SetPolicyDelaySeconds ¶ added in v1.2.0
func (qm *QueueMessage) SetPolicyDelaySeconds(sec int) *QueueMessage
SetPolicyDelaySeconds - set queue message delivery delay in seconds, 0 means immediate delivery
func (*QueueMessage) SetPolicyExpirationSeconds ¶ added in v1.2.0
func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage
SetPolicyExpirationSeconds - set queue message expiration seconds, 0 never expires
func (*QueueMessage) SetPolicyMaxReceiveCount ¶ added in v1.2.0
func (qm *QueueMessage) SetPolicyMaxReceiveCount(max int) *QueueMessage
SetPolicyMaxReceiveCount - set max delivery attempts before the message is discarded or re-routed to a new queue
func (*QueueMessage) SetPolicyMaxReceiveQueue ¶ added in v1.2.0
func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage
SetPolicyMaxReceiveQueue - set queue name to be routed once MaxReceiveCount is triggered, empty will discard the message
func (*QueueMessage) SetTags ¶ added in v1.4.1
func (qm *QueueMessage) SetTags(tags map[string]string) *QueueMessage
SetTags - set key value tags to queue message
type QueueMessageAttributes ¶ added in v1.2.0
type QueueMessagePolicy ¶ added in v1.2.0
type QueueMessages ¶ added in v1.2.0
type QueueMessages struct { Messages []*QueueMessage // contains filtered or unexported fields }
func (*QueueMessages) Add ¶ added in v1.2.0
func (qma *QueueMessages) Add(msg *QueueMessage) *QueueMessages
Add - adding new queue message to array of messages
func (*QueueMessages) Send ¶ added in v1.2.0
func (qma *QueueMessages) Send(ctx context.Context) ([]*SendQueueMessageResult, error)
Send - sending queue messages array request, waiting for response or timeout
type QueueTransactionMessageRequest ¶ added in v1.5.0
type QueueTransactionMessageRequest struct { RequestID string ClientID string Channel string VisibilitySeconds int32 WaitTimeSeconds int32 }
QueueTransactionMessageRequest represents a request to receive a message from a queue as part of a transaction. It contains the following fields: - RequestID: The ID of the request. - ClientID: The ID of the client. - Channel: The channel (queue) to receive the message from. - VisibilitySeconds: The number of seconds for which the message will be hidden from other clients. - WaitTimeSeconds: The number of seconds to wait for a message to be received from the queue.
It has the following methods: - SetId: Set the request ID. - SetClientId: Set the client ID. - SetChannel: Set the channel. - SetWaitTimeSeconds: Set the wait time in seconds. - SetVisibilitySeconds: Set the visibility time in seconds. - Complete: Complete the request using the specified options. - Validate: Validate that the request is valid.
Usage example:
req := NewQueueTransactionMessageRequest(). SetId("123"). SetClientId("456"). SetChannel("channel"). SetWaitTimeSeconds(60). SetVisibilitySeconds(30) err := req.Validate() if err != nil { fmt.Println(err) return } client := NewClient() resp, err := client.Transaction(ctx, req) if err != nil { fmt.Println(err) return } fmt.Println(resp)
func NewQueueTransactionMessageRequest ¶ added in v1.5.0
func NewQueueTransactionMessageRequest() *QueueTransactionMessageRequest
NewQueueTransactionMessageRequest - create a new instance of QueueTransactionMessageRequest
func (*QueueTransactionMessageRequest) Complete ¶ added in v1.5.0
func (req *QueueTransactionMessageRequest) Complete(opts *Options) *QueueTransactionMessageRequest
Complete sets the ClientID field of the QueueTransactionMessageRequest to the value of opts.clientId if the ClientID field is empty. It returns the modified QueueTransactionMessageRequest.
func (*QueueTransactionMessageRequest) SetChannel ¶ added in v1.5.0
func (req *QueueTransactionMessageRequest) SetChannel(channel string) *QueueTransactionMessageRequest
SetChannel - set receive queue transaction message request channel - mandatory if default channel was not set
func (*QueueTransactionMessageRequest) SetClientId ¶ added in v1.5.0
func (req *QueueTransactionMessageRequest) SetClientId(clientId string) *QueueTransactionMessageRequest
SetClientId - set receive queue transaction message request ClientId - mandatory if default client was not set
func (*QueueTransactionMessageRequest) SetId ¶ added in v1.5.0
func (req *QueueTransactionMessageRequest) SetId(id string) *QueueTransactionMessageRequest
SetId - set receive queue transaction message request id, otherwise new random uuid will be set
func (*QueueTransactionMessageRequest) SetVisibilitySeconds ¶ added in v1.5.0
func (req *QueueTransactionMessageRequest) SetVisibilitySeconds(visibility int) *QueueTransactionMessageRequest
SetVisibilitySeconds - set receive queue transaction message visibility seconds for hiding message from other clients during processing. It takes an integer argument 'visibility' as the number of seconds. It sets the visibility seconds of the request to the given value. The updated QueueTransactionMessageRequest is returned.
func (*QueueTransactionMessageRequest) SetWaitTimeSeconds ¶ added in v1.5.0
func (req *QueueTransactionMessageRequest) SetWaitTimeSeconds(wait int) *QueueTransactionMessageRequest
SetWaitTimeSeconds - set receive queue transaction message request wait timeout for receiving queue message.
func (*QueueTransactionMessageRequest) Validate ¶ added in v1.5.0
func (req *QueueTransactionMessageRequest) Validate() error
Validate checks if the QueueTransactionMessageRequest is valid. It returns an error if any of the mandatory fields are empty or if any of the numeric fields are less than or equal to zero.
type QueueTransactionMessageResponse ¶ added in v1.5.0
type QueueTransactionMessageResponse struct { Message *QueueMessage // contains filtered or unexported fields }
QueueTransactionMessageResponse represents the response returned from `Transaction` method of QueuesClient. It contains the client instance, the stream used for communication, and the Message received. It can be used to interact with the response of a transaction operation on a queue message.
func (*QueueTransactionMessageResponse) Ack ¶ added in v1.5.0
func (qt *QueueTransactionMessageResponse) Ack() error
Ack sends an acknowledgment for the received message by sending the ACK request to the server. It returns an error if there is an issue with sending the ACK request or closing the stream connection.
func (*QueueTransactionMessageResponse) ExtendVisibilitySeconds ¶ added in v1.5.0
func (qt *QueueTransactionMessageResponse) ExtendVisibilitySeconds(value int) error
func (*QueueTransactionMessageResponse) Reject ¶ added in v1.5.0
func (qt *QueueTransactionMessageResponse) Reject() error
Reject - rejects the queue transaction message by sending a StreamQueueMessagesRequest with StreamRequestType_RejectMessage to the Kubemq server. If an error occurs during the transaction, it will be returned. Otherwise, it will close the stream connection by calling CloseSend() on the QueueTransactionMessageResponse stream.
func (*QueueTransactionMessageResponse) Resend ¶ added in v1.5.0
func (qt *QueueTransactionMessageResponse) Resend(channel string) error
func (*QueueTransactionMessageResponse) ResendNewMessage ¶ added in v1.5.0
func (qt *QueueTransactionMessageResponse) ResendNewMessage(msg *QueueMessage) error
ResendNewMessage - resends a modified message with a new QueueMessage to the same channel as the original message.
type QueuesClient ¶ added in v1.5.0
type QueuesClient struct {
// contains filtered or unexported fields
}
QueuesClient is a client for managing queues in a messaging system.
func NewQueuesClient ¶ added in v1.5.0
func NewQueuesClient(ctx context.Context, op ...Option) (*QueuesClient, error)
NewQueuesClient - create a new client for interacting with KubeMQ Queues. Parameters:
- ctx: the context.Context used for the client
- op...: options to configure the client
Returns:
- *QueuesClient: the created Queues client instance
- error: any error that occurred during the creation of the client
func NewQueuesStreamClient ¶ added in v1.7.1
func NewQueuesStreamClient(ctx context.Context, op ...Option) (*QueuesClient, error)
func (*QueuesClient) AckAll ¶ added in v1.5.0
func (q *QueuesClient) AckAll(ctx context.Context, request *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error)
func (*QueuesClient) Batch ¶ added in v1.5.0
func (q *QueuesClient) Batch(ctx context.Context, messages []*QueueMessage) ([]*SendQueueMessageResult, error)
Batch sends a batch of queue messages in a single request. It creates a new QueueMessages request with the given messages and sets the transport to the client's transport. It then calls the Send method of the QueueMessages request with the provided context and returns the result.
func (*QueuesClient) Close ¶ added in v1.5.0
func (q *QueuesClient) Close() error
func (*QueuesClient) Create ¶ added in v1.8.0
func (q *QueuesClient) Create(ctx context.Context, channel string) error
Create - creates a new channel with the specified name in the 'queues' category. The channelType is set to "queues". It sends a create channel request to the KubeMQ server using the provided client, clientId, channel, and channelType. It returns an error if the request fails or if an error occurs during channel creation. Example:
err := queuesClient.Create(ctx, "queues.A") if err != nil { log.Fatal(err) }
It is recommended to defer the closing of the queues client using `defer queuesClient.Close()`
func (*QueuesClient) Delete ¶ added in v1.8.0
func (q *QueuesClient) Delete(ctx context.Context, channel string) error
Delete deletes a channel by sending a delete channel request to the KubeMQ server. It takes a context and a channel as parameters, where the channel is the name of the channel to be deleted; the client, client ID, and channel type are not passed by the caller. It returns an error if there was a problem sending the delete channel request or if there was an error deleting the channel. Example usage:
if err := queuesClient.Delete(ctx, "queues.A"); err != nil { log.Fatal(err) }
func (*QueuesClient) List ¶ added in v1.8.0
func (q *QueuesClient) List(ctx context.Context, search string) ([]*common.QueuesChannel, error)
func (*QueuesClient) Peek ¶ added in v1.5.0
func (q *QueuesClient) Peek(ctx context.Context, request *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error)
Peek - retrieve messages from the queue without removing them, using the provided ReceiveQueueMessagesRequest.
func (*QueuesClient) Pull ¶ added in v1.5.0
func (q *QueuesClient) Pull(ctx context.Context, request *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error)
Pull - Pulls messages from a queue using the provided request parameters. The request parameters include the channel, maximum number of messages to receive, wait time, and whether to peek or dequeue the queue. The method sets the transport field of the request to the QueuesClient's transport, sets the IsPeak field of the request to false, completes and validates the request, and sends the request using the provided context. It returns the response received from the request or an error if any.
func (*QueuesClient) QueuesInfo ¶ added in v1.7.0
func (q *QueuesClient) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)
func (*QueuesClient) Send ¶ added in v1.5.0
func (q *QueuesClient) Send(ctx context.Context, message *QueueMessage) (*SendQueueMessageResult, error)
Send sends a queue message using the specified context and message. The transport field of the message is set to the transport of the QueuesClient. Returns the result of the SendQueueMessage method of the client, with the provided context.
ctx - The context to use for the send operation. message - The queue message to send.
func (*QueuesClient) Subscribe ¶ added in v1.5.0
func (q *QueuesClient) Subscribe(ctx context.Context, request *ReceiveQueueMessagesRequest, onReceive func(response *ReceiveQueueMessagesResponse, err error)) (chan struct{}, error)
Subscribe receives queue messages and executes the provided callback function onReceive when messages are received. It takes a context.Context object ctx, a *ReceiveQueueMessagesRequest object request, and a callback function onReceive that accepts a *ReceiveQueueMessagesResponse object and an error object. It returns a channel of type struct{} and an error. The channel can be used to signal the completion of the subscription and stop the subscription process. The function spawns a goroutine that continuously sends requests to receive queue messages until the subscription is stopped or the context is canceled. When a response is received, the onReceive callback is executed with the response and any error as arguments. If an error occurs during the request or response processing, the onReceive callback is executed with a nil response and the corresponding error.
func (*QueuesClient) Transaction ¶ added in v1.5.0
func (q *QueuesClient) Transaction(ctx context.Context, request *QueueTransactionMessageRequest) (*QueueTransactionMessageResponse, error)
Transaction - allows receiving a single message from a queue using transactional processing. The method sends a request to the server through a gRPC stream. If the gRPC raw client fails to connect, an error is returned. If the request fails to complete or validate, an error is returned. If the gRPC client fails to stream the queue message, an error is returned. If the request has an empty ClientID, it is assigned the default client ID from the client options. The request is sent to the gRPC stream using a StreamQueueMessagesRequest. If an error occurs while sending the request, an error is returned. The response is received from the gRPC stream using a ReceiveQueueMessagesResponse. If an error occurs while receiving the response, an error is returned. If the response has an error, an error is returned. If the response message is not in error, a QueueTransactionMessageResponse is returned, including the client, stream, and the QueueMessage. The method returns a pointer to the QueueTransactionMessageResponse and an error.
func (*QueuesClient) TransactionStream ¶ added in v1.5.0
func (q *QueuesClient) TransactionStream(ctx context.Context, request *QueueTransactionMessageRequest, onReceive func(response *QueueTransactionMessageResponse, err error)) (chan struct{}, error)
TransactionStream - opens a transaction stream for receiving queue transaction messages. The provided `onReceive` callback function will be called for each received queue transaction message in the stream. The function should have the following signature:
func(response *QueueTransactionMessageResponse, err error)
The `response` parameter will contain the received queue transaction message, or nil if an error occurred. The `err` parameter will contain any error that occurred during the receiving process. If the `onReceive` callback function is nil, an error will be returned. The `request` parameter represents the queue transaction message request. The request must be completed and validated before calling this method. The function returns a `done` channel that can be used to stop the transaction stream, and an error if one occurred.
type QueuesInfo ¶ added in v1.7.0
type ReceiveQueueMessagesRequest ¶ added in v1.2.0
type ReceiveQueueMessagesRequest struct { RequestID string ClientID string Channel string MaxNumberOfMessages int32 WaitTimeSeconds int32 IsPeak bool // contains filtered or unexported fields }
func NewReceiveQueueMessagesRequest ¶ added in v1.5.0
func NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest
func (*ReceiveQueueMessagesRequest) AddTrace ¶ added in v1.2.0
func (req *ReceiveQueueMessagesRequest) AddTrace(name string) *Trace
AddTrace - add tracing support to receive queue message request
func (*ReceiveQueueMessagesRequest) Complete ¶ added in v1.5.0
func (req *ReceiveQueueMessagesRequest) Complete(opts *Options) *ReceiveQueueMessagesRequest
func (*ReceiveQueueMessagesRequest) Send ¶ added in v1.2.0
func (req *ReceiveQueueMessagesRequest) Send(ctx context.Context) (*ReceiveQueueMessagesResponse, error)
Send - send the receive queue messages request and wait for a response or timeout
func (*ReceiveQueueMessagesRequest) SetChannel ¶ added in v1.2.0
func (req *ReceiveQueueMessagesRequest) SetChannel(channel string) *ReceiveQueueMessagesRequest
SetChannel - set receive queue message request channel - mandatory if default channel was not set
func (*ReceiveQueueMessagesRequest) SetClientId ¶ added in v1.2.0
func (req *ReceiveQueueMessagesRequest) SetClientId(clientId string) *ReceiveQueueMessagesRequest
SetClientId - set receive queue message request ClientId - mandatory if default client was not set
func (*ReceiveQueueMessagesRequest) SetId ¶ added in v1.2.0
func (req *ReceiveQueueMessagesRequest) SetId(id string) *ReceiveQueueMessagesRequest
SetId - set receive queue message request id, otherwise new random uuid will be set
func (*ReceiveQueueMessagesRequest) SetIsPeak ¶ added in v1.2.0
func (req *ReceiveQueueMessagesRequest) SetIsPeak(value bool) *ReceiveQueueMessagesRequest
SetIsPeak - set receive queue message request type: true - peek at the queue without actually dequeuing, false - dequeue from the queue
func (*ReceiveQueueMessagesRequest) SetMaxNumberOfMessages ¶ added in v1.2.0
func (req *ReceiveQueueMessagesRequest) SetMaxNumberOfMessages(max int) *ReceiveQueueMessagesRequest
SetMaxNumberOfMessages - set receive queue message request max number of messages to receive in single call
func (*ReceiveQueueMessagesRequest) SetWaitTimeSeconds ¶ added in v1.2.0
func (req *ReceiveQueueMessagesRequest) SetWaitTimeSeconds(wait int) *ReceiveQueueMessagesRequest
SetWaitTimeSeconds - set receive queue message request wait timeout for receiving all requested messages
func (*ReceiveQueueMessagesRequest) Validate ¶ added in v1.5.0
func (req *ReceiveQueueMessagesRequest) Validate() error
type ReceiveQueueMessagesResponse ¶ added in v1.2.0
type Response ¶
type Response struct { RequestId string ResponseTo string Metadata string Body []byte ClientId string ExecutedAt time.Time Err error Tags map[string]string // contains filtered or unexported fields }
func NewResponse ¶ added in v1.4.0
func NewResponse() *Response
func (*Response) SetClientId ¶
SetClientId - set the response clientId; if not set, the default clientId will be used
func (*Response) SetExecutedAt ¶
SetExecutedAt - set query or command execution time
func (*Response) SetMetadata ¶
SetMetadata - set metadata response, for query only
func (*Response) SetRequestId ¶
SetRequestId - set the requestId that this response corresponds to - mandatory
func (*Response) SetResponseTo ¶
SetResponseTo - set response channel as received in CommandReceived or QueryReceived object - mandatory
type SendQueueMessageResult ¶ added in v1.2.0
type ServerInfo ¶ added in v1.2.0
type StreamQueueMessage ¶ added in v1.2.0
type StreamQueueMessage struct { RequestID string ClientID string Channel string // contains filtered or unexported fields }
func (*StreamQueueMessage) AddTrace ¶ added in v1.2.0
func (req *StreamQueueMessage) AddTrace(name string) *Trace
AddTrace - add tracing support to stream receive queue message request
func (*StreamQueueMessage) Close ¶ added in v1.2.0
func (req *StreamQueueMessage) Close()
Close - end stream of queue messages and cancel all pending operations
func (*StreamQueueMessage) Next ¶ added in v1.2.0
func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) (*QueueMessage, error)
Next - receive queue messages request, waiting for response or timeout
func (*StreamQueueMessage) ResendWithNewMessage ¶ added in v1.2.0
func (req *StreamQueueMessage) ResendWithNewMessage(msg *QueueMessage) error
ResendWithNewMessage - resend the current received message to a new channel
func (*StreamQueueMessage) SetChannel ¶ added in v1.2.0
func (req *StreamQueueMessage) SetChannel(channel string) *StreamQueueMessage
SetChannel - set stream queue message request channel - mandatory if default channel was not set
func (*StreamQueueMessage) SetClientId ¶ added in v1.2.0
func (req *StreamQueueMessage) SetClientId(clientId string) *StreamQueueMessage
SetClientId - set stream queue message request ClientId - mandatory if default client was not set
func (*StreamQueueMessage) SetId ¶ added in v1.2.0
func (req *StreamQueueMessage) SetId(id string) *StreamQueueMessage
SetId - set stream queue message request id, otherwise new random uuid will be set
type SubscriptionOption ¶
type SubscriptionOption interface {
// contains filtered or unexported methods
}
func StartFromFirstEvent ¶
func StartFromFirstEvent() SubscriptionOption
StartFromFirstEvent - replay all the stored events from the first available sequence and continue stream new events from this point
func StartFromLastEvent ¶
func StartFromLastEvent() SubscriptionOption
StartFromLastEvent - replay last event and continue stream new events from this point
func StartFromNewEvents ¶
func StartFromNewEvents() SubscriptionOption
StartFromNewEvents - start event store subscription with only new events
func StartFromSequence ¶
func StartFromSequence(sequence int) SubscriptionOption
StartFromSequence - replay events from specific event sequence number and continue stream new events from this point
func StartFromTime ¶
func StartFromTime(since time.Time) SubscriptionOption
StartFromTime - replay events from a specific time and continue streaming new events from this point
func StartFromTimeDelta ¶
func StartFromTimeDelta(delta time.Duration) SubscriptionOption
StartFromTimeDelta - replay events starting from the current time minus the given delta (delta duration in seconds) and continue streaming new events from this point
type Trace ¶ added in v1.2.0
type Trace struct { Name string // contains filtered or unexported fields }
func CreateTrace ¶ added in v1.2.0
func (*Trace) AddAnnotation ¶ added in v1.2.0
func (*Trace) AddBoolAttribute ¶ added in v1.2.0
func (*Trace) AddInt64Attribute ¶ added in v1.2.0
type Transport ¶
type Transport interface { Ping(ctx context.Context) (*ServerInfo, error) SendEvent(ctx context.Context, event *Event) error StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error) SubscribeToEvents(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error) SendEventStore(ctx context.Context, eventStore *EventStore) (*EventStoreResult, error) StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, eventsResultCh chan *EventStoreResult, errCh chan error) SubscribeToEventsStore(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error) SendCommand(ctx context.Context, command *Command) (*CommandResponse, error) SubscribeToCommands(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error) SendQuery(ctx context.Context, query *Query) (*QueryResponse, error) SubscribeToQueries(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error) SendResponse(ctx context.Context, response *Response) error SendQueueMessage(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error) SendQueueMessages(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error) ReceiveQueueMessages(ctx context.Context, req *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error) AckAllQueueMessages(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error) StreamQueueMessage(ctx context.Context, reqCh chan *pb.StreamQueueMessagesRequest, resCh chan *pb.StreamQueueMessagesResponse, errCh chan error, doneCh chan bool) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error) GetGRPCRawClient() (pb.KubemqClient, error) Close() error }
type TransportType ¶
type TransportType int
const ( TransportTypeGRPC TransportType = iota TransportTypeRest )