proton

package
v0.0.0-...-1a65ef4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 5, 2024 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// versioned endpoints
	// example: http://host:port/proton/v1/ddl/streams
	StreamsPath = "ddl/streams"
	IngestPath  = "ingest/streams"
	SearchPath  = "search"

	// unversioned endpoints
	// example: http://host:port/proton/ping
	PingPath = "ping"
	InfoPath = "info"
)
View Source
const DefaultLogStoreRetentionBytes = 604800000
View Source
const DefaultLogStoreRetentionMS = 3600000 // 1 hour for default
View Source
const DefaultTTL = "to_datetime(_tp_time) + INTERVAL 30 DAY"
View Source
const ProtonOBType = "proton"
View Source
const ProtonSinkType = "proton"

Variables

View Source
var ErrProtonDown = errors.New("failed to connect to Proton")

Functions

func Init

func Init()

func IsContextCancel

func IsContextCancel(err error) bool

func IsUnknownStreamError

func IsUnknownStreamError(err error) bool

func NewConfig

func NewConfig(host, user, password string) config

func NewProtonObserver

func NewProtonObserver(properties map[string]interface{}) (observer.Observer, error)

func NewProtonSink

func NewProtonSink(properties map[string]interface{}) (sink.Sink, error)

func Parse

func Parse(err error) (int, string)

func UnmarshalErrorResponse

func UnmarshalErrorResponse(resp *http.Response) error

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(host string, port int, restUser, restPassword string) *Client

NewClient creates proton client. It includes both rest and proton-go client.

func (*Client) CreateStream

func (s *Client) CreateStream(stream StreamDef, streamStorageConfig StreamStorageConfig) error

func (*Client) DeleteStream

func (s *Client) DeleteStream(name string) error

func (*Client) InsertData

func (s *Client) InsertData(data IngestData, stream string) (int, error)

type Column

type Column struct {
	Name string `json:"name" binding:"required"`
	Type string `json:"type" binding:"required"`

} // @name Column

type ColumnDef

type ColumnDef struct {
	Name                    string `json:"name" binding:"required" example:"name"`
	Type                    string `json:"type" binding:"required" example:"string"`
	Default                 string `json:"default,omitempty"`
	Codec                   string `json:"codec,omitempty"`
	TTLExpression           string `json:"ttl_expression,omitempty"`
	SkippingIndexExpression string `json:"skipping_index_expression,omitempty"`

	// This is used by proton only.
	CompressionCodec string `json:"compression_codec,omitempty" swaggerignore:"true"`

} // @name ColumnDef

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine(c config) *Engine

func (*Engine) ExecWithParams

func (e *Engine) ExecWithParams(sql string, params ...any) error

func (*Engine) Ping

func (e *Engine) Ping() error

func (*Engine) QueryStream

func (e *Engine) QueryStream(ctx context.Context, sqlStr, id string) ([]Column, rxgo.Observable, rxgo.Observable, error)

func (*Engine) StopQuery

func (e *Engine) StopQuery(id string)

func (*Engine) SyncQuery

func (e *Engine) SyncQuery(sql string, timeout time.Duration, params ...any) ([]Column, [][]any, error)

Be sure ONLY use this function when the result set is small AND the query is fast. To make it simple, we allow the caller to pass a timeout. Ideally it should accept a context and let the caller to cancel it

type ErrorResponse

type ErrorResponse struct {
	HTTPStatusCode int
	// proton error codes: https://github.com/timeplus-io/proton/blob/develop/src/Common/ErrorCodes.cpp
	Code      int    `json:"code"`
	RequestId string `json:"request_id"`
	Message   string `json:"error_msg"`
}

HTTPStatusCode = -1 indicates the error is not from a HTTP request (e.g. from TCP)

func (*ErrorResponse) Error

func (e *ErrorResponse) Error() string

type IngestData

type IngestData struct {
	Columns []string `json:"columns"`
	Data    [][]any  `json:"data"`
}

type ProtonObserver

type ProtonObserver struct {
	// contains filtered or unexported fields
}

func (*ProtonObserver) Observe

func (o *ProtonObserver) Observe() error

func (*ProtonObserver) Stop

func (o *ProtonObserver) Stop()

func (*ProtonObserver) Wait

func (o *ProtonObserver) Wait()

type ProtonSink

type ProtonSink struct {
	// contains filtered or unexported fields
}

func (*ProtonSink) GetStats

func (s *ProtonSink) GetStats() *sink.Stats

func (*ProtonSink) Init

func (s *ProtonSink) Init(name string, fields []common.Field) error

func (*ProtonSink) Write

func (s *ProtonSink) Write(headers []string, rows [][]interface{}, index int) error

type ResponseDataRow

type ResponseDataRow []any

For Proton query response

type StreamDef

type StreamDef struct {

	// Stream name should only contain a maximum of 64 letters, numbers, or _, and start with a letter
	Name                   string      `json:"name" binding:"required" example:"test_stream"`
	Columns                []ColumnDef `json:"columns"`
	EventTimeColumn        string      `json:"event_time_column,omitempty"`
	Shards                 int         `json:"shards,omitempty"`
	ReplicationFactor      int         `json:"replication_factor,omitempty"`
	OrderByExpression      string      `json:"order_by_expression,omitempty"`
	OrderByGranularity     string      `json:"order_by_granularity,omitempty"`
	PartitionByGranularity string      `json:"partition_by_granularity,omitempty"`
	TTLExpression          string      `json:"ttl_expression,omitempty" example:"to_datetime(_tp_time) + INTERVAL 7 DAY"`

	// Storage mode of stream. Defaulted to `append`.
	Mode string `json:"mode,omitempty" example:"append" enums:"append,changelog,changelog_kv,versioned_kv"`

	// Expression of primary key, required in `changelog_kv` and `versioned_kv` mode
	PrimaryKey string `json:"primary_key,omitempty"`

} // @name StreamDef

type StreamStorageConfig

type StreamStorageConfig struct {
	// The max size a stream can grow. Any non-positive value means unlimited size. Defaulted to 10 GiB.
	RetentionBytes int `json:"logstore_retention_bytes,omitempty" example:"10737418240"`

	// The max time the data can be retained in the stream. Any non-positive value means unlimited time. Defaulted to 7 days.
	RetentionMS int `json:"logstore_retention_ms,omitempty" example:"604800000"`
}

func (*StreamStorageConfig) AppendToParams

func (c *StreamStorageConfig) AppendToParams(params *url.Values)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL