discovery

v0.12.8
Published: Jun 8, 2022 License: MIT Imports: 20 Imported by: 16

README

SDP Discovery Libraries

Code to help with all things related to discovering system state using State Description Protocol. Allows users to easily create software that discovers system state, for example:

  • Source containers for running with srcman
  • Agents for discovering local state on servers and other devices

This library is currently under development and documentation can be found on pkg.go.dev

Engine

The engine is responsible for managing all communication over NATS, handling requests, reporting on progress, caching etc. Authors of sources should only need to do the following in order to have a functional source:

  • Give the engine a name
    • Note that this name is used as the Responder when responding to requests. It should be unique: if multiple responders share the same name, users will not be able to properly track the progress of their requests
  • Provide the engine with config
  • Manage the engine's lifecycle (start and stop it)

Look at the tests for some simple examples of starting and running an engine, or use the source-template to generate the required wrapper code.

Triggers

Triggers allow source developers to have their source be triggered by the discovery of other items on the NATS network. This allows for a pattern where a source is triggered by a relevant resource being discovered by another query, rather than by being queried directly. This can be used to write secondary sources that fire automatically, e.g.

When a package with the name "nginx" is found in any context, the source should be triggered to try to find the config file for nginx in this context, parse it, and return more detailed information.

The anatomy of a trigger is as follows:

var trigger = Trigger{
    // The type of item that this trigger should fire for
    Type:                      "person",
    // The trigger will only fire if both the type and the
    // UniqueAttributeValueRegex match
    UniqueAttributeValueRegex: regexp.MustCompile(`^[Dd]ylan$`),
    // When both of the above match, the function below is called. It should
    // return the request to forward to the engine that the trigger is
    // registered with
    RequestGenerator: func(in *sdp.Item) (*sdp.ItemRequest, error) {
        if in.GetContext() != "something" {
            return nil, errors.New("only 'something' context supported")
        }
        return &sdp.ItemRequest{
            Type:   "dog",
            Method: sdp.RequestMethod_SEARCH,
            Query:  "pug",
        }, nil
    },
}

When the above trigger fires it will result in the engine that it is assigned to processing a SEARCH request as defined above. Note that while only the Type, Method and Query attributes have been specified, the rest will be filled in automatically with data from the Metadata.SourceRequest of the originating item to ensure that the responses are sent to the user that originated the request.
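
The fill-in step described above can be pictured with the following minimal sketch. It does not use the real sdp types: `ItemRequest` here is a simplified stand-in, the `Context`, `ItemSubject` and `ResponseSubject` fields are assumptions made for illustration, and `fillFromSource` is a hypothetical helper rather than a function of this library.

```go
package main

import "fmt"

// ItemRequest is a simplified stand-in for sdp.ItemRequest. Only Type, Method
// and Query are named in the README; the remaining fields are assumptions.
type ItemRequest struct {
	Type            string
	Method          string
	Query           string
	Context         string
	ItemSubject     string
	ResponseSubject string
}

// fillFromSource is a hypothetical helper. It copies every field that the
// trigger left empty from the request that produced the originating item, so
// that responses go back to whoever asked in the first place.
func fillFromSource(generated, source ItemRequest) ItemRequest {
	if generated.Context == "" {
		generated.Context = source.Context
	}
	if generated.ItemSubject == "" {
		generated.ItemSubject = source.ItemSubject
	}
	if generated.ResponseSubject == "" {
		generated.ResponseSubject = source.ResponseSubject
	}
	return generated
}

func main() {
	source := ItemRequest{
		Type:            "person",
		Method:          "GET",
		Query:           "Dylan",
		Context:         "something",
		ItemSubject:     "return.item._INBOX.712ab421",
		ResponseSubject: "return.response._INBOX.978af6de",
	}
	// Only the three fields a RequestGenerator has to provide.
	generated := ItemRequest{Type: "dog", Method: "SEARCH", Query: "pug"}

	fmt.Printf("%+v\n", fillFromSource(generated, source))
}
```

Fields that the trigger does set are left alone, which matches the "set automatically if not provided" wording in the Trigger documentation.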

Developing

This repository is configured to use VSCode devcontainers. This means that if you don't want to install Go locally, you can do all of your development inside a container. You can also use GitHub Codespaces to host these containers, meaning that the only requirement is having VSCode installed. Use of this is optional but does have some benefits:

  • Local environment not polluted
  • NATS sidecar container automatically started for end-to-end tests

Documentation

Overview

Reusable testing libraries for testing sources

Index

Constants

const DefaultCheckInterval = 3 * time.Second
const DefaultMaxRequestTimeout = 1 * time.Minute

Variables

var RFC1123 = regexp.MustCompile(`^[a-z0-9]([a-z0-9-]*[a-z0-9])?$`)
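
The page does not say where the package applies this pattern, so the following is only a sketch of what the expression accepts and rejects. The regular expression is copied from the variable above; `isValidLabel` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"regexp"
)

// RFC1123 is copied verbatim from the package variable documented above.
var RFC1123 = regexp.MustCompile(`^[a-z0-9]([a-z0-9-]*[a-z0-9])?$`)

// isValidLabel is a hypothetical helper: it reports whether s consists of
// lowercase letters, digits and hyphens, and starts and ends with a letter
// or digit.
func isValidLabel(s string) bool {
	return RFC1123.MatchString(s)
}

func main() {
	for _, name := range []string{"my-source", "a", "-leading", "trailing-", "Upper", ""} {
		fmt.Printf("%q valid: %v\n", name, isValidLabel(name))
	}
}
```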

Functions

func GetCacheDuration

func GetCacheDuration(s Source) time.Duration

GetCacheDuration Gets the cache duration for a specific source, or a default value

func IsWildcard

func IsWildcard(s string) bool

IsWildcard checks if a string is the wildcard. Use this instead of implementing the wildcard check everywhere so that if we need to change the wildcard at a later date we can do so here
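
A minimal sketch of why a single check is useful, assuming the wildcard is "*" (the value named in the Source.Contexts documentation). Both `isWildcard` and `contextMatches` are hypothetical stand-ins written for this example, not the package's own code.

```go
package main

import "fmt"

// wildcard is assumed to be "*", following the Source.Contexts documentation.
const wildcard = "*"

// isWildcard mirrors the documented behaviour of IsWildcard.
func isWildcard(s string) bool {
	return s == wildcard
}

// contextMatches is a hypothetical helper showing the check in use: a
// requested context matches when either side is the wildcard or when the
// two strings are equal.
func contextMatches(supported []string, requested string) bool {
	if isWildcard(requested) {
		return true
	}
	for _, c := range supported {
		if isWildcard(c) || c == requested {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(contextMatches([]string{"prod", "dev"}, "prod"))
	fmt.Println(contextMatches([]string{"prod", "dev"}, "staging"))
	fmt.Println(contextMatches([]string{"*"}, "staging"))
}
```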

func NewItemSubject added in v0.8.0

func NewItemSubject() string

NewItemSubject Generates a random subject name for returning items e.g. return.item._INBOX.712ab421

func NewResponseSubject added in v0.8.0

func NewResponseSubject() string

NewResponseSubject Generates a random subject name for returning responses e.g. return.response._INBOX.978af6de

func TestValidateItem added in v0.4.0

func TestValidateItem(t *testing.T, i *sdp.Item)

TestValidateItem Checks an item to ensure it is a valid SDP item. This includes checking that all required attributes are populated

func TestValidateItems added in v0.4.0

func TestValidateItems(t *testing.T, is []*sdp.Item)

TestValidateItems Runs TestValidateItem on many items

Types

type CacheDefiner

type CacheDefiner interface {
	DefaultCacheDuration() time.Duration
}

CacheDefiner Some backends may implement the CacheDefiner interface which means that they specify a custom default cache interval. The config will always take precedence however

type Engine

type Engine struct {
	// Descriptive name of this engine. Used as responder name in SDP responses
	Name string

	NATSOptions   *multiconn.NATSConnectionOptions // Options for connecting to NATS
	NATSQueueName string                           // The name of the queue to use when subscribing

	// The maximum number of queries that can be executing in parallel. Defaults
	// to the number of CPUs
	MaxParallelExecutions int

	// The maximum request timeout. Defaults to `DefaultMaxRequestTimeout` if
	// set to zero. If a client does not send a timeout, it will default to this
	// value. Requests with timeouts larger than this value will have their
	// timeouts overridden
	MaxRequestTimeout time.Duration

	// How often to check for closed connections and try to recover
	ConnectionWatchInterval time.Duration
	ConnectionWatcher       NATSWatcher
	// contains filtered or unexported fields
}

Engine is the main discovery engine. This is where all of the sources are stored, and it is responsible for calling out to the right sources to discover everything

Note that an engine that does not have a connected NATS connection will simply not communicate over NATS

func (*Engine) AddSources

func (e *Engine) AddSources(sources ...Source)

AddSources Adds a source to this engine

func (*Engine) AddTriggers added in v0.7.0

func (e *Engine) AddTriggers(triggers ...Trigger)

AddTriggers Adds a trigger to this engine. Triggers cause the engine to listen for items from other contexts and will fire a custom ItemRequest if they match

func (*Engine) CancelHandler added in v0.12.3

func (e *Engine) CancelHandler(cancelRequest *sdp.CancelItemRequest)

CancelHandler calls HandleCancelItemRequest in a goroutine

func (*Engine) ClearCache added in v0.10.1

func (e *Engine) ClearCache()

ClearCache Completely clears the cache

func (*Engine) ClearTriggers added in v0.7.0

func (e *Engine) ClearTriggers()

ClearTriggers removes all triggers from the engine

func (*Engine) DeleteTrackedRequest added in v0.6.0

func (e *Engine) DeleteTrackedRequest(uuid [16]byte)

DeleteTrackedRequest Deletes a request from tracking

func (*Engine) ExecuteRequest

func (e *Engine) ExecuteRequest(ctx context.Context, req *sdp.ItemRequest) ([]*sdp.Item, error)

ExecuteRequest Executes a single request and returns the results without any linking

func (*Engine) ExpandRequest added in v0.6.0

func (e *Engine) ExpandRequest(request *sdp.ItemRequest) map[*sdp.ItemRequest][]Source

ExpandRequest Expands requests with wildcards to no longer contain wildcards. Meaning that if we support 5 types, and a request comes in with a wildcard type, this function will expand that request into 5 requests, one for each type.

The same goes for contexts, if we have a request with a wildcard context, and a single source that supports 5 contexts, we will end up with 5 requests. The exception to this is if we have a source that supports all contexts, but is unable to list them. In this case there will still be some requests with wildcard contexts as they can't be expanded

This function returns a map of requests to the sources that they should be run against
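
The expansion rules can be sketched with simplified stand-in types. `Request`, `Source` and `expand` below are hypothetical; unlike the real method, the sketch keys the map by value instead of by *sdp.ItemRequest, and it assumes "*" is the wildcard.

```go
package main

import "fmt"

// Request and Source are simplified stand-ins for sdp.ItemRequest and the
// package's Source interface.
type Request struct {
	Type    string
	Context string
}

type Source struct {
	Type     string
	Contexts []string
}

// expand is a hypothetical sketch of the documented behaviour. Wildcard
// types become one request per matching source type, and wildcard contexts
// become one request per listed context. A source that itself lists "*"
// cannot be expanded, so its request keeps the wildcard context.
func expand(req Request, sources []Source) map[Request][]Source {
	out := make(map[Request][]Source)
	for _, s := range sources {
		if req.Type != "*" && req.Type != s.Type {
			continue
		}
		for _, c := range s.Contexts {
			if req.Context != "*" && c != "*" && c != req.Context {
				continue
			}
			ctx := req.Context
			if ctx == "*" {
				ctx = c // may still be "*" when the source cannot list contexts
			}
			key := Request{Type: s.Type, Context: ctx}
			out[key] = append(out[key], s)
		}
	}
	return out
}

func main() {
	sources := []Source{
		{Type: "person", Contexts: []string{"a", "b"}},
		{Type: "dog", Contexts: []string{"*"}},
	}
	for req, srcs := range expand(Request{Type: "*", Context: "*"}, sources) {
		fmt.Printf("%+v -> %d source(s)\n", req, len(srcs))
	}
}
```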

func (*Engine) Find

func (e *Engine) Find(ctx context.Context, r *sdp.ItemRequest, relevantSources []Source) ([]*sdp.Item, error)

Find executes Find() on all sources for a given type, returning the merged results. Only returns an error if all sources fail, in which case returns the first error

func (*Engine) Get

func (e *Engine) Get(ctx context.Context, r *sdp.ItemRequest, relevantSources []Source) (*sdp.Item, error)

Get Runs a get query against known sources in priority order. If nothing was found, returns the first error

func (*Engine) GetTrackedRequest added in v0.6.0

func (e *Engine) GetTrackedRequest(uuid uuid.UUID) (*RequestTracker, error)

GetTrackedRequest Returns the RequestTracker object for a given UUID. This tracker can then be used to cancel the request

func (*Engine) HandleCancelItemRequest added in v0.12.3

func (e *Engine) HandleCancelItemRequest(cancelRequest *sdp.CancelItemRequest)

HandleCancelItemRequest Takes a CancelItemRequest and cancels that request if it exists

func (*Engine) HandleItemRequest added in v0.12.3

func (e *Engine) HandleItemRequest(itemRequest *sdp.ItemRequest)

HandleItemRequest Handles a single request. This includes responses, linking etc.

func (*Engine) IsNATSConnected

func (e *Engine) IsNATSConnected() bool

IsNATSConnected returns whether the engine is connected to NATS

func (*Engine) ItemRequestHandler added in v0.6.0

func (e *Engine) ItemRequestHandler(itemRequest *sdp.ItemRequest)

ItemRequestHandler Calls HandleItemRequest but in a goroutine so that it can happen in parallel

func (*Engine) ManagedConnection added in v0.12.6

func (e *Engine) ManagedConnection() *nats.EncodedConn

ManagedConnection Returns the connection that the engine is using. Note that the lifecycle of this connection is managed by the engine, causing it to disconnect will cause issues with the engine. Use Engine.Stop() instead

func (*Engine) NonHiddenSources added in v0.4.0

func (e *Engine) NonHiddenSources() []Source

NonHiddenSources Returns a slice of all known sources excluding hidden ones

func (*Engine) ProcessTriggers added in v0.7.0

func (e *Engine) ProcessTriggers(item *sdp.Item)

ProcessTriggers Checks all triggers against a given item and fires them if required

func (*Engine) Restart added in v0.11.1

func (e *Engine) Restart() error

Restart Restarts the engine. If called in parallel, subsequent calls are ignored until the restart is completed

func (*Engine) Search

func (e *Engine) Search(ctx context.Context, r *sdp.ItemRequest, relevantSources []Source) ([]*sdp.Item, error)

Search executes Search() on all sources for a given type, returning the merged results. Only returns an error if all sources fail, in which case returns the first error

func (*Engine) SetupMaxRequestTimeout added in v0.12.8

func (e *Engine) SetupMaxRequestTimeout()

func (*Engine) SetupThrottle

func (e *Engine) SetupThrottle()

SetupThrottle Sets up the throttling based on MaxParallelExecutions, including ensuring that it's not set to zero

func (*Engine) Sources

func (e *Engine) Sources() []Source

Sources Returns a slice of all known sources

func (*Engine) Start

func (e *Engine) Start() error

Start performs all of the initialisation steps required for the engine to work. Note that this creates NATS subscriptions for all available sources so modifying the Sources value after an engine has been started will not have any effect until the engine is restarted

func (*Engine) Stop

func (e *Engine) Stop() error

Stop Stops the engine running and disconnects from NATS

func (*Engine) TrackRequest added in v0.6.0

func (e *Engine) TrackRequest(uuid uuid.UUID, request *RequestTracker)

TrackRequest Stores a RequestTracker in the engine so that it can be looked up later and cancelled if required. The UUID should be supplied as part of the request itself

func (*Engine) WillRespond added in v0.10.1

func (e *Engine) WillRespond(req *sdp.ItemRequest) bool

WillRespond Performs a cursory check to see if it's likely that this engine will respond to a given request based on the type and context of the request. Should be used as an initial check before proceeding to detailed processing.

type GetFindMutex

type GetFindMutex struct {
	// contains filtered or unexported fields
}

GetFindMutex A modified version of a RWMutex. Many get locks can be held but only one Find lock. A waiting Find lock (even if it hasn't been locked, just if someone is waiting) blocks all other get locks until it unlocks.

The intended usage of this is that it will allow a source which is trying to process many requests at once, to process a FIND request before any GET requests, since it's likely that once FIND has been run, subsequent GET requests will be able to be served from cache

func (*GetFindMutex) FindLock

func (g *GetFindMutex) FindLock(itemContext string, typ string)

FindLock An exclusive lock. Ensures that all GetLocks have been unlocked and stops any more from being obtained. Provide a type and context to ensure that the lock is only held for that type and context combination rather than locking the whole engine

func (*GetFindMutex) FindUnlock

func (g *GetFindMutex) FindUnlock(itemContext string, typ string)

FindUnlock Unlocks a FindLock

func (*GetFindMutex) GetLock

func (g *GetFindMutex) GetLock(itemContext string, typ string)

GetLock Gets a lock that can be held by an unlimited number of goroutines, these locks are only blocked by FindLocks. A type and context must be provided since a Get in one type (or context) should not be blocked by a Find in another

func (*GetFindMutex) GetUnlock

func (g *GetFindMutex) GetUnlock(itemContext string, typ string)

GetUnlock Unlocks the GetLock. This must be called once for each GetLock otherwise it will be impossible to ever obtain a FindLock
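
The four methods above can be approximated with one sync.RWMutex per context and type pair, since a pending Lock on a Go RWMutex already blocks new RLock calls. The sketch below is that approximation only: `keyedRWMutex` and `forKey` are hypothetical, and the real GetFindMutex may be implemented differently.

```go
package main

import (
	"fmt"
	"sync"
)

// keyedRWMutex is a hypothetical stand-in for GetFindMutex that keeps one
// RWMutex per context/type combination.
type keyedRWMutex struct {
	mu    sync.Mutex
	locks map[string]*sync.RWMutex
}

// forKey returns the lock for a context/type pair, creating it on first use.
func (k *keyedRWMutex) forKey(itemContext, typ string) *sync.RWMutex {
	k.mu.Lock()
	defer k.mu.Unlock()
	if k.locks == nil {
		k.locks = make(map[string]*sync.RWMutex)
	}
	key := itemContext + "\x00" + typ
	l, ok := k.locks[key]
	if !ok {
		l = &sync.RWMutex{}
		k.locks[key] = l
	}
	return l
}

// Get locks are shared, Find locks are exclusive.
func (k *keyedRWMutex) GetLock(itemContext, typ string)    { k.forKey(itemContext, typ).RLock() }
func (k *keyedRWMutex) GetUnlock(itemContext, typ string)  { k.forKey(itemContext, typ).RUnlock() }
func (k *keyedRWMutex) FindLock(itemContext, typ string)   { k.forKey(itemContext, typ).Lock() }
func (k *keyedRWMutex) FindUnlock(itemContext, typ string) { k.forKey(itemContext, typ).Unlock() }

func main() {
	var g keyedRWMutex

	// A FIND runs exclusively, for example to fill a cache.
	g.FindLock("something", "person")
	fmt.Println("find running")
	g.FindUnlock("something", "person")

	// Many GETs can then hold the lock at the same time.
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.GetLock("something", "person")
			defer g.GetUnlock("something", "person")
		}()
	}
	wg.Wait()
	fmt.Println("gets finished")
}
```

Keying the lock on both values keeps a Find in one type or context from blocking a Get in another, which is the property the GetLock documentation asks for.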

type HiddenSource added in v0.3.0

type HiddenSource interface {
	Hidden() bool
}

HiddenSource Sources that define a `Hidden()` method are able to tell whether or not the items they produce should be marked as hidden within the metadata. Hidden items will not be shown in GUIs or stored in databases and are used for gathering data as part of other processes such as remotely executed secondary sources

type NATSWatcher added in v0.11.1

type NATSWatcher struct {
	// Connection The NATS connection to watch
	Connection WatchableConnection

	// FailureHandler will be called when the connection has been closed and is
	// no longer trying to reconnect.
	FailureHandler func()
	// contains filtered or unexported fields
}

func (*NATSWatcher) Start added in v0.11.1

func (w *NATSWatcher) Start(checkInterval time.Duration)

func (*NATSWatcher) Stop added in v0.11.1

func (w *NATSWatcher) Stop()

type NilConnection added in v0.12.1

type NilConnection struct{}

When testing this library, or running without a real NATS connection, it is necessary to create a fake publisher rather than pass in a nil pointer. This is due to the fact that the NATS libraries will panic if a method is called on a nil pointer

func (NilConnection) Publish added in v0.12.1

func (n NilConnection) Publish(subject string, v interface{}) error

Publish Logs an error rather than publishing

func (NilConnection) Subscribe added in v0.12.1

func (n NilConnection) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error)

Subscribe Does nothing

type RequestTracker

type RequestTracker struct {
	// The request to track
	Request *sdp.ItemRequest

	// The engine that this is connected to, used for sending NATS messages
	Engine *Engine
	// contains filtered or unexported fields
}

RequestTracker is used for tracking the progress of a single request. This is used because a single request could have a link depth that results in many requests being executed, meaning that we need to not only track the first request, but also all other requests and items that result from linking

func (*RequestTracker) Cancel added in v0.6.0

func (r *RequestTracker) Cancel()

Cancel Cancels the currently running request

func (*RequestTracker) Execute

func (r *RequestTracker) Execute() ([]*sdp.Item, error)

func (*RequestTracker) LinkedItems

func (r *RequestTracker) LinkedItems() []*sdp.Item

type SearchableSource

type SearchableSource interface {
	Source
	// Search executes a specific search and returns zero or many items as a
	// result (and optionally an error). The specific format of the query that
	// needs to be provided to Search is dependent on the source itself as each
	// source will respond to searches differently
	//
	// Note that the itemContext parameter represents the context of the item
	// from the perspective of State Description Protocol (SDP), whereas the
	// `context.Context` value is a golang context which is used for
	// cancellations and timeouts
	Search(ctx context.Context, itemContext string, query string) ([]*sdp.Item, error)
}

SearchableSource Is a source of items that supports searching

type Source

type Source interface {
	// Type The type of items that this source is capable of finding
	Type() string

	// Descriptive name for the source, used in logging and metadata
	Name() string

	// List of contexts that this source is capable of finding items for. If the
	// source supports all contexts the special value "*"
	// should be used
	Contexts() []string

	// Get Get a single item with a given context and query. The item returned
	// should have a UniqueAttributeValue that matches the `query` parameter.
	//
	// Note that the itemContext parameter represents the context of the item
	// from the perspective of State Description Protocol (SDP), whereas the
	// `context.Context` value is a golang context which is used for
	// cancellations and timeouts
	Get(ctx context.Context, itemContext string, query string) (*sdp.Item, error)

	// Find Finds all items in a given context
	//
	// Note that the itemContext parameter represents the context of the item
	// from the perspective of State Description Protocol (SDP), whereas the
	// `context.Context` value is a golang context which is used for
	// cancellations and timeouts
	Find(ctx context.Context, itemContext string) ([]*sdp.Item, error)

	// Weight Returns the priority weighting of items returned by this source.
	// This is used to resolve conflicts where two sources of the same type
	// return an item for a GET request. In this instance only one item can be
	// sent on, so the one with the higher weight value will win.
	Weight() int
}

Source is capable of finding information about items

Sources must implement all of the methods to satisfy this interface in order to be able to be used as an SDP source. Note that the `context.Context` value that is passed to the Get(), Find() and Search() (optional) methods needs to be handled by each source individually. Source authors should make an effort to ensure that expensive operations that the source undertakes can be cancelled if the context `ctx` is cancelled

type Throttle

type Throttle struct {
	NumParallel int
	// contains filtered or unexported fields
}

Throttle limits the number of processes that can be executing at once to `NumParallel`. Users should call `Lock()` to obtain a lock and `Unlock()` once their work is done
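
The Lock/Unlock contract can be illustrated with a buffered channel used as a counting semaphore. This is a sketch of the idea, not the package's Throttle: `throttle`, `newThrottle` and `peakConcurrency` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// throttle is a hypothetical semaphore: the channel's capacity is the number
// of callers allowed to hold the lock at once.
type throttle struct {
	slots chan struct{}
}

func newThrottle(numParallel int) *throttle {
	if numParallel < 1 {
		numParallel = 1 // never allow a limit of zero
	}
	return &throttle{slots: make(chan struct{}, numParallel)}
}

// Lock blocks until a slot is free; Unlock releases the slot.
func (t *throttle) Lock()   { t.slots <- struct{}{} }
func (t *throttle) Unlock() { <-t.slots }

// peakConcurrency runs `workers` goroutines through a throttle and returns
// the highest number that were inside the locked section at the same time.
func peakConcurrency(limit, workers int) int {
	t := newThrottle(limit)
	var mu sync.Mutex
	var wg sync.WaitGroup
	running, peak := 0, 0
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t.Lock()
			defer t.Unlock()

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(time.Millisecond) // simulated work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak within limit:", peakConcurrency(3, 20) <= 3)
}
```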

func (*Throttle) Lock

func (t *Throttle) Lock()

func (*Throttle) Unlock

func (t *Throttle) Unlock()

type Trigger added in v0.7.0

type Trigger struct {
	// The item type that the trigger should activate on
	Type string

	// A regexp that will be used to filter items based on their
	// UniqueAttributeValue. If this is not supplied, the trigger will fire for
	// all items whose type matches
	UniqueAttributeValueRegex *regexp.Regexp

	// A function that will be run when a matching item is found. This should
	// return an ItemRequest and an error, if the error is nil, the ItemRequest
	// will be sent to the engine. Note that the returned ItemRequest only needs
	// to contain the following fields, the rest will be set automatically if
	// not provided:
	//
	// * Type
	// * Method
	// * Query
	//
	RequestGenerator func(in *sdp.Item) (*sdp.ItemRequest, error)
}

Trigger defines a trigger that will send an ItemRequest to the engine if the conditions are met

func (*Trigger) ProcessItem added in v0.7.0

func (t *Trigger) ProcessItem(i *sdp.Item) (*sdp.ItemRequest, error)

ProcessItem Processes an item to see if the trigger should fire. If the error returned is nil, the returned item request should be sent to the engine. Any non-nil error means that the trigger has not fired

type WatchableConnection added in v0.11.1

type WatchableConnection interface {
	Status() nats.Status
	Stats() nats.Statistics
	LastError() error
}

WatchableConnection Is usually a *nats.Conn, we are using an interface here to allow easier testing
