Documentation ¶
Index ¶
- Constants
- Variables
- func HashPassword(password string) ([]byte, error)
- func SortShardsByTimeAscending(shards []*ShardData)
- func SortShardsByTimeDescending(shards []*ShardData)
- type ByShardTimeAsc
- type ByShardTimeDesc
- type ClusterAdmin
- type ClusterConfiguration
- func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer)
- func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardData, error)
- func (self *ClusterConfiguration) AuthenticateClusterAdmin(username, password string) (common.User, error)
- func (self *ClusterConfiguration) AuthenticateDbUser(db, username, password string) (common.User, error)
- func (self *ClusterConfiguration) ChangeDbUserPassword(db, username, hash string) error
- func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) error
- func (self *ClusterConfiguration) CreateDatabase(name string, replicationFactor uint8) error
- func (self *ClusterConfiguration) CreateFutureShardsAutomaticallyBeforeTimeComes()
- func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) error
- func (self *ClusterConfiguration) DropDatabase(name string) error
- func (self *ClusterConfiguration) DropShard(shardId uint32, serverIds []uint32) error
- func (self *ClusterConfiguration) GetAllShards() []*ShardData
- func (self *ClusterConfiguration) GetClusterAdmin(username string) *ClusterAdmin
- func (self *ClusterConfiguration) GetClusterAdmins() (names []string)
- func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQuery
- func (self *ClusterConfiguration) GetDatabases() []*Database
- func (self *ClusterConfiguration) GetDbUser(db, username string) *DbUser
- func (self *ClusterConfiguration) GetDbUsers(db string) []common.User
- func (self *ClusterConfiguration) GetLocalShardById(id uint32) *ShardData
- func (self *ClusterConfiguration) GetLongTermShards() []*ShardData
- func (self *ClusterConfiguration) GetMapForJsonSerialization() map[string]interface{}
- func (self *ClusterConfiguration) GetServerById(id *uint32) *ClusterServer
- func (self *ClusterConfiguration) GetServerByProtobufConnectionString(connectionString string) *ClusterServer
- func (self *ClusterConfiguration) GetServerByRaftName(name string) *ClusterServer
- func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series string, microsecondsEpoch int64) (*ShardData, error)
- func (self *ClusterConfiguration) GetShards(querySpec *parser.QuerySpec) []*ShardData
- func (self *ClusterConfiguration) GetShortTermShards() []*ShardData
- func (self *ClusterConfiguration) HasContinuousQueries() bool
- func (self *ClusterConfiguration) HashDbAndSeriesToInt(database, series string) int
- func (self *ClusterConfiguration) IsSingleServer() bool
- func (self *ClusterConfiguration) LastContinuousQueryRunTime() time.Time
- func (self *ClusterConfiguration) MarshalNewShardArrayToShards(newShards []*NewShardData) ([]*ShardData, error)
- func (self *ClusterConfiguration) RecoverFromWAL() error
- func (self *ClusterConfiguration) Recovery(b []byte) error
- func (self *ClusterConfiguration) Save() ([]byte, error)
- func (self *ClusterConfiguration) SaveClusterAdmin(u *ClusterAdmin)
- func (self *ClusterConfiguration) SaveDbUser(u *DbUser)
- func (self *ClusterConfiguration) ServerId() uint32
- func (self *ClusterConfiguration) Servers() []*ClusterServer
- func (self *ClusterConfiguration) SetContinuousQueryTimestamp(timestamp time.Time) error
- func (self *ClusterConfiguration) SetLastContinuousQueryRunTime(t time.Time)
- func (self *ClusterConfiguration) SetShardCreator(shardCreator ShardCreator)
- func (self *ClusterConfiguration) WaitForLocalServerLoaded()
- type ClusterServer
- func (self *ClusterServer) BufferWrite(request *protocol.Request)
- func (self *ClusterServer) Connect()
- func (self *ClusterServer) GetId() uint32
- func (self *ClusterServer) IsUp() bool
- func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response)
- func (self *ClusterServer) SetWriteBuffer(writeBuffer *WriteBuffer)
- func (self *ClusterServer) StartHeartbeat()
- func (self *ClusterServer) Write(request *protocol.Request) error
- type CommonUser
- func (self *CommonUser) ChangePassword(hash string) error
- func (self *CommonUser) GetDb() string
- func (self *CommonUser) GetName() string
- func (self *CommonUser) HasReadAccess(name string) bool
- func (self *CommonUser) HasWriteAccess(name string) bool
- func (self *CommonUser) IsClusterAdmin() bool
- func (self *CommonUser) IsDbAdmin(db string) bool
- func (self *CommonUser) IsDeleted() bool
- type ContinuousQuery
- type Database
- type DbUser
- type LocalShardDb
- type LocalShardStore
- type Matcher
- type NewShardData
- type QueryProcessor
- type QuerySpec
- type SavedConfiguration
- type ServerConnection
- type ServerState
- type Shard
- type ShardCollection
- type ShardCreator
- type ShardData
- func (self *ShardData) DropDatabase(database string, sendToServers bool)
- func (self *ShardData) EndMicro() int64
- func (self *ShardData) EndTime() time.Time
- func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan *p.Response, ...)
- func (self *ShardData) Id() uint32
- func (self *ShardData) IsMicrosecondInRange(t int64) bool
- func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan *p.Response, ...)
- func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Response)
- func (self *ShardData) ServerIds() []uint32
- func (self *ShardData) SetLocalStore(store LocalShardStore, localServerId uint32) error
- func (self *ShardData) SetServers(servers []*ClusterServer)
- func (self *ShardData) ShouldAggregateLocally(querySpec *parser.QuerySpec) bool
- func (self *ShardData) StartMicro() int64
- func (self *ShardData) StartTime() time.Time
- func (self *ShardData) String() string
- func (self *ShardData) ToNewShardData() *NewShardData
- func (self *ShardData) Write(request *p.Request) error
- func (self *ShardData) WriteLocalOnly(request *p.Request) error
- type ShardType
- type WAL
- type WriteBuffer
- type Writer
Constants ¶
const ( DEFAULT_BACKOFF = time.Second MAX_BACKOFF = 10 * time.Second HEARTBEAT_TIMEOUT = 100 * time.Millisecond )
const ( PER_SERVER_BUFFER_SIZE = 10 LOCAL_WRITE_BUFFER_SIZE = 10 )
const (
FIRST_LOWER_CASE_CHARACTER = uint8('a')
)
Variables ¶
var HEARTBEAT_TYPE = protocol.Request_HEARTBEAT
Functions ¶
func HashPassword ¶
func SortShardsByTimeAscending ¶
func SortShardsByTimeAscending(shards []*ShardData)
func SortShardsByTimeDescending ¶
func SortShardsByTimeDescending(shards []*ShardData)
Types ¶
type ByShardTimeAsc ¶
type ByShardTimeAsc struct{ ShardCollection }
func (ByShardTimeAsc) Less ¶
func (s ByShardTimeAsc) Less(i, j int) bool
type ByShardTimeDesc ¶
type ByShardTimeDesc struct{ ShardCollection }
func (ByShardTimeDesc) Less ¶
func (s ByShardTimeDesc) Less(i, j int) bool
type ClusterAdmin ¶
type ClusterAdmin struct {
CommonUser `json:"common"`
}
func (*ClusterAdmin) HasReadAccess ¶
func (self *ClusterAdmin) HasReadAccess(_ string) bool
func (*ClusterAdmin) HasWriteAccess ¶
func (self *ClusterAdmin) HasWriteAccess(_ string) bool
func (*ClusterAdmin) IsClusterAdmin ¶
func (self *ClusterAdmin) IsClusterAdmin() bool
type ClusterConfiguration ¶
type ClusterConfiguration struct { DatabaseReplicationFactors map[string]uint8 ParsedContinuousQueries map[string]map[uint32]*parser.SelectQuery LocalServerId uint32 LocalRaftName string // contains filtered or unexported fields }
This struct stores all the metadata confiugration information about a running cluster. This includes the servers in the cluster and their state, databases, users, and which continuous queries are running.
func NewClusterConfiguration ¶
func NewClusterConfiguration( config *configuration.Configuration, wal WAL, shardStore LocalShardStore, connectionCreator func(string) ServerConnection) *ClusterConfiguration
func (*ClusterConfiguration) AddPotentialServer ¶
func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer)
func (*ClusterConfiguration) AddShards ¶
func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardData, error)
Add shards expects all shards to be of the same type (long term or short term) and have the same start and end times. This is called to add the shard set for a given duration. If existing shards have the same times, those are returned.
func (*ClusterConfiguration) AuthenticateClusterAdmin ¶
func (self *ClusterConfiguration) AuthenticateClusterAdmin(username, password string) (common.User, error)
func (*ClusterConfiguration) AuthenticateDbUser ¶
func (self *ClusterConfiguration) AuthenticateDbUser(db, username, password string) (common.User, error)
func (*ClusterConfiguration) ChangeDbUserPassword ¶
func (self *ClusterConfiguration) ChangeDbUserPassword(db, username, hash string) error
func (*ClusterConfiguration) CreateContinuousQuery ¶
func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) error
func (*ClusterConfiguration) CreateDatabase ¶
func (self *ClusterConfiguration) CreateDatabase(name string, replicationFactor uint8) error
func (*ClusterConfiguration) CreateFutureShardsAutomaticallyBeforeTimeComes ¶
func (self *ClusterConfiguration) CreateFutureShardsAutomaticallyBeforeTimeComes()
called by the server, this will wake up every 10 mintues to see if it should create a shard for the next window of time. This way shards get created before a bunch of writes stream in and try to create it all at the same time.
func (*ClusterConfiguration) DeleteContinuousQuery ¶
func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) error
func (*ClusterConfiguration) DropDatabase ¶
func (self *ClusterConfiguration) DropDatabase(name string) error
func (*ClusterConfiguration) DropShard ¶
func (self *ClusterConfiguration) DropShard(shardId uint32, serverIds []uint32) error
func (*ClusterConfiguration) GetAllShards ¶
func (self *ClusterConfiguration) GetAllShards() []*ShardData
func (*ClusterConfiguration) GetClusterAdmin ¶
func (self *ClusterConfiguration) GetClusterAdmin(username string) *ClusterAdmin
func (*ClusterConfiguration) GetClusterAdmins ¶
func (self *ClusterConfiguration) GetClusterAdmins() (names []string)
func (*ClusterConfiguration) GetContinuousQueries ¶
func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQuery
func (*ClusterConfiguration) GetDatabases ¶
func (self *ClusterConfiguration) GetDatabases() []*Database
func (*ClusterConfiguration) GetDbUser ¶
func (self *ClusterConfiguration) GetDbUser(db, username string) *DbUser
func (*ClusterConfiguration) GetDbUsers ¶
func (self *ClusterConfiguration) GetDbUsers(db string) []common.User
func (*ClusterConfiguration) GetLocalShardById ¶
func (self *ClusterConfiguration) GetLocalShardById(id uint32) *ShardData
This function is for the request handler to get the shard to write a request to locally.
func (*ClusterConfiguration) GetLongTermShards ¶
func (self *ClusterConfiguration) GetLongTermShards() []*ShardData
func (*ClusterConfiguration) GetMapForJsonSerialization ¶
func (self *ClusterConfiguration) GetMapForJsonSerialization() map[string]interface{}
func (*ClusterConfiguration) GetServerById ¶
func (self *ClusterConfiguration) GetServerById(id *uint32) *ClusterServer
func (*ClusterConfiguration) GetServerByProtobufConnectionString ¶
func (self *ClusterConfiguration) GetServerByProtobufConnectionString(connectionString string) *ClusterServer
func (*ClusterConfiguration) GetServerByRaftName ¶
func (self *ClusterConfiguration) GetServerByRaftName(name string) *ClusterServer
func (*ClusterConfiguration) GetShardToWriteToBySeriesAndTime ¶
func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series string, microsecondsEpoch int64) (*ShardData, error)
func (*ClusterConfiguration) GetShards ¶
func (self *ClusterConfiguration) GetShards(querySpec *parser.QuerySpec) []*ShardData
func (*ClusterConfiguration) GetShortTermShards ¶
func (self *ClusterConfiguration) GetShortTermShards() []*ShardData
func (*ClusterConfiguration) HasContinuousQueries ¶
func (self *ClusterConfiguration) HasContinuousQueries() bool
func (*ClusterConfiguration) HashDbAndSeriesToInt ¶
func (self *ClusterConfiguration) HashDbAndSeriesToInt(database, series string) int
func (*ClusterConfiguration) IsSingleServer ¶
func (self *ClusterConfiguration) IsSingleServer() bool
func (*ClusterConfiguration) LastContinuousQueryRunTime ¶
func (self *ClusterConfiguration) LastContinuousQueryRunTime() time.Time
func (*ClusterConfiguration) MarshalNewShardArrayToShards ¶
func (self *ClusterConfiguration) MarshalNewShardArrayToShards(newShards []*NewShardData) ([]*ShardData, error)
func (*ClusterConfiguration) RecoverFromWAL ¶
func (self *ClusterConfiguration) RecoverFromWAL() error
func (*ClusterConfiguration) Recovery ¶
func (self *ClusterConfiguration) Recovery(b []byte) error
func (*ClusterConfiguration) Save ¶
func (self *ClusterConfiguration) Save() ([]byte, error)
func (*ClusterConfiguration) SaveClusterAdmin ¶
func (self *ClusterConfiguration) SaveClusterAdmin(u *ClusterAdmin)
func (*ClusterConfiguration) SaveDbUser ¶
func (self *ClusterConfiguration) SaveDbUser(u *DbUser)
func (*ClusterConfiguration) ServerId ¶
func (self *ClusterConfiguration) ServerId() uint32
func (*ClusterConfiguration) Servers ¶
func (self *ClusterConfiguration) Servers() []*ClusterServer
func (*ClusterConfiguration) SetContinuousQueryTimestamp ¶
func (self *ClusterConfiguration) SetContinuousQueryTimestamp(timestamp time.Time) error
func (*ClusterConfiguration) SetLastContinuousQueryRunTime ¶
func (self *ClusterConfiguration) SetLastContinuousQueryRunTime(t time.Time)
func (*ClusterConfiguration) SetShardCreator ¶
func (self *ClusterConfiguration) SetShardCreator(shardCreator ShardCreator)
func (*ClusterConfiguration) WaitForLocalServerLoaded ¶
func (self *ClusterConfiguration) WaitForLocalServerLoaded()
This function will wait until the configuration has received an addPotentialServer command for this local server.
type ClusterServer ¶
type ClusterServer struct { Id uint32 RaftName string State ServerState RaftConnectionString string ProtobufConnectionString string HeartbeatInterval time.Duration Backoff time.Duration // contains filtered or unexported fields }
func NewClusterServer ¶
func NewClusterServer(raftName, raftConnectionString, protobufConnectionString string, connection ServerConnection, heartbeatInterval time.Duration) *ClusterServer
func (*ClusterServer) BufferWrite ¶
func (self *ClusterServer) BufferWrite(request *protocol.Request)
func (*ClusterServer) Connect ¶
func (self *ClusterServer) Connect()
func (*ClusterServer) GetId ¶
func (self *ClusterServer) GetId() uint32
func (*ClusterServer) IsUp ¶
func (self *ClusterServer) IsUp() bool
func (*ClusterServer) MakeRequest ¶
func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response)
func (*ClusterServer) SetWriteBuffer ¶
func (self *ClusterServer) SetWriteBuffer(writeBuffer *WriteBuffer)
func (*ClusterServer) StartHeartbeat ¶
func (self *ClusterServer) StartHeartbeat()
type CommonUser ¶
type CommonUser struct { Name string `json:"name"` Hash string `json:"hash"` IsUserDeleted bool `json:"is_deleted"` CacheKey string `json:"cache_key"` }
func (*CommonUser) ChangePassword ¶
func (self *CommonUser) ChangePassword(hash string) error
func (*CommonUser) GetDb ¶
func (self *CommonUser) GetDb() string
func (*CommonUser) GetName ¶
func (self *CommonUser) GetName() string
func (*CommonUser) HasReadAccess ¶
func (self *CommonUser) HasReadAccess(name string) bool
func (*CommonUser) HasWriteAccess ¶
func (self *CommonUser) HasWriteAccess(name string) bool
func (*CommonUser) IsClusterAdmin ¶
func (self *CommonUser) IsClusterAdmin() bool
func (*CommonUser) IsDbAdmin ¶
func (self *CommonUser) IsDbAdmin(db string) bool
func (*CommonUser) IsDeleted ¶
func (self *CommonUser) IsDeleted() bool
type ContinuousQuery ¶
type DbUser ¶
type DbUser struct { CommonUser `json:"common"` Db string `json:"db"` WriteTo []*Matcher `json:"write_matchers"` ReadFrom []*Matcher `json:"read_matchers"` IsAdmin bool `json:"is_admin"` }
func (*DbUser) HasReadAccess ¶
func (*DbUser) HasWriteAccess ¶
type LocalShardDb ¶
type LocalShardStore ¶
type NewShardData ¶
type QueryProcessor ¶
type QueryProcessor interface { // This method returns true if the query should continue. If the query should be stopped, // like maybe the limit was hit, it should return false YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool YieldSeries(seriesIncoming *p.Series) bool Close() // Set by the shard, so EXPLAIN query can know query against which shard is being measured SetShardInfo(shardId int, shardLocal bool) // Let QueryProcessor identify itself. What if it is a spy and we can't check that? GetName() string }
Passed to a shard (local datastore or whatever) that gets yielded points from series.
type QuerySpec ¶
type QuerySpec interface { GetStartTime() time.Time GetEndTime() time.Time Database() string TableNames() []string GetGroupByInterval() *time.Duration IsRegex() bool ShouldQueryShortTermAndLongTerm() (shouldQueryShortTerm bool, shouldQueryLongTerm bool) }
defined by cluster config (in cluster package)
type SavedConfiguration ¶
type SavedConfiguration struct { Databases map[string]uint8 Admins map[string]*ClusterAdmin DbUsers map[string]map[string]*DbUser Servers []*ClusterServer ShortTermShards []*NewShardData LongTermShards []*NewShardData }
type ServerConnection ¶
type ServerState ¶
type ServerState int
const ( LoadingRingData ServerState = iota SendingRingData DeletingOldData Running Potential )
type Shard ¶
type Shard interface { Id() uint32 StartTime() time.Time EndTime() time.Time Write(*p.Request) error Query(querySpec *parser.QuerySpec, response chan *p.Response) IsMicrosecondInRange(t int64) bool }
A shard imements an interface for writing and querying data. It can be copied to multiple servers or the local datastore. Shard contains data from [startTime, endTime) Ids are unique across the cluster
type ShardCollection ¶
type ShardCollection []*ShardData
func (ShardCollection) Len ¶
func (s ShardCollection) Len() int
func (ShardCollection) Swap ¶
func (s ShardCollection) Swap(i, j int)
type ShardCreator ¶
type ShardCreator interface { // the shard creator expects all shards to be of the same type (long term or short term) and have the same // start and end times. This is called to create the shard set for a given duration. CreateShards(shards []*NewShardData) ([]*ShardData, error) }
type ShardData ¶
type ShardData struct { IsLocal bool // contains filtered or unexported fields }
func (*ShardData) DropDatabase ¶
func (*ShardData) HandleDestructiveQuery ¶
func (*ShardData) IsMicrosecondInRange ¶
func (*ShardData) LogAndHandleDestructiveQuery ¶
func (*ShardData) SetLocalStore ¶
func (self *ShardData) SetLocalStore(store LocalShardStore, localServerId uint32) error
func (*ShardData) SetServers ¶
func (self *ShardData) SetServers(servers []*ClusterServer)
func (*ShardData) ShouldAggregateLocally ¶
func (*ShardData) StartMicro ¶
func (*ShardData) ToNewShardData ¶
func (self *ShardData) ToNewShardData() *NewShardData
used to serialize shards when sending around in raft or when snapshotting in the log
type WAL ¶
type WAL interface { AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard) (uint32, error) Commit(requestNumber uint32, serverId uint32) error RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error }
type WriteBuffer ¶
type WriteBuffer struct {
// contains filtered or unexported fields
}
Acts as a buffer for writes
func NewWriteBuffer ¶
func NewWriteBuffer(writer Writer, wal WAL, serverId uint32, bufferSize int) *WriteBuffer
func (*WriteBuffer) Write ¶
func (self *WriteBuffer) Write(request *protocol.Request)
This method never blocks. It'll buffer writes until they fill the buffer then drop the on the floor and let the background goroutine replay from the WAL