Documentation
¶
Index ¶
- Constants
- Variables
- func Init()
- func IsContextCancel(err error) bool
- func IsUnknownStreamError(err error) bool
- func NewConfig(host, user, password string) config
- func NewProtonObserver(properties map[string]interface{}) (observer.Observer, error)
- func NewProtonSink(properties map[string]interface{}) (sink.Sink, error)
- func Parse(err error) (int, string)
- func UnmarshalErrorResponse(resp *http.Response) error
- type Client
- type Column
- type ColumnDef
- type Engine
- func (e *Engine) ExecWithParams(sql string, params ...any) error
- func (e *Engine) Ping() error
- func (e *Engine) QueryStream(ctx context.Context, sqlStr, id string) ([]Column, rxgo.Observable, rxgo.Observable, error)
- func (e *Engine) StopQuery(id string)
- func (e *Engine) SyncQuery(sql string, timeout time.Duration, params ...any) ([]Column, [][]any, error)
- type ErrorResponse
- type IngestData
- type ProtonObserver
- type ProtonSink
- type ResponseDataRow
- type StreamDef
- type StreamStorageConfig
Constants ¶
View Source
// Path segments of the Proton HTTP endpoints.
const (
	// Versioned endpoints.
	// Example: http://host:port/proton/v1/ddl/streams
	StreamsPath = "ddl/streams"
	IngestPath  = "ingest/streams"
	SearchPath  = "search"

	// Unversioned endpoints.
	// Example: http://host:port/proton/ping
	PingPath = "ping"
	InfoPath = "info"
)
View Source
// DefaultLogStoreRetentionBytes is a default log store retention size; the
// name indicates the unit is bytes.
// NOTE(review): 604800000 equals 7 days expressed in milliseconds, while the
// StreamStorageConfig field comments mention defaults of 10 GiB and 7 days.
// Confirm that this value and unit are intended.
const DefaultLogStoreRetentionBytes = 604800000
View Source
// DefaultLogStoreRetentionMS is the default log store retention time in
// milliseconds.
const DefaultLogStoreRetentionMS = 3600000 // 1 hour (60 * 60 * 1000 ms) by default
View Source
// DefaultTTL is a TTL expression that evaluates to 30 days after the
// _tp_time column converted with to_datetime.
// NOTE(review): presumably used when a stream definition carries no TTL
// expression of its own; confirm against the callers.
const DefaultTTL = "to_datetime(_tp_time) + INTERVAL 30 DAY"
View Source
// ProtonOBType is the type name "proton".
// NOTE(review): presumably the key under which NewProtonObserver is
// registered as an observer type; confirm against the registration code.
const ProtonOBType = "proton"
View Source
// ProtonSinkType is the type name "proton".
// NOTE(review): presumably the key under which NewProtonSink is registered
// as a sink type; confirm against the registration code.
const ProtonSinkType = "proton"
Variables ¶
View Source
// ErrProtonDown is a sentinel error with the message
// "failed to connect to Proton".
// NOTE(review): presumably returned when the Proton server cannot be
// reached; confirm against the callers. Match it with errors.Is.
var ErrProtonDown = errors.New("failed to connect to Proton")
Functions ¶
func IsContextCancel ¶
func IsUnknownStreamError ¶
func NewProtonObserver ¶
func UnmarshalErrorResponse ¶
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) CreateStream ¶
func (s *Client) CreateStream(stream StreamDef, streamStorageConfig StreamStorageConfig) error
func (*Client) DeleteStream ¶
func (*Client) InsertData ¶
func (s *Client) InsertData(data IngestData, stream string) (int, error)
type ColumnDef ¶
// ColumnDef is the JSON representation of a single column in a stream
// definition. Name and Type are marked required by their binding tags; all
// other fields are omitted from the JSON output when empty.
type ColumnDef struct {
	Name                    string `json:"name" binding:"required" example:"name"`
	Type                    string `json:"type" binding:"required" example:"string"`
	Default                 string `json:"default,omitempty"`
	Codec                   string `json:"codec,omitempty"`
	TTLExpression           string `json:"ttl_expression,omitempty"`
	SkippingIndexExpression string `json:"skipping_index_expression,omitempty"`

	// This is used by proton only.
	CompressionCodec string `json:"compression_codec,omitempty" swaggerignore:"true"`
} // @name ColumnDef
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
func (*Engine) QueryStream ¶
func (*Engine) SyncQuery ¶
func (e *Engine) SyncQuery(sql string, timeout time.Duration, params ...any) ([]Column, [][]any, error)
Be sure to use this function ONLY when the result set is small AND the query is fast. To keep it simple, the caller passes a timeout. Ideally it would accept a context and let the caller cancel it.
type ErrorResponse ¶
// ErrorResponse is an error returned by Proton, decoded from the JSON error
// body (code, request_id, error_msg).
type ErrorResponse struct {
	// HTTPStatusCode = -1 indicates the error is not from an HTTP request
	// (e.g. from TCP).
	HTTPStatusCode int

	// proton error codes: https://github.com/timeplus-io/proton/blob/develop/src/Common/ErrorCodes.cpp
	Code      int    `json:"code"`
	RequestId string `json:"request_id"`
	Message   string `json:"error_msg"`
}
HTTPStatusCode = -1 indicates the error is not from an HTTP request (e.g. from TCP)
func (*ErrorResponse) Error ¶
func (e *ErrorResponse) Error() string
type IngestData ¶
type ProtonObserver ¶
type ProtonObserver struct {
// contains filtered or unexported fields
}
func (*ProtonObserver) Observe ¶
func (o *ProtonObserver) Observe() error
func (*ProtonObserver) Stop ¶
func (o *ProtonObserver) Stop()
func (*ProtonObserver) Wait ¶
func (o *ProtonObserver) Wait()
type ProtonSink ¶
type ProtonSink struct {
// contains filtered or unexported fields
}
func (*ProtonSink) GetStats ¶
func (s *ProtonSink) GetStats() *sink.Stats
type StreamDef ¶
type StreamDef struct { // Stream name should only contain a maximum of 64 letters, numbers, or _, and start with a letter Name string `json:"name" binding:"required" example:"test_stream"` Columns []ColumnDef `json:"columns"` EventTimeColumn string `json:"event_time_column,omitempty"` Shards int `json:"shards,omitempty"` ReplicationFactor int `json:"replication_factor,omitempty"` OrderByExpression string `json:"order_by_expression,omitempty"` OrderByGranularity string `json:"order_by_granularity,omitempty"` PartitionByGranularity string `json:"partition_by_granularity,omitempty"` TTLExpression string `json:"ttl_expression,omitempty" example:"to_datetime(_tp_time) + INTERVAL 7 DAY"` // Storage mode of stream. Defaulted to `append`. Mode string `json:"mode,omitempty" example:"append" enums:"append,changelog,changelog_kv,versioned_kv"` // Expression of primary key, required in `changelog_kv` and `versioned_kv` mode PrimaryKey string `json:"primary_key,omitempty"` } // @name StreamDef
type StreamStorageConfig ¶
// StreamStorageConfig holds the log store retention limits of a stream.
// Both fields are omitted from the JSON output when zero.
type StreamStorageConfig struct {
	// The max size a stream can grow. Any non-positive value means unlimited size. Defaulted to 10 GiB.
	RetentionBytes int `json:"logstore_retention_bytes,omitempty" example:"10737418240"`

	// The max time the data can be retained in the stream. Any non-positive value means unlimited time. Defaulted to 7 days.
	RetentionMS int `json:"logstore_retention_ms,omitempty" example:"604800000"`
}
func (*StreamStorageConfig) AppendToParams ¶
func (c *StreamStorageConfig) AppendToParams(params *url.Values)
Click to show internal directories.
Click to hide internal directories.