streams

package module
v0.0.0-...-fadec25 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 5, 2024 License: MIT Imports: 20 Imported by: 0

README

streams

import "github.com/smartcontractkit/data-streams-sdk/go"

Overview

Package streams implements a client library for the Data Streams API, providing a domain-oriented abstraction for both report Streams and point-in-time retrieval, with fault-tolerant capabilities.

Index

Examples
Package files

auth.go client.go config.go doc.go endpoints.go stream.go

Variables

var (
    ErrStreamClosed = fmt.Errorf("client: use of closed Stream")
)

func LogPrintf

func LogPrintf(format string, a ...any)

LogPrintf implements a LogFunction using fmt.Printf

type Client

type Client interface {
    // GetFeeds lists all feeds available to this client.
    GetFeeds(ctx context.Context) (r []*feed.Feed, err error)

    // GetLatestReport fetches the latest report available for the given feedID.
    GetLatestReport(ctx context.Context, id feed.ID) (r *ReportResponse, err error)

    // GetReports fetches the reports for the given feedIDs and timestamp.
    GetReports(ctx context.Context, ids []feed.ID, timestamp uint64) ([]*ReportResponse, error)

    // GetReportPage paginates the reports for the given feedID and start timestamp.
    GetReportPage(ctx context.Context, id feed.ID, startTS uint64) (*ReportPage, error)

    // Stream creates realtime report stream for the given feedIDs.
    Stream(ctx context.Context, feedIDs []feed.ID) (Stream, error)
}

Client is the data streams client interface.

Example Client:
cfg := streams.Config{
    ApiKey:    "mykey",
    ApiSecret: "mysecret",
    RestURL:   "https://streams.url",
    WsURL:     "https://streams.url",
    Logger:    streams.LogPrintf,
}

streamsClient, err := streams.New(cfg)
if err != nil {
    streams.LogPrintf("error creating client: %s", err)
    os.Exit(1)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
availableFeeds, err := streamsClient.GetFeeds(ctx)
cancel()
if err != nil {
    streams.LogPrintf("error fetching feeds: %s", err)
    os.Exit(1)
}

f := availableFeeds[0]
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
report, err := streamsClient.GetLatestReport(ctx, f.FeedID)
cancel()
if err != nil {
    streams.LogPrintf("error fetching latest report: %s", err)
    os.Exit(1)
}

streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d",
    report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp)

ctx, cancel = context.WithTimeout(context.Background(), time.Second)
reports, err := streamsClient.GetReports(
    ctx, []feed.ID{availableFeeds[0].FeedID, availableFeeds[1].FeedID}, uint64(time.Now().Unix())-3)
cancel()
if err != nil {
    streams.LogPrintf("error fetching reports: %s", err)
    os.Exit(1)
}

for _, report = range reports {
    decoded, err := streamsReport.Decode[v3.Data](report.FullReport)
    if err != nil {
        streams.LogPrintf("error decoding decoded: %s", err)
        os.Exit(1)
    }

    streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d",
        report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp)
    streams.LogPrintf(
        "FeedID: %s, FeedVersion: %d, Bid: %s, Ask: %s, BenchMark: %s, LinkFee: %s, NativeFee: %s, ValidFromTS: %d, ExpiresAt: %d",
        decoded.Data.FeedID.String(),
        decoded.Data.FeedID.Version(),
        decoded.Data.Bid.String(),
        decoded.Data.Ask.String(),
        decoded.Data.BenchmarkPrice.String(),
        decoded.Data.LinkFee.String(),
        decoded.Data.NativeFee.String(),
        decoded.Data.ValidFromTimestamp,
        decoded.Data.ExpiresAt,
    )
}
func New
func New(cfg Config) (c Client, err error)

New creates a new Client with the given config. New does not initialize any connection to the Data Streams service.

type Config

type Config struct {
    ApiKey    string // Client Api key
    ApiSecret string // Client Api secret
    RestURL   string // Rest Api url

    WsURL string // Websocket Api url

    WsHA               bool                          // Use concurrent connections to multiple Streams servers
    WsMaxReconnect     int                           // Maximum number of reconnection attempts for Stream underlying connections
    LogDebug           bool                          // Log debug information
    InsecureSkipVerify bool                          // Skip server certificate chain and host name verification
    Logger             func(format string, a ...any) // Logger function

    // InspectHttpResponse intercepts http responses for rest requests.
    // The response object must not be modified.
    InspectHttpResponse func(*http.Response)
    // contains filtered or unexported fields
}

Config specifies the client configuration and dependencies. If specified the Logger function will be used to log informational client activity.

type CtxKey

type CtxKey string

CtxKey type for context values

const (

    // CustomHeadersCtxKey is used as key in the context.Context object
    // to pass in a custom http headers in a http.Header to be used by the client.
    // Custom header values will overwrite client headers if they have the same key.
    CustomHeadersCtxKey CtxKey = "CustomHeaders"
)

type ReportPage

type ReportPage struct {
    Reports    []*ReportResponse
    NextPageTS uint64
}

ReportPage implements the server pagination response. NextPageTS is the timestamp to be used when requesting the next page.

type ReportResponse

type ReportResponse struct {
    FeedID                feed.ID `json:"feedID"`
    FullReport            []byte  `json:"fullReport"`
    ValidFromTimestamp    uint64  `json:"validFromTimestamp"`
    ObservationsTimestamp uint64  `json:"observationsTimestamp"`
}

ReportResponse implements the report envelope that contains the full report payload, its FeedID and timestamps. For decoding the Report Payload use report.Decode().

func (*ReportResponse) MarshalJSON
func (r *ReportResponse) MarshalJSON() ([]byte, error)
func (*ReportResponse) String
func (r *ReportResponse) String() (s string)
func (*ReportResponse) UnmarshalJSON
func (r *ReportResponse) UnmarshalJSON(b []byte) (err error)

type Stats

type Stats struct {
    Accepted              uint64 // Total number of accepted reports
    Deduplicated          uint64 // Total number of deduplicated reports when in HA
    TotalReceived         uint64 // Total number of received reports
    PartialReconnects     uint64 // Total number of partial reconnects when in HA
    FullReconnects        uint64 // Total number of full reconnects
    ConfiguredConnections uint64 // Number of configured connections if in HA
    ActiveConnections     uint64 // Current number of active connections
}

Stats for the Stream

func (Stats) String
func (s Stats) String() (st string)

type Stream

type Stream interface {
    // Read returns the next available report on the Stream.
    // Read blocks until a report is received, the context is canceled or
    // all underlying connections are in an error state.
    Read(context.Context) (*ReportResponse, error)

    // Stats returns basic stats about the Stream.
    Stats() Stats

    // Close closes the Stream. It is the caller's responsibility to call Close
    // when the stream is no longer needed.
    Close() error
}

Stream represents a realtime report stream. Safe for concurrent usage.

The Stream will maintain at least 2 concurrent connections to different instances to ensure high availability, fault tolerance and minimize the risk of report gaps.

Example Stream:
cfg := streams.Config{
    ApiKey:    "mykey",
    ApiSecret: "mysecret",
    RestURL:   "https://streams.url",
    WsURL:     "https://streams.url",
    Logger:    streams.LogPrintf,
}

client, err := streams.New(cfg)
if err != nil {
    streams.LogPrintf("error creating client: %s", err)
    os.Exit(1)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
availableFeeds, err := client.GetFeeds(ctx)
cancel()
if err != nil {
    streams.LogPrintf("error fetching feeds: %s", err)
    os.Exit(1)
}

ctx, cancel = context.WithTimeout(context.Background(), time.Second)
stream, err := client.Stream(
    ctx, []feed.ID{availableFeeds[0].FeedID, availableFeeds[1].FeedID})
cancel()
if err != nil {
    streams.LogPrintf("error subscribing: %s", err)
    os.Exit(1)
}

defer stream.Close()
for stream.Stats().Accepted < 100 {
    report, err := stream.Read(context.Background())
    if err != nil {
        streams.LogPrintf("error reading from stream: %s", err)
        os.Exit(1)
    }

    streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d",
        report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp)

    decoded, err := streamsReport.Decode[v3.Data](report.FullReport)
    if err != nil {
        streams.LogPrintf("error decoding decoded: %s", err)
        os.Exit(1)
    }

    streams.LogPrintf(
        "FeedID: %s, FeedVersion: %d, Bid: %s, Ask: %s, BenchMark: %s, LinkFee: %s, NativeFee: %s, ValidFromTS: %d, ExpiresAt: %d",
        decoded.Data.FeedID.String(),
        decoded.Data.FeedID.Version(),
        decoded.Data.Bid.String(),
        decoded.Data.Ask.String(),
        decoded.Data.BenchmarkPrice.String(),
        decoded.Data.LinkFee.String(),
        decoded.Data.NativeFee.String(),
        decoded.Data.ValidFromTimestamp,
        decoded.Data.ExpiresAt,
    )

    streams.LogPrintf("stream stats: %s", stream.Stats().String())
}

Generated by godoc2md

Documentation

Overview

Package streams implements a client library for the Data Streams API, providing a domain-oriented abstraction for both report Streams and point-in-time retrieval, with fault-tolerant capabilities.

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	ErrStreamClosed = fmt.Errorf("client: use of closed Stream")
)

Functions

func LogPrintf

func LogPrintf(format string, a ...any)

LogPrintf implements a LogFunction using fmt.Printf

Types

type Client

type Client interface {
	// GetFeeds lists all feeds available to this client.
	GetFeeds(ctx context.Context) (r []*feed.Feed, err error)

	// GetLatestReport fetches the latest report available for the given feedID.
	GetLatestReport(ctx context.Context, id feed.ID) (r *ReportResponse, err error)

	// GetReports fetches the reports for the given feedIDs and timestamp.
	GetReports(ctx context.Context, ids []feed.ID, timestamp uint64) ([]*ReportResponse, error)

	// GetReportPage paginates the reports for the given feedID and start timestamp.
	GetReportPage(ctx context.Context, id feed.ID, startTS uint64) (*ReportPage, error)

	// Stream creates a realtime report stream for the given feedIDs.
	Stream(ctx context.Context, feedIDs []feed.ID) (Stream, error)

	// StreamWithStatusCallback creates a realtime report stream for the given
	// feedIDs and additionally accepts connStatusCallback, which receives a
	// connection's state (isConnected) together with its host and origin.
	// NOTE(review): presumably invoked whenever an underlying connection
	// connects or disconnects — confirm against the implementation.
	StreamWithStatusCallback(ctx context.Context, feedIDs []feed.ID,
		connStatusCallback func(isConnected bool, host string, origin string)) (Stream, error)
}

Client is the data streams client interface.

Example
package main

import (
	"context"
	"os"
	"time"

	streams "github.com/smartcontractkit/data-streams-sdk/go"
	"github.com/smartcontractkit/data-streams-sdk/go/feed"

	streamsReport "github.com/smartcontractkit/data-streams-sdk/go/report"

	v3 "github.com/smartcontractkit/data-streams-sdk/go/report/v3"
)

// main demonstrates the point-in-time (REST) side of the client: it lists the
// available feeds, fetches the latest report for the first feed, then fetches
// and decodes the reports of the first two feeds at a timestamp three seconds
// in the past. Any failure is logged and terminates the process with status 1.
func main() {
	cfg := streams.Config{
		ApiKey:    "mykey",
		ApiSecret: "mysecret",
		RestURL:   "https://streams.url",
		WsURL:     "https://streams.url",
		Logger:    streams.LogPrintf,
	}

	streamsClient, err := streams.New(cfg)
	if err != nil {
		streams.LogPrintf("error creating client: %s", err)
		os.Exit(1)
	}

	// Each request gets its own one-second deadline; cancel is called right
	// after the call returns to release the timer.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	availableFeeds, err := streamsClient.GetFeeds(ctx)
	cancel()
	if err != nil {
		streams.LogPrintf("error fetching feeds: %s", err)
		os.Exit(1)
	}

	// The calls below index the first two feeds; fail with a clear message
	// instead of panicking on an out-of-range index.
	if len(availableFeeds) < 2 {
		streams.LogPrintf("expected at least 2 available feeds, got %d", len(availableFeeds))
		os.Exit(1)
	}

	f := availableFeeds[0]
	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	report, err := streamsClient.GetLatestReport(ctx, f.FeedID)
	cancel()
	if err != nil {
		streams.LogPrintf("error fetching latest report: %s", err)
		os.Exit(1)
	}

	streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d",
		report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp)

	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	reports, err := streamsClient.GetReports(
		ctx, []feed.ID{availableFeeds[0].FeedID, availableFeeds[1].FeedID}, uint64(time.Now().Unix())-3)
	cancel()
	if err != nil {
		streams.LogPrintf("error fetching reports: %s", err)
		os.Exit(1)
	}

	for _, report = range reports {
		// The report envelope carries an opaque payload; decode it with the
		// v3 schema to access the individual data fields.
		decoded, err := streamsReport.Decode[v3.Data](report.FullReport)
		if err != nil {
			streams.LogPrintf("error decoding report: %s", err)
			os.Exit(1)
		}

		streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d",
			report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp)
		streams.LogPrintf(
			"FeedID: %s, FeedVersion: %d, Bid: %s, Ask: %s, BenchMark: %s, LinkFee: %s, NativeFee: %s, ValidFromTS: %d, ExpiresAt: %d",
			decoded.Data.FeedID.String(),
			decoded.Data.FeedID.Version(),
			decoded.Data.Bid.String(),
			decoded.Data.Ask.String(),
			decoded.Data.BenchmarkPrice.String(),
			decoded.Data.LinkFee.String(),
			decoded.Data.NativeFee.String(),
			decoded.Data.ValidFromTimestamp,
			decoded.Data.ExpiresAt,
		)
	}
}
Output:

func New

func New(cfg Config) (c Client, err error)

New creates a new Client with the given config. New does not initialize any connection to the Data Streams service.

type Config

type Config struct {
	ApiKey    string // Client Api key
	ApiSecret string // Client Api secret
	RestURL   string // Rest Api url

	WsURL string // Websocket Api url

	WsHA               bool                          // Use concurrent connections to multiple Streams servers
	WsMaxReconnect     int                           // Maximum number of reconnection attempts for Stream underlying connections
	LogDebug           bool                          // Log debug information
	InsecureSkipVerify bool                          // Skip server certificate chain and host name verification
	Logger             func(format string, a ...any) // Logger function

	// InspectHttpResponse intercepts http responses for rest requests.
	// The response object must not be modified.
	InspectHttpResponse func(*http.Response)
	// contains filtered or unexported fields
}

Config specifies the client configuration and dependencies. If specified the Logger function will be used to log informational client activity.

type CtxKey

type CtxKey string

CtxKey type for context values

const (

	// CustomHeadersCtxKey is used as key in the context.Context object
	// to pass in a custom http headers in a http.Header to be used by the client.
	// Custom header values will overwrite client headers if they have the same key.
	CustomHeadersCtxKey CtxKey = "CustomHeaders"
)

type ReportPage

type ReportPage struct {
	Reports    []*ReportResponse
	NextPageTS uint64
}

ReportPage implements the server pagination response. NextPageTS is the timestamp to be used when requesting the next page.

type ReportResponse

type ReportResponse struct {
	FeedID                feed.ID `json:"feedID"`
	FullReport            []byte  `json:"fullReport"`
	ValidFromTimestamp    uint64  `json:"validFromTimestamp"`
	ObservationsTimestamp uint64  `json:"observationsTimestamp"`
}

ReportResponse implements the report envelope that contains the full report payload, its FeedID and timestamps. For decoding the Report Payload use report.Decode().

func (*ReportResponse) MarshalJSON

func (r *ReportResponse) MarshalJSON() ([]byte, error)

func (*ReportResponse) String

func (r *ReportResponse) String() (s string)

func (*ReportResponse) UnmarshalJSON

func (r *ReportResponse) UnmarshalJSON(b []byte) (err error)

type Stats

type Stats struct {
	Accepted              uint64 // Total number of accepted reports
	Deduplicated          uint64 // Total number of deduplicated reports when in HA
	TotalReceived         uint64 // Total number of received reports
	PartialReconnects     uint64 // Total number of partial reconnects when in HA
	FullReconnects        uint64 // Total number of full reconnects
	ConfiguredConnections uint64 // Number of configured connections if in HA
	ActiveConnections     uint64 // Current number of active connections
}

Stats for the Stream

func (Stats) String

func (s Stats) String() (st string)

type Stream

type Stream interface {
	// Read returns the next available report on the Stream.
	// Read blocks until a report is received, the context is canceled or
	// all underlying connections are in an error state.
	Read(context.Context) (*ReportResponse, error)

	// Stats returns basic stats about the Stream.
	Stats() Stats

	// Close closes the Stream. It is the caller's responsibility to call Close
	// when the stream is no longer needed.
	Close() error
}

Stream represents a realtime report stream. Safe for concurrent usage.

The Stream will maintain at least 2 concurrent connections to different instances to ensure high availability, fault tolerance and minimize the risk of report gaps.

Example
package main

import (
	"context"
	"os"
	"time"

	streams "github.com/smartcontractkit/data-streams-sdk/go"
	"github.com/smartcontractkit/data-streams-sdk/go/feed"

	streamsReport "github.com/smartcontractkit/data-streams-sdk/go/report"

	v3 "github.com/smartcontractkit/data-streams-sdk/go/report/v3"
)

// main demonstrates the realtime side of the client: it lists the available
// feeds, subscribes to the first two with a connection status callback, and
// reads and decodes reports until the stream has accepted 100 of them. Any
// failure is logged and terminates the process with status 1.
func main() {
	cfg := streams.Config{
		ApiKey:    "mykey",
		ApiSecret: "mysecret",
		RestURL:   "https://streams.url",
		WsURL:     "https://streams.url",
		Logger:    streams.LogPrintf,
	}

	client, err := streams.New(cfg)
	if err != nil {
		streams.LogPrintf("error creating client: %s", err)
		os.Exit(1)
	}

	// Each request gets its own one-second deadline; cancel is called right
	// after the call returns to release the timer.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	availableFeeds, err := client.GetFeeds(ctx)
	cancel()
	if err != nil {
		streams.LogPrintf("error fetching feeds: %s", err)
		os.Exit(1)
	}

	// The subscription below indexes the first two feeds; fail with a clear
	// message instead of panicking on an out-of-range index.
	if len(availableFeeds) < 2 {
		streams.LogPrintf("expected at least 2 available feeds, got %d", len(availableFeeds))
		os.Exit(1)
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	stream, err := client.StreamWithStatusCallback(
		ctx, []feed.ID{availableFeeds[0].FeedID, availableFeeds[1].FeedID},
		func(isConnected bool, host string, origin string) {
			// isConnected is a bool, so it must be formatted with %t; %s
			// would print "%!s(bool=true)".
			streams.LogPrintf("Host: %s, Origin: %s, isConnected: %t", host, origin, isConnected)
		})
	cancel()
	if err != nil {
		streams.LogPrintf("error subscribing: %s", err)
		os.Exit(1)
	}

	defer stream.Close()
	for stream.Stats().Accepted < 100 {
		report, err := stream.Read(context.Background())
		if err != nil {
			streams.LogPrintf("error reading from stream: %s", err)
			os.Exit(1)
		}

		streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d",
			report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp)

		// The report envelope carries an opaque payload; decode it with the
		// v3 schema to access the individual data fields.
		decoded, err := streamsReport.Decode[v3.Data](report.FullReport)
		if err != nil {
			streams.LogPrintf("error decoding report: %s", err)
			os.Exit(1)
		}

		streams.LogPrintf(
			"FeedID: %s, FeedVersion: %d, Bid: %s, Ask: %s, BenchMark: %s, LinkFee: %s, NativeFee: %s, ValidFromTS: %d, ExpiresAt: %d",
			decoded.Data.FeedID.String(),
			decoded.Data.FeedID.Version(),
			decoded.Data.Bid.String(),
			decoded.Data.Ask.String(),
			decoded.Data.BenchmarkPrice.String(),
			decoded.Data.LinkFee.String(),
			decoded.Data.NativeFee.String(),
			decoded.Data.ValidFromTimestamp,
			decoded.Data.ExpiresAt,
		)

		streams.LogPrintf("stream stats: %s", stream.Stats().String())
	}
}
Output:

Directories

Path Synopsis
Package report implements the Streams feed and feed schema version types.
Package report implements the Streams feed and feed schema version types.
Package report implements the Streams report schema, type and has sub packages that implements the report data schema and types.
Package report implements the Streams report schema, type and has sub packages that implements the report data schema and types.
v1
v2
v3
v4

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL