server

package
v1.7.1
Warning

This package is not in the latest version of its module.

Published: May 11, 2024 | License: MIT | Imports: 13 | Imported by: 18

Documentation


Constants

This section is empty.

Variables

var (
	DefaultAddress          = ":0"
	DefaultName             = "go.vine.server"
	DefaultVersion          = "latest"
	DefaultId               = uuid.New().String()
	DefaultServer           Server
	DefaultRegisterCheck    = func(context.Context) error { return nil }
	DefaultRegisterInterval = time.Second * 20
	DefaultRegisterTTL      = time.Second * 30

	Flag = pflag.NewFlagSet("server", pflag.ExitOnError)
)

Functions

func Handle

func Handle(h Handler) error

Handle registers a handler interface with the default server to handle inbound requests.

func Init

func Init(opts ...Option)

Init initialises the default server with options passed in

func Run

func Run() error

Run starts the default server and waits for a kill signal before exiting. It also registers and deregisters the server.

func Start

func Start() error

Start starts the default server

func Stop

func Stop() error

Stop stops the default server

func String

func String() string

String returns name of Server implementation

func Subscribe

func Subscribe(s Subscriber) error

Subscribe registers a subscriber interface with the default server, which subscribes to the specified topic with the broker.

Types

type Handler

type Handler interface {
	Name() string
	Handler() interface{}
	Endpoints() []*registry.Endpoint
	Options() HandlerOptions
}

Handler interface represents a request handler. It's generated by passing any type of public concrete object with endpoints into server.NewHandler. Most will pass in a struct.

Example:

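type Greeter struct{}

// HelloRequest and HelloResponse are placeholders for the endpoint's concrete message types.
func (g *Greeter) Hello(ctx context.Context, req *HelloRequest, rsp *HelloResponse) error {
	return nil
}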

func NewHandler

func NewHandler(h interface{}, opts ...HandlerOption) Handler

NewHandler creates a new handler interface using the default server. Handlers are required to be public objects with public endpoints. A call to a service endpoint such as Foo.Bar expects the type:

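type Foo struct{}

// BarRequest and BarResponse are placeholders for the endpoint's concrete message types.
func (f *Foo) Bar(ctx context.Context, req *BarRequest, rsp *BarResponse) error {
    return nil
}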

type HandlerFunc

type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error

HandlerFunc represents a single method of a handler. It's used primarily for the wrappers. What's handed to the actual method is the concrete request and response types.

type HandlerOption

type HandlerOption func(*HandlerOptions)

func EndpointMetadata

func EndpointMetadata(name string, md map[string]string) HandlerOption

EndpointMetadata is a Handler option that allows metadata to be added to individual endpoints.

func InternalHandler

func InternalHandler(b bool) HandlerOption

InternalHandler specifies that a handler is not advertised to the discovery system. In the future this may also limit requests to the internal network or to authorised users.

type HandlerOptions

type HandlerOptions struct {
	Internal bool
	Metadata map[string]map[string]string
}
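
As a rough illustration of how handler options of this shape might populate HandlerOptions, the sketch below re-declares the struct locally and defines lower-case `endpointMetadata` and `internalHandler` helpers. Both helpers are hypothetical re-implementations, and keying Metadata by endpoint name is an assumption drawn from the field's type rather than confirmed behaviour.

```go
package main

import "fmt"

// HandlerOptions is a local copy of the struct documented above.
type HandlerOptions struct {
	Internal bool
	Metadata map[string]map[string]string
}

// HandlerOption mirrors the documented option type.
type HandlerOption func(*HandlerOptions)

// endpointMetadata is a hypothetical stand-in for EndpointMetadata.
// Assumption: metadata is keyed by endpoint name.
func endpointMetadata(name string, md map[string]string) HandlerOption {
	return func(o *HandlerOptions) {
		if o.Metadata == nil {
			o.Metadata = make(map[string]map[string]string)
		}
		o.Metadata[name] = md
	}
}

// internalHandler is a hypothetical stand-in for InternalHandler.
func internalHandler(b bool) HandlerOption {
	return func(o *HandlerOptions) { o.Internal = b }
}

func main() {
	var opts HandlerOptions
	for _, apply := range []HandlerOption{
		internalHandler(true),
		endpointMetadata("Greeter.Hello", map[string]string{"method": "POST"}),
	} {
		apply(&opts)
	}
	fmt.Println(opts.Internal, opts.Metadata["Greeter.Hello"]["method"])
}
```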

type HandlerWrapper

type HandlerWrapper func(HandlerFunc) HandlerFunc

HandlerWrapper wraps the HandlerFunc and returns the equivalent

type Message

type Message interface {
	// Topic of the message
	Topic() string
	// Payload the decoded payload value
	Payload() interface{}
	// ContentType the content type of the payload
	ContentType() string
	// Header the raw headers of the message
	Header() map[string]string
	// Body the raw body of the message
	Body() []byte
	// Codec used to decode the message
	Codec() codec.Reader
}

Message is an async message interface

type Option

type Option func(*Options)

func Address

func Address(a string) Option

Address to bind to - host:port

func Advertise

func Advertise(a string) Option

Advertise the address to advertise for discovery - host:port

func Broker

func Broker(b broker.Broker) Option

Broker to use for pub/sub

func Codec

func Codec(contentType string, c codec.NewCodec) Option

Codec to use to encode/decode requests for a given content type

func Context

func Context(ctx context.Context) Option

Context specifies a context for the service. It can be used to signal shutdown of the service and to carry extra option values.

func Id

func Id(id string) Option

Id sets the unique server id

func Metadata

func Metadata(md map[string]string) Option

Metadata associated with the server

func Name

func Name(n string) Option

Name sets the name of the server

func RegisterCheck

func RegisterCheck(fn func(context.Context) error) Option

RegisterCheck sets a check function that runs before the service is registered

func RegisterInterval

func RegisterInterval(t time.Duration) Option

RegisterInterval sets the interval at which the service is registered

func RegisterTTL

func RegisterTTL(t time.Duration) Option

RegisterTTL sets the TTL with which the service is registered

func Registry

func Registry(r registry.Registry) Option

Registry used for discovery

func Version

func Version(v string) Option

Version of the service

func Wait

func Wait(wg *sync.WaitGroup) Option

Wait tells the server to wait for requests to finish before exiting. If `wg` is nil, the server only waits for RPC handlers to complete. Users who need finer-grained control can pass a concrete `wg`; the server will wait on it when stopping.

func WithRouter

func WithRouter(r Router) Option

WithRouter sets the request router

func WrapHandler

func WrapHandler(w HandlerWrapper) Option

WrapHandler adds a handler Wrapper to a list of options passed into the server

func WrapSubscriber

func WrapSubscriber(w SubscriberWrapper) Option

WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server

type Options

type Options struct {
	Codecs       map[string]codec.NewCodec
	Broker       broker.Broker
	Registry     registry.Registry
	Metadata     map[string]string
	Name         string
	Address      string
	Advertise    string
	Id           string
	Version      string
	HdlrWrappers []HandlerWrapper
	SubWrappers  []SubscriberWrapper

	// RegisterCheck runs a check function before registering the service
	RegisterCheck func(context.Context) error
	// The register expiry time
	RegisterTTL time.Duration
	// The interval on which to register
	RegisterInterval time.Duration

	// The router for requests
	Router Router

	// TLSConfig specifies tls.Config for secure serving
	TLSConfig *tls.Config

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
}

func DefaultOptions

func DefaultOptions() Options

DefaultOptions returns config options for the default service

func NewOptions

func NewOptions(opt ...Option) Options

type Request

type Request interface {
	// Service name requested
	Service() string
	// Method the action requested
	Method() string
	// Endpoint name requested
	Endpoint() string
	// ContentType Content Type provided
	ContentType() string
	// Header of the request
	Header() map[string]string
	// Body is the initial decoded value
	Body() interface{}
	// Read the undecoded request body
	Read() ([]byte, error)
	// Codec the encoded message body
	Codec() codec.Reader
	// Stream indicates whether it's a stream
	Stream() bool
}

Request is a synchronous request interface

type Response

type Response interface {
	// Codec encoded writer
	Codec() codec.Writer
	// WriteHeader write the header
	WriteHeader(map[string]string)
	// Write a response directly to the client
	Write([]byte) error
}

Response is the response writer for unencoded messages

type Router

type Router interface {
	// ProcessMessage processes a message
	ProcessMessage(context.Context, Message) error
	// ServeRequest processes a request to completion
	ServeRequest(context.Context, Request, Response) error
}

Router handles serving messages

type Server

type Server interface {
	// Init initialise options
	Init(...Option) error
	// Options retrieve the options
	Options() Options
	// Handle register a handler
	Handle(Handler) error
	// NewHandler create a new handler
	NewHandler(interface{}, ...HandlerOption) Handler
	// NewSubscriber create a new subscriber
	NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
	// Subscribe register a subscriber
	Subscribe(Subscriber) error
	// Start the server
	Start() error
	// Stop the server
	Stop() error
	// String implementation
	String() string
}

Server is a simple vine server abstraction

type Stream

type Stream interface {
	Context() context.Context
	Request() Request
	Send(interface{}) error
	Recv(interface{}) error
	Error() error
	Close() error
}

Stream represents a stream established with a client. A stream can be bidirectional which is indicated by the request. The last error will be left in Error(). EOF indicates end of the stream.
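
A typical way to consume such a stream is to call Recv in a loop until the end of the stream is reported. The sketch below assumes that "EOF" means `io.EOF`, which the text above does not state explicitly. `recvStream`, `sliceStream`, and `readAll` are hypothetical names, and only the Recv and Close methods of Stream are modelled.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// recvStream is the subset of the Stream interface this sketch needs.
type recvStream interface {
	Recv(interface{}) error
	Close() error
}

// sliceStream is a hypothetical in-memory stream yielding fixed strings.
type sliceStream struct {
	items  []string
	closed bool
}

func (s *sliceStream) Recv(v interface{}) error {
	p, ok := v.(*string)
	if !ok {
		return errors.New("sliceStream: Recv needs a *string")
	}
	if len(s.items) == 0 {
		return io.EOF // assumption: end of stream is signalled with io.EOF
	}
	*p, s.items = s.items[0], s.items[1:]
	return nil
}

func (s *sliceStream) Close() error { s.closed = true; return nil }

// readAll drains the stream, treating io.EOF as a clean end, and
// closes the stream before returning.
func readAll(s recvStream) ([]string, error) {
	defer s.Close()
	var out []string
	for {
		var msg string
		err := s.Recv(&msg)
		if errors.Is(err, io.EOF) {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, msg)
	}
}

func main() {
	msgs, err := readAll(&sliceStream{items: []string{"a", "b", "c"}})
	fmt.Println(msgs, err)
}
```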

type StreamWrapper

type StreamWrapper func(Stream) Stream

StreamWrapper wraps a Stream interface and returns the equivalent. Because streams exist for the lifetime of a method invocation, this is a convenient way to wrap a Stream while it's in use for tracing, monitoring, metrics, etc.

type Subscriber

type Subscriber interface {
	Topic() string
	Subscriber() interface{}
	Endpoints() []*registry.Endpoint
	Options() SubscriberOptions
}

Subscriber interface represents a subscription to a given topic using a specific subscriber function or object with endpoints. It mirrors the handler in its behaviour.

func NewSubscriber

func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber

NewSubscriber creates a new subscriber interface with the given topic and handler using the default server

type SubscriberFunc

type SubscriberFunc func(ctx context.Context, msg Message) error

SubscriberFunc represents a single method of a subscriber. It's used primarily for the wrappers. What's handed to the actual method is the concrete publication message.

type SubscriberOption

type SubscriberOption func(*SubscriberOptions)

func DisableAutoAck

func DisableAutoAck() SubscriberOption

DisableAutoAck will disable auto acking of messages after they have been handled.

func InternalSubscriber

func InternalSubscriber(b bool) SubscriberOption

InternalSubscriber specifies that a subscriber is not advertised to the discovery system.

func SubscriberContext

func SubscriberContext(ctx context.Context) SubscriberOption

SubscriberContext sets the context on the subscriber options so that broker-level subscriber options can be passed through it

func SubscriberQueue

func SubscriberQueue(n string) SubscriberOption

SubscriberQueue sets the shared queue name; messages are distributed across the subscribers that share it

type SubscriberOptions

type SubscriberOptions struct {
	// AutoAck defaults to true. When a handler returns
	// with a nil error the message is acked.
	AutoAck  bool
	Queue    string
	Internal bool
	Context  context.Context
}

func NewSubscriberOptions

func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions

type SubscriberWrapper

type SubscriberWrapper func(SubscriberFunc) SubscriberFunc

SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
