Documentation ¶
Index ¶
- Constants
- Variables
- func HashDbAndSeriesToInt(database, series string) int
- func HashPassword(password string) ([]byte, error)
- func SortShardsByTimeAscending(shards []*ShardData)
- func SortShardsByTimeDescending(shards []*ShardData)
- type ByShardTimeAsc
- type ByShardTimeDesc
- type ClusterAdmin
- func (self *ClusterAdmin) GetReadPermission() string
- func (self *ClusterAdmin) GetWritePermission() string
- func (self *ClusterAdmin) HasReadAccess(_ string) bool
- func (self *ClusterAdmin) HasWriteAccess(_ string) bool
- func (self *ClusterAdmin) IsClusterAdmin() bool
- func (self *ClusterAdmin) IsDbAdmin(_ string) bool
- type ClusterConfiguration
- func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer)
- func (self *ClusterConfiguration) AddShardSpace(space *ShardSpace) error
- func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardData, error)
- func (self *ClusterConfiguration) AuthenticateClusterAdmin(username, password string) (common.User, error)
- func (self *ClusterConfiguration) AuthenticateDbUser(db, username, password string) (common.User, error)
- func (self *ClusterConfiguration) ChangeDbUserPassword(db, username, hash string) error
- func (self *ClusterConfiguration) ChangeDbUserPermissions(db, username, readPermissions, writePermissions string) error
- func (self *ClusterConfiguration) ChangeProtobufConnectionString(server *ClusterServer)
- func (self *ClusterConfiguration) CreateCheckpoint() error
- func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) error
- func (self *ClusterConfiguration) CreateDatabase(name string) error
- func (self *ClusterConfiguration) DatabaseExists(name string) bool
- func (self *ClusterConfiguration) DatabasesExists(db string) bool
- func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) error
- func (self *ClusterConfiguration) DropDatabase(name string) error
- func (self *ClusterConfiguration) DropSeries(database, series string) error
- func (self *ClusterConfiguration) DropShard(shardId uint32, serverIds []uint32) error
- func (self *ClusterConfiguration) GetClusterAdmin(username string) *ClusterAdmin
- func (self *ClusterConfiguration) GetClusterAdmins() (names []string)
- func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQuery
- func (self *ClusterConfiguration) GetDatabases() []*Database
- func (self *ClusterConfiguration) GetDbUser(db, username string) *DbUser
- func (self *ClusterConfiguration) GetDbUsers(db string) []common.User
- func (self *ClusterConfiguration) GetExpiredShards() []*ShardData
- func (self *ClusterConfiguration) GetLocalConfiguration() *configuration.Configuration
- func (self *ClusterConfiguration) GetLocalShardById(id uint32) *ShardData
- func (self *ClusterConfiguration) GetServerById(id *uint32) *ClusterServer
- func (self *ClusterConfiguration) GetServerByProtobufConnectionString(connectionString string) *ClusterServer
- func (self *ClusterConfiguration) GetServerByRaftName(name string) *ClusterServer
- func (self *ClusterConfiguration) GetShard(id uint32) *ShardData
- func (self *ClusterConfiguration) GetShardSpaces() []*ShardSpace
- func (self *ClusterConfiguration) GetShardSpacesForDatabase(database string) []*ShardSpace
- func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series string, microsecondsEpoch int64) (*ShardData, error)
- func (self *ClusterConfiguration) GetShards() []*ShardData
- func (self *ClusterConfiguration) GetShardsForQuery(querySpec *parser.QuerySpec) (Shards, error)
- func (self *ClusterConfiguration) HasContinuousQueries() bool
- func (self *ClusterConfiguration) HasUncommitedWrites() bool
- func (self *ClusterConfiguration) IsSingleServer() bool
- func (self *ClusterConfiguration) LastContinuousQueryRunTime() time.Time
- func (self *ClusterConfiguration) MarshalNewShardArrayToShards(newShards []*NewShardData) ([]*ShardData, error)
- func (self *ClusterConfiguration) RecoverFromWAL() error
- func (self *ClusterConfiguration) Recovery(b []byte) error
- func (self *ClusterConfiguration) RemoveServer(server *ClusterServer) error
- func (self *ClusterConfiguration) RemoveShardSpace(database, name string) error
- func (self *ClusterConfiguration) Save() ([]byte, error)
- func (self *ClusterConfiguration) SaveClusterAdmin(u *ClusterAdmin)
- func (self *ClusterConfiguration) SaveDbUser(u *DbUser)
- func (self *ClusterConfiguration) SerializableConfiguration() *SavedConfiguration
- func (self *ClusterConfiguration) ServerId() uint32
- func (self *ClusterConfiguration) Servers() []*ClusterServer
- func (self *ClusterConfiguration) SetContinuousQueryTimestamp(timestamp time.Time) error
- func (self *ClusterConfiguration) SetLastContinuousQueryRunTime(t time.Time)
- func (self *ClusterConfiguration) SetShardCreator(shardCreator ShardCreator)
- func (self *ClusterConfiguration) ShardSpaceExists(space *ShardSpace) bool
- func (self *ClusterConfiguration) UpdateShardSpace(space *ShardSpace) error
- func (self *ClusterConfiguration) WaitForLocalServerLoaded()
- type ClusterServer
- func (self *ClusterServer) BufferWrite(request *protocol.Request)
- func (self *ClusterServer) Connect()
- func (self *ClusterServer) GetId() uint32
- func (self *ClusterServer) IsUp() bool
- func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan<- *protocol.Response)
- func (self *ClusterServer) SetWriteBuffer(writeBuffer *WriteBuffer)
- func (self *ClusterServer) StartHeartbeat()
- func (self *ClusterServer) Write(request *protocol.Request) error
- type CommonUser
- func (self *CommonUser) ChangePassword(hash string) error
- func (self *CommonUser) GetDb() string
- func (self *CommonUser) GetName() string
- func (self *CommonUser) HasReadAccess(name string) bool
- func (self *CommonUser) HasWriteAccess(name string) bool
- func (self *CommonUser) IsClusterAdmin() bool
- func (self *CommonUser) IsDbAdmin(db string) bool
- func (self *CommonUser) IsDeleted() bool
- type ContinuousQuery
- type Database
- type DbUser
- func (self *DbUser) ChangePermissions(readPermissions, writePermissions string)
- func (self *DbUser) GetDb() string
- func (self *DbUser) GetReadPermission() string
- func (self *DbUser) GetWritePermission() string
- func (self *DbUser) HasReadAccess(name string) bool
- func (self *DbUser) HasWriteAccess(name string) bool
- func (self *DbUser) IsDbAdmin(db string) bool
- type LocalShardDb
- type LocalShardStore
- type Matcher
- type NewShardData
- type NilProcessor
- type QuerySpec
- type ResponseChannel
- type ResponseChannelProcessor
- type ResponseChannelWrapper
- type SavedConfiguration
- type ServerConnection
- type Shard
- type ShardCollection
- type ShardCreator
- type ShardData
- func (self *ShardData) DropFields(fields []*metastore.Field) error
- func (self *ShardData) EndMicro() int64
- func (self *ShardData) EndTime() time.Time
- func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan<- *p.Response, ...)
- func (self *ShardData) Id() uint32
- func (self *ShardData) IsMicrosecondInRange(t int64) bool
- func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan<- *p.Response, ...)
- func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan<- *p.Response)
- func (self *ShardData) QueryResponseBufferSize(querySpec *parser.QuerySpec, batchPointSize int) int
- func (self *ShardData) ReplicationFactor() int
- func (self *ShardData) ServerIds() []uint32
- func (self *ShardData) SetLocalStore(store LocalShardStore, localServerId uint32) error
- func (self *ShardData) SetServers(servers []*ClusterServer)
- func (self *ShardData) ShouldAggregateLocally(querySpec *parser.QuerySpec) bool
- func (self *ShardData) StartMicro() int64
- func (self *ShardData) StartTime() time.Time
- func (self *ShardData) String() string
- func (self *ShardData) SyncWrite(request *p.Request, assignSeqNum bool) error
- func (self *ShardData) ToNewShardData() *NewShardData
- func (self *ShardData) Write(request *p.Request) error
- func (self *ShardData) WriteLocalOnly(request *p.Request) error
- type ShardIdInserterProcessor
- type ShardSpace
- func (s *ShardSpace) MatchesSeries(name string) bool
- func (s *ShardSpace) ParsedRetentionPeriod() time.Duration
- func (s *ShardSpace) ParsedShardDuration() time.Duration
- func (s *ShardSpace) SecondsOfDuration() float64
- func (s *ShardSpace) SetDefaults()
- func (s *ShardSpace) UpdateFromSpace(space *ShardSpace) error
- func (s *ShardSpace) Validate(clusterConfig *ClusterConfiguration, checkForDb bool) error
- type ShardType
- type Shards
- type WAL
- type WriteBuffer
- type Writer
Constants ¶
const ( PER_SERVER_BUFFER_SIZE = 10 LOCAL_WRITE_BUFFER_SIZE = 10 )
const ( SEVEN_DAYS = time.Hour * 24 * 7 DEFAULT_SPLIT = 1 DEFAULT_REPLICATION_FACTOR = 1 DEFAULT_SHARD_DURATION = SEVEN_DAYS DEFAULT_RETENTION_POLICY_DURATION = 0 )
const (
DEFAULT_SHARD_SPACE_NAME = "default"
)
const (
HEARTBEAT_TIMEOUT = 100 * time.Millisecond
)
const InfiniteRetention = time.Duration(0)
Variables ¶
var HEARTBEAT_TYPE = protocol.Request_HEARTBEAT
Functions ¶
func HashDbAndSeriesToInt ¶
func HashPassword ¶
func SortShardsByTimeAscending ¶
func SortShardsByTimeAscending(shards []*ShardData)
func SortShardsByTimeDescending ¶
func SortShardsByTimeDescending(shards []*ShardData)
Types ¶
type ByShardTimeAsc ¶
type ByShardTimeAsc struct{ ShardCollection }
func (ByShardTimeAsc) Less ¶
func (s ByShardTimeAsc) Less(i, j int) bool
type ByShardTimeDesc ¶
type ByShardTimeDesc struct{ ShardCollection }
func (ByShardTimeDesc) Less ¶
func (s ByShardTimeDesc) Less(i, j int) bool
type ClusterAdmin ¶
type ClusterAdmin struct {
CommonUser `json:"common"`
}
func (*ClusterAdmin) GetReadPermission ¶
func (self *ClusterAdmin) GetReadPermission() string
func (*ClusterAdmin) GetWritePermission ¶
func (self *ClusterAdmin) GetWritePermission() string
func (*ClusterAdmin) HasReadAccess ¶
func (self *ClusterAdmin) HasReadAccess(_ string) bool
func (*ClusterAdmin) HasWriteAccess ¶
func (self *ClusterAdmin) HasWriteAccess(_ string) bool
func (*ClusterAdmin) IsClusterAdmin ¶
func (self *ClusterAdmin) IsClusterAdmin() bool
func (*ClusterAdmin) IsDbAdmin ¶
func (self *ClusterAdmin) IsDbAdmin(_ string) bool
type ClusterConfiguration ¶
type ClusterConfiguration struct { DatabaseReplicationFactors map[string]struct{} ParsedContinuousQueries map[string]map[uint32]*parser.SelectQuery LocalServer *ClusterServer LocalRaftName string MetaStore *metastore.Store // contains filtered or unexported fields }
This struct stores all the metadata confiugration information about a running cluster. This includes the servers in the cluster and their state, databases, users, and which continuous queries are running.
func NewClusterConfiguration ¶
func NewClusterConfiguration( config *configuration.Configuration, wal WAL, shardStore LocalShardStore, connectionCreator func(string) ServerConnection, metaStore *metastore.Store) *ClusterConfiguration
func (*ClusterConfiguration) AddPotentialServer ¶
func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer)
func (*ClusterConfiguration) AddShardSpace ¶
func (self *ClusterConfiguration) AddShardSpace(space *ShardSpace) error
func (*ClusterConfiguration) AddShards ¶
func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardData, error)
Add shards expects all shards to be of the same type (long term or short term) and have the same start and end times. This is called to add the shard set for a given duration. If existing shards have the same times, those are returned.
func (*ClusterConfiguration) AuthenticateClusterAdmin ¶
func (self *ClusterConfiguration) AuthenticateClusterAdmin(username, password string) (common.User, error)
func (*ClusterConfiguration) AuthenticateDbUser ¶
func (self *ClusterConfiguration) AuthenticateDbUser(db, username, password string) (common.User, error)
func (*ClusterConfiguration) ChangeDbUserPassword ¶
func (self *ClusterConfiguration) ChangeDbUserPassword(db, username, hash string) error
func (*ClusterConfiguration) ChangeDbUserPermissions ¶
func (self *ClusterConfiguration) ChangeDbUserPermissions(db, username, readPermissions, writePermissions string) error
func (*ClusterConfiguration) ChangeProtobufConnectionString ¶
func (self *ClusterConfiguration) ChangeProtobufConnectionString(server *ClusterServer)
func (*ClusterConfiguration) CreateCheckpoint ¶
func (self *ClusterConfiguration) CreateCheckpoint() error
func (*ClusterConfiguration) CreateContinuousQuery ¶
func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) error
func (*ClusterConfiguration) CreateDatabase ¶
func (self *ClusterConfiguration) CreateDatabase(name string) error
func (*ClusterConfiguration) DatabaseExists ¶
func (self *ClusterConfiguration) DatabaseExists(name string) bool
func (*ClusterConfiguration) DatabasesExists ¶
func (self *ClusterConfiguration) DatabasesExists(db string) bool
func (*ClusterConfiguration) DeleteContinuousQuery ¶
func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) error
func (*ClusterConfiguration) DropDatabase ¶
func (self *ClusterConfiguration) DropDatabase(name string) error
func (*ClusterConfiguration) DropSeries ¶
func (self *ClusterConfiguration) DropSeries(database, series string) error
func (*ClusterConfiguration) DropShard ¶
func (self *ClusterConfiguration) DropShard(shardId uint32, serverIds []uint32) error
func (*ClusterConfiguration) GetClusterAdmin ¶
func (self *ClusterConfiguration) GetClusterAdmin(username string) *ClusterAdmin
func (*ClusterConfiguration) GetClusterAdmins ¶
func (self *ClusterConfiguration) GetClusterAdmins() (names []string)
func (*ClusterConfiguration) GetContinuousQueries ¶
func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQuery
func (*ClusterConfiguration) GetDatabases ¶
func (self *ClusterConfiguration) GetDatabases() []*Database
func (*ClusterConfiguration) GetDbUser ¶
func (self *ClusterConfiguration) GetDbUser(db, username string) *DbUser
func (*ClusterConfiguration) GetDbUsers ¶
func (self *ClusterConfiguration) GetDbUsers(db string) []common.User
func (*ClusterConfiguration) GetExpiredShards ¶
func (self *ClusterConfiguration) GetExpiredShards() []*ShardData
func (*ClusterConfiguration) GetLocalConfiguration ¶
func (self *ClusterConfiguration) GetLocalConfiguration() *configuration.Configuration
func (*ClusterConfiguration) GetLocalShardById ¶
func (self *ClusterConfiguration) GetLocalShardById(id uint32) *ShardData
This function is for the request handler to get the shard to write a request to locally.
func (*ClusterConfiguration) GetServerById ¶
func (self *ClusterConfiguration) GetServerById(id *uint32) *ClusterServer
func (*ClusterConfiguration) GetServerByProtobufConnectionString ¶
func (self *ClusterConfiguration) GetServerByProtobufConnectionString(connectionString string) *ClusterServer
func (*ClusterConfiguration) GetServerByRaftName ¶
func (self *ClusterConfiguration) GetServerByRaftName(name string) *ClusterServer
func (*ClusterConfiguration) GetShard ¶
func (self *ClusterConfiguration) GetShard(id uint32) *ShardData
func (*ClusterConfiguration) GetShardSpaces ¶
func (self *ClusterConfiguration) GetShardSpaces() []*ShardSpace
func (*ClusterConfiguration) GetShardSpacesForDatabase ¶
func (self *ClusterConfiguration) GetShardSpacesForDatabase(database string) []*ShardSpace
func (*ClusterConfiguration) GetShardToWriteToBySeriesAndTime ¶
func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series string, microsecondsEpoch int64) (*ShardData, error)
Given a db and series name and pick a shard where a point with the given timestamp should be written to. If the point is outside the retention period of the shard space then return nil. The returned shard is always nil if err != nil
func (*ClusterConfiguration) GetShards ¶
func (self *ClusterConfiguration) GetShards() []*ShardData
func (*ClusterConfiguration) GetShardsForQuery ¶
func (self *ClusterConfiguration) GetShardsForQuery(querySpec *parser.QuerySpec) (Shards, error)
func (*ClusterConfiguration) HasContinuousQueries ¶
func (self *ClusterConfiguration) HasContinuousQueries() bool
func (*ClusterConfiguration) HasUncommitedWrites ¶
func (self *ClusterConfiguration) HasUncommitedWrites() bool
Return per shard request numbers for the local server and all remote servers
func (*ClusterConfiguration) IsSingleServer ¶
func (self *ClusterConfiguration) IsSingleServer() bool
func (*ClusterConfiguration) LastContinuousQueryRunTime ¶
func (self *ClusterConfiguration) LastContinuousQueryRunTime() time.Time
func (*ClusterConfiguration) MarshalNewShardArrayToShards ¶
func (self *ClusterConfiguration) MarshalNewShardArrayToShards(newShards []*NewShardData) ([]*ShardData, error)
func (*ClusterConfiguration) RecoverFromWAL ¶
func (self *ClusterConfiguration) RecoverFromWAL() error
func (*ClusterConfiguration) Recovery ¶
func (self *ClusterConfiguration) Recovery(b []byte) error
func (*ClusterConfiguration) RemoveServer ¶
func (self *ClusterConfiguration) RemoveServer(server *ClusterServer) error
func (*ClusterConfiguration) RemoveShardSpace ¶
func (self *ClusterConfiguration) RemoveShardSpace(database, name string) error
func (*ClusterConfiguration) Save ¶
func (self *ClusterConfiguration) Save() ([]byte, error)
func (*ClusterConfiguration) SaveClusterAdmin ¶
func (self *ClusterConfiguration) SaveClusterAdmin(u *ClusterAdmin)
func (*ClusterConfiguration) SaveDbUser ¶
func (self *ClusterConfiguration) SaveDbUser(u *DbUser)
func (*ClusterConfiguration) SerializableConfiguration ¶
func (self *ClusterConfiguration) SerializableConfiguration() *SavedConfiguration
func (*ClusterConfiguration) ServerId ¶
func (self *ClusterConfiguration) ServerId() uint32
func (*ClusterConfiguration) Servers ¶
func (self *ClusterConfiguration) Servers() []*ClusterServer
func (*ClusterConfiguration) SetContinuousQueryTimestamp ¶
func (self *ClusterConfiguration) SetContinuousQueryTimestamp(timestamp time.Time) error
func (*ClusterConfiguration) SetLastContinuousQueryRunTime ¶
func (self *ClusterConfiguration) SetLastContinuousQueryRunTime(t time.Time)
func (*ClusterConfiguration) SetShardCreator ¶
func (self *ClusterConfiguration) SetShardCreator(shardCreator ShardCreator)
func (*ClusterConfiguration) ShardSpaceExists ¶
func (self *ClusterConfiguration) ShardSpaceExists(space *ShardSpace) bool
func (*ClusterConfiguration) UpdateShardSpace ¶
func (self *ClusterConfiguration) UpdateShardSpace(space *ShardSpace) error
func (*ClusterConfiguration) WaitForLocalServerLoaded ¶
func (self *ClusterConfiguration) WaitForLocalServerLoaded()
This function will wait until the configuration has received an addPotentialServer command for this local server.
type ClusterServer ¶
type ClusterServer struct { Id uint32 RaftName string RaftConnectionString string ProtobufConnectionString string HeartbeatInterval time.Duration Backoff time.Duration MinBackoff time.Duration MaxBackoff time.Duration // contains filtered or unexported fields }
func NewClusterServer ¶
func NewClusterServer(raftName, raftConnectionString, protobufConnectionString string, connection ServerConnection, config *c.Configuration) *ClusterServer
func (*ClusterServer) BufferWrite ¶
func (self *ClusterServer) BufferWrite(request *protocol.Request)
func (*ClusterServer) Connect ¶
func (self *ClusterServer) Connect()
func (*ClusterServer) GetId ¶
func (self *ClusterServer) GetId() uint32
func (*ClusterServer) IsUp ¶
func (self *ClusterServer) IsUp() bool
func (*ClusterServer) MakeRequest ¶
func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan<- *protocol.Response)
func (*ClusterServer) SetWriteBuffer ¶
func (self *ClusterServer) SetWriteBuffer(writeBuffer *WriteBuffer)
func (*ClusterServer) StartHeartbeat ¶
func (self *ClusterServer) StartHeartbeat()
type CommonUser ¶
type CommonUser struct { Name string `json:"name"` Hash string `json:"hash"` IsUserDeleted bool `json:"is_deleted"` CacheKey string `json:"cache_key"` }
func (*CommonUser) ChangePassword ¶
func (self *CommonUser) ChangePassword(hash string) error
func (*CommonUser) GetDb ¶
func (self *CommonUser) GetDb() string
func (*CommonUser) GetName ¶
func (self *CommonUser) GetName() string
func (*CommonUser) HasReadAccess ¶
func (self *CommonUser) HasReadAccess(name string) bool
func (*CommonUser) HasWriteAccess ¶
func (self *CommonUser) HasWriteAccess(name string) bool
func (*CommonUser) IsClusterAdmin ¶
func (self *CommonUser) IsClusterAdmin() bool
func (*CommonUser) IsDbAdmin ¶
func (self *CommonUser) IsDbAdmin(db string) bool
func (*CommonUser) IsDeleted ¶
func (self *CommonUser) IsDeleted() bool
type ContinuousQuery ¶
type DbUser ¶
type DbUser struct { CommonUser `json:"common"` Db string `json:"db"` ReadFrom []*Matcher `json:"read_matchers"` WriteTo []*Matcher `json:"write_matchers"` IsAdmin bool `json:"is_admin"` }
func (*DbUser) ChangePermissions ¶
func (*DbUser) GetReadPermission ¶
func (*DbUser) GetWritePermission ¶
func (*DbUser) HasReadAccess ¶
func (*DbUser) HasWriteAccess ¶
type LocalShardDb ¶
type LocalShardStore ¶
type NewShardData ¶
type NilProcessor ¶
type NilProcessor struct{}
func (NilProcessor) Close ¶
func (np NilProcessor) Close() error
func (NilProcessor) Name ¶
func (np NilProcessor) Name() string
func (NilProcessor) Next ¶
func (np NilProcessor) Next() engine.Processor
type QuerySpec ¶
type QuerySpec interface { GetStartTime() time.Time GetEndTime() time.Time Database() string TableNames() []string TableNamesAndRegex() ([]string, *regexp.Regexp) GetGroupByInterval() *time.Duration AllShardsQuery() bool IsRegex() bool }
defined by cluster config (in cluster package)
type ResponseChannel ¶
ResponseChannel is a processor for Responses as opposed to Series like `engine.Processor'
func NewResponseChannelWrapper ¶
func NewResponseChannelWrapper(c chan<- *protocol.Response) ResponseChannel
type ResponseChannelProcessor ¶
type ResponseChannelProcessor struct {
// contains filtered or unexported fields
}
ResponseChannelProcessor converts Series to Responses. This is used to chain `engine.Processor` with a `ResponseChannel'
func NewResponseChannelProcessor ¶
func NewResponseChannelProcessor(r ResponseChannel) *ResponseChannelProcessor
func (*ResponseChannelProcessor) Close ¶
func (p *ResponseChannelProcessor) Close() error
func (*ResponseChannelProcessor) Name ¶
func (p *ResponseChannelProcessor) Name() string
func (*ResponseChannelProcessor) Next ¶
func (p *ResponseChannelProcessor) Next() engine.Processor
type ResponseChannelWrapper ¶
type ResponseChannelWrapper struct {
// contains filtered or unexported fields
}
A `ResponseProcessor' that wraps a go channel.
func (*ResponseChannelWrapper) Name ¶
func (w *ResponseChannelWrapper) Name() string
type SavedConfiguration ¶
type SavedConfiguration struct { Databases map[string]uint8 Admins map[string]*ClusterAdmin DbUsers map[string]map[string]*DbUser Servers []*ClusterServer ContinuousQueries map[string][]*ContinuousQuery MetaStore *metastore.Store LastShardIdUsed uint32 LastServerIdUsed uint32 DatabaseShardSpaces map[string][]*ShardSpace Shards []*NewShardData }
type ServerConnection ¶
type Shard ¶
type Shard interface { Id() uint32 StartTime() time.Time EndTime() time.Time Write(*p.Request) error SyncWrite(req *p.Request, assignSeqNum bool) error Query(querySpec *parser.QuerySpec, response chan<- *p.Response) ReplicationFactor() int IsMicrosecondInRange(t int64) bool }
A shard implements an interface for writing and querying data. It can be copied to multiple servers or the local datastore. Shards contains data from [startTime, endTime] Ids are unique across the cluster
type ShardCollection ¶
type ShardCollection []*ShardData
func (ShardCollection) Len ¶
func (s ShardCollection) Len() int
func (ShardCollection) Swap ¶
func (s ShardCollection) Swap(i, j int)
type ShardCreator ¶
type ShardCreator interface { // the shard creator expects all shards to be of the same type (long term or short term) and have the same // start and end times. This is called to create the shard set for a given duration. CreateShards(shards []*NewShardData) ([]*ShardData, error) CreateShardSpace(shardSpace *ShardSpace) error }
type ShardData ¶
type ShardData struct { IsLocal bool SpaceName string Database string // contains filtered or unexported fields }
func (*ShardData) HandleDestructiveQuery ¶
func (*ShardData) IsMicrosecondInRange ¶
func (*ShardData) LogAndHandleDestructiveQuery ¶
func (*ShardData) QueryResponseBufferSize ¶
func (*ShardData) ReplicationFactor ¶
func (*ShardData) SetLocalStore ¶
func (self *ShardData) SetLocalStore(store LocalShardStore, localServerId uint32) error
func (*ShardData) SetServers ¶
func (self *ShardData) SetServers(servers []*ClusterServer)
func (*ShardData) ShouldAggregateLocally ¶
Returns true if we can aggregate the data locally per shard, i.e. the group by interval lines up with the shard duration and there are no joins or merges
func (*ShardData) StartMicro ¶
func (*ShardData) ToNewShardData ¶
func (self *ShardData) ToNewShardData() *NewShardData
used to serialize shards when sending around in raft or when snapshotting in the log
type ShardIdInserterProcessor ¶
type ShardIdInserterProcessor struct {
// contains filtered or unexported fields
}
A processor to set the ShardId on the series to `id`
func NewShardIdInserterProcessor ¶
func NewShardIdInserterProcessor(id uint32, next engine.Processor) ShardIdInserterProcessor
func (ShardIdInserterProcessor) Close ¶
func (sip ShardIdInserterProcessor) Close() error
func (ShardIdInserterProcessor) Name ¶
func (sip ShardIdInserterProcessor) Name() string
func (ShardIdInserterProcessor) Next ¶
func (sip ShardIdInserterProcessor) Next() engine.Processor
type ShardSpace ¶
type ShardSpace struct { // required, must be unique within the database Name string `json:"name"` // required, a database has many shard spaces and a shard space belongs to a database Database string `json:"database"` // this is optional, if they don't set it, we'll set to /.*/ Regex string `json:"regex"` // this is optional, if they don't set it, it will default to the storage.dir in the config RetentionPolicy string `json:"retentionPolicy"` ShardDuration string `json:"shardDuration"` ReplicationFactor uint32 `json:"replicationFactor"` Split uint32 `json:"split"` // contains filtered or unexported fields }
func NewShardSpace ¶
func NewShardSpace(database, name string) *ShardSpace
func (*ShardSpace) MatchesSeries ¶
func (s *ShardSpace) MatchesSeries(name string) bool
func (*ShardSpace) ParsedRetentionPeriod ¶
func (s *ShardSpace) ParsedRetentionPeriod() time.Duration
func (*ShardSpace) ParsedShardDuration ¶
func (s *ShardSpace) ParsedShardDuration() time.Duration
func (*ShardSpace) SecondsOfDuration ¶
func (s *ShardSpace) SecondsOfDuration() float64
func (*ShardSpace) SetDefaults ¶
func (s *ShardSpace) SetDefaults()
func (*ShardSpace) UpdateFromSpace ¶
func (s *ShardSpace) UpdateFromSpace(space *ShardSpace) error
func (*ShardSpace) Validate ¶
func (s *ShardSpace) Validate(clusterConfig *ClusterConfiguration, checkForDb bool) error
type WAL ¶
type WAL interface { AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard) (uint32, error) AssignSequenceNumbers(request *protocol.Request) error Commit(requestNumber uint32, serverId uint32) error CreateCheckpoint() error RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error }
type WriteBuffer ¶
type WriteBuffer struct {
// contains filtered or unexported fields
}
Acts as a buffer for writes
func NewWriteBuffer ¶
func (*WriteBuffer) HasUncommitedWrites ¶
func (self *WriteBuffer) HasUncommitedWrites() bool
func (*WriteBuffer) ShardsRequestNumber ¶
func (self *WriteBuffer) ShardsRequestNumber() map[uint32]uint32
func (*WriteBuffer) Write ¶
func (self *WriteBuffer) Write(request *protocol.Request)
This method never blocks. It'll buffer writes until they fill the buffer then drop the on the floor and let the background goroutine replay from the WAL