Documentation ¶
Index ¶
- Constants
- Variables
- func HashDbAndSeriesToInt(database, series string) int
- func HashPassword(password string) ([]byte, error)
- func SortShardsByTimeAscending(shards []*ShardData)
- func SortShardsByTimeDescending(shards []*ShardData)
- type ByShardTimeAsc
- type ByShardTimeDesc
- type ClusterAdmin
- func (self *ClusterAdmin) GetReadPermission() string
- func (self *ClusterAdmin) GetWritePermission() string
- func (self *ClusterAdmin) HasReadAccess(_ string) bool
- func (self *ClusterAdmin) HasWriteAccess(_ string) bool
- func (self *ClusterAdmin) IsClusterAdmin() bool
- func (self *ClusterAdmin) IsDbAdmin(_ string) bool
- type ClusterConfiguration
- func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer)
- func (self *ClusterConfiguration) AddShardSpace(space *ShardSpace) error
- func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardData, error)
- func (self *ClusterConfiguration) AuthenticateClusterAdmin(username, password string) (common.User, error)
- func (self *ClusterConfiguration) AuthenticateDbUser(db, username, password string) (common.User, error)
- func (self *ClusterConfiguration) ChangeDbUserPassword(db, username, hash string) error
- func (self *ClusterConfiguration) ChangeDbUserPermissions(db, username, readPermissions, writePermissions string) error
- func (self *ClusterConfiguration) ChangeProtobufConnectionString(server *ClusterServer)
- func (self *ClusterConfiguration) CreateCheckpoint() error
- func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) error
- func (self *ClusterConfiguration) CreateDatabase(name string) error
- func (self *ClusterConfiguration) DatabaseExists(name string) bool
- func (self *ClusterConfiguration) DatabasesExists(db string) bool
- func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) error
- func (self *ClusterConfiguration) DropDatabase(name string) error
- func (self *ClusterConfiguration) DropSeries(database, series string) error
- func (self *ClusterConfiguration) DropShard(shardId uint32, serverIds []uint32) error
- func (self *ClusterConfiguration) GetClusterAdmin(username string) *ClusterAdmin
- func (self *ClusterConfiguration) GetClusterAdmins() (names []string)
- func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQuery
- func (self *ClusterConfiguration) GetDatabases() []*Database
- func (self *ClusterConfiguration) GetDbUser(db, username string) *DbUser
- func (self *ClusterConfiguration) GetDbUsers(db string) []common.User
- func (self *ClusterConfiguration) GetExpiredShards() []*ShardData
- func (self *ClusterConfiguration) GetLocalConfiguration() *configuration.Configuration
- func (self *ClusterConfiguration) GetLocalShardById(id uint32) *ShardData
- func (self *ClusterConfiguration) GetServerById(id *uint32) *ClusterServer
- func (self *ClusterConfiguration) GetServerByProtobufConnectionString(connectionString string) *ClusterServer
- func (self *ClusterConfiguration) GetServerByRaftName(name string) *ClusterServer
- func (self *ClusterConfiguration) GetShard(id uint32) *ShardData
- func (self *ClusterConfiguration) GetShardSpaces() []*ShardSpace
- func (self *ClusterConfiguration) GetShardSpacesForDatabase(database string) []*ShardSpace
- func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series string, microsecondsEpoch int64) (*ShardData, error)
- func (self *ClusterConfiguration) GetShards() []*ShardData
- func (self *ClusterConfiguration) GetShardsForQuery(querySpec *parser.QuerySpec) (Shards, error)
- func (self *ClusterConfiguration) HasContinuousQueries() bool
- func (self *ClusterConfiguration) HasUncommitedWrites() bool
- func (self *ClusterConfiguration) IsSingleServer() bool
- func (self *ClusterConfiguration) LastContinuousQueryRunTime() time.Time
- func (self *ClusterConfiguration) MarshalNewShardArrayToShards(newShards []*NewShardData) ([]*ShardData, error)
- func (self *ClusterConfiguration) RecoverFromWAL() error
- func (self *ClusterConfiguration) Recovery(b []byte) error
- func (self *ClusterConfiguration) RemoveServer(server *ClusterServer) error
- func (self *ClusterConfiguration) RemoveShardSpace(database, name string) error
- func (self *ClusterConfiguration) Save() ([]byte, error)
- func (self *ClusterConfiguration) SaveClusterAdmin(u *ClusterAdmin)
- func (self *ClusterConfiguration) SaveDbUser(u *DbUser)
- func (self *ClusterConfiguration) SerializableConfiguration() *SavedConfiguration
- func (self *ClusterConfiguration) ServerId() uint32
- func (self *ClusterConfiguration) Servers() []*ClusterServer
- func (self *ClusterConfiguration) SetContinuousQueryTimestamp(timestamp time.Time) error
- func (self *ClusterConfiguration) SetLastContinuousQueryRunTime(t time.Time)
- func (self *ClusterConfiguration) SetShardCreator(shardCreator ShardCreator)
- func (self *ClusterConfiguration) ShardSpaceExists(space *ShardSpace) bool
- func (self *ClusterConfiguration) UpdateShardSpace(space *ShardSpace) error
- func (self *ClusterConfiguration) WaitForLocalServerLoaded()
- type ClusterServer
- func (self *ClusterServer) BufferWrite(request *protocol.Request)
- func (self *ClusterServer) Connect()
- func (self *ClusterServer) GetId() uint32
- func (self *ClusterServer) GetStateName() (stateName string)
- func (self *ClusterServer) IsUp() bool
- func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan<- *protocol.Response)
- func (self *ClusterServer) SetWriteBuffer(writeBuffer *WriteBuffer)
- func (self *ClusterServer) StartHeartbeat()
- func (self *ClusterServer) Write(request *protocol.Request) error
- type CommonUser
- func (self *CommonUser) ChangePassword(hash string) error
- func (self *CommonUser) GetDb() string
- func (self *CommonUser) GetName() string
- func (self *CommonUser) HasReadAccess(name string) bool
- func (self *CommonUser) HasWriteAccess(name string) bool
- func (self *CommonUser) IsClusterAdmin() bool
- func (self *CommonUser) IsDbAdmin(db string) bool
- func (self *CommonUser) IsDeleted() bool
- type ContinuousQuery
- type Database
- type DbUser
- func (self *DbUser) ChangePermissions(readPermissions, writePermissions string)
- func (self *DbUser) GetDb() string
- func (self *DbUser) GetReadPermission() string
- func (self *DbUser) GetWritePermission() string
- func (self *DbUser) HasReadAccess(name string) bool
- func (self *DbUser) HasWriteAccess(name string) bool
- func (self *DbUser) IsDbAdmin(db string) bool
- type LocalShardDb
- type LocalShardStore
- type Matcher
- type NewShardData
- type NilProcessor
- type QuerySpec
- type ResponseChannel
- type ResponseChannelProcessor
- type ResponseChannelWrapper
- type SavedConfiguration
- type ServerConnection
- type ServerState
- type Shard
- type ShardCollection
- type ShardCreator
- type ShardData
- func (self *ShardData) DropFields(fields []*metastore.Field) error
- func (self *ShardData) EndMicro() int64
- func (self *ShardData) EndTime() time.Time
- func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan<- *p.Response, ...)
- func (self *ShardData) Id() uint32
- func (self *ShardData) IsMicrosecondInRange(t int64) bool
- func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan<- *p.Response, ...)
- func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan<- *p.Response)
- func (self *ShardData) QueryResponseBufferSize(querySpec *parser.QuerySpec, batchPointSize int) int
- func (self *ShardData) ReplicationFactor() int
- func (self *ShardData) ServerIds() []uint32
- func (self *ShardData) SetLocalStore(store LocalShardStore, localServerId uint32) error
- func (self *ShardData) SetServers(servers []*ClusterServer)
- func (self *ShardData) ShouldAggregateLocally(querySpec *parser.QuerySpec) bool
- func (self *ShardData) StartMicro() int64
- func (self *ShardData) StartTime() time.Time
- func (self *ShardData) String() string
- func (self *ShardData) SyncWrite(request *p.Request, assignSeqNum bool) error
- func (self *ShardData) ToNewShardData() *NewShardData
- func (self *ShardData) Write(request *p.Request) error
- func (self *ShardData) WriteLocalOnly(request *p.Request) error
- type ShardIdInserterProcessor
- type ShardSpace
- func (s *ShardSpace) MatchesSeries(name string) bool
- func (s *ShardSpace) ParsedRetentionPeriod() time.Duration
- func (s *ShardSpace) ParsedShardDuration() time.Duration
- func (s *ShardSpace) SecondsOfDuration() float64
- func (s *ShardSpace) SetDefaults()
- func (s *ShardSpace) UpdateFromSpace(space *ShardSpace) error
- func (s *ShardSpace) Validate(clusterConfig *ClusterConfiguration, checkForDb bool) error
- type ShardType
- type Shards
- type WAL
- type WriteBuffer
- type Writer
Constants ¶
const ( PER_SERVER_BUFFER_SIZE = 10 LOCAL_WRITE_BUFFER_SIZE = 10 )
const ( SEVEN_DAYS = time.Hour * 24 * 7 DEFAULT_SPLIT = 1 DEFAULT_REPLICATION_FACTOR = 1 DEFAULT_SHARD_DURATION = SEVEN_DAYS DEFAULT_RETENTION_POLICY_DURATION = 0 )
const (
DEFAULT_SHARD_SPACE_NAME = "default"
)
const (
HEARTBEAT_TIMEOUT = 100 * time.Millisecond
)
Variables ¶
var HEARTBEAT_TYPE = protocol.Request_HEARTBEAT
Functions ¶
func HashDbAndSeriesToInt ¶
func HashPassword ¶
func SortShardsByTimeAscending ¶
func SortShardsByTimeAscending(shards []*ShardData)
func SortShardsByTimeDescending ¶
func SortShardsByTimeDescending(shards []*ShardData)
Types ¶
type ByShardTimeAsc ¶
type ByShardTimeAsc struct{ ShardCollection }
func (ByShardTimeAsc) Less ¶
func (s ByShardTimeAsc) Less(i, j int) bool
type ByShardTimeDesc ¶
type ByShardTimeDesc struct{ ShardCollection }
func (ByShardTimeDesc) Less ¶
func (s ByShardTimeDesc) Less(i, j int) bool
type ClusterAdmin ¶
type ClusterAdmin struct {
CommonUser `json:"common"`
}
func (*ClusterAdmin) GetReadPermission ¶
func (self *ClusterAdmin) GetReadPermission() string
func (*ClusterAdmin) GetWritePermission ¶
func (self *ClusterAdmin) GetWritePermission() string
func (*ClusterAdmin) HasReadAccess ¶
func (self *ClusterAdmin) HasReadAccess(_ string) bool
func (*ClusterAdmin) HasWriteAccess ¶
func (self *ClusterAdmin) HasWriteAccess(_ string) bool
func (*ClusterAdmin) IsClusterAdmin ¶
func (self *ClusterAdmin) IsClusterAdmin() bool
func (*ClusterAdmin) IsDbAdmin ¶
func (self *ClusterAdmin) IsDbAdmin(_ string) bool
type ClusterConfiguration ¶
type ClusterConfiguration struct { DatabaseReplicationFactors map[string]struct{} ParsedContinuousQueries map[string]map[uint32]*parser.SelectQuery LocalServer *ClusterServer LocalRaftName string MetaStore *metastore.Store // contains filtered or unexported fields }
This struct stores all the metadata configuration information about a running cluster. This includes the servers in the cluster and their state, databases, users, and which continuous queries are running.
func NewClusterConfiguration ¶
func NewClusterConfiguration( config *configuration.Configuration, wal WAL, shardStore LocalShardStore, connectionCreator func(string) ServerConnection, metaStore *metastore.Store) *ClusterConfiguration
func (*ClusterConfiguration) AddPotentialServer ¶
func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer)
func (*ClusterConfiguration) AddShardSpace ¶
func (self *ClusterConfiguration) AddShardSpace(space *ShardSpace) error
func (*ClusterConfiguration) AddShards ¶
func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardData, error)
AddShards expects all shards to be of the same type (long term or short term) and have the same start and end times. This is called to add the shard set for a given duration. If existing shards have the same times, those are returned.
func (*ClusterConfiguration) AuthenticateClusterAdmin ¶
func (self *ClusterConfiguration) AuthenticateClusterAdmin(username, password string) (common.User, error)
func (*ClusterConfiguration) AuthenticateDbUser ¶
func (self *ClusterConfiguration) AuthenticateDbUser(db, username, password string) (common.User, error)
func (*ClusterConfiguration) ChangeDbUserPassword ¶
func (self *ClusterConfiguration) ChangeDbUserPassword(db, username, hash string) error
func (*ClusterConfiguration) ChangeDbUserPermissions ¶
func (self *ClusterConfiguration) ChangeDbUserPermissions(db, username, readPermissions, writePermissions string) error
func (*ClusterConfiguration) ChangeProtobufConnectionString ¶
func (self *ClusterConfiguration) ChangeProtobufConnectionString(server *ClusterServer)
func (*ClusterConfiguration) CreateCheckpoint ¶
func (self *ClusterConfiguration) CreateCheckpoint() error
func (*ClusterConfiguration) CreateContinuousQuery ¶
func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) error
func (*ClusterConfiguration) CreateDatabase ¶
func (self *ClusterConfiguration) CreateDatabase(name string) error
func (*ClusterConfiguration) DatabaseExists ¶
func (self *ClusterConfiguration) DatabaseExists(name string) bool
func (*ClusterConfiguration) DatabasesExists ¶
func (self *ClusterConfiguration) DatabasesExists(db string) bool
func (*ClusterConfiguration) DeleteContinuousQuery ¶
func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) error
func (*ClusterConfiguration) DropDatabase ¶
func (self *ClusterConfiguration) DropDatabase(name string) error
func (*ClusterConfiguration) DropSeries ¶
func (self *ClusterConfiguration) DropSeries(database, series string) error
func (*ClusterConfiguration) DropShard ¶
func (self *ClusterConfiguration) DropShard(shardId uint32, serverIds []uint32) error
func (*ClusterConfiguration) GetClusterAdmin ¶
func (self *ClusterConfiguration) GetClusterAdmin(username string) *ClusterAdmin
func (*ClusterConfiguration) GetClusterAdmins ¶
func (self *ClusterConfiguration) GetClusterAdmins() (names []string)
func (*ClusterConfiguration) GetContinuousQueries ¶
func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQuery
func (*ClusterConfiguration) GetDatabases ¶
func (self *ClusterConfiguration) GetDatabases() []*Database
func (*ClusterConfiguration) GetDbUser ¶
func (self *ClusterConfiguration) GetDbUser(db, username string) *DbUser
func (*ClusterConfiguration) GetDbUsers ¶
func (self *ClusterConfiguration) GetDbUsers(db string) []common.User
func (*ClusterConfiguration) GetExpiredShards ¶
func (self *ClusterConfiguration) GetExpiredShards() []*ShardData
func (*ClusterConfiguration) GetLocalConfiguration ¶
func (self *ClusterConfiguration) GetLocalConfiguration() *configuration.Configuration
func (*ClusterConfiguration) GetLocalShardById ¶
func (self *ClusterConfiguration) GetLocalShardById(id uint32) *ShardData
This function is for the request handler to get the shard to write a request to locally.
func (*ClusterConfiguration) GetServerById ¶
func (self *ClusterConfiguration) GetServerById(id *uint32) *ClusterServer
func (*ClusterConfiguration) GetServerByProtobufConnectionString ¶
func (self *ClusterConfiguration) GetServerByProtobufConnectionString(connectionString string) *ClusterServer
func (*ClusterConfiguration) GetServerByRaftName ¶
func (self *ClusterConfiguration) GetServerByRaftName(name string) *ClusterServer
func (*ClusterConfiguration) GetShard ¶
func (self *ClusterConfiguration) GetShard(id uint32) *ShardData
func (*ClusterConfiguration) GetShardSpaces ¶
func (self *ClusterConfiguration) GetShardSpaces() []*ShardSpace
func (*ClusterConfiguration) GetShardSpacesForDatabase ¶ added in v0.8.4
func (self *ClusterConfiguration) GetShardSpacesForDatabase(database string) []*ShardSpace
func (*ClusterConfiguration) GetShardToWriteToBySeriesAndTime ¶
func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series string, microsecondsEpoch int64) (*ShardData, error)
func (*ClusterConfiguration) GetShards ¶
func (self *ClusterConfiguration) GetShards() []*ShardData
func (*ClusterConfiguration) GetShardsForQuery ¶
func (self *ClusterConfiguration) GetShardsForQuery(querySpec *parser.QuerySpec) (Shards, error)
func (*ClusterConfiguration) HasContinuousQueries ¶
func (self *ClusterConfiguration) HasContinuousQueries() bool
func (*ClusterConfiguration) HasUncommitedWrites ¶
func (self *ClusterConfiguration) HasUncommitedWrites() bool
Return per shard request numbers for the local server and all remote servers
func (*ClusterConfiguration) IsSingleServer ¶
func (self *ClusterConfiguration) IsSingleServer() bool
func (*ClusterConfiguration) LastContinuousQueryRunTime ¶
func (self *ClusterConfiguration) LastContinuousQueryRunTime() time.Time
func (*ClusterConfiguration) MarshalNewShardArrayToShards ¶
func (self *ClusterConfiguration) MarshalNewShardArrayToShards(newShards []*NewShardData) ([]*ShardData, error)
func (*ClusterConfiguration) RecoverFromWAL ¶
func (self *ClusterConfiguration) RecoverFromWAL() error
func (*ClusterConfiguration) Recovery ¶
func (self *ClusterConfiguration) Recovery(b []byte) error
func (*ClusterConfiguration) RemoveServer ¶
func (self *ClusterConfiguration) RemoveServer(server *ClusterServer) error
func (*ClusterConfiguration) RemoveShardSpace ¶
func (self *ClusterConfiguration) RemoveShardSpace(database, name string) error
func (*ClusterConfiguration) Save ¶
func (self *ClusterConfiguration) Save() ([]byte, error)
func (*ClusterConfiguration) SaveClusterAdmin ¶
func (self *ClusterConfiguration) SaveClusterAdmin(u *ClusterAdmin)
func (*ClusterConfiguration) SaveDbUser ¶
func (self *ClusterConfiguration) SaveDbUser(u *DbUser)
func (*ClusterConfiguration) SerializableConfiguration ¶ added in v0.8.3
func (self *ClusterConfiguration) SerializableConfiguration() *SavedConfiguration
func (*ClusterConfiguration) ServerId ¶
func (self *ClusterConfiguration) ServerId() uint32
func (*ClusterConfiguration) Servers ¶
func (self *ClusterConfiguration) Servers() []*ClusterServer
func (*ClusterConfiguration) SetContinuousQueryTimestamp ¶
func (self *ClusterConfiguration) SetContinuousQueryTimestamp(timestamp time.Time) error
func (*ClusterConfiguration) SetLastContinuousQueryRunTime ¶
func (self *ClusterConfiguration) SetLastContinuousQueryRunTime(t time.Time)
func (*ClusterConfiguration) SetShardCreator ¶
func (self *ClusterConfiguration) SetShardCreator(shardCreator ShardCreator)
func (*ClusterConfiguration) ShardSpaceExists ¶ added in v0.8.3
func (self *ClusterConfiguration) ShardSpaceExists(space *ShardSpace) bool
func (*ClusterConfiguration) UpdateShardSpace ¶ added in v0.8.3
func (self *ClusterConfiguration) UpdateShardSpace(space *ShardSpace) error
func (*ClusterConfiguration) WaitForLocalServerLoaded ¶
func (self *ClusterConfiguration) WaitForLocalServerLoaded()
This function will wait until the configuration has received an addPotentialServer command for this local server.
type ClusterServer ¶
type ClusterServer struct { Id uint32 RaftName string State ServerState RaftConnectionString string ProtobufConnectionString string HeartbeatInterval time.Duration Backoff time.Duration MinBackoff time.Duration MaxBackoff time.Duration // contains filtered or unexported fields }
func NewClusterServer ¶
func NewClusterServer(raftName, raftConnectionString, protobufConnectionString string, connection ServerConnection, config *c.Configuration) *ClusterServer
func (*ClusterServer) BufferWrite ¶
func (self *ClusterServer) BufferWrite(request *protocol.Request)
func (*ClusterServer) Connect ¶
func (self *ClusterServer) Connect()
func (*ClusterServer) GetId ¶
func (self *ClusterServer) GetId() uint32
func (*ClusterServer) GetStateName ¶
func (self *ClusterServer) GetStateName() (stateName string)
func (*ClusterServer) IsUp ¶
func (self *ClusterServer) IsUp() bool
func (*ClusterServer) MakeRequest ¶
func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan<- *protocol.Response)
func (*ClusterServer) SetWriteBuffer ¶
func (self *ClusterServer) SetWriteBuffer(writeBuffer *WriteBuffer)
func (*ClusterServer) StartHeartbeat ¶
func (self *ClusterServer) StartHeartbeat()
type CommonUser ¶
type CommonUser struct { Name string `json:"name"` Hash string `json:"hash"` IsUserDeleted bool `json:"is_deleted"` CacheKey string `json:"cache_key"` }
func (*CommonUser) ChangePassword ¶
func (self *CommonUser) ChangePassword(hash string) error
func (*CommonUser) GetDb ¶
func (self *CommonUser) GetDb() string
func (*CommonUser) GetName ¶
func (self *CommonUser) GetName() string
func (*CommonUser) HasReadAccess ¶
func (self *CommonUser) HasReadAccess(name string) bool
func (*CommonUser) HasWriteAccess ¶
func (self *CommonUser) HasWriteAccess(name string) bool
func (*CommonUser) IsClusterAdmin ¶
func (self *CommonUser) IsClusterAdmin() bool
func (*CommonUser) IsDbAdmin ¶
func (self *CommonUser) IsDbAdmin(db string) bool
func (*CommonUser) IsDeleted ¶
func (self *CommonUser) IsDeleted() bool
type ContinuousQuery ¶
type DbUser ¶
type DbUser struct { CommonUser `json:"common"` Db string `json:"db"` ReadFrom []*Matcher `json:"read_matchers"` WriteTo []*Matcher `json:"write_matchers"` IsAdmin bool `json:"is_admin"` }
func (*DbUser) ChangePermissions ¶
func (*DbUser) GetReadPermission ¶
func (*DbUser) GetWritePermission ¶
func (*DbUser) HasReadAccess ¶
func (*DbUser) HasWriteAccess ¶
type LocalShardDb ¶
type LocalShardStore ¶
type NewShardData ¶
type NilProcessor ¶ added in v0.8.4
type NilProcessor struct{}
func (NilProcessor) Close ¶ added in v0.8.4
func (np NilProcessor) Close() error
func (NilProcessor) Name ¶ added in v0.8.4
func (np NilProcessor) Name() string
type QuerySpec ¶
type QuerySpec interface { GetStartTime() time.Time GetEndTime() time.Time Database() string TableNames() []string TableNamesAndRegex() ([]string, *regexp.Regexp) GetGroupByInterval() *time.Duration AllShardsQuery() bool IsRegex() bool }
defined by cluster config (in cluster package)
type ResponseChannel ¶ added in v0.8.4
ResponseChannel is a processor for Responses as opposed to Series like `engine.Processor`.
func NewResponseChannelWrapper ¶ added in v0.8.4
func NewResponseChannelWrapper(c chan<- *protocol.Response) ResponseChannel
type ResponseChannelProcessor ¶ added in v0.8.4
type ResponseChannelProcessor struct {
// contains filtered or unexported fields
}
ResponseChannelProcessor converts Series to Responses. This is used to chain `engine.Processor` with a `ResponseChannel`.
func NewResponseChannelProcessor ¶ added in v0.8.4
func NewResponseChannelProcessor(r ResponseChannel) *ResponseChannelProcessor
func (*ResponseChannelProcessor) Close ¶ added in v0.8.4
func (p *ResponseChannelProcessor) Close() error
func (*ResponseChannelProcessor) Name ¶ added in v0.8.4
func (p *ResponseChannelProcessor) Name() string
type ResponseChannelWrapper ¶ added in v0.8.4
type ResponseChannelWrapper struct {
// contains filtered or unexported fields
}
A `ResponseProcessor` that wraps a Go channel.
func (*ResponseChannelWrapper) Name ¶ added in v0.8.4
func (w *ResponseChannelWrapper) Name() string
type SavedConfiguration ¶
type SavedConfiguration struct { Databases map[string]uint8 Admins map[string]*ClusterAdmin DbUsers map[string]map[string]*DbUser Servers []*ClusterServer ContinuousQueries map[string][]*ContinuousQuery MetaStore *metastore.Store LastShardIdUsed uint32 LastServerIdUsed uint32 DatabaseShardSpaces map[string][]*ShardSpace Shards []*NewShardData }
type ServerConnection ¶
type ServerState ¶
type ServerState int
const ( LoadingRingData ServerState = iota SendingRingData DeletingOldData Running Potential )
type Shard ¶
type Shard interface { Id() uint32 StartTime() time.Time EndTime() time.Time Write(*p.Request) error SyncWrite(req *p.Request, assignSeqNum bool) error Query(querySpec *parser.QuerySpec, response chan<- *p.Response) ReplicationFactor() int IsMicrosecondInRange(t int64) bool }
A shard implements an interface for writing and querying data. It can be copied to multiple servers or the local datastore. Shards contain data from [startTime, endTime]. Ids are unique across the cluster.
type ShardCollection ¶
type ShardCollection []*ShardData
func (ShardCollection) Len ¶
func (s ShardCollection) Len() int
func (ShardCollection) Swap ¶
func (s ShardCollection) Swap(i, j int)
type ShardCreator ¶
type ShardCreator interface { // the shard creator expects all shards to be of the same type (long term or short term) and have the same // start and end times. This is called to create the shard set for a given duration. CreateShards(shards []*NewShardData) ([]*ShardData, error) CreateShardSpace(shardSpace *ShardSpace) error }
type ShardData ¶
type ShardData struct { IsLocal bool SpaceName string Database string // contains filtered or unexported fields }
func (*ShardData) HandleDestructiveQuery ¶
func (*ShardData) IsMicrosecondInRange ¶
func (*ShardData) LogAndHandleDestructiveQuery ¶
func (*ShardData) QueryResponseBufferSize ¶
func (*ShardData) ReplicationFactor ¶
func (*ShardData) SetLocalStore ¶
func (self *ShardData) SetLocalStore(store LocalShardStore, localServerId uint32) error
func (*ShardData) SetServers ¶
func (self *ShardData) SetServers(servers []*ClusterServer)
func (*ShardData) ShouldAggregateLocally ¶
Returns true if we can aggregate the data locally per shard, i.e. the group by interval lines up with the shard duration and there are no joins or merges
func (*ShardData) StartMicro ¶
func (*ShardData) ToNewShardData ¶
func (self *ShardData) ToNewShardData() *NewShardData
used to serialize shards when sending around in raft or when snapshotting in the log
type ShardIdInserterProcessor ¶ added in v0.8.4
type ShardIdInserterProcessor struct {
// contains filtered or unexported fields
}
A processor to set the ShardId on the series to `id`
func NewShardIdInserterProcessor ¶ added in v0.8.4
func NewShardIdInserterProcessor(id uint32, next engine.Processor) ShardIdInserterProcessor
func (ShardIdInserterProcessor) Close ¶ added in v0.8.4
func (sip ShardIdInserterProcessor) Close() error
func (ShardIdInserterProcessor) Name ¶ added in v0.8.4
func (sip ShardIdInserterProcessor) Name() string
type ShardSpace ¶
type ShardSpace struct { // required, must be unique within the database Name string `json:"name"` // required, a database has many shard spaces and a shard space belongs to a database Database string `json:"database"` // this is optional, if they don't set it, we'll set to /.*/ Regex string `json:"regex"` // this is optional, if they don't set it, it will default to the storage.dir in the config RetentionPolicy string `json:"retentionPolicy"` ShardDuration string `json:"shardDuration"` ReplicationFactor uint32 `json:"replicationFactor"` Split uint32 `json:"split"` // contains filtered or unexported fields }
func NewShardSpace ¶
func NewShardSpace(database, name string) *ShardSpace
func (*ShardSpace) MatchesSeries ¶
func (s *ShardSpace) MatchesSeries(name string) bool
func (*ShardSpace) ParsedRetentionPeriod ¶
func (s *ShardSpace) ParsedRetentionPeriod() time.Duration
func (*ShardSpace) ParsedShardDuration ¶
func (s *ShardSpace) ParsedShardDuration() time.Duration
func (*ShardSpace) SecondsOfDuration ¶
func (s *ShardSpace) SecondsOfDuration() float64
func (*ShardSpace) SetDefaults ¶
func (s *ShardSpace) SetDefaults()
func (*ShardSpace) UpdateFromSpace ¶ added in v0.8.3
func (s *ShardSpace) UpdateFromSpace(space *ShardSpace) error
func (*ShardSpace) Validate ¶
func (s *ShardSpace) Validate(clusterConfig *ClusterConfiguration, checkForDb bool) error
type WAL ¶
type WAL interface { AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard) (uint32, error) AssignSequenceNumbers(request *protocol.Request) error Commit(requestNumber uint32, serverId uint32) error CreateCheckpoint() error RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error }
type WriteBuffer ¶
type WriteBuffer struct {
// contains filtered or unexported fields
}
Acts as a buffer for writes
func NewWriteBuffer ¶
func (*WriteBuffer) HasUncommitedWrites ¶
func (self *WriteBuffer) HasUncommitedWrites() bool
func (*WriteBuffer) ShardsRequestNumber ¶
func (self *WriteBuffer) ShardsRequestNumber() map[uint32]uint32
func (*WriteBuffer) Write ¶
func (self *WriteBuffer) Write(request *protocol.Request)
This method never blocks. It'll buffer writes until they fill the buffer, then drop them on the floor and let the background goroutine replay from the WAL.