Documentation ¶
Index ¶
- Constants
- Variables
- func DefaultErrorHandler(w http.ResponseWriter, r *http.Request, err error)
- func DefaultMarshalMessageFunc(url string, msg *message.Message) (*http.Request, error)
- func DefaultUnmarshalMessageFunc(topic string, req *http.Request) (*message.Message, error)
- func SetResponseStatusCode(m *message.Message, code int) *message.Message
- func StatusCodeFromContext(ctx context.Context, otherwise int) int
- func WithResponseStatusCode(ctx context.Context, code int) context.Context
- type HandleErrorFunc
- type JSONSSEMarshaler
- type MarshalMessageFunc
- type Publisher
- type PublisherConfig
- type SSEMarshaler
- type SSERouter
- type SSERouterConfig
- type ServerSentEvent
- type StreamAdapter
- type StringSSEMarshaler
- type Subscriber
- type SubscriberConfig
- type UnmarshalMessageFunc
Constants ¶
const ( HeaderUUID = "Message-Uuid" HeaderMetadata = "Message-Metadata" ProviderName = "http" )
Variables ¶
var ( // ErrPublisherClosed happens when trying to publish to a topic while the publisher is closed or closing. ErrPublisherClosed = errors.New("publisher is closed") ErrNoMarshalFunc = errors.New("marshal function is missing") ErrErrorResponse = errors.New("server responded with error status") )
Functions ¶
func DefaultErrorHandler ¶
func DefaultErrorHandler(w http.ResponseWriter, r *http.Request, err error)
DefaultErrorHandler writes a JSON error response along with the Internal Server Error status code (500).
func DefaultMarshalMessageFunc ¶
DefaultMarshalMessageFunc transforms the message into an HTTP POST request. It encodes the UUID and Metadata in request headers.
func DefaultUnmarshalMessageFunc ¶
DefaultUnmarshalMessageFunc retrieves the UUID and Metadata from request headers, as encoded by DefaultMarshalMessageFunc.
func SetResponseStatusCode ¶ added in v2.3.0
SetResponseStatusCode sets an HTTP status code on the given message.
func StatusCodeFromContext ¶ added in v2.3.0
StatusCodeFromContext returns the status code from the context.
Types ¶
type HandleErrorFunc ¶
type HandleErrorFunc func(w http.ResponseWriter, r *http.Request, err error)
type JSONSSEMarshaler ¶ added in v2.2.0
type JSONSSEMarshaler struct{}
func (JSONSSEMarshaler) Marshal ¶ added in v2.2.0
func (j JSONSSEMarshaler) Marshal(ctx context.Context, payload any) (ServerSentEvent, error)
type MarshalMessageFunc ¶
MarshalMessageFunc transforms the message into an HTTP request to be sent to the specified url.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)
NewPublisher creates a new Publisher. It publishes the received messages as HTTP requests. The URL, method and payload of the request are determined by the configured MarshalMessageFunc.
type PublisherConfig ¶
type PublisherConfig struct { MarshalMessageFunc MarshalMessageFunc Client *http.Client // if false (default), when server responds with error (>=400) to the webhook request, the response body is logged. DoNotLogResponseBodyOnServerError bool }
type SSEMarshaler ¶ added in v2.2.0
type SSEMarshaler interface {
Marshal(ctx context.Context, payload any) (ServerSentEvent, error)
}
type SSERouter ¶
type SSERouter struct {
// contains filtered or unexported fields
}
SSERouter is a router handling Server-Sent Events.
func NewSSERouter ¶
func NewSSERouter( config SSERouterConfig, logger watermill.LoggerAdapter, ) (SSERouter, error)
NewSSERouter creates a new SSERouter.
func (SSERouter) AddHandler ¶
func (r SSERouter) AddHandler(topic string, streamAdapter StreamAdapter) http.HandlerFunc
AddHandler starts a new handler for a given topic.
type SSERouterConfig ¶
type SSERouterConfig struct { UpstreamSubscriber message.Subscriber ErrorHandler HandleErrorFunc Marshaler SSEMarshaler }
type ServerSentEvent ¶ added in v2.2.0
type StreamAdapter ¶
type StreamAdapter interface { // InitialStreamResponse returns the first stream response to be sent back to client. // Any errors that occur should be handled and written to `w`. // Returning `ok` equal false ends processing the HTTP request. InitialStreamResponse(w http.ResponseWriter, r *http.Request) (response interface{}, ok bool) // NextStreamResponse returns the next stream response to be sent back to the client. // Typically this involves checking some kind of model ID extracted from the `msg`. // The response is sent to the client only if `ok` is true. // Any errors that occur should be either: // 1) logged and skipped, returning (nil, false) // 2) sent back to the client, returning (errorStruct, true) NextStreamResponse(r *http.Request, msg *message.Message) (response interface{}, ok bool) }
type StringSSEMarshaler ¶ added in v2.2.0
type StringSSEMarshaler struct{}
func (StringSSEMarshaler) Marshal ¶ added in v2.2.0
func (s StringSSEMarshaler) Marshal(ctx context.Context, payload any) (ServerSentEvent, error)
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber can subscribe to HTTP requests and create Watermill's messages based on them.
func NewSubscriber ¶
func NewSubscriber(addr string, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)
NewSubscriber creates a new Subscriber.
addr is the TCP address to listen on.
logger is Watermill's logger.
func (*Subscriber) Addr ¶
func (s *Subscriber) Addr() net.Addr
Addr returns the server address or nil if the server isn't running.
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
func (*Subscriber) StartHTTPServer ¶
func (s *Subscriber) StartHTTPServer() error
StartHTTPServer starts the HTTP server. It must be called after all Subscribe calls have completed. Just like http.Server.Serve(), it returns http.ErrServerClosed after the server has been closed. See https://golang.org/pkg/net/http/#Server.Serve
func (*Subscriber) Subscribe ¶
Subscribe adds an HTTP handler which will listen on the provided URL for messages.
Subscribe needs to be called before `StartHTTPServer`.
When a request is sent, it will wait for the `Ack`. When an Ack is received, a 200 HTTP status will be sent. When a Nack is sent, a 500 HTTP status will be sent.
type SubscriberConfig ¶
type SubscriberConfig struct { Router chi.Router UnmarshalMessageFunc UnmarshalMessageFunc }