Documentation ¶
Index ¶
- func InGroups(set []string, size int, f func([]string) error) error
- func InterruptableSleep(ctx context.Context, d time.Duration) error
- type ChoriaClient
- type ChoriaFramework
- type Connector
- type DiscoveryEndFunc
- type DiscoveryStartFunc
- type Handler
- type NodeList
- type Option
- type RPC
- type RPCReply
- type RPCRequest
- type RequestOption
- func BroadcastRequest() RequestOption
- func Collective(c string) RequestOption
- func ConnectionName(n string) RequestOption
- func DirectRequest() RequestOption
- func DiscoveryEndCB(h DiscoveryEndFunc) RequestOption
- func DiscoveryStartCB(h DiscoveryStartFunc) RequestOption
- func DiscoveryTimeout(t time.Duration) RequestOption
- func Filter(f *protocol.Filter) RequestOption
- func InBatches(size int, sleep int) RequestOption
- func LimitMethod(m string) RequestOption
- func LimitSeed(s int64) RequestOption
- func LimitSize(s string) RequestOption
- func Protocol(v string) RequestOption
- func Replies(r chan *choria.ConnectorMessage) RequestOption
- func ReplyExprFilter(f string) RequestOption
- func ReplyHandler(f Handler) RequestOption
- func ReplyTo(r string) RequestOption
- func Targets(t []string) RequestOption
- func Timeout(t time.Duration) RequestOption
- func Workers(w int) RequestOption
- type RequestOptions
- type RequestResult
- type Stats
- func (s *Stats) Action() string
- func (s *Stats) Agent() string
- func (s *Stats) All() bool
- func (s *Stats) DiscoveredCount() int
- func (s *Stats) DiscoveredNodes() *[]string
- func (s *Stats) DiscoveryDuration() (time.Duration, error)
- func (s *Stats) End()
- func (s *Stats) EndDiscover()
- func (s *Stats) EndPublish()
- func (s *Stats) FailCount() int
- func (s *Stats) FailedRequestInc()
- func (s *Stats) Merge(other *Stats) error
- func (s *Stats) NoResponseFrom() []string
- func (s *Stats) OKCount() int
- func (s *Stats) OverrideDiscoveryTime(start time.Time, end time.Time)
- func (s *Stats) PassedRequestInc()
- func (s *Stats) PublishDuration() (time.Duration, error)
- func (s *Stats) RecordReceived(sender string)
- func (s *Stats) RequestDuration() (time.Duration, error)
- func (s *Stats) ResponsesCount() int
- func (s *Stats) SetAction(a string)
- func (s *Stats) SetAgent(a string)
- func (s *Stats) SetDiscoveredNodes(nodes []string)
- func (s *Stats) Start()
- func (s *Stats) StartDiscover()
- func (s *Stats) StartPublish()
- func (s *Stats) Started() time.Time
- func (s *Stats) UnexpectedResponseFrom() []string
- func (s *Stats) WaitingFor(nodes []string) bool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ChoriaClient ¶
type ChoriaClient interface {
	Request(ctx context.Context, msg *choria.Message, handler cclient.Handler) (err error)
}
ChoriaClient implements the connection to the Choria network
type ChoriaFramework ¶
type ChoriaFramework interface {
	Logger(string) *logrus.Entry
	Configuration() *config.Config
	NewMessage(payload string, agent string, collective string, msgType string, request *choria.Message) (msg *choria.Message, err error)
	NewReplyFromTransportJSON(payload []byte, skipvalidate bool) (msg protocol.Reply, err error)
	NewTransportFromJSON(data string) (message protocol.TransportMessage, err error)
	MiddlewareServers() (servers srvcache.Servers, err error)
	NewConnector(ctx context.Context, servers func() (srvcache.Servers, error), name string, logger *logrus.Entry) (conn choria.Connector, err error)
	NewRequestID() (string, error)
	Certname() string
	PQLQueryCertNames(query string) ([]string, error)
}
type Connector ¶
type Connector interface {
	QueueSubscribe(ctx context.Context, name string, subject string, group string, output chan *choria.ConnectorMessage) error
	Publish(msg *choria.Message) error
}
Connector is a connection to the Choria network
type DiscoveryEndFunc ¶
DiscoveryEndFunc gets called after discovery ends and includes the discovered node count and the count of nodes that will be targeted after limits are applied. Should it return an error, the RPC call will terminate.
type DiscoveryStartFunc ¶
type DiscoveryStartFunc func()
DiscoveryStartFunc gets called before discovery starts
type NodeList ¶
NodeList is a list of nodes the client is interacting with. It is used to keep track of things like which nodes have responded and which are still to respond.
func (*NodeList) DeleteIfKnown ¶
DeleteIfKnown removes a node from the list if it is known; the boolean result indicates whether it was known
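The delete-and-report behavior described above can be illustrated with a small standalone sketch. The `nodeList` type, its mutex, and its map layout are assumptions made for this example; the internal layout of the package's NodeList is not shown in this reference.

```go
package main

import (
	"fmt"
	"sync"
)

// nodeList is a hypothetical stand-in for NodeList: a set of host names
// guarded by a mutex so several reply workers could share it.
type nodeList struct {
	mu    sync.Mutex
	hosts map[string]struct{}
}

// newNodeList builds a list from the given host names.
func newNodeList(hosts ...string) *nodeList {
	n := &nodeList{hosts: make(map[string]struct{}, len(hosts))}
	for _, h := range hosts {
		n.hosts[h] = struct{}{}
	}
	return n
}

// deleteIfKnown removes host when present and reports whether it was there.
func (n *nodeList) deleteIfKnown(host string) bool {
	n.mu.Lock()
	defer n.mu.Unlock()
	if _, ok := n.hosts[host]; !ok {
		return false
	}
	delete(n.hosts, host)
	return true
}

// count returns how many hosts are still outstanding.
func (n *nodeList) count() int {
	n.mu.Lock()
	defer n.mu.Unlock()
	return len(n.hosts)
}

func main() {
	waiting := newNodeList("n1.example.net", "n2.example.net")
	fmt.Println(waiting.deleteIfKnown("n1.example.net")) // known, removed
	fmt.Println(waiting.deleteIfKnown("n9.example.net")) // never expected
	fmt.Println(waiting.count())
}
```

Returning a boolean lets a caller tell an expected reply apart from an unexpected one in a single call.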
type Option ¶
type Option func(r *RPC)
Option configures the RPC client
func DiscoveryMethod ¶ added in v0.19.0
DiscoveryMethod sets a specific discovery method
type RPC ¶
type RPC struct {
	// contains filtered or unexported fields
}
RPC is an MCollective-compatible RPC client
func New ¶
func New(fw ChoriaFramework, agent string, opts ...Option) (rpc *RPC, err error)
New creates a new RPC request
A DDL is required. When one is not given using the DDL() option as an argument, attempts will be made to find it on the file system; should this fail, an error will be returned.
func (*RPC) Do ¶
func (r *RPC) Do(ctx context.Context, action string, payload interface{}, opts ...RequestOption) (RequestResult, error)
Do performs an RPC request and optionally processes replies
If a filter is supplied using the Filter() option and Targets() are not, discovery will be done for you using the broadcast method. Should no nodes be discovered, an error will be returned.
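The target resolution described above can be sketched in isolation. `resolveTargets` and its `discover` callback are hypothetical names introduced for this example; the callback stands in for the broadcast discovery the client performs, and the case where neither a filter nor targets are supplied is not covered by the text, so the sketch simply falls through to discovery as an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// resolveTargets is a hypothetical helper: explicit targets are used as
// given, otherwise discovery runs and an empty result is an error.
func resolveTargets(targets []string, discover func() ([]string, error)) ([]string, error) {
	if len(targets) > 0 {
		return targets, nil
	}
	found, err := discover()
	if err != nil {
		return nil, fmt.Errorf("discovery failed: %w", err)
	}
	if len(found) == 0 {
		return nil, errors.New("no nodes were discovered")
	}
	return found, nil
}

func main() {
	// Simulated discovery that finds nothing.
	empty := func() ([]string, error) { return nil, nil }

	nodes, err := resolveTargets([]string{"n1.example.net"}, empty)
	fmt.Println(nodes, err)

	nodes, err = resolveTargets(nil, empty)
	fmt.Println(nodes, err)
}
```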
type RPCReply ¶
type RPCReply struct {
	Statuscode mcorpc.StatusCode `json:"statuscode"`
	Statusmsg  string            `json:"statusmsg"`
	Data       json.RawMessage   `json:"data"`
}
RPCReply is a basic RPC reply
func ParseReplyData ¶
ParseReplyData parses reply data and populates a Reply and custom Data
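As an illustration of the two-stage parse implied by the `json.RawMessage` field, the sketch below decodes the reply envelope first and the action-specific data second. `rpcReply` and `parseReply` are local, hypothetical names; `Statuscode` is declared as `int` because `mcorpc.StatusCode` is not available in a standalone program, and the signature of the real ParseReplyData is not shown in this reference.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rpcReply is a local copy of the documented RPCReply layout.
type rpcReply struct {
	Statuscode int             `json:"statuscode"`
	Statusmsg  string          `json:"statusmsg"`
	Data       json.RawMessage `json:"data"`
}

// parseReply is a hypothetical helper: it decodes the envelope, then the
// raw data into the structure supplied by the caller.
func parseReply(raw []byte, data interface{}) (*rpcReply, error) {
	reply := &rpcReply{}
	if err := json.Unmarshal(raw, reply); err != nil {
		return nil, fmt.Errorf("could not parse reply: %w", err)
	}
	if len(reply.Data) == 0 {
		return reply, nil
	}
	if err := json.Unmarshal(reply.Data, data); err != nil {
		return nil, fmt.Errorf("could not parse reply data: %w", err)
	}
	return reply, nil
}

func main() {
	// Example payload invented for this sketch.
	raw := []byte(`{"statuscode":0,"statusmsg":"OK","data":{"message":"pong"}}`)
	var body struct {
		Message string `json:"message"`
	}
	reply, err := parseReply(raw, &body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(reply.Statuscode, reply.Statusmsg, body.Message)
}
```

Keeping `Data` as raw JSON lets one envelope type serve every agent, with each caller deciding how to decode its own payload.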
type RPCRequest ¶
type RPCRequest struct {
	Agent  string          `json:"agent"`
	Action string          `json:"action"`
	Data   json.RawMessage `json:"data"`
}
RPCRequest is a basic RPC request
type RequestOption ¶
type RequestOption func(*RequestOptions)
RequestOption is a function capable of setting an option
func BroadcastRequest ¶
func BroadcastRequest() RequestOption
BroadcastRequest forces the request to use broadcast mode
NOTE: You need to ensure that filters and similar settings are already in place
func Collective ¶
func Collective(c string) RequestOption
Collective sets the collective to target a message at
func ConnectionName ¶
func ConnectionName(n string) RequestOption
ConnectionName sets the prefix used for various connection names
Setting this when making many clients will minimize the number of Prometheus metrics being created: each client creates 2 or 3, which with randomly generated names will snowball over time
func DirectRequest ¶
func DirectRequest() RequestOption
DirectRequest forces the request to be a direct request
func DiscoveryEndCB ¶
func DiscoveryEndCB(h DiscoveryEndFunc) RequestOption
DiscoveryEndCB sets the function to be called after discovery and node limiting
func DiscoveryStartCB ¶
func DiscoveryStartCB(h DiscoveryStartFunc) RequestOption
DiscoveryStartCB sets the function to be called before discovery starts
func DiscoveryTimeout ¶
func DiscoveryTimeout(t time.Duration) RequestOption
DiscoveryTimeout configures the request discovery timeout; it defaults to the configured discovery timeout
func Filter ¶
func Filter(f *protocol.Filter) RequestOption
Filter sets the filter; if it is set, discovery will be done prior to performing requests
func InBatches ¶
func InBatches(size int, sleep int) RequestOption
InBatches performs requests in batches
func LimitMethod ¶
func LimitMethod(m string) RequestOption
LimitMethod configures the method to use when limiting targets - "random" or "first"
func LimitSeed ¶
func LimitSeed(s int64) RequestOption
LimitSeed sets the random seed used to select targets when limiting and limit method is "random"
func LimitSize ¶
func LimitSize(s string) RequestOption
LimitSize sets limits on the targets, either a number or a percentage like "10%"
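A limit given as a string has to be turned into a node count at some point. The sketch below shows one way to do that; `limitCount` is a hypothetical helper, and rounding percentages up and capping plain numbers at the discovered count are assumptions of this example, not documented behavior.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// limitCount turns a limit such as "5" or "10%" into a node count for the
// given number of discovered nodes.
func limitCount(spec string, discovered int) (int, error) {
	spec = strings.TrimSpace(spec)

	if strings.HasSuffix(spec, "%") {
		pct, err := strconv.ParseFloat(strings.TrimSuffix(spec, "%"), 64)
		if err != nil || pct <= 0 || pct > 100 {
			return 0, fmt.Errorf("invalid percentage %q", spec)
		}
		// Assumption: round up so a small percentage still selects a node.
		return int(math.Ceil(float64(discovered) * pct / 100)), nil
	}

	n, err := strconv.Atoi(spec)
	if err != nil || n <= 0 {
		return 0, fmt.Errorf("invalid limit %q", spec)
	}
	if n > discovered {
		n = discovered
	}
	return n, nil
}

func main() {
	for _, spec := range []string{"10%", "5", "abc"} {
		n, err := limitCount(spec, 25)
		fmt.Println(spec, n, err)
	}
}
```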
func Replies ¶
func Replies(r chan *choria.ConnectorMessage) RequestOption
Replies creates a custom channel for replies and will avoid processing them
func ReplyExprFilter ¶ added in v0.19.0
func ReplyExprFilter(f string) RequestOption
ReplyExprFilter filters replies by filter f. Replies that match f will not be recorded and will not be passed to any handlers, though they will still count toward received replies as usual.
When this filter matches a reply and a handler is set, the handler will be called using a nil 'rpcreply', allowing the handler to process progress bars and more
func ReplyHandler ¶
func ReplyHandler(f Handler) RequestOption
ReplyHandler configures a callback to be called for each message received
func ReplyTo ¶
func ReplyTo(r string) RequestOption
ReplyTo sets a custom reply-to target; otherwise the connector will determine it
func Workers ¶
func Workers(w int) RequestOption
Workers configures the number of workers used to process responses. This is ignored during batched mode, as that is always done with a single worker.
type RequestOptions ¶
type RequestOptions struct {
	BatchSize        int
	BatchSleep       time.Duration
	Collective       string
	ConnectionName   string
	DiscoveryTimeout time.Duration
	Filter           *protocol.Filter
	Handler          Handler
	ProcessReplies   bool
	ProtocolVersion  string
	Replies          chan *choria.ConnectorMessage
	ReplyTo          string
	RequestID        string
	RequestType      string
	Targets          []string
	Timeout          time.Duration
	Workers          int
	LimitSeed        int64
	LimitMethod      string
	LimitSize        string
	ReplyExprFilter  string
	DiscoveryStartCB DiscoveryStartFunc
	DiscoveryEndCB   DiscoveryEndFunc
	// contains filtered or unexported fields
}
RequestOptions are options for an RPC request
func NewRequestOptions ¶
func NewRequestOptions(fw ChoriaFramework, ddl *agent.DDL) (*RequestOptions, error)
NewRequestOptions creates initialized request options
func (*RequestOptions) ConfigureMessage ¶
func (o *RequestOptions) ConfigureMessage(msg *choria.Message) (err error)
ConfigureMessage configures a pre-made message object based on the settings contained in the options
func (*RequestOptions) Stats ¶
func (o *RequestOptions) Stats() *Stats
Stats retrieves the stats for the completed request
type RequestResult ¶
type RequestResult interface {
	Stats() *Stats
}
RequestResult is the result of a request
type Stats ¶
type Stats struct {
	RequestID string
	// contains filtered or unexported fields
}
Stats represents statistics for a request
func (*Stats) DiscoveredCount ¶
DiscoveredCount is how many nodes were discovered
func (*Stats) DiscoveredNodes ¶
DiscoveredNodes are the nodes that were discovered for this request
func (*Stats) DiscoveryDuration ¶
DiscoveryDuration determines how long discovery took; it returns 0 and an error when discovery was not done
func (*Stats) EndDiscover ¶
func (s *Stats) EndDiscover()
EndDiscover records the end time of the discovery process
func (*Stats) EndPublish ¶
func (s *Stats) EndPublish()
EndPublish records that the publish process ended
func (*Stats) FailedRequestInc ¶
func (s *Stats) FailedRequestInc()
FailedRequestInc increments the failed request counter by one
func (*Stats) NoResponseFrom ¶
NoResponseFrom calculates which discovered hosts did not respond
func (*Stats) OverrideDiscoveryTime ¶ added in v0.16.0
OverrideDiscoveryTime sets a specific discovery time
func (*Stats) PassedRequestInc ¶
func (s *Stats) PassedRequestInc()
PassedRequestInc increments the passed request counter by one
func (*Stats) PublishDuration ¶
PublishDuration calculates how long publishing took
func (*Stats) RecordReceived ¶
RecordReceived records the fact that one message was received
func (*Stats) RequestDuration ¶
RequestDuration calculates the total duration
func (*Stats) ResponsesCount ¶
ResponsesCount is the total number of nodes that responded so far
func (*Stats) SetDiscoveredNodes ¶
SetDiscoveredNodes records the node names we expect to communicate with
func (*Stats) StartDiscover ¶
func (s *Stats) StartDiscover()
StartDiscover records the start time of the discovery process
func (*Stats) StartPublish ¶
func (s *Stats) StartPublish()
StartPublish records that the publish process started
func (*Stats) Started ¶ added in v0.16.0
Started is the time the request was started; it is the zero time when not started
func (*Stats) UnexpectedResponseFrom ¶
UnexpectedResponseFrom calculates which hosts responded that we did not expect responses from
func (*Stats) WaitingFor ¶
WaitingFor checks if any of the given nodes are still outstanding