client

package
v0.0.0-...-26c4482 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2019 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package client provides a generic access layer for streaming telemetry providers.

The Client interface is implemented by 3 types in this package:

  • BaseClient simply forwards all messages from the underlying connection to NotificationHandler or ProtoHandler (see type Query).
  • CacheClient wraps around BaseClient and adds a persistence layer for all notifications. The notifications build up an internal tree which can be queried and walked using CacheClient's methods.
  • ReconnectClient wraps around any Client implementation (BaseClient, CacheClient or a user-provided one) and adds transparent reconnection loop in Subscribe. Reconnection attempts are done with exponential backoff.

This package uses pluggable transport implementations. For example, for gNMI targets you need to add this blank import:

import _ "github.com/openconfig/gnmi/client/gnmi"

That import will automatically register itself as available ClientType in this package (using func init).

If you want to write a custom implementation, implement Impl interface and register it with unique name via func Register.

Take a look at package examples in godoc for typical use cases.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrStopReading is the common error defined to have the client stop a read
	// loop.
	ErrStopReading = errors.New("stop the result reading loop")
	// ErrClientInit is the common error for when making calls before the client
	// has been started via Subscribe.
	ErrClientInit = errors.New("Subscribe() must be called before any operations on client")
	// ErrUnsupported is returned by Impl's methods when the underlying
	// implementation doesn't support it.
	ErrUnsupported = errors.New("operation not supported by client implementation")
)
View Source
var (
	// ReconnectBaseDelay is the minimum delay between re-Subscribe attempts in
	// Reconnect. You can change this before creating ReconnectClient instances.
	ReconnectBaseDelay = time.Second
	// ReconnectMaxDelay is the maximum delay between re-Subscribe attempts in
	// Reconnect. You can change this before creating ReconnectClient instances.
	ReconnectMaxDelay = time.Minute
)

Functions

func Register

func Register(t string, f InitImpl) error

Register will register the transport specific implementation. The name must be unique across all transports.

func RegisterTest

func RegisterTest(t string, f InitImpl) error

RegisterTest allows tests to override client implementation for any client type. It's identical to Register, except t uniqueness is not enforced.

RegisterTest is similar to ResetRegisteredImpls + Register. Commonly used with the fake client (./fake directory).

func RegisteredImpls

func RegisteredImpls() []string

RegisteredImpls returns a slice of currently registered client types.

func ResetRegisteredImpls

func ResetRegisteredImpls()

ResetRegisteredImpls removes and Impls registered with Register. This should only be used in tests to clear out their mock Impls, so that they don't affect other tests.

Types

type BaseClient

type BaseClient struct {
	// contains filtered or unexported fields
}

BaseClient is a streaming telemetry client with minimal footprint. The caller must call Subscribe to perform the actual query. BaseClient stores no state. All updates must be handled by the provided handlers inside of Query.

The zero value of BaseClient is ready for use (there is no constructor).

func (*BaseClient) Close

func (c *BaseClient) Close() error

Close implements the Client interface.

func (*BaseClient) Impl

func (c *BaseClient) Impl() (Impl, error)

Impl implements the Client interface.

func (*BaseClient) Poll

func (c *BaseClient) Poll() error

Poll implements the Client interface.

func (*BaseClient) Subscribe

func (c *BaseClient) Subscribe(ctx context.Context, q Query, clientType ...string) error

Subscribe implements the Client interface.

type CacheClient

type CacheClient struct {
	*BaseClient
	*ctree.Tree
	// contains filtered or unexported fields
}

Client adds a caching layer on top of a simple query client.

It works similarly to BaseClient and adds the Leaves method to return current tree state.

func New

func New() *CacheClient

New returns an initialized caching client.

func (*CacheClient) Leaves

func (c *CacheClient) Leaves() Leaves

Leaves returns the current state of the received tree. It's safe to call at any point after New.

func (*CacheClient) Poll

func (c *CacheClient) Poll() error

Poll implements the Client interface. Poll also closes the channel returned by Synced and resets it.

func (*CacheClient) Subscribe

func (c *CacheClient) Subscribe(ctx context.Context, q Query, clientType ...string) error

Subscribe implements the Client interface.

func (*CacheClient) Synced

func (c *CacheClient) Synced() <-chan struct{}

Synced will close when a sync is recieved from the query.

type Client

type Client interface {
	// Subscribe will perform the provided query against the requested
	// clientType. clientType is the name of a specific Impl specified in
	// Register (most implementations will call Register in init()).
	//
	// It will try each clientType listed in order until one succeeds. If
	// clientType is nil, it will try each registered clientType in random
	// order.
	Subscribe(ctx context.Context, q Query, clientType ...string) error
	// Poll will send a poll request to the server and process all
	// notifications. It is up the caller to identify the sync and realize the
	// Poll is complete.
	Poll() error
	// Close terminates the underlying Impl, which usually terminates the
	// connection right away.
	// Close must be called to release any resources that Impl could have
	// allocated.
	Close() error
	// Impl will return the underlying client implementation. Most users
	// shouldn't use this.
	Impl() (Impl, error)
}

Client defines a set of methods which every client must implement. This package provides a few implementations: BaseClient, CacheClient, ReconnectClient.

Do not confuse this with Impl.

type Connected

type Connected struct{}

Connected is a synthetic notification sent when connection is established. It's sent before any other notifications on a new client.

type Credentials

type Credentials struct {
	Username string
	Password string
}

Credentials contains information necessary to authenticate with the target. Currently only username/password are supported, but may contain TLS client certificate in the future.

type Delete

type Delete Leaf

Delete is an explicit delete of the path in the tree.

type Destination

type Destination struct {
	// Addrs is a slice of addresses by which a target may be reached. Most
	// clients will only handle the first element.
	Addrs []string
	// Target is the target of the query.  Maybe empty if the query is performed
	// against an end target vs. a collector.
	Target string
	// Replica is the specific backend to contact.  This field is implementation
	// specific and for direct agent communication should not be set. default is
	// first available.
	Replica int
	// Timeout is the connection timeout for the query. It will *not* prevent a
	// slow (or streaming) query from completing, this only affects the initial
	// connection and broken connection detection.
	//
	// If Timeout is not set, default is 1 minute.
	Timeout time.Duration
	// Credentials are used for authentication with the target. Optional.
	Credentials *Credentials
	// TLS config to use when connecting to target. Optional.
	TLS *tls.Config
	// Extra contains arbitrary additional metadata to be passed to the
	// target. Optional.
	Extra map[string]string
}

Destination contains data used to connect to a server.

func (Destination) Validate

func (d Destination) Validate() error

Validate validates the fields of Destination.

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error is a inband error notification. It could be received without breaking the query or connection.

func NewError

func NewError(s string) Error

NewError will return a new error with the provided text.

func (Error) Error

func (e Error) Error() string

Error is provided to implement the error interface.

type Impl

type Impl interface {
	// Subscribe sends a Subscribe request to the server.
	Subscribe(context.Context, Query) error
	// Recv processes a single message from the server. This method is exposed to
	// allow the generic client control the state of message processing.
	Recv() error
	// Close will close the underlying rpc connections.
	Close() error
	// Poll will send an implementation specific Poll request to the server.
	Poll() error
}

Impl is the protocol/RPC specific implementation of the streaming Client. Unless you're implementing a new RPC format, this shouldn't be used directly.

func NewImpl

func NewImpl(ctx context.Context, d Destination, clientType ...string) (Impl, error)

NewImpl returns a client implementation based on the registered types. It will try all clientTypes listed in parallel until one succeeds. If clientType is nil, it will try all registered clientTypes.

This function is only used internally and is exposed for testing only.

type InitImpl

type InitImpl func(context.Context, Destination) (Impl, error)

InitImpl is a constructor signature for all transport specific implementations.

type Leaf

type Leaf struct {
	Path Path
	Val  interface{}
	TS   time.Time // TS is the timestamp of last update to this leaf.
	Dups uint32    // Dups is the number of coalesced duplicates for this leaf.
}

Leaf represents a leaf value in the tree. It includes the path to the node in the tree. This is returned via Leaves. It is also the basis for all Notification types that are used by the NotificationHandler callbacks see "notification.go".

type Leaves

type Leaves []Leaf

Leaves implements sort.Interface over []Leaf based on paths.

func (Leaves) Len

func (pv Leaves) Len() int

func (Leaves) Less

func (pv Leaves) Less(i, j int) bool

func (Leaves) Swap

func (pv Leaves) Swap(i, j int)

type Notification

type Notification interface {
	// contains filtered or unexported methods
}

Notification are internal messages used for abstracting protocol specific messages for clients. isNotification is only present to force typing assertions.

type NotificationHandler

type NotificationHandler func(Notification) error

NotificationHandler is a type for the client specific handler function.

Client implementations will pass all kinds of received notifications as they arrive.

type Path

type Path []string

Path is a standard type for describing a path inside of streaming telemetry tree.

func (Path) Equal

func (p Path) Equal(p2 Path) bool

Equal returns true if p is equivalent to p2.

func (Path) Less

func (p Path) Less(p2 Path) bool

Less returns true if p sorts before p2.

type ProtoHandler

type ProtoHandler func(proto.Message) error

ProtoHandler is a type for the raw handling of the RPC layer. Most users should use NotificationHandler instead.

type Query

type Query struct {
	// Addrs is a slice of addresses by which a target may be reached. Most
	// clients will only handle the first element.
	Addrs []string
	// Target is the target of the query.  Maybe empty if the query is performed
	// against an end target vs. a collector.
	Target string
	// Replica is the specific backend to contact.  This field is implementation
	// specific and for direct agent communication should not be set. default is
	// first available.
	Replica int
	// UpdatesOnly will only stream incremental updates rather than providing the
	// client with an initial snapshot.  This again is implementation specific
	// if the agent doesn't not accept that query it is up the client library to
	// decide wheter to return an error or to make a normal subscription then
	// ignore the initial sync and only provide increment updates.
	UpdatesOnly bool
	// Queries contains the list of Paths to query.
	Queries []Path
	// Type of query to perform.
	Type Type
	// Timeout is the connection timeout for the query. It will *not* prevent a
	// slow (or streaming) query from completing, this only affects the initial
	// connection and broken connection detection.
	//
	// If Timeout is not set, default is 1 minute.
	Timeout time.Duration
	// NotificationHandler is the per notification callback handed to a vendor
	// specific implementation. For every notificaiton this call back will be
	// called.
	NotificationHandler NotificationHandler
	// ProtoHandler, if set, will receive all response protos sent by the
	// backend. Only one of NotificationHandler or ProtoHandler may be
	// set.
	ProtoHandler ProtoHandler
	// Credentials are used for authentication with the target. Optional.
	Credentials *Credentials
	// TLS config to use when connecting to target. Optional.
	TLS *tls.Config
	// Extra contains arbitrary additional metadata to be passed to the
	// target. Optional.
	Extra map[string]string
	// SubReq is an optional field. If not nil, gnmi client implementation uses
	// it rather than generating from client.Query while sending gnmi Subscribe RPC.
	SubReq *gpb.SubscribeRequest
}

Query contains all of the parameters necessary to initiate the query.

func NewQuery

func NewQuery(sr *gpb.SubscribeRequest) (Query, error)

NewQuery returns a populated Query from given gnmi SubscribeRequest. Query fields that are not part of SubscribeRequest must be set on the returned object. During transtion to support only gnmi, having Query and SubscribeRequest in sync is important. There are two approaches to ensure that; one is validating whether Query and SubscribeRequest are same after they are set, the other is populating the fields of Query from SubscribeRequest and filling out the rest on the returned object. NewQuery embraces the latter option.

func (Query) Destination

func (q Query) Destination() Destination

Destination extracts a Destination instance out of Query fields.

Ideally we would embed Destination in Query. But in order to not break the existing API we have this workaround.

func (Query) Validate

func (q Query) Validate() error

Validate validates that query contains valid values that any client should be able use to form a valid backend request.

type ReconnectClient

type ReconnectClient struct {
	Client
	// contains filtered or unexported fields
}

ReconnectClient is a wrapper around any Client that never returns from Subscribe (unless explicitly closed). Underlying calls to Subscribe are repeated indefinitely, with an exponential backoff between attempts.

ReconnectClient should only be used with streaming or polling queries. Once queries will fail immediately in Subscribe.

func Reconnect

func Reconnect(c Client, disconnect, reset func()) *ReconnectClient

Reconnect wraps c and returns a new ReconnectClient using it.

disconnect callback is called each time the underlying Subscribe returns, it may be nil.

reset callback is called each time the underlying Subscribe is retried, it may be nil.

Closing the returned ReconnectClient will unblock Subscribe.

func (*ReconnectClient) Close

func (p *ReconnectClient) Close() error

Close implements Client interface.

func (*ReconnectClient) Impl

func (p *ReconnectClient) Impl() (Impl, error)

Impl implements Client interface.

func (*ReconnectClient) Poll

func (p *ReconnectClient) Poll() error

Poll implements Client interface. Poll may fail if Subscribe is reconnecting when it's called.

func (*ReconnectClient) Subscribe

func (p *ReconnectClient) Subscribe(ctx context.Context, q Query, clientType ...string) error

Subscribe implements Client interface.

type Sync

type Sync struct{}

Sync is an inband notification that the client has sent everything in it's cache at least once. This does not mean EVERYTHING you wanted is there only that the target has sent everything it currently has... which may be nothing.

type TreeVal

type TreeVal struct {
	Val interface{} `json:"value"`
	TS  time.Time   `json:"timestamp"`
}

TreeVal contains the current branch's value and the timestamp when the node was last updated.

type Type

type Type int

Type defines the type of query in a Query.

const (
	// Unknown is an unknown query and should always be treated as an error.
	Unknown Type = iota
	// Once will perform a Once query against the agent.
	Once
	// Poll will perform a Polling query against the agent.
	Poll
	// Stream will perform a Streaming query against the agent.
	Stream
)

func NewType

func NewType(s string) Type

NewType returns a new QueryType based on the provided string.

func (Type) String

func (q Type) String() string

String returns the string representation of the QueryType.

type Update

type Update Leaf

Update is an update to the leaf in the tree.

Directories

Path Synopsis
Package client implements a fake client implementation to be used with streaming telemetry collection.
Package client implements a fake client implementation to be used with streaming telemetry collection.
Package flags defines extra flag types for use in command line flag parsing.
Package flags defines extra flag types for use in command line flag parsing.
Package client contains transport implementation for the parent client library using gnmi.proto.
Package client contains transport implementation for the parent client library using gnmi.proto.
Package grpcutil provides helper functions for working with gRPC targets.
Package grpcutil provides helper functions for working with gRPC targets.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL