Documentation ¶
Index ¶
- Constants
- func AgentBroadcastTarget(collective string, agent string) string
- func BuildInfo() *build.Info
- func FileExist(path string) bool
- func NewRequestID() (string, error)
- func NodeDirectedTarget(collective string, identity string) string
- func ReplyTarget(msg *Message, requestid string) string
- func ServiceBroadcastTarget(collective string, agent string) string
- func TargetForMessage(msg *Message, identity string) (string, error)
- func UserConfig() string
- type AgentConnector
- type ClientConnector
- type Connection
- func (conn *Connection) AgentBroadcastTarget(collective string, agent string) string
- func (conn *Connection) ChanQueueSubscribe(name string, subject string, group string, capacity int) (chan *ConnectorMessage, error)
- func (conn *Connection) Close()
- func (conn *Connection) Connect(ctx context.Context) (err error)
- func (conn *Connection) ConnectedServer() string
- func (conn *Connection) ConnectionOptions() nats.Options
- func (conn *Connection) ConnectionStats() nats.Statistics
- func (conn *Connection) Flush()
- func (conn *Connection) IsConnected() bool
- func (conn *Connection) Nats() *nats.Conn
- func (conn *Connection) NodeDirectedTarget(collective string, identity string) string
- func (conn *Connection) Publish(msg *Message) error
- func (conn *Connection) PublishRaw(target string, data []byte) error
- func (conn *Connection) QueueSubscribe(ctx context.Context, name string, subject string, group string, ...) error
- func (conn *Connection) ReplyTarget(msg *Message) (string, error)
- func (conn *Connection) ServiceBroadcastTarget(collective string, agent string) string
- func (conn *Connection) TargetForMessage(msg *Message, identity string) (string, error)
- func (conn *Connection) Unsubscribe(name string) error
- type ConnectionManager
- type Connector
- type ConnectorInfo
- type ConnectorMessage
- type Framework
- func (fw *Framework) BuildInfo() *build.Info
- func (fw *Framework) CallerID() string
- func (fw *Framework) Certname() string
- func (fw *Framework) ClientTLSConfig() (*tls.Config, error)
- func (fw *Framework) Colorize(c string, format string, a ...interface{}) string
- func (fw *Framework) Configuration() *config.Config
- func (fw *Framework) ConfigureProvisioning()
- func (fw *Framework) DisableTLSVerify() bool
- func (fw *Framework) DiscoveryServer() (srvcache.Server, error)
- func (fw *Framework) Enroll(ctx context.Context, wait time.Duration, cb func(digest string, try int)) error
- func (fw *Framework) FacterCmd() string
- func (fw *Framework) FacterDomain() (string, error)
- func (fw *Framework) FacterFQDN() (string, error)
- func (fw *Framework) FacterStringFact(fact string) (string, error)
- func (fw *Framework) FederationCollectives() (collectives []string)
- func (fw *Framework) FederationMiddlewareServers() (servers srvcache.Servers, err error)
- func (fw *Framework) Getuid() int
- func (fw *Framework) HTTPClient(secure bool) (*http.Client, error)
- func (fw *Framework) HasCollective(collective string) bool
- func (fw *Framework) IsFederated() (result bool)
- func (fw *Framework) Logger(component string) *log.Entry
- func (fw *Framework) MiddlewareServers() (servers srvcache.Servers, err error)
- func (fw *Framework) NetworkBrokerPeers() (servers srvcache.Servers, err error)
- func (fw *Framework) NewConnector(ctx context.Context, servers func() (srvcache.Servers, error), name string, ...) (Connector, error)
- func (fw *Framework) NewMessage(payload string, agent string, collective string, msgType string, ...) (msg *Message, err error)
- func (fw *Framework) NewReply(request protocol.Request) (reply protocol.Reply, err error)
- func (fw *Framework) NewReplyFromMessage(version string, msg *Message) (rep protocol.Reply, err error)
- func (fw *Framework) NewReplyFromSecureReply(sr protocol.SecureReply) (reply protocol.Reply, err error)
- func (fw *Framework) NewReplyFromTransportJSON(payload []byte, skipvalidate bool) (msg protocol.Reply, err error)
- func (fw *Framework) NewReplyTransportForMessage(msg *Message, request protocol.Request) (protocol.TransportMessage, error)
- func (fw *Framework) NewRequest(version string, agent string, senderid string, callerid string, ttl int, ...) (request protocol.Request, err error)
- func (fw *Framework) NewRequestFromMessage(version string, msg *Message) (req protocol.Request, err error)
- func (fw *Framework) NewRequestFromSecureRequest(sr protocol.SecureRequest) (request protocol.Request, err error)
- func (fw *Framework) NewRequestFromTransportJSON(payload []byte, skipvalidate bool) (msg protocol.Request, err error)
- func (fw *Framework) NewRequestID() (string, error)
- func (fw *Framework) NewRequestMessageFromTransportJSON(payload []byte) (msg *Message, err error)
- func (fw *Framework) NewRequestTransportForMessage(msg *Message, version string) (protocol.TransportMessage, error)
- func (fw *Framework) NewSecureReply(reply protocol.Reply) (secure protocol.SecureReply, err error)
- func (fw *Framework) NewSecureReplyFromTransport(message protocol.TransportMessage, skipvalidate bool) (secure protocol.SecureReply, err error)
- func (fw *Framework) NewSecureRequest(request protocol.Request) (secure protocol.SecureRequest, err error)
- func (fw *Framework) NewSecureRequestFromTransport(message protocol.TransportMessage, skipvalidate bool) (secure protocol.SecureRequest, err error)
- func (fw *Framework) NewTransportForSecureReply(reply protocol.SecureReply) (message protocol.TransportMessage, err error)
- func (fw *Framework) NewTransportForSecureRequest(request protocol.SecureRequest) (message protocol.TransportMessage, err error)
- func (fw *Framework) NewTransportFromJSON(data string) (message protocol.TransportMessage, err error)
- func (fw *Framework) NewTransportMessage(version string) (message protocol.TransportMessage, err error)
- func (fw *Framework) OverrideCertname() string
- func (fw *Framework) PQLQuery(query string) ([]byte, error)
- func (fw *Framework) PQLQueryCertNames(query string) ([]string, error)
- func (fw *Framework) ParseSignerTokenUnverified() (token *jwt.Token, claims jwt.MapClaims, err error)
- func (fw *Framework) ProgressWidth() int
- func (fw *Framework) PrometheusTextFileDir() string
- func (fw *Framework) ProvisionMode() bool
- func (fw *Framework) ProvisioningServers(ctx context.Context) (srvcache.Servers, error)
- func (fw *Framework) ProxiedDiscovery() bool
- func (fw *Framework) PuppetAIOCmd(command string, def string) string
- func (fw *Framework) PuppetDBServers() (servers srvcache.Servers, err error)
- func (fw *Framework) PuppetSetting(setting string) (string, error)
- func (fw *Framework) QuerySrvRecords(records []string) (srvcache.Servers, error)
- func (fw *Framework) SetLogWriter(out io.Writer)
- func (fw *Framework) SetLogger(logger *log.Logger)
- func (fw *Framework) SetupLogging(debug bool) (err error)
- func (fw *Framework) ShouldUseNGS() bool
- func (fw *Framework) SignerToken() (token string, err error)
- func (fw *Framework) TLSConfig() (*tls.Config, error)
- func (fw *Framework) TrySrvLookup(names []string, defaultSrv srvcache.Server) (srvcache.Server, error)
- func (fw *Framework) UniqueID() string
- func (fw *Framework) UniqueIDFromUnverifiedToken() (caller string, id string, token string, err error)
- func (fw *Framework) ValidateSecurity() (errors []string, ok bool)
- type InstanceConnector
- type Message
- func (m *Message) Base64Payload() string
- func (m *Message) CacheTransport()
- func (m *Message) Collective() string
- func (m *Message) ExpectedMessageID() string
- func (m *Message) IsCachedTransport() bool
- func (m *Message) NotifyPublish()
- func (m *Message) OnPublish(f func())
- func (m *Message) ReplyTo() string
- func (m *Message) SetBase64Payload(payload string) error
- func (m *Message) SetCollective(collective string) error
- func (m *Message) SetExpectedMsgID(id string) error
- func (m *Message) SetProtocolVersion(version string)
- func (m *Message) SetReplyTo(replyTo string) error
- func (m *Message) SetType(msgType string) (err error)
- func (m *Message) String() string
- func (m *Message) Transport() (protocol.TransportMessage, error)
- func (m *Message) Type() string
- func (m *Message) UniqueTransport()
- func (m *Message) Validate() (bool, error)
- func (m *Message) ValidateTTL() bool
- type PublishableConnector
- type RawPublishableConnector
Constants ¶
Variables ¶
This section is empty.
Functions ¶
func AgentBroadcastTarget ¶
func NodeDirectedTarget ¶
func ReplyTarget ¶
func ServiceBroadcastTarget ¶ added in v0.22.0
func UserConfig ¶
func UserConfig() string
UserConfig determines what is the active config file for a user
Types ¶
type AgentConnector ¶
type AgentConnector interface { ConnectorInfo QueueSubscribe(ctx context.Context, name string, subject string, group string, output chan *ConnectorMessage) error Unsubscribe(name string) error AgentBroadcastTarget(collective string, agent string) string ServiceBroadcastTarget(collective string, agent string) string }
AgentConnector provides the minimal Connector features for subscribing and unsubscribing agents
type ClientConnector ¶
type ClientConnector interface { PublishableConnector ReplyTarget(msg *Message) string ChanQueueSubscribe(name string, subject string, group string, capacity int) (chan *ConnectorMessage, error) }
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection is an actual NATS connection handler, it implements Connector
func (*Connection) AgentBroadcastTarget ¶
func (conn *Connection) AgentBroadcastTarget(collective string, agent string) string
func (*Connection) ChanQueueSubscribe ¶
func (conn *Connection) ChanQueueSubscribe(name string, subject string, group string, capacity int) (chan *ConnectorMessage, error)
ChanQueueSubscribe creates a channel of a certain size and subscribes to a queue group.
The given name would later be used should an unsubscribe be needed
func (*Connection) Close ¶
func (conn *Connection) Close()
Close closes the NATS connection after flushing what needed to be sent
func (*Connection) Connect ¶
func (conn *Connection) Connect(ctx context.Context) (err error)
Connect creates a new connection to NATS.
This will block until connected - basically forever should it never work. Due to shortcomings in the NATS library, logging about failures is not optimal
func (*Connection) ConnectedServer ¶
func (conn *Connection) ConnectedServer() string
ConnectedServer returns the URL of the current server that the library is connected to, "unknown" when not initialized
func (*Connection) ConnectionOptions ¶
func (conn *Connection) ConnectionOptions() nats.Options
func (*Connection) ConnectionStats ¶
func (conn *Connection) ConnectionStats() nats.Statistics
func (*Connection) Flush ¶
func (conn *Connection) Flush()
Flush sends any unpublished data to the network
func (*Connection) IsConnected ¶ added in v0.13.0
func (conn *Connection) IsConnected() bool
IsConnected determines if we are connected to the network
func (*Connection) Nats ¶
func (conn *Connection) Nats() *nats.Conn
func (*Connection) NodeDirectedTarget ¶
func (conn *Connection) NodeDirectedTarget(collective string, identity string) string
func (*Connection) Publish ¶
func (conn *Connection) Publish(msg *Message) error
Publish inspects a Message and publish it according to its Type
func (*Connection) PublishRaw ¶
func (conn *Connection) PublishRaw(target string, data []byte) error
PublishRaw allows any data to be published to any target
func (*Connection) QueueSubscribe ¶
func (conn *Connection) QueueSubscribe(ctx context.Context, name string, subject string, group string, output chan *ConnectorMessage) error
QueueSubscribe is a lot like ChanQueueSubscribe but you provide it the queue to dump messages in, it also takes a context and will unsubscribe when the context is canceled
func (*Connection) ReplyTarget ¶
func (conn *Connection) ReplyTarget(msg *Message) (string, error)
func (*Connection) ServiceBroadcastTarget ¶ added in v0.22.0
func (conn *Connection) ServiceBroadcastTarget(collective string, agent string) string
func (*Connection) TargetForMessage ¶
func (conn *Connection) TargetForMessage(msg *Message, identity string) (string, error)
func (*Connection) Unsubscribe ¶
func (conn *Connection) Unsubscribe(name string) error
type ConnectionManager ¶
type ConnectionManager interface {
NewConnector(ctx context.Context, servers func() (srvcache.Servers, error), name string, logger *log.Entry) (conn Connector, err error)
}
ConnectionManager is capable of being a factory for connection, mcollective.Choria is one
type Connector ¶
type Connector interface { InstanceConnector ReplyTarget(msg *Message) (string, error) ChanQueueSubscribe(name string, subject string, group string, capacity int) (chan *ConnectorMessage, error) Connect(ctx context.Context) (err error) Nats() *nats.Conn }
Connector is the interface a connector must implement to be valid, be it NATS, Stomp, Testing etc
type ConnectorInfo ¶
type ConnectorInfo interface { ConnectedServer() string ConnectionOptions() nats.Options ConnectionStats() nats.Statistics }
type ConnectorMessage ¶
func (*ConnectorMessage) Bytes ¶
func (m *ConnectorMessage) Bytes() []byte
Bytes returns the data in the message
type Framework ¶
Framework is a utility encompassing choria config and various utilities
func NewWithConfig ¶
NewWithConfig creates a new instance of the framework with the supplied config instance
func (*Framework) ClientTLSConfig ¶ added in v0.21.0
ClientTLSConfig creates a TLS configuration for use by NATS, HTTPS, specifically configured for clients
func (*Framework) Colorize ¶ added in v0.19.0
Colorize returns a string of either 'red', 'green' or 'yellow'. If the 'color' configuration is set to false then the string will have no color hints
func (*Framework) Configuration ¶
Configuration returns the active configuration
func (*Framework) ConfigureProvisioning ¶
func (fw *Framework) ConfigureProvisioning()
ConfigureProvisioning adjusts the active configuration to match the provisioning profile
func (*Framework) DisableTLSVerify ¶
DisableTLSVerify indicates if the user wishes to disable TLS verification
func (*Framework) DiscoveryServer ¶
DiscoveryServer is the server configured as a discovery proxy
func (*Framework) Enroll ¶
func (fw *Framework) Enroll(ctx context.Context, wait time.Duration, cb func(digest string, try int)) error
Enroll performs the tasks needed to join the security system, like create a new certificate, csr etc
func (*Framework) FacterCmd ¶
FacterCmd finds the path to facter using first AIO path then a `which` like command
func (*Framework) FacterDomain ¶
FacterDomain determines the machine's domain by querying facter. Returns "" when unknown
func (*Framework) FacterFQDN ¶
FacterFQDN determines the machine's FQDN by querying facter. Returns "" when unknown
func (*Framework) FacterStringFact ¶
FacterStringFact looks up a facter fact, returns "" when unknown
func (*Framework) FederationCollectives ¶
FederationCollectives determines the known Federation Member Collectives based on the CHORIA_FED_COLLECTIVE environment variable or the choria.federation.collectives config item
func (*Framework) FederationMiddlewareServers ¶
FederationMiddlewareServers determines the correct Federation Middleware Servers
It does this by:
- looking for choria.federation_middleware_hosts configuration
- Doing SRV lookups of _mcollective-federation_server._tcp and _x-puppet-mcollective_federation._tcp
func (*Framework) HTTPClient ¶ added in v0.19.0
HTTPClient creates a *http.Client prepared by the security provider with certificates and more set
func (*Framework) HasCollective ¶
HasCollective determines if a collective is known in the configuration
func (*Framework) IsFederated ¶
IsFederated determines if the configuration is setting up any Federation collectives
func (*Framework) MiddlewareServers ¶
MiddlewareServers determines the correct Middleware Servers
It does this by:
- if ngs is configured and credentials are set and middleware_hosts are empty, use ngs
- looking for choria.middleware_hosts configuration
- Doing SRV lookups of _mcollective-server._tcp and _x-puppet-mcollective._tcp
- Defaulting to puppet:4222
func (*Framework) NetworkBrokerPeers ¶
NetworkBrokerPeers are peers in the broker cluster resolved from _mcollective-broker._tcp or from the plugin config
func (*Framework) NewConnector ¶
func (fw *Framework) NewConnector(ctx context.Context, servers func() (srvcache.Servers, error), name string, logger *log.Entry) (Connector, error)
NewConnector creates a new NATS connector
It will attempt to connect to the given servers and will keep trying till it manages to do so
func (*Framework) NewMessage ¶
func (fw *Framework) NewMessage(payload string, agent string, collective string, msgType string, request *Message) (msg *Message, err error)
NewMessage creates a new Message associated with this Choria instance
func (*Framework) NewReply ¶
NewReply creates a new Reply, the version will match that of the given request
func (*Framework) NewReplyFromMessage ¶
func (fw *Framework) NewReplyFromMessage(version string, msg *Message) (rep protocol.Reply, err error)
NewReplyFromMessage creates a new Reply with the Message settings preloaded complying with a specific protocol version like protocol.ReplyV1
func (*Framework) NewReplyFromSecureReply ¶
func (fw *Framework) NewReplyFromSecureReply(sr protocol.SecureReply) (reply protocol.Reply, err error)
NewReplyFromSecureReply creates a new Reply from the JSON payload of SecureReply, the version will match what is in the JSON payload
func (*Framework) NewReplyFromTransportJSON ¶
func (fw *Framework) NewReplyFromTransportJSON(payload []byte, skipvalidate bool) (msg protocol.Reply, err error)
NewReplyFromTransportJSON creates a new Reply from a transport JSON
func (*Framework) NewReplyTransportForMessage ¶
func (fw *Framework) NewReplyTransportForMessage(msg *Message, request protocol.Request) (protocol.TransportMessage, error)
NewReplyTransportForMessage creates a new Transport message based on a Message and the request it is a reply to
The new transport message will have the same version as the request it is based on
func (*Framework) NewRequest ¶
func (fw *Framework) NewRequest(version string, agent string, senderid string, callerid string, ttl int, requestid string, collective string) (request protocol.Request, err error)
NewRequest creates a new Request complying with a specific protocol version like protocol.RequestV1
func (*Framework) NewRequestFromMessage ¶
func (fw *Framework) NewRequestFromMessage(version string, msg *Message) (req protocol.Request, err error)
NewRequestFromMessage creates a new Request with the Message settings preloaded complying with a specific protocol version like protocol.RequestV1
func (*Framework) NewRequestFromSecureRequest ¶
func (fw *Framework) NewRequestFromSecureRequest(sr protocol.SecureRequest) (request protocol.Request, err error)
NewRequestFromSecureRequest creates a new Request from a SecureRequest, the version will match what is in the JSON payload
func (*Framework) NewRequestFromTransportJSON ¶
func (fw *Framework) NewRequestFromTransportJSON(payload []byte, skipvalidate bool) (msg protocol.Request, err error)
NewRequestFromTransportJSON creates a new Request from transport JSON
func (*Framework) NewRequestID ¶
NewRequestID creates a new RequestID
func (*Framework) NewRequestMessageFromTransportJSON ¶
NewRequestMessageFromTransportJSON creates a Message from a Transport JSON that holds a Request
func (*Framework) NewRequestTransportForMessage ¶
func (fw *Framework) NewRequestTransportForMessage(msg *Message, version string) (protocol.TransportMessage, error)
NewRequestTransportForMessage creates a new versioned Transport message based on a Message
func (*Framework) NewSecureReply ¶
NewSecureReply creates a new SecureReply with the given Reply message as payload
func (*Framework) NewSecureReplyFromTransport ¶
func (fw *Framework) NewSecureReplyFromTransport(message protocol.TransportMessage, skipvalidate bool) (secure protocol.SecureReply, err error)
NewSecureReplyFromTransport creates a new SecureReply from the JSON payload of TransportMessage, the version of the SecureReply will be the same as that of the TransportMessage
func (*Framework) NewSecureRequest ¶
func (fw *Framework) NewSecureRequest(request protocol.Request) (secure protocol.SecureRequest, err error)
NewSecureRequest creates a new SecureRequest with the given Request message as payload
func (*Framework) NewSecureRequestFromTransport ¶
func (fw *Framework) NewSecureRequestFromTransport(message protocol.TransportMessage, skipvalidate bool) (secure protocol.SecureRequest, err error)
NewSecureRequestFromTransport creates a new SecureRequest from the JSON payload of TransportMessage, the version of the SecureRequest will be the same as that of the TransportMessage
func (*Framework) NewTransportForSecureReply ¶
func (fw *Framework) NewTransportForSecureReply(reply protocol.SecureReply) (message protocol.TransportMessage, err error)
NewTransportForSecureReply creates a new TransportMessage with a SecureReply as payload. The Transport will be the same version as the SecureReply
func (*Framework) NewTransportForSecureRequest ¶
func (fw *Framework) NewTransportForSecureRequest(request protocol.SecureRequest) (message protocol.TransportMessage, err error)
NewTransportForSecureRequest creates a new TransportMessage with a SecureRequest as payload. The Transport will be the same version as the SecureRequest
func (*Framework) NewTransportFromJSON ¶
func (fw *Framework) NewTransportFromJSON(data string) (message protocol.TransportMessage, err error)
NewTransportFromJSON creates a new TransportMessage from a JSON payload. The version will match what is in the payload
func (*Framework) NewTransportMessage ¶
func (fw *Framework) NewTransportMessage(version string) (message protocol.TransportMessage, err error)
NewTransportMessage creates a new TransportMessage complying with a specific protocol version like protocol.TransportV1
func (*Framework) OverrideCertname ¶
OverrideCertname indicates if the user wishes to force a specific certname, empty when not
func (*Framework) PQLQueryCertNames ¶ added in v0.19.0
func (*Framework) ParseSignerTokenUnverified ¶ added in v0.17.0
func (*Framework) ProgressWidth ¶ added in v0.20.0
ProgressWidth determines the width of the progress bar, when -1 there is not enough space for a progress bar
func (*Framework) PrometheusTextFileDir ¶ added in v0.15.0
PrometheusTextFileDir is the configured directory where to write prometheus text file stats
func (*Framework) ProvisionMode ¶
ProvisionMode determines if this instance is in provisioning mode. If the setting `plugin.choria.server.provision` is set at all then the value of that is returned, else the build time property ProvisionDefault is consulted
func (*Framework) ProvisioningServers ¶
ProvisioningServers determines the build time provisioning servers. When it's unset or results in an empty server list, this will return an error
func (*Framework) ProxiedDiscovery ¶
ProxiedDiscovery determines if a client is configured for proxied discovery
func (*Framework) PuppetAIOCmd ¶
PuppetAIOCmd looks up a command in the AIO paths, if it's not there it will try PATH and finally return a default if not in PATH
func (*Framework) PuppetDBServers ¶ added in v0.19.0
PuppetDBServers resolves the PuppetDB server based on configuration or the _x-puppet-db._tcp SRV record
func (*Framework) PuppetSetting ¶
PuppetSetting retrieves a config setting by shelling out to puppet apply --configprint
func (*Framework) QuerySrvRecords ¶
QuerySrvRecords looks for SRV records within the right domain either thanks to facter domain or the configured domain.
If the config disables SRV then an error is returned.
func (*Framework) SetLogWriter ¶ added in v0.13.1
func (*Framework) SetupLogging ¶
SetupLogging configures logging based on choria config directives. Currently only file and console behaviors are supported
func (*Framework) ShouldUseNGS ¶ added in v0.13.0
ShouldUseNGS determines if we are configured to use NGS
func (*Framework) SignerToken ¶ added in v0.17.0
SignerToken retrieves the AAA token used for signing requests
func (*Framework) TLSConfig ¶
TLSConfig creates a generic TLS configuration for use by NATS, HTTPS etc
func (*Framework) TrySrvLookup ¶
func (fw *Framework) TrySrvLookup(names []string, defaultSrv srvcache.Server) (srvcache.Server, error)
TrySrvLookup will attempt to look up a series of names, returning the first found. If SRV lookups are disabled or nothing is found, the default will be returned
func (*Framework) UniqueID ¶
UniqueID creates a new unique ID, usually a v4 uuid, if that fails a random string based ID is made
func (*Framework) UniqueIDFromUnverifiedToken ¶ added in v0.17.0
func (*Framework) ValidateSecurity ¶
ValidateSecurity calls the security provider validation method and indicates if all dependencies are met for secure operation
type InstanceConnector ¶
type InstanceConnector interface { AgentConnector PublishableConnector RawPublishableConnector NodeDirectedTarget(collective string, identity string) string IsConnected() bool Close() }
type Message ¶
type Message struct { Payload string Agent string Request *Message Filter *protocol.Filter TTL int TimeStamp time.Time SenderID string CallerID string RequestID string DiscoveredHosts []string CustomTarget string sync.Mutex // contains filtered or unexported fields }
Message represents a Choria message
func NewMessage ¶
func NewMessage(payload string, agent string, collective string, msgType string, request *Message, choria *Framework) (msg *Message, err error)
NewMessage constructs a basic Message instance
func NewMessageFromRequest ¶
func NewMessageFromRequest(req protocol.Request, replyto string, choria *Framework) (msg *Message, err error)
NewMessageFromRequest constructs a Message based on a Request
func (*Message) Base64Payload ¶
Base64Payload retrieves the payload Base64 encoded
func (*Message) CacheTransport ¶ added in v0.14.0
func (m *Message) CacheTransport()
CacheTransport ensures that multiple calls to Transport() return the same transport message
func (*Message) Collective ¶
Collective retrieves the sub collective this message is targeting
func (*Message) ExpectedMessageID ¶
ExpectedMessageID retrieves the expected message ID
func (*Message) IsCachedTransport ¶ added in v0.14.0
IsCachedTransport determines if transport messages will be cached
func (*Message) NotifyPublish ¶ added in v0.19.0
func (m *Message) NotifyPublish()
NotifyPublish triggers the callback set using OnPublish() in a blocking fashion
func (*Message) OnPublish ¶ added in v0.19.0
func (m *Message) OnPublish(f func())
OnPublish sets a callback that should be called just before this message is published, it might be called several times for batched or direct messages. This will be called after the message has been signed - potentially by a remote signer etc
func (*Message) SetBase64Payload ¶
SetBase64Payload sets the payload for the message, use it if the payload is Base64 encoded
func (*Message) SetCollective ¶
SetCollective sets the sub collective this message is targeting
func (*Message) SetExpectedMsgID ¶
SetExpectedMsgID sets the Request ID that is expected from the reply data
func (*Message) SetProtocolVersion ¶
SetProtocolVersion sets the version of the protocol that will be used by Transport()
func (*Message) SetReplyTo ¶
SetReplyTo sets the NATS target where replies to this message should go
func (*Message) SetType ¶
SetType sets the message type. One of message, request, direct_request, service_request or reply
func (*Message) Transport ¶
func (m *Message) Transport() (protocol.TransportMessage, error)
Transport creates a TransportMessage for this Message
In the case of a reply Message made using NewMessage the Transport will have the same version as the request that made it. If you made the Message using some other way then look at choria.NewReplyTransportForMessage.
For requests you need to set the protocol version using SetProtocolVersion() before calling Transport
func (*Message) UniqueTransport ¶ added in v0.14.0
func (m *Message) UniqueTransport()
UniqueTransport ensures that every call to Transport() produces a unique transport message
func (*Message) ValidateTTL ¶
ValidateTTL validates the message age, true if the message should be allowed
type PublishableConnector ¶
PublishableConnector provides the minimal Connector features to enable publishing of choria.Message instances