Documentation ¶
Index ¶
- Constants
- Variables
- type AddPotentialServerCommand
- type ChangeDbUserPassword
- type ClusterConfiguration
- func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer)
- func (self *ClusterConfiguration) ChangeDbUserPassword(db, username, hash string) error
- func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) error
- func (self *ClusterConfiguration) CreateDatabase(name string, replicationFactor uint8) error
- func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) error
- func (self *ClusterConfiguration) DropDatabase(name string) error
- func (self *ClusterConfiguration) GetClusterAdmin(username string) *clusterAdmin
- func (self *ClusterConfiguration) GetClusterAdmins() (names []string)
- func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQuery
- func (self *ClusterConfiguration) GetDatabaseReplicationFactor(name string) uint8
- func (self *ClusterConfiguration) GetDatabases() []*Database
- func (self *ClusterConfiguration) GetDbUser(db, username string) *dbUser
- func (self *ClusterConfiguration) GetDbUsers(db string) (names []string)
- func (self *ClusterConfiguration) GetOwnerIdByLocation(location *int) *uint32
- func (self *ClusterConfiguration) GetReplicas(server *ClusterServer, database *string) (*ClusterServer, []*ClusterServer)
- func (self *ClusterConfiguration) GetReplicationFactor(database *string) uint8
- func (self *ClusterConfiguration) GetRingFilterFunction(database string, countOfServersToInclude uint32) func(database, series *string, time *int64) bool
- func (self *ClusterConfiguration) GetServerById(id *uint32) *ClusterServer
- func (self *ClusterConfiguration) GetServerByProtobufConnectionString(connectionString string) *ClusterServer
- func (self *ClusterConfiguration) GetServerByRaftName(name string) *ClusterServer
- func (self *ClusterConfiguration) GetServerIndexByLocation(location *int) int
- func (self *ClusterConfiguration) GetServersByIndexAndReplicationFactor(database *string, index *int) (*ClusterServer, []*ClusterServer)
- func (self *ClusterConfiguration) GetServersByRingLocation(database *string, location *int) []*ClusterServer
- func (self *ClusterConfiguration) GetServersToMakeQueryTo(database *string) (servers []*serverToQuery, replicationFactor uint32)
- func (self *ClusterConfiguration) IsActive() bool
- func (self *ClusterConfiguration) IsSingleServer() bool
- func (self *ClusterConfiguration) Recovery(b []byte) error
- func (self *ClusterConfiguration) Save() ([]byte, error)
- func (self *ClusterConfiguration) SaveClusterAdmin(u *clusterAdmin)
- func (self *ClusterConfiguration) SaveDbUser(u *dbUser)
- func (self *ClusterConfiguration) Servers() []*ClusterServer
- func (self *ClusterConfiguration) SetActive()
- func (self *ClusterConfiguration) SetContinuousQueryTimestamp(timestamp time.Time) error
- func (self *ClusterConfiguration) UpdateServerState(serverId uint32, state ServerState) error
- func (self *ClusterConfiguration) WaitForLocalServerLoaded()
- type ClusterConsensus
- type ClusterServer
- type CommonUser
- func (self *CommonUser) GetDb() string
- func (self *CommonUser) GetName() string
- func (self *CommonUser) HasReadAccess(name string) bool
- func (self *CommonUser) HasWriteAccess(name string) bool
- func (self *CommonUser) IsClusterAdmin() bool
- func (self *CommonUser) IsDbAdmin(db string) bool
- func (self *CommonUser) IsDeleted() bool
- type ContinuousQuery
- type Coordinator
- type CoordinatorImpl
- func (self *CoordinatorImpl) AuthenticateClusterAdmin(username, password string) (common.User, error)
- func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error)
- func (self *CoordinatorImpl) ChangeClusterAdminPassword(requester common.User, username, password string) error
- func (self *CoordinatorImpl) ChangeDbUserPassword(requester common.User, db, username, password string) error
- func (self *CoordinatorImpl) CommitSeriesData(db string, series *protocol.Series) error
- func (self *CoordinatorImpl) ConnectToProtobufServers(localConnectionString string) error
- func (self *CoordinatorImpl) CreateClusterAdminUser(requester common.User, username string) error
- func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string, query string) error
- func (self *CoordinatorImpl) CreateDatabase(user common.User, db string, replicationFactor uint8) error
- func (self *CoordinatorImpl) CreateDbUser(requester common.User, db, username string) error
- func (self *CoordinatorImpl) DeleteClusterAdminUser(requester common.User, username string) error
- func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string, id uint32) error
- func (self *CoordinatorImpl) DeleteDbUser(requester common.User, db, username string) error
- func (self *CoordinatorImpl) DeleteSeriesData(user common.User, db string, query *parser.DeleteQuery, localOnly bool) error
- func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query *parser.SelectQuery, localOnly bool, ...) error
- func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error
- func (self *CoordinatorImpl) DropSeries(user common.User, db, series string) error
- func (self *CoordinatorImpl) GetLastSequenceNumber(replicationFactor uint8, originatingServer, owningServer uint32) (uint64, error)
- func (self *CoordinatorImpl) ListClusterAdmins(requester common.User) ([]string, error)
- func (self *CoordinatorImpl) ListContinuousQueries(user common.User, db string) ([]*protocol.Series, error)
- func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*Database, error)
- func (self *CoordinatorImpl) ListDbUsers(requester common.User, db string) ([]string, error)
- func (self *CoordinatorImpl) ListSeries(user common.User, database string) ([]*protocol.Series, error)
- func (self *CoordinatorImpl) ProcessContinuousQueries(db string, series *protocol.Series)
- func (self *CoordinatorImpl) ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, ...)
- func (self *CoordinatorImpl) ReplicateDelete(request *protocol.Request) error
- func (self *CoordinatorImpl) ReplicateWrite(request *protocol.Request) error
- func (self *CoordinatorImpl) SetDbAdmin(requester common.User, db, username string, isAdmin bool) error
- func (self *CoordinatorImpl) SyncLogIteration()
- func (self *CoordinatorImpl) SyncLogs()
- func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series *protocol.Series) error
- type CreateContinuousQueryCommand
- type CreateDatabaseCommand
- type Database
- type DeleteContinuousQueryCommand
- type DropDatabaseCommand
- type InfluxJoinCommand
- type Matcher
- type NextPoint
- type ProtobufClient
- type ProtobufRequestHandler
- type ProtobufServer
- type RaftServer
- func (s *RaftServer) ActivateServer(server *ClusterServer) error
- func (s *RaftServer) AddServer(server *ClusterServer, insertIndex int) error
- func (s *RaftServer) AssignEngineAndCoordinator(engine queryRunner, coordinator *CoordinatorImpl) error
- func (s *RaftServer) ChangeDbUserPassword(db, username string, hash []byte) error
- func (self *RaftServer) Close()
- func (s *RaftServer) CompactLog()
- func (s *RaftServer) CreateContinuousQuery(db string, query string) error
- func (s *RaftServer) CreateDatabase(name string, replicationFactor uint8) error
- func (s *RaftServer) CreateRootUser() error
- func (s *RaftServer) DeleteContinuousQuery(db string, id uint32) error
- func (s *RaftServer) DropDatabase(name string) error
- func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
- func (s *RaftServer) Join(leader string) error
- func (s *RaftServer) ListenAndServe() error
- func (s *RaftServer) MovePotentialServer(server *ClusterServer, insertIndex int) error
- func (s *RaftServer) ReplaceServer(oldServer *ClusterServer, replacement *ClusterServer) error
- func (s *RaftServer) SaveClusterAdminUser(u *clusterAdmin) error
- func (s *RaftServer) SaveDbUser(u *dbUser) error
- func (s *RaftServer) Serve(l net.Listener) error
- func (s *RaftServer) SetContinuousQueryTimestamp(timestamp time.Time) error
- type RequestHandler
- type SaveClusterAdminCommand
- type SaveDbUserCommand
- type ServerState
- type SetContinuousQueryTimestampCommand
- type UpdateServerStateCommand
- type UserManager
Constants ¶
const ( REQUEST_RETRY_ATTEMPTS = 3 MAX_RESPONSE_SIZE = MAX_REQUEST_SIZE IS_RECONNECTING = uint32(1) IS_CONNECTED = uint32(0) MAX_REQUEST_TIME = time.Second * 1200 RECONNECT_RETRY_WAIT = time.Millisecond * 100 )
const (
DEFAULT_ROOT_PWD = "root"
)
const HOST_ID_OFFSET = uint64(10000)
Actual point sequence numbers have a host id as the first part of the number. This ensures that sequence numbers are unique across the cluster.
const KILOBYTE = 1024
const MAX_REQUEST_SIZE = MEGABYTE * 2
const (
MAX_SIZE = 10 * MEGABYTE
)
const MEGABYTE = 1024 * KILOBYTE
const POINT_SEQUENCE_NUMBER_KEY = "p"
This is the key used for the persistent atomic ints for sequence numbers.
Variables ¶
var VALID_NAMES *regexp.Regexp
Usernames and db names should match this regex.
Functions ¶
This section is empty.
Types ¶
type AddPotentialServerCommand ¶ added in v0.0.5
type AddPotentialServerCommand struct {
Server *ClusterServer
}
func NewAddPotentialServerCommand ¶ added in v0.0.5
func NewAddPotentialServerCommand(s *ClusterServer) *AddPotentialServerCommand
func (*AddPotentialServerCommand) Apply ¶ added in v0.0.5
func (c *AddPotentialServerCommand) Apply(server raft.Server) (interface{}, error)
func (*AddPotentialServerCommand) CommandName ¶ added in v0.0.5
func (c *AddPotentialServerCommand) CommandName() string
type ChangeDbUserPassword ¶ added in v0.4.0
func NewChangeDbUserPasswordCommand ¶ added in v0.4.0
func NewChangeDbUserPasswordCommand(db, username, hash string) *ChangeDbUserPassword
func (*ChangeDbUserPassword) Apply ¶ added in v0.4.0
func (c *ChangeDbUserPassword) Apply(server raft.Server) (interface{}, error)
func (*ChangeDbUserPassword) CommandName ¶ added in v0.4.0
func (c *ChangeDbUserPassword) CommandName() string
type ClusterConfiguration ¶
type ClusterConfiguration struct { ClusterVersion uint32 // contains filtered or unexported fields }
This struct stores all the metadata configuration information about a running cluster. This includes the servers in the cluster and their state, databases, users, and which continuous queries are running.
ClusterVersion is a monotonically increasing int that keeps track of different server configurations. For example, when you spin up a cluster and start writing data, the version will be 1. If you expand the cluster the version will be bumped. Using this the cluster is able to run two versions simultaneously while the new servers are being brought online.
func NewClusterConfiguration ¶
func NewClusterConfiguration(config *configuration.Configuration) *ClusterConfiguration
func (*ClusterConfiguration) AddPotentialServer ¶ added in v0.0.2
func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer)
func (*ClusterConfiguration) ChangeDbUserPassword ¶ added in v0.4.0
func (self *ClusterConfiguration) ChangeDbUserPassword(db, username, hash string) error
func (*ClusterConfiguration) CreateContinuousQuery ¶ added in v0.4.0
func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) error
func (*ClusterConfiguration) CreateDatabase ¶
func (self *ClusterConfiguration) CreateDatabase(name string, replicationFactor uint8) error
func (*ClusterConfiguration) DeleteContinuousQuery ¶ added in v0.4.0
func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) error
func (*ClusterConfiguration) DropDatabase ¶ added in v0.0.2
func (self *ClusterConfiguration) DropDatabase(name string) error
func (*ClusterConfiguration) GetClusterAdmin ¶ added in v0.0.2
func (self *ClusterConfiguration) GetClusterAdmin(username string) *clusterAdmin
func (*ClusterConfiguration) GetClusterAdmins ¶ added in v0.0.2
func (self *ClusterConfiguration) GetClusterAdmins() (names []string)
func (*ClusterConfiguration) GetContinuousQueries ¶ added in v0.4.0
func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQuery
func (*ClusterConfiguration) GetDatabaseReplicationFactor ¶ added in v0.4.0
func (self *ClusterConfiguration) GetDatabaseReplicationFactor(name string) uint8
func (*ClusterConfiguration) GetDatabases ¶
func (self *ClusterConfiguration) GetDatabases() []*Database
func (*ClusterConfiguration) GetDbUser ¶ added in v0.0.2
func (self *ClusterConfiguration) GetDbUser(db, username string) *dbUser
func (*ClusterConfiguration) GetDbUsers ¶ added in v0.0.2
func (self *ClusterConfiguration) GetDbUsers(db string) (names []string)
func (*ClusterConfiguration) GetOwnerIdByLocation ¶ added in v0.4.0
func (self *ClusterConfiguration) GetOwnerIdByLocation(location *int) *uint32
func (*ClusterConfiguration) GetReplicas ¶ added in v0.4.0
func (self *ClusterConfiguration) GetReplicas(server *ClusterServer, database *string) (*ClusterServer, []*ClusterServer)
This function returns the replicas of the given server
func (*ClusterConfiguration) GetReplicationFactor ¶ added in v0.4.0
func (self *ClusterConfiguration) GetReplicationFactor(database *string) uint8
func (*ClusterConfiguration) GetRingFilterFunction ¶ added in v0.4.0
func (self *ClusterConfiguration) GetRingFilterFunction(database string, countOfServersToInclude uint32) func(database, series *string, time *int64) bool
This method returns a function that can be passed into the datastore's ExecuteQuery method. It will tell the datastore if each point is something that should be returned in the query based on its ring location and if the query calls for data from different replicas.
Params: - database: the name of the database - countOfServersToInclude: the number of replicas that this server will return data for
Returns a function that returns true if the point should be filtered out. Otherwise, the point should be included in the yielded time series.
func (*ClusterConfiguration) GetServerById ¶ added in v0.4.0
func (self *ClusterConfiguration) GetServerById(id *uint32) *ClusterServer
func (*ClusterConfiguration) GetServerByProtobufConnectionString ¶ added in v0.4.0
func (self *ClusterConfiguration) GetServerByProtobufConnectionString(connectionString string) *ClusterServer
func (*ClusterConfiguration) GetServerByRaftName ¶ added in v0.0.5
func (self *ClusterConfiguration) GetServerByRaftName(name string) *ClusterServer
func (*ClusterConfiguration) GetServerIndexByLocation ¶ added in v0.4.0
func (self *ClusterConfiguration) GetServerIndexByLocation(location *int) int
func (*ClusterConfiguration) GetServersByIndexAndReplicationFactor ¶ added in v0.4.0
func (self *ClusterConfiguration) GetServersByIndexAndReplicationFactor(database *string, index *int) (*ClusterServer, []*ClusterServer)
This function returns the server that owns the ring location and a set of servers that are replicas (which includes the owner).
func (*ClusterConfiguration) GetServersByRingLocation ¶ added in v0.4.0
func (self *ClusterConfiguration) GetServersByRingLocation(database *string, location *int) []*ClusterServer
func (*ClusterConfiguration) GetServersToMakeQueryTo ¶ added in v0.4.0
func (self *ClusterConfiguration) GetServersToMakeQueryTo(database *string) (servers []*serverToQuery, replicationFactor uint32)
This function will return an array of servers to query and the number of ring locations to return per server. Queries are issued to every nth server in the cluster where n is the replication factor. We need the local host id because we want to issue the query locally and the nth servers from there. Optimally, the number of servers in a cluster will be evenly divisible by the replication factors used. For example, if you have a cluster with databases with RFs of 1, 2, and 3: optimal cluster sizes would be 6, 12, 18, 24, 30, etc. If that's not the case, one or more servers will have to filter out data from other servers on the fly, which could be a little more expensive.
func (*ClusterConfiguration) IsActive ¶ added in v0.0.5
func (self *ClusterConfiguration) IsActive() bool
func (*ClusterConfiguration) IsSingleServer ¶ added in v0.4.0
func (self *ClusterConfiguration) IsSingleServer() bool
func (*ClusterConfiguration) Recovery ¶ added in v0.4.0
func (self *ClusterConfiguration) Recovery(b []byte) error
func (*ClusterConfiguration) Save ¶ added in v0.4.0
func (self *ClusterConfiguration) Save() ([]byte, error)
func (*ClusterConfiguration) SaveClusterAdmin ¶
func (self *ClusterConfiguration) SaveClusterAdmin(u *clusterAdmin)
func (*ClusterConfiguration) SaveDbUser ¶
func (self *ClusterConfiguration) SaveDbUser(u *dbUser)
func (*ClusterConfiguration) Servers ¶ added in v0.4.0
func (self *ClusterConfiguration) Servers() []*ClusterServer
func (*ClusterConfiguration) SetActive ¶ added in v0.4.0
func (self *ClusterConfiguration) SetActive()
func (*ClusterConfiguration) SetContinuousQueryTimestamp ¶ added in v0.4.0
func (self *ClusterConfiguration) SetContinuousQueryTimestamp(timestamp time.Time) error
func (*ClusterConfiguration) UpdateServerState ¶ added in v0.0.5
func (self *ClusterConfiguration) UpdateServerState(serverId uint32, state ServerState) error
func (*ClusterConfiguration) WaitForLocalServerLoaded ¶ added in v0.4.0
func (self *ClusterConfiguration) WaitForLocalServerLoaded()
This function will wait until the configuration has received an addPotentialServer command for this local server.
type ClusterConsensus ¶ added in v0.0.8
type ClusterConsensus interface { CreateDatabase(name string, replicationFactor uint8) error DropDatabase(name string) error CreateContinuousQuery(db string, query string) error DeleteContinuousQuery(db string, id uint32) error SaveClusterAdminUser(u *clusterAdmin) error SaveDbUser(user *dbUser) error ChangeDbUserPassword(db, username string, hash []byte) error // an insert index of -1 will append to the end of the ring AddServer(server *ClusterServer, insertIndex int) error // only servers that are in a Potential state can be moved around in the ring MovePotentialServer(server *ClusterServer, insertIndex int) error /* Activate tells the cluster to start sending writes to this node. The node will also make requests to the other servers to backfill any data they should have. Once the new node updates its state to "Running" the other servers will delete all of the data that they no longer have to keep from the ring */ ActivateServer(server *ClusterServer) error // Efficient method to have a potential server take the place of a running (or downed) // server. The replacement must have a state of "Potential" for this to work. ReplaceServer(oldServer *ClusterServer, replacement *ClusterServer) error AssignEngineAndCoordinator(engine queryRunner, coordinator *CoordinatorImpl) error // When a cluster is turned on for the first time. CreateRootUser() error }
type ClusterServer ¶ added in v0.0.2
type ClusterServer struct { Id uint32 RaftName string State ServerState RaftConnectionString string ProtobufConnectionString string // contains filtered or unexported fields }
func (*ClusterServer) Connect ¶ added in v0.4.0
func (self *ClusterServer) Connect()
func (*ClusterServer) MakeRequest ¶ added in v0.4.0
type CommonUser ¶
type CommonUser struct { Name string `json:"name"` Hash string `json:"hash"` IsUserDeleted bool `json:"is_deleted"` CacheKey string `json:"cache_key"` }
func (*CommonUser) GetDb ¶
func (self *CommonUser) GetDb() string
func (*CommonUser) GetName ¶
func (self *CommonUser) GetName() string
func (*CommonUser) HasReadAccess ¶
func (self *CommonUser) HasReadAccess(name string) bool
func (*CommonUser) HasWriteAccess ¶
func (self *CommonUser) HasWriteAccess(name string) bool
func (*CommonUser) IsClusterAdmin ¶
func (self *CommonUser) IsClusterAdmin() bool
func (*CommonUser) IsDbAdmin ¶
func (self *CommonUser) IsDbAdmin(db string) bool
func (*CommonUser) IsDeleted ¶
func (self *CommonUser) IsDeleted() bool
type ContinuousQuery ¶ added in v0.4.0
type Coordinator ¶
type Coordinator interface { // Assumption about the returned data: // 1. For any given time series, the points returned are in order // 2. If the query involves more than one time series, there is no // guarantee on the order in which they are returned // 3. Data is filtered, i.e. where clause should be assumed to hold true // for all the data points that are returned // 4. The end of a time series is signaled by returning a series with no data points // 5. TODO: Aggregation on the nodes DistributeQuery(user common.User, db string, query *parser.SelectQuery, localOnly bool, yield func(*protocol.Series) error) error WriteSeriesData(user common.User, db string, series *protocol.Series) error DeleteSeriesData(user common.User, db string, query *parser.DeleteQuery, localOnly bool) error DropDatabase(user common.User, db string) error DropSeries(user common.User, db, series string) error CreateDatabase(user common.User, db string, replicationFactor uint8) error ListDatabases(user common.User) ([]*Database, error) ListSeries(user common.User, database string) ([]*protocol.Series, error) ReplicateWrite(request *protocol.Request) error ReplicateDelete(request *protocol.Request) error ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64) GetLastSequenceNumber(replicationFactor uint8, ownerServerId, originatingServerId uint32) (uint64, error) DeleteContinuousQuery(user common.User, db string, id uint32) error CreateContinuousQuery(user common.User, db string, query string) error ListContinuousQueries(user common.User, db string) ([]*protocol.Series, error) }
type CoordinatorImpl ¶
type CoordinatorImpl struct {
// contains filtered or unexported fields
}
func NewCoordinatorImpl ¶
func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsensus, clusterConfiguration *ClusterConfiguration) *CoordinatorImpl
func (*CoordinatorImpl) AuthenticateClusterAdmin ¶
func (self *CoordinatorImpl) AuthenticateClusterAdmin(username, password string) (common.User, error)
func (*CoordinatorImpl) AuthenticateDbUser ¶
func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error)
func (*CoordinatorImpl) ChangeClusterAdminPassword ¶
func (self *CoordinatorImpl) ChangeClusterAdminPassword(requester common.User, username, password string) error
func (*CoordinatorImpl) ChangeDbUserPassword ¶
func (self *CoordinatorImpl) ChangeDbUserPassword(requester common.User, db, username, password string) error
func (*CoordinatorImpl) CommitSeriesData ¶ added in v0.4.0
func (self *CoordinatorImpl) CommitSeriesData(db string, series *protocol.Series) error
func (*CoordinatorImpl) ConnectToProtobufServers ¶ added in v0.4.0
func (self *CoordinatorImpl) ConnectToProtobufServers(localConnectionString string) error
func (*CoordinatorImpl) CreateClusterAdminUser ¶
func (self *CoordinatorImpl) CreateClusterAdminUser(requester common.User, username string) error
func (*CoordinatorImpl) CreateContinuousQuery ¶ added in v0.4.0
func (*CoordinatorImpl) CreateDatabase ¶
func (*CoordinatorImpl) CreateDbUser ¶
func (self *CoordinatorImpl) CreateDbUser(requester common.User, db, username string) error
func (*CoordinatorImpl) DeleteClusterAdminUser ¶
func (self *CoordinatorImpl) DeleteClusterAdminUser(requester common.User, username string) error
func (*CoordinatorImpl) DeleteContinuousQuery ¶ added in v0.4.0
func (*CoordinatorImpl) DeleteDbUser ¶
func (self *CoordinatorImpl) DeleteDbUser(requester common.User, db, username string) error
func (*CoordinatorImpl) DeleteSeriesData ¶ added in v0.4.0
func (self *CoordinatorImpl) DeleteSeriesData(user common.User, db string, query *parser.DeleteQuery, localOnly bool) error
func (*CoordinatorImpl) DistributeQuery ¶
func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query *parser.SelectQuery, localOnly bool, yield func(*protocol.Series) error) error
Distributes the query across the cluster and combines the results. Yields as they come in ensuring proper order. TODO: make this work even if there is a downed server in the cluster
func (*CoordinatorImpl) DropDatabase ¶ added in v0.0.2
func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error
func (*CoordinatorImpl) DropSeries ¶ added in v0.4.0
func (self *CoordinatorImpl) DropSeries(user common.User, db, series string) error
func (*CoordinatorImpl) GetLastSequenceNumber ¶ added in v0.4.0
func (self *CoordinatorImpl) GetLastSequenceNumber(replicationFactor uint8, originatingServer, owningServer uint32) (uint64, error)
func (*CoordinatorImpl) ListClusterAdmins ¶ added in v0.0.2
func (self *CoordinatorImpl) ListClusterAdmins(requester common.User) ([]string, error)
func (*CoordinatorImpl) ListContinuousQueries ¶ added in v0.4.0
func (*CoordinatorImpl) ListDatabases ¶ added in v0.0.2
func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*Database, error)
func (*CoordinatorImpl) ListDbUsers ¶ added in v0.0.2
func (*CoordinatorImpl) ListSeries ¶ added in v0.4.0
func (*CoordinatorImpl) ProcessContinuousQueries ¶ added in v0.4.0
func (self *CoordinatorImpl) ProcessContinuousQueries(db string, series *protocol.Series)
func (*CoordinatorImpl) ReplayReplication ¶ added in v0.4.0
func (*CoordinatorImpl) ReplicateDelete ¶ added in v0.4.0
func (self *CoordinatorImpl) ReplicateDelete(request *protocol.Request) error
func (*CoordinatorImpl) ReplicateWrite ¶ added in v0.4.0
func (self *CoordinatorImpl) ReplicateWrite(request *protocol.Request) error
func (*CoordinatorImpl) SetDbAdmin ¶
func (*CoordinatorImpl) SyncLogIteration ¶ added in v0.4.0
func (self *CoordinatorImpl) SyncLogIteration()
func (*CoordinatorImpl) SyncLogs ¶ added in v0.4.0
func (self *CoordinatorImpl) SyncLogs()
func (*CoordinatorImpl) WriteSeriesData ¶
type CreateContinuousQueryCommand ¶ added in v0.4.0
type CreateContinuousQueryCommand struct { Database string `json:"database"` Query string `json:"query"` }
func NewCreateContinuousQueryCommand ¶ added in v0.4.0
func NewCreateContinuousQueryCommand(database string, query string) *CreateContinuousQueryCommand
func (*CreateContinuousQueryCommand) Apply ¶ added in v0.4.0
func (c *CreateContinuousQueryCommand) Apply(server raft.Server) (interface{}, error)
func (*CreateContinuousQueryCommand) CommandName ¶ added in v0.4.0
func (c *CreateContinuousQueryCommand) CommandName() string
type CreateDatabaseCommand ¶
type CreateDatabaseCommand struct { Name string `json:"name"` ReplicationFactor uint8 `json:"replicationFactor"` }
func NewCreateDatabaseCommand ¶
func NewCreateDatabaseCommand(name string, replicationFactor uint8) *CreateDatabaseCommand
func (*CreateDatabaseCommand) Apply ¶
func (c *CreateDatabaseCommand) Apply(server raft.Server) (interface{}, error)
func (*CreateDatabaseCommand) CommandName ¶
func (c *CreateDatabaseCommand) CommandName() string
type DeleteContinuousQueryCommand ¶ added in v0.4.0
type DeleteContinuousQueryCommand struct { Database string `json:"database"` Id uint32 `json:"id"` }
func NewDeleteContinuousQueryCommand ¶ added in v0.4.0
func NewDeleteContinuousQueryCommand(database string, id uint32) *DeleteContinuousQueryCommand
func (*DeleteContinuousQueryCommand) Apply ¶ added in v0.4.0
func (c *DeleteContinuousQueryCommand) Apply(server raft.Server) (interface{}, error)
func (*DeleteContinuousQueryCommand) CommandName ¶ added in v0.4.0
func (c *DeleteContinuousQueryCommand) CommandName() string
type DropDatabaseCommand ¶ added in v0.0.2
type DropDatabaseCommand struct {
Name string `json:"name"`
}
func NewDropDatabaseCommand ¶ added in v0.0.2
func NewDropDatabaseCommand(name string) *DropDatabaseCommand
func (*DropDatabaseCommand) Apply ¶ added in v0.0.2
func (c *DropDatabaseCommand) Apply(server raft.Server) (interface{}, error)
func (*DropDatabaseCommand) CommandName ¶ added in v0.0.2
func (c *DropDatabaseCommand) CommandName() string
type InfluxJoinCommand ¶ added in v0.4.0
type InfluxJoinCommand struct { Name string `json:"name"` ConnectionString string `json:"connectionString"` ProtobufConnectionString string `json:"protobufConnectionString"` }
func (*InfluxJoinCommand) Apply ¶ added in v0.4.0
func (c *InfluxJoinCommand) Apply(server raft.Server) (interface{}, error)
func (*InfluxJoinCommand) CommandName ¶ added in v0.4.0
func (c *InfluxJoinCommand) CommandName() string
The name of the Join command in the log
func (*InfluxJoinCommand) NodeName ¶ added in v0.4.0
func (c *InfluxJoinCommand) NodeName() string
type NextPoint ¶ added in v0.4.0
type NextPoint struct {
// contains filtered or unexported fields
}
type ProtobufClient ¶ added in v0.4.0
type ProtobufClient struct {
// contains filtered or unexported fields
}
func NewProtobufClient ¶ added in v0.4.0
func NewProtobufClient(hostAndPort string) *ProtobufClient
func (*ProtobufClient) Close ¶ added in v0.4.0
func (self *ProtobufClient) Close()
func (*ProtobufClient) MakeRequest ¶ added in v0.4.0
func (self *ProtobufClient) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error
Makes a request to the server. If the responseStream chan is not nil it will expect a response from the server with a matching request.Id. The REQUEST_RETRY_ATTEMPTS constant of 3 and the RECONNECT_RETRY_WAIT of 100ms mean that an attempt to make a request to a downed server will take 300ms (3 attempts × 100ms) to time out.
type ProtobufRequestHandler ¶ added in v0.4.0
type ProtobufRequestHandler struct {
// contains filtered or unexported fields
}
func NewProtobufRequestHandler ¶ added in v0.4.0
func NewProtobufRequestHandler(db datastore.Datastore, coordinator Coordinator, clusterConfig *ClusterConfiguration) *ProtobufRequestHandler
func (*ProtobufRequestHandler) HandleRequest ¶ added in v0.4.0
func (*ProtobufRequestHandler) WriteResponse ¶ added in v0.4.0
type ProtobufServer ¶ added in v0.4.0
type ProtobufServer struct {
// contains filtered or unexported fields
}
func NewProtobufServer ¶ added in v0.4.0
func NewProtobufServer(port string, requestHandler RequestHandler) *ProtobufServer
func (*ProtobufServer) Close ¶ added in v0.4.0
func (self *ProtobufServer) Close()
func (*ProtobufServer) ListenAndServe ¶ added in v0.4.0
func (self *ProtobufServer) ListenAndServe()
type RaftServer ¶
type RaftServer struct {
// contains filtered or unexported fields
}
The raftd server is a combination of the Raft server and an HTTP server which acts as the transport.
func NewRaftServer ¶
func NewRaftServer(config *configuration.Configuration, clusterConfig *ClusterConfiguration) *RaftServer
Creates a new server.
func (*RaftServer) ActivateServer ¶ added in v0.0.8
func (s *RaftServer) ActivateServer(server *ClusterServer) error
func (*RaftServer) AddServer ¶ added in v0.0.8
func (s *RaftServer) AddServer(server *ClusterServer, insertIndex int) error
func (*RaftServer) AssignEngineAndCoordinator ¶ added in v0.4.0
func (s *RaftServer) AssignEngineAndCoordinator(engine queryRunner, coordinator *CoordinatorImpl) error
func (*RaftServer) ChangeDbUserPassword ¶ added in v0.4.0
func (s *RaftServer) ChangeDbUserPassword(db, username string, hash []byte) error
func (*RaftServer) Close ¶
func (self *RaftServer) Close()
func (*RaftServer) CompactLog ¶ added in v0.4.0
func (s *RaftServer) CompactLog()
func (*RaftServer) CreateContinuousQuery ¶ added in v0.4.0
func (s *RaftServer) CreateContinuousQuery(db string, query string) error
func (*RaftServer) CreateDatabase ¶
func (s *RaftServer) CreateDatabase(name string, replicationFactor uint8) error
func (*RaftServer) CreateRootUser ¶
func (s *RaftServer) CreateRootUser() error
func (*RaftServer) DeleteContinuousQuery ¶ added in v0.4.0
func (s *RaftServer) DeleteContinuousQuery(db string, id uint32) error
func (*RaftServer) DropDatabase ¶ added in v0.0.2
func (s *RaftServer) DropDatabase(name string) error
func (*RaftServer) HandleFunc ¶
func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
This is a hack around Gorilla mux not providing the correct net/http HandleFunc() interface.
func (*RaftServer) Join ¶
func (s *RaftServer) Join(leader string) error
Joins the leader of an existing cluster.
func (*RaftServer) ListenAndServe ¶
func (s *RaftServer) ListenAndServe() error
func (*RaftServer) MovePotentialServer ¶ added in v0.0.8
func (s *RaftServer) MovePotentialServer(server *ClusterServer, insertIndex int) error
func (*RaftServer) ReplaceServer ¶ added in v0.0.8
func (s *RaftServer) ReplaceServer(oldServer *ClusterServer, replacement *ClusterServer) error
func (*RaftServer) SaveClusterAdminUser ¶
func (s *RaftServer) SaveClusterAdminUser(u *clusterAdmin) error
func (*RaftServer) SaveDbUser ¶
func (s *RaftServer) SaveDbUser(u *dbUser) error
func (*RaftServer) SetContinuousQueryTimestamp ¶ added in v0.4.0
func (s *RaftServer) SetContinuousQueryTimestamp(timestamp time.Time) error
type RequestHandler ¶ added in v0.4.0
type SaveClusterAdminCommand ¶
type SaveClusterAdminCommand struct {
User *clusterAdmin `json:"user"`
}
func NewSaveClusterAdminCommand ¶
func NewSaveClusterAdminCommand(u *clusterAdmin) *SaveClusterAdminCommand
func (*SaveClusterAdminCommand) Apply ¶
func (c *SaveClusterAdminCommand) Apply(server raft.Server) (interface{}, error)
func (*SaveClusterAdminCommand) CommandName ¶
func (c *SaveClusterAdminCommand) CommandName() string
type SaveDbUserCommand ¶
type SaveDbUserCommand struct {
User *dbUser `json:"user"`
}
func NewSaveDbUserCommand ¶
func NewSaveDbUserCommand(u *dbUser) *SaveDbUserCommand
func (*SaveDbUserCommand) Apply ¶
func (c *SaveDbUserCommand) Apply(server raft.Server) (interface{}, error)
func (*SaveDbUserCommand) CommandName ¶
func (c *SaveDbUserCommand) CommandName() string
type ServerState ¶ added in v0.0.2
type ServerState int
const (
	LoadingRingData ServerState = iota
	SendingRingData
	DeletingOldData
	Running
	Potential
)
type SetContinuousQueryTimestampCommand ¶ added in v0.4.0
func NewSetContinuousQueryTimestampCommand ¶ added in v0.4.0
func NewSetContinuousQueryTimestampCommand(timestamp time.Time) *SetContinuousQueryTimestampCommand
func (*SetContinuousQueryTimestampCommand) Apply ¶ added in v0.4.0
func (c *SetContinuousQueryTimestampCommand) Apply(server raft.Server) (interface{}, error)
func (*SetContinuousQueryTimestampCommand) CommandName ¶ added in v0.4.0
func (c *SetContinuousQueryTimestampCommand) CommandName() string
type UpdateServerStateCommand ¶ added in v0.0.5
type UpdateServerStateCommand struct {
	ServerId uint32
	State    ServerState
}
func NewUpdateServerStateCommand ¶ added in v0.0.5
func NewUpdateServerStateCommand(serverId uint32, state ServerState) *UpdateServerStateCommand
func (*UpdateServerStateCommand) Apply ¶ added in v0.0.5
func (c *UpdateServerStateCommand) Apply(server raft.Server) (interface{}, error)
func (*UpdateServerStateCommand) CommandName ¶ added in v0.0.5
func (c *UpdateServerStateCommand) CommandName() string
type UserManager ¶
type UserManager interface {
	// Returns the user for the given db and that has the given
	// credentials, falling back to cluster admins
	AuthenticateDbUser(db, username, password string) (common.User, error)
	// Returns the cluster admin with the given credentials
	AuthenticateClusterAdmin(username, password string) (common.User, error)
	// Create a cluster admin user, it's an error if requester isn't a cluster admin
	CreateClusterAdminUser(request common.User, username string) error
	// Delete a cluster admin. Same restrictions as CreateClusterAdminUser
	DeleteClusterAdminUser(requester common.User, username string) error
	// Change cluster admin's password. It's an error if requester isn't a cluster admin
	ChangeClusterAdminPassword(requester common.User, username, password string) error
	// list cluster admins. only a cluster admin can list the other cluster admins
	ListClusterAdmins(requester common.User) ([]string, error)
	// Create a db user, it's an error if requester isn't a db admin or cluster admin
	CreateDbUser(request common.User, db, username string) error
	// Delete a db user. Same restrictions apply as in CreateDbUser
	DeleteDbUser(requester common.User, db, username string) error
	// Change db user's password. It's an error if requester isn't a cluster admin or db admin
	ChangeDbUserPassword(requester common.User, db, username, password string) error
	// list db users. only a cluster admin or the db admin can list the db users
	ListDbUsers(requester common.User, db string) ([]string, error)
	// make user a db admin for 'db'. It's an error if the requester
	// isn't a db admin or cluster admin or if user isn't a db user
	// for the given db
	SetDbAdmin(requester common.User, db, username string, isAdmin bool) error
}