choria

package
v0.22.0
This package is not in the latest version of its module.

Published: Apr 22, 2021 License: Apache-2.0 Imports: 41 Imported by: 28

Documentation

Index

Constants

const (
	MessageMessageType        string = "message"
	RequestMessageType        string = "request"
	DirectRequestMessageType  string = "direct_request"
	ServiceRequestMessageType string = "service_request"
	ReplyMessageType          string = "reply"
)

Variables

This section is empty.

Functions

func AgentBroadcastTarget

func AgentBroadcastTarget(collective string, agent string) string

func BuildInfo added in v0.14.0

func BuildInfo() *build.Info

BuildInfo retrieves build information

func FileExist

func FileExist(path string) bool

FileExist checks if a file exists

func NewRequestID

func NewRequestID() (string, error)

NewRequestID creates a new RequestID

func NodeDirectedTarget

func NodeDirectedTarget(collective string, identity string) string

func ReplyTarget

func ReplyTarget(msg *Message, requestid string) string

func ServiceBroadcastTarget added in v0.22.0

func ServiceBroadcastTarget(collective string, agent string) string

func TargetForMessage

func TargetForMessage(msg *Message, identity string) (string, error)

func UserConfig

func UserConfig() string

UserConfig determines what is the active config file for a user

Types

type AgentConnector

type AgentConnector interface {
	ConnectorInfo

	QueueSubscribe(ctx context.Context, name string, subject string, group string, output chan *ConnectorMessage) error
	Unsubscribe(name string) error
	AgentBroadcastTarget(collective string, agent string) string
	ServiceBroadcastTarget(collective string, agent string) string
}

AgentConnector provides the minimal Connector features for subscribing and unsubscribing agents

type ClientConnector

type ClientConnector interface {
	PublishableConnector

	ReplyTarget(msg *Message) string
	ChanQueueSubscribe(name string, subject string, group string, capacity int) (chan *ConnectorMessage, error)
}

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is an actual NATS connection handler; it implements Connector

func (*Connection) AgentBroadcastTarget

func (conn *Connection) AgentBroadcastTarget(collective string, agent string) string

func (*Connection) ChanQueueSubscribe

func (conn *Connection) ChanQueueSubscribe(name string, subject string, group string, capacity int) (chan *ConnectorMessage, error)

ChanQueueSubscribe creates a channel of a certain size and subscribes to a queue group.

The given name is used later should an unsubscribe be needed

func (*Connection) Close

func (conn *Connection) Close()

Close closes the NATS connection after flushing what needed to be sent

func (*Connection) Connect

func (conn *Connection) Connect(ctx context.Context) (err error)

Connect creates a new connection to NATS.

This will block until connected - basically forever should it never work. Due to shortcomings in the NATS library, logging about failures is not optimal

func (*Connection) ConnectedServer

func (conn *Connection) ConnectedServer() string

ConnectedServer returns the URL of the current server that the library is connected to, "unknown" when not initialized

func (*Connection) ConnectionOptions

func (conn *Connection) ConnectionOptions() nats.Options

func (*Connection) ConnectionStats

func (conn *Connection) ConnectionStats() nats.Statistics

func (*Connection) Flush

func (conn *Connection) Flush()

Flush sends any unpublished data to the network

func (*Connection) IsConnected added in v0.13.0

func (conn *Connection) IsConnected() bool

IsConnected determines if we are connected to the network

func (*Connection) Nats

func (conn *Connection) Nats() *nats.Conn

func (*Connection) NodeDirectedTarget

func (conn *Connection) NodeDirectedTarget(collective string, identity string) string

func (*Connection) Publish

func (conn *Connection) Publish(msg *Message) error

Publish inspects a Message and publishes it according to its Type

func (*Connection) PublishRaw

func (conn *Connection) PublishRaw(target string, data []byte) error

PublishRaw allows any data to be published to any target

func (*Connection) QueueSubscribe

func (conn *Connection) QueueSubscribe(ctx context.Context, name string, subject string, group string, output chan *ConnectorMessage) error

QueueSubscribe is a lot like ChanQueueSubscribe, but you provide the channel that messages are delivered to. It also takes a context and will unsubscribe when the context is canceled
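
The practical difference between the two subscription styles is who owns the channel and how the subscription ends. The following is a minimal, stdlib-only sketch of that shape, not the package's implementation: `msg` and `forward` are hypothetical stand-ins for `*ConnectorMessage` and the NATS subscription, and `src` plays the role of the broker.

```go
package main

import (
	"context"
	"fmt"
)

// msg is a hypothetical stand-in for *ConnectorMessage.
type msg struct {
	Subject string
	Data    []byte
}

// forward copies messages from src into the caller-supplied output channel
// until ctx is canceled or src is closed. It only mimics the shape of
// QueueSubscribe; the real method subscribes to a NATS queue group.
func forward(ctx context.Context, src <-chan *msg, output chan<- *msg) {
	for {
		select {
		case <-ctx.Done():
			return
		case m, ok := <-src:
			if !ok {
				return
			}
			select {
			case output <- m:
			case <-ctx.Done():
				return
			}
		}
	}
}

// demo pushes two messages through forward and returns their subjects.
func demo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	src := make(chan *msg)
	output := make(chan *msg, 1) // the caller owns and sizes this channel

	done := make(chan struct{})
	go func() {
		forward(ctx, src, output)
		close(done)
	}()

	var subjects []string
	for _, s := range []string{"a", "b"} {
		src <- &msg{Subject: s}
		subjects = append(subjects, (<-output).Subject)
	}

	cancel() // canceling the context ends the forwarding loop
	<-done

	return subjects
}

func main() {
	fmt.Println(demo())
}
```

Because the caller creates `output`, several subscriptions can feed one channel, and a single `cancel()` tears them all down.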

func (*Connection) ReplyTarget

func (conn *Connection) ReplyTarget(msg *Message) (string, error)

func (*Connection) ServiceBroadcastTarget added in v0.22.0

func (conn *Connection) ServiceBroadcastTarget(collective string, agent string) string

func (*Connection) TargetForMessage

func (conn *Connection) TargetForMessage(msg *Message, identity string) (string, error)

func (*Connection) Unsubscribe

func (conn *Connection) Unsubscribe(name string) error

type ConnectionManager

type ConnectionManager interface {
	NewConnector(ctx context.Context, servers func() (srvcache.Servers, error), name string, logger *log.Entry) (conn Connector, err error)
}

ConnectionManager is capable of being a factory for connections; mcollective.Choria is one

type Connector

type Connector interface {
	InstanceConnector

	ReplyTarget(msg *Message) (string, error)
	ChanQueueSubscribe(name string, subject string, group string, capacity int) (chan *ConnectorMessage, error)
	Connect(ctx context.Context) (err error)
	Nats() *nats.Conn
}

Connector is the interface a connector must implement to be valid, be it NATS, Stomp, Testing etc

type ConnectorInfo

type ConnectorInfo interface {
	ConnectedServer() string
	ConnectionOptions() nats.Options
	ConnectionStats() nats.Statistics
}

type ConnectorMessage

type ConnectorMessage struct {
	Subject string
	Reply   string
	Data    []byte
	Msg     interface{}
}

func (*ConnectorMessage) Bytes

func (m *ConnectorMessage) Bytes() []byte

Bytes returns the data in the message

type Framework

type Framework struct {
	Config *config.Config
	// contains filtered or unexported fields
}

Framework is a utility encompassing choria config and various utilities

func New

func New(path string) (*Framework, error)

New sets up a Choria with all its config loaded and so forth

func NewWithConfig

func NewWithConfig(cfg *config.Config) (*Framework, error)

NewWithConfig creates a new instance of the framework with the supplied config instance

func (*Framework) BuildInfo

func (fw *Framework) BuildInfo() *build.Info

BuildInfo retrieves build information

func (*Framework) CallerID

func (fw *Framework) CallerID() string

CallerID determines the cert based callerid

func (*Framework) Certname

func (fw *Framework) Certname() string

Certname determines the choria certname

func (*Framework) ClientTLSConfig added in v0.21.0

func (fw *Framework) ClientTLSConfig() (*tls.Config, error)

ClientTLSConfig creates a TLS configuration for use by NATS, HTTPS, specifically configured for clients

func (*Framework) Colorize added in v0.19.0

func (fw *Framework) Colorize(c string, format string, a ...interface{}) string

Colorize returns the formatted string colored either 'red', 'green' or 'yellow'. If the 'color' configuration is set to false then the string will have no color hints

func (*Framework) Configuration

func (fw *Framework) Configuration() *config.Config

Configuration returns the active configuration

func (*Framework) ConfigureProvisioning

func (fw *Framework) ConfigureProvisioning()

ConfigureProvisioning adjusts the active configuration to match the provisioning profile

func (*Framework) DisableTLSVerify

func (fw *Framework) DisableTLSVerify() bool

DisableTLSVerify indicates if the user wishes to disable TLS verification

func (*Framework) DiscoveryServer

func (fw *Framework) DiscoveryServer() (srvcache.Server, error)

DiscoveryServer is the server configured as a discovery proxy

func (*Framework) Enroll

func (fw *Framework) Enroll(ctx context.Context, wait time.Duration, cb func(digest string, try int)) error

Enroll performs the tasks needed to join the security system, such as creating a new certificate, CSR etc

func (*Framework) FacterCmd

func (fw *Framework) FacterCmd() string

FacterCmd finds the path to facter using first AIO path then a `which` like command

func (*Framework) FacterDomain

func (fw *Framework) FacterDomain() (string, error)

FacterDomain determines the machine's domain by querying facter. Returns "" when unknown

func (*Framework) FacterFQDN

func (fw *Framework) FacterFQDN() (string, error)

FacterFQDN determines the machine's fqdn by querying facter. Returns "" when unknown

func (*Framework) FacterStringFact

func (fw *Framework) FacterStringFact(fact string) (string, error)

FacterStringFact looks up a facter fact, returns "" when unknown

func (*Framework) FederationCollectives

func (fw *Framework) FederationCollectives() (collectives []string)

FederationCollectives determines the known Federation Member Collectives based on the CHORIA_FED_COLLECTIVE environment variable or the choria.federation.collectives config item

func (*Framework) FederationMiddlewareServers

func (fw *Framework) FederationMiddlewareServers() (servers srvcache.Servers, err error)

FederationMiddlewareServers determines the correct Federation Middleware Servers

It does this by:

  • looking for choria.federation_middleware_hosts configuration
  • Doing SRV lookups of _mcollective-federation_server._tcp and _x-puppet-mcollective_federation._tcp

func (*Framework) Getuid

func (fw *Framework) Getuid() int

Getuid returns the numeric user id of the caller

func (*Framework) HTTPClient added in v0.19.0

func (fw *Framework) HTTPClient(secure bool) (*http.Client, error)

HTTPClient creates a *http.Client prepared by the security provider with certificates and more set

func (*Framework) HasCollective

func (fw *Framework) HasCollective(collective string) bool

HasCollective determines if a collective is known in the configuration

func (*Framework) IsFederated

func (fw *Framework) IsFederated() (result bool)

IsFederated determines if the configuration sets up any Federation collectives

func (*Framework) Logger

func (fw *Framework) Logger(component string) *log.Entry

Logger creates a new logrus entry

func (*Framework) MiddlewareServers

func (fw *Framework) MiddlewareServers() (servers srvcache.Servers, err error)

MiddlewareServers determines the correct Middleware Servers

It does this by:

  • if ngs is configured and credentials are set and middleware_hosts are empty, use ngs
  • looking for choria.federation_middleware_hosts configuration
  • Doing SRV lookups of _mcollective-server._tcp and _x-puppet-mcollective._tcp
  • Defaulting to puppet:4222
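
The list above describes an ordered fallback chain: each source is tried in turn and the first one that yields servers wins. A minimal sketch of that pattern follows. The `resolver` type, `firstNonEmpty` and the placeholder steps are hypothetical names used for illustration; the real method returns `srvcache.Servers` and consults the actual configuration and DNS.

```go
package main

import (
	"errors"
	"fmt"
)

// resolver is a hypothetical lookup step returning server addresses.
type resolver func() ([]string, error)

// firstNonEmpty tries each resolver in order and returns the first non-empty
// result, falling back to def when every step fails or returns nothing.
func firstNonEmpty(def []string, steps ...resolver) []string {
	for _, step := range steps {
		servers, err := step()
		if err == nil && len(servers) > 0 {
			return servers
		}
	}
	return def
}

// demo wires up placeholder steps where neither config nor SRV yields servers.
func demo() []string {
	// Placeholder: nothing is configured.
	fromConfig := func() ([]string, error) { return nil, nil }
	// Placeholder: the SRV lookup fails.
	fromSRV := func() ([]string, error) { return nil, errors.New("no SRV records") }

	return firstNonEmpty([]string{"puppet:4222"}, fromConfig, fromSRV)
}

func main() {
	fmt.Println(demo())
}
```

Keeping each source behind the same function type makes the order explicit at the call site and lets tests substitute any step.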

func (*Framework) NetworkBrokerPeers

func (fw *Framework) NetworkBrokerPeers() (servers srvcache.Servers, err error)

NetworkBrokerPeers are peers in the broker cluster resolved from _mcollective-broker._tcp or from the plugin config

func (*Framework) NewConnector

func (fw *Framework) NewConnector(ctx context.Context, servers func() (srvcache.Servers, error), name string, logger *log.Entry) (Connector, error)

NewConnector creates a new NATS connector

It will attempt to connect to the given servers and will keep trying till it manages to do so

func (*Framework) NewMessage

func (fw *Framework) NewMessage(payload string, agent string, collective string, msgType string, request *Message) (msg *Message, err error)

NewMessage creates a new Message associated with this Choria instance

func (*Framework) NewReply

func (fw *Framework) NewReply(request protocol.Request) (reply protocol.Reply, err error)

NewReply creates a new Reply, the version will match that of the given request

func (*Framework) NewReplyFromMessage

func (fw *Framework) NewReplyFromMessage(version string, msg *Message) (rep protocol.Reply, err error)

NewReplyFromMessage creates a new Reply with the Message settings preloaded complying with a specific protocol version like protocol.ReplyV1

func (*Framework) NewReplyFromSecureReply

func (fw *Framework) NewReplyFromSecureReply(sr protocol.SecureReply) (reply protocol.Reply, err error)

NewReplyFromSecureReply creates a new Reply from the JSON payload of SecureReply, the version will match what is in the JSON payload

func (*Framework) NewReplyFromTransportJSON

func (fw *Framework) NewReplyFromTransportJSON(payload []byte, skipvalidate bool) (msg protocol.Reply, err error)

NewReplyFromTransportJSON creates a new Reply from a transport JSON

func (*Framework) NewReplyTransportForMessage

func (fw *Framework) NewReplyTransportForMessage(msg *Message, request protocol.Request) (protocol.TransportMessage, error)

NewReplyTransportForMessage creates a new Transport message based on a Message and the request it is a reply to

The new transport message will have the same version as the request it is based on

func (*Framework) NewRequest

func (fw *Framework) NewRequest(version string, agent string, senderid string, callerid string, ttl int, requestid string, collective string) (request protocol.Request, err error)

NewRequest creates a new Request complying with a specific protocol version like protocol.RequestV1

func (*Framework) NewRequestFromMessage

func (fw *Framework) NewRequestFromMessage(version string, msg *Message) (req protocol.Request, err error)

NewRequestFromMessage creates a new Request with the Message settings preloaded complying with a specific protocol version like protocol.RequestV1

func (*Framework) NewRequestFromSecureRequest

func (fw *Framework) NewRequestFromSecureRequest(sr protocol.SecureRequest) (request protocol.Request, err error)

NewRequestFromSecureRequest creates a new Request from a SecureRequest, the version will match what is in the JSON payload

func (*Framework) NewRequestFromTransportJSON

func (fw *Framework) NewRequestFromTransportJSON(payload []byte, skipvalidate bool) (msg protocol.Request, err error)

NewRequestFromTransportJSON creates a new Request from transport JSON

func (*Framework) NewRequestID

func (fw *Framework) NewRequestID() (string, error)

NewRequestID creates a new RequestID

func (*Framework) NewRequestMessageFromTransportJSON

func (fw *Framework) NewRequestMessageFromTransportJSON(payload []byte) (msg *Message, err error)

NewRequestMessageFromTransportJSON creates a Message from a Transport JSON that holds a Request

func (*Framework) NewRequestTransportForMessage

func (fw *Framework) NewRequestTransportForMessage(msg *Message, version string) (protocol.TransportMessage, error)

NewRequestTransportForMessage creates a new versioned Transport message based on a Message

func (*Framework) NewSecureReply

func (fw *Framework) NewSecureReply(reply protocol.Reply) (secure protocol.SecureReply, err error)

NewSecureReply creates a new SecureReply with the given Reply message as payload

func (*Framework) NewSecureReplyFromTransport

func (fw *Framework) NewSecureReplyFromTransport(message protocol.TransportMessage, skipvalidate bool) (secure protocol.SecureReply, err error)

NewSecureReplyFromTransport creates a new SecureReply from the JSON payload of TransportMessage, the version of the SecureReply will be the same as the TransportMessage

func (*Framework) NewSecureRequest

func (fw *Framework) NewSecureRequest(request protocol.Request) (secure protocol.SecureRequest, err error)

NewSecureRequest creates a new SecureRequest with the given Request message as payload

func (*Framework) NewSecureRequestFromTransport

func (fw *Framework) NewSecureRequestFromTransport(message protocol.TransportMessage, skipvalidate bool) (secure protocol.SecureRequest, err error)

NewSecureRequestFromTransport creates a new SecureRequest from the JSON payload of TransportMessage, the version of the SecureRequest will be the same as the TransportMessage

func (*Framework) NewTransportForSecureReply

func (fw *Framework) NewTransportForSecureReply(reply protocol.SecureReply) (message protocol.TransportMessage, err error)

NewTransportForSecureReply creates a new TransportMessage with a SecureReply as payload. The Transport will be the same version as the SecureReply

func (*Framework) NewTransportForSecureRequest

func (fw *Framework) NewTransportForSecureRequest(request protocol.SecureRequest) (message protocol.TransportMessage, err error)

NewTransportForSecureRequest creates a new TransportMessage with a SecureRequest as payload. The Transport will be the same version as the SecureRequest

func (*Framework) NewTransportFromJSON

func (fw *Framework) NewTransportFromJSON(data string) (message protocol.TransportMessage, err error)

NewTransportFromJSON creates a new TransportMessage from a JSON payload. The version will match what is in the payload

func (*Framework) NewTransportMessage

func (fw *Framework) NewTransportMessage(version string) (message protocol.TransportMessage, err error)

NewTransportMessage creates a new TransportMessage complying with a specific protocol version like protocol.TransportV1

func (*Framework) OverrideCertname

func (fw *Framework) OverrideCertname() string

OverrideCertname indicates if the user wishes to force a specific certname, empty when not

func (*Framework) PQLQuery added in v0.19.0

func (fw *Framework) PQLQuery(query string) ([]byte, error)

func (*Framework) PQLQueryCertNames added in v0.19.0

func (fw *Framework) PQLQueryCertNames(query string) ([]string, error)

func (*Framework) ParseSignerTokenUnverified added in v0.17.0

func (fw *Framework) ParseSignerTokenUnverified() (token *jwt.Token, claims jwt.MapClaims, err error)

func (*Framework) ProgressWidth added in v0.20.0

func (fw *Framework) ProgressWidth() int

ProgressWidth determines the width of the progress bar, when -1 there is not enough space for a progress bar

func (*Framework) PrometheusTextFileDir added in v0.15.0

func (fw *Framework) PrometheusTextFileDir() string

PrometheusTextFileDir is the configured directory in which to write prometheus text file stats

func (*Framework) ProvisionMode

func (fw *Framework) ProvisionMode() bool

ProvisionMode determines if this instance is in provisioning mode. If the setting `plugin.choria.server.provision` is set at all then its value is returned, else the build time property ProvisionDefault is consulted

func (*Framework) ProvisioningServers

func (fw *Framework) ProvisioningServers(ctx context.Context) (srvcache.Servers, error)

ProvisioningServers determines the build time provisioning servers. When it's unset or results in an empty server list this will return an error

func (*Framework) ProxiedDiscovery

func (fw *Framework) ProxiedDiscovery() bool

ProxiedDiscovery determines if a client is configured for proxied discovery

func (*Framework) PuppetAIOCmd

func (fw *Framework) PuppetAIOCmd(command string, def string) string

PuppetAIOCmd looks up a command in the AIO paths, if it's not there it will try PATH and finally return a default if not in PATH

func (*Framework) PuppetDBServers added in v0.19.0

func (fw *Framework) PuppetDBServers() (servers srvcache.Servers, err error)

PuppetDBServers resolves the PuppetDB server based on configuration or the _x-puppet-db._tcp SRV record

func (*Framework) PuppetSetting

func (fw *Framework) PuppetSetting(setting string) (string, error)

PuppetSetting retrieves a config setting by shelling out to puppet apply --configprint

func (*Framework) QuerySrvRecords

func (fw *Framework) QuerySrvRecords(records []string) (srvcache.Servers, error)

QuerySrvRecords looks for SRV records within the right domain either thanks to facter domain or the configured domain.

If the config disables SRV then an error is returned.

func (*Framework) SetLogWriter added in v0.13.1

func (fw *Framework) SetLogWriter(out io.Writer)

func (*Framework) SetLogger added in v0.16.0

func (fw *Framework) SetLogger(logger *log.Logger)

SetLogger sets the logger to use

func (*Framework) SetupLogging

func (fw *Framework) SetupLogging(debug bool) (err error)

SetupLogging configures logging based on choria config directives. Currently only file and console behaviors are supported

func (*Framework) ShouldUseNGS added in v0.13.0

func (fw *Framework) ShouldUseNGS() bool

ShouldUseNGS determines if we are configured to use NGS

func (*Framework) SignerToken added in v0.17.0

func (fw *Framework) SignerToken() (token string, err error)

SignerToken retrieves the AAA token used for signing requests

func (*Framework) TLSConfig

func (fw *Framework) TLSConfig() (*tls.Config, error)

TLSConfig creates a generic TLS configuration for use by NATS, HTTPS etc

func (*Framework) TrySrvLookup

func (fw *Framework) TrySrvLookup(names []string, defaultSrv srvcache.Server) (srvcache.Server, error)

TrySrvLookup will attempt to look up a series of names, returning the first found. If SRV lookups are disabled or nothing is found the default will be returned

func (*Framework) UniqueID

func (fw *Framework) UniqueID() string

UniqueID creates a new unique ID, usually a v4 uuid, if that fails a random string based ID is made
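
The "UUID first, random string as fallback" behavior can be sketched with the standard library alone. This is an illustration under assumptions, not the package's code: `uuidV4`, `randomString`, `uniqueID` and the 32 character fallback length are hypothetical choices.

```go
package main

import (
	"crypto/rand"
	"fmt"
	mrand "math/rand"
)

// uuidV4 builds an RFC 4122 version 4 UUID from 16 random bytes.
func uuidV4() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set the version to 4
	b[8] = (b[8] & 0x3f) | 0x80 // set the RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// randomString is the hypothetical fallback used when no UUID can be made.
func randomString(n int) string {
	const letters = "abcdefghijklmnopqrstuvwxyz0123456789"
	out := make([]byte, n)
	for i := range out {
		out[i] = letters[mrand.Intn(len(letters))]
	}
	return string(out)
}

// uniqueID prefers a v4 UUID and falls back to a random string on error.
func uniqueID() string {
	id, err := uuidV4()
	if err != nil {
		return randomString(32)
	}
	return id
}

func main() {
	fmt.Println(uniqueID())
}
```

The fallback never returns an error, which matches the documented signature of returning only a string.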

func (*Framework) UniqueIDFromUnverifiedToken added in v0.17.0

func (fw *Framework) UniqueIDFromUnverifiedToken() (caller string, id string, token string, err error)

func (*Framework) ValidateSecurity

func (fw *Framework) ValidateSecurity() (errors []string, ok bool)

ValidateSecurity calls the security provider validation method and indicates if all dependencies are met for secure operation

type InstanceConnector

type InstanceConnector interface {
	AgentConnector
	PublishableConnector
	RawPublishableConnector

	NodeDirectedTarget(collective string, identity string) string

	IsConnected() bool
	Close()
}

type Message

type Message struct {
	Payload string
	Agent   string

	Request   *Message
	Filter    *protocol.Filter
	TTL       int
	TimeStamp time.Time

	SenderID        string
	CallerID        string
	RequestID       string
	DiscoveredHosts []string

	CustomTarget string

	sync.Mutex
	// contains filtered or unexported fields
}

Message represents a Choria message

func NewMessage

func NewMessage(payload string, agent string, collective string, msgType string, request *Message, choria *Framework) (msg *Message, err error)

NewMessage constructs a basic Message instance

func NewMessageFromRequest

func NewMessageFromRequest(req protocol.Request, replyto string, choria *Framework) (msg *Message, err error)

NewMessageFromRequest constructs a Message based on a Request

func (*Message) Base64Payload

func (m *Message) Base64Payload() string

Base64Payload retrieves the payload Base64 encoded

func (*Message) CacheTransport added in v0.14.0

func (m *Message) CacheTransport()

CacheTransport ensures that multiple calls to Transport() return the same transport message

func (*Message) Collective

func (m *Message) Collective() string

Collective retrieves the sub collective this message is targeting

func (*Message) ExpectedMessageID

func (m *Message) ExpectedMessageID() string

ExpectedMessageID retrieves the expected message ID

func (*Message) IsCachedTransport added in v0.14.0

func (m *Message) IsCachedTransport() bool

IsCachedTransport determines if transport messages will be cached

func (*Message) NotifyPublish added in v0.19.0

func (m *Message) NotifyPublish()

NotifyPublish triggers the callback set using OnPublish() in a blocking fashion

func (*Message) OnPublish added in v0.19.0

func (m *Message) OnPublish(f func())

OnPublish sets a callback that should be called just before this message is published, it might be called several times for batched or direct messages. This will be called after the message has been signed - potentially by a remote signer etc

func (*Message) ReplyTo

func (m *Message) ReplyTo() string

ReplyTo retrieves the NATS reply target

func (*Message) SetBase64Payload

func (m *Message) SetBase64Payload(payload string) error

SetBase64Payload sets the payload for the message, use it if the payload is Base64 encoded
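
A small round trip makes the relationship between Base64Payload and SetBase64Payload concrete. The sketch below uses a hypothetical `message` type and assumes standard (padded) Base64 with the payload stored decoded; the package's actual encoding variant and storage are not stated in this documentation.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// message is a hypothetical stand-in that holds a plain string payload.
type message struct {
	Payload string
}

// base64Payload returns the payload in standard Base64 encoding.
func (m *message) base64Payload() string {
	return base64.StdEncoding.EncodeToString([]byte(m.Payload))
}

// setBase64Payload decodes payload and stores the decoded text. Storing the
// decoded form is an assumption made for this sketch.
func (m *message) setBase64Payload(payload string) error {
	data, err := base64.StdEncoding.DecodeString(payload)
	if err != nil {
		return fmt.Errorf("could not decode payload: %w", err)
	}
	m.Payload = string(data)
	return nil
}

func main() {
	m := &message{}
	if err := m.setBase64Payload("aGVsbG8="); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(m.Payload, m.base64Payload())
}
```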

func (*Message) SetCollective

func (m *Message) SetCollective(collective string) error

SetCollective sets the sub collective this message is targeting

func (*Message) SetExpectedMsgID

func (m *Message) SetExpectedMsgID(id string) error

SetExpectedMsgID sets the Request ID that is expected from the reply data

func (*Message) SetProtocolVersion

func (m *Message) SetProtocolVersion(version string)

SetProtocolVersion sets the version of the protocol that will be used by Transport()

func (*Message) SetReplyTo

func (m *Message) SetReplyTo(replyTo string) error

SetReplyTo sets the NATS target where replies to this message should go

func (*Message) SetType

func (m *Message) SetType(msgType string) (err error)

SetType sets the message type. One of message, request, direct_request, service_request or reply
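
Since SetType returns an error, callers should expect unknown type strings to be rejected. The sketch below shows one way such a check could look. The constants are local copies of the documented ones so that the example compiles on its own, and `checkType` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Local copies of the documented constants, so the sketch is self-contained.
const (
	MessageMessageType        = "message"
	RequestMessageType        = "request"
	DirectRequestMessageType  = "direct_request"
	ServiceRequestMessageType = "service_request"
	ReplyMessageType          = "reply"
)

// checkType is a hypothetical validator that accepts only the five
// documented message types.
func checkType(t string) error {
	switch t {
	case MessageMessageType, RequestMessageType, DirectRequestMessageType,
		ServiceRequestMessageType, ReplyMessageType:
		return nil
	default:
		return fmt.Errorf("%q is not a valid message type", t)
	}
}

func main() {
	fmt.Println(checkType("request"))
	fmt.Println(checkType("broadcast"))
}
```

Using the named constants at call sites rather than string literals avoids typos that would only surface as a runtime error.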

func (*Message) String

func (m *Message) String() string

String creates a string representation of the message for logs etc

func (*Message) Transport

func (m *Message) Transport() (protocol.TransportMessage, error)

Transport creates a TransportMessage for this Message

In the case of a reply Message made using NewMessage the Transport will have the same version as the request that made it. If you made the Message using some other way then look at choria.NewReplyTransportForMessage.

For requests you need to set the protocol version using SetProtocolVersion() before calling Transport

func (*Message) Type

func (m *Message) Type() string

Type retrieves the message type

func (*Message) UniqueTransport added in v0.14.0

func (m *Message) UniqueTransport()

UniqueTransport ensures that every call to Transport() produces a unique transport message

func (*Message) Validate

func (m *Message) Validate() (bool, error)

Validate tests the Message and makes sure its settings are sane

func (*Message) ValidateTTL

func (m *Message) ValidateTTL() bool

ValidateTTL validates the message age, true if the message should be allowed
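
A message-age check of this kind compares the time since TimeStamp against TTL. The following sketch assumes TTL is a number of seconds and ignores any clock-skew allowance the real method may apply; `withinTTL` and `demo` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// withinTTL reports whether a message sent at sent is still acceptable at
// now. Treating ttl as seconds is an assumption of this sketch.
func withinTTL(sent, now time.Time, ttl int) bool {
	return now.Sub(sent) <= time.Duration(ttl)*time.Second
}

// demo checks a 10 second old and a 5 minute old message against a 60
// second TTL, using a fixed clock so the result is reproducible.
func demo() (fresh bool, stale bool) {
	now := time.Date(2021, time.April, 22, 12, 0, 0, 0, time.UTC)
	fresh = withinTTL(now.Add(-10*time.Second), now, 60)
	stale = withinTTL(now.Add(-5*time.Minute), now, 60)
	return fresh, stale
}

func main() {
	fresh, stale := demo()
	fmt.Println(fresh, stale)
}
```

Passing `now` in explicitly, rather than calling `time.Now()` inside the check, keeps the logic testable.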

type PublishableConnector

type PublishableConnector interface {
	Publish(msg *Message) error
}

PublishableConnector provides the minimal Connector features to enable publishing of choria.Message instances
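
Narrow interfaces like this one let code that only publishes be tested without a NATS connection. The sketch below declares local, trimmed copies of `Message` and `PublishableConnector` so it compiles on its own; `recorder` and `ping` are hypothetical names for a test double and a function under test.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a trimmed, hypothetical stand-in for choria.Message.
type Message struct {
	Payload string
	Agent   string
}

// PublishableConnector mirrors the interface documented above.
type PublishableConnector interface {
	Publish(msg *Message) error
}

// recorder is a hypothetical test double that stores what was published.
type recorder struct {
	mu   sync.Mutex
	sent []*Message
}

// Publish records the message instead of sending it anywhere.
func (r *recorder) Publish(msg *Message) error {
	if msg == nil {
		return fmt.Errorf("nil message")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.sent = append(r.sent, msg)
	return nil
}

// ping depends only on the narrow interface, so a recorder can stand in
// for a real connection.
func ping(conn PublishableConnector, agent string) error {
	return conn.Publish(&Message{Payload: "ping", Agent: agent})
}

func main() {
	r := &recorder{}
	if err := ping(r, "example"); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(r.sent), r.sent[0].Agent)
}
```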

type RawPublishableConnector

type RawPublishableConnector interface {
	PublishRaw(target string, data []byte) error
}
