ch

v0.58.1

Note: this package is not in the latest version of its module.
Published: Aug 17, 2023 License: Apache-2.0 Imports: 26 Imported by: 28

README

ch

Low-level TCP ClickHouse client and protocol implementation in Go. Designed for very fast data block streaming with low network, CPU, and memory overhead.

NB: No pooling, no reconnects, and not goroutine-safe by default: a Client wraps a single connection. Use clickhouse-go for a high-level database/sql-compatible client; pooling for ch-go is available as the chpool package.

ClickHouse is an open-source, high performance columnar OLAP database management system for real-time analytics using SQL.

go get github.com/ClickHouse/ch-go@latest

Example

package main

import (
  "context"
  "fmt"

  "github.com/ClickHouse/ch-go"
  "github.com/ClickHouse/ch-go/proto"
)

func main() {
  ctx := context.Background()
  c, err := ch.Dial(ctx, ch.Options{Address: "localhost:9000"})
  if err != nil {
    panic(err)
  }
  defer c.Close()
  var (
    numbers int
    data    proto.ColUInt64
  )
  if err := c.Do(ctx, ch.Query{
    Body: "SELECT number FROM system.numbers LIMIT 500000000",
    Result: proto.Results{
      {Name: "number", Data: &data},
    },
    // OnResult will be called on next received data block.
    OnResult: func(ctx context.Context, b proto.Block) error {
      numbers += len(data)
      return nil
    },
  }); err != nil {
    panic(err)
  }
  fmt.Println("numbers:", numbers)
}
Time   Rows   Data   Throughput   Jobs
393ms  0.5B   4GB    10GB/s       1
874ms  2.0B   16GB   18GB/s       4
Results

To stream query results, set the Result and OnResult fields of Query. OnResult is called after Result has been filled with a received data block.

OnResult is optional, but the query will fail if more than a single block is received, so it is fine to set only Result when only one row is expected.

Automatic result inference
var result proto.Results
q := ch.Query{
  Body:   "SELECT * FROM table",
  Result: result.Auto(),
}
Single result with column name inference
var res proto.ColBool
q := ch.Query{
  Body:   "SELECT v FROM test_table",
  Result: proto.ResultColumn{Data: &res},
}
Writing data

See examples/insert.

For the table

CREATE TABLE test_table_insert
(
    ts                DateTime64(9),
    severity_text     Enum8('INFO'=1, 'DEBUG'=2),
    severity_number   UInt8,
    body              String,
    name              String,
    arr               Array(String)
) ENGINE = Memory

We prepare a data block for insertion as follows:

var (
	body      proto.ColStr
	name      proto.ColStr
	sevText   proto.ColEnum
	sevNumber proto.ColUInt8

	ts  = new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano) // DateTime64(9)
	arr = new(proto.ColStr).Array()                                   // Array(String)
	now = time.Date(2010, 1, 1, 10, 22, 33, 345678, time.UTC)
)

// Append 10 rows to initial data block.
for i := 0; i < 10; i++ {
	body.AppendBytes([]byte("Hello"))
	ts.Append(now)
	name.Append("name")
	sevText.Append("INFO")
	sevNumber.Append(10)
	arr.Append([]string{"foo", "bar", "baz"})
}

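// Columns are passed by pointer so that later Reset and Append calls
// on these variables are visible through input.
input := proto.Input{
	{Name: "ts", Data: ts},
	{Name: "severity_text", Data: &sevText},
	{Name: "severity_number", Data: &sevNumber},
	{Name: "body", Data: &body},
	{Name: "name", Data: &name},
	{Name: "arr", Data: arr},
}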
Single data block
if err := conn.Do(ctx, ch.Query{
	// Or "INSERT INTO test_table_insert (ts, severity_text, severity_number, body, name, arr) VALUES"
	// Or input.Into("test_table_insert")
	Body: "INSERT INTO test_table_insert VALUES",
	Input: input,
}); err != nil {
	panic(err)
}
Stream data
// Stream data to the ClickHouse server in multiple data blocks.
var blocks int
if err := conn.Do(ctx, ch.Query{
	Body:  input.Into("test_table_insert"), // helper that generates an INSERT INTO query with all columns
	Input: input,

	// OnInput is called to prepare Input data before encoding and sending
	// to the ClickHouse server.
	OnInput: func(ctx context.Context) error {
		// On each OnInput call, fill the input data.
		//
		// NB: Reset the input columns yourself; they are
		// not reset automatically.
		//
		// That is, we are re-using the same input columns, and
		// if we return nil without doing anything, the data will
		// just be duplicated.

		input.Reset() // calls "Reset" on each column

		if blocks >= 10 {
			// Stop streaming.
			//
			// This will also write trailing input data, if any,
			// but we just reset the input, so it is currently blank.
			return io.EOF
		}

		// Append new values:
		for i := 0; i < 10; i++ {
			body.AppendBytes([]byte("Hello"))
			ts.Append(now)
			name.Append("name")
			sevText.Append("DEBUG")
			sevNumber.Append(10)
			arr.Append([]string{"foo", "bar", "baz"})
		}

		// Data will be encoded and sent to the ClickHouse server after returning nil.
		// The Do method will return an error, if any.
		blocks++
		return nil
	},
}); err != nil {
	panic(err)
}
Writing dumps in Native format

You can use ch-go to write ClickHouse dumps in Native format:

The most efficient format. Data is written and read by blocks in binary format. For each block, the number of rows, number of columns, column names and types, and parts of columns in this block are recorded one after another. In other words, this format is “columnar” – it does not convert columns to rows. This is the format used in the native interface for interaction between servers, for using the command-line client, and for C++ clients.

See ./internal/cmd/ch-native-dump for a more sophisticated example.

Example:

var (
    colK proto.ColInt64
    colV proto.ColInt64
)
// Generate some data.
for i := 0; i < 100; i++ {
    colK.Append(int64(i))
    colV.Append(int64(i) + 1000)
}
// Write data to buffer.
var buf proto.Buffer
input := proto.Input{
    // Pointers, so that later Reset/Append calls are visible through input.
    {Name: "k", Data: &colK},
    {Name: "v", Data: &colV},
}
b := proto.Block{
    Rows:    colK.Rows(),
    Columns: len(input),
}
// Note that we are using protocol version 54451; passing proto.Version here will fail.
if err := b.EncodeRawBlock(&buf, 54451, input); err != nil {
    panic(err)
}

// You can write buf.Buf to io.Writer, e.g. os.Stdout or file.
var out bytes.Buffer
_, _ = out.Write(buf.Buf)

// You can encode multiple blocks in sequence.
//
// To do this, reset buf and all columns, append new values
// to the columns, update b.Rows and call EncodeRawBlock again.
buf.Reset()
colK.Reset()
colV.Reset()

Features

  • OpenTelemetry support
  • No reflection or interface{}
  • Generics (go1.18) for Array[T], LowCardinality[T], Map[K, V], Nullable[T]
  • Reading or writing ClickHouse dumps in Native format
  • Column-oriented design that operates directly with blocks of data
    • Dramatically more efficient
    • Up to 100x faster than row-first design around sql
    • Up to 700x faster than HTTP API
    • Low memory overhead (data blocks are slices, i.e. contiguous memory)
    • Highly efficient input and output block streaming
    • As close to ClickHouse as possible
  • Structured query execution telemetry streaming
  • LZ4, ZSTD or None (just checksums for integrity check) compression
  • External data support
  • Rigorously tested
    • Windows, Mac, Linux (also x86)
    • Unit tests for encoding and decoding
      • ClickHouse Server in Go for faster tests
      • Golden files for all packets, columns
      • Both server and client structures
      • Ensuring that partial read leads to failure
    • End-to-end tests on multiple LTS and stable versions
    • Fuzzing

Supported types

  • UInt8, UInt16, UInt32, UInt64, UInt128, UInt256
  • Int8, Int16, Int32, Int64, Int128, Int256
  • Date, Date32, DateTime, DateTime64
  • Decimal32, Decimal64, Decimal128, Decimal256 (only low-level raw values)
  • IPv4, IPv6
  • String, FixedString(N)
  • UUID
  • Array(T)
  • Enum8, Enum16
  • LowCardinality(T)
  • Map(K, V)
  • Bool
  • Tuple(T1, T2, ..., Tn)
  • Nullable(T)
  • Point
  • Nothing, Interval

Enums

You can use automatic enum inference with proto.ColEnum; this comes with some performance penalty.

To use proto.ColEnum8 and proto.ColEnum16, you need to explicitly provide DDL for them via proto.Wrap:

var v proto.ColEnum8

const ddl = `'Foo'=1, 'Bar'=2, 'Baz'=3`
input := []proto.InputColumn{
  {Name: "v", Data: proto.Wrap(&v, ddl)},
}
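When the enum values are known in Go code, the DDL string passed to proto.Wrap can be generated rather than written by hand. Below is a minimal sketch with a hypothetical enumDDL helper, which is not part of ch-go. It numbers values from 1 in the given order and does not escape names.

```go
package main

import (
	"fmt"
	"strings"
)

// enumDDL is a hypothetical helper that builds an enum body such as
// 'Foo'=1, 'Bar'=2 from names, numbering them from 1 in order.
// Names are not escaped, so they must not contain single quotes.
func enumDDL(names ...string) string {
	parts := make([]string, len(names))
	for i, name := range names {
		parts[i] = fmt.Sprintf("'%s'=%d", name, i+1)
	}
	return strings.Join(parts, ", ")
}

func main() {
	fmt.Println(enumDDL("Foo", "Bar", "Baz"))
}
```

The result has the same form as the ddl constant above and can be passed to proto.Wrap in its place.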

Generics

Most columns implement the proto.ColumnOf[T] generic constraint:

type ColumnOf[T any] interface {
	Column
	Append(v T)
	AppendArr(vs []T)
	Row(i int) T
}

For example, ColStr (and ColStr.LowCardinality) implements ColumnOf[string]. The same goes for arrays: new(proto.ColStr).Array() implements ColumnOf[[]string], a column of []string values.
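Because most columns share this constraint, a helper can be written once and reused for every column type, scalar or array. The sketch below runs without ch-go. Here, columnOf is a local stand-in with the Append/Row shape of proto.ColumnOf[T] plus a Rows method, and sliceCol, appendAll and collect are hypothetical.

```go
package main

import "fmt"

// columnOf is a local stand-in with the Append/Row shape of
// proto.ColumnOf[T], plus Rows; it is not the ch-go interface itself.
type columnOf[T any] interface {
	Append(v T)
	Row(i int) T
	Rows() int
}

// sliceCol is a hypothetical column backed by a plain slice.
type sliceCol[T any] []T

func (c *sliceCol[T]) Append(v T) { *c = append(*c, v) }
func (c sliceCol[T]) Row(i int) T { return c[i] }
func (c sliceCol[T]) Rows() int   { return len(c) }

// appendAll works for any column type that satisfies columnOf[T].
func appendAll[T any](c columnOf[T], vs ...T) {
	for _, v := range vs {
		c.Append(v)
	}
}

// collect copies all rows of any columnOf[T] into a slice.
func collect[T any](c columnOf[T]) []T {
	out := make([]T, 0, c.Rows())
	for i := 0; i < c.Rows(); i++ {
		out = append(out, c.Row(i))
	}
	return out
}

func main() {
	var names sliceCol[string]
	appendAll[string](&names, "foo", "bar")
	var arrays sliceCol[[]string]
	appendAll[[]string](&arrays, []string{"a", "b"}, []string{"c"})
	fmt.Println(collect[string](&names), collect[[]string](&arrays))
}
```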

Array

Generic for Array(T)

// Array(String)
arr := proto.NewArray[string](new(proto.ColStr))
// Or:
// arr := new(proto.ColStr).Array()
q := ch.Query{
  Body:   "SELECT ['foo', 'bar', 'baz']::Array(String) as v",
  Result: arr.Results("v"),
}
// Do ...
arr.Row(0) // ["foo", "bar", "baz"]

Dumps

Reading

Use proto.Block.DecodeRawBlock on proto.NewReader:

func TestDump(t *testing.T) {
	// Testing decoding of Native format dump.
	//
	// CREATE TABLE test_dump (id Int8, v String)
	//   ENGINE = MergeTree()
	// ORDER BY id;
	//
	// SELECT * FROM test_dump
	//   ORDER BY id
	// INTO OUTFILE 'test_dump_native.raw' FORMAT Native;
	data, err := os.ReadFile(filepath.Join("_testdata", "test_dump_native.raw"))
	require.NoError(t, err)
	var (
		dec    proto.Block
		ids    proto.ColInt8
		values proto.ColStr
	)
	require.NoError(t, dec.DecodeRawBlock(
		proto.NewReader(bytes.NewReader(data)),
		proto.Results{
			{Name: "id", Data: &ids},
			{Name: "v", Data: &values},
		}),
	)
}
Writing

Use proto.Block.EncodeRawBlock with version 54451 on proto.Buffer with Rows and Columns set:

func TestLocalNativeDump(t *testing.T) {
	ctx := context.Background()
	// Testing clickhouse-local.
	var v proto.ColStr
	for _, s := range []string{"Foo", "Bar"} {
		v.Append(s)
	}
	buf := new(proto.Buffer)
	b := proto.Block{Rows: 2, Columns: 2}
	require.NoError(t, b.EncodeRawBlock(buf, 54451, []proto.InputColumn{
		{Name: "title", Data: v},
		{Name: "data", Data: proto.ColInt64{1, 2}},
	}), "encode")

	dir := t.TempDir()
	inFile := filepath.Join(dir, "data.native")
	require.NoError(t, os.WriteFile(inFile, buf.Buf, 0600), "write file")

	cmd := exec.CommandContext(ctx, "clickhouse-local", "local",
		"--logger.console",
		"--log-level", "trace",
		"--file", inFile,
		"--input-format", "Native",
		"--output-format", "JSON",
		"--query", "SELECT * FROM table",
	)
	out := new(bytes.Buffer)
	errOut := new(bytes.Buffer)
	cmd.Stdout = out
	cmd.Stderr = errOut

	t.Log(cmd.Args)
	require.NoError(t, cmd.Run(), "run: %s", errOut)
	t.Log(errOut)

	res := struct {
		Rows int `json:"rows"`
		Data []struct {
			Title string `json:"title"`
			Data  int    `json:"data,string"`
		}
	}{}
	require.NoError(t, json.Unmarshal(out.Bytes(), &res), "json")
	assert.Equal(t, 2, res.Rows)
	if assert.Len(t, res.Data, 2) {
		for i, r := range []struct {
			Title string `json:"title"`
			Data  int    `json:"data,string"`
		}{
			{"Foo", 1},
			{"Bar", 2},
		} {
			assert.Equal(t, r, res.Data[i])
		}
	}
}

TODO

  • Types
    • Decimal(P, S) API
    • JSON
    • SimpleAggregateFunction
    • AggregateFunction
    • Nothing
    • Interval
    • Nested
    • Geo types
      • Point
      • Ring
      • Polygon
      • MultiPolygon
  • Improved i/o timeout handling for reading packets from server
    • Close connection on context cancellation in all cases
    • Ensure that reads can't block forever

Reference

License

Apache License 2.0

Documentation

Overview

Package ch implements ClickHouse client.


Constants

const (
	DefaultDatabase         = "default"
	DefaultUser             = "default"
	DefaultHost             = "127.0.0.1"
	DefaultPort             = 9000
	DefaultDialTimeout      = 1 * time.Second
	DefaultHandshakeTimeout = 300 * time.Second
	DefaultReadTimeout      = 3 * time.Second
)

Defaults for connection.

const NoTimeout = time.Duration(-1)

NoTimeout is a value for Options.ReadTimeout that disables the timeout.

Variables

var ErrClosed = errors.New("client is closed")

ErrClosed means that the client was already closed.

Functions

func CompressionStrings

func CompressionStrings() []string

CompressionStrings returns a slice of all String values of the enum.

func IsErr

func IsErr(err error, codes ...proto.Error) bool

IsErr reports whether err is an Exception with one of the provided codes.

func IsException

func IsException(err error) bool

IsException reports whether err is Exception.

func Parameters added in v0.48.0

func Parameters(m map[string]any) []proto.Parameter

Parameters is a helper for building Query.Parameters.

EXPERIMENTAL.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client implements a ClickHouse binary protocol client on top of a single TCP connection.

func Connect

func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error)

Connect performs the handshake with the ClickHouse server over conn and initializes the application-level connection.

func Dial

func Dial(ctx context.Context, opt Options) (c *Client, err error)

Dial dials the requested address, establishes a TCP connection to the ClickHouse server, and performs the handshake.

func (*Client) Close

func (c *Client) Close() error

Close closes the underlying connection and frees all resources, leaving the Client in an unusable state.

func (*Client) Do

func (c *Client) Do(ctx context.Context, q Query) (err error)

Do performs the Query on the ClickHouse server.

func (*Client) IsClosed

func (c *Client) IsClosed() bool

IsClosed reports whether the connection is closed.

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) (err error)

Ping pings the server.

Do not call concurrently with Do.

func (*Client) ServerInfo

func (c *Client) ServerInfo() proto.ServerHello

ServerInfo returns server information.

type Compression

type Compression byte

Compression setting.

Trade bandwidth for CPU.

const (
	// CompressionDisabled disables compression. Lowest CPU overhead.
	CompressionDisabled Compression = iota
	// CompressionLZ4 enables LZ4 compression for data. Medium CPU overhead.
	CompressionLZ4
	// CompressionZSTD enables ZStandard compression. High CPU overhead.
	CompressionZSTD
	// CompressionNone uses no compression but data has checksums.
	CompressionNone
)

func CompressionString

func CompressionString(s string) (Compression, error)

CompressionString retrieves an enum value from the enum constant's string name. It returns an error if the parameter is not part of the enum.

func CompressionValues

func CompressionValues() []Compression

CompressionValues returns all values of the enum.

func (Compression) IsACompression

func (i Compression) IsACompression() bool

IsACompression returns true if the value is listed in the enum definition, false otherwise.

func (Compression) String

func (i Compression) String() string

type CorruptedDataErr

type CorruptedDataErr struct {
	Actual    city.U128
	Reference city.U128
	RawSize   int
	DataSize  int
}

CorruptedDataErr means that the provided hash does not match the calculated one.

func (*CorruptedDataErr) Error

func (c *CorruptedDataErr) Error() string

type Dialer

type Dialer interface {
	DialContext(ctx context.Context, network, address string) (net.Conn, error)
}

A Dialer dials using a context.

type Exception

type Exception struct {
	Code    proto.Error
	Name    string
	Message string
	Stack   string
	Next    []Exception // non-nil only for top exception
}

Exception is a server-side error.

func AsException

func AsException(err error) (*Exception, bool)

AsException finds the first *Exception in the err chain.

func (*Exception) Error

func (e *Exception) Error() string

func (*Exception) IsCode

func (e *Exception) IsCode(codes ...proto.Error) bool

type Log

type Log = proto.Log

type Options

type Options struct {
	Logger      *zap.Logger // defaults to Nop.
	Address     string      // 127.0.0.1:9000
	Database    string      // "default"
	User        string      // "default"
	Password    string      // blank string by default
	QuotaKey    string      // blank string by default
	Compression Compression // disabled by default
	ClientName  string      // blank string by default
	Settings    []Setting   // none by default

	// ReadTimeout is a timeout for reading a single packet from the server.
	//
	// Defaults to 3s. No timeout if negative (you can use NoTimeout const).
	ReadTimeout time.Duration

	Dialer      Dialer        // defaults to net.Dialer
	DialTimeout time.Duration // defaults to 1s
	TLS         *tls.Config   // no TLS is used by default

	ProtocolVersion  int           // force protocol version, optional
	HandshakeTimeout time.Duration // longer lasting handshake is a case for ClickHouse cloud idle instances, defaults to 5m

	// Additional OpenTelemetry instrumentation that will capture query body
	// and other parameters.
	//
	// Note: OpenTelemetry context propagation works without this option too.
	OpenTelemetryInstrumentation bool
	TracerProvider               trace.TracerProvider
	MeterProvider                metric.MeterProvider
	// contains filtered or unexported fields
}

Options for Client. The zero value is valid.

type ProfileEvent

type ProfileEvent = proto.ProfileEvent

type ProfileEventType

type ProfileEventType = proto.ProfileEventType

type Query

type Query struct {
	// Body of query, like "SELECT 1".
	Body string
	// QueryID is ID of query, defaults to new UUIDv4.
	QueryID string
	// QuotaKey of query, optional.
	QuotaKey string

	// Input columns for INSERT operations.
	Input proto.Input
	// OnInput is called to allow ingesting more data to Input.
	//
	// The io.EOF reports that no more input should be ingested.
	//
	// Optional; if not provided, a single block is ingested from Input,
	// but the query will fail if Input is set but has zero rows.
	OnInput func(ctx context.Context) error

	// Result columns for SELECT operations.
	Result proto.Result
	// OnResult is called when Result is filled with result block.
	//
	// Optional, but the query will fail if more than one block is received
	// and no OnResult is provided.
	OnResult func(ctx context.Context, block proto.Block) error

	// OnProgress is an optional progress handler. Each progress value contains
	// only the difference, so progress should be accumulated if needed.
	OnProgress func(ctx context.Context, p proto.Progress) error
	// OnProfile is optional handler for profiling data.
	OnProfile func(ctx context.Context, p proto.Profile) error
	// OnProfileEvent is optional handler for profiling event stream data.
	OnProfileEvent func(ctx context.Context, e ProfileEvent) error
	// OnLog is optional handler for server log entry.
	OnLog func(ctx context.Context, l Log) error

	// Settings are optional query-scoped settings. Can override client settings.
	Settings []Setting

	// EXPERIMENTAL: parameters for query.
	Parameters []proto.Parameter

	// Secret is optional inter-server per-cluster secret for Distributed queries.
	//
	// See https://clickhouse.com/docs/en/engines/table-engines/special/distributed/#distributed-clusters
	Secret string

	// InitialUser is optional initial user for Distributed queries.
	InitialUser string

	// ExternalData is optional data for server to load.
	//
	// https://clickhouse.com/docs/en/engines/table-engines/special/external-data/
	ExternalData []proto.InputColumn
	// ExternalTable name. Defaults to _data.
	ExternalTable string
}

Query to ClickHouse.

Example (MultipleInputColumns)
var (
	body      proto.ColStr
	name      proto.ColStr
	sevText   proto.ColEnum
	sevNumber proto.ColUInt8

	ts  = new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano)
	arr = new(proto.ColStr).Array() // Array(String)
	now = time.Date(2010, 1, 1, 10, 22, 33, 345678, time.UTC)
)
// Append 10 rows.
for i := 0; i < 10; i++ {
	body.AppendBytes([]byte("Hello"))
	ts.Append(now)
	name.Append("name")
	sevText.Values = append(sevText.Values, "INFO")
	sevNumber = append(sevNumber, 10)
	arr.Append([]string{"foo", "bar", "baz"})
}
input := proto.Input{
	{Name: "ts", Data: ts},
	{Name: "severity_text", Data: &sevText},
	{Name: "severity_number", Data: sevNumber},
	{Name: "body", Data: body},
	{Name: "name", Data: name},
	{Name: "arr", Data: arr},
}
fmt.Println(input.Into("logs"))
Output:

INSERT INTO "logs" ("ts","severity_text","severity_number","body","name","arr") VALUES
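The expected output above shows what Into generates: the table name and every column name double-quoted, with columns in Input order. Below is a hypothetical re-creation of that string using strconv.Quote, to make the format explicit. The helper insertInto is not part of ch-go, and ch-go's own escaping of unusual names may differ.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// insertInto is a hypothetical re-creation of the query text produced by
// input.Into for plain identifiers: table and columns are double-quoted.
func insertInto(table string, columns []string) string {
	quoted := make([]string, len(columns))
	for i, c := range columns {
		quoted[i] = strconv.Quote(c)
	}
	return fmt.Sprintf("INSERT INTO %s (%s) VALUES", strconv.Quote(table), strings.Join(quoted, ","))
}

func main() {
	fmt.Println(insertInto("logs", []string{"ts", "severity_text", "severity_number", "body", "name", "arr"}))
}
```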

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is a basic ClickHouse server.

func NewServer

func NewServer(opt ServerOptions) *Server

NewServer returns a new ClickHouse Server.

func (*Server) Serve

func (s *Server) Serve(ln net.Listener) error

Serve serves connections on ln.

type ServerConn

type ServerConn struct {
	// contains filtered or unexported fields
}

ServerConn wraps a Server connection.

func (*ServerConn) Handle

func (c *ServerConn) Handle() error

Handle handles the connection.

type ServerOptions

type ServerOptions struct {
	Logger   *zap.Logger
	Timezone *time.Location
	OnError  func(err error)
}

ServerOptions holds the possible Server configuration.

type Setting

type Setting struct {
	Key, Value string
	Important  bool
}

Setting is a setting to send to the server.

func SettingInt

func SettingInt(k string, v int) Setting

SettingInt returns a Setting with key k and integer value v.

Directories

Path Synopsis
Package chpool is a connection pool for ch.
Package cht implements ClickHouse testing utilities, primarily end to end.
Package compress implements compression support.
examples
internal
cmd/app
Package app is helper for simple cli apps.
e2e
Package e2e implements end to end testing utilities.
gold
Package gold implements golden files.
version
Package version resolves current module version.
ztest
Package ztest provides a variety of helpers for testing log output.
Package otelch provide OpenTelemetry instrumentation for go-faster/ch.
Package proto implements ClickHouse wire protocol.
