pipeline

package
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2020 License: MIT Imports: 16 Imported by: 259

Documentation

Overview

Package pipeline implements an HTTP request/response middleware pipeline whose policy objects mutate an HTTP request's URL, query parameters, and/or headers before the request is sent over the wire.

Not all policy objects mutate an HTTP request; some policy objects simply impact the flow of requests/responses by performing operations such as logging, retry policies, timeouts, failure injection, and deserialization of response payloads.

Implementing the Policy Interface

To implement a policy, define a struct that implements the pipeline.Policy interface's Do method. Your Do method is called when an HTTP request wants to be sent over the network. Your Do method can perform any operation(s) it desires. For example, it can log the outgoing request, mutate the URL, headers, and/or query parameters, inject a failure, etc. Your Do method must then forward the HTTP request to next Policy object in a linked-list ensuring that the remaining Policy objects perform their work. Ultimately, the last Policy object sends the HTTP request over the network (by calling the HTTPSender's Do method).

When an HTTP response comes back, each Policy object in the linked-list gets a chance to process the response (in reverse order). The Policy object can log the response, retry the operation if due to a transient failure or timeout, deserialize the response body, etc. Ultimately, the last Policy object returns the HTTP response to the code that initiated the original HTTP request.

Here is a template for how to define a pipeline.Policy object:

type myPolicy struct {
   node   PolicyNode
   // TODO: Add configuration/setting fields here (if desired)...
}

func (p *myPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
   // TODO: Mutate/process the HTTP request here...
   response, err := p.node.Do(ctx, request)	// Forward HTTP request to next Policy & get HTTP response
   // TODO: Mutate/process the HTTP response here...
   return response, err	// Return response/error to previous Policy
}

Implementing the Factory Interface

Each Policy struct definition requires a factory struct definition that implements the pipeline.Factory interface's New method. The New method is called when application code wants to initiate a new HTTP request. Factory's New method is passed a pipeline.PolicyNode object which contains a reference to the owning pipeline.Pipeline object (discussed later) and a reference to the next Policy object in the linked list. The New method should create its corresponding Policy object passing it the PolicyNode and any other configuration/settings fields appropriate for the specific Policy object.

Here is a template for how to define a pipeline.Policy object:

// NOTE: Once created & initialized, Factory objects should be goroutine-safe (ex: immutable);
// this allows reuse (efficient use of memory) and makes these objects usable by multiple goroutines concurrently.
type myPolicyFactory struct {
   // TODO: Add any configuration/setting fields if desired...
}

func (f *myPolicyFactory) New(node pipeline.PolicyNode) Policy {
   return &myPolicy{node: node} // TODO: Also initialize any configuration/setting fields here (if desired)...
}

Using your Factory and Policy objects via a Pipeline

To use the Factory and Policy objects, an application constructs a slice of Factory objects and passes this slice to the pipeline.NewPipeline function.

func NewPipeline(factories []pipeline.Factory, sender pipeline.HTTPSender) Pipeline

This function also requires an object implementing the HTTPSender interface. For simple scenarios, passing nil for HTTPSender causes a standard Go http.Client object to be created and used to actually send the HTTP response over the network. For more advanced scenarios, you can pass your own HTTPSender object in. This allows sharing of http.Client objects or the use of custom-configured http.Client objects or other objects that can simulate the network requests for testing purposes.

Now that you have a pipeline.Pipeline object, you can create a pipeline.Request object (which is a simple wrapper around Go's standard http.Request object) and pass it to Pipeline's Do method along with passing a context.Context for cancelling the HTTP request (if desired).

type Pipeline interface {
   Do(ctx context.Context, methodFactory pipeline.Factory, request pipeline.Request) (pipeline.Response, error)
}

Do iterates over the slice of Factory objects and tells each one to create its corresponding Policy object. After the linked-list of Policy objects have been created, Do calls the first Policy object passing it the Context & HTTP request parameters. These parameters now flow through all the Policy objects giving each object a chance to look at and/or mutate the HTTP request. The last Policy object sends the message over the network.

When the network operation completes, the HTTP response and error return values pass back through the same Policy objects in reverse order. Most Policy objects ignore the response/error but some log the result, retry the operation (depending on the exact reason the operation failed), or deserialize the response's body. Your own Policy objects can do whatever they like when processing outgoing requests or incoming responses.

Note that after an I/O request runs to completion, the Policy objects for that request are garbage collected. However, Pipeline object (like Factory objects) are goroutine-safe allowing them to be created once and reused over many I/O operations. This allows for efficient use of memory and also makes them safely usable by multiple goroutines concurrently.

Inserting a Method-Specific Factory into the Linked-List of Policy Objects

While Pipeline and Factory objects can be reused over many different operations, it is common to have special behavior for a specific operation/method. For example, a method may need to deserialize the response's body to an instance of a specific data type. To accommodate this, the Pipeline's Do method takes an additional method-specific Factory object. The Do method tells this Factory to create a Policy object and injects this method-specific Policy object into the linked-list of Policy objects.

When creating a Pipeline object, the slice of Factory objects passed must have 1 (and only 1) entry marking where the method-specific Factory should be injected. The Factory marker is obtained by calling the pipeline.MethodFactoryMarker() function:

func MethodFactoryMarker() pipeline.Factory

Creating an HTTP Request Object

The HTTP request object passed to Pipeline's Do method is not Go's http.Request struct. Instead, it is a pipeline.Request struct which is a simple wrapper around Go's standard http.Request. You create a pipeline.Request object by calling the pipeline.NewRequest function:

func NewRequest(method string, url url.URL, options pipeline.RequestOptions) (request pipeline.Request, err error)

To this function, you must pass a pipeline.RequestOptions that looks like this:

type RequestOptions struct {
   // The readable and seekable stream to be sent to the server as the request's body.
   Body io.ReadSeeker

   // The callback method (if not nil) to be invoked to report progress as the stream is uploaded in the HTTP request.
   Progress ProgressReceiver
}

The method and struct ensure that the request's body stream is a read/seekable stream. A seekable stream is required so that upon retry, the final Policy object can seek the stream back to the beginning before retrying the network request and re-uploading the body. In addition, you can associate a ProgressReceiver callback function which will be invoked periodically to report progress while bytes are being read from the body stream and sent over the network.

Processing the HTTP Response

When an HTTP response comes in from the network, a reference to Go's http.Response struct is embedded in a struct that implements the pipeline.Response interface:

type Response interface {
   Response() *http.Response
}

This interface is returned through all the Policy objects. Each Policy object can call the Response interface's Response method to examine (or mutate) the embedded http.Response object.

A Policy object can internally define another struct (implementing the pipeline.Response interface) that embeds an http.Response and adds additional fields and return this structure to other Policy objects. This allows a Policy object to deserialize the body to some other struct and return the original http.Response and the additional struct back through the Policy chain. Other Policy objects can see the Response but cannot see the additional struct with the deserialized body. After all the Policy objects have returned, the pipeline.Response interface is returned by Pipeline's Do method. The caller of this method can perform a type assertion attempting to get back to the struct type really returned by the Policy object. If the type assertion is successful, the caller now has access to both the http.Response and the deserialized struct object.

Index

Constants

View Source
const (
	// UserAgent is the string to be used in the user agent string when making requests.
	UserAgent = "azure-pipeline-go/" + Version

	// Version is the semantic version (see http://semver.org) of the pipeline package.
	Version = "0.2.1"
)

Variables

This section is empty.

Functions

func Cause

func Cause(err error) error

Cause walks all the preceding errors and return the originating error.

func ForceLog

func ForceLog(level LogLevel, msg string)

ForceLog should rarely be used. It forceable logs an entry to the Windows Event Log (on Windows) or to the SysLog (on Linux)

func NewError

func NewError(cause error, msg string) error

NewError creates a simple string error (like Error.New). But, this error also captures the caller's Program Counter and the preceding error (if provided).

func NewRequestBodyProgress

func NewRequestBodyProgress(requestBody io.ReadSeeker, pr ProgressReceiver) io.ReadSeeker

NewRequestBodyProgress adds progress reporting to an HTTP request's body stream.

func NewResponseBodyProgress

func NewResponseBodyProgress(responseBody io.ReadCloser, pr ProgressReceiver) io.ReadCloser

NewResponseBodyProgress adds progress reporting to an HTTP response's body stream.

func SetForceLogEnabled added in v0.2.0

func SetForceLogEnabled(enable bool)

SetForceLogEnabled can be used to disable ForceLog There is no threadsafety or locking on the underlying variable, so call this function just once at startup of your application (Don't later try to change the setting on the fly).

func SetLogSanitizer added in v0.2.0

func SetLogSanitizer(s LogSanitizer)

SetLogSanitizer can be called to supply a custom LogSanitizer. There is no threadsafety or locking on the underlying variable, so call this function just once at startup of your application (Don't later try to change the sanitizer on the fly).

func WriteRequestWithResponse

func WriteRequestWithResponse(b *bytes.Buffer, request *http.Request, response *http.Response, err error)

WriteRequestWithResponse appends a formatted HTTP request into a Buffer. If request and/or err are not nil, then these are also written into the Buffer.

Types

type ErrorNode

type ErrorNode struct {
	// contains filtered or unexported fields
}

ErrorNode can be an embedded field in a private error object. This field adds Program Counter support and a 'cause' (reference to a preceding error). When initializing a error type with this embedded field, initialize the ErrorNode field by calling ErrorNode{}.Initialize(cause).

func (*ErrorNode) Cause

func (e *ErrorNode) Cause() error

Cause returns the error that preceded this error.

func (*ErrorNode) Error

func (e *ErrorNode) Error(msg string) string

Error returns a string with the PC's symbols or "" if the PC is invalid. When defining a new error type, have its Error method call this one passing it the string representation of the error.

func (ErrorNode) Initialize

func (ErrorNode) Initialize(cause error, callersToSkip int) ErrorNode

Initialize is used to initialize an embedded ErrorNode field. It captures the caller's program counter and saves the cause (preceding error). To initialize the field, use "ErrorNode{}.Initialize(cause, 3)". A callersToSkip value of 3 is very common; but, depending on your code nesting, you may need a different value.

func (ErrorNode) Temporary

func (e ErrorNode) Temporary() bool

Temporary returns true if the error occurred due to a temporary condition.

func (ErrorNode) Timeout

func (e ErrorNode) Timeout() bool

Timeout returns true if the error occurred due to time expiring.

func (*ErrorNode) Unwrap added in v0.2.3

func (e *ErrorNode) Unwrap() error

Unwrap provides compatibility for Go 1.13 error chains.

type ErrorNodeNoCause added in v0.1.9

type ErrorNodeNoCause struct {
	// contains filtered or unexported fields
}

ErrorNodeNoCause can be an embedded field in a private error object. This field adds Program Counter support. When initializing a error type with this embedded field, initialize the ErrorNodeNoCause field by calling ErrorNodeNoCause{}.Initialize().

func (*ErrorNodeNoCause) Error added in v0.1.9

func (e *ErrorNodeNoCause) Error(msg string) string

Error returns a string with the PC's symbols or "" if the PC is invalid. When defining a new error type, have its Error method call this one passing it the string representation of the error.

func (ErrorNodeNoCause) Initialize added in v0.1.9

func (ErrorNodeNoCause) Initialize(callersToSkip int) ErrorNodeNoCause

Initialize is used to initialize an embedded ErrorNode field. It captures the caller's program counter. To initialize the field, use "ErrorNodeNoCause{}.Initialize(3)". A callersToSkip value of 3 is very common; but, depending on your code nesting, you may need a different value.

func (ErrorNodeNoCause) Temporary added in v0.1.9

func (e ErrorNodeNoCause) Temporary() bool

Temporary returns true if the error occurred due to a temporary condition.

func (ErrorNodeNoCause) Timeout added in v0.1.9

func (e ErrorNodeNoCause) Timeout() bool

Timeout returns true if the error occurred due to time expiring.

type Factory

type Factory interface {
	New(next Policy, po *PolicyOptions) Policy
}

The Factory interface represents an object that can create its Policy object. Each HTTP request sent requires that this Factory create a new instance of its Policy object.

func MethodFactoryMarker

func MethodFactoryMarker() Factory

MethodFactoryMarker returns a special marker Factory object. When Pipeline's Do method is called, any MethodMarkerFactory object is replaced with the specified methodFactory object. If nil is passed fro Do's methodFactory parameter, then the MethodFactoryMarker is ignored as the linked-list of Policy objects is created.

type FactoryFunc

type FactoryFunc func(next Policy, po *PolicyOptions) PolicyFunc

FactoryFunc is an adapter that allows the use of an ordinary function as a Factory interface.

func (FactoryFunc) New

func (f FactoryFunc) New(next Policy, po *PolicyOptions) Policy

New calls f(next,po).

type LogLevel

type LogLevel uint32

LogLevel tells a logger the minimum level to log. When code reports a log entry, the LogLevel indicates the level of the log entry. The logger only records entries whose level is at least the level it was told to log. See the Log* constants. For example, if a logger is configured with LogError, then LogError, LogPanic, and LogFatal entries will be logged; lower level entries are ignored.

const (
	// LogNone tells a logger not to log any entries passed to it.
	LogNone LogLevel = iota

	// LogFatal tells a logger to log all LogFatal entries passed to it.
	LogFatal

	// LogPanic tells a logger to log all LogPanic and LogFatal entries passed to it.
	LogPanic

	// LogError tells a logger to log all LogError, LogPanic and LogFatal entries passed to it.
	LogError

	// LogWarning tells a logger to log all LogWarning, LogError, LogPanic and LogFatal entries passed to it.
	LogWarning

	// LogInfo tells a logger to log all LogInfo, LogWarning, LogError, LogPanic and LogFatal entries passed to it.
	LogInfo

	// LogDebug tells a logger to log all LogDebug, LogInfo, LogWarning, LogError, LogPanic and LogFatal entries passed to it.
	LogDebug
)

type LogOptions

type LogOptions struct {
	Log func(level LogLevel, message string)

	// ShouldLog is called periodically allowing you to return whether the specified LogLevel should be logged or not.
	// An application can return different values over the its lifetime; this allows the application to dynamically
	// alter what is logged. NOTE: This method can be called by multiple goroutines simultaneously so make sure
	// you implement it in a goroutine-safe way. If nil, nothing is logged (the equivalent of returning LogNone).
	// Usually, the function will be implemented simply like this: return level <= LogWarning
	ShouldLog func(level LogLevel) bool
}

LogOptions configures the pipeline's logging mechanism & level filtering.

type LogSanitizer added in v0.2.0

type LogSanitizer interface {
	SanitizeLogMessage(raw string) string
}

LogSanitizer can be implemented to clean secrets from lines logged by ForceLog By default no implemetation is provided here, because pipeline may be used in many different contexts, so the correct implementation is context-dependent

type Options

type Options struct {
	HTTPSender Factory // If sender is nil, then the pipeline's default client is used to send the HTTP requests.
	Log        LogOptions
}

Options configures a Pipeline's behavior.

type Pipeline

type Pipeline interface {
	Do(ctx context.Context, methodFactory Factory, request Request) (Response, error)
}

The Pipeline interface represents an ordered list of Factory objects and an object implementing the HTTPSender interface. You construct a Pipeline by calling the pipeline.NewPipeline function. To send an HTTP request, call pipeline.NewRequest and then call Pipeline's Do method passing a context, the request, and a method-specific Factory (or nil). Passing a method-specific Factory allows this one call to Do to inject a Policy into the linked-list. The policy is injected where the MethodFactoryMarker (see the pipeline.MethodFactoryMarker function) is in the slice of Factory objects.

When Do is called, the Pipeline object asks each Factory object to construct its Policy object and adds each Policy to a linked-list. THen, Do sends the Context and Request through all the Policy objects. The final Policy object sends the request over the network (via the HTTPSender object passed to NewPipeline) and the response is returned backwards through all the Policy objects. Since Pipeline and Factory objects are goroutine-safe, you typically create 1 Pipeline object and reuse it to make many HTTP requests.

func NewPipeline

func NewPipeline(factories []Factory, o Options) Pipeline

NewPipeline creates a new goroutine-safe Pipeline object from the slice of Factory objects and the specified options.

type Policy

type Policy interface {
	Do(ctx context.Context, request Request) (Response, error)
}

The Policy interface represents a mutable Policy object created by a Factory. The object can mutate/process the HTTP request and then forward it on to the next Policy object in the linked-list. The returned Response goes backward through the linked-list for additional processing. NOTE: Request is passed by value so changes do not change the caller's version of the request. However, Request has some fields that reference mutable objects (not strings). These references are copied; a deep copy is not performed. Specifically, this means that you should avoid modifying the objects referred to by these fields: URL, Header, Body, GetBody, TransferEncoding, Form, MultipartForm, Trailer, TLS, Cancel, and Response.

type PolicyFunc

type PolicyFunc func(ctx context.Context, request Request) (Response, error)

PolicyFunc is an adapter that allows the use of an ordinary function as a Policy interface.

func (PolicyFunc) Do

func (f PolicyFunc) Do(ctx context.Context, request Request) (Response, error)

Do calls f(ctx, request).

type PolicyOptions

type PolicyOptions struct {
	// contains filtered or unexported fields
}

A PolicyOptions represents optional information that can be used by a node in the linked-list of Policy objects. A PolicyOptions is passed to the Factory's New method which passes it (if desired) to the Policy object it creates. Today, the Policy object uses the options to perform logging. But, in the future, this could be used for more.

func (*PolicyOptions) Log

func (po *PolicyOptions) Log(level LogLevel, msg string)

Log logs a string to the Pipeline's Logger.

func (*PolicyOptions) ShouldLog

func (po *PolicyOptions) ShouldLog(level LogLevel) bool

ShouldLog returns true if the specified log level should be logged.

type ProgressReceiver

type ProgressReceiver func(bytesTransferred int64)

ProgressReceiver defines the signature of a callback function invoked as progress is reported.

type Request

type Request struct {
	*http.Request
}

Request is a thin wrapper over an http.Request. The wrapper provides several helper methods.

func NewRequest

func NewRequest(method string, url url.URL, body io.ReadSeeker) (request Request, err error)

NewRequest initializes a new HTTP request object with any desired options.

func (Request) Copy

func (r Request) Copy() Request

Copy makes a copy of an http.Request. Specifically, it makes a deep copy of its Method, URL, Host, Proto(Major/Minor), Header. ContentLength, Close, RemoteAddr, RequestURI. Copy makes a shallow copy of the Body, GetBody, TLS, Cancel, Response, and ctx fields. Copy panics if any of these fields are not nil: TransferEncoding, Form, PostForm, MultipartForm, or Trailer.

func (Request) RewindBody

func (r Request) RewindBody() error

RewindBody seeks the request's Body stream back to the beginning so it can be resent when retrying an operation.

func (Request) SetBody

func (r Request) SetBody(body io.ReadSeeker) error

SetBody sets the body and content length, assumes body is not nil.

type Response

type Response interface {
	Response() *http.Response
}

The Response interface exposes an http.Response object as it returns through the pipeline of Policy objects. This ensures that Policy objects have access to the HTTP response. However, the object this interface encapsulates might be a struct with additional fields that is created by a Policy object (typically a method-specific Factory). The method that injected the method-specific Factory gets this returned Response and performs a type assertion to the expected struct and returns the struct to its caller.

func NewHTTPResponse

func NewHTTPResponse(response *http.Response) Response

NewHTTPResponse is typically called by a Policy object to return a Response object.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL