Documentation ¶
Index ¶
- Constants
- Variables
- func ParseEvent[T interface{}](data []byte) (T, error)
- type APIVersionEvent
- type BlockAdded
- type BlockAddedEvent
- type Client
- type Consumer
- type CtxWorkerID
- type DeployAcceptedEvent
- type DeployExpiredEvent
- type DeployExpiredPayload
- type DeployProcessedEvent
- type DeployProcessedPayload
- type ErrUnknownEventType
- type EventData
- type EventParser
- type EventStreamReader
- type EventType
- type FaultEvent
- type FaultPayload
- type FinalitySignatureEvent
- type FinalitySignaturePayload
- type Handler
- type HandlerFunc
- type HttpConnection
- type Middleware
- type MiddlewareHandler
- type RawEvent
- func (d *RawEvent) ParseAsAPIVersionEvent() (APIVersionEvent, error)
- func (d *RawEvent) ParseAsBlockAddedEvent() (BlockAddedEvent, error)
- func (d *RawEvent) ParseAsDeployAcceptedEvent() (DeployAcceptedEvent, error)
- func (d *RawEvent) ParseAsDeployExpiredEvent() (DeployExpiredEvent, error)
- func (d *RawEvent) ParseAsDeployProcessedEvent() (DeployProcessedEvent, error)
- func (d *RawEvent) ParseAsFaultEvent() (FaultEvent, error)
- func (d *RawEvent) ParseAsFinalitySignatureEvent() (FinalitySignatureEvent, error)
- func (d *RawEvent) ParseAsStepEvent() (StepEvent, error)
- type StepEvent
- type StepPayload
- type Streamer
Constants ¶
const DefaultBufferSize = 4096
Variables ¶
var AllEventsNames = map[EventType]string{
APIVersionEventType: "ApiVersion",
BlockAddedEventType: "BlockAdded",
DeployProcessedEventType: "DeployProcessed",
DeployAcceptedEventType: "DeployAccepted",
DeployExpiredEventType: "DeployExpired",
StepEventType: "Step",
FaultEventType: "Fault",
FinalitySignatureType: "FinalitySignature",
ShutdownType: "Shutdown",
}
var ErrFullStreamTimeoutError = errors.New("can't fill the stream, because it full")
var ErrHandlerNotRegistered = errors.New("handler is not registered")
Functions ¶
func ParseEvent ¶ added in v1.0.0
func ParseEvent[T interface{}](data []byte) (T, error)
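The body of ParseEvent is not shown in this documentation. The following is a minimal sketch of how a generic parser with that signature could behave, assuming it is a thin wrapper around json.Unmarshal. The name parseEvent is hypothetical, and APIVersionEvent is redeclared locally so the example compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// APIVersionEvent mirrors the struct documented under Types.
type APIVersionEvent struct {
	APIVersion string `json:"ApiVersion"`
}

// parseEvent is a hypothetical stand-in for ParseEvent. It decodes data
// into a zero value of T and returns it together with any decoding error.
func parseEvent[T any](data []byte) (T, error) {
	var out T
	if err := json.Unmarshal(data, &out); err != nil {
		return out, err
	}
	return out, nil
}

func main() {
	ev, err := parseEvent[APIVersionEvent]([]byte(`{"ApiVersion":"1.0.0"}`))
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Println(ev.APIVersion)
}
```

The type parameter selects the target struct at the call site, so one function can serve every event type listed in this package.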
Types ¶
type APIVersionEvent ¶
type APIVersionEvent struct {
APIVersion string `json:"ApiVersion"`
}
type BlockAdded ¶
BlockAdded is the payload carried by BlockAddedEvent.
type BlockAddedEvent ¶
type BlockAddedEvent struct {
BlockAdded BlockAdded `json:"BlockAdded"`
}
BlockAddedEvent definition
type Client ¶
type Client struct {
Streamer *Streamer
Consumer *Consumer
EventStream chan RawEvent
StreamErrorHandler func(<-chan error)
ConsumerErrorHandler func(<-chan error)
WorkersCount int
// contains filtered or unexported fields
}
Client is a facade that provides a convenient interface for processing data from the stream and unites Streamer and Consumer under one implementation. The Client also allows registering global middleware that is applied to all handlers.
func (*Client) RegisterHandler ¶
func (p *Client) RegisterHandler(eventType EventType, handler HandlerFunc)
func (*Client) RegisterMiddleware ¶
func (p *Client) RegisterMiddleware(one Middleware)
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a service that registers event handlers and assigns events from the stream to specific handlers.
func NewConsumer ¶
func NewConsumer() *Consumer
func (*Consumer) RegisterHandler ¶
func (c *Consumer) RegisterHandler(eventType EventType, handler HandlerFunc)
type DeployAcceptedEvent ¶
type DeployExpiredEvent ¶ added in v1.0.0
type DeployExpiredEvent struct {
DeployExpired DeployExpiredPayload `json:"DeployExpired"`
}
type DeployExpiredPayload ¶ added in v1.0.0
type DeployProcessedEvent ¶
type DeployProcessedEvent struct {
DeployProcessed DeployProcessedPayload `json:"DeployProcessed"`
}
type DeployProcessedPayload ¶ added in v1.0.0
type ErrUnknownEventType ¶
type ErrUnknownEventType struct {
RawData []byte
// contains filtered or unexported fields
}
func NewErrUnknownEventType ¶
func NewErrUnknownEventType(data []byte) ErrUnknownEventType
func (ErrUnknownEventType) Error ¶
func (e ErrUnknownEventType) Error() string
type EventData ¶
type EventData = json.RawMessage
type EventParser ¶
type EventParser struct {
// contains filtered or unexported fields
}
func NewEventParser ¶
func NewEventParser() *EventParser
func (*EventParser) ParseRawEvent ¶
func (e *EventParser) ParseRawEvent(data []byte) (RawEvent, error)
func (*EventParser) RegisterEvent ¶
func (e *EventParser) RegisterEvent(eventType EventType)
type EventStreamReader ¶
type EventStreamReader struct {
MaxBufferSize int
// contains filtered or unexported fields
}
EventStreamReader scans an io.Reader looking for EventStream messages.
func (*EventStreamReader) ReadEvent ¶
func (e *EventStreamReader) ReadEvent() ([]byte, error)
ReadEvent scans the EventStream for events.
func (*EventStreamReader) RegisterStream ¶
func (e *EventStreamReader) RegisterStream(eventStream io.Reader)
RegisterStream registers a buffer scanner over the given stream for the EventStreamReader.
type FaultEvent ¶ added in v1.0.0
type FaultEvent struct {
Fault FaultPayload `json:"Fault"`
}
type FaultPayload ¶ added in v1.0.0
type FinalitySignatureEvent ¶ added in v1.0.0
type FinalitySignatureEvent struct {
FinalitySignature FinalitySignaturePayload `json:"FinalitySignature"`
}
type FinalitySignaturePayload ¶ added in v1.0.0
type HandlerFunc ¶
HandlerFunc is the function signature that each specific event handler should implement.
type HttpConnection ¶
type HttpConnection struct {
Headers map[string]string
URL string
// contains filtered or unexported fields
}
HttpConnection is responsible for establishing a connection with the SSE server: it creates the request, handles HTTP errors, and provides the response.
func NewHttpConnection ¶
func NewHttpConnection(httpClient *http.Client, sourceUrl string) *HttpConnection
type Middleware ¶
type Middleware func(handler HandlerFunc) HandlerFunc
type MiddlewareHandler ¶
type MiddlewareHandler interface {
Process(handler HandlerFunc) HandlerFunc
}
type RawEvent ¶
func (*RawEvent) ParseAsAPIVersionEvent ¶
func (d *RawEvent) ParseAsAPIVersionEvent() (APIVersionEvent, error)
func (*RawEvent) ParseAsBlockAddedEvent ¶
func (d *RawEvent) ParseAsBlockAddedEvent() (BlockAddedEvent, error)
func (*RawEvent) ParseAsDeployAcceptedEvent ¶ added in v1.0.0
func (d *RawEvent) ParseAsDeployAcceptedEvent() (DeployAcceptedEvent, error)
func (*RawEvent) ParseAsDeployExpiredEvent ¶ added in v1.0.0
func (d *RawEvent) ParseAsDeployExpiredEvent() (DeployExpiredEvent, error)
func (*RawEvent) ParseAsDeployProcessedEvent ¶
func (d *RawEvent) ParseAsDeployProcessedEvent() (DeployProcessedEvent, error)
func (*RawEvent) ParseAsFaultEvent ¶ added in v1.0.0
func (d *RawEvent) ParseAsFaultEvent() (FaultEvent, error)
func (*RawEvent) ParseAsFinalitySignatureEvent ¶ added in v1.0.0
func (d *RawEvent) ParseAsFinalitySignatureEvent() (FinalitySignatureEvent, error)
func (*RawEvent) ParseAsStepEvent ¶ added in v1.0.0
func (d *RawEvent) ParseAsStepEvent() (StepEvent, error)
type StepEvent ¶ added in v1.0.0
type StepEvent struct {
Step StepPayload `json:"step"`
}
type StepPayload ¶ added in v1.0.0
type StepPayload struct {
EraID uint64 `json:"era_id"`
ExecutionEffect types.Effect `json:"execution_effect"`
// Todo: not sure, didn't found example to test
Operations []types.Operation `json:"operations"`
// Todo: not sure, didn't found example to test
Transform types.TransformKey `json:"transform"`
}
type Streamer ¶
type Streamer struct {
Connection *HttpConnection
StreamReader *EventStreamReader
// BlockedStreamLimit is how long the stream's buffer may stay completely full, which could indicate
// that the workers are working too slowly and have not received any messages.
// If this period elapses without any messages being received, an ErrFullStreamTimeoutError is reported.
BlockedStreamLimit time.Duration
// contains filtered or unexported fields
}
Streamer is a service whose main responsibility is to fill the events channel. Connection handling is isolated in this service: it uses an HttpConnection to obtain the HTTP response as a stream and passes it to the EventStreamReader, which parses bytes from the response body. This design assumes that the connection is managed, and reconnection logic provided, by a layer above this service.
func DefaultStreamer ¶
DefaultStreamer is a shortcut for getting started quickly with Streamer.
func NewStreamer ¶
func NewStreamer(
client *HttpConnection,
reader *EventStreamReader,
blockedStreamLimit time.Duration,
) *Streamer
NewStreamer is the idiomatic way to create a Streamer.