Documentation
¶
Index ¶
- Constants
- Variables
- func QueryBuilder(stmnt string, params ...interface{}) (*string, error)
- type ActiveStandbyPerQuery
- type ActiveStandbyPerQueryMap
- type BodyReader
- type ClusterNode
- type ClusterNodeMap
- type ClusterStatus
- type ClusterStatusResponse
- type Column
- type CommandStatus
- type ExecOptions
- type Field
- type Header
- type HostStoreLags
- type KsqlResponse
- type KsqlResponseSlice
- type KsqlServerInfo
- type KsqlServerInfoResponse
- type Ksqldb
- type KsqldbClient
- func (cl *KsqldbClient) Close()
- func (api *KsqldbClient) ClosePushQuery(ctx context.Context, queryID string) (err error)
- func (cl *KsqldbClient) EnableParseSQL(activate bool)
- func (api *KsqldbClient) Execute(ctx context.Context, options ExecOptions) (response *KsqlResponseSlice, err error)
- func (api *KsqldbClient) GetClusterStatus(ctx context.Context) (*ClusterStatusResponse, error)
- func (api *KsqldbClient) GetQueryStatus(ctx context.Context, commandId string) (*QueryStatus, error)
- func (api *KsqldbClient) GetServerInfo(ctx context.Context) (info *KsqlServerInfo, err error)
- func (api *KsqldbClient) GetServerStatus(ctx context.Context) (result *ServerStatusResponse, err error)
- func (cl *KsqldbClient) ParseSQLEnabled() bool
- func (api *KsqldbClient) Pull(ctx context.Context, options QueryOptions) (header Header, payload Payload, err error)
- func (api *KsqldbClient) Push(ctx context.Context, options QueryOptions, rowChannel chan<- Row, ...) (err error)
- func (api *KsqldbClient) TerminateCluster(ctx context.Context, topics ...string) (result *KsqlResponseSlice, err error)
- func (api *KsqldbClient) ValidateProperty(ctx context.Context, property string) (*bool, error)
- type LagByPartition
- type LagByPartitionMap
- type NewClientFactory
- type NewClientWithOptionsFactory
- type Partition
- type PartitionMap
- type Payload
- type PropertyMap
- type Query
- type QueryDescription
- type QueryInfo
- type QueryOptions
- func (q *QueryOptions) AutoOffsetReset(offset StreamOffset) *QueryOptions
- func (o *QueryOptions) EmptyQuery() bool
- func (q *QueryOptions) EnablePullQueryTableScan(enable bool) *QueryOptions
- func (q *QueryOptions) SanitizeQuery()
- func (q *QueryOptions) SetIdleConnectionTimeout(seconds int64) *QueryOptions
- type QuerySlice
- type QueryStatus
- type RequestParams
- type RespUnmarshaller
- type Response
- type ResponseError
- type Row
- type Schema
- type ServerStatusResponse
- type SessionVariablesMap
- type SourceDescription
- type StateStoreLag
- type StateStoreLagMap
- type Stream
- type StreamOffset
- type StreamSlice
- type Table
- type TableSlice
- type TerminateClusterTopics
- type TopicPartition
Constants ¶
const ( QUERY_STREAM_ENDPOINT = "/query-stream" QUERY_ENDPOINT = "/query" INSERTS_ENDPOINT = "/inserts-stream" CLOSE_QUERY_ENDPOINT = "/close-query" KSQL_ENDPOINT = "/ksql" INFO_ENDPOINT = "/info" STATUS_ENDPOINT = "/status" HEALTHCHECK_ENDPOINT = "/healthcheck" CLUSTER_STATUS_ENDPOINT = "/clusterStatus" PROP_VALIDITY_ENPOINT = "/is_valid_property" TERMINATE_CLUSTER_ENDPOINT = "/ksql/terminate" )
const ( KSQL_QUERY_PULL_TABLE_SCAN_ENABLED = "ksql.query.pull.table.scan.enabled" KSQL_STREAMS_AUTO_OFFSET_RESET = "ksql.streams.auto.offset.reset" KSQL_IDLE_CONNECTION_TIMEOUT_SECONDS = "ksql.idle.connection.timeout.seconds" DEFAULT_IDLE_CONNECTION_TIMEOUT = int64(600) // 10 minutes )
const ( QBErr = "qbErr" QBUnsupportedType = "unsupported param type" EMPTY_STATEMENT = "empty ksql statement" )
Variables ¶
var (
ErrNotFound = errors.New("no result found")
)
Functions ¶
func QueryBuilder ¶ added in v0.0.3
QueryBuilder replaces ? with the correct types in the sql statement
Types ¶
type ActiveStandbyPerQuery ¶ added in v0.0.4
type ActiveStandbyPerQuery struct { ActiveStores []string ActivePartitions []TopicPartition StandByStore []string StandByPartitions []string }
type ActiveStandbyPerQueryMap ¶ added in v0.0.4
type ActiveStandbyPerQueryMap map[string]ActiveStandbyPerQuery
type ClusterNode ¶ added in v0.0.4
type ClusterNode struct { HostAlive bool LastStatusUpdateMs int64 HostStoreLags HostStoreLags ActiveStandbyPerQuery ActiveStandbyPerQueryMap }
type ClusterNodeMap ¶ added in v0.0.4
type ClusterNodeMap map[string]ClusterNode
type ClusterStatus ¶ added in v0.0.4
type ClusterStatus struct {
Host ClusterNodeMap `mapstructure:",remain"`
}
type ClusterStatusResponse ¶ added in v0.0.4
type ClusterStatusResponse struct {
ClusterStatus ClusterStatus
}
type CommandStatus ¶ added in v0.0.4
type ExecOptions ¶ added in v0.0.4
type ExecOptions struct { KSql string `json:"ksql"` StreamsProperties PropertyMap `json:"streamsProperties,omitempty"` SessionVariables SessionVariablesMap `json:"sessionVariables,omitempty"` CommandSequenceNumber int64 `json:"commandSequenceNumber,omitempty"` }
func (*ExecOptions) EmptyQuery ¶ added in v0.0.4
func (o *ExecOptions) EmptyQuery() bool
func (*ExecOptions) SanitizeQuery ¶ added in v0.0.4
func (o *ExecOptions) SanitizeQuery()
type HostStoreLags ¶ added in v0.0.4
type HostStoreLags struct { StateStoreLags StateStoreLagMap UpdateTimeMs uint64 }
type KsqlResponse ¶ added in v0.0.4
type KsqlResponse struct { StatementText string Warnings []string Type string `json:"@type"` CommandId string `json:"commandId,omitempty"` CommandSequenceNumber int64 `json:"commandSequenceNumber,omitempty"` // -1 if the operation was unsuccessful CommandStatus CommandStatus `json:"commandStatus,omitempty"` Stream *StreamSlice `json:"streams,omitempty"` Tables *TableSlice `json:"tables,omitempty"` Queries *QuerySlice `json:"queries,omitempty"` QueryDescription *QueryDescription `json:"queryDescription,omitempty"` SourceDescription *SourceDescription `json:"sourceDescription,omitempty"` }
type KsqlResponseSlice ¶ added in v0.0.4
type KsqlResponseSlice []KsqlResponse
type KsqlServerInfo ¶ added in v0.0.4
type KsqlServerInfo struct { Version string `json:"version"` KafkaClusterID string `json:"kafkaClusterId"` KsqlServiceID string `json:"ksqlServiceId"` ServerStatus string `json:"serverStatus,omitempty"` }
KsqlServerInfo
type KsqlServerInfoResponse ¶ added in v0.0.4
type KsqlServerInfoResponse struct {
KsqlServerInfo KsqlServerInfo `json:"KsqlServerInfo"`
}
KsqlServerInfoResponse
type Ksqldb ¶ added in v0.0.4
type Ksqldb interface { // GetServerInfo returns informations about the ksqlDB Server // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/info-endpoint/ GetServerInfo() (*KsqlServerInfo, error) // GetServerStatus returns server status // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/info-endpoint/ GetServerStatus() (*ServerStatusResponse, error) // GetClusterStatus // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/cluster-status-endpoint/ GetClusterStatus() (*ClusterStatusResponse, error) // TerminateCluster terminates a ksqldb cluster - READ THE DOCS before you call this endpoint // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/terminate-endpoint/ TerminateCluster(topics ...string) (*KsqlResponseSlice, error) // ValidateProperty validates a property // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/is_valid_property-endpoint/ ValidateProperty(property string) (*bool, error) // Pull data // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/ Pull(context.Context, string, bool) (Header, Payload, error) // Push data // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/ Push(context.Context, string, chan<- Row, chan<- Header) error // ClosePushQuery terminates push query explicitly // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#terminating-queries ClosePushQuery(context.Context, string) error // GetQueryStatus // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/status-endpoint/ GetQueryStatus(string) (*QueryStatus, error) // EnableParseSQL enables/disables query parsing for push/pull/execute requests EnableParseSQL(bool) // ParseSQLEnabled returns true if query parsing is enabled or not ParseSQLEnabled() bool // Close closes net.HTTPClient transport Close() }
type KsqldbClient ¶ added in v0.0.4
type KsqldbClient struct {
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(http net.HTTPClient) (KsqldbClient, error)
NewClient returns a new KsqldbClient with the given net.HTTPclient
func NewClientWithOptions ¶ added in v0.0.4
func NewClientWithOptions(options net.Options) (KsqldbClient, error)
NewClientWithOptions returns a new @KsqldbClient with Options
func (*KsqldbClient) Close ¶ added in v0.0.4
func (cl *KsqldbClient) Close()
Close closes the underlying http transport
func (*KsqldbClient) ClosePushQuery ¶ added in v0.0.5
func (api *KsqldbClient) ClosePushQuery(ctx context.Context, queryID string) (err error)
Close Query terminates push query explicitly
func (*KsqldbClient) EnableParseSQL ¶ added in v0.0.4
func (cl *KsqldbClient) EnableParseSQL(activate bool)
EnableParseSQL enables / disables sql parsing
func (*KsqldbClient) Execute ¶ added in v0.0.4
func (api *KsqldbClient) Execute(ctx context.Context, options ExecOptions) (response *KsqlResponseSlice, err error)
Execute will execute a ksqlDB statement. All statements, except those starting with SELECT, can be run on this endpoint. To run SELECT statements use use Push or Pull functions.
To use this function pass in the @ExecOptions.
Ref: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/ksql-endpoint/
func (*KsqldbClient) GetClusterStatus ¶ added in v0.0.4
func (api *KsqldbClient) GetClusterStatus(ctx context.Context) (*ClusterStatusResponse, error)
GetClusterStatus returns the status of the cluster @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/cluster-status-endpoint/
func (*KsqldbClient) GetQueryStatus ¶ added in v0.0.4
func (api *KsqldbClient) GetQueryStatus(ctx context.Context, commandId string) (*QueryStatus, error)
GetQueryStatus returns the current command status for a CREATE, DROP, or TERMINATE statement.
CREATE, DROP, and TERMINATE statements returns an object that indicates the current state of statement execution. A statement can be in one of the following states:
QUEUED, PARSING, EXECUTING: The statement was accepted by the server and is being processed. SUCCESS: The statement was successfully processed. ERROR: There was an error processing the statement. The statement was not executed.
TERMINATED: The query started by the statement was terminated. Only returned for CREATE STREAM|TABLE AS SELECT.
If a CREATE, DROP, or TERMINATE statement returns a command status with state QUEUED, PARSING, or EXECUTING from the @Execute endpoint, you can use the @GetQueryStatus endpoint to poll the status of the command.
func (*KsqldbClient) GetServerInfo ¶ added in v0.0.4
func (api *KsqldbClient) GetServerInfo(ctx context.Context) (info *KsqlServerInfo, err error)
ServerInfo gets the info for your server api net.KsqlHTTPClient
func (*KsqldbClient) GetServerStatus ¶ added in v0.0.4
func (api *KsqldbClient) GetServerStatus(ctx context.Context) (result *ServerStatusResponse, err error)
GetServerStatus provides provides information about your server
func (*KsqldbClient) ParseSQLEnabled ¶ added in v0.0.4
func (cl *KsqldbClient) ParseSQLEnabled() bool
ParseSQLEnabled returns true if sql parsing is enabled; false otherwise
func (*KsqldbClient) Pull ¶ added in v0.0.4
func (api *KsqldbClient) Pull(ctx context.Context, options QueryOptions) (header Header, payload Payload, err error)
Pull queries are like "traditional" RDBMS queries in which the query terminates once the state has been queried.
To use this function pass in the the SQL query statement, and a boolean for whether full table scans should be enabled.
The function returns a ksqldb.Header and ksqldb.Payload which will hold one or more rows of data. You will need to define variables to hold each column's value. You can adopt this pattern to do this:
var col1 string var col2 float64 for _, row := range r { col1 = row[0].(string) col2 = row[1].(float64) ... Do other stuff with the data here } }
func (*KsqldbClient) Push ¶ added in v0.0.4
func (api *KsqldbClient) Push(ctx context.Context, options QueryOptions, rowChannel chan<- Row, headerChannel chan<- Header) (err error)
Push queries are continuous queries in which new events or changes to a table's state are pushed to the client. You can think of them as subscribing to a stream of changes.
Since push queries never end, this function expects a channel to which it can write new rows of data as and when they are received.
To use this function pass in a context, the SQL query statement, and two channels:
* ksqldb.Row - rows of data * ksqldb.Header - header (including column definitions).
If you don't want to block before receiving row data then make this channel buffered.
The channel is populated with ksqldb.Row which represents one row of data. You will need to define variables to hold each column's value. You can adopt this pattern to do this:
var DATA_TS float64 var ID string for row := range rc { if row != nil { DATA_TS = row[0].(float64) ID = row[1].(string)
func (*KsqldbClient) TerminateCluster ¶ added in v0.0.4
func (api *KsqldbClient) TerminateCluster(ctx context.Context, topics ...string) (result *KsqlResponseSlice, err error)
TerminateCluster terminates your cluster
This is a `Terminate` requests and the response is a terminate response see: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/ksql-endpoint/#common-fields
server logs: INFO Received: ClusterTerminateRequest{deleteTopicList=[DOGS_BY_SIZE, dogs]} (io.confluent.ksql.rest.server.resources.KsqlResource:216) INFO Terminating the KSQL server. (io.confluent.ksql.rest.server.computation.CommandRunner:374) INFO 172.18.0.1 - - "POST /ksql/terminate HTTP/2.0" 200 242 "-" "Go-http-client/2.0" 43 (io.confluent.ksql.api.server.LoggingHandler:113) INFO The KSQL server was terminated. (io.confluent.ksql.rest.server.computation.CommandRunner:380) INFO Closing command store (io.confluent.ksql.rest.server.computation.CommandRunner:479)
func (*KsqldbClient) ValidateProperty ¶ added in v0.0.4
ValidateProperty resource tells you whether a property is prohibited from setting. If prohibited the ksqlDB server api returns a 400 error
type LagByPartition ¶ added in v0.0.4
type LagByPartition struct {
Partition Partition
}
type LagByPartitionMap ¶ added in v0.0.4
type LagByPartitionMap map[string]LagByPartition
type NewClientFactory ¶ added in v0.0.4
type NewClientFactory interface { // NewClient factory NewClient(net.HTTPClient) (*KsqldbClient, error) }
type NewClientWithOptionsFactory ¶ added in v0.0.4
type PartitionMap ¶ added in v0.0.4
type PropertyMap ¶ added in v0.0.4
type QueryDescription ¶ added in v0.0.4
type QueryOptions ¶ added in v0.0.4
type QueryOptions struct { Sql string `json:"sql"` Properties PropertyMap `json:"properties"` }
func NewDefaultPullQueryOptions ¶ added in v0.0.5
func NewDefaultPullQueryOptions(sql string) (options QueryOptions)
NewDefaultPullQueryOptions returns default QueryOptions for pull queries
- EnablePullQueryTableScan: true
func NewDefaultPushQueryOptions ¶ added in v0.0.5
func NewDefaultPushQueryOptions(sql string) (options QueryOptions)
NewDefaultPushQueryOptions returns default QueryOptions for push queries
- IdleConnectionTimeout: 600 seconds - AutoOffsetReset: "latest"
func (*QueryOptions) AutoOffsetReset ¶ added in v0.0.5
func (q *QueryOptions) AutoOffsetReset(offset StreamOffset) *QueryOptions
AutoOffsetReset sets the offset to latest | earliest
Determines what to do when there is no initial offset in Apache Kafka® or if the current offset doesn't exist on the server. The default value in ksqlDB is `latest`, which means all Kafka topics are read from the latest available offset.
func (*QueryOptions) EmptyQuery ¶ added in v0.0.4
func (o *QueryOptions) EmptyQuery() bool
EmptyQuery returns true if the query is empty
func (*QueryOptions) EnablePullQueryTableScan ¶ added in v0.0.4
func (q *QueryOptions) EnablePullQueryTableScan(enable bool) *QueryOptions
EnablePullQueryTableScan to control whether table scans are permitted when executing pull queries.
Without this enabled, only key lookups are used.
Enabling table scans removes various restrictions on what types of queries are allowed.
In particular, these pull query types are now permitted:
- No WHERE clause
- Range queries on keys
- Equality and range queries on non-key columns
- Multi-column key queries without specifying all key columns
There may be significant performance implications to using these types of queries, depending on the size of the data and other workloads running, so use this config carefully.
func (*QueryOptions) SanitizeQuery ¶ added in v0.0.4
func (q *QueryOptions) SanitizeQuery()
SanitizeQuery removes `\t` and `\n` from the query
func (*QueryOptions) SetIdleConnectionTimeout ¶ added in v0.0.5
func (q *QueryOptions) SetIdleConnectionTimeout(seconds int64) *QueryOptions
SetIdleConnectionTimeout sets the timeout for idle connections
A connection is idle if there is no data in either direction on that connection for the duration of the timeout.
This configuration can be helpful if you are issuing push queries that only receive data infrequently from the server, as otherwise those connections will be severed when the timeout (default 10 minutes) is hit.
Decreasing this timeout enables closing connections more aggressively to save server resources.
Increasing this timeout makes the server more tolerant of low-data volume use cases.
type QuerySlice ¶ added in v0.0.4
type QuerySlice []Query
type QueryStatus ¶ added in v0.0.4
type RequestParams ¶ added in v0.0.3
type RequestParams map[string]interface{}
type RespUnmarshaller ¶ added in v0.0.4
type ResponseError ¶ added in v0.0.4
type ResponseError struct { ErrType string `json:"@type"` ErrCode int `json:"error_code"` Message string `json:"message"` }
func (ResponseError) Error ¶ added in v0.0.4
func (e ResponseError) Error() string
Error gets the error string without new lines from ResponseError
type ServerStatusResponse ¶ added in v0.0.4
type ServerStatusResponse struct { IsHealthy *bool `json:"isHealthy"` Details struct { Metastore struct { IsHealthy *bool `json:"isHealthy"` } `json:"metastore"` Kafka struct { IsHealthy *bool `json:"isHealthy"` } `json:"kafka"` } `json:"details"` KsqlServiceID string `json:"ksqlServiceId"` }
ServerStatusResponse
type SessionVariablesMap ¶ added in v0.0.4
type SessionVariablesMap map[string]interface{}
type SourceDescription ¶ added in v0.0.7
type StateStoreLag ¶ added in v0.0.4
type StateStoreLag struct { LagByPartition LagByPartitionMap Size uint64 }
type StateStoreLagMap ¶ added in v0.0.4
type StateStoreLagMap map[string]StateStoreLag
type StreamOffset ¶ added in v0.0.5
type StreamOffset string
const ( EARLIEST StreamOffset = "earliest" LATEST StreamOffset = "latest" )
type StreamSlice ¶ added in v0.0.4
type StreamSlice []Stream
type TableSlice ¶ added in v0.0.4
type TableSlice []Table
type TerminateClusterTopics ¶ added in v0.0.4
type TerminateClusterTopics struct {
DeleteTopicList []string `json:"deleteTopicList,omitempty"`
}
func (*TerminateClusterTopics) Add ¶ added in v0.0.4
func (tct *TerminateClusterTopics) Add(topics ...string)
func (*TerminateClusterTopics) Size ¶ added in v0.0.4
func (tct *TerminateClusterTopics) Size() int