http

package
v2.3.1
Warning

This package is not in the latest version of its module.

Published: Aug 28, 2024 License: MIT Imports: 16 Imported by: 3

Documentation

Constants

const (
	HeaderUUID     = "Message-Uuid"
	HeaderMetadata = "Message-Metadata"
	ProviderName   = "http"
)

Variables

var (
	// ErrPublisherClosed happens when trying to publish to a topic while the publisher is closed or closing.
	ErrPublisherClosed = errors.New("publisher is closed")
	ErrNoMarshalFunc   = errors.New("marshal function is missing")
	ErrErrorResponse   = errors.New("server responded with error status")
)

Functions

func DefaultErrorHandler

func DefaultErrorHandler(w http.ResponseWriter, r *http.Request, err error)

DefaultErrorHandler writes a JSON error response with the Internal Server Error status code (500).

func DefaultMarshalMessageFunc

func DefaultMarshalMessageFunc(url string, msg *message.Message) (*http.Request, error)

DefaultMarshalMessageFunc transforms the message into an HTTP POST request. It encodes the UUID and Metadata in request headers.

func DefaultUnmarshalMessageFunc

func DefaultUnmarshalMessageFunc(topic string, req *http.Request) (*message.Message, error)

DefaultUnmarshalMessageFunc retrieves the UUID and Metadata from request headers, as encoded by DefaultMarshalMessageFunc.

func SetResponseStatusCode added in v2.3.0

func SetResponseStatusCode(m *message.Message, code int) *message.Message

SetResponseStatusCode sets an HTTP status code on the given message.

func StatusCodeFromContext added in v2.3.0

func StatusCodeFromContext(ctx context.Context, otherwise int) int

StatusCodeFromContext returns the status code from the context.

func WithResponseStatusCode added in v2.3.0

func WithResponseStatusCode(ctx context.Context, code int) context.Context

WithResponseStatusCode returns a new context with the status code.

Types

type HandleErrorFunc

type HandleErrorFunc func(w http.ResponseWriter, r *http.Request, err error)

type JSONSSEMarshaler added in v2.2.0

type JSONSSEMarshaler struct{}

func (JSONSSEMarshaler) Marshal added in v2.2.0

func (j JSONSSEMarshaler) Marshal(ctx context.Context, payload any) (ServerSentEvent, error)

type MarshalMessageFunc

type MarshalMessageFunc func(url string, msg *message.Message) (*http.Request, error)

MarshalMessageFunc transforms the message into an HTTP request to be sent to the specified url.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)

NewPublisher creates a new Publisher. It publishes the received messages as HTTP requests. The URL, method and payload of the request are determined by the configured MarshalMessageFunc.

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, messages ...*message.Message) error

type PublisherConfig

type PublisherConfig struct {
	MarshalMessageFunc MarshalMessageFunc
	Client             *http.Client
	// If false (default), the response body is logged when the server responds to the webhook request with an error status (>=400).
	DoNotLogResponseBodyOnServerError bool
}

type SSEMarshaler added in v2.2.0

type SSEMarshaler interface {
	Marshal(ctx context.Context, payload any) (ServerSentEvent, error)
}

type SSERouter

type SSERouter struct {
	// contains filtered or unexported fields
}

SSERouter is a router handling Server-Sent Events.

func NewSSERouter

func NewSSERouter(
	config SSERouterConfig,
	logger watermill.LoggerAdapter,
) (SSERouter, error)

NewSSERouter creates a new SSERouter.

func (SSERouter) AddHandler

func (r SSERouter) AddHandler(topic string, streamAdapter StreamAdapter) http.HandlerFunc

AddHandler starts a new handler for a given topic.

func (SSERouter) Close

func (r SSERouter) Close() error

Close stops the SSERouter.

func (SSERouter) Run

func (r SSERouter) Run(ctx context.Context) error

Run starts the SSERouter.

func (SSERouter) Running

func (r SSERouter) Running() chan struct{}

Running returns a channel that is closed once the SSERouter is running.

type SSERouterConfig

type SSERouterConfig struct {
	UpstreamSubscriber message.Subscriber
	ErrorHandler       HandleErrorFunc
	Marshaler          SSEMarshaler
}

type ServerSentEvent added in v2.2.0

type ServerSentEvent struct {
	Event string
	Data  []byte
}

type StreamAdapter

type StreamAdapter interface {
	// InitialStreamResponse returns the first stream response to be sent back to client.
	// Any errors that occur should be handled and written to `w`.
	// Returning `ok` equal false ends processing the HTTP request.
	InitialStreamResponse(w http.ResponseWriter, r *http.Request) (response interface{}, ok bool)
	// NextStreamResponse returns the next stream response to be sent back to the client.
	// Typically this involves checking some kind of model ID extracted from the `msg`.
	// The response is sent to the client only if `ok` is true.
	// Any errors that occur should be either:
	//    1) logged and skipped, returning (nil, false)
	//    2) sent back to the client, returning (errorStruct, true)
	NextStreamResponse(r *http.Request, msg *message.Message) (response interface{}, ok bool)
}

type StringSSEMarshaler added in v2.2.0

type StringSSEMarshaler struct{}

func (StringSSEMarshaler) Marshal added in v2.2.0

func (s StringSSEMarshaler) Marshal(ctx context.Context, payload any) (ServerSentEvent, error)

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber can subscribe to HTTP requests and create Watermill's messages based on them.

func NewSubscriber

func NewSubscriber(addr string, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)

NewSubscriber creates a new Subscriber.

addr is the TCP address to listen on.

logger is Watermill's logger.

func (*Subscriber) Addr

func (s *Subscriber) Addr() net.Addr

Addr returns the server address or nil if the server isn't running.

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) StartHTTPServer

func (s *Subscriber) StartHTTPServer() error

StartHTTPServer starts the HTTP server. It must be called after all Subscribe calls have completed. Like http.Server.Serve(), it returns http.ErrServerClosed after the server has been closed. See https://golang.org/pkg/net/http/#Server.Serve

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *message.Message, error)

Subscribe adds an HTTP handler that listens for messages at the provided url.

Subscribe needs to be called before `StartHTTPServer`.

When a request is received, the handler waits for the `Ack`. When an Ack is received, HTTP status 200 is sent. When a Nack is received, HTTP status 500 is sent.

type SubscriberConfig

type SubscriberConfig struct {
	Router               chi.Router
	UnmarshalMessageFunc UnmarshalMessageFunc
}

type UnmarshalMessageFunc

type UnmarshalMessageFunc func(topic string, request *http.Request) (*message.Message, error)
