Documentation ¶
Overview ¶
Package signalflow contains a SignalFx SignalFlow client, which can be used to execute analytics jobs against the SignalFx backend.
Not all SignalFlow messages are handled at this time, and some will be silently dropped. All of the most important and useful ones are supported though.
The client will automatically attempt to reconnect to the backend if the connection is broken after a short delay.
SignalFlow is documented at https://dev.splunk.com/observability/docs/signalflow/messages.
Example ¶
package main import ( "context" "flag" "log" "os" "time" "github.com/signalfx/signalflow-client-go/signalflow" ) func main() { var ( realm string accessToken string program string duration time.Duration ) flag.StringVar(&realm, "realm", "", "SignalFx Realm") flag.StringVar(&accessToken, "access-token", "", "SignalFx Org Access Token") flag.StringVar(&program, "program", "data('cpu.utilization').count().publish()", "The SignalFlow program to execute") flag.DurationVar(&duration, "duration", 30*time.Second, "How long to run the job before sending Stop message") flag.Parse() if realm == "" || accessToken == "" { flag.Usage() os.Exit(1) } timer := time.After(duration) c, err := signalflow.NewClient( signalflow.StreamURLForRealm(realm), signalflow.AccessToken(accessToken), signalflow.OnError(func(err error) { log.Printf("Error in SignalFlow client: %v\n", err) })) if err != nil { log.Printf("Error creating client: %v\n", err) return } defer c.Close() log.Printf("Executing program for %v: %s\n", duration, program) comp, err := c.Execute(context.Background(), &signalflow.ExecuteRequest{ Program: program, }) if err != nil { log.Printf("Could not send execute request: %v\n", err) return } go func() { <-timer if err := comp.Stop(context.Background()); err != nil { log.Printf("Failed to stop computation: %v\n", err) } }() // If you want to limit how long to wait for the metadata to come in // you can use a timeout context. ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() resolution, err := comp.Resolution(ctx) log.Printf("Resolution: %v (err: %v)\n", resolution, err) maxDelay, err := comp.MaxDelay(ctx) log.Printf("Max Delay: %v (err: %v)\n", maxDelay, err) lag, err := comp.Lag(ctx) log.Printf("Detected Lag: %v (err: %v)\n", lag, err) go func() { for msg := range comp.Expirations() { log.Printf("Got expiration notice for TSID %s\n", msg.TSID) } }() go func() { for msg := range comp.Info() { log.Printf("Got info message %s\n", msg.MessageBlock.ContentsRaw) } }() for msg := range comp.Data() { // This will run as long as there is data, or until the websocket gets // disconnected. if len(msg.Payloads) == 0 { log.Println("No data available") continue } for _, pl := range msg.Payloads { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) meta, err := comp.TSIDMetadata(ctx, pl.TSID) cancel() if err != nil { log.Printf("Failed to get metadata for tsid %s: %v\n", pl.TSID, err) continue } log.Printf("%s (%s) %v %v: %v\n", meta.OriginatingMetric, meta.Metric, meta.CustomProperties, meta.InternalProperties, pl.Value()) } } err = comp.Err() if err != nil { log.Printf("Job error: %v", comp.Err()) return } log.Println("Job completed") }
Output:
Index ¶
- Variables
- type AuthRequest
- type AuthType
- type Client
- type ClientParam
- func AccessToken(token string) ClientParam
- func OnError(f OnErrorFunc) ClientParam
- func ReadTimeout(timeout time.Duration) ClientParam
- func StreamURL(streamEndpoint string) ClientParam
- func StreamURLForRealm(realm string) ClientParam
- func UserAgent(userAgent string) ClientParam
- func WriteTimeout(timeout time.Duration) ClientParam
- type Computation
- func (c *Computation) Data() <-chan *messages.DataMessage
- func (c *Computation) Detach(ctx context.Context) error
- func (c *Computation) DetachWithReason(ctx context.Context, reason string) error
- func (c *Computation) Err() error
- func (c *Computation) Events() <-chan *messages.EventMessage
- func (c *Computation) Expirations() <-chan *messages.ExpiredTSIDMessage
- func (c *Computation) GroupByMissingProperties(ctx context.Context) ([]string, error)
- func (c *Computation) Handle(ctx context.Context) (string, error)
- func (c *Computation) Info() <-chan *messages.InfoMessage
- func (c *Computation) Lag(ctx context.Context) (time.Duration, error)
- func (c *Computation) LimitSize(ctx context.Context) (int, error)
- func (c *Computation) MatchedNoTimeseriesQuery(ctx context.Context) (string, error)
- func (c *Computation) MatchedSize(ctx context.Context) (int, error)
- func (c *Computation) MaxDelay(ctx context.Context) (time.Duration, error)
- func (c *Computation) Resolution(ctx context.Context) (time.Duration, error)
- func (c *Computation) Stop(ctx context.Context) error
- func (c *Computation) StopWithReason(ctx context.Context, reason string) error
- func (c *Computation) TSIDMetadata(ctx context.Context, tsid idtool.ID) (*messages.MetadataProperties, error)
- type ComputationError
- type DetachRequest
- type DetachType
- type ExecuteRequest
- type ExecuteType
- type FakeBackend
- func (f *FakeBackend) AddProgramError(program string, errorMsg string)
- func (f *FakeBackend) AddProgramTSIDs(program string, tsids []idtool.ID)
- func (f *FakeBackend) AddTSIDMetadata(tsid idtool.ID, props *messages.MetadataProperties)
- func (f *FakeBackend) Client() (*Client, error)
- func (f *FakeBackend) KillExistingConnections()
- func (f *FakeBackend) RemoveTSIDData(tsid idtool.ID)
- func (f *FakeBackend) Restart()
- func (f *FakeBackend) RunningJobsForProgram(program string) int
- func (f *FakeBackend) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (f *FakeBackend) SetLogger(logger *log.Logger)
- func (f *FakeBackend) SetTSIDFloatData(tsid idtool.ID, val float64)
- func (f *FakeBackend) Start()
- func (f *FakeBackend) Stop()
- func (f *FakeBackend) URL() string
- type OnErrorFunc
- type StopRequest
- type StopType
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrMetadataTimeout = errors.New("metadata value did not come in time")
Functions ¶
This section is empty.
Types ¶
type AuthRequest ¶
type Client ¶
Client for SignalFlow via websockets (SSE is not currently supported).
func NewClient ¶
func NewClient(options ...ClientParam) (*Client, error)
NewClient makes a new SignalFlow client that will immediately try and connect to the SignalFlow backend.
func (*Client) Close ¶
func (c *Client) Close()
Close the client and shutdown any ongoing connections and goroutines. The client cannot be reused after Close. Calling any of the client methods after Close() is undefined and will likely result in a panic.
func (*Client) Detach ¶
func (c *Client) Detach(ctx context.Context, req *DetachRequest) error
Detach from a computation but keep it running. See https://dev.splunk.com/observability/docs/signalflow/messages/websocket_request_messages#Detach-from-a-computation.
func (*Client) Execute ¶
func (c *Client) Execute(ctx context.Context, req *ExecuteRequest) (*Computation, error)
Execute a SignalFlow job and return a channel upon which informational messages and data will flow. See https://dev.splunk.com/observability/docs/signalflow/messages/websocket_request_messages#Execute-a-computation
func (*Client) Stop ¶
func (c *Client) Stop(ctx context.Context, req *StopRequest) error
Stop sends a job stop request message to the backend. It does not wait for jobs to actually be stopped. See https://dev.splunk.com/observability/docs/signalflow/messages/websocket_request_messages#Stop-a-computation
type ClientParam ¶
ClientParam is the common type of configuration functions for the SignalFlow client
func AccessToken ¶
func AccessToken(token string) ClientParam
AccessToken can be used to provide a SignalFx organization access token or user access token to the SignalFlow client.
func OnError ¶
func OnError(f OnErrorFunc) ClientParam
func ReadTimeout ¶
func ReadTimeout(timeout time.Duration) ClientParam
ReadTimeout sets the duration to wait between messages that come on the websocket. If the resolution of the job is very low, this should be increased.
func StreamURL ¶
func StreamURL(streamEndpoint string) ClientParam
StreamURL lets you set the full URL to the stream endpoint, including the path.
func StreamURLForRealm ¶
func StreamURLForRealm(realm string) ClientParam
StreamURLForRealm can be used to configure the websocket url for a specific SignalFx realm.
func UserAgent ¶
func UserAgent(userAgent string) ClientParam
UserAgent allows setting the `userAgent` field when authenticating to SignalFlow. This can be useful for accounting how many jobs are started from each client.
func WriteTimeout ¶
func WriteTimeout(timeout time.Duration) ClientParam
WriteTimeout sets the maximum duration to wait to send a single message when writing messages to the SignalFlow server over the WebSocket connection.
type Computation ¶
Computation is a single running SignalFlow job
func (*Computation) Data ¶
func (c *Computation) Data() <-chan *messages.DataMessage
Data returns the channel on which data messages come. This channel will be closed when the computation is finished. To prevent goroutine leaks, you should read all messages from this channel until it is closed.
func (*Computation) Detach ¶
func (c *Computation) Detach(ctx context.Context) error
Detach the computation on the backend
func (*Computation) DetachWithReason ¶
func (c *Computation) DetachWithReason(ctx context.Context, reason string) error
DetachWithReason detaches the computation with a given reason. This reason will be reflected in the control message that signals the end of the job/channel
func (*Computation) Err ¶
func (c *Computation) Err() error
Err returns the last fatal error that caused the computation to stop, if any. Will be nil if the computation stopped in an expected manner.
func (*Computation) Events ¶
func (c *Computation) Events() <-chan *messages.EventMessage
Events returns a channel that receives event/alert messages from the signalflow computation.
func (*Computation) Expirations ¶
func (c *Computation) Expirations() <-chan *messages.ExpiredTSIDMessage
Expirations returns a channel that will be sent messages about expired TSIDs, i.e. time series that are no longer valid for this computation. This channel will be closed when the computation is finished. To prevent goroutine leaks, you should read all messages from this channel until it is closed.
func (*Computation) GroupByMissingProperties ¶
func (c *Computation) GroupByMissingProperties(ctx context.Context) ([]string, error)
GroupByMissingProperties are timeseries that don't contain the required dimensions. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.
func (*Computation) Handle ¶
func (c *Computation) Handle(ctx context.Context) (string, error)
Handle of the computation. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.
func (*Computation) Info ¶
func (c *Computation) Info() <-chan *messages.InfoMessage
Info returns a channel that receives info messages from the signalflow computation.
func (*Computation) Lag ¶
Lag detected for the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.
func (*Computation) LimitSize ¶
func (c *Computation) LimitSize(ctx context.Context) (int, error)
LimitSize detected of the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.
func (*Computation) MatchedNoTimeseriesQuery ¶
func (c *Computation) MatchedNoTimeseriesQuery(ctx context.Context) (string, error)
MatchedNoTimeseriesQuery if it matched no active timeseries. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.
func (*Computation) MatchedSize ¶
func (c *Computation) MatchedSize(ctx context.Context) (int, error)
MatchedSize detected of the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.
func (*Computation) MaxDelay ¶
MaxDelay detected of the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.
func (*Computation) Resolution ¶
Resolution of the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.
func (*Computation) Stop ¶
func (c *Computation) Stop(ctx context.Context) error
Stop the computation on the backend.
func (*Computation) StopWithReason ¶
func (c *Computation) StopWithReason(ctx context.Context, reason string) error
StopWithReason stops the computation with a given reason. This reason will be reflected in the control message that signals the end of the job/channel.
func (*Computation) TSIDMetadata ¶
func (c *Computation) TSIDMetadata(ctx context.Context, tsid idtool.ID) (*messages.MetadataProperties, error)
TSIDMetadata for a particular tsid. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.
type ComputationError ¶
ComputationError exposes the underlying metadata of a computation error
func (*ComputationError) Error ¶
func (e *ComputationError) Error() string
type DetachRequest ¶
type DetachRequest struct { // This should not be set manually Type DetachType `json:"type"` Channel string `json:"channel"` Reason string `json:"reason"` }
type DetachType ¶
type DetachType string
func (DetachType) MarshalJSON ¶
func (DetachType) MarshalJSON() ([]byte, error)
type ExecuteRequest ¶
type ExecuteRequest struct { // This should not be set manually Type ExecuteType `json:"type"` Program string `json:"program"` Channel string `json:"channel"` Start time.Time `json:"-"` Stop time.Time `json:"-"` Resolution time.Duration `json:"-"` MaxDelay time.Duration `json:"-"` StartMs int64 `json:"start"` StopMs int64 `json:"stop"` ResolutionMs int64 `json:"resolution"` MaxDelayMs int64 `json:"maxDelay"` Immediate bool `json:"immediate"` Timezone string `json:"timezone"` }
See https://dev.splunk.com/observability/docs/signalflow/messages/websocket_request_messages#Execute-message-properties for details on the fields.
func (ExecuteRequest) MarshalJSON ¶
func (er ExecuteRequest) MarshalJSON() ([]byte, error)
MarshalJSON does some assignments to allow using more native Go types for time/duration.
type ExecuteType ¶
type ExecuteType string
func (ExecuteType) MarshalJSON ¶
func (ExecuteType) MarshalJSON() ([]byte, error)
type FakeBackend ¶
FakeBackend is useful for testing, both internal to this package and externally. It supports basic messages and allows for the specification of metadata and data messages that map to a particular program.
func NewRunningFakeBackend ¶
func NewRunningFakeBackend() *FakeBackend
func (*FakeBackend) AddProgramError ¶
func (f *FakeBackend) AddProgramError(program string, errorMsg string)
func (*FakeBackend) AddProgramTSIDs ¶
func (f *FakeBackend) AddProgramTSIDs(program string, tsids []idtool.ID)
func (*FakeBackend) AddTSIDMetadata ¶
func (f *FakeBackend) AddTSIDMetadata(tsid idtool.ID, props *messages.MetadataProperties)
func (*FakeBackend) Client ¶
func (f *FakeBackend) Client() (*Client, error)
func (*FakeBackend) KillExistingConnections ¶
func (f *FakeBackend) KillExistingConnections()
func (*FakeBackend) RemoveTSIDData ¶
func (f *FakeBackend) RemoveTSIDData(tsid idtool.ID)
func (*FakeBackend) Restart ¶
func (f *FakeBackend) Restart()
func (*FakeBackend) RunningJobsForProgram ¶
func (f *FakeBackend) RunningJobsForProgram(program string) int
RunningJobsForProgram returns how many currently executing jobs there are for a particular program text.
func (*FakeBackend) ServeHTTP ¶
func (f *FakeBackend) ServeHTTP(w http.ResponseWriter, r *http.Request)
func (*FakeBackend) SetLogger ¶
func (f *FakeBackend) SetLogger(logger *log.Logger)
SetLogger sets the internal logger.
func (*FakeBackend) SetTSIDFloatData ¶
func (f *FakeBackend) SetTSIDFloatData(tsid idtool.ID, val float64)
func (*FakeBackend) Start ¶
func (f *FakeBackend) Start()
func (*FakeBackend) Stop ¶
func (f *FakeBackend) Stop()
func (*FakeBackend) URL ¶
func (f *FakeBackend) URL() string
type OnErrorFunc ¶
type OnErrorFunc func(err error)