Documentation ¶
Overview ¶
Reusable testing libraries for testing sources
Index ¶
- func GetCacheDuration(s Source) time.Duration
- func IsWildcard(s string) bool
- func NewItemSubject() string
- func NewResponseSubject() string
- func TestValidateItem(t *testing.T, i *sdp.Item)
- func TestValidateItems(t *testing.T, is []*sdp.Item)
- type CacheDefiner
- type Engine
- func (e *Engine) AddSources(sources ...Source)
- func (e *Engine) AddTriggers(triggers ...Trigger)
- func (e *Engine) CancelItemRequestHandler(cancelRequest *sdp.CancelItemRequest)
- func (e *Engine) ClearTriggers()
- func (e *Engine) Connect() error
- func (e *Engine) DeleteTrackedRequest(uuid [16]byte)
- func (e *Engine) ExecuteRequest(ctx context.Context, req *sdp.ItemRequest) ([]*sdp.Item, error)
- func (e *Engine) ExpandRequest(request *sdp.ItemRequest) []*sdp.ItemRequest
- func (e *Engine) FilterSources(typ string, context string) []Source
- func (e *Engine) Find(ctx context.Context, r *sdp.ItemRequest) ([]*sdp.Item, error)
- func (e *Engine) Get(ctx context.Context, r *sdp.ItemRequest) (*sdp.Item, error)
- func (e *Engine) GetTrackedRequest(uuid uuid.UUID) (*RequestTracker, error)
- func (e *Engine) IsNATSConnected() bool
- func (e *Engine) ItemRequestHandler(itemRequest *sdp.ItemRequest)
- func (e *Engine) NonHiddenSources() []Source
- func (e *Engine) ProcessTriggers(item *sdp.Item)
- func (e *Engine) Search(ctx context.Context, r *sdp.ItemRequest) ([]*sdp.Item, error)
- func (e *Engine) SetupThrottle()
- func (e *Engine) Sources() []Source
- func (e *Engine) Start() error
- func (e *Engine) Stop() error
- func (e *Engine) Subscribe(subject string, handler nats.Handler) error
- func (e *Engine) TrackRequest(uuid uuid.UUID, request *RequestTracker)
- type GetFindMutex
- type HiddenSource
- type NATSOptions
- type NilPublisher
- type RequestTracker
- type SearchableSource
- type Source
- type Throttle
- type Trigger
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetCacheDuration ¶
GetCacheDuration Gets the cache duration for a specific source, or a default value
func IsWildcard ¶
IsWildcard checks if a string is the wildcard. Use this instead of implementing the wildcard check everwhere so that if we need to change the woldcard at a later date we can do so here
func NewItemSubject ¶ added in v0.8.0
func NewItemSubject() string
NewItemSubject Generates a random subject name for returning items e.g. return.item._INBOX.712ab421
func NewResponseSubject ¶ added in v0.8.0
func NewResponseSubject() string
NewResponseSubject Generates a random subject name for returning responses e.g. return.response._INBOX.978af6de
func TestValidateItem ¶ added in v0.4.0
TestValidateItem Checks an item to ensure it is a valid SDP item. This includes checking that all required attributes are populated
func TestValidateItems ¶ added in v0.4.0
TestValidateItems Runs TestValidateItem on many items
Types ¶
type CacheDefiner ¶
CacheDefiner Some backends may implement the CacheDefiner interface which means that they specify a custom default cache interval. The config will always take precedence however
type Engine ¶
type Engine struct { // Descriptive name of this engine. Used as responder name in SDP responses Name string // Options for connecting to NATS NATSOptions *NATSOptions // The maximum number of queries that can be executing in parallel. Defaults // to the number of CPUs MaxParallelExecutions int // contains filtered or unexported fields }
Engine is the main discovery engine. This is where all of the Sources and sources are stored and is responsible for calling out to the right sources to discover everything
Note that an engine that does not have a connected NATS connection will simply not communicate over NATS
func (*Engine) AddSources ¶
AddSources Adds a source to this engine
func (*Engine) AddTriggers ¶ added in v0.7.0
AddTriggers Adds a trigger to this engine. Triggers cause the engine to listen for items from other contexts and will fire a custom ItemRequest if they match
func (*Engine) CancelItemRequestHandler ¶ added in v0.6.0
func (e *Engine) CancelItemRequestHandler(cancelRequest *sdp.CancelItemRequest)
CancelItemRequestHandler Takes a CancelItemRequest and cancels that request if it exists
func (*Engine) ClearTriggers ¶ added in v0.7.0
func (e *Engine) ClearTriggers()
ClearTriggers removes all triggers from the engine
func (*Engine) DeleteTrackedRequest ¶ added in v0.6.0
DeleteTrackedRequest Deletes a request from tracking
func (*Engine) ExecuteRequest ¶
ExecuteRequest Executes a single request and returns the results without any linking
func (*Engine) ExpandRequest ¶ added in v0.6.0
func (e *Engine) ExpandRequest(request *sdp.ItemRequest) []*sdp.ItemRequest
ExpandRequest Expands requests with wildcards to no longer contain wildcards. Meaning that if we support 5 types, and a request comes in with a wildcard type, this function will expand that request into 5 requests, one for each type.
The same goes for contexts, if we have a request with a wildcard context, and a single source that supports 5 contexts, we will end up with 5 requests. The exception to this is if we have a source that supports all contexts, but is unable to list them. In this case there will still be some requests with wildcard contexts as they can't be expanded
func (*Engine) FilterSources ¶
FilterSources returns the set of sources that match the supplied type and context. Supports wildcards in either position
func (*Engine) Find ¶
Find executes Find() on all sources for a given type, returning the merged results. Only returns an error if all sources fail, in which case returns the first error
func (*Engine) Get ¶
Get Runs a get query against known sources in priority order. If nothing was found, returns the first error
func (*Engine) GetTrackedRequest ¶ added in v0.6.0
func (e *Engine) GetTrackedRequest(uuid uuid.UUID) (*RequestTracker, error)
GetTrackedRequest Returns the RequestTracked object for a given UUID. THis tracker can then be used to cancel the request
func (*Engine) IsNATSConnected ¶
IsNATSConnected returns whether the engine is connected to NATS
func (*Engine) ItemRequestHandler ¶ added in v0.6.0
func (e *Engine) ItemRequestHandler(itemRequest *sdp.ItemRequest)
NewItemRequestHandler Returns a function whose job is to handle a single request. This includes responses, linking etc.
func (*Engine) NonHiddenSources ¶ added in v0.4.0
NonHiddenSources Returns a slice of all known sources excliding hidden ones
func (*Engine) ProcessTriggers ¶ added in v0.7.0
func (e *Engine) ProcessTriggers(item *sdp.Item)
ProcessTriggers Checks all triggers against a given item and fires them if required
func (*Engine) Search ¶
Search executes Search() on all sources for a given type, returning the merged results. Only returns an error if all sources fail, in which case returns the first error
func (*Engine) SetupThrottle ¶
func (e *Engine) SetupThrottle()
SetupThrottle Sets up the throttling based on MaxParallelExecutions, including ensuring that it's not set to zero
func (*Engine) Start ¶
Start performs all of the initialisation steps required for the engine to work. Note that this creates NATS subscriptions for all available sources so modifying the Sources value after an engine has been started will not have any effect until the engine is restarted
func (*Engine) TrackRequest ¶ added in v0.6.0
func (e *Engine) TrackRequest(uuid uuid.UUID, request *RequestTracker)
TrackRequest Stores a RequestTracker in the engine so that it can be looked up later and cancelled if required. The UUID should be supplied as part of the request itself
type GetFindMutex ¶
type GetFindMutex struct {
// contains filtered or unexported fields
}
GetFindMutex A modified version of a RWMutex. Many get locks can be held but only one Find lock. A waiting Find lock (even if it hasn't been locked, just if someone is waiting) blocks all other get locks until it unlocks.
The intended usage of this is that it will allow a source which is trying to process many requests at once, to process a FIND request before any GET requests, since it's likely that once FIND has been run, subsequent GET requests will be able to be served from cache
func (*GetFindMutex) FindLock ¶
func (g *GetFindMutex) FindLock(itemContext string, typ string)
FindLock An exclusive lock. Ensure that all GetLocks have been unlocked and stops any more from being obtained. Provide a type and context to ensure that the lock is only help for that type and context combination rather than locking the whole engine
func (*GetFindMutex) FindUnlock ¶
func (g *GetFindMutex) FindUnlock(itemContext string, typ string)
FindUnlock Unlocks a FindLock
func (*GetFindMutex) GetLock ¶
func (g *GetFindMutex) GetLock(itemContext string, typ string)
GetLock Gets a lock that can be held by an unlimited number of goroutines, these locks are only blocked by FindLocks. A type and context must be provided since a Get in one type (or context) should not be blocked by a Find in another
func (*GetFindMutex) GetUnlock ¶
func (g *GetFindMutex) GetUnlock(itemContext string, typ string)
GetUnlock Unlocks the GetLock. This must be called once for each GetLock otherwise it will be impossible to ever obtain a FindLock
type HiddenSource ¶ added in v0.3.0
type HiddenSource interface {
Hidden() bool
}
HiddenSource Sources that define a `Hidden()` method are able to tell whether or not the items they produce should be marked as hidden within the metadata. Hidden items will not be shown in GUIs or stored in databases and are used for gathering data as part of other proccesses such as remotely executed secondary sources
type NATSOptions ¶
type NATSOptions struct { // The list of URLs to use for connecting to NATS URLs []string // The name given to the connection, useful in logging ConnectionName string // How long to wait when trying to connect to each NATS server ConnectTimeout time.Duration // How many times to retry if there was an error when connecting NumRetries int // Path to the customer CA file to use when using TLS (if required) CAFile string // Path to the NKey seed file NkeyFile string // Path to the JWT JWTFile string // The name of the queue to join when subscribing to subjects QueueName string }
type NilPublisher ¶
type NilPublisher struct{}
When testing this library, or running without a real NATS connection, it is necessary to create a fake publisher rather than pass in a nil pointer. This is due to the fact that the NATS libraries will panic if a method is called on a nil pointer
func (NilPublisher) Publish ¶
func (n NilPublisher) Publish(subject string, v interface{}) error
Publish Logs an error rather than publishing
type RequestTracker ¶
type RequestTracker struct { // The list of requests to track Request *sdp.ItemRequest // The engine that this is connected to, used for sending NATS messages Engine *Engine // contains filtered or unexported fields }
RequestTracker is used for tracking the progress of a single request (or a set of related requests). This is required because a single request could have a link depth that results in many requests being executed meaning that we need to not only track the first request, but also all other requests and items that result from linking
func (*RequestTracker) Cancel ¶ added in v0.6.0
func (r *RequestTracker) Cancel()
Cancel Cancells the currently running request
func (*RequestTracker) Execute ¶
func (r *RequestTracker) Execute() ([]*sdp.Item, error)
func (*RequestTracker) LinkedItems ¶
func (r *RequestTracker) LinkedItems() []*sdp.Item
type SearchableSource ¶
type SearchableSource interface { Source // Search executes a specific search and returns zero or many items as a // result (and optionally an error). The specific format of the query that // needs to be provided to Search is dependant on the source itself as each // source will respond to searches differently // // Note that the itemContext parameter represents the context of the item // from the perspective of State Description Protocol (SDP), whereas the // `context.Context` value is a golang context which is used for // cancellations and timeouts Search(ctx context.Context, itemContext string, query string) ([]*sdp.Item, error) }
SearchableItemSource Is a source of items that supports searching
type Source ¶
type Source interface { // Type The type of items that this source is capable of finding Type() string // Descriptive name for the source, used in logging and metadata Name() string // List of contexts that this source is capable of find items for. If the // source supports all contexts the special value "*" // should be used Contexts() []string // Get Get a single item with a given context and query. The item returned // should have a UniqueAttributeValue that matches the `query` parameter. // // Note that the itemContext parameter represents the context of the item // from the perspective of State Description Protocol (SDP), whereas the // `context.Context` value is a golang context which is used for // cancellations and timeouts Get(ctx context.Context, itemContext string, query string) (*sdp.Item, error) // Find Finds all items in a given context // // Note that the itemContext parameter represents the context of the item // from the perspective of State Description Protocol (SDP), whereas the // `context.Context` value is a golang context which is used for // cancellations and timeouts Find(ctx context.Context, itemContext string) ([]*sdp.Item, error) // Weight Returns the priority weighting of items returned by this source. // This is used to resolve conflicts where two sources of the same type // return an item for a GET request. In this instance only one item can be // sen on, so the one with the higher weight value will win. Weight() int }
Source is capable of finding information about items
Sources must implement all of the methods to satisfy this interface in order to be able to used as an SDP source. Note that the `context.Context` value that is passed to the Get(), Find() and Search() (optional) methods needs to handled by each source individually. Source authors should make an effort ensure that expensive operations that the source undertakes can be cancelled if the context `ctx` is cancelled
type Throttle ¶
type Throttle struct { NumParallel int // contains filtered or unexported fields }
Throttle limits the number of processes that can be executing at once to `NumParallel`. Users should call `Lock()` to obtain a lock and `Unlock()` once their work is done
type Trigger ¶ added in v0.7.0
type Trigger struct { // The item type that the trigger should activate on Type string // A regexp that will be used to filter items based on their // UniqueAttributeValue. If this is not supplied, the trigger will fire for // all items whose type matches UniqueAttributeValueRegex *regexp.Regexp // A function that will be run when a matching item is found. This should // return an ItemRequest and an error, if the error is nil, the ItemRequest // will be sent to the engine. Note that the returned ItemRequest only needs // to contain the following fields, the rest will be set automatically if // not provided: // // * Type // * Method // * Query // RequestGenerator func(in *sdp.Item) (*sdp.ItemRequest, error) }
Trigger defines a trigger that will send an ItemRequest to the engine of the conditions are met
func (*Trigger) ProcessItem ¶ added in v0.7.0
ProcessItem Processes an item to see if the trigger should fire. If the error returned is nil, the returned item request should be sent to the engine. Any non-nil error means that the trigger has not fired