Documentation ¶
Index ¶
- Constants
- Variables
- func NewBoundedPool(initialCap, maxCap int, timeout time.Duration, factory Factory) (pool.Pool, error)
- func ReadTLV(r io.Reader) (byte, []byte, error)
- func WriteTLV(w io.Writer, typ byte, buf []byte) error
- type Balancer
- type Config
- type ConsistencyLevel
- type Factory
- type IteratorCreator
- type MapShardRequest
- func (m *MapShardRequest) ChunkSize() int32
- func (m *MapShardRequest) MarshalBinary() ([]byte, error)
- func (m *MapShardRequest) Query() string
- func (m *MapShardRequest) SetChunkSize(chunkSize int32)
- func (m *MapShardRequest) SetQuery(query string)
- func (m *MapShardRequest) SetShardID(id uint64)
- func (m *MapShardRequest) ShardID() uint64
- func (m *MapShardRequest) UnmarshalBinary(buf []byte) error
- type MapShardResponse
- func (r *MapShardResponse) Code() int
- func (r *MapShardResponse) Data() []byte
- func (r *MapShardResponse) Fields() []string
- func (r *MapShardResponse) MarshalBinary() ([]byte, error)
- func (r *MapShardResponse) Message() string
- func (r *MapShardResponse) SetCode(code int)
- func (r *MapShardResponse) SetData(data []byte)
- func (r *MapShardResponse) SetFields(fields []string)
- func (r *MapShardResponse) SetMessage(message string)
- func (r *MapShardResponse) SetTagSets(tagsets []string)
- func (r *MapShardResponse) TagSets() []string
- func (r *MapShardResponse) UnmarshalBinary(buf []byte) error
- type PointsWriter
- type Service
- type ShardMapping
- type ShardWriter
- type WritePointsRequest
- type WriteShardRequest
- func (w *WriteShardRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string)
- func (w *WriteShardRequest) AddPoints(points []models.Point)
- func (w *WriteShardRequest) MarshalBinary() ([]byte, error)
- func (w *WriteShardRequest) Points() []models.Point
- func (w *WriteShardRequest) SetShardID(id uint64)
- func (w *WriteShardRequest) ShardID() uint64
- func (w *WriteShardRequest) UnmarshalBinary(buf []byte) error
- type WriteShardResponse
- func (w *WriteShardResponse) Code() int
- func (w *WriteShardResponse) MarshalBinary() ([]byte, error)
- func (w *WriteShardResponse) Message() string
- func (w *WriteShardResponse) SetCode(code int)
- func (w *WriteShardResponse) SetMessage(message string)
- func (w *WriteShardResponse) UnmarshalBinary(buf []byte) error
Constants ¶
const ( // DefaultWriteTimeout is the default timeout for a complete write to succeed. DefaultWriteTimeout = 5 * time.Second // DefaultShardWriterTimeout is the default timeout set on shard writers. DefaultShardWriterTimeout = 5 * time.Second // DefaultShardMapperTimeout is the default timeout set on shard mappers. DefaultShardMapperTimeout = 5 * time.Second // DefaultMaxRemoteWriteConnections is the maximum number of open connections // that will be available for remote writes to another host. DefaultMaxRemoteWriteConnections = 3 )
const MaxMessageSize = 1024 * 1024 * 1024 // 1GB
MaxMessageSize defines how large a message can be before we reject it
const MuxHeader = 2
MuxHeader is the header byte used in the TCP mux.
Variables ¶
var ( // ErrTimeout is returned when a write times out. ErrTimeout = errors.New("timeout") // ErrPartialWrite is returned when a write partially succeeds but does // not meet the requested consistency level. ErrPartialWrite = errors.New("partial write") // ErrWriteFailed is returned when no writes succeeded. ErrWriteFailed = errors.New("write failed") // ErrInvalidConsistencyLevel is returned when parsing the string version // of a consistency level. ErrInvalidConsistencyLevel = errors.New("invalid consistency level") )
Functions ¶
func NewBoundedPool ¶
func NewBoundedPool(initialCap, maxCap int, timeout time.Duration, factory Factory) (pool.Pool, error)
NewBoundedPool returns a new pool based on buffered channels with an initial capacity, maximum capacity and timeout to wait for a connection from the pool. Factory is used when initial capacity is greater than zero to fill the pool. A zero initialCap doesn't fill the Pool until a new Get() is called. During a Get(), If there is no new connection available in the pool and total connections is less than the max, a new connection will be created via the Factory() method. Othewise, the call will block until a connection is available or the timeout is reached.
Types ¶
type Balancer ¶
type Balancer interface { // Next returns the next Node according to the balancing method // or nil if there are no nodes available Next() *meta.NodeInfo }
Balancer represents a load-balancing algorithm for a set of nodes
func NewNodeBalancer ¶
NewNodeBalancer create a shuffled, round-robin balancer so that multiple instances will return nodes in randomized order and each each returned node will be repeated in a cycle
type Config ¶
type Config struct { ForceRemoteShardMapping bool `toml:"force-remote-mapping"` WriteTimeout toml.Duration `toml:"write-timeout"` ShardWriterTimeout toml.Duration `toml:"shard-writer-timeout"` MaxRemoteWriteConnections int `toml:"max-remote-write-connections"` ShardMapperTimeout toml.Duration `toml:"shard-mapper-timeout"` }
Config represents the configuration for the clustering service.
type ConsistencyLevel ¶
type ConsistencyLevel int
ConsistencyLevel represent a required replication criteria before a write can be returned as successful
const ( // ConsistencyLevelAny allows for hinted hand off, potentially no write happened yet ConsistencyLevelAny ConsistencyLevel = iota // ConsistencyLevelOne requires at least one data node acknowledged a write ConsistencyLevelOne // ConsistencyLevelQuorum requires a quorum of data nodes to acknowledge a write ConsistencyLevelQuorum // ConsistencyLevelAll requires all data nodes to acknowledge a write ConsistencyLevelAll )
func ParseConsistencyLevel ¶
func ParseConsistencyLevel(level string) (ConsistencyLevel, error)
ParseConsistencyLevel converts a consistency level string to the corresponding ConsistencyLevel const
type IteratorCreator ¶
type IteratorCreator struct { MetaStore interface { NodeID() uint64 Node(id uint64) (ni *meta.NodeInfo, err error) } TSDBStore influxql.IteratorCreator // Duration before idle remote iterators are disconnected. Timeout time.Duration // Treats all shards as remote. Useful for testing. ForceRemoteMapping bool // contains filtered or unexported fields }
IteratorCreator is responsible for creating iterators for queries. Iterators can be created for the local node or can be retrieved remotely.
func NewIteratorCreator ¶
func NewIteratorCreator() *IteratorCreator
NewIteratorCreator returns a new instance of IteratorCreator.
func (*IteratorCreator) CreateIterator ¶
func (ic *IteratorCreator) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
CreateIterator creates an iterator from local and remote shards.
func (*IteratorCreator) FieldDimensions ¶
func (ic *IteratorCreator) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error)
FieldDimensions returns the unique fields and dimensions across a list of sources.
type MapShardRequest ¶
type MapShardRequest struct {
// contains filtered or unexported fields
}
MapShardRequest represents the request to map a remote shard for a query.
func (*MapShardRequest) ChunkSize ¶
func (m *MapShardRequest) ChunkSize() int32
ChunkSize returns Shard map request's chunk size
func (*MapShardRequest) MarshalBinary ¶
func (m *MapShardRequest) MarshalBinary() ([]byte, error)
MarshalBinary encodes the object to a binary format.
func (*MapShardRequest) Query ¶
func (m *MapShardRequest) Query() string
Query returns the Shard map request's query
func (*MapShardRequest) SetChunkSize ¶
func (m *MapShardRequest) SetChunkSize(chunkSize int32)
SetChunkSize sets the Shard map request's chunk size
func (*MapShardRequest) SetQuery ¶
func (m *MapShardRequest) SetQuery(query string)
SetQuery sets the Shard map request's Query
func (*MapShardRequest) SetShardID ¶
func (m *MapShardRequest) SetShardID(id uint64)
SetShardID sets the map request's shard id
func (*MapShardRequest) ShardID ¶
func (m *MapShardRequest) ShardID() uint64
ShardID of the map request
func (*MapShardRequest) UnmarshalBinary ¶
func (m *MapShardRequest) UnmarshalBinary(buf []byte) error
UnmarshalBinary populates MapShardRequest from a binary format.
type MapShardResponse ¶
type MapShardResponse struct {
// contains filtered or unexported fields
}
MapShardResponse represents the response returned from a remote MapShardRequest call
func NewMapShardResponse ¶
func NewMapShardResponse(code int, message string) *MapShardResponse
NewMapShardResponse returns the response returned from a remote MapShardRequest call
func (*MapShardResponse) Code ¶
func (r *MapShardResponse) Code() int
Code returns the Shard map response's code
func (*MapShardResponse) Data ¶
func (r *MapShardResponse) Data() []byte
Data returns the Shard map response's Data
func (*MapShardResponse) Fields ¶
func (r *MapShardResponse) Fields() []string
Fields returns the Shard map response's Fields
func (*MapShardResponse) MarshalBinary ¶
func (r *MapShardResponse) MarshalBinary() ([]byte, error)
MarshalBinary encodes the object to a binary format.
func (*MapShardResponse) Message ¶
func (r *MapShardResponse) Message() string
Message returns the the Shard map response's Message
func (*MapShardResponse) SetCode ¶
func (r *MapShardResponse) SetCode(code int)
SetCode sets the Shard map response's code
func (*MapShardResponse) SetData ¶
func (r *MapShardResponse) SetData(data []byte)
SetData sets the Shard map response's Data
func (*MapShardResponse) SetFields ¶
func (r *MapShardResponse) SetFields(fields []string)
SetFields sets the Shard map response's Fields
func (*MapShardResponse) SetMessage ¶
func (r *MapShardResponse) SetMessage(message string)
SetMessage sets Shard map response's message
func (*MapShardResponse) SetTagSets ¶
func (r *MapShardResponse) SetTagSets(tagsets []string)
SetTagSets sets Shard map response's tagsets
func (*MapShardResponse) TagSets ¶
func (r *MapShardResponse) TagSets() []string
TagSets returns Shard map response's tag sets
func (*MapShardResponse) UnmarshalBinary ¶
func (r *MapShardResponse) UnmarshalBinary(buf []byte) error
UnmarshalBinary populates WritePointRequest from a binary format.
type PointsWriter ¶
type PointsWriter struct { WriteTimeout time.Duration Logger *log.Logger Node *influxdb.Node MetaClient interface { Database(name string) (di *meta.DatabaseInfo, err error) RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo) } TSDBStore interface { CreateShard(database, retentionPolicy string, shardID uint64) error WriteToShard(shardID uint64, points []models.Point) error } ShardWriter interface { WriteShard(shardID, ownerID uint64, points []models.Point) error } HintedHandoff interface { WriteShard(shardID, ownerID uint64, points []models.Point) error } Subscriber interface { Points() chan<- *WritePointsRequest } // contains filtered or unexported fields }
PointsWriter handles writes across multiple local and remote data nodes.
func NewPointsWriter ¶
func NewPointsWriter() *PointsWriter
NewPointsWriter returns a new instance of PointsWriter for a node.
func (*PointsWriter) Close ¶
func (w *PointsWriter) Close() error
Close closes the communication channel with the point writer
func (*PointsWriter) MapShards ¶
func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
MapShards maps the points contained in wp to a ShardMapping. If a point maps to a shard group or shard that does not currently exist, it will be created before returning the mapping.
func (*PointsWriter) Open ¶
func (w *PointsWriter) Open() error
Open opens the communication channel with the point writer
func (*PointsWriter) WritePoints ¶
func (w *PointsWriter) WritePoints(p *WritePointsRequest) error
WritePoints writes across multiple local and remote data nodes according the consistency level.
func (*PointsWriter) WritePointsInto ¶
func (w *PointsWriter) WritePointsInto(p *tsdb.IntoWriteRequest) error
WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of a cluster structure for information. This is to avoid a circular dependency
type Service ¶
type Service struct { Listener net.Listener MetaClient interface { ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo) } TSDBStore interface { CreateShard(database, policy string, shardID uint64) error WriteToShard(shardID uint64, points []models.Point) error } Logger *log.Logger // contains filtered or unexported fields }
Service processes data received over raw TCP connections.
type ShardMapping ¶
type ShardMapping struct { Points map[uint64][]models.Point // The points associated with a shard ID Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID }
ShardMapping contains a mapping of a shards to a points.
func NewShardMapping ¶
func NewShardMapping() *ShardMapping
NewShardMapping creates an empty ShardMapping
type ShardWriter ¶
type ShardWriter struct { MetaClient interface { DataNode(id uint64) (ni *meta.NodeInfo, err error) } // contains filtered or unexported fields }
ShardWriter writes a set of points to a shard.
func NewShardWriter ¶
func NewShardWriter(timeout time.Duration, maxConnections int) *ShardWriter
NewShardWriter returns a new instance of ShardWriter.
func (*ShardWriter) WriteShard ¶
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error
WriteShard writes time series points to a shard
type WritePointsRequest ¶
type WritePointsRequest struct { Database string RetentionPolicy string ConsistencyLevel ConsistencyLevel Points []models.Point }
WritePointsRequest represents a request to write point data to the cluster
type WriteShardRequest ¶
type WriteShardRequest struct {
// contains filtered or unexported fields
}
WriteShardRequest represents the a request to write a slice of points to a shard
func (*WriteShardRequest) AddPoint ¶
func (w *WriteShardRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string)
AddPoint adds a new time series point
func (*WriteShardRequest) AddPoints ¶
func (w *WriteShardRequest) AddPoints(points []models.Point)
AddPoints adds a new time series point
func (*WriteShardRequest) MarshalBinary ¶
func (w *WriteShardRequest) MarshalBinary() ([]byte, error)
MarshalBinary encodes the object to a binary format.
func (*WriteShardRequest) Points ¶
func (w *WriteShardRequest) Points() []models.Point
Points returns the time series Points
func (*WriteShardRequest) SetShardID ¶
func (w *WriteShardRequest) SetShardID(id uint64)
SetShardID sets the ShardID
func (*WriteShardRequest) ShardID ¶
func (w *WriteShardRequest) ShardID() uint64
ShardID gets the ShardID
func (*WriteShardRequest) UnmarshalBinary ¶
func (w *WriteShardRequest) UnmarshalBinary(buf []byte) error
UnmarshalBinary populates WritePointRequest from a binary format.
type WriteShardResponse ¶
type WriteShardResponse struct {
// contains filtered or unexported fields
}
WriteShardResponse represents the response returned from a remote WriteShardRequest call
func (*WriteShardResponse) MarshalBinary ¶
func (w *WriteShardResponse) MarshalBinary() ([]byte, error)
MarshalBinary encodes the object to a binary format.
func (*WriteShardResponse) Message ¶
func (w *WriteShardResponse) Message() string
Message returns the Message
func (*WriteShardResponse) SetCode ¶
func (w *WriteShardResponse) SetCode(code int)
SetCode sets the Code
func (*WriteShardResponse) SetMessage ¶
func (w *WriteShardResponse) SetMessage(message string)
SetMessage sets the Message
func (*WriteShardResponse) UnmarshalBinary ¶
func (w *WriteShardResponse) UnmarshalBinary(buf []byte) error
UnmarshalBinary populates WritePointRequest from a binary format.