sabuhp

package module
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2021 License: MIT Imports: 18 Imported by: 0

README

SabuHP

GoDoc

Power your backend with a hybrid service layer communicating across a message bus regardless of protocol.

It exposes a your services across both a http, server-sent events and websocket endpoints, allowing varying clients to communicate across your services over a message bus.

Protocols

SabuHP supports the following protocols for communicating with the service server (allowing both backend and frontend easily inter-related through such protocols):

  • Websocket
  • HTTP
  • HTTP Server Sent Events

Getting

go get -u github.com/influx6/sabuhp

Using

Create a sample hello service

package main

import (
	"context"
	"log"
	"net/http"
	"time"

	"github.com/influx6/npkg/ndaemon"
	"github.com/influx6/npkg/nerror"
	"github.com/influx6/npkg/njson"
	"github.com/influx6/npkg/nxid"

	"github.com/influx6/sabuhp"
	"github.com/influx6/sabuhp/ochestrator"
	"github.com/influx6/sabuhp/slaves"
	"github.com/influx6/sabuhp/testingutils"
)

var (
	mainLogger = &testingutils.LoggerPub{}
)

func main() {
	var logStack = njson.Log(mainLogger)
	defer njson.ReleaseLogStack(logStack)

	// worker template registry
	var workerRegistry = slaves.NewWorkerTemplateRegistry()
	workerRegistry.Register(slaves.WorkerRequest{
		ActionName:  "hello_world",
		PubSubTopic: "hello",
		WorkerCreator: func(config slaves.WorkerConfig) *slaves.WorkerGroup {
			config.Instance = slaves.ScalingInstances
			config.Behaviour = slaves.RestartAll
			config.Action = slaves.ActionFunc(func(ctx context.Context, to string, message *sabuhp.Message, t sabuhp.Transport) {
				if sendErr := t.SendToAll(&sabuhp.Message{
					ID:       nxid.New(),
					Topic:    message.FromAddr,
					FromAddr: to,
					Payload:  []byte("hello world"),
					Metadata: nil,
					Params:   nil,
				}, 5*time.Second); sendErr != nil {
					logStack.New().
						LError().
						Message("failed to send response message").
						String("error", nerror.WrapOnly(sendErr).Error()).
						End()
				}
			})
			return slaves.NewWorkGroup(config)
		},
	})

	// register for terminal kill signal
	var masterCtx, masterEnder = context.WithCancel(context.Background())
	ndaemon.WaiterForKillWithSignal(ndaemon.WaitForKillChan(), masterEnder)

	var workerId = nxid.New()
	var station = ochestrator.DefaultStation(masterCtx, workerId, ":7800", mainLogger, workerRegistry)

	// use json encoder
	station.CreateCodec = ochestrator.JsonCodec

	if stationInitErr := station.Init(); stationInitErr != nil {
		var wrapErr = nerror.WrapOnly(stationInitErr)
		log.Fatalf("Closing application due to station initialization: %+q", wrapErr)
	}

	// create a http to event route redirect to an event
	station.Router().RedirectTo("hello", "/hello")

	// create a normal http route
	station.Router().Http("/pop", sabuhp.HandlerFunc(func(
		writer http.ResponseWriter,
		request *http.Request,
		params sabuhp.Params,
	) {
		writer.WriteHeader(http.StatusOK)
		_, _ = writer.Write([]byte("stay forever!"))
	}))

	if err := station.Wait(); err != nil {
		var wrapErr = nerror.WrapOnly(err)
		log.Fatalf("Closing application: %+q", wrapErr)
	}
}

Contact

Ewetumo Alexander @influx6

License

Source code is available under the MIT License.

Documentation

Index

Constants

View Source
const (
	SUBSCRIBE   = "+SUB"
	UNSUBSCRIBE = "-USUB"
	DONE        = "+OK"
	NOTDONE     = "-NOK"
)
View Source
const MessageContentType = "application/x-event-message"
View Source
const TimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT"

TimeFormat is the time format to use when generating times in HTTP headers. It is like time.RFC1123 but hard-codes GMT as the time zone. The time being formatted must be in UTC for Format to generate the correct format.

For parsing this time format, see ParseTime.

Variables

View Source
var BodyToLargeErr = &requestTooLargeErr{Err: errors.New("http: req body to large")}

Functions

func IsTokenRune

func IsTokenRune(r rune) bool

func MaxBytesReader

func MaxBytesReader(r io.ReadCloser, n int64) io.ReadCloser

MaxBytesReader is similar to io.LimitReader but is intended for limiting the size of incoming request bodies. In contrast to io.LimitReader, MaxBytesReader's result is a ReadCloser, returns a non-EOF error for a Read beyond the limit, and closes the underlying reader when its Close method is called.

Returns a RequestToLargeErr object when request body is to large.

MaxBytesReader prevents clients from accidentally or maliciously sending a large request and wasting server resources.

Types

type BinaryPayload

type BinaryPayload []byte

func (BinaryPayload) Copy

func (b BinaryPayload) Copy() Payload

func (BinaryPayload) Type

func (b BinaryPayload) Type() PayloadType

type Channel

type Channel interface {
	Close()
	Err() error
}

Channel represents a generated subscription on a topic which provides the giving callback an handler to define the point at which the channel should be closed and stopped from receiving updates.

type Client

type Client interface {
	Send(data []byte, timeout time.Duration) error
}

type Codec

type Codec interface {
	Encode(msg *Message) ([]byte, error)
	Decode(b []byte) (*Message, error)
}

Codec embodies implementation for the serialization of a message into bytes and vice-versa.

type CodecTranslator added in v0.1.4

type CodecTranslator struct {
	Codec  Codec
	Logger Logger
}

func NewCodecTranslator added in v0.1.4

func NewCodecTranslator(codec Codec, logger Logger) *CodecTranslator

func (*CodecTranslator) Translate added in v0.1.4

func (r *CodecTranslator) Translate(res http.ResponseWriter, m *Message) error

func (*CodecTranslator) TranslateBytes added in v0.1.4

func (r *CodecTranslator) TranslateBytes(res http.ResponseWriter, data []byte, m MessageMeta) error

func (*CodecTranslator) TranslateWriter added in v0.1.4

func (r *CodecTranslator) TranslateWriter(res http.ResponseWriter, w io.WriterTo, m MessageMeta) error

type CodecTransposer

type CodecTransposer struct {
	Codec       Codec
	Logger      Logger
	MaxBodySize int64
}

func NewCodecTransposer

func NewCodecTransposer(codec Codec, logger Logger, maxBody int64) *CodecTransposer

func (*CodecTransposer) Transpose

func (r *CodecTransposer) Transpose(req *http.Request, params Params) (*Message, error)

type CodecWriter

type CodecWriter struct {
	Client Client
	Codec  Codec
	Logger Logger
}

func NewCodecWriter

func NewCodecWriter(client Client, codec Codec, logger Logger) *CodecWriter

func (*CodecWriter) Send

func (c *CodecWriter) Send(msg *Message, timeout time.Duration) error

type Conn

type Conn interface{}

Conn defines the connection type which we can retrieve and understand the type.

type Cookie struct {
	Name  string
	Value string

	Path       string    // optional
	Domain     string    // optional
	Expires    time.Time // optional
	RawExpires string    // for reading cookies only

	// MaxAge=0 means no 'Max-Age' attribute specified.
	// MaxAge<0 means delete cookie now, equivalently 'Max-Age: 0'
	// MaxAge>0 means Max-Age attribute present and given in seconds
	MaxAge   int
	Secure   bool
	HttpOnly bool
	SameSite SameSite
	Raw      string
	Unparsed []string // Raw text of unparsed attribute-value pairs
}

A Cookie represents an HTTP cookie as sent in the Set-Cookie header of an HTTP response or the Cookie header of an HTTP request.

See https://tools.ietf.org/html/rfc6265 for details.

func ReadCookies

func ReadCookies(h Header, filter string) []Cookie

ReadCookies parses all "Cookie" values from the header h and returns the successfully parsed Cookies.

if filter isn't empty, only cookies of that name are returned

func ReadSetCookies

func ReadSetCookies(h Header) []Cookie

ReadSetCookies parses all "Set-Cookie" values from the header h and returns the successfully parsed Cookies.

ReadSetCookies is more stricter on the names and values of cookies.

func (*Cookie) String

func (c *Cookie) String() string

String returns the serialization of the cookie for use in a Cookie header (if only Name and Value are set) or a Set-Cookie response header (if other fields are set). If c is nil or c.PageName is invalid, the empty string is returned.

type DeliveryMark

type DeliveryMark int
const (
	SendToAll DeliveryMark = iota
	SendToOne
)

type ErrChannel

type ErrChannel struct{ Error error }

ErrChannel implements the Channel interface but has servers purpose to always return an error.

func (*ErrChannel) Close

func (n *ErrChannel) Close()

func (*ErrChannel) Err

func (n *ErrChannel) Err() error

type ErrorWaiter

type ErrorWaiter interface {
	Wait() error
}

type Handler

type Handler interface {
	Handle(http.ResponseWriter, *http.Request, Params)
}

type HandlerFunc

type HandlerFunc func(http.ResponseWriter, *http.Request, Params)

func (HandlerFunc) Handle

func (h HandlerFunc) Handle(rw http.ResponseWriter, r *http.Request, p Params)
type Header map[string][]string

func (Header) Add

func (h Header) Add(k string, v string)

func (Header) Delete

func (h Header) Delete(k string)

func (Header) Get

func (h Header) Get(k string) string

func (Header) Set

func (h Header) Set(k string, v string)

func (Header) Values

func (h Header) Values(k string) []string

type HeaderModifications

type HeaderModifications func(header http.Header)

type HttpMatcher

type HttpMatcher interface {
	Handler

	Match(http.ResponseWriter, *http.Request, Params)
}

HttpMatcher embodies a matcher which indicates if the request exclusively belongs to it, hence allowing it to divert a giving request to itself.

type HttpResponsePayload

type HttpResponsePayload struct {
	*http.Response
}

HttpResponsePayload wrapped a *http.Response as a Payload type.

func (*HttpResponsePayload) Copy

func (h *HttpResponsePayload) Copy() Payload

func (*HttpResponsePayload) Type

func (h *HttpResponsePayload) Type() PayloadType

type HttpResponseWriterPayload

type HttpResponseWriterPayload struct {
	http.ResponseWriter
}

HttpResponseWriterPayload wrapped a http.ResponseWriter as a Payload type.

func (*HttpResponseWriterPayload) Copy

func (*HttpResponseWriterPayload) Type

type HttpWrapper

type HttpWrapper func(Handler) Handler

type HttpWrappers

type HttpWrappers []HttpWrapper

func (HttpWrappers) For

func (w HttpWrappers) For(main Handler) Handler

For registers the wrappers for a specific handler and returns a handler that can be passed via the `UseHandle` function.

func (HttpWrappers) ForFunc

func (w HttpWrappers) ForFunc(mainFunc HandlerFunc) Handler

ForFunc registers the wrappers for a specific raw handler function and returns a handler that can be passed via the `UseHandle` function.

type LogHandler

type LogHandler func([]*Message)

type Logger

type Logger interface {
	njson.Logger
}

type Matcher

type Matcher interface {
	Match(message *Message) bool
}

Matcher is the interface that all Matchers should be implemented in order to be registered into the Mux via the `Mux#AddRequestHandler/Match/MatchFunc` functions.

Look the `Mux#AddRequestHandler` for more.

type Message

type Message struct {
	MessageMeta

	// ID is the unique id attached to giving message
	// for tracking it's delivery and trace its different touch
	// points where it was handled.
	ID nxid.ID

	// Topic for giving message (serving as to address).
	Topic string

	// FromAddr is the logical address of the sender of message.
	FromAddr string

	// Delivery embodies how this message is to be delivered. It's
	// usually defined by the sender to indicate the target is all
	// fan-out sequence (send to all) or a target sequence (sent to one).
	//
	// This should be set by the transport handling said message and not by
	// creator.
	Delivery DeliveryMark

	// Payload is the payload for giving message.
	Payload []byte

	// Metadata are related facts attached to a message.
	Metadata Params

	// Params are related facts attached to a message based on some route or
	// sender and provide some values to keyed expectation, unlike metadata
	// it has specific input in the message.
	Params Params

	// LocalPayload is the payload attached which can be a
	// concrete object for which this message is to communicate
	// such a payload may not be able to be serialized and only
	// serves the purpose of a local runtime communication.
	LocalPayload Payload `json:"-" messagepack:"-" gob:"-"`

	// OverridingTransport allows special messages which may not fall under
	// the same behaviour as normal messages which are delivered over a
	// transport, the general idea is that all handlers will use this
	// specific transport instead of the default one. This makes this message
	// a special occurrence, and will be treated as so.
	OverridingTransport Transport `json:"-" messagepack:"-" gob:"-"`
}

func BasicMsg

func BasicMsg(topic string, message string, fromAddr string) *Message

func NOTOK

func NOTOK(message string, fromAddr string) *Message

func NewMessage

func NewMessage(topic string, fromAddr string, payload []byte) *Message

func OK

func OK(message string, fromAddr string) *Message

func SubscribeMessage

func SubscribeMessage(topic string, fromAddr string) *Message

func UnsubscribeMessage

func UnsubscribeMessage(topic string, fromAddr string) *Message

func (*Message) Copy

func (m *Message) Copy() *Message

Copy returns a copy of this commands with underline data copied across. The copy

func (*Message) EncodeObject

func (m *Message) EncodeObject(encoder npkg.ObjectEncoder)

func (*Message) ReplyTo added in v0.3.1

func (m *Message) ReplyTo() *Message

ReplyTo returns a new instance of a Message using the FromAddr as the topic.

func (*Message) ReplyToWith added in v0.3.1

func (m *Message) ReplyToWith(params Params, meta Params, payload []byte) *Message

ReplyToWith returns a new instance of a Message using the FromAddr as the topic.

func (*Message) String

func (m *Message) String() string

func (*Message) WithLocalPayload

func (m *Message) WithLocalPayload(lp Payload) *Message

func (*Message) WithMetadata

func (m *Message) WithMetadata(meta map[string]string) *Message

func (*Message) WithParams

func (m *Message) WithParams(params map[string]string) *Message

func (*Message) WithPayload

func (m *Message) WithPayload(lp []byte) *Message

type MessageErr

type MessageErr interface {
	error
	ShouldAck() bool
}

func WrapErr

func WrapErr(err error, shouldAck bool) MessageErr

type MessageHandler

type MessageHandler func(b []byte, from Socket) error

MessageHandler defines the function contract a manager uses to handle a message.

Be aware that returning an error from the handler to the Gorilla socket will cause the immediate closure of that socket and ending communication with the client and the error will be logged. So unless your intention is to end the connection, handle it yourself.

type MessageLog

type MessageLog interface {
	Clear(owner nxid.ID) error
	Persist(owner nxid.ID, message *Message) error
	CatchUp(owner nxid.ID, lastId nxid.ID, handler Handler) error
}

type MessageMeta added in v0.1.4

type MessageMeta struct {
	// Path of the request producing this if from http.
	Path string

	// SuggestedStatusCode is an optional field settable by the
	// creator to suggest possible status code of a message.
	SuggestedStatusCode int

	// ContentType is an required value set default to MessageContentType.
	// Its an important use in the translators where its the deciding factor
	// if a message is written as a whole or just the payload into the
	// response object.
	ContentType string

	// Headers are related facts attached to a message.
	Headers Header

	// Headers are related facts attached to a message.
	//
	// Only available when set, so it's very optional
	Cookies []Cookie

	// Form contains the parsed form data, including both the URL
	// field's query parameters and the PATCH, POST, or PUT form data.
	//
	// Only available when set, so it's very optional
	Form url.Values

	// Query contains the parsed form data, including both the URL
	// field's query parameters and the PATCH, POST, or PUT form data.
	//
	// Only available when set, so it's very optional
	Query url.Values

	// PostForm contains the parsed form data from PATCH, POST
	// or PUT body parameters.
	//
	// Only available when set, so it's very optional
	PostForm url.Values

	// MultipartForm is the parsed multipart form, including file uploads.
	//
	// Only available when set, so it's very optional
	MultipartReader *multipart.Reader
}

type MessageRouter

type MessageRouter interface {
	TransportResponse
	Matcher
}

type NoopChannel

type NoopChannel struct{}

NoopChannel implements the Channel interface but has no internal operational capacity. It represents a non functioning channel.

func (NoopChannel) Close

func (n NoopChannel) Close()

func (NoopChannel) Err

func (n NoopChannel) Err() error

type Params

type Params map[string]string

func (Params) Delete

func (h Params) Delete(k string)

func (Params) EncodeObject

func (h Params) EncodeObject(encoder npkg.ObjectEncoder)

func (Params) Get

func (h Params) Get(k string) string

func (Params) Set

func (h Params) Set(k string, v string)

type Payload

type Payload interface {
	Type() PayloadType
	Copy() Payload
}

func HttpResponseAsPayload

func HttpResponseAsPayload(rw *http.Response) Payload

HttpResponseAsPayload returns a *http.Response object wrapped as a payload.

func HttpResponseWriterAsPayload

func HttpResponseWriterAsPayload(rw http.ResponseWriter) Payload

HttpResponseWriterAsPayload returns a http.ResponseWriter object wrapped as a payload.

type PayloadType

type PayloadType int
const (
	BinaryPayloadType PayloadType = iota
	TextPayloadType
	HTTPRWPayload
)

type RetryFunc

type RetryFunc func(last int) time.Duration

type SameSite

type SameSite int

SameSite allows a server to define a cookie attribute making it impossible for the browser to send this cookie along with cross-site requests. The main goal is to mitigate the risk of cross-origin information leakage, and provide some protection against cross-site request forgery attacks.

See https://tools.ietf.org/html/draft-ietf-httpbis-cookie-same-site-00 for details.

const (
	SameSiteDefaultMode SameSite = iota + 1
	SameSiteLaxMode
	SameSiteStrictMode
	SameSiteNoneMode
)

type Socket

type Socket interface {
	ID() nxid.ID
	Stat() SocketStat
	RemoteAddr() net.Addr
	LocalAddr() net.Addr
	Send([]byte, MessageMeta, time.Duration) error
	SendWriter(io.WriterTo, MessageMeta, time.Duration) ErrorWaiter
}

type SocketHandler added in v0.1.4

type SocketHandler func(b *Message, from Socket) error

SocketHandler defines the function contract a sabuhp.Socket uses to handle a message.

Be aware that returning an error from the handler to the Gorilla sabuhp.Socket will cause the immediate closure of that socket and ending communication with the client and the error will be logged. So unless your intention is to end the connection, handle it yourself.

type SocketStat

type SocketStat struct {
	Addr       net.Addr
	RemoteAddr net.Addr
	Id         string
	Sent       int64
	Received   int64
	Handled    int64
}

func (SocketStat) EncodeObject

func (g SocketStat) EncodeObject(encoder npkg.ObjectEncoder)

type SocketWriterTo

type SocketWriterTo struct {
	// contains filtered or unexported fields
}

func NewSocketWriterTo

func NewSocketWriterTo(w io.WriterTo) *SocketWriterTo

func (*SocketWriterTo) Abort

func (se *SocketWriterTo) Abort(err error)

func (*SocketWriterTo) Wait

func (se *SocketWriterTo) Wait() error

func (*SocketWriterTo) WriteTo

func (se *SocketWriterTo) WriteTo(w io.Writer) (int64, error)

type TextPayload

type TextPayload string

func (TextPayload) Copy

func (b TextPayload) Copy() Payload

func (TextPayload) Type

func (b TextPayload) Type() PayloadType

type Translator added in v0.1.4

type Translator interface {
	Translate(req http.ResponseWriter, message *Message) error
	TranslateBytes(req http.ResponseWriter, data []byte, message MessageMeta) error
	TranslateWriter(req http.ResponseWriter, w io.WriterTo, meta MessageMeta) error
}

Translator transforms a message into an appropriate response to an http response object.

type Transport

type Transport interface {
	Conn() Conn

	Listen(topic string, handler TransportResponse) Channel
	SendToOne(data *Message, timeout time.Duration) error
	SendToAll(data *Message, timeout time.Duration) error
}

Transport defines what an underline message transport implementation like a message bus or rpc connection that can deliver according to required semantics of one-to-one and one-to-many.

type TransportResponse

type TransportResponse interface {
	Handle(*Message, Transport) MessageErr
}

type TransportResponseFunc

type TransportResponseFunc func(*Message, Transport) MessageErr

func (TransportResponseFunc) Handle

func (t TransportResponseFunc) Handle(message *Message, tr Transport) MessageErr

type Transposer

type Transposer interface {
	Transpose(req *http.Request, params Params) (*Message, error)
}

Transposer transforms a http request into a Message to be delivered.

type WrappedCodec added in v0.1.4

type WrappedCodec interface {
	Encode(payload *WrappedPayload) ([]byte, error)
	Decode(b []byte) (*WrappedPayload, error)
}

type WrappedPayload added in v0.1.4

type WrappedPayload struct {
	ContentType string
	Payload     []byte
}

func NewWrappedPayload added in v0.1.4

func NewWrappedPayload(contentType string, payload []byte) *WrappedPayload

func WrappedMessagePayload added in v0.1.4

func WrappedMessagePayload(payload []byte) *WrappedPayload

type WrappedTransposer added in v0.1.4

type WrappedTransposer interface {
	Transpose([]byte) (*Message, *WrappedPayload, error)
}

WrappedTransposer transforms a http request into a Message to be delivered.

type Wrapper

type Wrapper func(response TransportResponse) TransportResponse

Wrapper is just a type of `func(TransportResponse) TransportResponse` which is a common type definition for net/http middlewares.

type Wrappers

type Wrappers []Wrapper

Wrappers contains `Wrapper`s that can be registered and used by a "main route handler". Look the `Pre` and `For/ForFunc` functions too.

func Pre

func Pre(middleware ...Wrapper) Wrappers

Pre starts a chain of handlers for wrapping a "main route handler" the registered "middleware" will run before the main handler(see `Wrappers#For/ForFunc`).

Usage: mux := muxie.NewMux() myMiddlewares := muxie.Pre(myFirstMiddleware, mySecondMiddleware) mux.UseHandle("/", myMiddlewares.ForFunc(myMainRouteTransportResponse))

func (Wrappers) For

For registers the wrappers for a specific handler and returns a handler that can be passed via the `UseHandle` function.

func (Wrappers) ForFunc

func (w Wrappers) ForFunc(mainFunc TransportResponseFunc) TransportResponse

ForFunc registers the wrappers for a specific raw handler function and returns a handler that can be passed via the `UseHandle` function.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL