Documentation
¶
Index ¶
- type Client
- func (c *Client) C() *Command
- func (c *Client) Close() error
- func (c *Client) E() *Event
- func (c *Client) ES() *EventStore
- func (c *Client) Q() *Query
- func (c *Client) R() *Response
- func (c *Client) StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error)
- func (c *Client) StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, ...)
- func (c *Client) SubscribeToCommands(ctx context.Context, channel, group string, errCh chan error) (<-chan *CommandReceive, error)
- func (c *Client) SubscribeToEvents(ctx context.Context, channel, group string, errCh chan error) (<-chan *Event, error)
- func (c *Client) SubscribeToEventsStore(ctx context.Context, channel, group string, errCh chan error, ...) (<-chan *EventStoreReceive, error)
- func (c *Client) SubscribeToQueries(ctx context.Context, channel, group string, errCh chan error) (<-chan *QueryReceive, error)
- type Command
- func (c *Command) Send(ctx context.Context) (*CommandResponse, error)
- func (c *Command) SetBody(body []byte) *Command
- func (c *Command) SetChannel(channel string) *Command
- func (c *Command) SetClientId(clientId string) *Command
- func (c *Command) SetId(id string) *Command
- func (c *Command) SetMetadata(metadata string) *Command
- func (c *Command) SetTimeout(timeout time.Duration) *Command
- type CommandReceive
- type CommandResponse
- type Event
- type EventStore
- func (es *EventStore) Send(ctx context.Context) (*EventStoreResult, error)
- func (es *EventStore) SetBody(body []byte) *EventStore
- func (es *EventStore) SetChannel(channel string) *EventStore
- func (es *EventStore) SetClientId(clientId string) *EventStore
- func (es *EventStore) SetId(id string) *EventStore
- func (es *EventStore) SetMetadata(metadata string) *EventStore
- type EventStoreReceive
- type EventStoreResult
- type Option
- func WithAddress(host string, port int) Option
- func WithClientId(id string) Option
- func WithCredentials(certFile, serverOverrideDomain string) Option
- func WithDefaultCacheTTL(ttl time.Duration) Option
- func WithDefualtChannel(channel string) Option
- func WithReceiveBufferSize(size int) Option
- func WithToken(token string) Option
- func WithTransportType(transportType TransportType) Option
- type Options
- type Query
- func (q *Query) Send(ctx context.Context) (*QueryResponse, error)
- func (q *Query) SetBody(body []byte) *Query
- func (q *Query) SetCacheKey(cacheKey string) *Query
- func (q *Query) SetCacheTTL(ttl time.Duration) *Query
- func (q *Query) SetChannel(channel string) *Query
- func (q *Query) SetClientId(clientId string) *Query
- func (q *Query) SetId(id string) *Query
- func (q *Query) SetMetadata(metadata string) *Query
- func (q *Query) SetTimeout(timeout time.Duration) *Query
- type QueryReceive
- type QueryResponse
- type Response
- func (r *Response) Send(ctx context.Context) error
- func (r *Response) SetBody(body []byte) *Response
- func (r *Response) SetClientId(clientId string) *Response
- func (r *Response) SetError(err error) *Response
- func (r *Response) SetExecutedAt(executedAt time.Time) *Response
- func (r *Response) SetMetadata(metadata string) *Response
- func (r *Response) SetRequestId(id string) *Response
- func (r *Response) SetResponseTo(channel string) *Response
- type SubscriptionOption
- func StartFromFirstEvent() SubscriptionOption
- func StartFromLastEvent() SubscriptionOption
- func StartFromNewEvents() SubscriptionOption
- func StartFromSequence(sequence int) SubscriptionOption
- func StartFromTime(since time.Time) SubscriptionOption
- func StartFromTimeDelta(delta time.Duration) SubscriptionOption
- type Transport
- type TransportType
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) StreamEvents ¶
StreamEvents - send stream of events in a single call
func (*Client) StreamEventsStore ¶
func (c *Client) StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, eventsResultCh chan *EventStoreResult, errCh chan error)
StreamEventsStore - send stream of events store in a single call
func (*Client) SubscribeToCommands ¶
func (c *Client) SubscribeToCommands(ctx context.Context, channel, group string, errCh chan error) (<-chan *CommandReceive, error)
SubscribeToCommands - subscribe to command requests by channel and group. Returns a channel of CommandReceive or an error
func (*Client) SubscribeToEvents ¶
func (c *Client) SubscribeToEvents(ctx context.Context, channel, group string, errCh chan error) (<-chan *Event, error)
SubscribeToEvents - subscribe to events by channel and group. Returns a channel of events or an error
func (*Client) SubscribeToEventsStore ¶
func (c *Client) SubscribeToEventsStore(ctx context.Context, channel, group string, errCh chan error, opt SubscriptionOption) (<-chan *EventStoreReceive, error)
SubscribeToEventsStore - subscribe to events store by channel and group with a subscription option. Returns a channel of events or an error
func (*Client) SubscribeToQueries ¶
func (c *Client) SubscribeToQueries(ctx context.Context, channel, group string, errCh chan error) (<-chan *QueryReceive, error)
SubscribeToQueries - subscribe to query requests by channel and group. Returns a channel of QueryReceive or an error
type Command ¶
type Command struct { Id string Channel string Metadata string Body []byte Timeout time.Duration ClientId string // contains filtered or unexported fields }
func (*Command) Send ¶
func (c *Command) Send(ctx context.Context) (*CommandResponse, error)
Send - send the command and wait for a response or timeout
func (*Command) SetChannel ¶
SetChannel - set command channel - mandatory if default channel was not set
func (*Command) SetClientId ¶
SetClientId - set command ClientId - mandatory if default client was not set
func (*Command) SetMetadata ¶
SetMetadata - set command metadata - mandatory if body field is empty
type CommandReceive ¶
type CommandResponse ¶
type Event ¶
type Event struct { Id string Channel string Metadata string Body []byte ClientId string // contains filtered or unexported fields }
func (*Event) SetChannel ¶
SetChannel - set event channel - mandatory if default channel was not set
func (*Event) SetClientId ¶
SetClientId - set event ClientId - mandatory if default client was not set
func (*Event) SetMetadata ¶
SetMetadata - set event metadata - mandatory if body field was not set
type EventStore ¶
type EventStore struct { Id string Channel string Metadata string Body []byte ClientId string // contains filtered or unexported fields }
func (*EventStore) Send ¶
func (es *EventStore) Send(ctx context.Context) (*EventStoreResult, error)
func (*EventStore) SetBody ¶
func (es *EventStore) SetBody(body []byte) *EventStore
SetBody - set event store body - mandatory if metadata field was not set
func (*EventStore) SetChannel ¶
func (es *EventStore) SetChannel(channel string) *EventStore
SetChannel - set event store channel - mandatory if default channel was not set
func (*EventStore) SetClientId ¶
func (es *EventStore) SetClientId(clientId string) *EventStore
SetClientId - set event store ClientId - mandatory if default client was not set
func (*EventStore) SetId ¶
func (es *EventStore) SetId(id string) *EventStore
SetId - set event store id otherwise new random uuid will be set
func (*EventStore) SetMetadata ¶
func (es *EventStore) SetMetadata(metadata string) *EventStore
SetMetadata - set event store metadata - mandatory if body field was not set
type EventStoreReceive ¶
type EventStoreResult ¶
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
func WithAddress ¶
WithAddress - set host and port address of KubeMQ server
func WithClientId ¶
WithClientId - set client id to be used in all function calls with this client - mandatory
func WithCredentials ¶
WithCredentials - set secured TLS credentials from the input certificate file for client. serverOverrideDomain is for testing only. If set to a non-empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.
func WithDefaultCacheTTL ¶
WithDefaultCacheTTL - set default cache time to live for any query request that has a CacheKey value set
func WithDefualtChannel ¶
WithDefualtChannel - set default channel for any outbound requests (note: the exported function name is spelled "Defualt")
func WithReceiveBufferSize ¶
WithReceiveBufferSize - set length of buffered channel to be set in all subscriptions
func WithToken ¶
WithToken - set KubeMQ token to be used for KubeMQ connection - not mandatory, only if enforced by the KubeMQ server
func WithTransportType ¶
func WithTransportType(transportType TransportType) Option
WithTransportType - set the transport type used by the client
type Options ¶
type Options struct {
// contains filtered or unexported fields
}
func GetDefaultOptions ¶
func GetDefaultOptions() *Options
type Query ¶
type Query struct { Id string Channel string Metadata string Body []byte Timeout time.Duration ClientId string CacheKey string CacheTTL time.Duration // contains filtered or unexported fields }
func (*Query) Send ¶
func (q *Query) Send(ctx context.Context) (*QueryResponse, error)
Send - send the query request and wait for a response or timeout
func (*Query) SetCacheKey ¶
SetCacheKey - set cache key to retrieve already stored query response, otherwise the response for this query will be stored in cache for future query requests
func (*Query) SetCacheTTL ¶
SetCacheTTL - set the cache time to live for this query's cached response (the stored query response retrieved by its cache key); if not set, the default cacheTTL will be used
func (*Query) SetChannel ¶
SetChannel - set query channel - mandatory if default channel was not set
func (*Query) SetClientId ¶
SetClientId - set query ClientId - mandatory if default client was not set
func (*Query) SetMetadata ¶
SetMetadata - set query metadata - mandatory if body field is empty
type QueryReceive ¶
type QueryResponse ¶
type Response ¶
type Response struct { RequestId string ResponseTo string Metadata string Body []byte ClientId string ExecutedAt time.Time Err error // contains filtered or unexported fields }
func (*Response) SetClientId ¶
SetClientId - set the response clientId; if not set, the default clientId will be used
func (*Response) SetExecutedAt ¶
SetExecutedAt - set query or command execution time
func (*Response) SetMetadata ¶
SetMetadata - set metadata response, for query only
func (*Response) SetRequestId ¶
SetRequestId - set the id of the request this response corresponds to - mandatory
func (*Response) SetResponseTo ¶
SetResponseTo - set response channel as received in the CommandReceive or QueryReceive object - mandatory
type SubscriptionOption ¶
type SubscriptionOption interface {
// contains filtered or unexported methods
}
func StartFromFirstEvent ¶
func StartFromFirstEvent() SubscriptionOption
StartFromFirstEvent - replay all the stored events from the first available sequence and continue streaming new events from this point
func StartFromLastEvent ¶
func StartFromLastEvent() SubscriptionOption
StartFromLastEvent - replay the last event and continue streaming new events from this point
func StartFromNewEvents ¶
func StartFromNewEvents() SubscriptionOption
StartFromNewEvents - start event store subscription with only new events
func StartFromSequence ¶
func StartFromSequence(sequence int) SubscriptionOption
StartFromSequence - replay events from a specific event sequence number and continue streaming new events from this point
func StartFromTime ¶
func StartFromTime(since time.Time) SubscriptionOption
StartFromTime - replay events from a specific time and continue streaming new events from this point
func StartFromTimeDelta ¶
func StartFromTimeDelta(delta time.Duration) SubscriptionOption
StartFromTimeDelta - replay events starting from the current time minus delta (delta duration in seconds) and continue streaming new events from this point
type Transport ¶
type Transport interface { SendEvent(ctx context.Context, event *Event) error StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error) SubscribeToEvents(ctx context.Context, channel, group string, errCh chan error) (<-chan *Event, error) SendEventStore(ctx context.Context, eventStore *EventStore) (*EventStoreResult, error) StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, eventsResultCh chan *EventStoreResult, errCh chan error) SubscribeToEventsStore(ctx context.Context, channel, group string, errCh chan error, opt SubscriptionOption) (<-chan *EventStoreReceive, error) SendCommand(ctx context.Context, command *Command) (*CommandResponse, error) SubscribeToCommands(ctx context.Context, channel, group string, errCh chan error) (<-chan *CommandReceive, error) SendQuery(ctx context.Context, query *Query) (*QueryResponse, error) SubscribeToQueries(ctx context.Context, channel, group string, errCh chan error) (<-chan *QueryReceive, error) SendResponse(ctx context.Context, response *Response) error Close() error }