Documentation ¶
Overview ¶
Package streams implements a client library for the Data Streams API, providing a domain-oriented abstraction for both report Streams and point-in-time retrieval, with fault-tolerant capabilities.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
// ErrStreamClosed is the error value whose message reports the use of a
// closed Stream.
var ErrStreamClosed = fmt.Errorf("client: use of closed Stream")
Functions ¶
Types ¶
type Client ¶
// Client groups the point-in-time retrieval calls (feeds and reports) and
// the calls that open a realtime report Stream.
type Client interface {
	// GetFeeds lists all feeds available to this client.
	GetFeeds(ctx context.Context) (r []*feed.Feed, err error)

	// GetLatestReport fetches the latest report available for the given feedID.
	GetLatestReport(ctx context.Context, id feed.ID) (r *ReportResponse, err error)

	// GetReports fetches the reports for the given feedIDs and timestamp.
	GetReports(ctx context.Context, ids []feed.ID, timestamp uint64) ([]*ReportResponse, error)

	// GetReportPage paginates the reports for the given feedID and start timestamp.
	GetReportPage(ctx context.Context, id feed.ID, startTS uint64) (*ReportPage, error)

	// Stream creates a realtime report stream for the given feedIDs.
	Stream(ctx context.Context, feedIDs []feed.ID) (Stream, error)

	// StreamWithStatusCallback creates a realtime report stream for the given
	// feedIDs and additionally accepts connStatusCallback, a function that
	// receives a connection state flag, a host and an origin.
	// NOTE(review): the callback is presumably invoked whenever an underlying
	// connection changes status; confirm against the implementation.
	StreamWithStatusCallback(ctx context.Context, feedIDs []feed.ID, connStatusCallback func(isConnected bool, host string, origin string)) (Stream, error)
}
Client is the data streams client interface.
Example ¶
package main import ( "context" "os" "time" streams "github.com/smartcontractkit/data-streams-sdk/go" "github.com/smartcontractkit/data-streams-sdk/go/feed" streamsReport "github.com/smartcontractkit/data-streams-sdk/go/report" v3 "github.com/smartcontractkit/data-streams-sdk/go/report/v3" ) func main() { cfg := streams.Config{ ApiKey: "mykey", ApiSecret: "mysecret", RestURL: "https://streams.url", WsURL: "https://streams.url", Logger: streams.LogPrintf, } streamsClient, err := streams.New(cfg) if err != nil { streams.LogPrintf("error creating client: %s", err) os.Exit(1) } ctx, cancel := context.WithTimeout(context.Background(), time.Second) availableFeeds, err := streamsClient.GetFeeds(ctx) cancel() if err != nil { streams.LogPrintf("error fetching feeds: %s", err) os.Exit(1) } f := availableFeeds[0] ctx, cancel = context.WithTimeout(context.Background(), time.Second) report, err := streamsClient.GetLatestReport(ctx, f.FeedID) cancel() if err != nil { streams.LogPrintf("error fetching latest report: %s", err) os.Exit(1) } streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d", report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp) ctx, cancel = context.WithTimeout(context.Background(), time.Second) reports, err := streamsClient.GetReports( ctx, []feed.ID{availableFeeds[0].FeedID, availableFeeds[1].FeedID}, uint64(time.Now().Unix())-3) cancel() if err != nil { streams.LogPrintf("error fetching reports: %s", err) os.Exit(1) } for _, report = range reports { decoded, err := streamsReport.Decode[v3.Data](report.FullReport) if err != nil { streams.LogPrintf("error decoding decoded: %s", err) os.Exit(1) } streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d", report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp) streams.LogPrintf( "FeedID: %s, FeedVersion: %d, Bid: %s, Ask: %s, BenchMark: %s, LinkFee: %s, NativeFee: %s, ValidFromTS: %d, ExpiresAt: %d", 
decoded.Data.FeedID.String(), decoded.Data.FeedID.Version(), decoded.Data.Bid.String(), decoded.Data.Ask.String(), decoded.Data.BenchmarkPrice.String(), decoded.Data.LinkFee.String(), decoded.Data.NativeFee.String(), decoded.Data.ValidFromTimestamp, decoded.Data.ExpiresAt, ) } }
Output:
type Config ¶
// Config carries the credentials, endpoints, connection options and logging
// hooks used to build a client.
type Config struct {
	ApiKey             string // Client Api key
	ApiSecret          string // Client Api secret
	RestURL            string // Rest Api url
	WsURL              string // Websocket Api url
	WsHA               bool   // Use concurrent connections to multiple Streams servers
	WsMaxReconnect     int    // Maximum number of reconnection attempts for Stream underlying connections
	LogDebug           bool   // Log debug information
	InsecureSkipVerify bool   // Skip server certificate chain and host name verification
	Logger             func(format string, a ...any) // Logger function

	// InspectHttpResponse intercepts http responses for rest requests.
	// The response object must not be modified.
	InspectHttpResponse func(*http.Response)
	// contains filtered or unexported fields
}
Config specifies the client configuration and dependencies. If specified, the Logger function will be used to log informational client activity.
type CtxKey ¶
// CtxKey is the string-based type used for context value keys.
type CtxKey string
CtxKey type for context values
const (
	// CustomHeadersCtxKey is used as the key in a context.Context object
	// to pass custom http headers, as an http.Header, to be used by the client.
	// Custom header values will overwrite client headers if they have the same key.
	CustomHeadersCtxKey CtxKey = "CustomHeaders"
)
type ReportPage ¶
// ReportPage holds one page of reports together with the timestamp of the
// next page.
type ReportPage struct {
	Reports    []*ReportResponse // Reports contained in this page
	NextPageTS uint64            // Timestamp to use when requesting the next page
}
ReportPage implements the server pagination response. NextPageTS is the timestamp to be used when requesting the next page.
type ReportResponse ¶
// ReportResponse is the report envelope: a feed ID, the raw report bytes and
// two timestamps, each mapped to the JSON key given in its struct tag.
type ReportResponse struct {
	FeedID                feed.ID `json:"feedID"`                // ID of the feed the report belongs to
	FullReport            []byte  `json:"fullReport"`            // Raw report bytes
	ValidFromTimestamp    uint64  `json:"validFromTimestamp"`    // Valid-from timestamp of the report
	ObservationsTimestamp uint64  `json:"observationsTimestamp"` // Observations timestamp of the report
}
ReportResponse implements the report envelope that contains the full report payload, its FeedID and timestamps. For decoding the Report Payload use report.Decode().
func (*ReportResponse) MarshalJSON ¶
func (r *ReportResponse) MarshalJSON() ([]byte, error)
func (*ReportResponse) String ¶
func (r *ReportResponse) String() (s string)
func (*ReportResponse) UnmarshalJSON ¶
func (r *ReportResponse) UnmarshalJSON(b []byte) (err error)
type Stats ¶
// Stats is a set of counters describing the reports received and the
// connections used by a Stream.
type Stats struct {
	Accepted              uint64 // Total number of accepted reports
	Deduplicated          uint64 // Total number of deduplicated reports when in HA
	TotalReceived         uint64 // Total number of received reports
	PartialReconnects     uint64 // Total number of partial reconnects when in HA
	FullReconnects        uint64 // Total number of full reconnects
	ConfiguredConnections uint64 // Number of configured connections if in HA
	ActiveConnections     uint64 // Current number of active connections
}
Stats for the Stream
type Stream ¶
// Stream is the interface for reading reports, retrieving stats and closing
// a report stream.
type Stream interface {
	// Read returns the next available report on the Stream.
	// Read blocks until a report is received, the context is canceled or
	// all underlying connections are in an error state.
	Read(context.Context) (*ReportResponse, error)

	// Stats returns basic stats about the Stream.
	Stats() Stats

	// Close closes the Stream. It is the caller's responsibility to call
	// Close when the stream is no longer needed.
	Close() error
}
Stream represents a realtime report stream. Safe for concurrent usage.
The Stream will maintain at least 2 concurrent connections to different instances to ensure high availability and fault tolerance, and to minimize the risk of report gaps.
Example ¶
package main import ( "context" "os" "time" streams "github.com/smartcontractkit/data-streams-sdk/go" "github.com/smartcontractkit/data-streams-sdk/go/feed" streamsReport "github.com/smartcontractkit/data-streams-sdk/go/report" v3 "github.com/smartcontractkit/data-streams-sdk/go/report/v3" ) func main() { cfg := streams.Config{ ApiKey: "mykey", ApiSecret: "mysecret", RestURL: "https://streams.url", WsURL: "https://streams.url", Logger: streams.LogPrintf, } client, err := streams.New(cfg) if err != nil { streams.LogPrintf("error creating client: %s", err) os.Exit(1) } ctx, cancel := context.WithTimeout(context.Background(), time.Second) availableFeeds, err := client.GetFeeds(ctx) cancel() if err != nil { streams.LogPrintf("error fetching feeds: %s", err) os.Exit(1) } ctx, cancel = context.WithTimeout(context.Background(), time.Second) stream, err := client.StreamWithStatusCallback( ctx, []feed.ID{availableFeeds[0].FeedID, availableFeeds[1].FeedID}, func(isConnected bool, host string, origin string) { streams.LogPrintf("Host: %s, Origin: %s, isConnected: %s", host, origin, isConnected) }) cancel() if err != nil { streams.LogPrintf("error subscribing: %s", err) os.Exit(1) } defer stream.Close() for stream.Stats().Accepted < 100 { report, err := stream.Read(context.Background()) if err != nil { streams.LogPrintf("error reading from stream: %s", err) os.Exit(1) } streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d", report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp) decoded, err := streamsReport.Decode[v3.Data](report.FullReport) if err != nil { streams.LogPrintf("error decoding decoded: %s", err) os.Exit(1) } streams.LogPrintf( "FeedID: %s, FeedVersion: %d, Bid: %s, Ask: %s, BenchMark: %s, LinkFee: %s, NativeFee: %s, ValidFromTS: %d, ExpiresAt: %d", decoded.Data.FeedID.String(), decoded.Data.FeedID.Version(), decoded.Data.Bid.String(), decoded.Data.Ask.String(), decoded.Data.BenchmarkPrice.String(), 
decoded.Data.LinkFee.String(), decoded.Data.NativeFee.String(), decoded.Data.ValidFromTimestamp, decoded.Data.ExpiresAt, ) streams.LogPrintf("stream stats: %s", stream.Stats().String()) } }
Output:
Directories ¶
Path | Synopsis |
---|---|
feed | Package feed implements the Streams feed and feed schema version types. |
report | Package report implements the Streams report schema and type, and has sub-packages that implement the report data schemas and types. |