Documentation ¶
Overview ¶
The plugin package provides the functionality both to expose an Elasticfeed plugin binary and to connect to an existing Elasticfeed plugin binary.
Elasticfeed supports plugins in the form of self-contained external static Go binaries. These binaries behave in a certain way (enforced by this package) and are connected to in a certain way (also enforced by this package).
Index ¶
- Constants
- Variables
- func CleanupClients()
- func NewPluginManager(engine emodel.Elasticfeed) emodel.PluginManager
- type ArtifactRpcServer
- func (s *ArtifactRpcServer) BuilderId(args *interface{}, reply *string) error
- func (s *ArtifactRpcServer) Destroy(args *interface{}, reply *error) error
- func (s *ArtifactRpcServer) Files(args *interface{}, reply *[]string) error
- func (s *ArtifactRpcServer) Id(args *interface{}, reply *string) error
- func (s *ArtifactRpcServer) State(name string, reply *interface{}) error
- func (s *ArtifactRpcServer) String(args *interface{}, reply *string) error
- type BasicError
- type CacheRLockResponse
- type CacheRpcServer
- type Client
- type ClientConfig
- type IndexerPrepareArgs
- type IndexerPrepareResponse
- type IndexerRpcServer
- type PipelinePrepareArgs
- type PipelinePrepareResponse
- type PipelineRpcServer
- type Plugin
- type PluginManager
- func (c *PluginManager) Discover() error
- func (this *PluginManager) FindPlugin(name string, profiler *model.Profiler) *interface{}
- func (c *PluginManager) LoadIndexer(name string) (model.Indexer, error)
- func (c *PluginManager) LoadPipeline(name string) (model.Pipeline, error)
- func (this *PluginManager) RunPlugin(p Plugin) (err error)
- type RpcArtifact
- type RpcCache
- type RpcClient
- type RpcIndexer
- type RpcPipeline
- type RpcServer
Constants ¶
const (
	DefaultIndexerEndpoint  string = "Indexer"
	DefaultPipelineEndpoint string = "Pipeline"
	DefaultArtifactEndpoint string = "Artifact"
)
const APIVersion = "4"
The APIVersion is output along with the RPC address. The plugin client validates this API version and reports an error if it does not know how to speak it.
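The version check described above can be sketched as follows. This is a minimal illustration, not the package's code: the page does not show the wire format, so the "version|address" line layout, the "|" separator, and the parseHandshake helper are all assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// apiVersion mirrors the documented APIVersion constant.
const apiVersion = "4"

// parseHandshake splits a line of the ASSUMED form "<version>|<address>"
// and rejects any version other than apiVersion. Both the separator and
// this helper are hypothetical; the real format is not shown on this page.
func parseHandshake(line string) (addr string, err error) {
	parts := strings.SplitN(strings.TrimSpace(line), "|", 2)
	if len(parts) != 2 {
		return "", fmt.Errorf("malformed handshake line: %q", line)
	}
	if parts[0] != apiVersion {
		return "", fmt.Errorf("unsupported plugin API version %q (want %q)", parts[0], apiVersion)
	}
	return parts[1], nil
}

func main() {
	addr, err := parseHandshake("4|127.0.0.1:10000\n")
	fmt.Println(addr, err)

	_, err = parseHandshake("3|127.0.0.1:10000\n")
	fmt.Println(err)
}
```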
const MagicCookieKey = "ELASTICFEED_PLUGIN_MAGIC_COOKIE"
const MagicCookieValue = "LQhHwrfdFtcZCudDzaQK8xkipGN3yqc3htghipXmJsakNRV9kwP]dPGLWuh"
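This page does not say how the cookie is used. A common pattern for such constants, assumed here, is that the host sets the environment variable before launching a plugin and the plugin refuses to serve when it is absent. The launchedByHost helper below is hypothetical.

```go
package main

import (
	"fmt"
	"os"
)

// Values copied from the MagicCookieKey and MagicCookieValue constants.
const (
	magicCookieKey   = "ELASTICFEED_PLUGIN_MAGIC_COOKIE"
	magicCookieValue = "LQhHwrfdFtcZCudDzaQK8xkipGN3yqc3htghipXmJsakNRV9kwP]dPGLWuh"
)

// launchedByHost reports whether the environment carries the expected
// cookie. getenv is injected so the check can be exercised without
// touching the real process environment. (Hypothetical helper.)
func launchedByHost(getenv func(string) string) bool {
	return getenv(magicCookieKey) == magicCookieValue
}

func main() {
	if !launchedByHost(os.Getenv) {
		fmt.Fprintln(os.Stderr, "this binary is a plugin and is not meant to be executed directly")
		return
	}
	fmt.Println("cookie verified, plugin may start serving")
}
```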
Variables ¶
var Interrupts int32 = 0
This is a count of the number of interrupts the process has received. This is updated with sync/atomic whenever a SIGINT is received and can be checked by the plugin safely to take action.
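A minimal sketch of the counting pattern described above, using only the standard library. The lowercase names (interrupts, recordInterrupt, interruptCount) are stand-ins for illustration and are not part of this package.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync/atomic"
)

// interrupts stands in for the package-level Interrupts counter.
var interrupts int32

// recordInterrupt atomically bumps the counter; it is safe to call from
// the signal-handling goroutine while other goroutines read the value.
func recordInterrupt() {
	atomic.AddInt32(&interrupts, 1)
}

// interruptCount atomically reads the counter.
func interruptCount() int32 {
	return atomic.LoadInt32(&interrupts)
}

func main() {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)

	// Count every SIGINT delivered to the process.
	go func() {
		for range ch {
			recordInterrupt()
		}
	}()

	// A plugin would poll interruptCount() during long-running work.
	fmt.Println("interrupts so far:", interruptCount())
}
```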
var Killed = false
If this is true, then the "unexpected EOF" panic will not be raised throughout the clients.
Functions ¶
func CleanupClients ¶
func CleanupClients()
This makes sure all the managed subprocesses are killed and properly logged. This should be called before the parent process running the plugins exits.
This must only be called _once_.
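One way such a cleanup could be structured is a mutex-guarded registry whose entries are killed concurrently and then waited on. The registry, killer, and fakeClient types below are hypothetical and only illustrate the idea; they are not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// killer is the only behaviour the registry needs from a client.
type killer interface{ Kill() }

// registry tracks managed clients (hypothetical type).
type registry struct {
	mu      sync.Mutex
	managed []killer
}

// add registers a client for later cleanup.
func (r *registry) add(k killer) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.managed = append(r.managed, k)
}

// cleanup kills every registered client concurrently and blocks until
// all Kill calls have returned. The list is emptied first, so a second
// call finds nothing left to kill.
func (r *registry) cleanup() {
	r.mu.Lock()
	clients := r.managed
	r.managed = nil
	r.mu.Unlock()

	var wg sync.WaitGroup
	for _, c := range clients {
		wg.Add(1)
		go func(c killer) {
			defer wg.Done()
			c.Kill()
		}(c)
	}
	wg.Wait()
}

// fakeClient records whether Kill was called.
type fakeClient struct{ killed bool }

func (f *fakeClient) Kill() { f.killed = true }

func main() {
	r := &registry{}
	a, b := &fakeClient{}, &fakeClient{}
	r.add(a)
	r.add(b)
	r.cleanup()
	fmt.Println(a.killed, b.killed)
}
```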
func NewPluginManager ¶
func NewPluginManager(engine emodel.Elasticfeed) emodel.PluginManager
Types ¶
type ArtifactRpcServer ¶
type ArtifactRpcServer struct {
// contains filtered or unexported fields
}
ArtifactRpcServer wraps a packer.Artifact implementation and makes it exportable as part of a Golang RPC server.
func (*ArtifactRpcServer) BuilderId ¶
func (s *ArtifactRpcServer) BuilderId(args *interface{}, reply *string) error
func (*ArtifactRpcServer) Destroy ¶
func (s *ArtifactRpcServer) Destroy(args *interface{}, reply *error) error
func (*ArtifactRpcServer) Files ¶
func (s *ArtifactRpcServer) Files(args *interface{}, reply *[]string) error
func (*ArtifactRpcServer) Id ¶
func (s *ArtifactRpcServer) Id(args *interface{}, reply *string) error
func (*ArtifactRpcServer) State ¶
func (s *ArtifactRpcServer) State(name string, reply *interface{}) error
func (*ArtifactRpcServer) String ¶
func (s *ArtifactRpcServer) String(args *interface{}, reply *string) error
type BasicError ¶
type BasicError struct {
Message string
}
This is a type that wraps error types so that they can be messaged across RPC channels. Since "error" is an interface, we can't always gob-encode the underlying structure. This is a valid error interface implementer that we will push across.
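The reason for the wrapper can be seen in a small gob round trip. The sketch below redeclares lowercase stand-ins (basicError, newBasicError, response) rather than using this package; returning nil for a nil error is an assumption about the constructor, not documented behaviour.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"errors"
	"fmt"
)

// basicError is a stand-in for BasicError: a concrete struct that gob
// can encode, unlike an arbitrary value behind the error interface.
type basicError struct {
	Message string
}

// newBasicError flattens any error into its message. Returning nil for
// a nil input is an assumption made for this sketch.
func newBasicError(err error) *basicError {
	if err == nil {
		return nil
	}
	return &basicError{Message: err.Error()}
}

func (e *basicError) Error() string { return e.Message }

// response imitates the shape of the *PrepareResponse types.
type response struct {
	Warnings []string
	Error    *basicError
}

// roundTrip encodes and decodes a response the way an RPC codec would.
func roundTrip(in response) (response, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		return response{}, err
	}
	var out response
	err := gob.NewDecoder(&buf).Decode(&out)
	return out, err
}

func main() {
	in := response{
		Warnings: []string{"slow disk"},
		Error:    newBasicError(errors.New("index missing")),
	}
	out, err := roundTrip(in)
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Println(out.Warnings, out.Error)
}
```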
func NewBasicError ¶
func NewBasicError(err error) *BasicError
func (*BasicError) Error ¶
func (e *BasicError) Error() string
type CacheRLockResponse ¶
type CacheRpcServer ¶
type CacheRpcServer struct {
// contains filtered or unexported fields
}
CacheRpcServer wraps a packer.Cache implementation and makes it exportable as part of a Golang RPC server.
func (*CacheRpcServer) RLock ¶
func (c *CacheRpcServer) RLock(key string, result *CacheRLockResponse) error
func (*CacheRpcServer) RUnlock ¶
func (c *CacheRpcServer) RUnlock(key string, result *interface{}) error
func (*CacheRpcServer) Unlock ¶
func (c *CacheRpcServer) Unlock(key string, result *interface{}) error
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client handles the lifecycle of a plugin application, determining its RPC address, and returning various types of packer interface implementations across the multi-process communication layer.
func NewClient ¶
func NewClient(config *ClientConfig) (c *Client)
Creates a new plugin client which manages the lifecycle of an external plugin and gets the address for the RPC connection.
The client must be cleaned up at some point by calling Kill(). If the client is a managed client (created with NewManagedClient) you can just call CleanupClients at the end of your program and they will be properly cleaned.

func (*Client) Indexer ¶
Returns an indexer implementation that is communicating over this client. If the client hasn't been started, this will start it.
func (*Client) Kill ¶
func (c *Client) Kill()
End the executing subprocess (if it is running) and perform any cleanup tasks necessary such as capturing any remaining logs and so on.
This method blocks until the process successfully exits.
This method can safely be called multiple times.
func (*Client) Pipeline ¶
Returns a pipeline implementation that is communicating over this client. If the client hasn't been started, this will start it.
func (*Client) Start ¶
Starts the underlying subprocess, communicating with it to negotiate a port for RPC connections, and returning the address to connect via RPC.
This method is safe to call multiple times. Subsequent calls have no effect. Once a client has been started, it cannot be started again, even if it was killed.
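The Start and Kill guarantees described above (repeat calls are harmless, and a killed client is never restarted) can be approximated with sync.Once. This is a sketch under assumptions: the lifecycle type, its launch and stop hooks, and the (string, error) return of Start are hypothetical, since the real signatures are not shown here.

```go
package main

import (
	"fmt"
	"sync"
)

// lifecycle is a hypothetical stand-in for the plugin client.
type lifecycle struct {
	startOnce sync.Once
	killOnce  sync.Once

	addr string
	err  error

	launch func() (string, error) // starts the subprocess, returns its RPC address
	stop   func()                 // terminates the subprocess and waits for exit
}

// Start launches the subprocess on the first call only. Later calls,
// including calls made after Kill, return the cached result.
func (l *lifecycle) Start() (string, error) {
	l.startOnce.Do(func() {
		l.addr, l.err = l.launch()
	})
	return l.addr, l.err
}

// Kill stops the subprocess on the first call only; repeats are no-ops.
func (l *lifecycle) Kill() {
	l.killOnce.Do(l.stop)
}

func main() {
	launches := 0
	l := &lifecycle{
		launch: func() (string, error) { launches++; return "127.0.0.1:10000", nil },
		stop:   func() { fmt.Println("subprocess stopped") },
	}
	l.Start()
	l.Start()
	l.Kill()
	l.Kill()
	fmt.Println("launch count:", launches)
}
```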
type ClientConfig ¶
type ClientConfig struct {
	// The unstarted subprocess for starting the plugin.
	Cmd *exec.Cmd

	// Managed represents if the client should be managed by the
	// plugin package or not. If true, then by calling CleanupClients,
	// it will automatically be cleaned up. Otherwise, the client
	// user is fully responsible for making sure to Kill all plugin
	// clients. By default the client is _not_ managed.
	Managed bool

	// The minimum and maximum port to use for communicating with
	// the subprocess. If not set, this defaults to 10,000 and 25,000
	// respectively.
	MinPort, MaxPort uint

	// StartTimeout is the timeout to wait for the plugin to say it
	// has started successfully.
	StartTimeout time.Duration

	// If non-nil, then the stderr of the client will be written to here
	// (as well as the log).
	Stderr io.Writer
}
ClientConfig is the configuration used to initialize a new plugin client. After being used to initialize a plugin client, that configuration must not be modified again.
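The MinPort and MaxPort defaults can be illustrated with a small helper that fills in unset values and then probes the range for a free port. Both portRange and listenInRange are hypothetical names, and probing ports sequentially on the loopback interface is an assumption about how the range might be consumed.

```go
package main

import (
	"fmt"
	"net"
)

// portRange applies the documented defaults (10,000 and 25,000) to
// unset (zero) bounds. Hypothetical helper.
func portRange(lo, hi uint) (uint, uint) {
	if lo == 0 {
		lo = 10000
	}
	if hi == 0 {
		hi = 25000
	}
	return lo, hi
}

// listenInRange returns a listener on the first free loopback port in
// [lo, hi]. Sequential probing is an assumption made for this sketch.
func listenInRange(lo, hi uint) (net.Listener, error) {
	lo, hi = portRange(lo, hi)
	for port := lo; port <= hi; port++ {
		l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
		if err == nil {
			return l, nil
		}
	}
	return nil, fmt.Errorf("no free port in range %d-%d", lo, hi)
}

func main() {
	l, err := listenInRange(0, 0)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer l.Close()
	fmt.Println("listening on", l.Addr())
}
```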
type IndexerPrepareArgs ¶
type IndexerPrepareArgs struct {
Configs []interface{}
}
type IndexerPrepareResponse ¶
type IndexerPrepareResponse struct {
	Warnings []string
	Error    *BasicError
}
type IndexerRpcServer ¶
type IndexerRpcServer struct {
// contains filtered or unexported fields
}
IndexerRpcServer wraps an Indexer implementation and makes it exportable as part of a Golang RPC server.
func (*IndexerRpcServer) Cancel ¶
func (b *IndexerRpcServer) Cancel(args *interface{}, reply *interface{}) error
func (*IndexerRpcServer) Prepare ¶
func (b *IndexerRpcServer) Prepare(args *IndexerPrepareArgs, reply *IndexerPrepareResponse) error
type PipelinePrepareArgs ¶
type PipelinePrepareArgs struct {
Data interface{}
}
type PipelinePrepareResponse ¶
type PipelinePrepareResponse struct {
	Warnings []string
	Error    *BasicError
	Data     interface{}
}
type PipelineRpcServer ¶
type PipelineRpcServer struct {
// contains filtered or unexported fields
}
PipelineRpcServer wraps a Pipeline implementation and makes it exportable as part of a Golang RPC server.
func (*PipelineRpcServer) Cancel ¶
func (b *PipelineRpcServer) Cancel(args *interface{}, reply *interface{}) error
func (*PipelineRpcServer) Prepare ¶
func (b *PipelineRpcServer) Prepare(args *PipelinePrepareArgs, reply *PipelinePrepareResponse) error
func (*PipelineRpcServer) Run ¶
func (b *PipelineRpcServer) Run(args *PipelinePrepareArgs, reply *PipelinePrepareResponse) (err error)
type Plugin ¶
type Plugin struct {
// contains filtered or unexported fields
}
func NewPlugin ¶
func NewPlugin(p *interface{}, pm *PluginManager, api *model.ResourceApi, profiler *model.Profiler) *Plugin
type PluginManager ¶
type PluginManager struct {
	Indexers  map[string]string
	Crawlers  map[string]string
	Sensors   map[string]string
	Pipelines map[string]string
	Scenarios map[string]string
	Helpers   map[string]string

	PluginMinPort uint
	PluginMaxPort uint
	// contains filtered or unexported fields
}
func (*PluginManager) Discover ¶
func (c *PluginManager) Discover() error
Discover discovers plugins.
This looks in the directory of the executable and the CWD, in that order for priority.
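A rough sketch of a two-directory lookup follows. Several things here are assumptions rather than documented behaviour: the "elasticfeed-indexer-" file name prefix, the discover helper itself, and reading "in that order" to mean that a plugin found in the executable's directory wins over one of the same name in the CWD.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// globFunc matches filepath.Glob so tests can substitute a fake.
type globFunc func(pattern string) ([]string, error)

// discover scans dirs in order and maps plugin name to binary path.
// The first directory that provides a name keeps it (ASSUMED priority).
func discover(dirs []string, prefix string, glob globFunc) (map[string]string, error) {
	found := make(map[string]string)
	for _, dir := range dirs {
		matches, err := glob(filepath.Join(dir, prefix+"*"))
		if err != nil {
			return nil, err
		}
		for _, m := range matches {
			name := strings.TrimPrefix(filepath.Base(m), prefix)
			if _, seen := found[name]; !seen {
				found[name] = m
			}
		}
	}
	return found, nil
}

func main() {
	exe, err := os.Executable()
	if err != nil {
		fmt.Println(err)
		return
	}
	cwd, err := os.Getwd()
	if err != nil {
		fmt.Println(err)
		return
	}
	// "elasticfeed-indexer-" is a hypothetical naming convention.
	plugins, err := discover([]string{filepath.Dir(exe), cwd}, "elasticfeed-indexer-", filepath.Glob)
	fmt.Println(plugins, err)
}
```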
func (*PluginManager) FindPlugin ¶
func (this *PluginManager) FindPlugin(name string, profiler *model.Profiler) *interface{}
func (*PluginManager) LoadIndexer ¶
func (c *PluginManager) LoadIndexer(name string) (model.Indexer, error)
func (*PluginManager) LoadPipeline ¶
func (c *PluginManager) LoadPipeline(name string) (model.Pipeline, error)
func (*PluginManager) RunPlugin ¶
func (this *PluginManager) RunPlugin(p Plugin) (err error)
type RpcArtifact ¶
type RpcArtifact struct {
// contains filtered or unexported fields
}
An implementation of packer.Artifact where the RpcArtifact is actually available over an RPC connection.
func (*RpcArtifact) BuilderId ¶
func (a *RpcArtifact) BuilderId() (result string)
func (*RpcArtifact) Destroy ¶
func (a *RpcArtifact) Destroy() error
func (*RpcArtifact) Files ¶
func (a *RpcArtifact) Files() (result []string)
func (*RpcArtifact) Id ¶
func (a *RpcArtifact) Id() (result string)
func (*RpcArtifact) State ¶
func (a *RpcArtifact) State(name string) (result interface{})
func (*RpcArtifact) String ¶
func (a *RpcArtifact) String() (result string)
type RpcCache ¶
type RpcCache struct {
// contains filtered or unexported fields
}
An implementation of packer.Cache where the RpcCache is actually executed over an RPC connection.
type RpcClient ¶
type RpcClient struct {
// contains filtered or unexported fields
}
RpcClient is the client end that communicates with a Packer RPC server. Establishing a connection is up to the user; the RpcClient can communicate over any ReadWriteCloser.
func NewRpcClient ¶
func NewRpcClient(rwc io.ReadWriteCloser) (*RpcClient, error)
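To show what "any ReadWriteCloser" allows, the sketch below runs the standard library's net/rpc over an in-memory net.Pipe instead of a TCP socket. It does not use this package: the Echo service and callOverPipe helper are hypothetical, and only the transport idea carries over.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// Echo is a hypothetical service with a method in the shape net/rpc
// expects: an argument, a pointer reply, and an error return.
type Echo struct{}

func (Echo) Say(msg string, reply *string) error {
	*reply = "echo: " + msg
	return nil
}

// callOverPipe serves Echo on one end of an in-memory pipe and calls it
// from the other end, so no network socket is involved.
func callOverPipe(msg string) (string, error) {
	clientEnd, serverEnd := net.Pipe()

	server := rpc.NewServer()
	if err := server.RegisterName("Echo", Echo{}); err != nil {
		return "", err
	}
	go server.ServeConn(serverEnd)

	client := rpc.NewClient(clientEnd)
	defer client.Close()

	var reply string
	err := client.Call("Echo.Say", msg, &reply)
	return reply, err
}

func main() {
	reply, err := callOverPipe("hello")
	fmt.Println(reply, err)
}
```

Because the client only needs something that reads, writes, and closes, the same call would work unchanged over a TCP connection or a multiplexed stream.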
type RpcIndexer ¶
type RpcIndexer struct {
// contains filtered or unexported fields
}
An implementation of Indexer where the indexer is actually executed over an RPC connection.
func (*RpcIndexer) Cancel ¶
func (b *RpcIndexer) Cancel()
func (*RpcIndexer) Prepare ¶
func (b *RpcIndexer) Prepare(config ...interface{}) ([]string, error)
type RpcPipeline ¶
type RpcPipeline struct {
// contains filtered or unexported fields
}
An implementation of Pipeline where the pipeline is actually executed over an RPC connection.
func (*RpcPipeline) Cancel ¶
func (b *RpcPipeline) Cancel()
func (*RpcPipeline) Prepare ¶
func (b *RpcPipeline) Prepare(config ...interface{}) ([]string, error)
func (*RpcPipeline) Run ¶
func (h *RpcPipeline) Run(data interface{}) (interface{}, error)
type RpcServer ¶
type RpcServer struct {
// contains filtered or unexported fields
}
RpcServer represents an RPC server for Packer. This must be paired on the other side with a Client.
func NewRpcServer ¶
func NewRpcServer(conn io.ReadWriteCloser) *RpcServer
NewRpcServer returns a new Packer RPC server.
func Server ¶
Server waits for a connection to this plugin and returns an Elasticfeed RPC server that you can use to register components and serve them.