Documentation
¶
Index ¶
- Constants
- Variables
- func BuildFieldCall(info *meta2.StreamInfo, srcSchema *meta2.CleanSchema, ...) ([]*streamLib.FieldCall, error)
- func CreateColumnStorePlan(schema hybridqp.Catalog, eTraits []hybridqp.Trait, ...) (hybridqp.QueryNode, error)
- func GetStreamCtx() *streamCtx
- func GetTagLimit() int
- func IsKeepWritingErr(err error) bool
- func IsRetriedError(err error) (isSpecial bool)
- func IsRetryErrorForPtView(err error) bool
- func LoadCapacity(capacityFile string, retryTimes int) (int64, error)
- func MarshalWithMeasurements(buf []byte, mst string, rec *record.Record) ([]byte, error)
- func PutStreamCtx(s *streamCtx)
- func SearchLowerBoundOfRec(rec *record.Record, sg *meta2.ShardGroupInfo, start int) int
- func SetTagLimit(limit int)
- func StoreCapacity(capacityFile string, capacity int64) error
- func UnmarshalWithMeasurements(buf []byte, rec *record.Record) (string, error)
- type AllWriter
- type BaseWriter
- type Client
- type ClusterShardMapper
- type ClusterShardMapping
- func (csm *ClusterShardMapping) Close() error
- func (csm *ClusterShardMapping) CreateLogicalPlan(ctx context.Context, sources influxql.Sources, schema hybridqp.Catalog) (hybridqp.QueryNode, error)
- func (csm *ClusterShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, ...)
- func (csm *ClusterShardMapping) GetETraits(ctx context.Context, sources influxql.Sources, schema hybridqp.Catalog) ([]hybridqp.Trait, error)
- func (csm *ClusterShardMapping) GetSeriesKey() []byte
- func (csm *ClusterShardMapping) GetShardAndSourcesMap(sources influxql.Sources) (map[uint64]map[uint32][]executor.ShardInfo, map[uint32]influxql.Sources, error)
- func (csm *ClusterShardMapping) GetSources(sources influxql.Sources) influxql.Sources
- func (csm *ClusterShardMapping) LogicalPlanCost(m *influxql.Measurement, opt query.ProcessorOptions) (hybridqp.LogicalPlanCost, error)
- func (csm *ClusterShardMapping) MapType(m *influxql.Measurement, field string) influxql.DataType
- func (csm *ClusterShardMapping) MapTypeBatch(m *influxql.Measurement, fields map[string]*influxql.FieldNameSpace, ...) error
- func (csm *ClusterShardMapping) NodeNumbers() int
- func (csm *ClusterShardMapping) RemoteQueryETraitsAndSrc(ctx context.Context, opts *query.ProcessorOptions, schema hybridqp.Catalog, ...) ([]hybridqp.Trait, error)
- type ComMetaClient
- type HTTPClient
- type IMetaExecutor
- type LocalStore
- type MetaClient
- type MetaExecutor
- type PWMetaClient
- type PointsWriter
- func (w *PointsWriter) ApplyTimeRangeLimit(limit []toml.Duration)
- func (w *PointsWriter) Close()
- func (w *PointsWriter) MapRowToMeasurement(ctx *injestionCtx, id uint64, mst string, r *influx.Row) error
- func (w *PointsWriter) RetryWritePointRows(database, retentionPolicy string, rows []influx.Row) error
- func (w *PointsWriter) SetStore(store Storage)
- type RWMetaClient
- type RecMsg
- type RecordWriter
- func (w *RecordWriter) CacheCapacity(capacityPath string, totalLen int64)
- func (w *RecordWriter) Close() error
- func (w *RecordWriter) FlushCapacity()
- func (w *RecordWriter) Open() error
- func (w *RecordWriter) RetryWriteLogRecord(bulk *record.BulkRecords) error
- func (w *RecordWriter) RetryWriteRecord(database, retentionPolicy, measurement string, rec arrow.Record) error
- type RoundRobinWriter
- type ShardRow
- type ShardRows
- type ShowTagKeysExecutor
- type ShowTagValuesExecutor
- type Source
- type Storage
- type Stream
- type SubscriberManager
- func (s *SubscriberManager) InitWriters()
- func (s *SubscriberManager) NewSubscriberWriter(db, rp, name, mode string, destinations []string) (SubscriberWriter, error)
- func (s *SubscriberManager) Send(db, rp string, lineProtocol []byte)
- func (s *SubscriberManager) StopAllWriters()
- func (s *SubscriberManager) Update()
- func (s *SubscriberManager) UpdateWriters()
- func (s *SubscriberManager) WalkDatabases(fn func(db *meta.DatabaseInfo))
- type SubscriberWriter
- type TSDBStore
- type TagValuesSlice
- type WriteRequest
Constants ¶
const ( DDLRetryInternalSecond = 1 DDLTimeOutSecond = 30 DMLRetryInternalMillisecond = 200 DMLTimeOutSecond = 30 )
const (
MaxConcurrencyInOnePt int = 8
)
const MaxShardKey = 64 * 1024
const NotInShardDuration = -1
Variables ¶
var (
CurrentCapacityMap = &sync.Map{}
)
var ErrDatabaseNameRequired = errors.New("database name required")
ErrDatabaseNameRequired is returned when executing a statement that requires a database but no database has been provided.
Functions ¶
func BuildFieldCall ¶ added in v1.0.0
func BuildFieldCall(info *meta2.StreamInfo, srcSchema *meta2.CleanSchema, destSchema *meta2.CleanSchema) ([]*streamLib.FieldCall, error)
func CreateColumnStorePlan ¶ added in v1.1.0
func GetStreamCtx ¶ added in v1.0.0
func GetStreamCtx() *streamCtx
func GetTagLimit ¶ added in v1.1.0
func GetTagLimit() int
func IsKeepWritingErr ¶ added in v1.1.0
func IsRetriedError ¶ added in v1.0.0
func IsRetryErrorForPtView ¶ added in v1.0.0
IsRetryErrorForPtView returns true if dbpt is not on this node.
func LoadCapacity ¶ added in v1.3.0
LoadCapacity is used to load the shard-level capacity value.
func MarshalWithMeasurements ¶ added in v1.1.0
func PutStreamCtx ¶ added in v1.0.0
func PutStreamCtx(s *streamCtx)
func SearchLowerBoundOfRec ¶ added in v1.1.0
func SetTagLimit ¶ added in v1.1.0
func SetTagLimit(limit int)
func StoreCapacity ¶ added in v1.3.0
StoreCapacity is used to persist the shard-level capacity value.
Types ¶
type AllWriter ¶ added in v1.1.0
type AllWriter struct {
BaseWriter
}
type BaseWriter ¶ added in v1.1.0
type BaseWriter struct {
// contains filtered or unexported fields
}
func NewBaseWriter ¶ added in v1.1.0
func NewBaseWriter(db, rp, name string, clients []Client, logger *logger.Logger) BaseWriter
func (*BaseWriter) Clients ¶ added in v1.1.0
func (w *BaseWriter) Clients() []Client
func (*BaseWriter) Name ¶ added in v1.1.0
func (w *BaseWriter) Name() string
func (*BaseWriter) Run ¶ added in v1.1.0
func (w *BaseWriter) Run()
func (*BaseWriter) Send ¶ added in v1.1.0
func (w *BaseWriter) Send(wr *WriteRequest)
func (*BaseWriter) Start ¶ added in v1.1.0
func (w *BaseWriter) Start(concurrency, buffersize int)
func (*BaseWriter) Stop ¶ added in v1.1.0
func (w *BaseWriter) Stop()
type ClusterShardMapper ¶
type ClusterShardMapper struct { //Node *meta.Node Logger *logger.Logger // Remote execution timeout Timeout time.Duration meta.MetaClient NetStore netstorage.Storage }
ClusterShardMapper implements a ShardMapper for remote shards.
func (*ClusterShardMapper) Close ¶
func (csm *ClusterShardMapper) Close() error
func (*ClusterShardMapper) MapShards ¶
func (csm *ClusterShardMapper) MapShards(sources influxql.Sources, t influxql.TimeRange, opt query.SelectOptions, condition influxql.Expr) (query.ShardGroup, error)
type ClusterShardMapping ¶
type ClusterShardMapping struct { ShardMapper *ClusterShardMapper NetStore netstorage.Storage MetaClient meta.MetaClient // Remote execution timeout Timeout time.Duration ShardMap map[Source]map[uint32][]executor.ShardInfo // {source: {ptId: []ShardInfo}}, // MinTime is the minimum time that this shard mapper will allow. // Any attempt to use a time before this one will automatically result in using // this time instead. MinTime time.Time // MaxTime is the maximum time that this shard mapper will allow. // Any attempt to use a time after this one will automatically result in using // this time instead. MaxTime time.Time Logger *logger.Logger // contains filtered or unexported fields }
ClusterShardMapping maps data sources to a list of shard information.
func NewClusterShardMapping ¶ added in v1.1.0
func NewClusterShardMapping(csm *ClusterShardMapper, tmin, tmax time.Time) *ClusterShardMapping
func (*ClusterShardMapping) Close ¶
func (csm *ClusterShardMapping) Close() error
Close clears out the list of mapped shards.
func (*ClusterShardMapping) CreateLogicalPlan ¶
func (*ClusterShardMapping) FieldDimensions ¶
func (csm *ClusterShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, schema *influxql.Schema, err error)
func (*ClusterShardMapping) GetETraits ¶ added in v1.1.0
func (*ClusterShardMapping) GetSeriesKey ¶ added in v1.1.0
func (csm *ClusterShardMapping) GetSeriesKey() []byte
func (*ClusterShardMapping) GetShardAndSourcesMap ¶ added in v1.1.0
func (*ClusterShardMapping) GetSources ¶
func (csm *ClusterShardMapping) GetSources(sources influxql.Sources) influxql.Sources
GetSources may return multiple sources for a single input source, because a measurement can be given as a regex.
func (*ClusterShardMapping) LogicalPlanCost ¶
func (csm *ClusterShardMapping) LogicalPlanCost(m *influxql.Measurement, opt query.ProcessorOptions) (hybridqp.LogicalPlanCost, error)
func (*ClusterShardMapping) MapType ¶
func (csm *ClusterShardMapping) MapType(m *influxql.Measurement, field string) influxql.DataType
func (*ClusterShardMapping) MapTypeBatch ¶
func (csm *ClusterShardMapping) MapTypeBatch(m *influxql.Measurement, fields map[string]*influxql.FieldNameSpace, schema *influxql.Schema) error
func (*ClusterShardMapping) NodeNumbers ¶
func (csm *ClusterShardMapping) NodeNumbers() int
func (*ClusterShardMapping) RemoteQueryETraitsAndSrc ¶ added in v1.1.0
type ComMetaClient ¶ added in v1.1.0
type ComMetaClient interface { Measurement(database string, rpName string, mstName string) (*meta2.MeasurementInfo, error) CreateMeasurement(database string, retentionPolicy string, mst string, shardKey *meta2.ShardKeyInfo, numOfShards int32, indexR *influxql.IndexRelation, engineType config.EngineType, colStoreInfo *meta2.ColStoreInfo, schemaInfo []*proto2.FieldSchema, options *meta2.Options) (*meta2.MeasurementInfo, error) CreateShardGroup(database, policy string, timestamp time.Time, version uint32, engineType config.EngineType) (*meta2.ShardGroupInfo, error) }
type HTTPClient ¶ added in v1.1.0
type HTTPClient struct {
// contains filtered or unexported fields
}
func NewHTTPClient ¶ added in v1.1.0
func NewHTTPClient(url *url.URL, timeout time.Duration) *HTTPClient
func NewHTTPSClient ¶ added in v1.1.0
func (*HTTPClient) Destination ¶ added in v1.1.0
func (c *HTTPClient) Destination() string
type IMetaExecutor ¶
type LocalStore ¶ added in v1.2.0
type LocalStore struct {
// contains filtered or unexported fields
}
func NewLocalStore ¶ added in v1.2.0
func NewLocalStore(store Storage) *LocalStore
func (*LocalStore) WriteRows ¶ added in v1.2.0
func (s *LocalStore) WriteRows(ctx *netstorage.WriteContext, _ uint64, _ uint32, database, rp string, _ time.Duration) error
type MetaClient ¶ added in v1.1.0
type MetaClient interface { Databases() map[string]*meta.DatabaseInfo Database(string) (*meta.DatabaseInfo, error) GetMaxSubscriptionID() uint64 WaitForDataChanged() chan struct{} }
type MetaExecutor ¶
type MetaExecutor struct { Logger *logger.Logger MetaClient meta.MetaClient // contains filtered or unexported fields }
MetaExecutor executes meta queries on all data nodes.
func NewMetaExecutor ¶
func NewMetaExecutor() *MetaExecutor
NewMetaExecutor returns a new initialized *MetaExecutor.
func (*MetaExecutor) Close ¶
func (m *MetaExecutor) Close() error
func (*MetaExecutor) EachDBNodes ¶
func (*MetaExecutor) SetTimeOut ¶
func (m *MetaExecutor) SetTimeOut(timeout time.Duration)
type PWMetaClient ¶ added in v1.0.0
type PWMetaClient interface { Database(name string) (di *meta2.DatabaseInfo, err error) RetentionPolicy(database, policy string) (*meta2.RetentionPolicyInfo, error) CreateShardGroup(database, policy string, timestamp time.Time, version uint32, engineType config.EngineType) (*meta2.ShardGroupInfo, error) DBPtView(database string) (meta2.DBPtInfos, error) Measurement(database string, rpName string, mstName string) (*meta2.MeasurementInfo, error) UpdateSchema(database string, retentionPolicy string, mst string, fieldToCreate []*proto2.FieldSchema) error CreateMeasurement(database string, retentionPolicy string, mst string, shardKey *meta2.ShardKeyInfo, numOfShards int32, indexR *influxql.IndexRelation, engineType config.EngineType, colStoreInfo *meta2.ColStoreInfo, schemaInfo []*proto2.FieldSchema, options *meta2.Options) (*meta2.MeasurementInfo, error) GetAliveShards(database string, sgi *meta2.ShardGroupInfo) []int GetStreamInfos() map[string]*meta2.StreamInfo GetDstStreamInfos(db, rp string, dstSis *[]*meta2.StreamInfo) bool DBRepGroups(database string) []meta2.ReplicaGroup GetReplicaN(database string) (int, error) GetSgEndTime(database string, rp string, timestamp time.Time, engineType config.EngineType) (int64, error) }
type PointsWriter ¶
type PointsWriter struct { MetaClient PWMetaClient TSDBStore TSDBStore // contains filtered or unexported fields }
PointsWriter handles writes across multiple local and remote data nodes.
func NewPointsWriter ¶
func NewPointsWriter(timeout time.Duration) *PointsWriter
NewPointsWriter returns a new instance of PointsWriter for a node.
func (*PointsWriter) ApplyTimeRangeLimit ¶ added in v1.0.0
func (w *PointsWriter) ApplyTimeRangeLimit(limit []toml.Duration)
func (*PointsWriter) Close ¶ added in v1.0.0
func (w *PointsWriter) Close()
func (*PointsWriter) MapRowToMeasurement ¶ added in v1.0.0
func (*PointsWriter) RetryWritePointRows ¶ added in v1.0.0
func (w *PointsWriter) RetryWritePointRows(database, retentionPolicy string, rows []influx.Row) error
RetryWritePointRows makes sure the sql client gets the latest metadata.
func (*PointsWriter) SetStore ¶ added in v1.2.0
func (w *PointsWriter) SetStore(store Storage)
type RWMetaClient ¶ added in v1.1.0
type RWMetaClient interface { Database(name string) (di *meta.DatabaseInfo, err error) RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error) CreateShardGroup(database, policy string, timestamp time.Time, version uint32, engineType config.EngineType) (*meta.ShardGroupInfo, error) DBPtView(database string) (meta.DBPtInfos, error) Measurement(database string, rpName string, mstName string) (*meta.MeasurementInfo, error) UpdateSchema(database string, retentionPolicy string, mst string, fieldToCreate []*proto.FieldSchema) error CreateMeasurement(database string, retentionPolicy string, mst string, shardKey *meta.ShardKeyInfo, numOfShards int32, indexR *influxql.IndexRelation, engineType config.EngineType, colStoreInfo *meta.ColStoreInfo, schemaInfo []*proto.FieldSchema, options *meta.Options) (*meta.MeasurementInfo, error) GetShardInfoByTime(database, retentionPolicy string, t time.Time, ptIdx int, nodeId uint64, engineType config.EngineType) (*meta.ShardInfo, error) }
type RecMsg ¶ added in v1.1.0
type RecMsg struct { TotalLen int64 Database string RetentionPolicy string Measurement string Rec interface{} MsgType record.RecordType }
RecMsg is the data structure of a record message.
type RecordWriter ¶ added in v1.1.0
type RecordWriter struct { MetaClient RWMetaClient StorageEngine interface { WriteRec(db, rp, mst string, ptId uint32, shardID uint64, rec *record.Record, binaryRec []byte) error } // contains filtered or unexported fields }
RecordWriter handles writes to the local data node.
func NewRecordWriter ¶ added in v1.1.0
func NewRecordWriter(timeout time.Duration, ptNum, recMsgChFactor int) *RecordWriter
func (*RecordWriter) CacheCapacity ¶ added in v1.3.0
func (w *RecordWriter) CacheCapacity(capacityPath string, totalLen int64)
func (*RecordWriter) Close ¶ added in v1.1.0
func (w *RecordWriter) Close() error
func (*RecordWriter) FlushCapacity ¶ added in v1.3.0
func (w *RecordWriter) FlushCapacity()
func (*RecordWriter) Open ¶ added in v1.1.0
func (w *RecordWriter) Open() error
func (*RecordWriter) RetryWriteLogRecord ¶ added in v1.2.0
func (w *RecordWriter) RetryWriteLogRecord(bulk *record.BulkRecords) error
func (*RecordWriter) RetryWriteRecord ¶ added in v1.1.0
func (w *RecordWriter) RetryWriteRecord(database, retentionPolicy, measurement string, rec arrow.Record) error
type RoundRobinWriter ¶ added in v1.1.0
type RoundRobinWriter struct { BaseWriter // contains filtered or unexported fields }
func (*RoundRobinWriter) Write ¶ added in v1.1.0
func (w *RoundRobinWriter) Write(lineProtocol []byte)
type ShowTagKeysExecutor ¶ added in v1.2.0
type ShowTagKeysExecutor struct {
// contains filtered or unexported fields
}
func NewShowTagKeysExecutor ¶ added in v1.2.0
func NewShowTagKeysExecutor(logger *logger.Logger, mc meta.MetaClient, me IMetaExecutor, store netstorage.Storage) *ShowTagKeysExecutor
func (*ShowTagKeysExecutor) Execute ¶ added in v1.2.0
func (e *ShowTagKeysExecutor) Execute(stmt *influxql.ShowTagKeysStatement) (netstorage.TableTagKeys, error)
type ShowTagValuesExecutor ¶
type ShowTagValuesExecutor struct {
// contains filtered or unexported fields
}
func NewShowTagValuesExecutor ¶
func NewShowTagValuesExecutor(logger *logger.Logger, mc meta.MetaClient, me IMetaExecutor, store netstorage.Storage) *ShowTagValuesExecutor
func (*ShowTagValuesExecutor) Cardinality ¶
func (e *ShowTagValuesExecutor) Cardinality(dimensions influxql.Dimensions)
func (*ShowTagValuesExecutor) Execute ¶
func (e *ShowTagValuesExecutor) Execute(stmt *influxql.ShowTagValuesStatement) (models.Rows, error)
type Stream ¶ added in v1.0.0
type Stream struct { TSDBStore TSDBStore MetaClient PWMetaClient // contains filtered or unexported fields }
type SubscriberManager ¶ added in v1.1.0
func NewSubscriberManager ¶ added in v1.1.0
func NewSubscriberManager(c config.Subscriber, m MetaClient, l *logger.Logger) *SubscriberManager
func (*SubscriberManager) InitWriters ¶ added in v1.1.0
func (s *SubscriberManager) InitWriters()
func (*SubscriberManager) NewSubscriberWriter ¶ added in v1.1.0
func (s *SubscriberManager) NewSubscriberWriter(db, rp, name, mode string, destinations []string) (SubscriberWriter, error)
func (*SubscriberManager) Send ¶ added in v1.1.0
func (s *SubscriberManager) Send(db, rp string, lineProtocol []byte)
func (*SubscriberManager) StopAllWriters ¶ added in v1.1.0
func (s *SubscriberManager) StopAllWriters()
func (*SubscriberManager) Update ¶ added in v1.1.0
func (s *SubscriberManager) Update()
func (*SubscriberManager) UpdateWriters ¶ added in v1.1.0
func (s *SubscriberManager) UpdateWriters()
func (*SubscriberManager) WalkDatabases ¶ added in v1.1.0
func (s *SubscriberManager) WalkDatabases(fn func(db *meta.DatabaseInfo))
type SubscriberWriter ¶ added in v1.1.0
type TSDBStore ¶ added in v1.0.0
type TSDBStore interface {
WriteRows(ctx *netstorage.WriteContext, nodeID uint64, pt uint32, database, rp string, timeout time.Duration) error
}
type TagValuesSlice ¶
type TagValuesSlice []netstorage.TableTagSets
func (TagValuesSlice) Len ¶
func (a TagValuesSlice) Len() int
func (TagValuesSlice) Less ¶
func (a TagValuesSlice) Less(i, j int) bool
func (TagValuesSlice) Swap ¶
func (a TagValuesSlice) Swap(i, j int)