Documentation ¶
Index ¶
- Constants
- Variables
- func NewBoundedPool(initialCap, maxCap int, timeout time.Duration, factory Factory) (pool.Pool, error)
- type BufferedWriteCloser
- type Config
- type Factory
- type IntoWriteRequest
- type MetaExecutor
- func (m *MetaExecutor) BackupShard(id uint64, since time.Time, w io.Writer) error
- func (m *MetaExecutor) CreateShard(db, policy string, shardID uint64, enabled bool) error
- func (m *MetaExecutor) DeleteDatabase(stmt influxql.Statement) error
- func (m *MetaExecutor) DeleteMeasurement(stmt influxql.Statement) error
- func (m *MetaExecutor) DeleteRetentionPolicy(stmt influxql.Statement) error
- func (m *MetaExecutor) DeleteSeries(stmt influxql.Statement) error
- func (m *MetaExecutor) DeleteShard(stmt influxql.Statement) error
- func (m *MetaExecutor) ExecuteStatement(stmt influxql.Statement, database string) error
- func (m *MetaExecutor) IteratorCreator(opt influxql.IteratorOptions) influxql.IteratorCreator
- func (m *MetaExecutor) Measurements() []string
- func (m *MetaExecutor) RestoreShard(id uint64, r io.Reader) error
- func (m *MetaExecutor) TagValues(database string, cond influxql.Expr) ([]tsdb.TagValues, error)
- func (m *MetaExecutor) WriteToShard(shardID, ownerID uint64, points []models.Point) error
- type NodeDialer
- type PointsWriter
- func (w *PointsWriter) Close() error
- func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
- func (w *PointsWriter) Open() error
- func (w *PointsWriter) WithLogger(log zap.Logger)
- func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, ...) error
- func (w *PointsWriter) WritePointsInto(p *coordinator.IntoWriteRequest) error
- type Service
- type ShardDeleter
- type ShardMapping
- type ShardWriter
- type StatementExecutor
- type Tracker
- type WritePointsRequest
- type WriteStatistics
Constants ¶
const ( // DefaultDialTimeout is the default timeout for a complete dial to succeed. DefaultDialTimeout = 1 * time.Second // DefaultShardWriterTimeout is the default timeout set on shard writers. DefaultShardWriterTimeout = 5 * time.Second // DefaultShardReaderTimeout is the default timeout set on shard writers. DefaultShardReaderTimeout = 5 * time.Second // DefaultMaxRemoteWriteConnections is the maximum number of open connections // that will be available for remote writes to another host. DefaultMaxRemoteWriteConnections = 3 // DefaultClusterTracing enables traceing cluster info if it is true DefaultClusterTracing = false // DefaultWriteTimeout is the default timeout for a complete write to succeed. DefaultWriteTimeout = 5 * time.Second // DefaultMaxConcurrentQueries is the maximum number of running queries. // A value of zero will make the maximum query limit unlimited. DefaultMaxConcurrentQueries = 0 // DefaultMaxSelectPointN is the maximum number of points a SELECT can process. // A value of zero will make the maximum point count unlimited. DefaultMaxSelectPointN = 0 // DefaultMaxSelectSeriesN is the maximum number of series a SELECT can run. // A value of zero will make the maximum series count unlimited. DefaultMaxSelectSeriesN = 0 // DefaultMaxSelectSeriesN is the maximum number of series a SELECT can run. // A value of zero will make the maximum series count unlimited. DefaultMaxSelectBucketsN = 0 )
const MaxMessageSize = 1024 * 1024 * 1024 // 1GB
MaxMessageSize defines how large a message can be before we reject it
const MuxHeader = 2
MuxHeader is the header byte used in the TCP mux.
Variables ¶
var ( // ErrTimeout is returned when a write times out. ErrTimeout = errors.New("timeout") // ErrPartialWrite is returned when a write partially succeeds but does // not meet the requested consistency level. ErrPartialWrite = errors.New("partial write") // ErrWriteFailed is returned when no writes succeeded. ErrWriteFailed = errors.New("write failed") )
Functions ¶
func NewBoundedPool ¶
func NewBoundedPool(initialCap, maxCap int, timeout time.Duration, factory Factory) (pool.Pool, error)
NewBoundedPool returns a new pool based on buffered channels with an initial capacity, maximum capacity and timeout to wait for a connection from the pool. Factory is used when initial capacity is greater than zero to fill the pool. A zero initialCap doesn't fill the Pool until a new Get() is called. During a Get(), If there is no new connection available in the pool and total connections is less than the max, a new connection will be created via the Factory() method. Othewise, the call will block until a connection is available or the timeout is reached.
Types ¶
type BufferedWriteCloser ¶
type BufferedWriteCloser struct { }
BufferedWriteCloser will
func (*BufferedWriteCloser) Close ¶
func (bfc *BufferedWriteCloser) Close()
Close is actually closing this bufferedwriter
type Config ¶
type Config struct { DialTimeout toml.Duration `toml:"dial-timeout"` ShardWriterTimeout toml.Duration `toml:"shard-writer-timeout"` ShardReaderTimeout toml.Duration `toml:"shard-reader-timeout"` MaxRemoteWriteConnections int `toml:"max-remote-write-connections"` ClusterTracing bool `toml:"cluster-tracing` WriteTimeout toml.Duration `toml:"write-timeout"` MaxConcurrentQueries int `toml:"max-concurrent-queries"` QueryTimeout toml.Duration `toml:"query-timeout"` LogQueriesAfter toml.Duration `toml:"log-queries-after"` MaxSelectPointN int `toml:"max-select-point"` MaxSelectSeriesN int `toml:"max-select-series"` MaxSelectBucketsN int `toml:"max-select-buckets"` }
Config represents the configuration for the clustering service.
type IntoWriteRequest ¶
IntoWriteRequest is a partial copy of cluster.WriteRequest
type MetaExecutor ¶
type MetaExecutor struct { Logger zap.Logger Node *influxdb.Node MetaClient interface { DataNode(id uint64) (ni *meta.NodeInfo, err error) DataNodes() ([]meta.NodeInfo, error) } TSDBStore interface { CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error BackupShard(id uint64, since time.Time, w io.Writer) error RestoreShard(id uint64, r io.Reader) error Measurements(database string, cond influxql.Expr) ([]string, error) TagValues(database string, cond influxql.Expr) ([]tsdb.TagValues, error) } ShardWriter interface { WriteShard(shardID, ownerID uint64, points []models.Point) error } // contains filtered or unexported fields }
MetaExecutor executes meta queries on all data nodes.
func NewMetaExecutor ¶
func NewMetaExecutor() *MetaExecutor
NewMetaExecutor returns a new initialized *MetaExecutor.
func (*MetaExecutor) BackupShard ¶
BackupShard backup a shard in cluster
func (*MetaExecutor) CreateShard ¶
func (m *MetaExecutor) CreateShard(db, policy string, shardID uint64, enabled bool) error
CreateShard will create Shard on serveral data nodes
func (*MetaExecutor) DeleteDatabase ¶
func (m *MetaExecutor) DeleteDatabase(stmt influxql.Statement) error
DeleteDatabase will remove a database from cluster
func (*MetaExecutor) DeleteMeasurement ¶
func (m *MetaExecutor) DeleteMeasurement(stmt influxql.Statement) error
DeleteMeasurement removes measurement from cluster
func (*MetaExecutor) DeleteRetentionPolicy ¶
func (m *MetaExecutor) DeleteRetentionPolicy(stmt influxql.Statement) error
DeleteRetentionPolicy removes RetentionPolicy from cluster
func (*MetaExecutor) DeleteSeries ¶
func (m *MetaExecutor) DeleteSeries(stmt influxql.Statement) error
DeleteSeries removes series data from cluster
func (*MetaExecutor) DeleteShard ¶
func (m *MetaExecutor) DeleteShard(stmt influxql.Statement) error
DeleteShard removes a Shard from cluster
func (*MetaExecutor) ExecuteStatement ¶
func (m *MetaExecutor) ExecuteStatement(stmt influxql.Statement, database string) error
ExecuteStatement executes a single InfluxQL statement on all nodes in the cluster concurrently.
func (*MetaExecutor) IteratorCreator ¶
func (m *MetaExecutor) IteratorCreator(opt influxql.IteratorOptions) influxql.IteratorCreator
IteratorCreator return a IteratorCreator according IteratorOptions
func (*MetaExecutor) Measurements ¶
func (m *MetaExecutor) Measurements() []string
Measurements return a all measurements in cluster
func (*MetaExecutor) RestoreShard ¶
func (m *MetaExecutor) RestoreShard(id uint64, r io.Reader) error
RestoreShard restore a shard in cluster
func (*MetaExecutor) WriteToShard ¶
func (m *MetaExecutor) WriteToShard(shardID, ownerID uint64, points []models.Point) error
WriteToShard will write points into shard accoridng to shardID and ownerID
type NodeDialer ¶
type PointsWriter ¶
type PointsWriter struct { WriteTimeout time.Duration Logger zap.Logger Node *influxcloud.Node MetaClient interface { Database(name string) (di *meta.DatabaseInfo) RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) } TSDBStore interface { CreateShard(database, retentionPolicy string, shardID uint64) error WriteToShard(shardID uint64, points []models.Point) error } ShardWriter interface { WriteShard(shardID, ownerID uint64, points []models.Point) error } HintedHandoff interface { WriteShard(shardID, ownerID uint64, points []models.Point) error } // contains filtered or unexported fields }
PointsWriter handles writes across multiple local and remote data nodes.
func NewPointsWriter ¶
func NewPointsWriter() *PointsWriter
NewPointsWriter returns a new instance of PointsWriter for a node.
func (*PointsWriter) Close ¶
func (w *PointsWriter) Close() error
Close closes the communication channel with the point writer
func (*PointsWriter) MapShards ¶
func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
MapShards maps the points contained in wp to a ShardMapping. If a point maps to a shard group or shard that does not currently exist, it will be created before returning the mapping.
func (*PointsWriter) Open ¶
func (w *PointsWriter) Open() error
Open opens the communication channel with the point writer
func (*PointsWriter) WithLogger ¶
func (w *PointsWriter) WithLogger(log zap.Logger)
WithLogger sets the Logger on w.
func (*PointsWriter) WritePoints ¶
func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
WritePoints writes across multiple local and remote data nodes according the consistency level.
func (*PointsWriter) WritePointsInto ¶
func (w *PointsWriter) WritePointsInto(p *coordinator.IntoWriteRequest) error
WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of a cluster structure for information. This is to avoid a circular dependency
type Service ¶
type Service struct { Listener net.Listener MetaClient interface { ShardOwner(shardID uint64) (string, string, meta.ShardInfo) } TSDBStore coordinator.TSDBStore ShardIteratorCreator coordinator.ShardIteratorCreator Logger zap.Logger ShardWriter ShardWriter // contains filtered or unexported fields }
Service reprsents a cluster service
func (*Service) WithLogger ¶
WithLogger sets the internal logger to the logger passed in
type ShardDeleter ¶
type ShardDeleter struct { TSDBStore interface { ShardIDs() []uint64 DeleteShard(shardID uint64) error } }
ShardDeleter is a wrapper of TSDBStore which can delete shard from disk
func NewShardDeleter ¶
func NewShardDeleter() *ShardDeleter
NewShardDeleter will return a ShardDeleter instance
func (ShardDeleter) DeleteShard ¶
func (d ShardDeleter) DeleteShard(shardID uint64) error
DeleteShard will delete a shard according to shardID if failed, then return error
func (ShardDeleter) ShardIDs ¶
func (d ShardDeleter) ShardIDs() []uint64
ShardIDs will return all shards' ID in this node
type ShardMapping ¶
type ShardMapping struct { Points map[uint64][]models.Point // The points associated with a shard ID Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID // contains filtered or unexported fields }
ShardMapping contains a mapping of a shards to a points.
func NewShardMapping ¶
func NewShardMapping(n int) *ShardMapping
NewShardMapping creates an empty ShardMapping
type ShardWriter ¶
type ShardWriter struct { MetaClient interface { ShardOwner(shardID uint64) (database, policy string, owners meta.ShardInfo) DataNode(id uint64) (ni *meta.NodeInfo, err error) } // contains filtered or unexported fields }
ShardWriter writes a set of points to a shard.
func NewShardWriter ¶
func NewShardWriter(timeout time.Duration, maxConnections int) *ShardWriter
NewShardWriter returns a new instance of ShardWriter.
func (*ShardWriter) WriteShard ¶
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error
WriteShard writes time series points to a shard
func (*ShardWriter) WriteShardBinary ¶
func (w *ShardWriter) WriteShardBinary(shardID, ownerID uint64, buf []byte) error
WriteShardBinary writes binary time series points to a shard
type StatementExecutor ¶
type StatementExecutor struct { MetaClient interface { DataNodes() (ni meta.NodeInfos, err error) } // This reprsents local StatementExecutor StatementExecutor coordinator.StatementExecutor // contains filtered or unexported fields }
StatementExecutor executes a statement in the query.
func (*StatementExecutor) ExecuteStatement ¶
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error
ExecuteStatement executes the given statement with the given execution context.
type WritePointsRequest ¶
WritePointsRequest represents a request to write point data to the cluster.
type WriteStatistics ¶
type WriteStatistics struct { WriteReq int64 PointWriteReq int64 PointWriteReqLocal int64 PointWriteReqRemote int64 PointWriteReqHH int64 WriteOK int64 WriteDropped int64 WriteTimeout int64 WritePartial int64 WriteErr int64 SubWriteOK int64 SubWriteDrop int64 }
WriteStatistics keeps statistics related to the PointsWriter.