kubemq

package module
v1.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 28, 2021 License: Apache-2.0 Imports: 19 Imported by: 207

README

KubeMQ Go SDK

KubeMQ is an enterprise-grade message queue and broker for containers, designed for any workload and architecture running in Kubernetes. This library is the Go implementation of the KubeMQ client connection.

Install KubeMQ Cluster/Server

Every installation method requires a KubeMQ key. Please register to obtain your KubeMQ key.

Kubernetes
Option 1

Install KubeMQ cluster on any Kubernetes cluster.

Step 1:

kubectl apply -f https://deploy.kubemq.io/init

Step 2:

kubectl apply -f https://deploy.kubemq.io/key/{{your key}}
Option 2

Build and Deploy KubeMQ Cluster with advanced configurations - Build & Deploy

Port-Forward KubeMQ Grpc Interface

Use kubectl to port-forward kubemq grpc interface

kubectl port-forward svc/kubemq-cluster-grpc 50000:50000 -n kubemq
Docker

Pull and run KubeMQ standalone docker container:

docker run -d -p 8080:8080 -p 50000:50000 -p 9090:9090 -e KEY={{yourkey}} kubemq/kubemq-standalone:latest
Binaries

KubeMQ standalone binaries are available for Edge locations and for local development.

Steps:

  1. Download the latest version of KubeMQ standalone from Releases
  2. Unpack the downloaded archive
  3. Run kubemq -k {{your key}} (A key is needed for the first time only)

Install KubeMQ Go SDK

go get github.com/kubemq-io/kubemq-go

Learn KubeMQ

Visit our Extensive KubeMQ Documentation.

Examples - Cookbook Recipes

Please visit our cookbook repository

Support

If you encounter any issues, please open an issue here. In addition, you can reach us for support by:

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoTransportDefined    = errors.New("no transport layer defined, create object with client instance")
	ErrNoTransportConnection = errors.New("no transport layer established, aborting")
)

Functions

This section is empty.

Types

type AckAllQueueMessagesRequest added in v1.2.0

type AckAllQueueMessagesRequest struct {
	RequestID       string
	ClientID        string
	Channel         string
	WaitTimeSeconds int32
	// contains filtered or unexported fields
}

func (*AckAllQueueMessagesRequest) AddTrace added in v1.2.0

func (req *AckAllQueueMessagesRequest) AddTrace(name string) *Trace

AddTrace - add tracing support to ack all receive queue message request

func (*AckAllQueueMessagesRequest) Complete added in v1.5.0

func (*AckAllQueueMessagesRequest) Send added in v1.2.0

Send - sending ack all queue messages request, waiting for response or timeout

func (*AckAllQueueMessagesRequest) SetChannel added in v1.2.0

SetChannel - set ack all queue message request channel - mandatory if default channel was not set

func (*AckAllQueueMessagesRequest) SetClientId added in v1.2.0

func (req *AckAllQueueMessagesRequest) SetClientId(clientId string) *AckAllQueueMessagesRequest

SetClientId - set ack all queue message request ClientId - mandatory if default client was not set

func (*AckAllQueueMessagesRequest) SetId added in v1.2.0

SetId - set ack all queue message request id, otherwise new random uuid will be set

func (*AckAllQueueMessagesRequest) SetWaitTimeSeconds added in v1.2.0

func (req *AckAllQueueMessagesRequest) SetWaitTimeSeconds(wait int) *AckAllQueueMessagesRequest

SetWaitTimeSeconds - set ack all queue message request wait timeout

func (*AckAllQueueMessagesRequest) Validate added in v1.5.0

func (req *AckAllQueueMessagesRequest) Validate() error

type AckAllQueueMessagesResponse added in v1.2.0

type AckAllQueueMessagesResponse struct {
	RequestID        string
	AffectedMessages uint64
	IsError          bool
	Error            string
}

type Client

type Client struct {
	ServerInfo *ServerInfo
	// contains filtered or unexported fields
}

func NewClient

func NewClient(ctx context.Context, op ...Option) (*Client, error)

NewClient - create client instance to be used to communicate with KubeMQ server

func (*Client) AQM added in v1.2.0

AQM - create an empty ack all receive queue messages request object

func (*Client) AckAllQueueMessages added in v1.2.0

func (c *Client) AckAllQueueMessages(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error)

AckAllQueueMessages - send ack all messages in queue

func (*Client) C

func (c *Client) C() *Command

C - create an empty command object

func (*Client) Close

func (c *Client) Close() error

Close - closing client connection. Any ongoing transactions will be aborted

func (*Client) E

func (c *Client) E() *Event

E - create an empty event object

func (*Client) ES

func (c *Client) ES() *EventStore

ES - create an empty event store object

func (*Client) NewAckAllQueueMessagesRequest added in v1.2.0

func (c *Client) NewAckAllQueueMessagesRequest() *AckAllQueueMessagesRequest

NewAckAllQueueMessagesRequest - create an empty ack all receive queue messages request object

func (*Client) NewCommand added in v1.2.0

func (c *Client) NewCommand() *Command

NewCommand - create an empty command

func (*Client) NewEvent added in v1.2.0

func (c *Client) NewEvent() *Event

NewEvent - create an empty event

func (*Client) NewEventStore added in v1.2.0

func (c *Client) NewEventStore() *EventStore

NewEventStore - create an empty event store

func (*Client) NewQuery added in v1.2.0

func (c *Client) NewQuery() *Query

NewQuery - create an empty query

func (*Client) NewQueueMessage added in v1.2.0

func (c *Client) NewQueueMessage() *QueueMessage

NewQueueMessage - create an empty queue message

func (*Client) NewQueueMessages added in v1.2.0

func (c *Client) NewQueueMessages() *QueueMessages

NewQueueMessages - create an empty queue messages array

func (*Client) NewReceiveQueueMessagesRequest added in v1.2.0

func (c *Client) NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest

NewReceiveQueueMessagesRequest - create an empty receive queue message request object

func (*Client) NewResponse added in v1.2.0

func (c *Client) NewResponse() *Response

NewResponse - create an empty response

func (*Client) NewStreamQueueMessage added in v1.2.0

func (c *Client) NewStreamQueueMessage() *StreamQueueMessage

NewStreamQueueMessage - create an empty stream receive queue message object

func (*Client) Ping added in v1.3.5

func (c *Client) Ping(ctx context.Context) (*ServerInfo, error)

Ping - get status of current connection

func (*Client) Q

func (c *Client) Q() *Query

Q - create an empty query object

func (*Client) QM added in v1.2.0

func (c *Client) QM() *QueueMessage

QM - create an empty queue message object

func (*Client) QMB added in v1.2.0

func (c *Client) QMB() *QueueMessages

QMB - create an empty queue message array object

func (*Client) QueuesInfo added in v1.7.0

func (c *Client) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)

QueuesInfo - get queues detailed information

func (*Client) R

func (c *Client) R() *Response

R - create an empty response object for command or query responses

func (*Client) RQM added in v1.2.0

RQM - create an empty receive queue message request object

func (*Client) ReceiveQueueMessages added in v1.2.0

ReceiveQueueMessages - call to receive messages from a queue

func (*Client) SQM added in v1.2.0

func (c *Client) SQM() *StreamQueueMessage

SQM - create an empty stream receive queue message object

func (*Client) SendQueueMessage added in v1.2.0

func (c *Client) SendQueueMessage(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error)

SendQueueMessage - send single queue message

func (*Client) SendQueueMessages added in v1.2.0

func (c *Client) SendQueueMessages(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error)

SendQueueMessages - send multiple queue messages

func (*Client) SetCommand added in v1.4.0

func (c *Client) SetCommand(cmd *Command) *Command

func (*Client) SetEvent added in v1.4.0

func (c *Client) SetEvent(e *Event) *Event

func (*Client) SetEventStore added in v1.4.0

func (c *Client) SetEventStore(es *EventStore) *EventStore

func (*Client) SetQuery added in v1.4.0

func (c *Client) SetQuery(query *Query) *Query

func (*Client) SetQueueMessage added in v1.4.0

func (c *Client) SetQueueMessage(qm *QueueMessage) *QueueMessage

func (*Client) SetResponse added in v1.4.0

func (c *Client) SetResponse(response *Response) *Response

func (*Client) StreamEvents

func (c *Client) StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error)

StreamEvents - send stream of events in a single call

func (*Client) StreamEventsStore

func (c *Client) StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, eventsResultCh chan *EventStoreResult, errCh chan error)

StreamEventsStore - send stream of events store in a single call

func (*Client) SubscribeToCommands

func (c *Client) SubscribeToCommands(ctx context.Context, channel, group string, errCh chan error) (<-chan *CommandReceive, error)

SubscribeToCommands - subscribe to command requests by channel and group. Returns a channel of CommandReceive or an error

func (*Client) SubscribeToCommandsWithRequest added in v1.5.0

func (c *Client) SubscribeToCommandsWithRequest(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error)

SubscribeToCommandsWithRequest - subscribe to command requests using a CommandsSubscription request. Returns a channel of CommandReceive or an error

func (*Client) SubscribeToEvents

func (c *Client) SubscribeToEvents(ctx context.Context, channel, group string, errCh chan error) (<-chan *Event, error)

SubscribeToEvents - subscribe to events by channel and group. Returns a channel of events or an error

func (*Client) SubscribeToEventsStore

func (c *Client) SubscribeToEventsStore(ctx context.Context, channel, group string, errCh chan error, opt SubscriptionOption) (<-chan *EventStoreReceive, error)

SubscribeToEventsStore - subscribe to events store by channel and group with subscription option. Returns a channel of events or an error

func (*Client) SubscribeToEventsStoreWithRequest added in v1.5.0

func (c *Client) SubscribeToEventsStoreWithRequest(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error)

SubscribeToEventsStoreWithRequest - subscribe to events store using an EventsStoreSubscription request. Returns a channel of events or an error

func (*Client) SubscribeToEventsWithRequest added in v1.5.0

func (c *Client) SubscribeToEventsWithRequest(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error)

SubscribeToEventsWithRequest - subscribe to events using an EventsSubscription request. Returns a channel of events or an error

func (*Client) SubscribeToQueries

func (c *Client) SubscribeToQueries(ctx context.Context, channel, group string, errCh chan error) (<-chan *QueryReceive, error)

SubscribeToQueries - subscribe to query requests by channel and group. Returns a channel of QueryReceive or an error

func (*Client) SubscribeToQueriesWithRequest added in v1.5.0

func (c *Client) SubscribeToQueriesWithRequest(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error)

SubscribeToQueriesWithRequest - subscribe to query requests using a QueriesSubscription request. Returns a channel of QueryReceive or an error

type Command

type Command struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	Timeout  time.Duration
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewCommand added in v1.4.0

func NewCommand() *Command

func (*Command) AddTag added in v1.2.0

func (c *Command) AddTag(key, value string) *Command

AddTag - add key value tags to command message

func (*Command) AddTrace added in v1.2.0

func (c *Command) AddTrace(name string) *Trace

AddTrace - add tracing support to command

func (*Command) Send

func (c *Command) Send(ctx context.Context) (*CommandResponse, error)

Send - sending command , waiting for response or timeout

func (*Command) SetBody

func (c *Command) SetBody(body []byte) *Command

SetBody - set command body - mandatory if metadata field is empty

func (*Command) SetChannel

func (c *Command) SetChannel(channel string) *Command

SetChannel - set command channel - mandatory if default channel was not set

func (*Command) SetClientId

func (c *Command) SetClientId(clientId string) *Command

SetClientId - set command ClientId - mandatory if default client was not set

func (*Command) SetId

func (c *Command) SetId(id string) *Command

SetId - set command requestId, otherwise new random uuid will be set

func (*Command) SetMetadata

func (c *Command) SetMetadata(metadata string) *Command

SetMetadata - set command metadata - mandatory if body field is empty

func (*Command) SetTags added in v1.4.1

func (c *Command) SetTags(tags map[string]string) *Command

SetTags - set key value tags to command message

func (*Command) SetTimeout

func (c *Command) SetTimeout(timeout time.Duration) *Command

SetTimeout - set timeout for command to be returned. if timeout expired , send command will result with an error

type CommandReceive

type CommandReceive struct {
	Id         string
	ClientId   string
	Channel    string
	Metadata   string
	Body       []byte
	ResponseTo string
	Tags       map[string]string
}

type CommandResponse

type CommandResponse struct {
	CommandId        string
	ResponseClientId string
	Executed         bool
	ExecutedAt       time.Time
	Error            string
	Tags             map[string]string
}

type CommandsClient added in v1.5.0

type CommandsClient struct {
	// contains filtered or unexported fields
}

func NewCommandsClient added in v1.5.0

func NewCommandsClient(ctx context.Context, op ...Option) (*CommandsClient, error)

func (*CommandsClient) Close added in v1.5.0

func (c *CommandsClient) Close() error

func (*CommandsClient) Response added in v1.5.0

func (c *CommandsClient) Response(ctx context.Context, response *Response) error

func (*CommandsClient) Send added in v1.5.0

func (c *CommandsClient) Send(ctx context.Context, request *Command) (*CommandResponse, error)

func (*CommandsClient) Subscribe added in v1.5.0

func (c *CommandsClient) Subscribe(ctx context.Context, request *CommandsSubscription, onCommandReceive func(cmd *CommandReceive, err error)) error

type CommandsSubscription added in v1.5.0

type CommandsSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

func (*CommandsSubscription) Complete added in v1.5.0

func (cs *CommandsSubscription) Complete(opts *Options) *CommandsSubscription

func (*CommandsSubscription) Validate added in v1.5.0

func (cs *CommandsSubscription) Validate() error

type Event

type Event struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewEvent added in v1.4.0

func NewEvent() *Event

func (*Event) AddTag added in v1.2.0

func (e *Event) AddTag(key, value string) *Event

AddTag - add key value tags to event message

func (*Event) Send

func (e *Event) Send(ctx context.Context) error

func (*Event) SetBody

func (e *Event) SetBody(body []byte) *Event

SetBody - set event body - mandatory if metadata field was not set

func (*Event) SetChannel

func (e *Event) SetChannel(channel string) *Event

SetChannel - set event channel - mandatory if default channel was not set

func (*Event) SetClientId

func (e *Event) SetClientId(clientId string) *Event

SetClientId - set event ClientId - mandatory if default client was not set

func (*Event) SetId

func (e *Event) SetId(id string) *Event

SetId - set event id otherwise new random uuid will be set

func (*Event) SetMetadata

func (e *Event) SetMetadata(metadata string) *Event

SetMetadata - set event metadata - mandatory if body field was not set

func (*Event) SetTags added in v1.4.1

func (e *Event) SetTags(tags map[string]string) *Event

SetTags - set key value tags to event message

type EventStore

type EventStore struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewEventStore added in v1.4.0

func NewEventStore() *EventStore

func (*EventStore) AddTag added in v1.2.0

func (es *EventStore) AddTag(key, value string) *EventStore

AddTag - add key value tags to event store message

func (*EventStore) Send

func (es *EventStore) Send(ctx context.Context) (*EventStoreResult, error)

Send - sending events store message

func (*EventStore) SetBody

func (es *EventStore) SetBody(body []byte) *EventStore

SetBody - set event store body - mandatory if metadata field was not set

func (*EventStore) SetChannel

func (es *EventStore) SetChannel(channel string) *EventStore

SetChannel - set event store channel - mandatory if default channel was not set

func (*EventStore) SetClientId

func (es *EventStore) SetClientId(clientId string) *EventStore

SetClientId - set event store ClientId - mandatory if default client was not set

func (*EventStore) SetId

func (es *EventStore) SetId(id string) *EventStore

SetId - set event store id otherwise new random uuid will be set

func (*EventStore) SetMetadata

func (es *EventStore) SetMetadata(metadata string) *EventStore

SetMetadata - set event store metadata - mandatory if body field was not set

func (*EventStore) SetTags added in v1.4.1

func (es *EventStore) SetTags(tags map[string]string) *EventStore

SetTags - set key value tags to event store message

type EventStoreReceive

type EventStoreReceive struct {
	Id        string
	Sequence  uint64
	Timestamp time.Time
	Channel   string
	Metadata  string
	Body      []byte
	ClientId  string
	Tags      map[string]string
}

type EventStoreResult

type EventStoreResult struct {
	Id   string
	Sent bool
	Err  error
}

type EventsClient added in v1.5.0

type EventsClient struct {
	// contains filtered or unexported fields
}

func NewEventsClient added in v1.5.0

func NewEventsClient(ctx context.Context, op ...Option) (*EventsClient, error)

func (*EventsClient) Close added in v1.5.0

func (e *EventsClient) Close() error

func (*EventsClient) Send added in v1.5.0

func (e *EventsClient) Send(ctx context.Context, message *Event) error

func (*EventsClient) Stream added in v1.5.0

func (e *EventsClient) Stream(ctx context.Context, onError func(err error)) (func(msg *Event) error, error)

func (*EventsClient) Subscribe added in v1.5.0

func (e *EventsClient) Subscribe(ctx context.Context, request *EventsSubscription, onEvent func(msg *Event, err error)) error

type EventsErrorsHandler added in v1.5.0

type EventsErrorsHandler func(error)

type EventsMessageHandler added in v1.5.0

type EventsMessageHandler func(*Event)

type EventsStoreClient added in v1.5.0

type EventsStoreClient struct {
	// contains filtered or unexported fields
}

func NewEventsStoreClient added in v1.5.0

func NewEventsStoreClient(ctx context.Context, op ...Option) (*EventsStoreClient, error)

func (*EventsStoreClient) Close added in v1.5.0

func (es *EventsStoreClient) Close() error

func (*EventsStoreClient) Send added in v1.5.0

func (es *EventsStoreClient) Send(ctx context.Context, message *EventStore) (*EventStoreResult, error)

func (*EventsStoreClient) Stream added in v1.5.0

func (es *EventsStoreClient) Stream(ctx context.Context, onResult func(result *EventStoreResult, err error)) (func(msg *EventStore) error, error)

func (*EventsStoreClient) Subscribe added in v1.5.0

func (es *EventsStoreClient) Subscribe(ctx context.Context, request *EventsStoreSubscription, onEvent func(msg *EventStoreReceive, err error)) error

type EventsStoreSubscription added in v1.5.0

type EventsStoreSubscription struct {
	Channel          string
	Group            string
	ClientId         string
	SubscriptionType SubscriptionOption
}

func (*EventsStoreSubscription) Complete added in v1.5.0

func (*EventsStoreSubscription) Validate added in v1.5.0

func (es *EventsStoreSubscription) Validate() error

type EventsSubscription added in v1.5.0

type EventsSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

func (*EventsSubscription) Complete added in v1.5.0

func (es *EventsSubscription) Complete(opts *Options) *EventsSubscription

func (*EventsSubscription) Validate added in v1.5.0

func (es *EventsSubscription) Validate() error

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithAddress

func WithAddress(host string, port int) Option

WithAddress - set host and port address of KubeMQ server

func WithAuthToken added in v1.3.2

func WithAuthToken(token string) Option

WithAuthToken - set KubeMQ JWT Auth token to be used for KubeMQ connection

func WithAutoReconnect added in v1.4.0

func WithAutoReconnect(value bool) Option

WithAutoReconnect - set automatic reconnection in case of lost connectivity to server

func WithCertificate added in v1.3.1

func WithCertificate(certData, serverOverrideDomain string) Option

WithCertificate - set secured TLS credentials from the input certificate data for client. serverOverrideDomain is for testing only. If set to a non-empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.

func WithCheckConnection added in v1.4.0

func WithCheckConnection(value bool) Option

WithCheckConnection - set whether to check server connectivity when the client is created

func WithClientId

func WithClientId(id string) Option

WithClientId - set client id to be used in all functions call with this client - mandatory

func WithCredentials

func WithCredentials(certFile, serverOverrideDomain string) Option

WithCredentials - set secured TLS credentials from the input certificate file for client. serverOverrideDomain is for testing only. If set to a non-empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.

func WithDefaultCacheTTL

func WithDefaultCacheTTL(ttl time.Duration) Option

WithDefaultCacheTTL - set default cache time to live for any query requests with any CacheKey set value

func WithDefaultChannel added in v1.3.1

func WithDefaultChannel(channel string) Option

WithDefaultChannel - set default channel for any outbound requests

func WithMaxReconnects added in v1.4.0

func WithMaxReconnects(value int) Option

WithMaxReconnects - set max reconnects before return error, default 0, never.

func WithReceiveBufferSize

func WithReceiveBufferSize(size int) Option

WithReceiveBufferSize - set length of buffered channel to be set in all subscriptions

func WithReconnectInterval added in v1.4.0

func WithReconnectInterval(duration time.Duration) Option

WithReconnectInterval - set reconnection interval duration, default is 5 seconds

func WithTransportType

func WithTransportType(transportType TransportType) Option

WithTransportType - set client transport type, currently GRPC or Rest

func WithUri added in v1.2.0

func WithUri(uri string) Option

WithUri - set uri address of KubeMQ server

type Options

type Options struct {
	// contains filtered or unexported fields
}

func GetDefaultOptions

func GetDefaultOptions() *Options

func (*Options) Validate added in v1.2.0

func (o *Options) Validate() error

type QueriesClient added in v1.5.0

type QueriesClient struct {
	// contains filtered or unexported fields
}

func NewQueriesClient added in v1.5.0

func NewQueriesClient(ctx context.Context, op ...Option) (*QueriesClient, error)

func (*QueriesClient) Close added in v1.5.0

func (q *QueriesClient) Close() error

func (*QueriesClient) Response added in v1.5.0

func (q *QueriesClient) Response(ctx context.Context, response *Response) error

func (*QueriesClient) Send added in v1.5.0

func (q *QueriesClient) Send(ctx context.Context, request *Query) (*QueryResponse, error)

func (*QueriesClient) Subscribe added in v1.5.0

func (q *QueriesClient) Subscribe(ctx context.Context, request *QueriesSubscription, onQueryReceive func(query *QueryReceive, err error)) error

type QueriesSubscription added in v1.5.0

type QueriesSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

func (*QueriesSubscription) Complete added in v1.5.0

func (qs *QueriesSubscription) Complete(opts *Options) *QueriesSubscription

func (*QueriesSubscription) Validate added in v1.5.0

func (qs *QueriesSubscription) Validate() error

type Query

type Query struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	Timeout  time.Duration
	ClientId string
	CacheKey string
	CacheTTL time.Duration
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewQuery added in v1.4.0

func NewQuery() *Query

func (*Query) AddTag added in v1.2.0

func (q *Query) AddTag(key, value string) *Query

AddTag - add key value tags to query message

func (*Query) AddTrace added in v1.2.0

func (q *Query) AddTrace(name string) *Trace

AddTrace - add tracing support to query

func (*Query) Send

func (q *Query) Send(ctx context.Context) (*QueryResponse, error)

Send - sending query request , waiting for response or timeout

func (*Query) SetBody

func (q *Query) SetBody(body []byte) *Query

SetBody - set query body - mandatory if metadata field is empty

func (*Query) SetCacheKey

func (q *Query) SetCacheKey(cacheKey string) *Query

SetCacheKey - set cache key to retrieve already stored query response, otherwise the response for this query will be stored in cache for future query requests

func (*Query) SetCacheTTL

func (q *Query) SetCacheTTL(ttl time.Duration) *Query

SetCacheTTL - set cache time to live for the stored response of this query's cache key; if not set, the default cache TTL will be used

func (*Query) SetChannel

func (q *Query) SetChannel(channel string) *Query

SetChannel - set query channel - mandatory if default channel was not set

func (*Query) SetClientId

func (q *Query) SetClientId(clientId string) *Query

SetClientId - set query ClientId - mandatory if default client was not set

func (*Query) SetId

func (q *Query) SetId(id string) *Query

SetId - set query requestId, otherwise new random uuid will be set

func (*Query) SetMetadata

func (q *Query) SetMetadata(metadata string) *Query

SetMetadata - set query metadata - mandatory if body field is empty

func (*Query) SetTags added in v1.4.1

func (q *Query) SetTags(tags map[string]string) *Query

SetTags - set key value tags to query message

func (*Query) SetTimeout

func (q *Query) SetTimeout(timeout time.Duration) *Query

SetTimeout - set timeout for query to be returned. if timeout expired , send query will result with an error

type QueryReceive

type QueryReceive struct {
	Id         string
	Channel    string
	ClientId   string
	Metadata   string
	Body       []byte
	ResponseTo string
	Tags       map[string]string
}

type QueryResponse

type QueryResponse struct {
	QueryId          string
	Executed         bool
	ExecutedAt       time.Time
	Metadata         string
	ResponseClientId string
	Body             []byte
	CacheHit         bool
	Error            string
	Tags             map[string]string
}

type QueueInfo added in v1.7.0

type QueueInfo struct {
	Name          string `json:"name"`
	Messages      int64  `json:"messages"`
	Bytes         int64  `json:"bytes"`
	FirstSequence int64  `json:"first_sequence"`
	LastSequence  int64  `json:"last_sequence"`
	Sent          int64  `json:"sent"`
	Subscribers   int    `json:"subscribers"`
	Waiting       int64  `json:"waiting"`
	Delivered     int64  `json:"delivered"`
}

type QueueMessage added in v1.2.0

type QueueMessage struct {
	*pb.QueueMessage
	// contains filtered or unexported fields
}

func NewQueueMessage added in v1.4.0

func NewQueueMessage() *QueueMessage

func (*QueueMessage) Ack added in v1.2.0

func (qm *QueueMessage) Ack() error

Ack - sending ack queue message in stream queue message mode

func (*QueueMessage) AddTag added in v1.2.0

func (qm *QueueMessage) AddTag(key, value string) *QueueMessage

AddTag - add key value tags to queue message

func (*QueueMessage) AddTrace added in v1.2.0

func (qm *QueueMessage) AddTrace(name string) *Trace

AddTrace - add tracing support to queue message

func (*QueueMessage) ExtendVisibility added in v1.2.0

func (qm *QueueMessage) ExtendVisibility(value int32) error

ExtendVisibility - extend the visibility time for the current receive message

func (*QueueMessage) Reject added in v1.2.0

func (qm *QueueMessage) Reject() error

Reject - sending reject queue message in stream queue message mode

func (*QueueMessage) Resend added in v1.2.0

func (qm *QueueMessage) Resend(channel string) error

Resend - resend the current queue message to the given channel

func (*QueueMessage) Send added in v1.2.0

Send - sending queue message request , waiting for response or timeout

func (*QueueMessage) SetBody added in v1.2.0

func (qm *QueueMessage) SetBody(body []byte) *QueueMessage

SetBody - set queue message body - mandatory if metadata field is empty

func (*QueueMessage) SetChannel added in v1.2.0

func (qm *QueueMessage) SetChannel(channel string) *QueueMessage

SetChannel - set queue message channel - mandatory if default channel was not set

func (*QueueMessage) SetClientId added in v1.2.0

func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage

SetClientId - set queue message ClientId - mandatory if default client was not set

func (*QueueMessage) SetId added in v1.2.0

func (qm *QueueMessage) SetId(id string) *QueueMessage

SetId - set queue message id, otherwise new random uuid will be set

func (*QueueMessage) SetMetadata added in v1.2.0

func (qm *QueueMessage) SetMetadata(metadata string) *QueueMessage

SetMetadata - set queue message metadata - mandatory if body field is empty

func (*QueueMessage) SetPolicyDelaySeconds added in v1.2.0

func (qm *QueueMessage) SetPolicyDelaySeconds(sec int) *QueueMessage

SetPolicyDelaySeconds - set queue message delivery delay in seconds, 0 , immediate delivery

func (*QueueMessage) SetPolicyExpirationSeconds added in v1.2.0

func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage

SetPolicyExpirationSeconds - set queue message expiration seconds, 0 never expires

func (*QueueMessage) SetPolicyMaxReceiveCount added in v1.2.0

func (qm *QueueMessage) SetPolicyMaxReceiveCount(max int) *QueueMessage

SetPolicyMaxReceiveCount - set max delivery attempts before message will discard or re-route to a new queue

func (*QueueMessage) SetPolicyMaxReceiveQueue added in v1.2.0

func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage

SetPolicyMaxReceiveQueue - set queue name to be routed once MaxReceiveCount is triggered, empty will discard the message

func (*QueueMessage) SetTags added in v1.4.1

func (qm *QueueMessage) SetTags(tags map[string]string) *QueueMessage

SetTags - set key value tags to queue message

type QueueMessageAttributes added in v1.2.0

type QueueMessageAttributes struct {
	Timestamp         int64
	Sequence          uint64
	MD5OfBody         string
	ReceiveCount      int32
	ReRouted          bool
	ReRoutedFromQueue string
	ExpirationAt      int64
	DelayedTo         int64
}

type QueueMessagePolicy added in v1.2.0

type QueueMessagePolicy struct {
	ExpirationSeconds int32
	DelaySeconds      int32
	MaxReceiveCount   int32
	MaxReceiveQueue   string
}

type QueueMessages added in v1.2.0

type QueueMessages struct {
	Messages []*QueueMessage
	// contains filtered or unexported fields
}

func (*QueueMessages) Add added in v1.2.0

func (qma *QueueMessages) Add(msg *QueueMessage) *QueueMessages

Add - adding new queue message to array of messages

func (*QueueMessages) Send added in v1.2.0

Send - sending queue messages array request , waiting for response or timeout

type QueueTransactionMessageRequest added in v1.5.0

// QueueTransactionMessageRequest describes a receive-queue transaction
// message request. Create one with NewQueueTransactionMessageRequest.
// The field notes restate the docs of the matching Set* methods; the
// setter-to-field mapping is assumed from the names.
type QueueTransactionMessageRequest struct {
	// Request id; per SetId, a new random uuid is set when none is given.
	RequestID         string
	// Client id; per SetClientId, mandatory if no default client was set.
	ClientID          string
	// Queue channel; per SetChannel, mandatory if no default channel was set.
	Channel           string
	// Seconds the message is hidden from other clients during processing
	// (see SetVisibilitySeconds).
	VisibilitySeconds int32
	// Wait timeout for receiving a queue message (see SetWaitTimeSeconds).
	WaitTimeSeconds   int32
}

func NewQueueTransactionMessageRequest added in v1.5.0

func NewQueueTransactionMessageRequest() *QueueTransactionMessageRequest

func (*QueueTransactionMessageRequest) Complete added in v1.5.0

func (*QueueTransactionMessageRequest) SetChannel added in v1.5.0

SetChannel - set receive queue transaction message request channel - mandatory if default channel was not set

func (*QueueTransactionMessageRequest) SetClientId added in v1.5.0

SetClientId - set receive queue transaction message request ClientId - mandatory if default client was not set

func (*QueueTransactionMessageRequest) SetId added in v1.5.0

SetId - set receive queue transaction message request id, otherwise new random uuid will be set

func (*QueueTransactionMessageRequest) SetVisibilitySeconds added in v1.5.0

func (req *QueueTransactionMessageRequest) SetVisibilitySeconds(visibility int) *QueueTransactionMessageRequest

SetVisibilitySeconds - set receive queue transaction message visibility seconds for hiding message from other clients during processing

func (*QueueTransactionMessageRequest) SetWaitTimeSeconds added in v1.5.0

func (req *QueueTransactionMessageRequest) SetWaitTimeSeconds(wait int) *QueueTransactionMessageRequest

SetWaitTimeSeconds - set receive queue transaction message request wait timeout for receiving queue message

func (*QueueTransactionMessageRequest) Validate added in v1.5.0

func (req *QueueTransactionMessageRequest) Validate() error

type QueueTransactionMessageResponse added in v1.5.0

// QueueTransactionMessageResponse is the value passed to the onReceive
// callback of QueuesClient.TransactionStream. Its listed methods are Ack,
// Reject, ExtendVisibilitySeconds, Resend and ResendNewMessage.
type QueueTransactionMessageResponse struct {
	// presumably the queue message received in this transaction — TODO confirm
	Message *QueueMessage
	// contains filtered or unexported fields
}

func (*QueueTransactionMessageResponse) Ack added in v1.5.0

func (*QueueTransactionMessageResponse) ExtendVisibilitySeconds added in v1.5.0

func (qt *QueueTransactionMessageResponse) ExtendVisibilitySeconds(value int) error

func (*QueueTransactionMessageResponse) Reject added in v1.5.0

func (*QueueTransactionMessageResponse) Resend added in v1.5.0

func (qt *QueueTransactionMessageResponse) Resend(channel string) error

func (*QueueTransactionMessageResponse) ResendNewMessage added in v1.5.0

func (qt *QueueTransactionMessageResponse) ResendNewMessage(msg *QueueMessage) error

type QueuesClient added in v1.5.0

// QueuesClient is the client type for queue operations. It is returned by
// NewQueuesStreamClient and has no exported fields; use its methods
// (Send, Batch, Pull, Peek, Subscribe, Transaction, ...) and call Close
// when done.
type QueuesClient struct {
	// contains filtered or unexported fields
}

func NewQueuesStreamClient added in v1.7.1

func NewQueuesStreamClient(ctx context.Context, op ...Option) (*QueuesClient, error)

func (*QueuesClient) AckAll added in v1.5.0

func (*QueuesClient) Batch added in v1.5.0

func (q *QueuesClient) Batch(ctx context.Context, messages []*QueueMessage) ([]*SendQueueMessageResult, error)

func (*QueuesClient) Close added in v1.5.0

func (q *QueuesClient) Close() error

func (*QueuesClient) Peek added in v1.5.0

func (*QueuesClient) Pull added in v1.5.0

func (*QueuesClient) QueuesInfo added in v1.7.0

func (q *QueuesClient) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)

func (*QueuesClient) Send added in v1.5.0

func (*QueuesClient) Subscribe added in v1.5.0

func (q *QueuesClient) Subscribe(ctx context.Context, request *ReceiveQueueMessagesRequest, onReceive func(response *ReceiveQueueMessagesResponse, err error)) (chan struct{}, error)

func (*QueuesClient) Transaction added in v1.5.0

func (*QueuesClient) TransactionStream added in v1.5.0

func (q *QueuesClient) TransactionStream(ctx context.Context, request *QueueTransactionMessageRequest, onReceive func(response *QueueTransactionMessageResponse, err error)) (chan struct{}, error)

type QueuesInfo added in v1.7.0

// QueuesInfo is the result type of QueuesClient.QueuesInfo and
// Transport.QueuesInfo, both of which take a filter string. The struct tags
// give the JSON key of each field.
// NOTE(review): the counter meanings below are inferred from names — confirm.
type QueuesInfo struct {
	// presumably the number of queues covered by this result — TODO confirm
	TotalQueues int          `json:"total_queues"`
	// presumably the total number of messages sent — TODO confirm
	Sent        int64        `json:"sent"`
	// presumably the total number of messages waiting for delivery — TODO confirm
	Waiting     int64        `json:"waiting"`
	// presumably the total number of messages delivered — TODO confirm
	Delivered   int64        `json:"delivered"`
	// presumably one entry per queue — TODO confirm
	Queues      []*QueueInfo `json:"queues"`
}

type ReceiveQueueMessagesRequest added in v1.2.0

// ReceiveQueueMessagesRequest describes a receive queue messages request.
// Create one with NewReceiveQueueMessagesRequest. The field notes restate
// the docs of the matching Set* methods; the setter-to-field mapping is
// assumed from the names.
type ReceiveQueueMessagesRequest struct {
	// Request id; per SetId, a new random uuid is set when none is given.
	RequestID           string
	// Client id; per SetClientId, mandatory if no default client was set.
	ClientID            string
	// Queue channel; per SetChannel, mandatory if no default channel was set.
	Channel             string
	// Max number of messages to receive in a single call
	// (see SetMaxNumberOfMessages).
	MaxNumberOfMessages int32
	// Wait timeout for receiving all requested messages
	// (see SetWaitTimeSeconds).
	WaitTimeSeconds     int32
	// Request type; per SetIsPeak, true looks at the queue without an
	// actual dequeue and false dequeues.
	IsPeak              bool
	// contains filtered or unexported fields
}

func NewReceiveQueueMessagesRequest added in v1.5.0

func NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest

func (*ReceiveQueueMessagesRequest) AddTrace added in v1.2.0

func (req *ReceiveQueueMessagesRequest) AddTrace(name string) *Trace

AddTrace - add tracing support to receive queue message request

func (*ReceiveQueueMessagesRequest) Complete added in v1.5.0

func (*ReceiveQueueMessagesRequest) Send added in v1.2.0

Send - sending receive queue messages request, waiting for response or timeout

func (*ReceiveQueueMessagesRequest) SetChannel added in v1.2.0

SetChannel - set receive queue message request channel - mandatory if default channel was not set

func (*ReceiveQueueMessagesRequest) SetClientId added in v1.2.0

SetClientId - set receive queue message request ClientId - mandatory if default client was not set

func (*ReceiveQueueMessagesRequest) SetId added in v1.2.0

SetId - set receive queue message request id, otherwise new random uuid will be set

func (*ReceiveQueueMessagesRequest) SetIsPeak added in v1.2.0

SetIsPeak - set receive queue message request type: true - peek at the queue without actually dequeuing, false - dequeue from the queue

func (*ReceiveQueueMessagesRequest) SetMaxNumberOfMessages added in v1.2.0

func (req *ReceiveQueueMessagesRequest) SetMaxNumberOfMessages(max int) *ReceiveQueueMessagesRequest

SetMaxNumberOfMessages - set receive queue message request max number of messages to receive in single call

func (*ReceiveQueueMessagesRequest) SetWaitTimeSeconds added in v1.2.0

func (req *ReceiveQueueMessagesRequest) SetWaitTimeSeconds(wait int) *ReceiveQueueMessagesRequest

SetWaitTimeSeconds - set receive queue message request wait timeout for receiving all requested messages

func (*ReceiveQueueMessagesRequest) Validate added in v1.5.0

func (req *ReceiveQueueMessagesRequest) Validate() error

type ReceiveQueueMessagesResponse added in v1.2.0

// ReceiveQueueMessagesResponse is the result type of
// Transport.ReceiveQueueMessages and the value passed to the onReceive
// callback of QueuesClient.Subscribe.
// NOTE(review): the field notes below are inferred from names — confirm.
type ReceiveQueueMessagesResponse struct {
	// presumably the id of the request this response answers — TODO confirm
	RequestID        string
	// presumably the messages received — TODO confirm
	Messages         []*QueueMessage
	// presumably the number of messages received — TODO confirm
	MessagesReceived int32
	// presumably the number of messages found expired — TODO confirm
	MessagesExpired  int32
	// presumably mirrors ReceiveQueueMessagesRequest.IsPeak — TODO confirm
	IsPeak           bool
	// presumably true when the request failed, with Error holding the
	// description — TODO confirm
	IsError          bool
	Error            string
}

type Response

// Response is a response to a command or query request; Send sends it.
// Create one with NewResponse. The field notes restate the docs of the
// matching Set* methods; the setter-to-field mapping is assumed from the
// names.
type Response struct {
	// Id of the request this response corresponds to; mandatory
	// (see SetRequestId).
	RequestId  string
	// Response channel as received in the CommandReceived or QueryReceived
	// object; mandatory (see SetResponseTo).
	ResponseTo string
	// Response metadata; for query only (see SetMetadata).
	Metadata   string
	// Response body; for query only (see SetBody).
	Body       []byte
	// Client id; per SetClientId, the default clientId is used when not set.
	ClientId   string
	// Query or command execution time (see SetExecutedAt).
	ExecutedAt time.Time
	// Query or command execution error (see SetError).
	Err        error
	// Response tags (see SetTags).
	Tags       map[string]string
	// contains filtered or unexported fields
}

func NewResponse added in v1.4.0

func NewResponse() *Response

func (*Response) AddTrace added in v1.2.0

func (r *Response) AddTrace(name string) *Trace

AddTrace - add tracing support to response

func (*Response) Send

func (r *Response) Send(ctx context.Context) error

Send - sending response to command or query request

func (*Response) SetBody

func (r *Response) SetBody(body []byte) *Response

SetBody - set body response, for query only

func (*Response) SetClientId

func (r *Response) SetClientId(clientId string) *Response

SetClientId - set clientId response, if not set default clientId will be used

func (*Response) SetError

func (r *Response) SetError(err error) *Response

SetError - set query or command execution error

func (*Response) SetExecutedAt

func (r *Response) SetExecutedAt(executedAt time.Time) *Response

SetExecutedAt - set query or command execution time

func (*Response) SetMetadata

func (r *Response) SetMetadata(metadata string) *Response

SetMetadata - set metadata response, for query only

func (*Response) SetRequestId

func (r *Response) SetRequestId(id string) *Response

SetRequestId - set the requestId this response corresponds to - mandatory

func (*Response) SetResponseTo

func (r *Response) SetResponseTo(channel string) *Response

SetResponseTo - set response channel as received in CommandReceived or QueryReceived object - mandatory

func (*Response) SetTags added in v1.2.0

func (r *Response) SetTags(tags map[string]string) *Response

SetTags - set response tags

type SendQueueMessageResult added in v1.2.0

// SendQueueMessageResult is the per-message result type returned by
// QueuesClient.Batch, Transport.SendQueueMessage and
// Transport.SendQueueMessages.
// NOTE(review): the field notes below are inferred from names — confirm.
type SendQueueMessageResult struct {
	// presumably the id of the message this result refers to — TODO confirm
	MessageID    string
	// presumably the time the message was sent; unit not shown — TODO confirm
	SentAt       int64
	// presumably the time the message expires; unit not shown — TODO confirm
	ExpirationAt int64
	// presumably the time until which delivery is delayed; unit not shown — TODO confirm
	DelayedTo    int64
	// presumably true when sending failed, with Error holding the
	// description — TODO confirm
	IsError      bool
	Error        string
}

type ServerInfo added in v1.2.0

// ServerInfo is the result type of Transport.Ping.
// NOTE(review): the field notes below are inferred from names — confirm.
type ServerInfo struct {
	// presumably the server host name — TODO confirm
	Host                string
	// presumably the server version string — TODO confirm
	Version             string
	// presumably the server start time; unit and epoch not shown — TODO confirm
	ServerStartTime     int64
	// server up time, in seconds going by the field name
	ServerUpTimeSeconds int64
}

type StreamQueueMessage added in v1.2.0

// StreamQueueMessage is a stream queue message request. Next receives
// queue messages and Close ends the stream and cancels all pending
// operations (see the method docs). The field notes restate the docs of
// the matching Set* methods; the setter-to-field mapping is assumed from
// the names.
type StreamQueueMessage struct {
	// Request id; per SetId, a new random uuid is set when none is given.
	RequestID string
	// Client id; per SetClientId, mandatory if no default client was set.
	ClientID  string
	// Queue channel; per SetChannel, mandatory if no default channel was set.
	Channel   string
	// contains filtered or unexported fields
}

func (*StreamQueueMessage) AddTrace added in v1.2.0

func (req *StreamQueueMessage) AddTrace(name string) *Trace

AddTrace - add tracing support to stream receive queue message request

func (*StreamQueueMessage) Close added in v1.2.0

func (req *StreamQueueMessage) Close()

Close - end stream of queue messages and cancel all pending operations

func (*StreamQueueMessage) Next added in v1.2.0

func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) (*QueueMessage, error)

Next - receive queue messages request, waiting for response or timeout

func (*StreamQueueMessage) ResendWithNewMessage added in v1.2.0

func (req *StreamQueueMessage) ResendWithNewMessage(msg *QueueMessage) error

ResendWithNewMessage - resend the current received message to a new channel

func (*StreamQueueMessage) SetChannel added in v1.2.0

func (req *StreamQueueMessage) SetChannel(channel string) *StreamQueueMessage

SetChannel - set stream queue message request channel - mandatory if default channel was not set

func (*StreamQueueMessage) SetClientId added in v1.2.0

func (req *StreamQueueMessage) SetClientId(clientId string) *StreamQueueMessage

SetClientId - set stream queue message request ClientId - mandatory if default client was not set

func (*StreamQueueMessage) SetId added in v1.2.0

SetId - set stream queue message request id, otherwise new random uuid will be set

type SubscriptionOption

// SubscriptionOption selects where an event store subscription starts.
// Its methods are unexported; values come from the StartFrom* constructors
// (StartFromNewEvents, StartFromFirstEvent, StartFromLastEvent,
// StartFromSequence, StartFromTime, StartFromTimeDelta).
type SubscriptionOption interface {
	// contains filtered or unexported methods
}

func StartFromFirstEvent

func StartFromFirstEvent() SubscriptionOption

StartFromFirstEvent - replay all the stored events from the first available sequence and continue stream new events from this point

func StartFromLastEvent

func StartFromLastEvent() SubscriptionOption

StartFromLastEvent - replay last event and continue stream new events from this point

func StartFromNewEvents

func StartFromNewEvents() SubscriptionOption

StartFromNewEvents - start event store subscription with only new events

func StartFromSequence

func StartFromSequence(sequence int) SubscriptionOption

StartFromSequence - replay events from specific event sequence number and continue stream new events from this point

func StartFromTime

func StartFromTime(since time.Time) SubscriptionOption

StartFromTime - replay events from a specific time and continue streaming new events from this point

func StartFromTimeDelta

func StartFromTimeDelta(delta time.Duration) SubscriptionOption

StartFromTimeDelta - replay events starting from the current time minus delta (duration in seconds) and continue streaming new events from this point

type Trace added in v1.2.0

// Trace carries tracing data. It is returned by CreateTrace and by the
// AddTrace methods of the request and response types. The Add*Attribute
// and AddAnnotation methods each return *Trace.
type Trace struct {
	// presumably the name passed to CreateTrace — TODO confirm
	Name string
	// contains filtered or unexported fields
}

func CreateTrace added in v1.2.0

func CreateTrace(name string) *Trace

func (*Trace) AddAnnotation added in v1.2.0

func (t *Trace) AddAnnotation(timestamp time.Time, message string) *Trace

func (*Trace) AddBoolAttribute added in v1.2.0

func (t *Trace) AddBoolAttribute(key string, value bool) *Trace

func (*Trace) AddInt64Attribute added in v1.2.0

func (t *Trace) AddInt64Attribute(key string, value int64) *Trace

func (*Trace) AddStringAttribute added in v1.2.0

func (t *Trace) AddStringAttribute(key string, value string) *Trace

type Transport

// Transport is the interface of the transport layer. TransportType names
// two kinds, GRPC and Rest. The group comments below only describe the
// method signatures; behavior is defined by the implementations, which are
// not part of this listing.
type Transport interface {
	// Ping returns server information.
	Ping(ctx context.Context) (*ServerInfo, error)

	// Events: single send, channel-based streaming, and subscription.
	SendEvent(ctx context.Context, event *Event) error
	StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error)
	SubscribeToEvents(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error)

	// Events store: like events, but sends produce an EventStoreResult.
	SendEventStore(ctx context.Context, eventStore *EventStore) (*EventStoreResult, error)
	StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, eventsResultCh chan *EventStoreResult, errCh chan error)
	SubscribeToEventsStore(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error)

	// Commands and queries: send, subscribe, and send a Response.
	SendCommand(ctx context.Context, command *Command) (*CommandResponse, error)
	SubscribeToCommands(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error)
	SendQuery(ctx context.Context, query *Query) (*QueryResponse, error)
	SubscribeToQueries(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error)
	SendResponse(ctx context.Context, response *Response) error

	// Queues: send one or many messages, receive, ack all, stream, and info.
	SendQueueMessage(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error)
	SendQueueMessages(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error)
	ReceiveQueueMessages(ctx context.Context, req *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error)
	AckAllQueueMessages(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error)
	StreamQueueMessage(ctx context.Context, reqCh chan *pb.StreamQueueMessagesRequest, resCh chan *pb.StreamQueueMessagesResponse, errCh chan error, doneCh chan bool)
	QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)

	// GetGRPCRawClient returns the raw pb.KubemqClient, or an error.
	GetGRPCRawClient() (pb.KubemqClient, error)
	// Close returns an error if closing fails.
	Close() error
}

type TransportType

// TransportType enumerates the transport kinds declared below.
type TransportType int
const (
	// TransportTypeGRPC is the zero value (iota starts at 0).
	TransportTypeGRPC TransportType = iota
	// TransportTypeRest is the next iota value, 1.
	TransportTypeRest
)

Directories

Path Synopsis
examples
pkg
v2

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL