loop

package
v0.4.0
Published: Sep 2, 2024 License: MIT Imports: 58 Imported by: 34

README

LOOP Plugins

Local out of process (LOOP) plugins using github.com/hashicorp/go-plugin.

Packages

flowchart
    subgraph plugin-common/pkg
        loop
        internal[loop/internal]
        pb[loop/internal/pb]
        test[loop/internal/test]

        internal --> pb
        test --> internal
        loop --> internal
        loop --> test
    end
    
    grpc[google.golang.org/grpc]
    hashicorp[hashicorp/go-plugin]

    loop ---> hashicorp
    loop ---> grpc
    test ---> grpc
    internal ---> grpc
    pb ---> grpc
    hashicorp --> grpc

package loop

Public API and hashicorp/go-plugin integration.

package test

Testing utilities.

package internal

GRPC client & server implementations.

package pb

Protocol buffer definitions & generated code.

Communication

GRPC client/server pairs are used to communicate between the host and each plugin. Plugins cannot communicate directly with one another, but the host can proxy a connection between them.
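
The proxying idea can be illustrated with a minimal, standard-library-only sketch. It is not this package's implementation (which builds on gRPC and hashicorp/go-plugin); it only assumes that the host holds one connection per plugin and copies bytes between them. The names `proxy` and `relayOnce` are hypothetical, and `net.Pipe` stands in for the real plugin connections.

```go
package main

import (
	"io"
	"net"
	"sync"
)

// proxy copies bytes in both directions between two connections held by the
// host and returns once both directions have finished.
func proxy(a, b net.Conn) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		_, _ = io.Copy(b, a) // plugin A -> plugin B
		_ = b.Close()
	}()
	go func() {
		defer wg.Done()
		_, _ = io.Copy(a, b) // plugin B -> plugin A
		_ = a.Close()
	}()
	wg.Wait()
}

// relayOnce sends msg from a simulated plugin A and returns what a simulated
// plugin B receives through the host-side proxy.
func relayOnce(msg string) (string, error) {
	hostA, pluginA := net.Pipe() // host <-> plugin A
	hostB, pluginB := net.Pipe() // host <-> plugin B

	done := make(chan struct{})
	go func() {
		proxy(hostA, hostB)
		close(done)
	}()

	go func() {
		_, _ = pluginA.Write([]byte(msg))
		_ = pluginA.Close()
	}()

	got, err := io.ReadAll(pluginB)
	_ = pluginB.Close()
	<-done
	return string(got), err
}
```

Calling `relayOnce("ping")` sends the message from one simulated plugin to the other; the two plugin ends never share a connection, only the host sees both.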

Here are the main components for the case of Median:

sequenceDiagram
    autonumber
    participant relayer as Relayer (plugin)
    participant core as Plugin (host)
    participant median as Median (plugin)

    Note over core: KeystoreServer
    core->>+relayer: NewRelayer(Config, Keystore)
    Note over relayer: KeystoreClient
    Note over relayer: RelayerServer
    relayer->>-core: Relayer ID 
    Note over core: RelayerClient

    core->>+relayer: NewMedianProvider(RelayArgs, PluginArgs)
    Note over relayer: MedianProviderServer
    relayer->>-core: MedianProvider ID
    Note over core: MedianProvider (Proxy)

    Note over core:  DataSourceServer
    Note over core:  ErrorLogServer

    core->>+median: NewMedianFactory(MedianProvider, DataSource, ErrorLog)
    Note over median: MedianProviderClient
    Note over median: DataSourceClient
    Note over median: ErrorLogClient
    Note over median: MedianFactoryServer
    median->>-core: MedianFactory ID
    Note over core: MedianFactoryClient

    core->>+median: NewReportingPlugin(ReportingPluginConfig)
    Note over median: ReportingPluginServer
    median->>-core: ReportingPlugin ID
    Note over core: ReportingPluginClient

Note: MedianProvider includes multiple component services on the same connection.

sequenceDiagram
    autonumber
    participant relayer as Relayer (plugin)
    participant core as Plugin (host)
    participant median as Median (plugin)

    core->>+relayer: NewMedianProvider(RelayArgs, PluginArgs)
    Note over relayer: OffchainConfigDigesterServer
    Note over relayer: ContractConfigTrackerServer
    Note over relayer: ContractTransmitterServer
    Note over relayer: ReportCodecServer
    Note over relayer: MedianContractServer
    Note over relayer: OnchainConfigCodecServer
    
    relayer->>-core: MedianProvider ID
    Note over core: MedianProvider (Proxy)
    
    Note over core: OffchainConfigDigesterClient
    Note over core: ContractConfigTrackerClient
    Note over core: ContractTransmitterClient
    
    core->>+median: NewMedianFactory(MedianProvider, DataSource, ErrorLog)
    Note over median: ReportCodecClient
    Note over median: MedianContractClient
    Note over median: OnchainConfigCodecClient

Auto-Recovery

The pluginService type contains reusable automatic recovery code.

type pluginService[P grpcPlugin, S services.Service] struct

Each plugin implements its own interface (Relayer, Median, etc.) with a new type that also embeds a pluginService. This new service type implements the original interface, but internally manages restarting and reconnecting to the plugin as an implementation detail. So there is one long-lived instance of each plugin service and client, and whenever the plugin process (along with its server) crashes, the service restarts the plugin process and re-establishes the client's connection to the new server.
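
As a rough illustration of the restart-on-failure behavior described above, here is a minimal sketch using only the standard library. It is not the pluginService code: `process`, `supervisor`, `ensure`, and `fakeProcess` are hypothetical names, and the liveness check is reduced to a single boolean method.

```go
package main

import "fmt"

// process is a hypothetical stand-in for a running plugin process.
type process interface {
	Alive() bool
}

// supervisor is one long-lived object that swaps the process underneath it.
type supervisor struct {
	launch   func() (process, error) // must return a fresh process on each call
	current  process
	restarts int
}

// ensure launches the process on first use and relaunches it whenever the
// liveness check fails. A real service would call this periodically.
func (s *supervisor) ensure() error {
	if s.current != nil && s.current.Alive() {
		return nil // healthy, nothing to do
	}
	p, err := s.launch()
	if err != nil {
		return fmt.Errorf("launch plugin: %w", err)
	}
	if s.current != nil {
		s.restarts++ // only count replacements, not the first launch
	}
	s.current = p
	return nil
}

// fakeProcess lets callers flip liveness to simulate a crash.
type fakeProcess struct{ alive bool }

func (f *fakeProcess) Alive() bool { return f.alive }
```

The supervisor value stays the same across restarts, which mirrors the point that callers keep one long-lived service while the process behind it is replaced.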

sequenceDiagram
    participant core as Plugin (host)
    participant relayer as Process 1 (plugin)
    participant relayer2 as Process 2 (plugin)
    
    Note over core: RelayerService
    
    core->>+relayer: exec.Cmd
    core->>relayer: NewRelayerServer
    Note over relayer: RelayerServer
    relayer->>core: RelayerID
    Note over core: RelayerClient.init()
    
    loop
        core->>relayer: Alive?
        relayer->>core: Yes
    end
    core->>relayer: Alive?
    relayer->>-core: No

    core->>+relayer2: exec.Cmd
    core->>relayer2: NewRelayerServer
    Note over relayer2: RelayerServer
    relayer2->>core: RelayerID
    Note over core: RelayerClient.update()

    loop
        core->>relayer2: Alive?
        relayer2->>-core: Yes
    end

Documentation

Constants

const CCIPExecutionLOOPName = "ccip_execution"

CCIPExecutionLOOPName is the name for types.CCIPExecutionFactoryGenerator/[NewExecutionLOOP].

const PluginMedianName = "median"

PluginMedianName is the name for types.PluginMedian/[NewGRPCPluginMedian].

const PluginMercuryName = "mercury"

PluginMercuryName is the name for types.PluginMercury/[NewGRPCPluginMercury].

const PluginRelayerName = "relayer"

PluginRelayerName is the name for types.PluginRelayer/[NewGRPCPluginRelayer].

const PluginStandardCapabilitiesName = "standardcapabilities"

Variables

var ErrPluginUnavailable = goplugin.ErrPluginUnavailable

Functions

func HCLogLogger

func HCLogLogger(l logger.Logger) hclog.Logger

HCLogLogger returns an hclog.Logger backed by the given logger.Logger.

func ManagedGRPCClientConfig

func ManagedGRPCClientConfig(clientConfig *plugin.ClientConfig, c BrokerConfig) *plugin.ClientConfig

ManagedGRPCClientConfig returns a plugin.ClientConfig for a managed plugin and sets gRPC config values from the BrokerConfig. Note: managed plugins shut down when the parent process exits. We may want to change this behavior in the future to enable host process restarts without restarting the plugin. To do that, we would also need to supply the appropriate ReattachConfig to the plugin.ClientConfig.

func NewLogger

func NewLogger() (logger.Logger, error)

NewLogger returns a new logger.Logger configured to encode hclog compatible JSON.

func PluginCCIPExecutionHandshakeConfig

func PluginCCIPExecutionHandshakeConfig() plugin.HandshakeConfig

func PluginMedianHandshakeConfig

func PluginMedianHandshakeConfig() plugin.HandshakeConfig

func PluginMercuryHandshakeConfig

func PluginMercuryHandshakeConfig() plugin.HandshakeConfig

func PluginRelayerHandshakeConfig

func PluginRelayerHandshakeConfig() plugin.HandshakeConfig

func SetupTracing

func SetupTracing(config TracingConfig) (err error)

SetupTracing initializes open telemetry with the provided config. It sets the global trace provider and opens a connection to the configured collector.

func StandardCapabilitiesHandshakeConfig

func StandardCapabilitiesHandshakeConfig() plugin.HandshakeConfig

Types

type BrokerConfig

type BrokerConfig = net.BrokerConfig

type ContextValues

type ContextValues struct {
	JobID   any
	JobName any

	ContractID    any
	FeedID        any
	TransmitterID any
}

ContextValues is a helper for passing values via a context.Context.

func (*ContextValues) Args

func (v *ContextValues) Args() (a []any)

Args returns a slice of args to pass to logger.Logger.With.

func (*ContextValues) ContextWithValues

func (v *ContextValues) ContextWithValues(ctx context.Context) context.Context

ContextWithValues returns a context.Context with values set from v.

func (*ContextValues) SetValues

func (v *ContextValues) SetValues(ctx context.Context)

SetValues sets v to values from the ctx.
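
The round trip between a struct and a context.Context can be sketched as follows. This is an illustrative reduction, not the package's code: the `values` type carries only two of the fields, and the key type, key names, and the `roundTrip` helper are assumptions made for the example.

```go
package main

import "context"

// ctxKey is an unexported key type, so keys cannot collide with other packages.
type ctxKey string

const (
	keyJobID  ctxKey = "jobID"  // hypothetical key name
	keyFeedID ctxKey = "feedID" // hypothetical key name
)

// values mirrors a subset of the ContextValues fields for illustration.
type values struct {
	JobID  any
	FeedID any
}

// contextWithValues returns a child context carrying the non-nil fields of v.
func (v *values) contextWithValues(ctx context.Context) context.Context {
	if v.JobID != nil {
		ctx = context.WithValue(ctx, keyJobID, v.JobID)
	}
	if v.FeedID != nil {
		ctx = context.WithValue(ctx, keyFeedID, v.FeedID)
	}
	return ctx
}

// setValues populates v from ctx; keys that are absent leave the field nil.
func (v *values) setValues(ctx context.Context) {
	v.JobID = ctx.Value(keyJobID)
	v.FeedID = ctx.Value(keyFeedID)
}

// args returns alternating key/value pairs for the fields that are set,
// in the shape a structured logger's With method typically accepts.
func (v *values) args() (a []any) {
	if v.JobID != nil {
		a = append(a, "jobID", v.JobID)
	}
	if v.FeedID != nil {
		a = append(a, "feedID", v.FeedID)
	}
	return a
}

// roundTrip stores in on a context and reads it back into a new struct.
func roundTrip(in values) values {
	ctx := in.contextWithValues(context.Background())
	var out values
	out.setValues(ctx)
	return out
}
```

Skipping nil fields keeps the context chain short and means unset values never appear in log arguments.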

type EnvConfig

type EnvConfig struct {
	DatabaseURL *url.URL

	PrometheusPort int

	TracingEnabled         bool
	TracingCollectorTarget string
	TracingSamplingRatio   float64
	TracingTLSCertPath     string
	TracingAttributes      map[string]string
}

EnvConfig is the configuration passed from the application to the LOOP executable. The values are fully resolved, static, and passed via the environment.

func (*EnvConfig) AsCmdEnv

func (e *EnvConfig) AsCmdEnv() (env []string)

AsCmdEnv returns a slice of environment variable key/value pairs for an exec.Cmd.
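
To show how such key/value pairs reach a child process, here is a small sketch under stated assumptions. The `envConfig` type is a reduced stand-in for EnvConfig, and the environment variable names (`EXAMPLE_PROM_PORT`, `EXAMPLE_TRACING_ENABLED`) are invented for the example; the names the package actually uses are not shown on this page.

```go
package main

import (
	"os"
	"os/exec"
	"strconv"
)

// envConfig is a hypothetical, reduced stand-in for EnvConfig.
type envConfig struct {
	PrometheusPort int
	TracingEnabled bool
}

// asCmdEnv renders the config as KEY=value pairs.
// The variable names are illustrative only.
func (e envConfig) asCmdEnv() []string {
	return []string{
		"EXAMPLE_PROM_PORT=" + strconv.Itoa(e.PrometheusPort),
		"EXAMPLE_TRACING_ENABLED=" + strconv.FormatBool(e.TracingEnabled),
	}
}

// newPluginCmd builds a command for the plugin binary at path and appends the
// rendered config to the inherited environment. It does not start the command.
func newPluginCmd(path string, cfg envConfig) *exec.Cmd {
	cmd := exec.Command(path)
	cmd.Env = append(os.Environ(), cfg.asCmdEnv()...)
	return cmd
}
```

Because the values are rendered once into plain strings, the child process sees a static snapshot of the configuration.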

type ErrorLog

type ErrorLog = core.ErrorLog

Deprecated

type ExecutionFactoryService

type ExecutionFactoryService struct {
	goplugin.PluginService[*ExecutionLoop, types.ReportingPluginFactory]
}

ExecutionFactoryService is a types.Service that maintains an internal types.CCIPExecutionFactoryGenerator.

func NewExecutionService

func NewExecutionService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, provider types.CCIPExecProvider) *ExecutionFactoryService

NewExecutionService returns a new *ExecutionFactoryService. cmd must return a new exec.Cmd each time it is called.
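
The requirement on cmd follows from how os/exec works: an exec.Cmd cannot be reused once it has been run, so a service that may restart its plugin needs a factory rather than a single command value. A minimal sketch of such a factory is below; `newCmdFactory` and the binary name used in the usage note are hypothetical.

```go
package main

import "os/exec"

// newCmdFactory returns a function that builds a fresh *exec.Cmd on every
// call, so each (re)start of the plugin gets its own command value.
func newCmdFactory(path string, args ...string) func() *exec.Cmd {
	return func() *exec.Cmd {
		return exec.Command(path, args...)
	}
}
```

A value such as `newCmdFactory("plugin-binary", "--flag")` can then be passed wherever a `func() *exec.Cmd` is expected.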

type ExecutionLoop

type ExecutionLoop struct {
	plugin.NetRPCUnsupportedPlugin

	BrokerConfig

	PluginServer types.CCIPExecutionFactoryGenerator
	// contains filtered or unexported fields
}

func (*ExecutionLoop) ClientConfig

func (p *ExecutionLoop) ClientConfig() *plugin.ClientConfig

func (*ExecutionLoop) GRPCClient

func (p *ExecutionLoop) GRPCClient(_ context.Context, broker *plugin.GRPCBroker, conn *grpc.ClientConn) (interface{}, error)

GRPCClient implements plugin.GRPCPlugin and returns the pluginClient types.CCIPExecutionFactoryGenerator, updated with the new broker and conn.

func (*ExecutionLoop) GRPCServer

func (p *ExecutionLoop) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error

type GRPCOpts

type GRPCOpts = loopnet.GRPCOpts

func NewGRPCOpts

func NewGRPCOpts(registerer prometheus.Registerer) GRPCOpts

NewGRPCOpts initializes open telemetry and returns GRPCOpts with telemetry interceptors. It is called from the host and from each plugin; this is intentional, because communication is bidirectional.

type GRPCPluginMedian

type GRPCPluginMedian struct {
	plugin.NetRPCUnsupportedPlugin

	BrokerConfig

	PluginServer core.PluginMedian
	// contains filtered or unexported fields
}

func (*GRPCPluginMedian) ClientConfig

func (p *GRPCPluginMedian) ClientConfig() *plugin.ClientConfig

func (*GRPCPluginMedian) GRPCClient

func (p *GRPCPluginMedian) GRPCClient(_ context.Context, broker *plugin.GRPCBroker, conn *grpc.ClientConn) (interface{}, error)

GRPCClient implements plugin.GRPCPlugin and returns the pluginClient types.PluginMedian, updated with the new broker and conn.

func (*GRPCPluginMedian) GRPCServer

func (p *GRPCPluginMedian) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error

type GRPCPluginMercury

type GRPCPluginMercury struct {
	plugin.NetRPCUnsupportedPlugin

	BrokerConfig

	PluginServer types.PluginMercury
	// contains filtered or unexported fields
}

func (*GRPCPluginMercury) ClientConfig

func (p *GRPCPluginMercury) ClientConfig() *plugin.ClientConfig

func (*GRPCPluginMercury) GRPCClient

func (p *GRPCPluginMercury) GRPCClient(_ context.Context, broker *plugin.GRPCBroker, conn *grpc.ClientConn) (interface{}, error)

GRPCClient implements plugin.GRPCPlugin and returns the pluginClient types.PluginMercury, updated with the new broker and conn.

func (*GRPCPluginMercury) GRPCServer

func (p *GRPCPluginMercury) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error

type GRPCPluginRelayer

type GRPCPluginRelayer struct {
	plugin.NetRPCUnsupportedPlugin

	BrokerConfig

	PluginServer PluginRelayer
	// contains filtered or unexported fields
}

GRPCPluginRelayer implements plugin.GRPCPlugin for types.PluginRelayer.

func (*GRPCPluginRelayer) ClientConfig

func (p *GRPCPluginRelayer) ClientConfig() *plugin.ClientConfig

func (*GRPCPluginRelayer) GRPCClient

func (p *GRPCPluginRelayer) GRPCClient(_ context.Context, broker *plugin.GRPCBroker, conn *grpc.ClientConn) (interface{}, error)

func (*GRPCPluginRelayer) GRPCServer

func (p *GRPCPluginRelayer) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error

type Keystore

type Keystore = core.Keystore

Deprecated

type LogMessageExtraArgs

type LogMessageExtraArgs struct {
	Key   string      `json:"key"`
	Value interface{} `json:"value"`
}

LogMessageExtraArgs is a key-value pair within the Output payload.
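
The struct tags above determine how one pair is encoded as JSON. The sketch below mirrors the struct locally under a lowercase name and encodes a single pair; how pairs are embedded in the surrounding Output payload is not shown on this page, so that part is left out. `encodeExtraArgs` is a hypothetical helper.

```go
package main

import "encoding/json"

// logMessageExtraArgs mirrors the exported struct and its JSON tags.
type logMessageExtraArgs struct {
	Key   string      `json:"key"`
	Value interface{} `json:"value"`
}

// encodeExtraArgs marshals one key/value pair as dictated by the struct tags.
func encodeExtraArgs(key string, value interface{}) (string, error) {
	b, err := json.Marshal(logMessageExtraArgs{Key: key, Value: value})
	return string(b), err
}
```

For example, `encodeExtraArgs("jobID", 42)` yields `{"key":"jobID","value":42}`.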

type MedianService

MedianService is a types.Service that maintains an internal types.PluginMedian.

func NewMedianService

func NewMedianService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, provider types.MedianProvider, dataSource, juelsPerFeeCoin, gasPriceSubunits median.DataSource, errorLog core.ErrorLog) *MedianService

NewMedianService returns a new *MedianService. cmd must return a new exec.Cmd each time it is called.

type MercuryV1Service

type MercuryV1Service struct {
	goplugin.PluginService[*GRPCPluginMercury, types.MercuryPluginFactory]
}

MercuryV1Service is a types.Service that maintains an internal types.PluginMercury.

func NewMercuryV1Service

func NewMercuryV1Service(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, provider types.MercuryProvider, dataSource mercury_v1_types.DataSource) *MercuryV1Service

NewMercuryV1Service returns a new *MercuryV1Service. cmd must return a new exec.Cmd each time it is called.

type MercuryV2Service

type MercuryV2Service struct {
	goplugin.PluginService[*GRPCPluginMercury, types.MercuryPluginFactory]
}

MercuryV2Service is a types.Service that maintains an internal types.PluginMercury.

func NewMercuryV2Service

func NewMercuryV2Service(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, provider types.MercuryProvider, dataSource mercury_v2_types.DataSource) *MercuryV2Service

NewMercuryV2Service returns a new *MercuryV2Service. cmd must return a new exec.Cmd each time it is called.

type MercuryV3Service

type MercuryV3Service struct {
	goplugin.PluginService[*GRPCPluginMercury, types.MercuryPluginFactory]
}

MercuryV3Service is a types.Service that maintains an internal types.PluginMercury.

func NewMercuryV3Service

func NewMercuryV3Service(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, provider types.MercuryProvider, dataSource mercury_v3_types.DataSource) *MercuryV3Service

NewMercuryV3Service returns a new *MercuryV3Service. cmd must return a new exec.Cmd each time it is called.

type MercuryV4Service

type MercuryV4Service struct {
	goplugin.PluginService[*GRPCPluginMercury, types.MercuryPluginFactory]
}

MercuryV4Service is a types.Service that maintains an internal types.PluginMercury.

func NewMercuryV4Service

func NewMercuryV4Service(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, provider types.MercuryProvider, dataSource mercury_v4_types.DataSource) *MercuryV4Service

NewMercuryV4Service returns a new *MercuryV4Service. cmd must return a new exec.Cmd each time it is called.

type Plugin

type Plugin struct {
	Logger logger.Logger
	// contains filtered or unexported fields
}

Plugin is a base layer for plugins to easily manage sub-[types.Service]s. Useful for implementing PluginRelayer and PluginMedian.

func (*Plugin) Close

func (p *Plugin) Close() (err error)

func (*Plugin) HealthReport

func (p *Plugin) HealthReport() map[string]error

func (*Plugin) Name

func (p *Plugin) Name() string

func (*Plugin) Ready

func (p *Plugin) Ready() error

func (*Plugin) Start

func (p *Plugin) Start(ctx context.Context) error

func (*Plugin) SubService

func (p *Plugin) SubService(s services.Service)
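
One plausible shape for a base type that manages sub-services is sketched below. It is not the Plugin implementation: the reduced `service` interface, the `base` type, and the ordering policy (start in registration order, close in reverse) are assumptions chosen for the example, and `recorder` and `runLifecycle` exist only to make the behavior observable.

```go
package main

import "context"

// service is a reduced, hypothetical version of a startable sub-service.
type service interface {
	Name() string
	Start(context.Context) error
	Close() error
}

// base collects sub-services so an embedding plugin can manage them as a group.
type base struct {
	subs []service
}

// subService registers s to be managed by the base.
func (b *base) subService(s service) { b.subs = append(b.subs, s) }

// start starts sub-services in registration order and stops at the first failure.
func (b *base) start(ctx context.Context) error {
	for _, s := range b.subs {
		if err := s.Start(ctx); err != nil {
			return err
		}
	}
	return nil
}

// close closes every sub-service in reverse order and returns the first error seen.
func (b *base) close() (err error) {
	for i := len(b.subs) - 1; i >= 0; i-- {
		if cerr := b.subs[i].Close(); cerr != nil && err == nil {
			err = cerr
		}
	}
	return err
}

// recorder is a fake sub-service that appends lifecycle events to a shared log.
type recorder struct {
	name string
	log  *[]string
}

func (r recorder) Name() string { return r.name }

func (r recorder) Start(context.Context) error {
	*r.log = append(*r.log, "start "+r.name)
	return nil
}

func (r recorder) Close() error {
	*r.log = append(*r.log, "close "+r.name)
	return nil
}

// runLifecycle registers one recorder per name, then starts and closes the group.
func runLifecycle(names ...string) ([]string, error) {
	var log []string
	b := &base{}
	for _, n := range names {
		b.subService(recorder{name: n, log: &log})
	}
	if err := b.start(context.Background()); err != nil {
		return log, err
	}
	err := b.close()
	return log, err
}
```

With two sub-services named "a" and "b", the recorded events are "start a", "start b", "close b", "close a".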

type PluginMedian

type PluginMedian = core.PluginMedian

Deprecated

type PluginRelayer

type PluginRelayer = looptypes.PluginRelayer

type PromServer

type PromServer struct {
	// contains filtered or unexported fields
}

func NewPromServer

func NewPromServer(port int, lggr logger.Logger) *PromServer

func (*PromServer) Close

func (p *PromServer) Close() error

Close shuts down the underlying HTTP server. See http.Server.Close for details.

func (*PromServer) Name

func (p *PromServer) Name() string

Name returns the name of the server.

func (*PromServer) Start

func (p *PromServer) Start() error

Start starts an HTTP server on the specified port to handle metrics requests.

type PromServerOpts

type PromServerOpts struct {
	Handler http.Handler
}

func (PromServerOpts) New

func (o PromServerOpts) New(port int, lggr logger.Logger) *PromServer

type ProviderServer

type ProviderServer interface {
	Start(context.Context) error
	Close() error
	GetConn() (grpc.ClientConnInterface, error)
}

type Relayer

type Relayer = looptypes.Relayer

type RelayerAdapter added in v0.2.0

type RelayerAdapter = relay.RelayerAdapter

TODO: temporary alias to allow a two-step migration of the solana and starknet repositories.

type RelayerService

type RelayerService struct {
	goplugin.PluginService[*GRPCPluginRelayer, Relayer]
}

RelayerService is a types.Service that maintains an internal Relayer.

func NewRelayerService

func NewRelayerService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, config string, keystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry) *RelayerService

NewRelayerService returns a new *RelayerService. cmd must return a new exec.Cmd each time it is called.

func (*RelayerService) GetChainStatus

func (r *RelayerService) GetChainStatus(ctx context.Context) (types.ChainStatus, error)

func (*RelayerService) ListNodeStatuses

func (r *RelayerService) ListNodeStatuses(ctx context.Context, pageSize int32, pageToken string) (nodes []types.NodeStatus, nextPageToken string, total int, err error)
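
The pageSize, pageToken, and nextPageToken parameters suggest token-based pagination. The sketch below shows one way a caller might collect every page. It rests on assumptions not stated on this page: that an empty nextPageToken marks the last page, and that node statuses can be stood in for by plain strings. `nodeLister`, `listAll`, `fakeLister`, and `collect` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"strconv"
)

// nodeLister is a reduced form of the ListNodeStatuses method; node statuses
// are plain strings here.
type nodeLister interface {
	ListNodeStatuses(ctx context.Context, pageSize int32, pageToken string) (nodes []string, nextPageToken string, total int, err error)
}

// listAll pages through l until it returns an empty nextPageToken.
// Assumption: an empty token marks the last page.
func listAll(ctx context.Context, l nodeLister, pageSize int32) ([]string, error) {
	if pageSize < 1 {
		return nil, errors.New("pageSize must be positive")
	}
	var all []string
	token := ""
	for {
		nodes, next, _, err := l.ListNodeStatuses(ctx, pageSize, token)
		if err != nil {
			return nil, err
		}
		all = append(all, nodes...)
		if next == "" {
			return all, nil
		}
		token = next
	}
}

// fakeLister serves a fixed slice and uses the start offset as the page token.
type fakeLister struct{ nodes []string }

func (f fakeLister) ListNodeStatuses(_ context.Context, pageSize int32, pageToken string) ([]string, string, int, error) {
	start := 0
	if pageToken != "" {
		n, err := strconv.Atoi(pageToken)
		if err != nil || n < 0 || n > len(f.nodes) {
			return nil, "", 0, errors.New("invalid page token")
		}
		start = n
	}
	end := start + int(pageSize)
	if end >= len(f.nodes) {
		return f.nodes[start:], "", len(f.nodes), nil // last page
	}
	return f.nodes[start:end], strconv.Itoa(end), len(f.nodes), nil
}

// collect runs listAll against a fakeLister holding names.
func collect(names []string, pageSize int32) ([]string, error) {
	return listAll(context.Background(), fakeLister{nodes: names}, pageSize)
}
```

Treating the token as opaque on the caller side keeps `listAll` independent of how a particular relayer encodes its position.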

func (*RelayerService) NewChainWriter

func (r *RelayerService) NewChainWriter(ctx context.Context, chainWriterConfig []byte) (types.ChainWriter, error)

func (*RelayerService) NewConfigProvider

func (r *RelayerService) NewConfigProvider(ctx context.Context, args types.RelayArgs) (types.ConfigProvider, error)

func (*RelayerService) NewContractReader

func (r *RelayerService) NewContractReader(ctx context.Context, contractReaderConfig []byte) (types.ContractReader, error)

func (*RelayerService) NewLLOProvider

func (r *RelayerService) NewLLOProvider(ctx context.Context, rargs types.RelayArgs, pargs types.PluginArgs) (types.LLOProvider, error)

func (*RelayerService) NewPluginProvider

func (r *RelayerService) NewPluginProvider(ctx context.Context, rargs types.RelayArgs, pargs types.PluginArgs) (types.PluginProvider, error)

func (*RelayerService) Transact

func (r *RelayerService) Transact(ctx context.Context, from, to string, amount *big.Int, balanceCheck bool) error

type ReportingPluginFactory

type ReportingPluginFactory = types.ReportingPluginFactory

Deprecated

type Server

type Server struct {
	GRPCOpts GRPCOpts
	Logger   logger.SugaredLogger
	// contains filtered or unexported fields
}

Server holds common plugin server fields.

func MustNewStartedServer

func MustNewStartedServer(loggerName string) *Server

MustNewStartedServer returns a new started Server like NewStartedServer, but logs and exits in the event of error. The caller is responsible for calling Server.Stop().

func NewStartedServer

func NewStartedServer(loggerName string) (*Server, error)

NewStartedServer returns a started Server. The caller is responsible for calling Server.Stop().

func (*Server) MustRegister

func (s *Server) MustRegister(c services.HealthReporter)

MustRegister registers the HealthReporter with services.HealthChecker, or exits upon failure.

func (*Server) Register

func (s *Server) Register(c services.HealthReporter) error

func (*Server) Stop

func (s *Server) Stop()

Stop closes resources and flushes logs.

type StandardCapabilities

type StandardCapabilities interface {
	services.Service
	Initialise(ctx context.Context, config string, telemetryService core.TelemetryService, store core.KeyValueStore,
		capabilityRegistry core.CapabilitiesRegistry, errorLog core.ErrorLog,
		pipelineRunner core.PipelineRunnerService, relayerSet core.RelayerSet) error
	Infos(ctx context.Context) ([]capabilities.CapabilityInfo, error)
}

type StandardCapabilitiesLoop

type StandardCapabilitiesLoop struct {
	Logger logger.Logger
	plugin.NetRPCUnsupportedPlugin
	BrokerConfig
	PluginServer StandardCapabilities
	// contains filtered or unexported fields
}

func (*StandardCapabilitiesLoop) ClientConfig

func (p *StandardCapabilitiesLoop) ClientConfig() *plugin.ClientConfig

func (*StandardCapabilitiesLoop) GRPCClient

func (p *StandardCapabilitiesLoop) GRPCClient(_ context.Context, broker *plugin.GRPCBroker, conn *grpc.ClientConn) (interface{}, error)

func (*StandardCapabilitiesLoop) GRPCServer

func (p *StandardCapabilitiesLoop) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error

type StandardCapabilitiesService

type StandardCapabilitiesService struct {
	goplugin.PluginService[*StandardCapabilitiesLoop, StandardCapabilities]
}

func NewStandardCapabilitiesService

func NewStandardCapabilitiesService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd) *StandardCapabilitiesService

type TracingConfig

type TracingConfig struct {
	// NodeAttributes are the attributes to attach to traces.
	NodeAttributes map[string]string

	// Enabled turns tracing on; it requires a collector to be provided.
	Enabled bool

	// CollectorTarget is the address of the OTEL collector to send traces to.
	CollectorTarget string

	// SamplingRatio is the ratio of traces to sample. 1.0 means sample all traces.
	SamplingRatio float64

	// TLSCertPath is the path to the TLS certificate to use when connecting to the collector.
	TLSCertPath string

	// OnDialError is called when the dialer fails, providing an opportunity to log.
	OnDialError func(error)
}

Directories

Path Synopsis
adapters
internal
net
pb
pb/mercury/v2
NOTE: the relative paths in the proto_path are to ensure we find common utilities, like BigInt
