Documentation ¶
Index ¶
- Constants
- Variables
- func NewPackagesSyncer(logger types.Logger, available *protobufs.PackagesAvailable, sender Sender, ...) *packagesSyncer
- func NewWSReceiver(logger types.Logger, callbacks types.Callbacks, conn *websocket.Conn, ...) *wsReceiver
- type ClientCommon
- func (c *ClientCommon) AgentDescription() *protobufs.AgentDescription
- func (c *ClientCommon) IsStopping() bool
- func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error
- func (c *ClientCommon) PrepareStart(_ context.Context, settings types.StartSettings) error
- func (c *ClientCommon) RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error
- func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error)
- func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) error
- func (c *ClientCommon) SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error
- func (c *ClientCommon) SetHealth(health *protobufs.ComponentHealth) error
- func (c *ClientCommon) SetPackageStatuses(statuses *protobufs.PackageStatuses) error
- func (c *ClientCommon) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error
- func (c *ClientCommon) StartConnectAndRun(runner func(ctx context.Context))
- func (c *ClientCommon) Stop(ctx context.Context) error
- func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error
- type ClientSyncedState
- func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription
- func (s *ClientSyncedState) CustomCapabilities() *protobufs.CustomCapabilities
- func (s *ClientSyncedState) HasCustomCapability(capability string) bool
- func (s *ClientSyncedState) Health() *protobufs.ComponentHealth
- func (s *ClientSyncedState) PackageStatuses() *protobufs.PackageStatuses
- func (s *ClientSyncedState) RemoteConfigStatus() *protobufs.RemoteConfigStatus
- func (s *ClientSyncedState) SetAgentDescription(descr *protobufs.AgentDescription) error
- func (s *ClientSyncedState) SetCustomCapabilities(capabilities *protobufs.CustomCapabilities) error
- func (s *ClientSyncedState) SetHealth(health *protobufs.ComponentHealth) error
- func (s *ClientSyncedState) SetPackageStatuses(status *protobufs.PackageStatuses) error
- func (s *ClientSyncedState) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error
- type HTTPSender
- func (h *HTTPSender) AddTLSConfig(config *tls.Config)
- func (h *HTTPSender) EnableCompression()
- func (h *HTTPSender) Run(ctx context.Context, url string, callbacks types.Callbacks, ...)
- func (h *HTTPSender) SetPollingInterval(duration time.Duration)
- func (h *HTTPSender) SetRequestHeader(header http.Header)
- type InMemPackagesStore
- func (l *InMemPackagesStore) AllPackagesHash() ([]byte, error)
- func (l *InMemPackagesStore) CreatePackage(packageName string, typ protobufs.PackageType) error
- func (l *InMemPackagesStore) DeletePackage(packageName string) error
- func (l *InMemPackagesStore) FileContentHash(packageName string) ([]byte, error)
- func (l *InMemPackagesStore) GetContent() map[string][]byte
- func (l *InMemPackagesStore) LastReportedStatuses() (*protobufs.PackageStatuses, error)
- func (l *InMemPackagesStore) PackageState(packageName string) (state types.PackageState, err error)
- func (l *InMemPackagesStore) Packages() ([]string, error)
- func (l *InMemPackagesStore) SetAllPackagesHash(hash []byte) error
- func (l *InMemPackagesStore) SetLastReportedStatuses(statuses *protobufs.PackageStatuses) error
- func (l *InMemPackagesStore) SetPackageState(packageName string, state types.PackageState) error
- func (l *InMemPackagesStore) UpdateContent(_ context.Context, packageName string, data io.Reader, contentHash []byte) error
- type MockServer
- type NextMessage
- type Sender
- type SenderCommon
- type WSSender
Constants ¶
const OpAMPPlainHTTPMethod = "POST"
Variables ¶
var ( ErrAgentDescriptionMissing = errors.New("AgentDescription is nil") ErrAgentDescriptionNoAttributes = errors.New("AgentDescription has no attributes defined") ErrHealthMissing = errors.New("health is nil") ErrReportsEffectiveConfigNotSet = errors.New("ReportsEffectiveConfig capability is not set") ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set") ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set") ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set") )
Functions ¶
func NewPackagesSyncer ¶
func NewPackagesSyncer( logger types.Logger, available *protobufs.PackagesAvailable, sender Sender, clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, ) *packagesSyncer
NewPackagesSyncer creates a new packages syncer.
func NewWSReceiver ¶
func NewWSReceiver( logger types.Logger, callbacks types.Callbacks, conn *websocket.Conn, sender *WSSender, clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, capabilities protobufs.AgentCapabilities, ) *wsReceiver
NewWSReceiver creates a new Receiver that uses WebSocket to receive messages from the server.
Types ¶
type ClientCommon ¶
type ClientCommon struct { Logger types.Logger Callbacks types.Callbacks // Agent's capabilities defined at Start() time. Capabilities protobufs.AgentCapabilities // Client state storage. This is needed if the Server asks to report the state. ClientSyncedState ClientSyncedState // PackagesStateProvider provides access to the local state of packages. PackagesStateProvider types.PackagesStateProvider // contains filtered or unexported fields }
ClientCommon contains the OpAMP logic that is common between WebSocket and plain HTTP transports.
func NewClientCommon ¶
func NewClientCommon(logger types.Logger, sender Sender) ClientCommon
NewClientCommon creates a new ClientCommon.
func (*ClientCommon) AgentDescription ¶
func (c *ClientCommon) AgentDescription() *protobufs.AgentDescription
AgentDescription returns the current state of the AgentDescription.
func (*ClientCommon) IsStopping ¶
func (c *ClientCommon) IsStopping() bool
IsStopping returns true if Stop() was called.
func (*ClientCommon) PrepareFirstMessage ¶
func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error
PrepareFirstMessage prepares the initial state of NextMessage struct that client sends when it first establishes a connection with the Server.
func (*ClientCommon) PrepareStart ¶
func (c *ClientCommon) PrepareStart( _ context.Context, settings types.StartSettings, ) error
PrepareStart prepares the client state for the next Start() call. It returns an error if the client is already started, or if the settings are invalid.
func (*ClientCommon) RequestConnectionSettings ¶
func (c *ClientCommon) RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error
func (*ClientCommon) SendCustomMessage ¶
func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error)
SendCustomMessage sends the specified custom message to the server.
func (*ClientCommon) SetAgentDescription ¶
func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) error
SetAgentDescription sends a status update to the Server with the new AgentDescription and remembers the AgentDescription in the client state so that it can be sent to the Server when the Server asks for it.
func (*ClientCommon) SetCustomCapabilities ¶
func (c *ClientCommon) SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error
SetCustomCapabilities sends a message to the Server with the new custom capabilities.
func (*ClientCommon) SetHealth ¶
func (c *ClientCommon) SetHealth(health *protobufs.ComponentHealth) error
SetHealth sends a status update to the Server with the new agent health and remembers the health in the client state so that it can be sent to the Server when the Server asks for it.
func (*ClientCommon) SetPackageStatuses ¶
func (c *ClientCommon) SetPackageStatuses(statuses *protobufs.PackageStatuses) error
SetPackageStatuses sends a status update to the Server if the new PackageStatuses are different from the ones we already have in the state. It also remembers the new PackageStatuses in the client state so that it can be sent to the Server when the Server asks for it.
func (*ClientCommon) SetRemoteConfigStatus ¶
func (c *ClientCommon) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error
SetRemoteConfigStatus sends a status update to the Server if the new RemoteConfigStatus is different from the status we already have in the state. It also remembers the new RemoteConfigStatus in the client state so that it can be sent to the Server when the Server asks for it.
func (*ClientCommon) StartConnectAndRun ¶
func (c *ClientCommon) StartConnectAndRun(runner func(ctx context.Context))
StartConnectAndRun initiates the connection with the Server and starts the background goroutine that handles the communication unitl client is stopped.
func (*ClientCommon) Stop ¶
func (c *ClientCommon) Stop(ctx context.Context) error
Stop stops the client. It returns an error if the client is not started.
func (*ClientCommon) UpdateEffectiveConfig ¶
func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error
UpdateEffectiveConfig fetches the current local effective config using GetEffectiveConfig callback and sends it to the Server using provided Sender.
type ClientSyncedState ¶
type ClientSyncedState struct {
// contains filtered or unexported fields
}
ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to have access to synchronize to the Server. 5 messages can be stored in this store: AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses and CustomCapabilities.
See OpAMP spec for more details on how status reporting works: https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#status-reporting
Note that the EffectiveConfig is subject to the same synchronization logic, however it is not stored in this struct since it can be large, and we do not want to always keep it in memory. To avoid storing it in memory the EffectiveConfig is supposed to be stored by the Agent implementation (e.g. it can be stored on disk) and is fetched via GetEffectiveConfig callback when it is needed by OpAMP client and then it is discarded from memory. See implementation of UpdateEffectiveConfig().
It is safe to call methods of this struct concurrently.
func (*ClientSyncedState) AgentDescription ¶
func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription
func (*ClientSyncedState) CustomCapabilities ¶
func (s *ClientSyncedState) CustomCapabilities() *protobufs.CustomCapabilities
func (*ClientSyncedState) HasCustomCapability ¶
func (s *ClientSyncedState) HasCustomCapability(capability string) bool
HasCustomCapability returns true if the provided capability is in the CustomCapabilities.
func (*ClientSyncedState) Health ¶
func (s *ClientSyncedState) Health() *protobufs.ComponentHealth
func (*ClientSyncedState) PackageStatuses ¶
func (s *ClientSyncedState) PackageStatuses() *protobufs.PackageStatuses
func (*ClientSyncedState) RemoteConfigStatus ¶
func (s *ClientSyncedState) RemoteConfigStatus() *protobufs.RemoteConfigStatus
func (*ClientSyncedState) SetAgentDescription ¶
func (s *ClientSyncedState) SetAgentDescription(descr *protobufs.AgentDescription) error
SetAgentDescription sets the AgentDescription in the state.
func (*ClientSyncedState) SetCustomCapabilities ¶
func (s *ClientSyncedState) SetCustomCapabilities(capabilities *protobufs.CustomCapabilities) error
SetCustomCapabilities sets the CustomCapabilities in the state.
func (*ClientSyncedState) SetHealth ¶
func (s *ClientSyncedState) SetHealth(health *protobufs.ComponentHealth) error
SetHealth sets the agent health in the state.
func (*ClientSyncedState) SetPackageStatuses ¶
func (s *ClientSyncedState) SetPackageStatuses(status *protobufs.PackageStatuses) error
SetPackageStatuses sets the PackageStatuses in the state.
func (*ClientSyncedState) SetRemoteConfigStatus ¶
func (s *ClientSyncedState) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error
SetRemoteConfigStatus sets the RemoteConfigStatus in the state.
type HTTPSender ¶
type HTTPSender struct { SenderCommon // contains filtered or unexported fields }
HTTPSender allows scheduling messages to send. Once run, it will loop through a request/response cycle for each message to send and will process all received responses using a receivedProcessor. If there are no pending messages to send the HTTPSender will wait for the configured polling interval.
func NewHTTPSender ¶
func NewHTTPSender(logger types.Logger) *HTTPSender
NewHTTPSender creates a new Sender that uses HTTP to send messages with default settings.
func (*HTTPSender) AddTLSConfig ¶
func (h *HTTPSender) AddTLSConfig(config *tls.Config)
func (*HTTPSender) EnableCompression ¶
func (h *HTTPSender) EnableCompression()
func (*HTTPSender) Run ¶
func (h *HTTPSender) Run( ctx context.Context, url string, callbacks types.Callbacks, clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, capabilities protobufs.AgentCapabilities, )
Run starts the processing loop that will perform the HTTP request/response. When there are no more messages to send Run will suspend until either there is a new message to send or the polling interval elapses. Should not be called concurrently with itself. Can be called concurrently with modifying NextMessage(). Run continues until ctx is cancelled.
func (*HTTPSender) SetPollingInterval ¶
func (h *HTTPSender) SetPollingInterval(duration time.Duration)
SetPollingInterval sets the interval between polling. Has effect starting from the next polling cycle.
func (*HTTPSender) SetRequestHeader ¶
func (h *HTTPSender) SetRequestHeader(header http.Header)
SetRequestHeader sets additional HTTP headers to send with all future requests. Should not be called concurrently with any other method.
type InMemPackagesStore ¶
type InMemPackagesStore struct {
// contains filtered or unexported fields
}
InMemPackagesStore is a package store used for testing. Keeps the packages in memory.
func NewInMemPackagesStore ¶
func NewInMemPackagesStore() *InMemPackagesStore
func (*InMemPackagesStore) AllPackagesHash ¶
func (l *InMemPackagesStore) AllPackagesHash() ([]byte, error)
func (*InMemPackagesStore) CreatePackage ¶
func (l *InMemPackagesStore) CreatePackage(packageName string, typ protobufs.PackageType) error
func (*InMemPackagesStore) DeletePackage ¶
func (l *InMemPackagesStore) DeletePackage(packageName string) error
func (*InMemPackagesStore) FileContentHash ¶
func (l *InMemPackagesStore) FileContentHash(packageName string) ([]byte, error)
func (*InMemPackagesStore) GetContent ¶
func (l *InMemPackagesStore) GetContent() map[string][]byte
func (*InMemPackagesStore) LastReportedStatuses ¶
func (l *InMemPackagesStore) LastReportedStatuses() (*protobufs.PackageStatuses, error)
func (*InMemPackagesStore) PackageState ¶
func (l *InMemPackagesStore) PackageState(packageName string) (state types.PackageState, err error)
func (*InMemPackagesStore) Packages ¶
func (l *InMemPackagesStore) Packages() ([]string, error)
func (*InMemPackagesStore) SetAllPackagesHash ¶
func (l *InMemPackagesStore) SetAllPackagesHash(hash []byte) error
func (*InMemPackagesStore) SetLastReportedStatuses ¶
func (l *InMemPackagesStore) SetLastReportedStatuses(statuses *protobufs.PackageStatuses) error
func (*InMemPackagesStore) SetPackageState ¶
func (l *InMemPackagesStore) SetPackageState(packageName string, state types.PackageState) error
func (*InMemPackagesStore) UpdateContent ¶
type MockServer ¶
type MockServer struct { Endpoint string OnRequest func(w http.ResponseWriter, r *http.Request) OnConnect func(r *http.Request) OnWSConnect func(conn *websocket.Conn) OnMessage func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent // contains filtered or unexported fields }
func StartMockServer ¶
func StartMockServer(t *testing.T) *MockServer
func StartTLSMockServer ¶
func StartTLSMockServer(t *testing.T) *MockServer
func (*MockServer) Close ¶
func (m *MockServer) Close()
func (*MockServer) EnableCompression ¶
func (m *MockServer) EnableCompression()
func (*MockServer) EnableExpectMode ¶
func (m *MockServer) EnableExpectMode()
EnableExpectMode enables the expect mode that allows using Expect() method to describe what message is expected to be received.
func (*MockServer) EventuallyExpect ¶
func (m *MockServer) EventuallyExpect( msg string, handler func(msg *protobufs.AgentToServer) (*protobufs.ServerToAgent, bool), )
EventuallyExpect expects to receive a message and calls the handler for every received message until eventually the handler returns true for the second element of the return tuple. Typically used when we know we expect to receive a particular message but 0 or more other messages may be received before that.
func (*MockServer) Expect ¶
func (m *MockServer) Expect(handler receivedMessageHandler)
Expect defines a handler that will be called when a message is received. Expect must be called when we are certain that the message will be received (if it is not received a "time out" error will be recorded.
func (*MockServer) GetHTTPTestServer ¶
func (m *MockServer) GetHTTPTestServer() *httptest.Server
type NextMessage ¶
type NextMessage struct {
// contains filtered or unexported fields
}
NextMessage encapsulates the next message to be sent and provides a concurrency-safe interface to work with the message.
func NewNextMessage ¶
func NewNextMessage() NextMessage
NewNextMessage returns a new empty NextMessage.
func (*NextMessage) PopPending ¶
func (s *NextMessage) PopPending() *protobufs.AgentToServer
PopPending returns the next message to be sent, if it is pending or nil otherwise. Clears the "pending" flag.
func (*NextMessage) Update ¶
func (s *NextMessage) Update(modifier func(msg *protobufs.AgentToServer)) (messageSendingChannel chan struct{})
Update applies the specified modifier function to the next message that will be sent and marks the message as pending to be sent.
The messageSendingChannel returned by this function is closed when the modified message is popped in PopPending before being sent to the server. After this channel is closed, additional calls to Update will be applied to the next message and will return a channel corresponding to that message.
type Sender ¶
type Sender interface { // NextMessage gives access to the next message that will be sent by this Sender. // Can be called concurrently with any other method. NextMessage() *NextMessage // ScheduleSend signals to Sender that the message in NextMessage struct // is now ready to be sent. The Sender should send the NextMessage as soon as possible. // If there is no pending message (e.g. the NextMessage was already sent and // "pending" flag is reset) then no message will be sent. ScheduleSend() // SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent. SetInstanceUid(instanceUid types.InstanceUid) error }
Sender is an interface of the sending portion of OpAMP protocol that stores the NextMessage to be sent and can be ordered to send the message.
type SenderCommon ¶
type SenderCommon struct {
// contains filtered or unexported fields
}
SenderCommon is partial Sender implementation that is common between WebSocket and plain HTTP transports. This struct is intended to be embedded in the WebSocket and HTTP Sender implementations.
func NewSenderCommon ¶
func NewSenderCommon() SenderCommon
NewSenderCommon creates a new SenderCommon. This is intended to be used by the WebSocket and HTTP Sender implementations.
func (*SenderCommon) NextMessage ¶
func (h *SenderCommon) NextMessage() *NextMessage
NextMessage gives access to the next message that will be sent by this looper. Can be called concurrently with any other method.
func (*SenderCommon) ScheduleSend ¶
func (h *SenderCommon) ScheduleSend()
ScheduleSend signals to HTTPSender that the message in NextMessage struct is now ready to be sent. If there is no pending message (e.g. the NextMessage was already sent and "pending" flag is reset) then no message will be sent.
func (*SenderCommon) SetInstanceUid ¶
func (h *SenderCommon) SetInstanceUid(instanceUid types.InstanceUid) error
SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent. Can be called concurrently, normally is called when a message is received from the Server that instructs us to change our instance UID.
type WSSender ¶
type WSSender struct { SenderCommon // contains filtered or unexported fields }
WSSender implements the WebSocket client's sending portion of OpAMP protocol.
func (*WSSender) IsStopped ¶
func (s *WSSender) IsStopped() <-chan struct{}
IsStopped returns a channel that's closed when the sender is stopped.
func (*WSSender) Start ¶
Start the sender and send the first message that was set via NextMessage().Update() earlier. To stop the WSSender cancel the ctx.
func (*WSSender) StoppingErr ¶
StoppingErr returns an error if there was a problem with stopping the sender. If stopping was successful will return nil. StoppingErr() can be called only after IsStopped() is signalled.