Documentation ¶
Index ¶
- Constants
- Variables
- type APIVersionEvent
- type BlockAdded
- type BlockAddedEvent
- type Client
- type Consumer
- type CtxWorkerID
- type DeployAcceptedEvent
- type DeployProcessed
- type DeployProcessedEvent
- type ErrUnknownEventType
- type EventData
- type EventParser
- type EventStreamReader
- type EventType
- type Handler
- type HandlerFunc
- type HttpConnection
- type Middleware
- type MiddlewareHandler
- type RawEvent
- type Streamer
Constants ¶
const DefaultBufferSize = 4096
Variables ¶
var AllEventsNames = map[EventType]string{
	APIVersionEventType:      "ApiVersion",
	BlockAddedEventType:      "BlockAdded",
	DeployProcessedEventType: "DeployProcessed",
	DeployAcceptedEventType:  "DeployAccepted",
	DeployExpiredEventType:   "DeployExpired",
	StepEventType:            "Step",
	FaultEventType:           "Fault",
	FinalitySignatureType:    "FinalitySignature",
}
var ErrFullStreamTimeoutError = errors.New("can't fill the stream, because it full")
var ErrHandlerNotRegistered = errors.New("handler is not registered")
Functions ¶
This section is empty.
Types ¶
type APIVersionEvent ¶
type APIVersionEvent struct {
APIVersion string `json:"ApiVersion"`
}
type BlockAdded ¶
BlockAddedEvent definition
type BlockAddedEvent ¶
type BlockAddedEvent struct {
BlockAdded BlockAdded `json:"BlockAdded"`
}
BlockAddedEvent definition
type Client ¶
type Client struct {
	Streamer             *Streamer
	Consumer             *Consumer
	EventStream          chan RawEvent
	StreamErrorHandler   func(<-chan error)
	ConsumerErrorHandler func(<-chan error)
	WorkersCount         int
	// contains filtered or unexported fields
}
Client is a facade that provides a convenient interface for processing data from the stream and unites the Streamer and the Consumer under one implementation. The Client also allows registering global middleware that is applied to all handlers.
func (*Client) RegisterHandler ¶
func (p *Client) RegisterHandler(eventType EventType, handler HandlerFunc)
func (*Client) RegisterMiddleware ¶
func (p *Client) RegisterMiddleware(one Middleware)
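The Client fields EventStream and WorkersCount suggest a pool of workers draining one shared channel. The following is a minimal sketch of that pattern, not the package's implementation: the RawEvent stand-in and the consume and countHandled helpers are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// RawEvent is a simplified stand-in for the package's RawEvent type.
type RawEvent struct{ Data []byte }

// consume starts workersCount goroutines that read from the same channel
// and returns once the channel is closed and fully drained.
func consume(stream <-chan RawEvent, workersCount int, handle func(workerID int, e RawEvent)) {
	var wg sync.WaitGroup
	for id := 0; id < workersCount; id++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for e := range stream {
				handle(workerID, e)
			}
		}(id)
	}
	wg.Wait()
}

// countHandled pushes total events through the pool and reports how many
// were handled. The counter is atomic because workers run concurrently.
func countHandled(total, workersCount int) int64 {
	stream := make(chan RawEvent, total)
	for i := 0; i < total; i++ {
		stream <- RawEvent{}
	}
	close(stream)

	var handled int64
	consume(stream, workersCount, func(int, RawEvent) {
		atomic.AddInt64(&handled, 1)
	})
	return handled
}

func main() {
	fmt.Println("handled:", countHandled(10, 3))
}
```

Each event is delivered to exactly one worker, so handlers that share state need their own synchronization.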
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a service that registers event handlers and assigns events from the stream to specific handlers.
func NewConsumer ¶
func NewConsumer() *Consumer
func (*Consumer) RegisterHandler ¶
func (c *Consumer) RegisterHandler(eventType EventType, handler HandlerFunc)
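A consumer that assigns events to handlers can be pictured as a map keyed by event type. The sketch below uses local stand-ins for the package's types: the HandlerFunc signature, the RawEvent fields, and the registry type with its dispatch method are assumptions made for illustration, since this reference does not show them.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Stand-ins for the package's types (assumed shapes).
type EventType int

const (
	BlockAddedEventType EventType = iota + 1
	DeployProcessedEventType
)

type RawEvent struct {
	EventType EventType
	Data      []byte
}

// HandlerFunc signature is an assumption, not taken from this reference.
type HandlerFunc func(ctx context.Context, event RawEvent) error

var ErrHandlerNotRegistered = errors.New("handler is not registered")

// registry is a hypothetical, simplified consumer.
type registry struct {
	handlers map[EventType]HandlerFunc
}

func newRegistry() *registry {
	return &registry{handlers: make(map[EventType]HandlerFunc)}
}

func (r *registry) RegisterHandler(eventType EventType, handler HandlerFunc) {
	r.handlers[eventType] = handler
}

// dispatch looks up the handler for the event's type and invokes it.
func (r *registry) dispatch(ctx context.Context, event RawEvent) error {
	handler, ok := r.handlers[event.EventType]
	if !ok {
		return ErrHandlerNotRegistered
	}
	return handler(ctx, event)
}

// demo registers one handler, then dispatches a known and an unknown event.
func demo() (known error, unknown error) {
	r := newRegistry()
	r.RegisterHandler(BlockAddedEventType, func(ctx context.Context, e RawEvent) error {
		fmt.Printf("block event payload: %s\n", e.Data)
		return nil
	})
	ctx := context.Background()
	known = r.dispatch(ctx, RawEvent{EventType: BlockAddedEventType, Data: []byte("{}")})
	unknown = r.dispatch(ctx, RawEvent{EventType: DeployProcessedEventType})
	return known, unknown
}

func main() {
	known, unknown := demo()
	fmt.Println(known, unknown)
}
```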
type DeployAcceptedEvent ¶
type DeployProcessed ¶
type DeployProcessedEvent ¶
type DeployProcessedEvent struct {
DeployProcessed DeployProcessed `json:"DeployProcessed"`
}
type ErrUnknownEventType ¶
type ErrUnknownEventType struct {
	RawData []byte
	// contains filtered or unexported fields
}
func NewErrUnknownEventType ¶
func NewErrUnknownEventType(data []byte) ErrUnknownEventType
func (ErrUnknownEventType) Error ¶
func (e ErrUnknownEventType) Error() string
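Because ErrUnknownEventType is a struct that carries RawData, callers can recover the unparsed payload with errors.As even when the error has been wrapped. The sketch below redefines the type locally with only its exported field; the error message and the parse helper are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrUnknownEventType mirrors the exported shape shown above.
// Unexported fields are omitted and the message text is assumed.
type ErrUnknownEventType struct {
	RawData []byte
}

func NewErrUnknownEventType(data []byte) ErrUnknownEventType {
	return ErrUnknownEventType{RawData: data}
}

func (e ErrUnknownEventType) Error() string {
	return "unknown event type"
}

// parse is a hypothetical parser that always fails and wraps the error.
func parse(data []byte) error {
	return fmt.Errorf("parse raw event: %w", NewErrUnknownEventType(data))
}

// rawDataOf extracts the original payload from a possibly wrapped error.
func rawDataOf(err error) ([]byte, bool) {
	var unknown ErrUnknownEventType
	if errors.As(err, &unknown) {
		return unknown.RawData, true
	}
	return nil, false
}

func main() {
	err := parse([]byte(`{"Mystery":{}}`))
	if data, ok := rawDataOf(err); ok {
		fmt.Printf("skipping unknown event: %s\n", data)
	}
}
```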
type EventData ¶
type EventData = json.RawMessage
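Aliasing EventData to json.RawMessage lets the payload stay as raw bytes until the concrete event type is known. A minimal sketch of this two-step decoding follows; the envelope struct and its field names are hypothetical, while APIVersionEvent matches the definition above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type EventData = json.RawMessage

type APIVersionEvent struct {
	APIVersion string `json:"ApiVersion"`
}

// envelope is a hypothetical wrapper; its layout is assumed for illustration.
type envelope struct {
	ID   int       `json:"id"`
	Data EventData `json:"data"`
}

// decodeAPIVersion first decodes the envelope, leaving Data untouched,
// and only then decodes Data into the concrete event type.
func decodeAPIVersion(payload []byte) (APIVersionEvent, error) {
	var env envelope
	if err := json.Unmarshal(payload, &env); err != nil {
		return APIVersionEvent{}, err
	}
	var event APIVersionEvent
	if err := json.Unmarshal(env.Data, &event); err != nil {
		return APIVersionEvent{}, err
	}
	return event, nil
}

func main() {
	event, err := decodeAPIVersion([]byte(`{"id":1,"data":{"ApiVersion":"1.0.0"}}`))
	fmt.Println(event.APIVersion, err)
}
```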
type EventParser ¶
type EventParser struct {
// contains filtered or unexported fields
}
func NewEventParser ¶
func NewEventParser() *EventParser
func (*EventParser) ParseRawEvent ¶
func (e *EventParser) ParseRawEvent(data []byte) (RawEvent, error)
func (*EventParser) RegisterEvent ¶
func (e *EventParser) RegisterEvent(eventType EventType)
type EventStreamReader ¶
type EventStreamReader struct {
	MaxBufferSize int
	// contains filtered or unexported fields
}
EventStreamReader scans an io.Reader looking for EventStream messages.
func (*EventStreamReader) ReadEvent ¶
func (e *EventStreamReader) ReadEvent() ([]byte, error)
ReadEvent scans the EventStream for events.
func (*EventStreamReader) RegisterStream ¶
func (e *EventStreamReader) RegisterStream(eventStream io.Reader)
RegisterStream registers a buffered scanner over the given stream for the EventStreamReader.
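One common way to scan an io.Reader for event-stream messages is a bufio.Scanner with a custom split function that cuts on blank lines. The sketch below illustrates that idea under stated assumptions; it is not the EventStreamReader's actual implementation, and splitEvents, readEvents, and sample are hypothetical names.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"strings"
)

const DefaultBufferSize = 4096

// splitEvents is a bufio.SplitFunc that yields chunks separated by a blank line.
func splitEvents(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if atEOF && len(data) == 0 {
		return 0, nil, nil
	}
	if i := bytes.Index(data, []byte("\n\n")); i >= 0 {
		return i + 2, data[:i], nil
	}
	if atEOF {
		// Return the trailing chunk even without a closing blank line.
		return len(data), data, nil
	}
	return 0, nil, nil // ask the scanner for more data
}

// readEvents collects every event from r. maxBufferSize caps the size of a
// single event; larger events make the scanner stop with bufio.ErrTooLong.
func readEvents(r io.Reader, maxBufferSize int) ([]string, error) {
	scanner := bufio.NewScanner(r)
	scanner.Buffer(make([]byte, DefaultBufferSize), maxBufferSize)
	scanner.Split(splitEvents)

	var events []string
	for scanner.Scan() {
		events = append(events, scanner.Text())
	}
	return events, scanner.Err()
}

// sample runs readEvents over a small in-memory stream.
func sample() ([]string, error) {
	stream := "data:{\"ApiVersion\":\"1.0.0\"}\n\ndata:{\"BlockAdded\":{}}\nid:7\n\n"
	return readEvents(strings.NewReader(stream), 1<<20)
}

func main() {
	events, err := sample()
	fmt.Println(len(events), err)
}
```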
type HandlerFunc ¶
HandlerFunc is the function signature that each specific event handler must implement.
type HttpConnection ¶
type HttpConnection struct {
	Headers map[string]string
	URL     string
	// contains filtered or unexported fields
}
HttpConnection is responsible for establishing a connection with the SSE server. It creates the request, handles HTTP errors, and provides the response.
func NewHttpConnection ¶
func NewHttpConnection(httpClient *http.Client, sourceUrl string) *HttpConnection
type Middleware ¶
type Middleware func(handler HandlerFunc) HandlerFunc
type MiddlewareHandler ¶
type MiddlewareHandler interface {
Process(handler HandlerFunc) HandlerFunc
}
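Since a Middleware takes a HandlerFunc and returns a HandlerFunc, several of them can be composed around one handler. The sketch below shows one possible composition order; the HandlerFunc signature, the RawEvent stand-in, and the chain and tracer helpers are assumptions, and the order in which the real Client applies registered middleware is not stated in this reference.

```go
package main

import (
	"context"
	"fmt"
)

// Stand-ins; the HandlerFunc signature is an assumption.
type RawEvent struct{ Data []byte }
type HandlerFunc func(ctx context.Context, event RawEvent) error
type Middleware func(handler HandlerFunc) HandlerFunc

// chain wraps handler so that the first middleware listed runs outermost.
func chain(handler HandlerFunc, middlewares ...Middleware) HandlerFunc {
	for i := len(middlewares) - 1; i >= 0; i-- {
		handler = middlewares[i](handler)
	}
	return handler
}

// tracer returns a middleware that records its name before and after
// calling the next handler.
func tracer(name string, trace *[]string) Middleware {
	return func(next HandlerFunc) HandlerFunc {
		return func(ctx context.Context, event RawEvent) error {
			*trace = append(*trace, name+":before")
			err := next(ctx, event)
			*trace = append(*trace, name+":after")
			return err
		}
	}
}

// run executes a handler wrapped in two tracers and returns the call order.
func run() []string {
	var trace []string
	handler := func(ctx context.Context, event RawEvent) error {
		trace = append(trace, "handler")
		return nil
	}
	wrapped := chain(handler, tracer("outer", &trace), tracer("inner", &trace))
	_ = wrapped(context.Background(), RawEvent{})
	return trace
}

func main() {
	fmt.Println(run())
}
```

A type that implements MiddlewareHandler can be plugged into the same chain by passing its Process method as a Middleware value.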
type RawEvent ¶
func (*RawEvent) ParseAsAPIVersionEvent ¶
func (d *RawEvent) ParseAsAPIVersionEvent() (APIVersionEvent, error)
func (*RawEvent) ParseAsBlockAddedEvent ¶
func (d *RawEvent) ParseAsBlockAddedEvent() (BlockAddedEvent, error)
func (*RawEvent) ParseAsDeployProcessedEvent ¶
func (d *RawEvent) ParseAsDeployProcessedEvent() (DeployProcessedEvent, error)
type Streamer ¶
type Streamer struct {
	Connection   *HttpConnection
	StreamReader *EventStreamReader
	// BlockedStreamLimit is how long the stream's buffer is allowed to stay completely full,
	// which could indicate that the workers are too slow and are not receiving messages.
	// If this period elapses without any message being received, ErrFullStreamTimeoutError is reported.
	BlockedStreamLimit time.Duration
	// contains filtered or unexported fields
}
Streamer is a service whose main responsibility is to fill the events channel. Connection management is isolated in this service. It uses an HttpConnection to obtain the HTTP response as a stream and passes it to the EventStreamReader, which is expected to parse bytes from the response body. This design assumes that the connection is managed, and reconnection logic is provided, at a level above this service.
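The BlockedStreamLimit behaviour described above can be sketched as a channel send guarded by a timer: if the buffered channel stays full for the whole limit, the send gives up with ErrFullStreamTimeoutError. This is an illustration of the idea, not the Streamer's code; push and demo are hypothetical helpers and events are plain strings for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var ErrFullStreamTimeoutError = errors.New("can't fill the stream, because it full")

// push sends event into stream, waiting at most limit for free buffer space.
func push(stream chan<- string, event string, limit time.Duration) error {
	timer := time.NewTimer(limit)
	defer timer.Stop()

	select {
	case stream <- event:
		return nil
	case <-timer.C:
		// No consumer made room within the limit.
		return ErrFullStreamTimeoutError
	}
}

// demo fills a one-slot channel, then tries to push again with no consumer.
func demo() (first error, second error) {
	stream := make(chan string, 1)
	first = push(stream, "a", 10*time.Millisecond)
	second = push(stream, "b", 10*time.Millisecond)
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first, second)
}
```

A caller that receives this error can decide whether to raise WorkersCount, enlarge the channel buffer, or reconnect later.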
func DefaultStreamer ¶
DefaultStreamer is a shortcut for getting started quickly with a Streamer.
func NewStreamer ¶
func NewStreamer(
	client *HttpConnection,
	reader *EventStreamReader,
	blockedStreamLimit time.Duration,
) *Streamer
NewStreamer is the idiomatic way to create a Streamer.