sse

package
v2.0.3-beta1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

README

SSE client

The sse package provides basic functionality for working with Casper events streamed by an SSE server. It connects to the server and collects events into a Go channel; on the other side, consumers read from this channel and delegate processing to specific handlers.

The example of simple usage:

// main registers a handler for DeployProcessed events and starts the client,
// asking the server to resume from event ID 1234.
func main() {
	client := sse.NewClient("http://52.3.38.81:9999/events/main")
	defer client.Stop()
	client.RegisterHandler(
		sse.DeployProcessedEventType,
		func(ctx context.Context, rawEvent sse.RawEvent) error {
			log.Printf("eventID: %d, raw data: %s", rawEvent.EventID, rawEvent.Data)
			// Decode the raw payload into the typed DeployProcessed event.
			deploy, err := rawEvent.ParseAsDeployProcessedEvent()
			if err != nil {
				return err
			}
			log.Printf("Deploy hash: %s", deploy.DeployProcessed.DeployHash)
			return nil
		})
	lastEventID := 1234
	// Start returns an error, so report it instead of dropping it. log.Printf is
	// used rather than log.Fatalf so that the deferred client.Stop() still runs.
	if err := client.Start(context.TODO(), lastEventID); err != nil {
		log.Printf("sse client: %v", err)
	}
}

For more examples, please check example_test.go

The SSE client is flexible and configurable; for advanced usage, check the advanced doc.

Documentation

Index

Constants

View Source
const DefaultBufferSize = 4096

Variables

View Source
// AllEventsNames maps EventType constants to their event name strings.
// NOTE(review): EventIDEventType is declared in the EventType const block but
// has no entry in this map — confirm that the omission is intentional.
var AllEventsNames = map[EventType]string{
	APIVersionEventType:           "ApiVersion",
	BlockAddedEventType:           "BlockAdded",
	DeployProcessedEventType:      "DeployProcessed",
	DeployAcceptedEventType:       "DeployAccepted",
	DeployExpiredEventType:        "DeployExpired",
	TransactionProcessedEventType: "TransactionProcessed",
	TransactionAcceptedEventType:  "TransactionAccepted",
	TransactionExpiredEventType:   "TransactionExpired",
	StepEventType:                 "Step",
	FaultEventType:                "Fault",
	FinalitySignatureType:         "FinalitySignature",
	ShutdownType:                  "Shutdown",
}
View Source
// ErrFullStreamTimeoutError is the error named in the documentation of
// Streamer.BlockedStreamLimit: it is raised when the stream's buffer has
// stayed completely full for longer than that limit.
var ErrFullStreamTimeoutError = errors.New("can't fill the stream, because it full")
View Source
var ErrHandlerNotRegistered = errors.New("handler is not registered")

Functions

func ParseEvent

func ParseEvent[T interface{}](data []byte) (T, error)

Types

type APIVersionEvent

type APIVersionEvent struct {
	APIVersion string `json:"ApiVersion"`
}

type BlockAdded

type BlockAdded struct {
	BlockHash string      `json:"block_hash"`
	Block     types.Block `json:"block"`
}

BlockAdded is the payload of a BlockAddedEvent: the block hash together with the block itself.

type BlockAddedEvent

type BlockAddedEvent struct {
	BlockAdded BlockAdded `json:"BlockAdded"`
}

BlockAddedEvent is the event wrapper that holds a BlockAdded payload under the "BlockAdded" JSON key.

func (*BlockAddedEvent) UnmarshalJSON

func (t *BlockAddedEvent) UnmarshalJSON(data []byte) error

type Client

// Client holds a Streamer, a Consumer, the event channel, and the
// error-handler hooks.
type Client struct {
	// Streamer and Consumer are the two services the Client unites.
	Streamer *Streamer
	Consumer *Consumer

	// EventStream is a channel of RawEvent values.
	// NOTE(review): presumably written by the Streamer and read by the
	// Consumer — confirm.
	EventStream chan RawEvent

	// StreamErrorHandler and ConsumerErrorHandler are callbacks that each take a
	// receive-only channel of errors.
	// NOTE(review): presumably they receive the errors reported by the Streamer
	// and the Consumer respectively — confirm.
	StreamErrorHandler   func(<-chan error)
	ConsumerErrorHandler func(<-chan error)

	// NOTE(review): presumably the number of concurrent consumer workers — confirm.
	WorkersCount int
	// contains filtered or unexported fields
}

Client is a facade that provides a convenient interface for processing data from the stream; it unites a Streamer and a Consumer under the hood. The Client also allows registering global middleware that is applied to all handlers.

func NewClient

func NewClient(url string) *Client

func (*Client) RegisterHandler

func (p *Client) RegisterHandler(eventType EventType, handler HandlerFunc)

func (*Client) RegisterMiddleware

func (p *Client) RegisterMiddleware(one Middleware)

func (*Client) Start

func (p *Client) Start(ctx context.Context, lastEventID int) error

func (*Client) Stop

func (p *Client) Stop()

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a service that registers event handlers and assigns events from the stream to specific handlers.

func NewConsumer

func NewConsumer() *Consumer

func (*Consumer) RegisterHandler

func (c *Consumer) RegisterHandler(eventType EventType, handler HandlerFunc)

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context, events <-chan RawEvent, errCh chan<- error) error

type CtxWorkerID

// CtxWorkerID is the string-based type of CtxWorkerIDKey.
// NOTE(review): the names suggest it is used as a context.Context key for a
// worker's ID — confirm.
type CtxWorkerID string
// CtxWorkerIDKey is the CtxWorkerID constant with the value "workerID".
const CtxWorkerIDKey CtxWorkerID = "workerID"

type DeployAcceptedEvent

type DeployAcceptedEvent struct {
	DeployAccepted types.Deploy `json:"DeployAccepted"`
}

type DeployProcessedEvent

type DeployProcessedEvent struct {
	DeployProcessed DeployProcessedPayload `json:"DeployProcessed"`
}

type DeployProcessedPayload

type DeployProcessedPayload struct {
	DeployHash      key.Hash                `json:"deploy_hash"`
	Account         keypair.PublicKey       `json:"account"`
	Timestamp       time.Time               `json:"timestamp"`
	TTL             string                  `json:"ttl"`
	BlockHash       key.Hash                `json:"block_hash"`
	ExecutionResult types.ExecutionResultV1 `json:"execution_result"`
}

type ErrUnknownEventType

type ErrUnknownEventType struct {
	RawData []byte
	// contains filtered or unexported fields
}

func NewErrUnknownEventType

func NewErrUnknownEventType(data []byte) ErrUnknownEventType

func (ErrUnknownEventType) Error

func (e ErrUnknownEventType) Error() string

type EventData

type EventData = json.RawMessage

type EventParser

type EventParser struct {
	// contains filtered or unexported fields
}

func NewEventParser

func NewEventParser() *EventParser

func (*EventParser) ParseRawEvent

func (e *EventParser) ParseRawEvent(data []byte) (RawEvent, error)

func (*EventParser) RegisterEvent

func (e *EventParser) RegisterEvent(eventType EventType)

type EventStreamReader

type EventStreamReader struct {
	MaxBufferSize int
	// contains filtered or unexported fields
}

EventStreamReader scans an io.Reader looking for EventStream messages.

func (*EventStreamReader) ReadEvent

func (e *EventStreamReader) ReadEvent() ([]byte, error)

ReadEvent scans the EventStream for events.

func (*EventStreamReader) RegisterStream

func (e *EventStreamReader) RegisterStream(eventStream io.Reader)

RegisterStream registers a buffer scanner for the EventStreamReader's stream.

type EventType

// EventType is the identifier of an event kind. It is declared as an alias of
// int (note the "="), not as a distinct named type, so any int value is
// accepted wherever an EventType is expected.
type EventType = int
// Event type identifiers. Numbering starts at 1 (iota + 1), so the zero value
// of EventType matches none of these constants.
const (
	APIVersionEventType EventType = iota + 1
	BlockAddedEventType
	DeployProcessedEventType
	DeployAcceptedEventType
	DeployExpiredEventType
	TransactionProcessedEventType
	TransactionAcceptedEventType
	TransactionExpiredEventType
	EventIDEventType
	FinalitySignatureType
	StepEventType
	FaultEventType
	ShutdownType
)

type FaultEvent

type FaultEvent struct {
	Fault FaultPayload `json:"Fault"`
}

type FaultPayload

type FaultPayload struct {
	EraID     uint64            `json:"era_id"`
	PublicKey keypair.PublicKey `json:"public_key"`
	Timestamp types.Timestamp   `json:"timestamp"`
}

type FinalitySignature

// FinalitySignature has the same fields as FinalitySignatureV2 plus
// OriginFinalitySignatureV1.
type FinalitySignature struct {
	BlockHash     key.Hash          `json:"block_hash"`
	// BlockHeight and ChainNameHash are pointers and therefore may be nil; they
	// are the two fields that FinalitySignatureV1 does not have.
	BlockHeight   *uint64           `json:"block_height"`
	ChainNameHash *key.Hash         `json:"chain_name_hash"`
	EraID         uint64            `json:"era_id"`
	Signature     types.HexBytes    `json:"signature"`
	PublicKey     keypair.PublicKey `json:"public_key"`

	// OriginFinalitySignatureV1 carries no JSON tag.
	// NOTE(review): presumably set by FinalitySignatureEvent.UnmarshalJSON when
	// the incoming payload is in the V1 format — confirm.
	OriginFinalitySignatureV1 *FinalitySignatureV1
}

type FinalitySignatureEvent

type FinalitySignatureEvent struct {
	FinalitySignature FinalitySignature `json:"FinalitySignature"`
}

func (*FinalitySignatureEvent) UnmarshalJSON

func (t *FinalitySignatureEvent) UnmarshalJSON(data []byte) error

type FinalitySignatureV1

type FinalitySignatureV1 struct {
	BlockHash key.Hash          `json:"block_hash"`
	EraID     uint64            `json:"era_id"`
	Signature types.HexBytes    `json:"signature"`
	PublicKey keypair.PublicKey `json:"public_key"`
}

type FinalitySignatureV2

type FinalitySignatureV2 struct {
	BlockHash     key.Hash          `json:"block_hash"`
	BlockHeight   *uint64           `json:"block_height"`
	ChainNameHash *key.Hash         `json:"chain_name_hash"`
	EraID         uint64            `json:"era_id"`
	Signature     types.HexBytes    `json:"signature"`
	PublicKey     keypair.PublicKey `json:"public_key"`
}

type Handler

type Handler interface {
	Handle(context.Context, RawEvent) error
}

type HandlerFunc

type HandlerFunc func(context.Context, RawEvent) error

HandlerFunc is the function signature that every specific event handler must implement.

type HttpConnection

type HttpConnection struct {
	Headers map[string]string
	URL     string
	// contains filtered or unexported fields
}

HttpConnection is responsible for establishing the connection with the SSE server: it creates the request, handles HTTP errors, and provides the response.

func NewHttpConnection

func NewHttpConnection(httpClient *http.Client, sourceUrl string) *HttpConnection

func (*HttpConnection) Request

func (c *HttpConnection) Request(ctx context.Context, lastEventID int) (*http.Response, error)

type Middleware

type Middleware func(handler HandlerFunc) HandlerFunc

type MiddlewareHandler

type MiddlewareHandler interface {
	Process(handler HandlerFunc) HandlerFunc
}

type RawEvent

// RawEvent is an event that has not been decoded into a typed event yet: its
// type identifier, its raw JSON payload (EventData is json.RawMessage), and
// its numeric ID. The ParseAs* methods decode it into a typed event.
type RawEvent struct {
	EventType EventType
	Data      EventData
	// NOTE(review): EventID is a uint64, while Client.Start, Streamer.FillStream
	// and HttpConnection.Request take lastEventID as an int — confirm that
	// conversions between the two are safe.
	EventID   uint64
}

func (*RawEvent) ParseAsAPIVersionEvent

func (d *RawEvent) ParseAsAPIVersionEvent() (APIVersionEvent, error)

func (*RawEvent) ParseAsBlockAddedEvent

func (d *RawEvent) ParseAsBlockAddedEvent() (BlockAddedEvent, error)

func (*RawEvent) ParseAsDeployAcceptedEvent

func (d *RawEvent) ParseAsDeployAcceptedEvent() (DeployAcceptedEvent, error)

func (*RawEvent) ParseAsDeployProcessedEvent

func (d *RawEvent) ParseAsDeployProcessedEvent() (DeployProcessedEvent, error)

func (*RawEvent) ParseAsFaultEvent

func (d *RawEvent) ParseAsFaultEvent() (FaultEvent, error)

func (*RawEvent) ParseAsFinalitySignatureEvent

func (d *RawEvent) ParseAsFinalitySignatureEvent() (FinalitySignatureEvent, error)

func (*RawEvent) ParseAsStepEvent

func (d *RawEvent) ParseAsStepEvent() (StepEvent, error)

func (*RawEvent) ParseAsTransactionAcceptedEvent

func (d *RawEvent) ParseAsTransactionAcceptedEvent() (TransactionAcceptedEvent, error)

func (*RawEvent) ParseAsTransactionExpiredEvent

func (d *RawEvent) ParseAsTransactionExpiredEvent() (TransactionExpiredEvent, error)

func (*RawEvent) ParseAsTransactionProcessedEvent

func (d *RawEvent) ParseAsTransactionProcessedEvent() (TransactionProcessedEvent, error)

type StepEvent

// StepEvent wraps a StepPayload.
// NOTE(review): the JSON key here is lowercase "step", whereas the other event
// wrappers in this package use capitalized keys ("Fault", "BlockAdded", ...)
// and AllEventsNames lists this event as "Step". encoding/json matches keys
// case-insensitively when decoding, but encoding emits "step" — confirm that
// this is intended.
type StepEvent struct {
	Step StepPayload `json:"step"`
}

type StepPayload

type StepPayload struct {
	EraID            uint64        `json:"era_id"`
	ExecutionEffect  types.Effect  `json:"execution_effect"`
	ExecutionEffects types.Effects `json:"execution_effects"`
}

type Streamer

// Streamer holds the HTTP connection and the stream reader, plus the limit on
// how long the stream may stay blocked.
type Streamer struct {
	Connection *HttpConnection

	StreamReader *EventStreamReader
	// BlockedStreamLimit is how long the stream's buffer may remain completely
	// full, a state which could indicate that the workers are working too slowly.
	// If this period elapses without any messages being received, an
	// ErrFullStreamTimeoutError is raised.
	BlockedStreamLimit time.Duration
	// contains filtered or unexported fields
}

Streamer is a service whose main responsibility is to fill the events channel. Connection management is isolated in this service. The service uses an HttpConnection to obtain the HTTP response as a stream resource and passes it to the EventStreamReader, which is supposed to parse bytes from the response body. This design assumes that managing the connection and providing reconnection logic happen above this service.

func DefaultStreamer

func DefaultStreamer(url string) *Streamer

DefaultStreamer is a shortcut for getting started quickly with a Streamer.

func NewStreamer

func NewStreamer(
	client *HttpConnection,
	reader *EventStreamReader,
	blockedStreamLimit time.Duration,
) *Streamer

NewStreamer is the idiomatic way to create a Streamer.

func (*Streamer) FillStream

func (i *Streamer) FillStream(ctx context.Context, lastEventID int, stream chan<- RawEvent, errorsCh chan<- error) error

func (*Streamer) RegisterEvent

func (i *Streamer) RegisterEvent(eventType EventType)

type TransactionAcceptedEvent

type TransactionAcceptedEvent struct {
	TransactionAcceptedPayload TransactionAcceptedPayload `json:"TransactionAccepted"`
}

func (*TransactionAcceptedEvent) UnmarshalJSON

func (t *TransactionAcceptedEvent) UnmarshalJSON(data []byte) error

type TransactionAcceptedPayload

type TransactionAcceptedPayload struct {
	Transaction types.Transaction `json:"transaction"`
}

type TransactionExpiredEvent

type TransactionExpiredEvent struct {
	TransactionExpiredPayload TransactionExpiredPayload `json:"TransactionExpired"`
}

func (*TransactionExpiredEvent) UnmarshalJSON

func (t *TransactionExpiredEvent) UnmarshalJSON(data []byte) error

type TransactionExpiredPayload

type TransactionExpiredPayload struct {
	TransactionHash types.TransactionHash `json:"transaction_hash"`
}

type TransactionProcessedEvent

type TransactionProcessedEvent struct {
	TransactionProcessedPayload TransactionProcessedPayload `json:"TransactionProcessed"`
}

func (*TransactionProcessedEvent) UnmarshalJSON

func (t *TransactionProcessedEvent) UnmarshalJSON(data []byte) error

type TransactionProcessedPayload

type TransactionProcessedPayload struct {
	BlockHash       key.Hash              `json:"block_hash"`
	TransactionHash types.TransactionHash `json:"transaction_hash"`
	InitiatorAddr   types.InitiatorAddr   `json:"initiator_addr"`
	Timestamp       time.Time             `json:"timestamp"`
	TTL             string                `json:"ttl"`
	ExecutionResult types.ExecutionResult `json:"execution_result"`
	Messages        []types.Message       `json:"messages"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL