Documentation ¶
Index ¶
- Constants
- Variables
- func QueryBuilder(stmnt string, params ...interface{}) (*string, error)
- type ActiveStandbyPerQuery
- type ActiveStandbyPerQueryMap
- type BodyReader
- type ClusterNode
- type ClusterNodeMap
- type ClusterStatus
- type ClusterStatusResponse
- type Column
- type CommandStatus
- type ExecOptions
- type Field
- type Header
- type HostStoreLags
- type KsqlResponse
- type KsqlResponseSlice
- type KsqlServerInfo
- type KsqlServerInfoResponse
- type Ksqldb
- type KsqldbClient
- func (cl *KsqldbClient) Close()
- func (api *KsqldbClient) ClosePushQuery(ctx context.Context, queryID string) error
- func (cl *KsqldbClient) EnableParseSQL(activate bool)
- func (api *KsqldbClient) Execute(options ExecOptions) (*KsqlResponseSlice, error)
- func (api *KsqldbClient) GetClusterStatus() (*ClusterStatusResponse, error)
- func (api *KsqldbClient) GetQueryStatus(commandId string) (*QueryStatus, error)
- func (api *KsqldbClient) GetServerInfo() (*KsqlServerInfo, error)
- func (api *KsqldbClient) GetServerStatus() (*ServerStatusResponse, error)
- func (cl *KsqldbClient) ParseSQLEnabled() bool
- func (api *KsqldbClient) Pull(ctx context.Context, options QueryOptions) (header Header, payload Payload, err error)
- func (api *KsqldbClient) Push(ctx context.Context, options QueryOptions, rowChannel chan<- Row, ...) (err error)
- func (api *KsqldbClient) TerminateCluster(topics ...string) (*KsqlResponseSlice, error)
- func (api *KsqldbClient) ValidateProperty(property string) (*bool, error)
- type LagByPartition
- type LagByPartitionMap
- type NewClientFactory
- type NewClientWithOptionsFactory
- type Partition
- type PartitionMap
- type Payload
- type PropertyMap
- type Query
- type QueryDescription
- type QueryOptions
- func (q *QueryOptions) AutoOffsetReset(offset StreamOffset) *QueryOptions
- func (o *QueryOptions) EmptyQuery() bool
- func (q *QueryOptions) EnablePullQueryTableScan(enable bool) *QueryOptions
- func (q *QueryOptions) SanitizeQuery()
- func (q *QueryOptions) SetIdleConnectionTimeout(seconds int64) *QueryOptions
- type QuerySlice
- type QueryStatus
- type RequestParams
- type RespUnmarshaller
- type Response
- type ResponseError
- type Row
- type Schema
- type ServerStatusResponse
- type SessionVariablesMap
- type StateStoreLag
- type StateStoreLagMap
- type Stream
- type StreamOffset
- type StreamSlice
- type Table
- type TableSlice
- type TerminateClusterTopics
- type TopicPartition
Constants ¶
const ( QUERY_STREAM_ENDPOINT = "/query-stream" QUERY_ENDPOINT = "/query" INSERTS_ENDPOINT = "/inserts-stream" CLOSE_QUERY_ENDPOINT = "/close-query" KSQL_ENDPOINT = "/ksql" INFO_ENDPOINT = "/info" STATUS_ENDPOINT = "/status" HEALTHCHECK_ENDPOINT = "/healthcheck" CLUSTER_STATUS_ENDPOINT = "/clusterStatus" PROP_VALIDITY_ENPOINT = "/is_valid_property" TERMINATE_CLUSTER_ENDPOINT = "/ksql/terminate" )
const ( KSQL_QUERY_PULL_TABLE_SCAN_ENABLED = "ksql.query.pull.table.scan.enabled" KSQL_STREAMS_AUTO_OFFSET_RESET = "ksql.streams.auto.offset.reset" KSQL_IDLE_CONNECTION_TIMEOUT_SECONDS = "ksql.idle.connection.timeout.seconds" DEFAULT_IDLE_CONNECTION_TIMEOUT = int64(600) // 10 minutes )
const ( QBErr = "qbErr" QBUnsupportedType = "unsupported param type" EMPTY_STATEMENT = "empty ksql statement" )
Variables ¶
var (
ErrNotFound = errors.New("no result found")
)
Functions ¶
func QueryBuilder ¶
QueryBuilder replaces ? with the correct types in the sql statement
Types ¶
type ActiveStandbyPerQuery ¶
type ActiveStandbyPerQuery struct { ActiveStores []string ActivePartitions []TopicPartition StandByStore []string StandByPartitions []string }
type ActiveStandbyPerQueryMap ¶
type ActiveStandbyPerQueryMap map[string]ActiveStandbyPerQuery
type ClusterNode ¶
type ClusterNode struct { HostAlive bool LastStatusUpdateMs int64 HostStoreLags HostStoreLags ActiveStandbyPerQuery ActiveStandbyPerQueryMap }
type ClusterNodeMap ¶
type ClusterNodeMap map[string]ClusterNode
type ClusterStatus ¶
type ClusterStatus struct {
Host ClusterNodeMap `mapstructure:",remain"`
}
type ClusterStatusResponse ¶
type ClusterStatusResponse struct {
ClusterStatus ClusterStatus
}
type CommandStatus ¶
type ExecOptions ¶
type ExecOptions struct { KSql string `json:"ksql"` StreamsProperties PropertyMap `json:"streamsProperties,omitempty"` SessionVariables SessionVariablesMap `json:"sessionVariables,omitempty"` CommandSequenceNumber int64 `json:"commandSequenceNumber,omitempty"` }
func (*ExecOptions) EmptyQuery ¶
func (o *ExecOptions) EmptyQuery() bool
func (*ExecOptions) SanitizeQuery ¶
func (o *ExecOptions) SanitizeQuery()
type HostStoreLags ¶
type HostStoreLags struct { StateStoreLags StateStoreLagMap UpdateTimeMs uint64 }
type KsqlResponse ¶
type KsqlResponse struct { StatementText string Warnings []string Type string `json:"@type"` CommandId string `json:"commandId,omitempty"` CommandSequenceNumber int64 `json:"commandSequenceNumber,omitempty"` // -1 if the operation was unsuccessful CommandStatus CommandStatus `json:"commandStatus,omitempty"` Stream *StreamSlice `json:"streams,omitempty"` Tables *TableSlice `json:"tables,omitempty"` Queries *QuerySlice `json:"queries,omitempty"` QueryDescription *QueryDescription `json:"queryDescription,omitempty"` }
type KsqlResponseSlice ¶
type KsqlResponseSlice []KsqlResponse
type KsqlServerInfo ¶
type KsqlServerInfo struct { Version string `json:"version"` KafkaClusterID string `json:"kafkaClusterId"` KsqlServiceID string `json:"ksqlServiceId"` ServerStatus string `json:"serverStatus,omitempty"` }
KsqlServerInfo
type KsqlServerInfoResponse ¶
type KsqlServerInfoResponse struct {
KsqlServerInfo KsqlServerInfo `json:"KsqlServerInfo"`
}
KsqlServerInfoResponse
type Ksqldb ¶
type Ksqldb interface { // GetServerInfo returns informations about the ksqlDB Server // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/info-endpoint/ GetServerInfo() (*KsqlServerInfo, error) // GetServerStatus returns server status // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/info-endpoint/ GetServerStatus() (*ServerStatusResponse, error) // GetClusterStatus // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/cluster-status-endpoint/ GetClusterStatus() (*ClusterStatusResponse, error) // TerminateCluster terminates a ksqldb cluster - READ THE DOCS before you call this endpoint // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/terminate-endpoint/ TerminateCluster(topics ...string) (*KsqlResponseSlice, error) // ValidateProperty validates a property // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/is_valid_property-endpoint/ ValidateProperty(property string) (*bool, error) // Pull data // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/ Pull(context.Context, string, bool) (Header, Payload, error) // Push data // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/ Push(context.Context, string, chan<- Row, chan<- Header) error // ClosePushQuery terminates push query explicitly // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#terminating-queries ClosePushQuery(context.Context, string) error // GetQueryStatus // @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/status-endpoint/ GetQueryStatus(string) (*QueryStatus, error) // EnableParseSQL enables/disables query parsing for push/pull/execute requests EnableParseSQL(bool) // ParseSQLEnabled returns true if query parsing is enabled or not ParseSQLEnabled() bool // Close closes net.HTTPClient transport Close() }
type KsqldbClient ¶
type KsqldbClient struct {
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(http net.HTTPClient) (KsqldbClient, error)
NewClient returns a new KsqldbClient with the given net.HTTPclient
func NewClientWithOptions ¶
func NewClientWithOptions(options net.Options) (KsqldbClient, error)
NewClientWithOptions returns a new @KsqldbClient with Options
func (*KsqldbClient) Close ¶
func (cl *KsqldbClient) Close()
Close closes the underlying http transport
func (*KsqldbClient) ClosePushQuery ¶
func (api *KsqldbClient) ClosePushQuery(ctx context.Context, queryID string) error
Close Query terminates push query explicitly
func (*KsqldbClient) EnableParseSQL ¶
func (cl *KsqldbClient) EnableParseSQL(activate bool)
EnableParseSQL enables / disables sql parsing
func (*KsqldbClient) Execute ¶
func (api *KsqldbClient) Execute(options ExecOptions) (*KsqlResponseSlice, error)
Execute will execute a ksqlDB statement. All statements, except those starting with SELECT, can be run on this endpoint. To run SELECT statements use use Push or Pull functions.
To use this function pass in the @ExecOptions.
Ref: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/ksql-endpoint/
func (*KsqldbClient) GetClusterStatus ¶
func (api *KsqldbClient) GetClusterStatus() (*ClusterStatusResponse, error)
GetClusterStatus @see https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/cluster-status-endpoint/
func (*KsqldbClient) GetQueryStatus ¶
func (api *KsqldbClient) GetQueryStatus(commandId string) (*QueryStatus, error)
GetQueryStatus returns the current command status for a CREATE, DROP, or TERMINATE statement.
CREATE, DROP, and TERMINATE statements returns an object that indicates the current state of statement execution. A statement can be in one of the following states:
QUEUED, PARSING, EXECUTING: The statement was accepted by the server and is being processed. SUCCESS: The statement was successfully processed. ERROR: There was an error processing the statement. The statement was not executed.
TERMINATED: The query started by the statement was terminated. Only returned for CREATE STREAM|TABLE AS SELECT.
If a CREATE, DROP, or TERMINATE statement returns a command status with state QUEUED, PARSING, or EXECUTING from the @Execute endpoint, you can use the @GetQueryStatus endpoint to poll the status of the command.
func (*KsqldbClient) GetServerInfo ¶
func (api *KsqldbClient) GetServerInfo() (*KsqlServerInfo, error)
ServerInfo gets the info for your server api net.KsqlHTTPClient
func (*KsqldbClient) GetServerStatus ¶
func (api *KsqldbClient) GetServerStatus() (*ServerStatusResponse, error)
ServerInfo provides information about your server
func (*KsqldbClient) ParseSQLEnabled ¶
func (cl *KsqldbClient) ParseSQLEnabled() bool
ParseSQLEnabled returns true if sql parsing is enabled; false otherwise
func (*KsqldbClient) Pull ¶
func (api *KsqldbClient) Pull(ctx context.Context, options QueryOptions) (header Header, payload Payload, err error)
Pull queries are like "traditional" RDBMS queries in which the query terminates once the state has been queried.
To use this function pass in the the SQL query statement, and a boolean for whether full table scans should be enabled.
The function returns a ksqldb.Header and ksqldb.Payload which will hold one or more rows of data. You will need to define variables to hold each column's value. You can adopt this pattern to do this:
var col1 string var col2 float64 for _, row := range r { col1 = row[0].(string) col2 = row[1].(float64) // Do other stuff with the data here } }
func (*KsqldbClient) Push ¶
func (api *KsqldbClient) Push(ctx context.Context, options QueryOptions, rowChannel chan<- Row, headerChannel chan<- Header) (err error)
Push queries are continuous queries in which new events or changes to a table's state are pushed to the client. You can think of them as subscribing to a stream of changes.
Since push queries never end, this function expects a channel to which it can write new rows of data as and when they are received.
To use this function pass in a context, the SQL query statement, and two channels:
* ksqldb.Row - rows of data * ksqldb.Header - header (including column definitions).
If you don't want to block before receiving row data then make this channel buffered.
The channel is populated with ksqldb.Row which represents one row of data. You will need to define variables to hold each column's value. You can adopt this pattern to do this:
var DATA_TS float64 var ID string for row := range rc { if row != nil { DATA_TS = row[0].(float64) ID = row[1].(string)
func (*KsqldbClient) TerminateCluster ¶
func (api *KsqldbClient) TerminateCluster(topics ...string) (*KsqlResponseSlice, error)
func (*KsqldbClient) ValidateProperty ¶
func (api *KsqldbClient) ValidateProperty(property string) (*bool, error)
ValidateProperty resource tells you whether a property is prohibited from setting. If prohibited the ksqlDB server api returns a 400 error
type LagByPartition ¶
type LagByPartition struct {
Partition Partition
}
type LagByPartitionMap ¶
type LagByPartitionMap map[string]LagByPartition
type NewClientFactory ¶
type NewClientFactory interface { // NewClient factory NewClient(net.HTTPClient) (*KsqldbClient, error) }
type PartitionMap ¶
type PropertyMap ¶
type QueryDescription ¶
type QueryOptions ¶
type QueryOptions struct { Sql string `json:"sql"` Properties PropertyMap `json:"properties"` }
func NewDefaultPullQueryOptions ¶
func NewDefaultPullQueryOptions(sql string) (options QueryOptions)
func NewDefaultPushQueryOptions ¶
func NewDefaultPushQueryOptions(sql string) (options QueryOptions)
func (*QueryOptions) AutoOffsetReset ¶
func (q *QueryOptions) AutoOffsetReset(offset StreamOffset) *QueryOptions
AutoOffsetReset sets the offset to latest | earliest
Determines what to do when there is no initial offset in Apache Kafka® or if the current offset doesn't exist on the server. The default value in ksqlDB is `latest`, which means all Kafka topics are read from the latest available offset.
func (*QueryOptions) EmptyQuery ¶
func (o *QueryOptions) EmptyQuery() bool
func (*QueryOptions) EnablePullQueryTableScan ¶
func (q *QueryOptions) EnablePullQueryTableScan(enable bool) *QueryOptions
EnablePullQueryTableScan to control whether table scans are permitted when executing pull queries.
Without this enabled, only key lookups are used.
Enabling table scans removes various restrictions on what types of queries are allowed.
In particular, these pull query types are now permitted:
- No WHERE clause
- Range queries on keys
- Equality and range queries on non-key columns
- Multi-column key queries without specifying all key columns
There may be significant performance implications to using these types of queries, depending on the size of the data and other workloads running, so use this config carefully.
func (*QueryOptions) SanitizeQuery ¶
func (q *QueryOptions) SanitizeQuery()
func (*QueryOptions) SetIdleConnectionTimeout ¶
func (q *QueryOptions) SetIdleConnectionTimeout(seconds int64) *QueryOptions
SetIdleConnectionTimeout sets the timeout for idle connections
A connection is idle if there is no data in either direction on that connection for the duration of the timeout.
This configuration can be helpful if you are issuing push queries that only receive data infrequently from the server, as otherwise those connections will be severed when the timeout (default 10 minutes) is hit.
Decreasing this timeout enables closing connections more aggressively to save server resources.
Increasing this timeout makes the server more tolerant of low-data volume use cases.
type QuerySlice ¶
type QuerySlice []Query
type QueryStatus ¶
type RequestParams ¶
type RequestParams map[string]interface{}
type RespUnmarshaller ¶
type ResponseError ¶
type ResponseError struct { ErrType string `json:"@type"` ErrCode int `json:"error_code"` Message string `json:"message"` }
func (ResponseError) Error ¶
func (e ResponseError) Error() string
type ServerStatusResponse ¶
type ServerStatusResponse struct { IsHealthy *bool `json:"isHealthy"` Details struct { Metastore struct { IsHealthy *bool `json:"isHealthy"` } `json:"metastore"` Kafka struct { IsHealthy *bool `json:"isHealthy"` } `json:"kafka"` } `json:"details"` KsqlServiceID string `json:"ksqlServiceId"` }
ServerStatusResponse
type SessionVariablesMap ¶
type SessionVariablesMap map[string]interface{}
type StateStoreLag ¶
type StateStoreLag struct { LagByPartition LagByPartitionMap Size uint64 }
type StateStoreLagMap ¶
type StateStoreLagMap map[string]StateStoreLag
type StreamOffset ¶
type StreamOffset string
const ( EARLIEST StreamOffset = "earliest" LATEST StreamOffset = "latest" )
type StreamSlice ¶
type StreamSlice []Stream
type TableSlice ¶
type TableSlice []Table
type TerminateClusterTopics ¶
type TerminateClusterTopics struct {
DeleteTopicList []string `json:"deleteTopicList,omitempty"`
}
func (*TerminateClusterTopics) Add ¶
func (tct *TerminateClusterTopics) Add(topics ...string)
func (*TerminateClusterTopics) Size ¶
func (tct *TerminateClusterTopics) Size() int