Documentation
¶
Index ¶
- Constants
- Variables
- func ParseEvent[T interface{}](data []byte) (T, error)
- type APIVersionEvent
- type BlockAdded
- type BlockAddedEvent
- type Client
- type Consumer
- type CtxWorkerID
- type DeployAcceptedEvent
- type DeployProcessedEvent
- type DeployProcessedPayload
- type ErrUnknownEventType
- type EventData
- type EventParser
- type EventStreamReader
- type EventType
- type FaultEvent
- type FaultPayload
- type FinalitySignature
- type FinalitySignatureEvent
- type FinalitySignatureV1
- type FinalitySignatureV2
- type Handler
- type HandlerFunc
- type HttpConnection
- type Middleware
- type MiddlewareHandler
- type RawEvent
- func (d *RawEvent) ParseAsAPIVersionEvent() (APIVersionEvent, error)
- func (d *RawEvent) ParseAsBlockAddedEvent() (BlockAddedEvent, error)
- func (d *RawEvent) ParseAsDeployAcceptedEvent() (DeployAcceptedEvent, error)
- func (d *RawEvent) ParseAsDeployProcessedEvent() (DeployProcessedEvent, error)
- func (d *RawEvent) ParseAsFaultEvent() (FaultEvent, error)
- func (d *RawEvent) ParseAsFinalitySignatureEvent() (FinalitySignatureEvent, error)
- func (d *RawEvent) ParseAsStepEvent() (StepEvent, error)
- func (d *RawEvent) ParseAsTransactionAcceptedEvent() (TransactionAcceptedEvent, error)
- func (d *RawEvent) ParseAsTransactionExpiredEvent() (TransactionExpiredEvent, error)
- func (d *RawEvent) ParseAsTransactionProcessedEvent() (TransactionProcessedEvent, error)
- type StepEvent
- type StepPayload
- type Streamer
- type TransactionAcceptedEvent
- type TransactionAcceptedPayload
- type TransactionExpiredEvent
- type TransactionExpiredPayload
- type TransactionProcessedEvent
- type TransactionProcessedPayload
Constants ¶
const DefaultBufferSize = 4096
Variables ¶
var AllEventsNames = map[EventType]string{ APIVersionEventType: "ApiVersion", BlockAddedEventType: "BlockAdded", DeployProcessedEventType: "DeployProcessed", DeployAcceptedEventType: "DeployAccepted", DeployExpiredEventType: "DeployExpired", TransactionProcessedEventType: "TransactionProcessed", TransactionAcceptedEventType: "TransactionAccepted", TransactionExpiredEventType: "TransactionExpired", StepEventType: "Step", FaultEventType: "Fault", FinalitySignatureType: "FinalitySignature", ShutdownType: "Shutdown", }
var ErrFullStreamTimeoutError = errors.New("can't fill the stream, because it full")
var ErrHandlerNotRegistered = errors.New("handler is not registered")
Functions ¶
func ParseEvent ¶
Types ¶
type APIVersionEvent ¶
type APIVersionEvent struct {
APIVersion string `json:"ApiVersion"`
}
type BlockAdded ¶
BlockAddedEvent definition
type BlockAddedEvent ¶
type BlockAddedEvent struct {
BlockAdded BlockAdded `json:"BlockAdded"`
}
BlockAddedEvent definition
func (*BlockAddedEvent) UnmarshalJSON ¶
func (t *BlockAddedEvent) UnmarshalJSON(data []byte) error
type Client ¶
type Client struct { Streamer *Streamer Consumer *Consumer EventStream chan RawEvent StreamErrorHandler func(<-chan error) ConsumerErrorHandler func(<-chan error) WorkersCount int // contains filtered or unexported fields }
Client is a facade that provide convenient interface to process data from the stream, and unites Streamer and Consumer under implementation. Also, the Client allows to register global middleware that will be applied for all handlers.
func (*Client) RegisterHandler ¶
func (p *Client) RegisterHandler(eventType EventType, handler HandlerFunc)
func (*Client) RegisterMiddleware ¶
func (p *Client) RegisterMiddleware(one Middleware)
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a service that registers event handlers and assigns events from the stream to specific handlers.
func NewConsumer ¶
func NewConsumer() *Consumer
func (*Consumer) RegisterHandler ¶
func (c *Consumer) RegisterHandler(eventType EventType, handler HandlerFunc)
type DeployAcceptedEvent ¶
type DeployProcessedEvent ¶
type DeployProcessedEvent struct {
DeployProcessed DeployProcessedPayload `json:"DeployProcessed"`
}
type DeployProcessedPayload ¶
type ErrUnknownEventType ¶
type ErrUnknownEventType struct { RawData []byte // contains filtered or unexported fields }
func NewErrUnknownEventType ¶
func NewErrUnknownEventType(data []byte) ErrUnknownEventType
func (ErrUnknownEventType) Error ¶
func (e ErrUnknownEventType) Error() string
type EventData ¶
type EventData = json.RawMessage
type EventParser ¶
type EventParser struct {
// contains filtered or unexported fields
}
func NewEventParser ¶
func NewEventParser() *EventParser
func (*EventParser) ParseRawEvent ¶
func (e *EventParser) ParseRawEvent(data []byte) (RawEvent, error)
func (*EventParser) RegisterEvent ¶
func (e *EventParser) RegisterEvent(eventType EventType)
type EventStreamReader ¶
type EventStreamReader struct { MaxBufferSize int // contains filtered or unexported fields }
EventStreamReader scans an io.Reader looking for EventStream messages.
func (*EventStreamReader) ReadEvent ¶
func (e *EventStreamReader) ReadEvent() ([]byte, error)
ReadEvent scans the EventStream for events.
func (*EventStreamReader) RegisterStream ¶
func (e *EventStreamReader) RegisterStream(eventStream io.Reader)
RegisterStream register buffer scanner for stream of EventStreamReader.
type EventType ¶
type EventType = int
const ( APIVersionEventType EventType = iota + 1 BlockAddedEventType DeployProcessedEventType DeployAcceptedEventType DeployExpiredEventType TransactionProcessedEventType TransactionAcceptedEventType TransactionExpiredEventType EventIDEventType FinalitySignatureType StepEventType FaultEventType ShutdownType )
type FaultEvent ¶
type FaultEvent struct {
Fault FaultPayload `json:"Fault"`
}
type FaultPayload ¶
type FinalitySignature ¶
type FinalitySignature struct { BlockHash key.Hash `json:"block_hash"` BlockHeight *uint64 `json:"block_height"` ChainNameHash *key.Hash `json:"chain_name_hash"` EraID uint64 `json:"era_id"` Signature types.HexBytes `json:"signature"` PublicKey keypair.PublicKey `json:"public_key"` OriginFinalitySignatureV1 *FinalitySignatureV1 }
type FinalitySignatureEvent ¶
type FinalitySignatureEvent struct {
FinalitySignature FinalitySignature `json:"FinalitySignature"`
}
func (*FinalitySignatureEvent) UnmarshalJSON ¶
func (t *FinalitySignatureEvent) UnmarshalJSON(data []byte) error
type FinalitySignatureV1 ¶
type FinalitySignatureV2 ¶
type HandlerFunc ¶
HandlerFunc is the interface of function that should be implemented in each specific event handler.
type HttpConnection ¶
type HttpConnection struct { Headers map[string]string URL string // contains filtered or unexported fields }
HttpConnection is responsible to establish connection with SSE server. Create Request, handle http error and provide a response.
func NewHttpConnection ¶
func NewHttpConnection(httpClient *http.Client, sourceUrl string) *HttpConnection
type Middleware ¶
type Middleware func(handler HandlerFunc) HandlerFunc
type MiddlewareHandler ¶
type MiddlewareHandler interface {
Process(handler HandlerFunc) HandlerFunc
}
type RawEvent ¶
func (*RawEvent) ParseAsAPIVersionEvent ¶
func (d *RawEvent) ParseAsAPIVersionEvent() (APIVersionEvent, error)
func (*RawEvent) ParseAsBlockAddedEvent ¶
func (d *RawEvent) ParseAsBlockAddedEvent() (BlockAddedEvent, error)
func (*RawEvent) ParseAsDeployAcceptedEvent ¶
func (d *RawEvent) ParseAsDeployAcceptedEvent() (DeployAcceptedEvent, error)
func (*RawEvent) ParseAsDeployProcessedEvent ¶
func (d *RawEvent) ParseAsDeployProcessedEvent() (DeployProcessedEvent, error)
func (*RawEvent) ParseAsFaultEvent ¶
func (d *RawEvent) ParseAsFaultEvent() (FaultEvent, error)
func (*RawEvent) ParseAsFinalitySignatureEvent ¶
func (d *RawEvent) ParseAsFinalitySignatureEvent() (FinalitySignatureEvent, error)
func (*RawEvent) ParseAsStepEvent ¶
func (*RawEvent) ParseAsTransactionAcceptedEvent ¶
func (d *RawEvent) ParseAsTransactionAcceptedEvent() (TransactionAcceptedEvent, error)
func (*RawEvent) ParseAsTransactionExpiredEvent ¶
func (d *RawEvent) ParseAsTransactionExpiredEvent() (TransactionExpiredEvent, error)
func (*RawEvent) ParseAsTransactionProcessedEvent ¶
func (d *RawEvent) ParseAsTransactionProcessedEvent() (TransactionProcessedEvent, error)
type StepEvent ¶
type StepEvent struct {
Step StepPayload `json:"step"`
}
type StepPayload ¶
type Streamer ¶
type Streamer struct { Connection *HttpConnection StreamReader *EventStreamReader // This duration allows the stream's buffer to stay in fill up completely state, which could indicate // that the workers are working too slowly and have not received any messages. // If this period elapses without any messages being received, an ErrFullStreamTimeoutError will be thrown. BlockedStreamLimit time.Duration // contains filtered or unexported fields }
Streamer is a service that main responsibility is to fill the events' channel. The Connection management is isolated in this service. Service uses a HttpConnection to get HTTP response as a stream resource and provides it to the EventStreamReader, that supposes to parse bytes from the response's body. This design assumes to manage the connection and provide reconnection logic above of this service.
func DefaultStreamer ¶
DefaultStreamer is a shortcut to fast start with Streamer
func NewStreamer ¶
func NewStreamer( client *HttpConnection, reader *EventStreamReader, blockedStreamLimit time.Duration, ) *Streamer
NewStreamer is the idiomatic way to create Streamer
func (*Streamer) FillStream ¶
func (*Streamer) RegisterEvent ¶
type TransactionAcceptedEvent ¶
type TransactionAcceptedEvent struct {
TransactionAcceptedPayload TransactionAcceptedPayload `json:"TransactionAccepted"`
}
func (*TransactionAcceptedEvent) UnmarshalJSON ¶
func (t *TransactionAcceptedEvent) UnmarshalJSON(data []byte) error
type TransactionAcceptedPayload ¶
type TransactionAcceptedPayload struct {
Transaction types.Transaction `json:"transaction"`
}
type TransactionExpiredEvent ¶
type TransactionExpiredEvent struct {
TransactionExpiredPayload TransactionExpiredPayload `json:"TransactionExpired"`
}
func (*TransactionExpiredEvent) UnmarshalJSON ¶
func (t *TransactionExpiredEvent) UnmarshalJSON(data []byte) error
type TransactionExpiredPayload ¶
type TransactionExpiredPayload struct {
TransactionHash types.TransactionHash `json:"transaction_hash"`
}
type TransactionProcessedEvent ¶
type TransactionProcessedEvent struct {
TransactionProcessedPayload TransactionProcessedPayload `json:"TransactionProcessed"`
}
func (*TransactionProcessedEvent) UnmarshalJSON ¶
func (t *TransactionProcessedEvent) UnmarshalJSON(data []byte) error
type TransactionProcessedPayload ¶
type TransactionProcessedPayload struct { BlockHash key.Hash `json:"block_hash"` TransactionHash types.TransactionHash `json:"transaction_hash"` InitiatorAddr types.InitiatorAddr `json:"initiator_addr"` Timestamp time.Time `json:"timestamp"` TTL string `json:"ttl"` ExecutionResult types.ExecutionResult `json:"execution_result"` Messages []types.Message `json:"messages"` }