Documentation
¶
Index ¶
- Constants
- Variables
- func NewPackagesSyncer(logger types.Logger, available *protobufs.PackagesAvailable, sender Sender, ...) *packagesSyncer
- func NewWSReceiver(logger types.Logger, callbacks types.Callbacks, conn *websocket.Conn, ...) *wsReceiver
- type ClientCommon
- func (c *ClientCommon) AgentDescription() *protobufs.AgentDescription
- func (c *ClientCommon) IsStopping() bool
- func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error
- func (c *ClientCommon) PrepareStart(_ context.Context, settings types.StartSettings) error
- func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) error
- func (c *ClientCommon) SetPackageStatuses(statuses *protobufs.PackageStatuses) error
- func (c *ClientCommon) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error
- func (c *ClientCommon) StartConnectAndRun(runner func(ctx context.Context))
- func (c *ClientCommon) Stop(ctx context.Context) error
- func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error
- type ClientSyncedState
- func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription
- func (s *ClientSyncedState) PackageStatuses() *protobufs.PackageStatuses
- func (s *ClientSyncedState) RemoteConfigStatus() *protobufs.RemoteConfigStatus
- func (s *ClientSyncedState) SetAgentDescription(descr *protobufs.AgentDescription) error
- func (s *ClientSyncedState) SetPackageStatuses(status *protobufs.PackageStatuses) error
- func (s *ClientSyncedState) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error
- type HTTPSender
- type InMemPackagesStore
- func (l *InMemPackagesStore) AllPackagesHash() ([]byte, error)
- func (l *InMemPackagesStore) CreatePackage(packageName string, typ protobufs.PackageAvailable_PackageType) error
- func (l *InMemPackagesStore) DeletePackage(packageName string) error
- func (l *InMemPackagesStore) FileContentHash(packageName string) ([]byte, error)
- func (l *InMemPackagesStore) GetContent() map[string][]byte
- func (l *InMemPackagesStore) LastReportedStatuses() (*protobufs.PackageStatuses, error)
- func (l *InMemPackagesStore) PackageState(packageName string) (state types.PackageState, err error)
- func (l *InMemPackagesStore) Packages() ([]string, error)
- func (l *InMemPackagesStore) SetAllPackagesHash(hash []byte) error
- func (l *InMemPackagesStore) SetLastReportedStatuses(statuses *protobufs.PackageStatuses) error
- func (l *InMemPackagesStore) SetPackageState(packageName string, state types.PackageState) error
- func (l *InMemPackagesStore) UpdateContent(_ context.Context, packageName string, data io.Reader, contentHash []byte) error
- type MockServer
- type NextMessage
- type Sender
- type SenderCommon
- type WSSender
Constants ¶
const OpAMPPlainHTTPMethod = "POST"
Variables ¶
var ( ErrAgentDescriptionMissing = errors.New("AgentDescription is nil") ErrAgentDescriptionNoAttributes = errors.New("AgentDescription has no attributes defined") )
Functions ¶
func NewPackagesSyncer ¶
func NewPackagesSyncer( logger types.Logger, available *protobufs.PackagesAvailable, sender Sender, clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, ) *packagesSyncer
func NewWSReceiver ¶
func NewWSReceiver( logger types.Logger, callbacks types.Callbacks, conn *websocket.Conn, sender *WSSender, clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, ) *wsReceiver
Types ¶
type ClientCommon ¶
type ClientCommon struct { Logger types.Logger Callbacks types.Callbacks // Client state storage. This is needed if the Server asks to report the state. ClientSyncedState ClientSyncedState // PackagesStateProvider provides access to the local state of packages. PackagesStateProvider types.PackagesStateProvider // contains filtered or unexported fields }
ClientCommon contains the OpAMP logic that is common between WebSocket and plain HTTP transports.
func NewClientCommon ¶
func NewClientCommon(logger types.Logger, sender Sender) ClientCommon
func (*ClientCommon) AgentDescription ¶
func (c *ClientCommon) AgentDescription() *protobufs.AgentDescription
AgentDescription returns the current state of the AgentDescription.
func (*ClientCommon) IsStopping ¶
func (c *ClientCommon) IsStopping() bool
func (*ClientCommon) PrepareFirstMessage ¶
func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error
PrepareFirstMessage prepares the initial state of NextMessage struct that client sends when it first establishes a connection with the Server.
func (*ClientCommon) PrepareStart ¶
func (c *ClientCommon) PrepareStart( _ context.Context, settings types.StartSettings, ) error
func (*ClientCommon) SetAgentDescription ¶
func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) error
SetAgentDescription sends a status update to the Server with the new AgentDescription and remembers the AgentDescription in the client state so that it can be sent to the Server when the Server asks for it.
func (*ClientCommon) SetPackageStatuses ¶
func (c *ClientCommon) SetPackageStatuses(statuses *protobufs.PackageStatuses) error
func (*ClientCommon) SetRemoteConfigStatus ¶
func (c *ClientCommon) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error
func (*ClientCommon) StartConnectAndRun ¶
func (c *ClientCommon) StartConnectAndRun(runner func(ctx context.Context))
func (*ClientCommon) UpdateEffectiveConfig ¶
func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error
UpdateEffectiveConfig fetches the current local effective config using GetEffectiveConfig callback and sends it to the Server using provided Sender.
type ClientSyncedState ¶
type ClientSyncedState struct {
// contains filtered or unexported fields
}
ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs access to in order to synchronize with the Server. Three messages can be stored in this store: AgentDescription, RemoteConfigStatus and PackageStatuses.
See OpAMP spec for more details on how state synchronization works: https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#Agent-to-Server-state-synchronization
Note that the EffectiveConfig is subject to the same synchronization logic, however it is not stored in this struct since it can be large, and we do not want to always keep it in memory. To avoid storing it in memory the EffectiveConfig is supposed to be stored by the Agent implementation (e.g. it can be stored on disk) and is fetched via GetEffectiveConfig callback when it is needed by OpAMP client and then it is discarded from memory. See implementation of UpdateEffectiveConfig().
It is safe to call methods of this struct concurrently.
func (*ClientSyncedState) AgentDescription ¶
func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription
func (*ClientSyncedState) PackageStatuses ¶
func (s *ClientSyncedState) PackageStatuses() *protobufs.PackageStatuses
func (*ClientSyncedState) RemoteConfigStatus ¶
func (s *ClientSyncedState) RemoteConfigStatus() *protobufs.RemoteConfigStatus
func (*ClientSyncedState) SetAgentDescription ¶
func (s *ClientSyncedState) SetAgentDescription(descr *protobufs.AgentDescription) error
SetAgentDescription sets the AgentDescription in the state.
func (*ClientSyncedState) SetPackageStatuses ¶
func (s *ClientSyncedState) SetPackageStatuses(status *protobufs.PackageStatuses) error
SetPackageStatuses sets the PackageStatuses in the state.
func (*ClientSyncedState) SetRemoteConfigStatus ¶
func (s *ClientSyncedState) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error
SetRemoteConfigStatus sets the RemoteConfigStatus in the state.
type HTTPSender ¶
type HTTPSender struct { SenderCommon // contains filtered or unexported fields }
HTTPSender allows scheduling messages to send. Once run, it will loop through a request/response cycle for each message to send and will process all received responses using a receivedProcessor. If there are no pending messages to send the HTTPSender will wait for the configured polling interval.
func NewHTTPSender ¶
func NewHTTPSender(logger types.Logger) *HTTPSender
func (*HTTPSender) Run ¶
func (h *HTTPSender) Run( ctx context.Context, url string, callbacks types.Callbacks, clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, )
Run starts the processing loop that will perform the HTTP request/response. When there are no more messages to send Run will suspend until either there is a new message to send or the polling interval elapses. Should not be called concurrently with itself. Can be called concurrently with modifying NextMessage(). Run continues until ctx is cancelled.
func (*HTTPSender) SetPollingInterval ¶
func (h *HTTPSender) SetPollingInterval(duration time.Duration)
SetPollingInterval sets the interval between polling. Has effect starting from the next polling cycle.
func (*HTTPSender) SetRequestHeader ¶
func (h *HTTPSender) SetRequestHeader(header http.Header)
SetRequestHeader sets additional HTTP headers to send with all future requests. Should not be called concurrently with any other method.
type InMemPackagesStore ¶
type InMemPackagesStore struct {
// contains filtered or unexported fields
}
InMemPackagesStore is a package store used for testing. Keeps the packages in memory.
func NewInMemPackagesStore ¶
func NewInMemPackagesStore() *InMemPackagesStore
func (*InMemPackagesStore) AllPackagesHash ¶
func (l *InMemPackagesStore) AllPackagesHash() ([]byte, error)
func (*InMemPackagesStore) CreatePackage ¶
func (l *InMemPackagesStore) CreatePackage(packageName string, typ protobufs.PackageAvailable_PackageType) error
func (*InMemPackagesStore) DeletePackage ¶
func (l *InMemPackagesStore) DeletePackage(packageName string) error
func (*InMemPackagesStore) FileContentHash ¶
func (l *InMemPackagesStore) FileContentHash(packageName string) ([]byte, error)
func (*InMemPackagesStore) GetContent ¶
func (l *InMemPackagesStore) GetContent() map[string][]byte
func (*InMemPackagesStore) LastReportedStatuses ¶
func (l *InMemPackagesStore) LastReportedStatuses() (*protobufs.PackageStatuses, error)
func (*InMemPackagesStore) PackageState ¶
func (l *InMemPackagesStore) PackageState(packageName string) (state types.PackageState, err error)
func (*InMemPackagesStore) Packages ¶
func (l *InMemPackagesStore) Packages() ([]string, error)
func (*InMemPackagesStore) SetAllPackagesHash ¶
func (l *InMemPackagesStore) SetAllPackagesHash(hash []byte) error
func (*InMemPackagesStore) SetLastReportedStatuses ¶
func (l *InMemPackagesStore) SetLastReportedStatuses(statuses *protobufs.PackageStatuses) error
func (*InMemPackagesStore) SetPackageState ¶
func (l *InMemPackagesStore) SetPackageState(packageName string, state types.PackageState) error
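func (*InMemPackagesStore) UpdateContent ¶
func (l *InMemPackagesStore) UpdateContent(_ context.Context, packageName string, data io.Reader, contentHash []byte) error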
type MockServer ¶
type MockServer struct { Endpoint string OnRequest func(w http.ResponseWriter, r *http.Request) OnConnect func(r *http.Request) OnWSConnect func(conn *websocket.Conn) OnMessage func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent // contains filtered or unexported fields }
func StartMockServer ¶
func StartMockServer(t *testing.T) *MockServer
func (*MockServer) Close ¶
func (m *MockServer) Close()
func (*MockServer) EnableExpectMode ¶
func (m *MockServer) EnableExpectMode()
EnableExpectMode enables the expect mode that allows using Expect() method to describe what message is expected to be received.
func (*MockServer) EventuallyExpect ¶
func (m *MockServer) EventuallyExpect( msg string, handler func(msg *protobufs.AgentToServer) (*protobufs.ServerToAgent, bool), )
EventuallyExpect expects to receive a message and calls the handler for every received message until eventually the handler returns true for the second element of the return tuple. Typically used when we know we expect to receive a particular message but 0 or more other messages may be received before that.
func (*MockServer) Expect ¶
func (m *MockServer) Expect(handler receivedMessageHandler)
Expect defines a handler that will be called when a message is received. Expect must be called when we are certain that the message will be received (if it is not received, a "time out" error will be recorded).
type NextMessage ¶
type NextMessage struct {
// contains filtered or unexported fields
}
NextMessage encapsulates the next message to be sent and provides a concurrency-safe interface to work with the message.
func NewNextMessage ¶
func NewNextMessage() NextMessage
func (*NextMessage) PopPending ¶
func (s *NextMessage) PopPending() *protobufs.AgentToServer
PopPending returns the next message to be sent, if it is pending or nil otherwise. Clears the "pending" flag.
func (*NextMessage) Update ¶
func (s *NextMessage) Update(modifier func(msg *protobufs.AgentToServer))
Update applies the specified modifier function to the next message that will be sent and marks the message as pending to be sent.
type Sender ¶
type Sender interface { // NextMessage gives access to the next message that will be sent by this Sender. // Can be called concurrently with any other method. NextMessage() *NextMessage // ScheduleSend signals to Sender that the message in NextMessage struct // is now ready to be sent. The Sender should send the NextMessage as soon as possible. // If there is no pending message (e.g. the NextMessage was already sent and // "pending" flag is reset) then no message will be sent. ScheduleSend() // SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent. SetInstanceUid(instanceUid string) error }
Sender is an interface of the sending portion of OpAMP protocol that stores the NextMessage to be sent and can be ordered to send the message.
type SenderCommon ¶
type SenderCommon struct {
// contains filtered or unexported fields
}
SenderCommon is a partial Sender implementation that is common between WebSocket and plain HTTP transports. This struct is intended to be embedded in the WebSocket and HTTP Sender implementations.
func NewSenderCommon ¶
func NewSenderCommon() SenderCommon
func (*SenderCommon) NextMessage ¶
func (h *SenderCommon) NextMessage() *NextMessage
NextMessage gives access to the next message that will be sent by this Sender. Can be called concurrently with any other method.
func (*SenderCommon) ScheduleSend ¶
func (h *SenderCommon) ScheduleSend()
ScheduleSend signals to the Sender that the message in NextMessage struct is now ready to be sent. If there is no pending message (e.g. the NextMessage was already sent and "pending" flag is reset) then no message will be sent.
func (*SenderCommon) SetInstanceUid ¶
func (h *SenderCommon) SetInstanceUid(instanceUid string) error
SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent. Can be called concurrently. Normally it is called when a message is received from the Server that instructs us to change our instance UID.
type WSSender ¶
type WSSender struct { SenderCommon // contains filtered or unexported fields }
WSSender implements the WebSocket client's sending portion of OpAMP protocol.
func (*WSSender) Start ¶
Start starts the sender and sends the first message that was set via NextMessage().Update() earlier. To stop the WSSender, cancel the ctx.
func (*WSSender) WaitToStop ¶
func (s *WSSender) WaitToStop()
WaitToStop blocks until the sender is stopped. To stop the sender cancel the context that was passed to Start().