README
¶
SabuHP
Power your backend with a simple service architecture that provides direct connection into a function/processor network through supported protocols (HTTP, WebSocket, ServerSent Events).
SabuHP exposes a two server system by providing a Client Server
and a Worker Server
architecture that allow
better scaling of client connections and business logic processing in the new age of message busses as backbone of
communications.
The client server
exists to allow direct connections from clients (CLI, Browsers) which can directly send desired
request payload to desired topics and receive response from a target message bus. This allows us decouple the definition
of our APIs, and their desired behaviour from how clients interact and connect to with them. The client servers purpose is
to hide way the needed intricacies to access this message queues or buses, providing a clear and familiar APIs that clients
can work with such systems with ease.
The worker server
exists to provided scalable services that can be horizontal scaled with only required to be able to
connect to a message bus to listen and process request payload for target topics with ease. This allows us decouple entirely
how we connect and process messages or work within a software systems.
Protocols
SabuHP supports the following protocols for communicating with the service server (allowing both backend and frontend easily inter-related through such protocols):
- Websocket
- HTTP
- HTTP Server Sent Events
Getting
go get -u github.com/ewe-studios/sabuhp
Client Server
Client servers provides a server which hosts all necessary client protocols (http, websocket, server-sent event routes) which allows clients (browsers, CLI agents) to connect into the SabuHP networks allowing these clients to deliver requests and receive responses for their requests
package main
import (
"context"
"log"
"github.com/influx6/npkg/ndaemon"
"github.com/ewe-studios/sabuhp"
"github.com/ewe-studios/sabuhp/bus/redispub"
"github.com/ewe-studios/sabuhp/servers/clientServer"
redis "github.com/go-redis/redis/v8"
)
func main() {
var ctx, canceler = context.WithCancel(context.Background())
ndaemon.WaiterForKillWithSignal(ndaemon.WaitForKillChan(), canceler)
var logger sabuhp.GoLogImpl
var redisBus, busErr = redispub.Stream(redispub.Config{
Logger: logger,
Ctx: ctx,
Redis: redis.Options{},
Codec: clientServer.DefaultCodec,
})
if busErr != nil {
log.Fatalf("Failed to create bus connection: %q\n", busErr.Error())
}
var cs = clientServer.New(
ctx,
logger,
redisBus,
clientServer.WithHttpAddr("0.0.0.0:9650"),
)
cs.Start()
log.Println("Starting client server")
if err := cs.ErrGroup.Wait(); err != nil {
log.Fatalf("service group finished with error: %+s", err.Error())
}
}
Worker Server
Worker servers exposes a server with different registered workers (Functions, Processors) who will listen to the connected message bus for new requests to be processed. These servers can be scaled horizontally and grouped into listen groups based on support by the underline message bus to create a cloud of processors that allow endless scaling.
package main
import (
"context"
"fmt"
"log"
"github.com/ewe-studios/sabuhp/actions"
"github.com/ewe-studios/sabuhp/servers/serviceServer"
"github.com/influx6/npkg/ndaemon"
"github.com/ewe-studios/sabuhp"
"github.com/ewe-studios/sabuhp/bus/redispub"
"github.com/ewe-studios/sabuhp/servers/clientServer"
redis "github.com/go-redis/redis/v8"
)
func main() {
var ctx, canceler = context.WithCancel(context.Background())
ndaemon.WaiterForKillWithSignal(ndaemon.WaitForKillChan(), canceler)
var logger sabuhp.GoLogImpl
var redisBus, busErr = redispub.Stream(redispub.Config{
Logger: logger,
Ctx: ctx,
Redis: redis.Options{},
Codec: clientServer.DefaultCodec,
})
if busErr != nil {
log.Fatalf("Failed to create bus connection: %q\n", busErr.Error())
}
var workers = actions.NewWorkerTemplateRegistry()
var cs = serviceServer.New(
ctx,
logger,
redisBus,
serviceServer.WithWorkerRegistry(workers),
)
fmt.Println("Starting worker service")
cs.Start()
fmt.Println("Started worker service")
if err := cs.ErrGroup.Wait(); err != nil {
log.Fatalf("service group finished with error: %+s", err.Error())
}
fmt.Println("Closed worker service")
}
Contact
Ewetumo Alexander @influx6
License
Source code is available under the MIT License.
Documentation
¶
Index ¶
- Constants
- Variables
- func IsTokenRune(r rune) bool
- func MaxBytesReader(r io.ReadCloser, n int64) io.ReadCloser
- func SplitMessagesToGroups(b []Message) (subGroups []Message, unsubGroups []Message, dataGroups []Message)
- type BusBuilder
- type BusRelay
- type BytesDecoder
- type BytesSplitter
- type Channel
- type Client
- type Codec
- type CodecWriter
- type Conn
- type Cookie
- type ErrChannel
- type GoLogImpl
- type Handler
- type HandlerFunc
- type Header
- type HeaderModifications
- type HttpClient
- type HttpDecoder
- type HttpDecoderImpl
- type HttpEncoder
- type HttpEncoderImpl
- type HttpMatcher
- type HttpWrapper
- type HttpWrappers
- type LocationService
- type LogHandler
- type Logger
- type Matcher
- type Message
- func BasicMsg(topic Topic, message string, fromAddr string) Message
- func NOTOK(message string, fromAddr string) Message
- func NewMessage(topic Topic, fromAddr string, payload []byte) Message
- func OK(message string, fromAddr string) Message
- func SubscribeMessage(topic string, grp string, fromAddr string) Message
- func UnsubscribeMessage(topic string, grp string, fromAddr string) Message
- func (m Message) Copy() Message
- func (m Message) EncodeObject(encoder npkg.ObjectEncoder)
- func (m Message) ReplyTo() Message
- func (m Message) ReplyToWith(params Params, meta Params, payload []byte) Message
- func (m Message) ReplyWithTopic(t Topic) Message
- func (m Message) String() string
- func (m *Message) WithId(t nxid.ID)
- func (m *Message) WithMetadata(meta map[string]string)
- func (m *Message) WithParams(params map[string]string)
- func (m *Message) WithPayload(lp []byte)
- func (m *Message) WithTopic(t Topic)
- type MessageBus
- type MessageErr
- type MessageRouter
- type NoopChannel
- type Params
- type PbGroup
- func (sc *PbGroup) Add(id nxid.ID, tr TransportResponse) error
- func (sc *PbGroup) IsEmpty() bool
- func (sc *PbGroup) Link(bus MessageBus)
- func (sc *PbGroup) Listen(handler TransportResponse) Channel
- func (sc *PbGroup) Notify(ctx context.Context, msg Message, transport Transport) MessageErr
- func (sc *PbGroup) Remove(id nxid.ID)
- func (sc *PbGroup) Run()
- type PbRelay
- type RetryFunc
- type SameSite
- type Sock
- type Socket
- type SocketByteHandler
- type SocketHandler
- type SocketMessageHandler
- type SocketNotification
- type SocketServer
- type SocketServers
- type SocketService
- type SocketStat
- type StreamBus
- func (st *StreamBus) SocketBusSend(b Message, sock Socket) MessageErr
- func (st *StreamBus) SocketClosed(socket Socket)
- func (st *StreamBus) SocketOpened(socket Socket)
- func (st *StreamBus) SocketSubscribe(b Message, socket Socket) MessageErr
- func (st *StreamBus) SocketUnsubscribe(b Message, socket Socket) MessageErr
- type StreamBusRelay
- func (st *StreamBusRelay) SocketBusSend(b Message, sock Socket) MessageErr
- func (st *StreamBusRelay) SocketClosed(socket Socket)
- func (st *StreamBusRelay) SocketOpened(socket Socket)
- func (st *StreamBusRelay) SocketSubscribe(b Message, socket Socket) MessageErr
- func (st *StreamBusRelay) SocketUnsubscribe(b Message, socket Socket) MessageErr
- type StreamFunc
- type StreamRelay
- func (st *StreamRelay) SocketBusSend(b Message, sock Socket) MessageErr
- func (st *StreamRelay) SocketClosed(socket Socket)
- func (st *StreamRelay) SocketOpened(socket Socket)
- func (st *StreamRelay) SocketSubscribe(b Message, socket Socket) MessageErr
- func (st *StreamRelay) SocketUnsubscribe(b Message, socket Socket) MessageErr
- type Topic
- type TopicPartial
- type Transport
- type TransportResponse
- type TransportResponseFunc
- type Wrapper
- type Wrappers
- type WriterToSplitter
Constants ¶
const MessageContentType = "application/x-event-message"
const TimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT"
TimeFormat is the time format to use when generating times in HTTP headers. It is like time.RFC1123 but hard-codes GMT as the time zone. The time being formatted must be in UTC for Format to generate the correct format.
For parsing this time format, see ParseTime.
Variables ¶
var ( SUBSCRIBE = T("+SUB") UNSUBSCRIBE = T("-USUB") DONE = T("+OK") NOTDONE = T("-NOK") )
var BodyToLargeErr = &requestTooLargeErr{Err: errors.New("http: req body to large")}
var ErrAlreadySubscribed = nerror.New("id is already used")
Functions ¶
func IsTokenRune ¶
func MaxBytesReader ¶
func MaxBytesReader(r io.ReadCloser, n int64) io.ReadCloser
MaxBytesReader is similar to io.LimitReader but is intended for limiting the size of incoming request bodies. In contrast to io.LimitReader, MaxBytesReader's result is a ReadCloser, returns a non-EOF error for a Read beyond the limit, and closes the underlying reader when its Close method is called.
Returns a RequestToLargeErr object when request body is to large.
MaxBytesReader prevents clients from accidentally or maliciously sending a large request and wasting server resources.
Types ¶
type BusBuilder ¶
type BusBuilder struct { SendFunc func(data ...Message) SendForReplyFunc func(tm time.Duration, from Topic, replyGroup string, data ...Message) *nthen.Future ListenFunc func(topic string, grp string, handler TransportResponse) Channel }
func (BusBuilder) Listen ¶
func (t BusBuilder) Listen(topic string, grp string, handler TransportResponse) Channel
func (BusBuilder) Send ¶
func (t BusBuilder) Send(data ...Message)
func (BusBuilder) SendForReply ¶ added in v0.5.1
type BusRelay ¶
type BusRelay struct { Relay *PbRelay Bus MessageBus // contains filtered or unexported fields }
func BusWithRelay ¶
func BusWithRelay(relay *PbRelay, bus MessageBus) *BusRelay
func NewBusRelay ¶
func NewBusRelay(ctx context.Context, logger Logger, bus MessageBus) *BusRelay
type BytesDecoder ¶
BytesDecoder transforms a http request into a Message to be delivered.
type BytesSplitter ¶
BytesSplitter takes a large block of bytes returning a chan of messages which are part messages which represent the whole of said bytes. This allows larger messages be sent across message channels with ease.
type Channel ¶
Channel represents a generated subscription on a topic which provides the giving callback an handler to define the point at which the channel should be closed and stopped from receiving updates.
type Codec ¶
Codec embodies implementation for the serialization of a message into bytes and vice-versa.
type CodecWriter ¶
func NewCodecWriter ¶
func NewCodecWriter(client Client, codec Codec, logger Logger) *CodecWriter
type Conn ¶
type Conn interface{}
Conn defines the connection type which we can retrieve and understand the type.
type Cookie ¶
type Cookie struct { Name string Value string Path string // optional Domain string // optional Expires time.Time // optional RawExpires string // for reading cookies only // MaxAge=0 means no 'Max-Age' attribute specified. // MaxAge<0 means delete cookie now, equivalently 'Max-Age: 0' // MaxAge>0 means Max-Age attribute present and given in seconds MaxAge int Secure bool HttpOnly bool SameSite SameSite Raw string Unparsed []string // Raw text of unparsed attribute-value pairs }
A Cookie represents an HTTP cookie as sent in the Set-Cookie header of an HTTP response or the Cookie header of an HTTP request.
See https://tools.ietf.org/html/rfc6265 for details.
func ReadCookies ¶
ReadCookies parses all "Cookie" values from the header h and returns the successfully parsed Cookies.
if filter isn't empty, only cookies of that name are returned
func ReadSetCookies ¶
ReadSetCookies parses all "Set-Cookie" values from the header h and returns the successfully parsed Cookies.
ReadSetCookies is more stricter on the names and values of cookies.
type ErrChannel ¶
type ErrChannel struct{ Error error }
ErrChannel implements the Channel interface but has servers purpose to always return an error.
func (*ErrChannel) Close ¶
func (n *ErrChannel) Close()
func (*ErrChannel) Err ¶
func (n *ErrChannel) Err() error
type HandlerFunc ¶
type HandlerFunc func(http.ResponseWriter, *http.Request, Params)
func (HandlerFunc) Handle ¶
func (h HandlerFunc) Handle(rw http.ResponseWriter, r *http.Request, p Params)
type HeaderModifications ¶
type HttpClient ¶ added in v0.6.3
type HttpDecoder ¶
HttpDecoder transforms a http request into a Message to be delivered.
type HttpDecoderImpl ¶
func NewHttpDecoderImpl ¶
func NewHttpDecoderImpl(codec Codec, logger Logger, maxBody int64) *HttpDecoderImpl
type HttpEncoder ¶
type HttpEncoder interface {
Encode(req http.ResponseWriter, message Message) error
}
HttpEncoder transforms a message into an appropriate response to an http response object.
type HttpEncoderImpl ¶ added in v0.4.1
func NewHttpEncoderImpl ¶ added in v0.4.1
func NewHttpEncoderImpl(codec Codec, logger Logger) *HttpEncoderImpl
func (*HttpEncoderImpl) Encode ¶ added in v0.4.1
func (r *HttpEncoderImpl) Encode(res http.ResponseWriter, m Message) error
type HttpMatcher ¶
HttpMatcher embodies a matcher which indicates if the request exclusively belongs to it, hence allowing it to divert a giving request to itself.
type HttpWrapper ¶
type HttpWrappers ¶
type HttpWrappers []HttpWrapper
func (HttpWrappers) For ¶
func (w HttpWrappers) For(main Handler) Handler
For registers the wrappers for a specific handler and returns a handler that can be passed via the `UseHandle` function.
func (HttpWrappers) ForFunc ¶
func (w HttpWrappers) ForFunc(mainFunc HandlerFunc) Handler
ForFunc registers the wrappers for a specific raw handler function and returns a handler that can be passed via the `UseHandle` function.
type LocationService ¶
type LogHandler ¶
type LogHandler func([]*Message)
type Matcher ¶
Matcher is the interface that all Matchers should be implemented in order to be registered into the Mux via the `Mux#AddRequestHandler/Match/MatchFunc` functions.
Look the `Mux#AddRequestHandler` for more.
type Message ¶
type Message struct { // Optional future which will indicate if message delivery should // notify attached future on result. Future *nthen.Future // Path of the request producing this if from http. Path string // IP of the request producing this if from http. IP string // LocalIP of the request producing this if from http. LocalIP string // ExpectReply indicates if the receiver of said message should // handle this as a SendReply operation. ExpectReply bool // Optional reply error send to indicate message is an error reply // and the error that occurred. ReplyErr error // SuggestedStatusCode is an optional field settable by the // creator to suggest possible status code of a message. SuggestedStatusCode int // ContentType is an required value set default to MessageContentType. // Its an important use in the translators where its the deciding factor // if a message is written as a whole or just the payload into the // response object. ContentType string // FormName is optional attached form name which represents this data. FormName string // FileName is optional attached file name which represents this data. FileName string // Headers are related facts attached to a message. Headers Header // Headers are related facts attached to a message. // // Only available when set, so it's very optional Cookies []Cookie // Form contains the parsed form data, including both the URL // field's query parameters and the PATCH, POST, or PUT form data. // // Only available when set, so it's very optional Form url.Values // Query contains the parsed form data, including both the URL // field's query parameters and the PATCH, POST, or PUT form data. // // Only available when set, so it's very optional Query url.Values // Within indicates senders intent on how long they are // willing to wait for message delivery. Usually this should end // with error resolution of attached future if present. Within time.Duration // Id is the unique id attached to giving message // for tracking it's delivery and trace its different touch // points where it was handled. Id nxid.ID // EndPartId is the unique id attached to giving messages which // indicate the expected end id which when seen as the Id // should consider a part stream as completed. // // This will be created from the start and then tagged to the final // message as both the EndPartId and PartId fields, which will identify // that a series of broken messages have been completed. EndPartId nxid.ID // PartId is the unique id attached to giving messages when // they are a shared part of a larger messages. There are cases // when a message may become sent as broken parts for recollection // at the other end. PartId nxid.ID // SubscribeGroup for subscribe/unsubscribe message types which // allow to indicate which group a topic should fall into. SubscribeGroup string // SubscribeTo for subscribe/unsubscribe message types which // allow to indicate which topic should a subscribe or unsubscribe // be applied to. SubscribeTo string // Topic for giving message (serving as to address). Topic Topic // ReplyGroup is the when provided the suggested topic to reply to by receiving party. ReplyGroup string // FromAddr is the logical address of the sender of message. FromAddr string // Bytes is the payload for giving message. Bytes []byte // Metadata are related facts attached to a message. Metadata Params // Params are related facts attached to a message based on some route or // sender and provide some values to keyed expectation, unlike metadata // it has specific input in the message. Params Params // Parts are possible fragments collected of a message which was split into // multiple parts to send over the wire and have being collected through the use // of the PartId. // // We do this because we do not let handlers handle a list of messages but one // and to accommodate large messages split in parts or messages which are logical // parts of themselves, this field is an option, generally. // Codecs should never read this Parts []Message }
func UnsubscribeMessage ¶
func (Message) Copy ¶
Copy returns a copy of this commands with underline data copied across. The copy
func (Message) EncodeObject ¶
func (m Message) EncodeObject(encoder npkg.ObjectEncoder)
func (Message) ReplyTo ¶
ReplyTo returns a new instance of a Message using the FromAddr as the topic.
func (Message) ReplyToWith ¶
ReplyToWith returns a new instance of a Message using the FromAddr as the topic.
func (Message) ReplyWithTopic ¶ added in v0.4.8
ReplyWithTopic returns a new message with provided topic.
func (*Message) WithMetadata ¶
func (*Message) WithParams ¶
func (*Message) WithPayload ¶
type MessageBus ¶
type MessageBus interface { Send(data ...Message) Listen(topic string, grp string, handler TransportResponse) Channel SendForReply(tm time.Duration, fromTopic Topic, replyGroup string, data ...Message) *nthen.Future }
MessageBus defines what an underline message transport implementation like a message bus or rpc connection that can deliver according to required semantics of one-to-one and one-to-many.
type MessageErr ¶
func WrapErr ¶
func WrapErr(err error, shouldAck bool) MessageErr
func WrapErrWithStatusCode ¶ added in v0.5.0
func WrapErrWithStatusCode(err error, code int, shouldAck bool) MessageErr
type MessageRouter ¶
type MessageRouter interface { TransportResponse Matcher }
type NoopChannel ¶
type NoopChannel struct{}
NoopChannel implements the Channel interface but has no internal operational capacity. It represents a non functioning channel.
func (NoopChannel) Close ¶
func (n NoopChannel) Close()
func (NoopChannel) Err ¶
func (n NoopChannel) Err() error
type Params ¶
func (Params) EncodeObject ¶
func (h Params) EncodeObject(encoder npkg.ObjectEncoder)
type PbGroup ¶
type PbGroup struct {
// contains filtered or unexported fields
}
func (*PbGroup) Link ¶
func (sc *PbGroup) Link(bus MessageBus)
func (*PbGroup) Listen ¶
func (sc *PbGroup) Listen(handler TransportResponse) Channel
func (*PbGroup) Notify ¶
Notify will notify the groups of handlers and will return the first occurrence of a message error seen by one of the handlers. This means if one of the handlers returns an error then the publisher will be notified as if all handlers failed to handle the message. If you dont want that, then dont return an error from any of your registered handlers and handle the error appropriately.
type PbRelay ¶
type PbRelay struct {
// contains filtered or unexported fields
}
PbRelay aka MultiSubscriberSingleTopicManager wraps a transport object with a subscription management core that allows multiple subscribers listen to messages for one topic, allowing only one listeners to this topic (the manager itself).
This is useful when your underline transport bares a cost with two many subscriptions or does not support multiple subscriptions to a single topic.
func (*PbRelay) UnlistenAllWithId ¶
UnlistenAllWithId sends a signal to remove possible handler with giving id from all topics.
func (*PbRelay) UnlistenWithId ¶
UnlistenWithId sends a signal to remove possible handler with giving id from specific topic.
type SameSite ¶
type SameSite int
SameSite allows a server to define a cookie attribute making it impossible for the browser to send this cookie along with cross-site requests. The main goal is to mitigate the risk of cross-origin information leakage, and provide some protection against cross-site request forgery attacks.
See https://tools.ietf.org/html/draft-ietf-httpbis-cookie-same-site-00 for details.
type Sock ¶
type Sock struct {
// contains filtered or unexported fields
}
func NewSock ¶
func NewSock(handler SocketMessageHandler) *Sock
func (*Sock) Use ¶
func (sh *Sock) Use(handler SocketMessageHandler)
type Socket ¶
type Socket interface { ID() nxid.ID Send(...Message) Stat() SocketStat LocalAddr() net.Addr RemoteAddr() net.Addr Listen(SocketMessageHandler) }
Socket defines an underline connection handler which handles delivery of messages to the underline stream.
type SocketByteHandler ¶
SocketByteHandler defines the function contract a manager uses to handle a message.
Be aware that returning an error from the handler to the Gorilla socket will cause the immediate closure of that socket and ending communication with the client and the error will be logged. So unless your intention is to end the connection, handle it yourself.
type SocketHandler ¶
type SocketHandler func(from Socket)
SocketHandler defines the function contract to be called for a socket instace.
type SocketMessageHandler ¶
SocketMessageHandler defines the function contract a sabuhp.Socket uses to handle a message.
Be aware that returning an error from the handler to the Gorilla sabuhp.Socket will cause the immediate closure of that socket and ending communication with the client and the error will be logged. So unless your intention is to end the connection, handle it yourself.
type SocketNotification ¶
type SocketNotification func(socket Socket)
type SocketServer ¶
type SocketServer interface {
Stream(SocketService)
}
type SocketServers ¶
type SocketServers struct {
// contains filtered or unexported fields
}
func NewSocketServers ¶
func NewSocketServers() *SocketServers
func (*SocketServers) SocketClosed ¶
func (htp *SocketServers) SocketClosed(socket Socket)
func (*SocketServers) SocketOpened ¶
func (htp *SocketServers) SocketOpened(socket Socket)
func (*SocketServers) Stream ¶
func (htp *SocketServers) Stream(server SocketService)
type SocketService ¶
type SocketStat ¶
type SocketStat struct { Addr net.Addr RemoteAddr net.Addr Id string Sent int64 Received int64 Handled int64 }
func (SocketStat) EncodeObject ¶
func (g SocketStat) EncodeObject(encoder npkg.ObjectEncoder)
type StreamBus ¶
type StreamBus struct { Logger Logger Bus MessageBus // contains filtered or unexported fields }
func NewStreamBus ¶
func NewStreamBus(logger Logger, bus MessageBus) *StreamBus
func WithBus ¶
func WithBus(logger Logger, socketServer SocketServer, bus MessageBus) *StreamBus
WithBus returns the instance of a StreamBus which will be connected to the provided SocketServer and handle delivery of messages to a message bus and subscription/unsubcriptions as well.
func (*StreamBus) SocketBusSend ¶
func (st *StreamBus) SocketBusSend(b Message, sock Socket) MessageErr
func (*StreamBus) SocketClosed ¶
func (*StreamBus) SocketOpened ¶
func (*StreamBus) SocketSubscribe ¶
func (st *StreamBus) SocketSubscribe(b Message, socket Socket) MessageErr
func (*StreamBus) SocketUnsubscribe ¶
func (st *StreamBus) SocketUnsubscribe(b Message, socket Socket) MessageErr
type StreamBusRelay ¶
type StreamBusRelay struct { Logger Logger BusRelay *BusRelay Bus MessageBus }
func NewStreamBusRelay ¶
func NewStreamBusRelay(logger Logger, bus MessageBus, relay *BusRelay) *StreamBusRelay
func WithBusRelay ¶
func WithBusRelay(logger Logger, socketServer SocketServer, bus MessageBus, relay *BusRelay) *StreamBusRelay
WithBusRelay returns the instance of a StreamBusRelay which will be connected to the provided SocketServer and handle delivery of messages to a message bus and subscription to a target relay.
func (*StreamBusRelay) SocketBusSend ¶
func (st *StreamBusRelay) SocketBusSend(b Message, sock Socket) MessageErr
func (*StreamBusRelay) SocketClosed ¶
func (st *StreamBusRelay) SocketClosed(socket Socket)
func (*StreamBusRelay) SocketOpened ¶
func (st *StreamBusRelay) SocketOpened(socket Socket)
func (*StreamBusRelay) SocketSubscribe ¶
func (st *StreamBusRelay) SocketSubscribe(b Message, socket Socket) MessageErr
func (*StreamBusRelay) SocketUnsubscribe ¶
func (st *StreamBusRelay) SocketUnsubscribe(b Message, socket Socket) MessageErr
type StreamFunc ¶
type StreamFunc struct { Listen func(Message, Socket) error Subscribe func(Message, Socket) error Unsubscribe func(Message, Socket) error Closed func(Socket) }
func (StreamFunc) SocketClosed ¶
func (st StreamFunc) SocketClosed(socket Socket)
func (StreamFunc) SocketOpened ¶
func (st StreamFunc) SocketOpened(socket Socket)
type StreamRelay ¶
type StreamRelay struct { Logger Logger Relay *PbRelay Bus MessageBus }
func NewStreamRelay ¶
func NewStreamRelay(logger Logger, bus MessageBus, relay *PbRelay) *StreamRelay
func WithRelay ¶
func WithRelay(logger Logger, socketServer SocketServer, bus MessageBus, relay *PbRelay) *StreamRelay
WithRelay returns the instance of a StreamRelay which will be connected to the provided SocketServer and handle delivery of messages to a message bus and subscription to a target relay.
func (*StreamRelay) SocketBusSend ¶
func (st *StreamRelay) SocketBusSend(b Message, sock Socket) MessageErr
func (*StreamRelay) SocketClosed ¶
func (st *StreamRelay) SocketClosed(socket Socket)
func (*StreamRelay) SocketOpened ¶
func (st *StreamRelay) SocketOpened(socket Socket)
func (*StreamRelay) SocketSubscribe ¶
func (st *StreamRelay) SocketSubscribe(b Message, socket Socket) MessageErr
func (*StreamRelay) SocketUnsubscribe ¶
func (st *StreamRelay) SocketUnsubscribe(b Message, socket Socket) MessageErr
type Topic ¶ added in v0.5.1
func TR ¶ added in v0.6.1
TR allows creating a topic with a environment prefix and a 20 length random string suffix.
func TRS ¶ added in v0.6.1
TRS allows creating a topic with a environment and service prefix and a 20 length random string suffix.
func (Topic) ReplyTopic ¶ added in v0.5.1
type TopicPartial ¶ added in v0.6.2
func CreateTopicPartial ¶ added in v0.6.2
func CreateTopicPartial(env string, service string) TopicPartial
CreateTopicPartial returns a TopicPartial which will always generate a topic match the target TRS topic naming format: env.service.topic_name.
type Transport ¶
type Transport struct { Bus MessageBus Socket Socket }
func (Transport) ToBusElseSocket ¶ added in v0.4.7
func (Transport) ToSocketElseBus ¶ added in v0.4.7
func (Transport) ToSocketOnly ¶ added in v0.4.7
type TransportResponse ¶
type TransportResponse interface {
Handle(context.Context, Message, Transport) MessageErr
}
type TransportResponseFunc ¶
type TransportResponseFunc func(context.Context, Message, Transport) MessageErr
func (TransportResponseFunc) Handle ¶
func (t TransportResponseFunc) Handle(ctx context.Context, message Message, tr Transport) MessageErr
type Wrapper ¶
type Wrapper func(response TransportResponse) TransportResponse
Wrapper is just a type of `func(TransportResponse) TransportResponse` which is a common type definition for net/http middlewares.
type Wrappers ¶
type Wrappers []Wrapper
Wrappers contains `Wrapper`s that can be registered and used by a "main route handler". Look the `Pre` and `For/ForFunc` functions too.
func Pre ¶
Pre starts a chain of handlers for wrapping a "main route handler" the registered "middleware" will run before the main handler(see `Wrappers#For/ForFunc`).
Usage: mux := muxie.NewMux() myMiddlewares := muxie.Pre(myFirstMiddleware, mySecondMiddleware) mux.UseHandle("/", myMiddlewares.ForFunc(myMainRouteTransportResponse))
func (Wrappers) For ¶
func (w Wrappers) For(main TransportResponse) TransportResponse
For registers the wrappers for a specific handler and returns a handler that can be passed via the `UseHandle` function.
func (Wrappers) ForFunc ¶
func (w Wrappers) ForFunc(mainFunc TransportResponseFunc) TransportResponse
ForFunc registers the wrappers for a specific raw handler function and returns a handler that can be passed via the `UseHandle` function.