Documentation ¶
Overview ¶
Package frugal provides the library APIs used by the Frugal code generator.
Index ¶
- Constants
- func DefaultFNatsServerOnRequestFinished(properties map[interface{}]interface{})
- func DefaultFNatsServerOnRequestReceived(properties map[interface{}]interface{})
- func IsErrTooLarge(err error) bool
- func NewDefaultFNatsServerOnRequestStarted(highWatermark time.Duration) func(map[interface{}]interface{})
- func NewFrugalHandlerFunc(processor FProcessor, protocolFactory *FProtocolFactory) http.HandlerFunc
- func NewTFramedTransportFactory(factory thrift.TTransportFactory) thrift.TTransportFactory
- func NewTFramedTransportFactoryMaxLength(factory thrift.TTransportFactory, maxLength uint32) thrift.TTransportFactory
- func SetLogger(logger *logrus.Logger)
- func WriteBinary(p thrift.TProtocol, value []byte, name string, field int16) error
- func WriteBool(p thrift.TProtocol, value bool, name string, field int16) error
- func WriteByte(p thrift.TProtocol, value int8, name string, field int16) error
- func WriteDouble(p thrift.TProtocol, value float64, name string, field int16) error
- func WriteI16(p thrift.TProtocol, value int16, name string, field int16) error
- func WriteI32(p thrift.TProtocol, value int32, name string, field int16) error
- func WriteI64(p thrift.TProtocol, value int64, name string, field int16) error
- func WriteString(p thrift.TProtocol, value, name string, field int16) error
- func WriteStruct(p thrift.TProtocol, value thrift.TStruct, name string, field int16) error
- type Arguments
- type BaseFTransportMonitor
- type FAsyncCallback
- type FBaseProcessor
- func (f *FBaseProcessor) AddMiddleware(middleware ServiceMiddleware)
- func (f *FBaseProcessor) AddToAnnotationsMap(method string, annotations map[string]string)
- func (f *FBaseProcessor) AddToProcessorMap(key string, proc FProcessorFunction)
- func (f *FBaseProcessor) Annotations() map[string]map[string]string
- func (f *FBaseProcessor) GetWriteMutex() *sync.Mutex
- func (f *FBaseProcessor) Process(iprot, oprot *FProtocol) error
- type FBaseProcessorFunction
- func (f *FBaseProcessorFunction) AddMiddleware(middleware ServiceMiddleware)
- func (f *FBaseProcessorFunction) GetWriteMutex() *sync.Mutexdeprecated
- func (f *FBaseProcessorFunction) InvokeMethod(args []interface{}) Results
- func (f *FBaseProcessorFunction) SendError(ctx FContext, oprot *FProtocol, kind int32, method, message string) error
- func (f *FBaseProcessorFunction) SendReply(ctx FContext, oprot *FProtocol, method string, result thrift.TStruct) error
- type FClient
- type FContext
- type FContextImpl
- func (c *FContextImpl) AddEphemeralProperty(key, value interface{}) FContext
- func (c *FContextImpl) AddRequestHeader(name, value string) FContext
- func (c *FContextImpl) AddResponseHeader(name, value string) FContext
- func (c *FContextImpl) Clone() FContextWithEphemeralProperties
- func (c *FContextImpl) CorrelationID() string
- func (c *FContextImpl) EphemeralProperties() map[interface{}]interface{}
- func (c *FContextImpl) EphemeralProperty(key interface{}) (interface{}, bool)
- func (c *FContextImpl) RequestHeader(name string) (string, bool)
- func (c *FContextImpl) RequestHeaders() map[string]string
- func (c *FContextImpl) ResponseHeader(name string) (string, bool)
- func (c *FContextImpl) ResponseHeaders() map[string]string
- func (c *FContextImpl) SetTimeout(timeout time.Duration) FContext
- func (c *FContextImpl) Timeout() time.Duration
- type FContextWithEphemeralProperties
- type FHTTPTransportBuilder
- func (h *FHTTPTransportBuilder) Build() FTransport
- func (h *FHTTPTransportBuilder) WithRequestHeaders(requestHeaders map[string]string) *FHTTPTransportBuilder
- func (h *FHTTPTransportBuilder) WithRequestHeadersFromFContext(getRequestHeaders GetHeadersWithContext) *FHTTPTransportBuilder
- func (h *FHTTPTransportBuilder) WithRequestSizeLimit(requestSizeLimit uint) *FHTTPTransportBuilder
- func (h *FHTTPTransportBuilder) WithResponseSizeLimit(responseSizeLimit uint) *FHTTPTransportBuilder
- type FNatsPublisherTransportFactory
- type FNatsServerBuilder
- func (f *FNatsServerBuilder) Build() FServer
- func (f *FNatsServerBuilder) WithHighWatermark(highWatermark time.Duration) *FNatsServerBuilder
- func (f *FNatsServerBuilder) WithQueueGroup(queue string) *FNatsServerBuilder
- func (f *FNatsServerBuilder) WithQueueLength(queueLength uint) *FNatsServerBuilder
- func (f *FNatsServerBuilder) WithRequestFinishedEventHandler(handler func(map[interface{}]interface{})) *FNatsServerBuilder
- func (f *FNatsServerBuilder) WithRequestReceivedEventHandler(handler func(map[interface{}]interface{})) *FNatsServerBuilder
- func (f *FNatsServerBuilder) WithRequestStartedEventHandler(handler func(map[interface{}]interface{})) *FNatsServerBuilder
- func (f *FNatsServerBuilder) WithWorkerCount(workerCount uint) *FNatsServerBuilder
- type FNatsSubscriberTransportFactory
- type FProcessor
- type FProcessorFunction
- type FProtocol
- type FProtocolFactory
- type FPublisherTransport
- type FPublisherTransportFactory
- type FScopeProvider
- type FServer
- type FServiceProvider
- type FSimpleServer
- type FStandardClient
- func (client *FStandardClient) Call(ctx FContext, method string, args, result thrift.TStruct) error
- func (client *FStandardClient) Close() error
- func (client *FStandardClient) Oneway(ctx FContext, method string, args thrift.TStruct) error
- func (client *FStandardClient) Open() error
- func (client *FStandardClient) Publish(ctx FContext, op, topic string, message thrift.TStruct) error
- type FSubscriberTransport
- type FSubscriberTransportFactory
- type FSubscription
- type FTransport
- type FTransportFactory
- type FTransportMonitor
- type GetHeadersWithContext
- type InvocationHandler
- type Method
- type Results
- type ServiceMiddleware
- type TFramedTransport
- func (p *TFramedTransport) Close() error
- func (p *TFramedTransport) Flush() error
- func (p *TFramedTransport) IsOpen() bool
- func (p *TFramedTransport) Open() error
- func (p *TFramedTransport) Read(buf []byte) (l int, err error)
- func (p *TFramedTransport) RemainingBytes() uint64
- func (p *TFramedTransport) Write(buf []byte) (int, error)
- type TMemoryOutputBuffer
Constants ¶
const ( // Inherited from thrift TRANSPORT_EXCEPTION_UNKNOWN = thrift.UNKNOWN_TRANSPORT_EXCEPTION TRANSPORT_EXCEPTION_NOT_OPEN = thrift.NOT_OPEN TRANSPORT_EXCEPTION_ALREADY_OPEN = thrift.ALREADY_OPEN TRANSPORT_EXCEPTION_TIMED_OUT = thrift.TIMED_OUT TRANSPORT_EXCEPTION_END_OF_FILE = thrift.END_OF_FILE // TRANSPORT_EXCEPTION_REQUEST_TOO_LARGE is a TTransportException // error type indicating the request exceeded the size limit. TRANSPORT_EXCEPTION_REQUEST_TOO_LARGE = 100 // TRANSPORT_EXCEPTION_RESPONSE_TOO_LARGE is a TTransportException // error type indicating the response exceeded the size limit. TRANSPORT_EXCEPTION_RESPONSE_TOO_LARGE = 101 // TRANSPORT_EXCEPTION_DISCONNECTED is a TTransportException error type // indicating the transport was disconnected TRANSPORT_EXCEPTION_DISCONNECTED = 102 )
TTransportException types used in frugal instantiated TTransportExceptions.
const ( // Inherited from thrift APPLICATION_EXCEPTION_UNKNOWN = thrift.UNKNOWN_APPLICATION_EXCEPTION APPLICATION_EXCEPTION_UNKNOWN_METHOD = thrift.UNKNOWN_METHOD APPLICATION_EXCEPTION_INVALID_MESSAGE_TYPE = thrift.INVALID_MESSAGE_TYPE_EXCEPTION APPLICATION_EXCEPTION_WRONG_METHOD_NAME = thrift.WRONG_METHOD_NAME APPLICATION_EXCEPTION_BAD_SEQUENCE_ID = thrift.BAD_SEQUENCE_ID APPLICATION_EXCEPTION_MISSING_RESULT = thrift.MISSING_RESULT APPLICATION_EXCEPTION_INTERNAL_ERROR = thrift.INTERNAL_ERROR APPLICATION_EXCEPTION_PROTOCOL_ERROR = thrift.PROTOCOL_ERROR APPLICATION_EXCEPTION_INVALID_TRANSFORM = 8 APPLICATION_EXCEPTION_INVALID_PROTOCOL = 9 APPLICATION_EXCEPTION_UNSUPPORTED_CLIENT_TYPE = 10 // APPLICATION_EXCEPTION_RESPONSE_TOO_LARGE is a TApplicationException // error type indicating the response exceeded the size limit. APPLICATION_EXCEPTION_RESPONSE_TOO_LARGE = 100 )
TApplicationException types used in frugal instantiated TApplicationExceptions.
const (
RequestReceivedTimeKey = "request_received_time"
)
Variables ¶
This section is empty.
Functions ¶
func DefaultFNatsServerOnRequestFinished ¶
func DefaultFNatsServerOnRequestFinished(properties map[interface{}]interface{})
DefaultFNatsServerOnRequestFinished is the default handler called when an FNatsServer finishes processing a message. If does nothing
func DefaultFNatsServerOnRequestReceived ¶
func DefaultFNatsServerOnRequestReceived(properties map[interface{}]interface{})
DefaultFNatsServerOnRequestReceived is the default handler called when an FNatsServer receives a message. It adds the time the request was received to the passed in properties.
func IsErrTooLarge ¶
IsErrTooLarge indicates if the given error is a TTransportException indicating an oversized request or response.
func NewDefaultFNatsServerOnRequestStarted ¶
func NewDefaultFNatsServerOnRequestStarted(highWatermark time.Duration) func(map[interface{}]interface{})
NewDefaultFNatsServerOnRequestStarted constructs a default handler for when an FNatsServer starts processing a message. It checks the current time against a start time in the passed in properties, and logs a warning if the difference is over a threshold.
func NewFrugalHandlerFunc ¶
func NewFrugalHandlerFunc(processor FProcessor, protocolFactory *FProtocolFactory) http.HandlerFunc
NewFrugalHandlerFunc is a function that creates a ready to use Frugal handler function.
func NewTFramedTransportFactory ¶
func NewTFramedTransportFactory(factory thrift.TTransportFactory) thrift.TTransportFactory
NewTFramedTransportFactory creates a new TTransportFactory that produces TFramedTransports.
func NewTFramedTransportFactoryMaxLength ¶
func NewTFramedTransportFactoryMaxLength(factory thrift.TTransportFactory, maxLength uint32) thrift.TTransportFactory
NewTFramedTransportFactoryMaxLength creates a new TTransportFactory that produces TFramedTransports with the given max length.
func WriteBinary ¶
WriteBinary writes []byte `value` of field name and id `name` and `field` respectively into `p`.
func WriteBool ¶
WriteBool writes bool `value` of field name and id `name` and `field` respectively into `p`.
func WriteByte ¶
WriteByte writes byte `value` of field name and id `name` and `field` respectively into `p`.
func WriteDouble ¶
WriteDouble writes float64 `value` of field name and id `name` and `field` respectively into `p`.
func WriteI16 ¶
WriteI16 writes int16 `value` of field name and id `name` and `field` respectively into `p`.
func WriteI32 ¶
WriteI32 writes int32 `value` of field name and id `name` and `field` respectively into `p`.
func WriteI64 ¶
WriteI64 writes int64 `value` of field name and id `name` and `field` respectively into `p`.
func WriteString ¶
WriteString writes string `value` of field name and id `name` and `field` respectively into `p`.
Types ¶
type Arguments ¶
type Arguments []interface{}
Arguments contains the arguments to a service method. The first argument will always be the FContext.
func (Arguments) SetContext ¶
Context sets the given FContext as the first argument.
type BaseFTransportMonitor ¶
type BaseFTransportMonitor struct { MaxReopenAttempts uint InitialWait time.Duration MaxWait time.Duration }
BaseFTransportMonitor is a default monitor implementation that attempts to re-open a closed transport with exponential backoff behavior and a capped number of retries. Its behavior can be customized by embedding this struct type in a new struct which "overrides" desired callbacks.
func (*BaseFTransportMonitor) OnClosedCleanly ¶
func (m *BaseFTransportMonitor) OnClosedCleanly()
OnClosedCleanly is called when the transport is closed cleanly by a call to Close()
func (*BaseFTransportMonitor) OnClosedUncleanly ¶
func (m *BaseFTransportMonitor) OnClosedUncleanly(cause error) (bool, time.Duration)
OnClosedUncleanly is called when the transport is closed for a reason *other* than a call to Close(). Returns whether to try reopening the transport and, if so, how long to wait before making the attempt.
func (*BaseFTransportMonitor) OnReopenFailed ¶
func (m *BaseFTransportMonitor) OnReopenFailed(prevAttempts uint, prevWait time.Duration) (bool, time.Duration)
OnReopenFailed is called when an attempt to reopen the transport fails. Given the number of previous attempts to re-open the transport and the length of the previous wait. Returns whether to attempt to re-open the transport, and how long to wait before making the attempt.
func (*BaseFTransportMonitor) OnReopenSucceeded ¶
func (m *BaseFTransportMonitor) OnReopenSucceeded()
OnReopenSucceeded is called after the transport has been successfully re-opened.
type FAsyncCallback ¶
type FAsyncCallback func(thrift.TTransport) error
FAsyncCallback is an internal callback which is constructed by generated code and invoked by an FRegistry when a RPC response is received. In other words, it's used to complete RPCs. The operation ID on FContext is used to look up the appropriate callback. FAsyncCallback is passed an in-memory TTransport which wraps the complete message. The callback returns an error or throws an exception if an unrecoverable error occurs and the transport needs to be shutdown.
type FBaseProcessor ¶
type FBaseProcessor struct {
// contains filtered or unexported fields
}
FBaseProcessor is a base implementation of FProcessor. FProcessors should embed this and register FProcessorFunctions. This should only be used by generated code.
func NewFBaseProcessor ¶
func NewFBaseProcessor() *FBaseProcessor
NewFBaseProcessor returns a new FBaseProcessor which FProcessors can extend.
func (*FBaseProcessor) AddMiddleware ¶
func (f *FBaseProcessor) AddMiddleware(middleware ServiceMiddleware)
AddMiddleware adds the given ServiceMiddleware to the FProcessor. This should only be called before the server is started.
func (*FBaseProcessor) AddToAnnotationsMap ¶
func (f *FBaseProcessor) AddToAnnotationsMap(method string, annotations map[string]string)
AddToAnnotationsMap registers the given annotations to the given method.
func (*FBaseProcessor) AddToProcessorMap ¶
func (f *FBaseProcessor) AddToProcessorMap(key string, proc FProcessorFunction)
AddToProcessorMap registers the given FProcessorFunction.
func (*FBaseProcessor) Annotations ¶
func (f *FBaseProcessor) Annotations() map[string]map[string]string
Annotations returns a map of method name to annotations as defined in the service IDL that is serviced by this processor.
func (*FBaseProcessor) GetWriteMutex ¶
func (f *FBaseProcessor) GetWriteMutex() *sync.Mutex
GetWriteMutex returns the Mutex which FProcessorFunctions should use to synchronize access to the output FProtocol.
func (*FBaseProcessor) Process ¶
func (f *FBaseProcessor) Process(iprot, oprot *FProtocol) error
Process the request from the input protocol and write the response to the output protocol.
type FBaseProcessorFunction ¶
type FBaseProcessorFunction struct {
// contains filtered or unexported fields
}
FBaseProcessorFunction is a base implementation of FProcessorFunction. FProcessorFunctions should embed this. This should only be used by generated code.
func NewFBaseProcessorFunction ¶
func NewFBaseProcessorFunction(writeMu *sync.Mutex, handler *Method) *FBaseProcessorFunction
NewFBaseProcessorFunction returns a new FBaseProcessorFunction which FProcessorFunctions can extend.
func (*FBaseProcessorFunction) AddMiddleware ¶
func (f *FBaseProcessorFunction) AddMiddleware(middleware ServiceMiddleware)
AddMiddleware adds the given ServiceMiddleware to the FProcessorFunction. This should only be called before the server is started.
func (*FBaseProcessorFunction) GetWriteMutex
deprecated
func (f *FBaseProcessorFunction) GetWriteMutex() *sync.Mutex
GetWriteMutex returns the Mutex which should be used to synchronize access to the output FProtocol.
Deprecated: use SendError or SendReply instead!
func (*FBaseProcessorFunction) InvokeMethod ¶
func (f *FBaseProcessorFunction) InvokeMethod(args []interface{}) Results
InvokeMethod invokes the handler method.
type FClient ¶
type FClient interface { Open() error // holdover from publisher refactor, remove in frugal v4 Close() error // holdover from publisher refactor, remvoe in frugal v4 Call(ctx FContext, method string, args, result thrift.TStruct) error Oneway(ctx FContext, method string, args thrift.TStruct) error Publish(ctx FContext, op, topic string, message thrift.TStruct) error }
FClient ...
type FContext ¶
type FContext interface { // CorrelationID returns the correlation id for the context. CorrelationID() string // AddRequestHeader adds a request header to the context for the given // name. The headers _cid and _opid are reserved. Returns the same FContext // to allow for chaining calls. AddRequestHeader(name, value string) FContext // RequestHeader gets the named request header. RequestHeader(name string) (string, bool) // RequestHeaders returns the request headers map. RequestHeaders() map[string]string // AddResponseHeader adds a response header to the context for the given // name. The _opid header is reserved. Returns the same FContext to allow // for chaining calls. AddResponseHeader(name, value string) FContext // ResponseHeader gets the named response header. ResponseHeader(name string) (string, bool) // ResponseHeaders returns the response headers map. ResponseHeaders() map[string]string // SetTimeout sets the request timeout. Default is 5 seconds. Returns the // same FContext to allow for chaining calls. SetTimeout(timeout time.Duration) FContext // Timeout returns the request timeout. Timeout() time.Duration }
FContext is the context for a Frugal message. Every RPC has an FContext, which can be used to set request headers, response headers, and the request timeout. The default timeout is five seconds. An FContext is also sent with every publish message which is then received by subscribers.
In addition to headers, the FContext also contains a correlation ID which can be used for distributed tracing purposes. A random correlation ID is generated for each FContext if one is not provided.
FContext also plays a key role in Frugal's multiplexing support. A unique, per-request operation ID is set on every FContext before a request is made. This operation ID is sent in the request and included in the response, which is then used to correlate a response to a request. The operation ID is an internal implementation detail and is not exposed to the user.
An FContext should belong to a single request for the lifetime of that request. It can be reused once the request has completed, though they should generally not be reused.
Implementations of FContext must adhere to the following:
- The CorrelationID should be stored as a request header with the header name "_cid"
- Threadsafe
func Clone ¶
Clone performs a deep copy of an FContext while handling opids correctly. TODO 4.0 consider adding this to the FContext interface.
func NewFContext ¶
NewFContext returns a Context for the given correlation id. If an empty correlation id is given, one will be generated. A Context should belong to a single request for the lifetime of the request. It can be reused once its request has completed, though they should generally not be reused.
type FContextImpl ¶
type FContextImpl struct {
// contains filtered or unexported fields
}
FContextImpl is an implementation of FContext.
func (*FContextImpl) AddEphemeralProperty ¶
func (c *FContextImpl) AddEphemeralProperty(key, value interface{}) FContext
AddEphemeralProperty adds a keyp-value pair to the ephemeral properties.
func (*FContextImpl) AddRequestHeader ¶
func (c *FContextImpl) AddRequestHeader(name, value string) FContext
AddRequestHeader adds a request header to the context for the given name. The headers _cid and _opid are reserved. Returns the same FContext to allow for chaining calls.
func (*FContextImpl) AddResponseHeader ¶
func (c *FContextImpl) AddResponseHeader(name, value string) FContext
AddResponseHeader adds a response header to the context for the given name. The _opid header is reserved. Returns the same FContext to allow for chaining calls.
func (*FContextImpl) Clone ¶
func (c *FContextImpl) Clone() FContextWithEphemeralProperties
Clone performs a deep copy of an FContextWithEphemeralProperties while handling opids correctly.
func (*FContextImpl) CorrelationID ¶
func (c *FContextImpl) CorrelationID() string
CorrelationID returns the correlation id for the context.
func (*FContextImpl) EphemeralProperties ¶
func (c *FContextImpl) EphemeralProperties() map[interface{}]interface{}
EphemeralProperties returns a copy of the ephemeral properties map.
func (*FContextImpl) EphemeralProperty ¶
func (c *FContextImpl) EphemeralProperty(key interface{}) (interface{}, bool)
EphemeralProperty gets the property associated with the given key.
func (*FContextImpl) RequestHeader ¶
func (c *FContextImpl) RequestHeader(name string) (string, bool)
RequestHeader gets the named request header.
func (*FContextImpl) RequestHeaders ¶
func (c *FContextImpl) RequestHeaders() map[string]string
RequestHeaders returns the request headers map.
func (*FContextImpl) ResponseHeader ¶
func (c *FContextImpl) ResponseHeader(name string) (string, bool)
ResponseHeader gets the named response header.
func (*FContextImpl) ResponseHeaders ¶
func (c *FContextImpl) ResponseHeaders() map[string]string
ResponseHeaders returns the response headers map.
func (*FContextImpl) SetTimeout ¶
func (c *FContextImpl) SetTimeout(timeout time.Duration) FContext
SetTimeout sets the request timeout. Default is 5 seconds. Returns the same FContext to allow for chaining calls.
func (*FContextImpl) Timeout ¶
func (c *FContextImpl) Timeout() time.Duration
Timeout returns the request timeout.
type FContextWithEphemeralProperties ¶
type FContextWithEphemeralProperties interface { FContext // Clone performs a deep copy of an FContextWithEphemeralProperties while // handling opids correctly. Clone() FContextWithEphemeralProperties // EphemeralProperty gets the property associated with the given key. EphemeralProperty(key interface{}) (interface{}, bool) // EphemeralProperties returns a copy of the ephemeral properties map. EphemeralProperties() map[interface{}]interface{} // AddEphemeralProperty adds a keyp-value pair to the ephemeral properties. AddEphemeralProperty(key, value interface{}) FContext }
FContextWithEphemeralProperties is an extension of the FContext interface with support for ephemeral properties. Ephemeral properties are a map of key-value pairs that won't be serialized with the rest of the FContext. TODO 4.0 add this to the FContext interface
type FHTTPTransportBuilder ¶
type FHTTPTransportBuilder struct {
// contains filtered or unexported fields
}
FHTTPTransportBuilder configures and builds HTTP FTransport instances.
func NewFHTTPTransportBuilder ¶
func NewFHTTPTransportBuilder(client *http.Client, url string) *FHTTPTransportBuilder
NewFHTTPTransportBuilder creates a builder which configures and builds HTTP FTransport instances.
func (*FHTTPTransportBuilder) Build ¶
func (h *FHTTPTransportBuilder) Build() FTransport
Build a new configured HTTP FTransport.
func (*FHTTPTransportBuilder) WithRequestHeaders ¶
func (h *FHTTPTransportBuilder) WithRequestHeaders(requestHeaders map[string]string) *FHTTPTransportBuilder
withRequestHeaders adds custom request headers. If set to nil (the default), there is no size limit on responses.
func (*FHTTPTransportBuilder) WithRequestHeadersFromFContext ¶
func (h *FHTTPTransportBuilder) WithRequestHeadersFromFContext(getRequestHeaders GetHeadersWithContext) *FHTTPTransportBuilder
withRequestHeadersFromFContext adds custom request headers to each request with a provided function that accepts an FContext and returns map of string key-value pairs
func (*FHTTPTransportBuilder) WithRequestSizeLimit ¶
func (h *FHTTPTransportBuilder) WithRequestSizeLimit(requestSizeLimit uint) *FHTTPTransportBuilder
WithRequestSizeLimit adds a request size limit. If set to 0 (the default), there is no size limit on requests.
func (*FHTTPTransportBuilder) WithResponseSizeLimit ¶
func (h *FHTTPTransportBuilder) WithResponseSizeLimit(responseSizeLimit uint) *FHTTPTransportBuilder
WithResponseSizeLimit adds a response size limit. If set to 0 (the default), there is no size limit on responses.
type FNatsPublisherTransportFactory ¶
type FNatsPublisherTransportFactory struct {
// contains filtered or unexported fields
}
FNatsPublisherTransportFactory creates FNatsPublisherTransports.
func NewFNatsPublisherTransportFactory ¶
func NewFNatsPublisherTransportFactory(conn *nats.Conn) *FNatsPublisherTransportFactory
NewFNatsPublisherTransportFactory creates an FNatsPublisherTransportFactory using the provided NATS connection.
func (*FNatsPublisherTransportFactory) GetTransport ¶
func (n *FNatsPublisherTransportFactory) GetTransport() FPublisherTransport
GetTransport creates a new NATS FPublisherTransport.
type FNatsServerBuilder ¶
type FNatsServerBuilder struct {
// contains filtered or unexported fields
}
FNatsServerBuilder configures and builds NATS server instances.
func NewFNatsServerBuilder ¶
func NewFNatsServerBuilder(conn *nats.Conn, processor FProcessor, protoFactory *FProtocolFactory, subjects []string) *FNatsServerBuilder
NewFNatsServerBuilder creates a builder which configures and builds NATS server instances.
func (*FNatsServerBuilder) Build ¶
func (f *FNatsServerBuilder) Build() FServer
Build a new configured NATS FServer.
func (*FNatsServerBuilder) WithHighWatermark ¶
func (f *FNatsServerBuilder) WithHighWatermark(highWatermark time.Duration) *FNatsServerBuilder
WithHighWatermark controls the time duration requests wait in queue before triggering slow consumer logic.
func (*FNatsServerBuilder) WithQueueGroup ¶
func (f *FNatsServerBuilder) WithQueueGroup(queue string) *FNatsServerBuilder
WithQueueGroup adds a NATS queue group to receive requests on.
func (*FNatsServerBuilder) WithQueueLength ¶
func (f *FNatsServerBuilder) WithQueueLength(queueLength uint) *FNatsServerBuilder
WithQueueLength controls the length of the work queue used to buffer requests.
func (*FNatsServerBuilder) WithRequestFinishedEventHandler ¶
func (f *FNatsServerBuilder) WithRequestFinishedEventHandler(handler func(map[interface{}]interface{})) *FNatsServerBuilder
WithRequestFinishedEventHandler sets a function to be called after the FNatsServer processes a message.
If the same functionality can be accomplished through middleware, middleware is preferred as it is more flexible and more portable between different servers. This function should only handle events and behaviour specific to an FNatsServer that aren't applicable to other frugal servers.
func (*FNatsServerBuilder) WithRequestReceivedEventHandler ¶
func (f *FNatsServerBuilder) WithRequestReceivedEventHandler(handler func(map[interface{}]interface{})) *FNatsServerBuilder
WithRequestReceivedEventHandler sets a function to be called when the FNatsServer receives a message, but before it is put onto a work queue. The properties map will be set on the FContext before processing is started.
If the same functionality can be accomplished through middleware, middleware is preferred as it is more flexible and more portable between different servers. This function should only handle events and behaviour specific to an FNatsServer that aren't applicable to other frugal servers.
func (*FNatsServerBuilder) WithRequestStartedEventHandler ¶
func (f *FNatsServerBuilder) WithRequestStartedEventHandler(handler func(map[interface{}]interface{})) *FNatsServerBuilder
WithRequestStartedEventHandler sets a function to be called before the FNatsServer processes a message. The properties map will be set on the FContext before processing begins.
If the same functionality can be accomplished through middleware, middleware is preferred as it is more flexible and more portable between different servers. This function should only handle events and behaviour specific to an FNatsServer that aren't applicable to other frugal servers.
func (*FNatsServerBuilder) WithWorkerCount ¶
func (f *FNatsServerBuilder) WithWorkerCount(workerCount uint) *FNatsServerBuilder
WithWorkerCount controls the number of goroutines used to process requests.
type FNatsSubscriberTransportFactory ¶
type FNatsSubscriberTransportFactory struct {
// contains filtered or unexported fields
}
FNatsSubscriberTransportFactory creates FNatsSubscriberTransports.
func NewFNatsSubscriberTransportFactory ¶
func NewFNatsSubscriberTransportFactory(conn *nats.Conn) *FNatsSubscriberTransportFactory
NewFNatsSubscriberTransportFactory creates an FNatsSubscriberTransportFactory using the provided NATS connection. Subscribers using this transport will not use a queue.
func NewFNatsSubscriberTransportFactoryWithQueue ¶
func NewFNatsSubscriberTransportFactoryWithQueue(conn *nats.Conn, queue string) *FNatsSubscriberTransportFactory
NewFNatsSubscriberTransportFactoryWithQueue creates an FNatsSubscriberTransportFactory using the provided NATS connection. Subscribers using this transport will subscribe to the provided queue, forming a queue group. When a queue group is formed, only one member receives the message.
func (*FNatsSubscriberTransportFactory) GetTransport ¶
func (n *FNatsSubscriberTransportFactory) GetTransport() FSubscriberTransport
GetTransport creates a new NATS FSubscriberTransport.
type FProcessor ¶
type FProcessor interface { // Process the request from the input protocol and write the response to // the output protocol. Process(in, out *FProtocol) error // AddMiddleware adds the given ServiceMiddleware to the FProcessor. This // should only be called before the server is started. AddMiddleware(ServiceMiddleware) // Annotations returns a map of method name to annotations as defined in // the service IDL that is serviced by this processor. Annotations() map[string]map[string]string }
FProcessor is Frugal's equivalent of Thrift's TProcessor. It's a generic object which operates upon an input stream and writes to an output stream. Specifically, an FProcessor is provided to an FServer in order to wire up a service handler to process requests.
type FProcessorFunction ¶
type FProcessorFunction interface { // Process the request from the input protocol and write the response to // the output protocol. Process(ctx FContext, in, out *FProtocol) error // AddMiddleware adds the given ServiceMiddleware to the // FProcessorFunction. This should only be called before the server is // started. AddMiddleware(middleware ServiceMiddleware) }
FProcessorFunction is used internally by generated code. An FProcessor registers an FProcessorFunction for each service method. Like FProcessor, an FProcessorFunction exposes a single process call, which is used to handle a method invocation.
type FProtocol ¶
FProtocol is Frugal's equivalent of Thrift's TProtocol. It defines the serialization protocol used for messages, such as JSON, binary, etc. FProtocol actually extends TProtocol and adds support for serializing FContext. In practice, FProtocol simply wraps a TProtocol and uses Thrift's built-in serialization. FContext is encoded before the TProtocol serialization of the message using a simple binary protocol. See the protocol documentation for more details.
func (*FProtocol) ReadRequestHeader ¶
ReadRequestHeader reads the request headers on the protocol into a returned Context
func (*FProtocol) ReadResponseHeader ¶
ReadResponseHeader reads the response headers on the protocol into a provided Context
func (*FProtocol) WriteRequestHeader ¶
WriteRequestHeader writes the request headers set on the given Context into the protocol
func (*FProtocol) WriteResponseHeader ¶
WriteResponseHeader writes the response headers set on the given Context into the protocol
type FProtocolFactory ¶
type FProtocolFactory struct {
// contains filtered or unexported fields
}
FProtocolFactory creates new FProtocol instances. It takes a TProtocolFactory and a TTransport and returns an FProtocol which wraps a TProtocol produced by the TProtocolFactory. The TProtocol itself wraps the provided TTransport. This makes it easy to produce an FProtocol which uses any existing Thrift transports and protocols in a composable manner.
func NewFProtocolFactory ¶
func NewFProtocolFactory(protoFactory thrift.TProtocolFactory) *FProtocolFactory
NewFProtocolFactory creates a new FProtocolFactory with the given TProtocolFactory.
func (*FProtocolFactory) GetProtocol ¶
func (f *FProtocolFactory) GetProtocol(tr thrift.TTransport) *FProtocol
GetProtocol returns a new FProtocol instance using the given TTransport.
type FPublisherTransport ¶
type FPublisherTransport interface { // Open opens the transport. Open() error // Close closes the transport. Close() error // IsOpen returns true if the transport is open, false otherwise. IsOpen() bool // GetPublishSizeLimit returns the maximum allowable size of a payload // to be published. 0 is returned to indicate an unbounded allowable size. GetPublishSizeLimit() uint // Publish sends the given payload with the transport. Implementations // of publish should be threadsafe. Publish(string, []byte) error }
FPublisherTransport is used exclusively for pub/sub scopes. Publishers use it to publish messages to a topic.
func NewNatsFPublisherTransport ¶
func NewNatsFPublisherTransport(conn *nats.Conn) FPublisherTransport
NewNatsFPublisherTransport creates a new FPublisherTransport which is used for publishing with scopes.
type FPublisherTransportFactory ¶
type FPublisherTransportFactory interface {
GetTransport() FPublisherTransport
}
FPublisherTransportFactory produces FPublisherTransports and is typically used by an FScopeProvider.
type FScopeProvider ¶
type FScopeProvider struct {
// contains filtered or unexported fields
}
FScopeProvider produces FScopeTransports and FProtocols for use by pub/sub scopes. It does this by wrapping an FScopeTransportFactory and FProtocolFactory. This also provides a shim for adding middleware to a publisher or subscriber.
func NewFScopeProvider ¶
func NewFScopeProvider(pub FPublisherTransportFactory, sub FSubscriberTransportFactory, prot *FProtocolFactory, middleware ...ServiceMiddleware) *FScopeProvider
NewFScopeProvider creates a new FScopeProvider using the given factories.
func (*FScopeProvider) GetMiddleware ¶
func (p *FScopeProvider) GetMiddleware() []ServiceMiddleware
GetMiddleware returns the ServiceMiddleware stored on this FScopeProvider.
func (*FScopeProvider) NewPublisher ¶
func (p *FScopeProvider) NewPublisher() (FPublisherTransport, *FProtocolFactory)
NewPublisher returns a new FPublisherTransport and FProtocol used by scope publishers.
func (*FScopeProvider) NewSubscriber ¶
func (p *FScopeProvider) NewSubscriber() (FSubscriberTransport, *FProtocolFactory)
NewSubscriber returns a new FSubscriberTransport and FProtocolFactory used by scope subscribers.
type FServer ¶
type FServer interface { // Serve starts the server. Serve() error // Stop the server. This is optional on a per-implementation basis. Not all // servers are required to be cleanly stoppable. Stop() error }
FServer is Frugal's equivalent of Thrift's TServer. It's used to run a Frugal RPC service by executing an FProcessor on client connections.
type FServiceProvider ¶
type FServiceProvider struct {
// contains filtered or unexported fields
}
FServiceProvider produces FTransports and FProtocolFactories for use by RPC service clients. The main purpose of this is to provide a shim for adding middleware to a client.
func NewFServiceProvider ¶
func NewFServiceProvider(transport FTransport, protocolFactory *FProtocolFactory, middleware ...ServiceMiddleware) *FServiceProvider
NewFServiceProvider creates a new FServiceProvider containing the given FTransport and FProtocolFactory.
func (*FServiceProvider) GetMiddleware ¶
func (f *FServiceProvider) GetMiddleware() []ServiceMiddleware
GetMiddleware returns the ServiceMiddleware stored on this FServiceProvider.
func (*FServiceProvider) GetProtocolFactory ¶
func (f *FServiceProvider) GetProtocolFactory() *FProtocolFactory
GetProtocolFactory returns the contained FProtocolFactory.
func (*FServiceProvider) GetTransport ¶
func (f *FServiceProvider) GetTransport() FTransport
GetTransport returns the contained FTransport.
type FSimpleServer ¶
type FSimpleServer struct {
// contains filtered or unexported fields
}
FSimpleServer is a simple FServer which starts a goroutine for each connection.
func NewFSimpleServer ¶
func NewFSimpleServer( processor FProcessor, serverTransport thrift.TServerTransport, protocolFactory *FProtocolFactory) *FSimpleServer
NewFSimpleServer creates a new FSimpleServer which is a simple FServer that starts a goroutine for each connection.
type FStandardClient ¶
type FStandardClient struct {
// contains filtered or unexported fields
}
FStandardClient implements FClient, and uses the standard message format for Frugal.
func NewFScopeClient ¶
func NewFScopeClient(provider *FScopeProvider) *FStandardClient
NewFScopeClient ...
func NewFStandardClient ¶
func NewFStandardClient(provider *FServiceProvider) *FStandardClient
NewFStandardClient implements FClient, and uses the standard message format for Frugal.
type FSubscriberTransport ¶
type FSubscriberTransport interface { // Subscribe opens the transport and sets the subscribe topic. Subscribe(string, FAsyncCallback) error // Unsubscribe unsubscribes from the topic and closes the transport. Unsubscribe() error // IsSubscribed returns true if the transport is subscribed to a topic, // false otherwise. IsSubscribed() bool }
FSubscriberTransport is used exclusively for pub/sub scopes. Subscribers use it to subscribe to a pub/sub topic.
func NewNatsFSubscriberTransport ¶
func NewNatsFSubscriberTransport(conn *nats.Conn) FSubscriberTransport
NewNatsFSubscriberTransport creates a new FSubscriberTransport which is used for pub/sub. Subscribers using this transport will not use a queue.
func NewNatsFSubscriberTransportWithQueue ¶
func NewNatsFSubscriberTransportWithQueue(conn *nats.Conn, queue string) FSubscriberTransport
NewNatsFSubscriberTransportWithQueue creates a new FSubscriberTransport which is used for pub/sub. Subscribers using this transport will subscribe to the provided queue, forming a queue group. When a queue group is formed, only one member receives the message.
type FSubscriberTransportFactory ¶
type FSubscriberTransportFactory interface {
GetTransport() FSubscriberTransport
}
FSubscriberTransportFactory produces FSubscriberTransports and is typically used by an FScopeProvider.
type FSubscription ¶
type FSubscription struct {
// contains filtered or unexported fields
}
FSubscription is a subscription to a pub/sub topic created by a scope. The topic subscription is actually handled by an FScopeTransport, which the FSubscription wraps. Each FSubscription should have its own FScopeTransport. The FSubscription is used to unsubscribe from the topic.
func NewFSubscription ¶
func NewFSubscription(topic string, transport FSubscriberTransport) *FSubscription
NewFSubscription creates a new FSubscription to the given topic which should be subscribed on the given FScopeTransport. This is to be used by generated code and should not be called directly.
func (*FSubscription) Remove ¶
func (s *FSubscription) Remove() error
Remove unsubscribes and removes durably stored information on the broker, if applicable.
func (*FSubscription) Topic ¶
func (s *FSubscription) Topic() string
Topic returns the subscription topic name.
func (*FSubscription) Unsubscribe ¶
func (s *FSubscription) Unsubscribe() error
Unsubscribe from the topic.
type FTransport ¶
type FTransport interface { // SetMonitor starts a monitor that can watch the health of, and reopen, // the transport. SetMonitor(FTransportMonitor) // Closed channel receives the cause of an FTransport close (nil if clean // close). Closed() <-chan error // Open prepares the transport to send data. Open() error // IsOpen returns true if the transport is open, false otherwise. IsOpen() bool // Close closes the transport. Close() error // Oneway transmits the given data and doesn't wait for a response. // Implementations of oneway should be threadsafe and respect the timeout // present on the context. Oneway(ctx FContext, payload []byte) error // Request transmits the given data and waits for a response. // Implementations of request should be threadsafe and respect the timeout // present on the context. Request(ctx FContext, payload []byte) (thrift.TTransport, error) // GetRequestSizeLimit returns the maximum number of bytes that can be // transmitted. Returns a non-positive number to indicate an unbounded // allowable size. GetRequestSizeLimit() uint }
FTransport is Frugal's equivalent of Thrift's TTransport. FTransport is comparable to Thrift's TTransport in that it represents the transport layer for frugal clients. However, frugal is callback based and sends only framed data. Due to this, instead of read, write, and flush methods, FTransport has a send method that sends framed frugal messages. To handle callback data, an FTransport also has an FRegistry, so it provides methods for registering and unregistering an FAsyncCallback to an FContext.
func NewAdapterTransport ¶
func NewAdapterTransport(tr thrift.TTransport) FTransport
NewAdapterTransport returns an FTransport which uses the given TTransport for read/write operations in a way that is compatible with Frugal. This allows TTransports which support blocking reads to work with Frugal by starting a goroutine that reads from the underlying transport and calling the registry on received frames.
func NewFNatsTransport ¶
func NewFNatsTransport(conn *nats.Conn, subject, inbox string) FTransport
NewFNatsTransport returns a new FTransport which uses the NATS messaging system as the underlying transport. This FTransport is stateless in that there is no connection maintained between the client and server. A request is simply published to a subject and responses are received on another subject. This requires requests and responses to fit within a single NATS message.
type FTransportFactory ¶
type FTransportFactory interface {
GetTransport(tr thrift.TTransport) FTransport
}
FTransportFactory produces FTransports by wrapping a provided TTransport.
func NewAdapterTransportFactory ¶
func NewAdapterTransportFactory() FTransportFactory
NewAdapterTransportFactory creates a new FTransportFactory which produces an FTransport implementation that acts as an adapter for thrift.TTransport. This allows TTransports which support blocking reads to work with Frugal by starting a goroutine that reads from the underlying transport and calling the registry on received frames.
type FTransportMonitor ¶
type FTransportMonitor interface { // OnClosedCleanly is called when the transport is closed cleanly by a call // to Close() OnClosedCleanly() // OnClosedUncleanly is called when the transport is closed for a reason // *other* than a call to Close(). Returns whether to try reopening the // transport and, if so, how long to wait before making the attempt. OnClosedUncleanly(cause error) (reopen bool, wait time.Duration) // OnReopenFailed is called when an attempt to reopen the transport fails. // Given the number of previous attempts to re-open the transport and the // length of the previous wait. Returns whether to attempt to re-open the // transport, and how long to wait before making the attempt. OnReopenFailed(prevAttempts uint, prevWait time.Duration) (reopen bool, wait time.Duration) // OnReopenSucceeded is called after the transport has been successfully // re-opened. OnReopenSucceeded() }
FTransportMonitor watches and heals an FTransport. It exposes a number of hooks which can be used to add logic around FTransport events, such as unexpected disconnects, expected disconnects, failed reconnects, and successful reconnects.
Most Frugal implementations include a base FTransportMonitor which implements basic reconnect logic with backoffs and max attempts. This can be extended or reimplemented to provide custom logic.
func NewDefaultFTransportMonitor ¶
func NewDefaultFTransportMonitor() FTransportMonitor
NewDefaultFTransportMonitor creates a new FTransportMonitor with default reconnect options (attempts to reconnect 60 times with 2 seconds between each attempt).
type GetHeadersWithContext ¶
type InvocationHandler ¶
InvocationHandler processes a service method invocation on a proxy instance and returns the result. The args and return value should match the arity of the proxied method and have the same types. The first argument will always be the FContext.
type Method ¶
type Method struct {
// contains filtered or unexported fields
}
Method contains an InvocationHandler and a handle to the method it proxies. This should only be used by generated code.
func NewMethod ¶
func NewMethod(proxiedHandler, method interface{}, methodName string, middleware []ServiceMiddleware) *Method
NewMethod creates a new Method which proxies the given handler. ProxiedHandler must be a struct and method must be a function. This should only be called by generated code.
func (*Method) AddMiddleware ¶
func (m *Method) AddMiddleware(middleware ServiceMiddleware)
AddMiddleware wraps the Method with the given ServiceMiddleware. This should only be called by generated code.
type Results ¶
type Results []interface{}
Results contains the return values from a service method invocation. The last return value will always be an error (or nil).
type ServiceMiddleware ¶
type ServiceMiddleware func(InvocationHandler) InvocationHandler
ServiceMiddleware is used to implement interceptor logic around API calls. This can be used, for example, to implement retry policies on service calls, logging, telemetry, or authentication and authorization. ServiceMiddleware can be applied to both RPC services and pub/sub scopes.
ServiceMiddleware returns an InvocationHandler which proxies the given InvocationHandler. This can be used to apply middleware logic around a service call.
type TFramedTransport ¶
type TFramedTransport struct {
// contains filtered or unexported fields
}
TFramedTransport is an implementation of thrift.TTransport which frames messages with their size.
func NewTFramedTransport ¶
func NewTFramedTransport(transport thrift.TTransport) *TFramedTransport
NewTFramedTransport creates a new TFramedTransport wrapping the given TTransport.
func NewTFramedTransportMaxLength ¶
func NewTFramedTransportMaxLength(transport thrift.TTransport, maxLength uint32) *TFramedTransport
NewTFramedTransportMaxLength creates a new TFramedTransport wrapping the given TTransport using the given max length.
func (*TFramedTransport) IsOpen ¶
func (p *TFramedTransport) IsOpen() bool
IsOpen checks if the transport is open.
func (*TFramedTransport) Read ¶
func (p *TFramedTransport) Read(buf []byte) (l int, err error)
Read from the transport.
func (*TFramedTransport) RemainingBytes ¶
func (p *TFramedTransport) RemainingBytes() uint64
RemainingBytes returns the current frame size.
type TMemoryOutputBuffer ¶
type TMemoryOutputBuffer struct { *thrift.TMemoryBuffer // contains filtered or unexported fields }
TMemoryOutputBuffer implements TTransport using a bounded memory buffer. Writes which cause the buffer to exceed its size return ErrTooLarge. The TMemoryOutputBuffer handles framing data.
func NewTMemoryOutputBuffer ¶
func NewTMemoryOutputBuffer(size uint) *TMemoryOutputBuffer
NewTMemoryOutputBuffer returns a new TFramedMemoryBuffer with the given size limit. If the provided limit is non-positive, the buffer is allowed to grow unbounded.
func (*TMemoryOutputBuffer) Bytes ¶
func (f *TMemoryOutputBuffer) Bytes() []byte
Bytes retrieves the framed contents of the buffer.
func (*TMemoryOutputBuffer) HasWriteData ¶
func (f *TMemoryOutputBuffer) HasWriteData() bool
HasWriteData determines if there's any data in the buffer to send.
Source Files ¶
- adapter_transport.go
- bounded_memory_buffer.go
- client.go
- context.go
- encoder.go
- errors.go
- framed_transport.go
- frugal.go
- http_transport.go
- middleware.go
- nats_scope_transport.go
- nats_server.go
- nats_transport.go
- processor.go
- protocol.go
- provider.go
- registry.go
- server.go
- simple_server.go
- subscription.go
- transport.go
- transport_monitor.go