coordinator

package
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2014 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
const (
	REQUEST_RETRY_ATTEMPTS = 3
	MAX_RESPONSE_SIZE      = MAX_REQUEST_SIZE
	IS_RECONNECTING        = uint32(1)
	IS_CONNECTED           = uint32(0)
	MAX_REQUEST_TIME       = time.Second * 1200
	RECONNECT_RETRY_WAIT   = time.Millisecond * 100
)
View Source
const (
	DEFAULT_ROOT_PWD = "root"
)
View Source
const HOST_ID_OFFSET = uint64(10000)

Actual point sequence numbers have a host id as the first part of the number. This ensures that sequence numbers are unique across the cluster.

View Source
const KILOBYTE = 1024
View Source
const MAX_REQUEST_SIZE = MEGABYTE * 2
View Source
const (
	MAX_SIZE = 10 * MEGABYTE
)
View Source
const MEGABYTE = 1024 * KILOBYTE
View Source
const POINT_SEQUENCE_NUMBER_KEY = "p"

This is the key used for the persistent atomic ints for sequence numbers.

Variables

View Source
var (
	BARRIER_TIME_MIN int64 = math.MinInt64
	BARRIER_TIME_MAX int64 = math.MaxInt64
)
View Source
var VALID_NAMES *regexp.Regexp

usernames and db names should match this regex

Functions

This section is empty.

Types

type AddPotentialServerCommand added in v0.0.5

type AddPotentialServerCommand struct {
	Server *ClusterServer
}

func NewAddPotentialServerCommand added in v0.0.5

func NewAddPotentialServerCommand(s *ClusterServer) *AddPotentialServerCommand

func (*AddPotentialServerCommand) Apply added in v0.0.5

func (c *AddPotentialServerCommand) Apply(server raft.Server) (interface{}, error)

func (*AddPotentialServerCommand) CommandName added in v0.0.5

func (c *AddPotentialServerCommand) CommandName() string

type ChangeDbUserPassword added in v0.4.0

type ChangeDbUserPassword struct {
	Database string
	Username string
	Hash     string
}

func NewChangeDbUserPasswordCommand added in v0.4.0

func NewChangeDbUserPasswordCommand(db, username, hash string) *ChangeDbUserPassword

func (*ChangeDbUserPassword) Apply added in v0.4.0

func (c *ChangeDbUserPassword) Apply(server raft.Server) (interface{}, error)

func (*ChangeDbUserPassword) CommandName added in v0.4.0

func (c *ChangeDbUserPassword) CommandName() string

type ClusterConfiguration

type ClusterConfiguration struct {
	ClusterVersion uint32
	// contains filtered or unexported fields
}

This struct stores all the metadata configuration information about a running cluster. This includes the servers in the cluster and their state, databases, users, and which continuous queries are running.

ClusterVersion is a monotonically increasing int that keeps track of different server configurations. For example, when you spin up a cluster and start writing data, the version will be 1. If you expand the cluster the version will be bumped. Using this the cluster is able to run two versions simultaneously while the new servers are being brought online.

func NewClusterConfiguration

func NewClusterConfiguration(config *configuration.Configuration) *ClusterConfiguration

func (*ClusterConfiguration) AddPotentialServer added in v0.0.2

func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer)

func (*ClusterConfiguration) ChangeDbUserPassword added in v0.4.0

func (self *ClusterConfiguration) ChangeDbUserPassword(db, username, hash string) error

func (*ClusterConfiguration) CreateContinuousQuery added in v0.4.0

func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) error

func (*ClusterConfiguration) CreateDatabase

func (self *ClusterConfiguration) CreateDatabase(name string, replicationFactor uint8) error

func (*ClusterConfiguration) DeleteContinuousQuery added in v0.4.0

func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) error

func (*ClusterConfiguration) DropDatabase added in v0.0.2

func (self *ClusterConfiguration) DropDatabase(name string) error

func (*ClusterConfiguration) GetClusterAdmin added in v0.0.2

func (self *ClusterConfiguration) GetClusterAdmin(username string) *clusterAdmin

func (*ClusterConfiguration) GetClusterAdmins added in v0.0.2

func (self *ClusterConfiguration) GetClusterAdmins() (names []string)

func (*ClusterConfiguration) GetContinuousQueries added in v0.4.0

func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQuery

func (*ClusterConfiguration) GetDatabaseReplicationFactor added in v0.4.0

func (self *ClusterConfiguration) GetDatabaseReplicationFactor(name string) uint8

func (*ClusterConfiguration) GetDatabases

func (self *ClusterConfiguration) GetDatabases() []*Database

func (*ClusterConfiguration) GetDbUser added in v0.0.2

func (self *ClusterConfiguration) GetDbUser(db, username string) *dbUser

func (*ClusterConfiguration) GetDbUsers added in v0.0.2

func (self *ClusterConfiguration) GetDbUsers(db string) (names []string)

func (*ClusterConfiguration) GetOwnerIdByLocation added in v0.4.0

func (self *ClusterConfiguration) GetOwnerIdByLocation(location *int) *uint32

func (*ClusterConfiguration) GetReplicas added in v0.4.0

func (self *ClusterConfiguration) GetReplicas(server *ClusterServer, database *string) (*ClusterServer, []*ClusterServer)

This function returns the replicas of the given server

func (*ClusterConfiguration) GetReplicationFactor added in v0.4.0

func (self *ClusterConfiguration) GetReplicationFactor(database *string) uint8

func (*ClusterConfiguration) GetRingFilterFunction added in v0.4.0

func (self *ClusterConfiguration) GetRingFilterFunction(database string, countOfServersToInclude uint32) func(database, series *string, time *int64) bool

This method returns a function that can be passed into the datastore's ExecuteQuery method. It will tell the datastore if each point is something that should be returned in the query based on its ring location and if the query calls for data from different replicas.

Params: - database: the name of the database - countOfServersToInclude: the number of replicas that this server will return data for

Returns a function that returns true if the point should be filtered out. Otherwise the point should be included in the yielded time series

func (*ClusterConfiguration) GetServerById added in v0.4.0

func (self *ClusterConfiguration) GetServerById(id *uint32) *ClusterServer

func (*ClusterConfiguration) GetServerByProtobufConnectionString added in v0.4.0

func (self *ClusterConfiguration) GetServerByProtobufConnectionString(connectionString string) *ClusterServer

func (*ClusterConfiguration) GetServerByRaftName added in v0.0.5

func (self *ClusterConfiguration) GetServerByRaftName(name string) *ClusterServer

func (*ClusterConfiguration) GetServerIndexByLocation added in v0.4.0

func (self *ClusterConfiguration) GetServerIndexByLocation(location *int) int

func (*ClusterConfiguration) GetServersByIndexAndReplicationFactor added in v0.4.0

func (self *ClusterConfiguration) GetServersByIndexAndReplicationFactor(database *string, index *int) (*ClusterServer, []*ClusterServer)

This function returns the server that owns the ring location and a set of servers that are replicas (which include the owner).

func (*ClusterConfiguration) GetServersByRingLocation added in v0.4.0

func (self *ClusterConfiguration) GetServersByRingLocation(database *string, location *int) []*ClusterServer

func (*ClusterConfiguration) GetServersToMakeQueryTo added in v0.4.0

func (self *ClusterConfiguration) GetServersToMakeQueryTo(database *string) (servers []*serverToQuery, replicationFactor uint32)

This function will return an array of servers to query and the number of ring locations to return per server. Queries are issued to every nth server in the cluster where n is the replication factor. We need the local host id because we want to issue the query locally and the nth servers from there. Optimally, the number of servers in a cluster will be evenly divisible by the replication factors used. For example, if you have a cluster with databases with RFs of 1, 2, and 3: optimal cluster sizes would be 6, 12, 18, 24, 30, etc. If that's not the case, one or more servers will have to filter out data from other servers on the fly, which could be a little more expensive.

func (*ClusterConfiguration) IsActive added in v0.0.5

func (self *ClusterConfiguration) IsActive() bool

func (*ClusterConfiguration) IsSingleServer added in v0.4.0

func (self *ClusterConfiguration) IsSingleServer() bool

func (*ClusterConfiguration) Recovery added in v0.4.0

func (self *ClusterConfiguration) Recovery(b []byte) error

func (*ClusterConfiguration) Save added in v0.4.0

func (self *ClusterConfiguration) Save() ([]byte, error)

func (*ClusterConfiguration) SaveClusterAdmin

func (self *ClusterConfiguration) SaveClusterAdmin(u *clusterAdmin)

func (*ClusterConfiguration) SaveDbUser

func (self *ClusterConfiguration) SaveDbUser(u *dbUser)

func (*ClusterConfiguration) Servers added in v0.4.0

func (self *ClusterConfiguration) Servers() []*ClusterServer

func (*ClusterConfiguration) SetActive added in v0.4.0

func (self *ClusterConfiguration) SetActive()

func (*ClusterConfiguration) SetContinuousQueryTimestamp added in v0.4.0

func (self *ClusterConfiguration) SetContinuousQueryTimestamp(timestamp time.Time) error

func (*ClusterConfiguration) UpdateServerState added in v0.0.5

func (self *ClusterConfiguration) UpdateServerState(serverId uint32, state ServerState) error

func (*ClusterConfiguration) WaitForLocalServerLoaded added in v0.4.0

func (self *ClusterConfiguration) WaitForLocalServerLoaded()

This function will wait until the configuration has received an addPotentialServer command for this local server.

type ClusterConsensus added in v0.0.6

type ClusterConsensus interface {
	CreateDatabase(name string, replicationFactor uint8) error
	DropDatabase(name string) error
	CreateContinuousQuery(db string, query string) error
	DeleteContinuousQuery(db string, id uint32) error
	SaveClusterAdminUser(u *clusterAdmin) error
	SaveDbUser(user *dbUser) error
	ChangeDbUserPassword(db, username string, hash []byte) error

	// an insert index of -1 will append to the end of the ring
	AddServer(server *ClusterServer, insertIndex int) error
	// only servers that are in a Potential state can be moved around in the ring
	MovePotentialServer(server *ClusterServer, insertIndex int) error
	/*
		Activate tells the cluster to start sending writes to this node.
		The node will also make requests to the other servers to backfill any
		  data they should have
		Once the new node updates its state to "Running" the other servers will
		  delete all of the data that they no longer have to keep from the ring
	*/
	ActivateServer(server *ClusterServer) error

	// Efficient method to have a potential server take the place of a running (or downed)
	// server. The replacement must have a state of "Potential" for this to work.
	ReplaceServer(oldServer *ClusterServer, replacement *ClusterServer) error

	AssignEngineAndCoordinator(engine queryRunner, coordinator *CoordinatorImpl) error

	// When a cluster is turned on for the first time.
	CreateRootUser() error

	ForceLogCompaction() error
}

type ClusterServer added in v0.0.2

type ClusterServer struct {
	Id                       uint32
	RaftName                 string
	State                    ServerState
	RaftConnectionString     string
	ProtobufConnectionString string
	// contains filtered or unexported fields
}

func (*ClusterServer) Connect added in v0.4.0

func (self *ClusterServer) Connect()

func (*ClusterServer) MakeRequest added in v0.4.0

func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error

type CommonUser

type CommonUser struct {
	Name          string `json:"name"`
	Hash          string `json:"hash"`
	IsUserDeleted bool   `json:"is_deleted"`
	CacheKey      string `json:"cache_key"`
}

func (*CommonUser) GetDb

func (self *CommonUser) GetDb() string

func (*CommonUser) GetName

func (self *CommonUser) GetName() string

func (*CommonUser) HasReadAccess

func (self *CommonUser) HasReadAccess(name string) bool

func (*CommonUser) HasWriteAccess

func (self *CommonUser) HasWriteAccess(name string) bool

func (*CommonUser) IsClusterAdmin

func (self *CommonUser) IsClusterAdmin() bool

func (*CommonUser) IsDbAdmin

func (self *CommonUser) IsDbAdmin(db string) bool

func (*CommonUser) IsDeleted

func (self *CommonUser) IsDeleted() bool

type ContinuousQuery added in v0.4.0

type ContinuousQuery struct {
	Id    uint32
	Query string
}

type Coordinator

type Coordinator interface {
	// Assumption about the returned data:
	//   1. For any given time series, the points returned are in order
	//   2. If the query involves more than one time series, there is no
	//      guarantee on the order in which they are returned
	//   3. Data is filtered, i.e. where clause should be assumed to hold true
	//      for all the data points that are returned
	//   4. The end of a time series is signaled by returning a series with no data points
	//   5. TODO: Aggregation on the nodes
	DistributeQuery(user common.User, db string, query *parser.SelectQuery, localOnly bool, yield func(*protocol.Series) error) error
	WriteSeriesData(user common.User, db string, series *protocol.Series) error
	DeleteSeriesData(user common.User, db string, query *parser.DeleteQuery, localOnly bool) error
	DropDatabase(user common.User, db string) error
	DropSeries(user common.User, db, series string) error
	CreateDatabase(user common.User, db string, replicationFactor uint8) error
	ForceCompaction(user common.User) error
	ListDatabases(user common.User) ([]*Database, error)
	ListSeries(user common.User, database string) ([]*protocol.Series, error)
	ReplicateWrite(request *protocol.Request) error
	ReplicateDelete(request *protocol.Request) error
	ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64)
	GetLastSequenceNumber(replicationFactor uint8, ownerServerId, originatingServerId uint32) (uint64, error)
	DeleteContinuousQuery(user common.User, db string, id uint32) error
	CreateContinuousQuery(user common.User, db string, query string) error
	ListContinuousQueries(user common.User, db string) ([]*protocol.Series, error)
}

type CoordinatorImpl

type CoordinatorImpl struct {
	// contains filtered or unexported fields
}

func NewCoordinatorImpl

func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsensus, clusterConfiguration *ClusterConfiguration) *CoordinatorImpl

func (*CoordinatorImpl) AuthenticateClusterAdmin

func (self *CoordinatorImpl) AuthenticateClusterAdmin(username, password string) (common.User, error)

func (*CoordinatorImpl) AuthenticateDbUser

func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error)

func (*CoordinatorImpl) ChangeClusterAdminPassword

func (self *CoordinatorImpl) ChangeClusterAdminPassword(requester common.User, username, password string) error

func (*CoordinatorImpl) ChangeDbUserPassword

func (self *CoordinatorImpl) ChangeDbUserPassword(requester common.User, db, username, password string) error

func (*CoordinatorImpl) CommitSeriesData added in v0.4.0

func (self *CoordinatorImpl) CommitSeriesData(db string, series *protocol.Series) error

func (*CoordinatorImpl) ConnectToProtobufServers added in v0.4.0

func (self *CoordinatorImpl) ConnectToProtobufServers(localConnectionString string) error

func (*CoordinatorImpl) CreateClusterAdminUser

func (self *CoordinatorImpl) CreateClusterAdminUser(requester common.User, username string) error

func (*CoordinatorImpl) CreateContinuousQuery added in v0.4.0

func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string, query string) error

func (*CoordinatorImpl) CreateDatabase

func (self *CoordinatorImpl) CreateDatabase(user common.User, db string, replicationFactor uint8) error

func (*CoordinatorImpl) CreateDbUser

func (self *CoordinatorImpl) CreateDbUser(requester common.User, db, username string) error

func (*CoordinatorImpl) DeleteClusterAdminUser

func (self *CoordinatorImpl) DeleteClusterAdminUser(requester common.User, username string) error

func (*CoordinatorImpl) DeleteContinuousQuery added in v0.4.0

func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string, id uint32) error

func (*CoordinatorImpl) DeleteDbUser

func (self *CoordinatorImpl) DeleteDbUser(requester common.User, db, username string) error

func (*CoordinatorImpl) DeleteSeriesData added in v0.4.0

func (self *CoordinatorImpl) DeleteSeriesData(user common.User, db string, query *parser.DeleteQuery, localOnly bool) error

func (*CoordinatorImpl) DistributeQuery

func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query *parser.SelectQuery, localOnly bool, yield func(*protocol.Series) error) error

Distributes the query across the cluster and combines the results. Yields as they come in ensuring proper order. TODO: make this work even if there is a downed server in the cluster

func (*CoordinatorImpl) DropDatabase added in v0.0.2

func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error

func (*CoordinatorImpl) DropSeries added in v0.4.0

func (self *CoordinatorImpl) DropSeries(user common.User, db, series string) error

func (*CoordinatorImpl) ForceCompaction added in v0.4.1

func (self *CoordinatorImpl) ForceCompaction(user common.User) error

func (*CoordinatorImpl) GetLastSequenceNumber added in v0.4.0

func (self *CoordinatorImpl) GetLastSequenceNumber(replicationFactor uint8, originatingServer, owningServer uint32) (uint64, error)

func (*CoordinatorImpl) ListClusterAdmins added in v0.0.2

func (self *CoordinatorImpl) ListClusterAdmins(requester common.User) ([]string, error)

func (*CoordinatorImpl) ListContinuousQueries added in v0.4.0

func (self *CoordinatorImpl) ListContinuousQueries(user common.User, db string) ([]*protocol.Series, error)

func (*CoordinatorImpl) ListDatabases added in v0.0.2

func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*Database, error)

func (*CoordinatorImpl) ListDbUsers added in v0.0.2

func (self *CoordinatorImpl) ListDbUsers(requester common.User, db string) ([]string, error)

func (*CoordinatorImpl) ListSeries added in v0.4.0

func (self *CoordinatorImpl) ListSeries(user common.User, database string) ([]*protocol.Series, error)

func (*CoordinatorImpl) ProcessContinuousQueries added in v0.4.0

func (self *CoordinatorImpl) ProcessContinuousQueries(db string, series *protocol.Series)

func (*CoordinatorImpl) ReplayReplication added in v0.4.0

func (self *CoordinatorImpl) ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64)

func (*CoordinatorImpl) ReplicateDelete added in v0.4.0

func (self *CoordinatorImpl) ReplicateDelete(request *protocol.Request) error

func (*CoordinatorImpl) ReplicateWrite added in v0.4.0

func (self *CoordinatorImpl) ReplicateWrite(request *protocol.Request) error

func (*CoordinatorImpl) SetDbAdmin

func (self *CoordinatorImpl) SetDbAdmin(requester common.User, db, username string, isAdmin bool) error

func (*CoordinatorImpl) SyncLogIteration added in v0.4.0

func (self *CoordinatorImpl) SyncLogIteration()

func (*CoordinatorImpl) SyncLogs added in v0.4.0

func (self *CoordinatorImpl) SyncLogs()

func (*CoordinatorImpl) WriteSeriesData

func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series *protocol.Series) error

type CreateContinuousQueryCommand added in v0.4.0

type CreateContinuousQueryCommand struct {
	Database string `json:"database"`
	Query    string `json:"query"`
}

func NewCreateContinuousQueryCommand added in v0.4.0

func NewCreateContinuousQueryCommand(database string, query string) *CreateContinuousQueryCommand

func (*CreateContinuousQueryCommand) Apply added in v0.4.0

func (c *CreateContinuousQueryCommand) Apply(server raft.Server) (interface{}, error)

func (*CreateContinuousQueryCommand) CommandName added in v0.4.0

func (c *CreateContinuousQueryCommand) CommandName() string

type CreateDatabaseCommand

type CreateDatabaseCommand struct {
	Name              string `json:"name"`
	ReplicationFactor uint8  `json:"replicationFactor"`
}

func NewCreateDatabaseCommand

func NewCreateDatabaseCommand(name string, replicationFactor uint8) *CreateDatabaseCommand

func (*CreateDatabaseCommand) Apply

func (c *CreateDatabaseCommand) Apply(server raft.Server) (interface{}, error)

func (*CreateDatabaseCommand) CommandName

func (c *CreateDatabaseCommand) CommandName() string

type Database added in v0.4.0

type Database struct {
	Name              string `json:"name"`
	ReplicationFactor uint8  `json:"replicationFactor"`
}

type DeleteContinuousQueryCommand added in v0.4.0

type DeleteContinuousQueryCommand struct {
	Database string `json:"database"`
	Id       uint32 `json:"id"`
}

func NewDeleteContinuousQueryCommand added in v0.4.0

func NewDeleteContinuousQueryCommand(database string, id uint32) *DeleteContinuousQueryCommand

func (*DeleteContinuousQueryCommand) Apply added in v0.4.0

func (c *DeleteContinuousQueryCommand) Apply(server raft.Server) (interface{}, error)

func (*DeleteContinuousQueryCommand) CommandName added in v0.4.0

func (c *DeleteContinuousQueryCommand) CommandName() string

type DropDatabaseCommand added in v0.0.2

type DropDatabaseCommand struct {
	Name string `json:"name"`
}

func NewDropDatabaseCommand added in v0.0.2

func NewDropDatabaseCommand(name string) *DropDatabaseCommand

func (*DropDatabaseCommand) Apply added in v0.0.2

func (c *DropDatabaseCommand) Apply(server raft.Server) (interface{}, error)

func (*DropDatabaseCommand) CommandName added in v0.0.2

func (c *DropDatabaseCommand) CommandName() string

type InfluxJoinCommand added in v0.4.0

type InfluxJoinCommand struct {
	Name                     string `json:"name"`
	ConnectionString         string `json:"connectionString"`
	ProtobufConnectionString string `json:"protobufConnectionString"`
}

func (*InfluxJoinCommand) Apply added in v0.4.0

func (c *InfluxJoinCommand) Apply(server raft.Server) (interface{}, error)

func (*InfluxJoinCommand) CommandName added in v0.4.0

func (c *InfluxJoinCommand) CommandName() string

The name of the Join command in the log

func (*InfluxJoinCommand) NodeName added in v0.4.0

func (c *InfluxJoinCommand) NodeName() string

type Matcher

type Matcher struct {
	IsRegex bool
	Name    string
}

func (*Matcher) Matches

func (self *Matcher) Matches(name string) bool

type NextPoint added in v0.4.0

type NextPoint struct {
	// contains filtered or unexported fields
}

type ProtobufClient added in v0.4.0

type ProtobufClient struct {
	// contains filtered or unexported fields
}

func NewProtobufClient added in v0.4.0

func NewProtobufClient(hostAndPort string) *ProtobufClient

func (*ProtobufClient) Close added in v0.4.0

func (self *ProtobufClient) Close()

func (*ProtobufClient) MakeRequest added in v0.4.0

func (self *ProtobufClient) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error

Makes a request to the server. If the responseStream chan is not nil it will expect a response from the server with a matching request.Id. The REQUEST_RETRY_ATTEMPTS constant of 3 and the RECONNECT_RETRY_WAIT of 100ms mean that an attempt to make a request to a downed server will take 300ms to time out.

type ProtobufRequestHandler added in v0.4.0

type ProtobufRequestHandler struct {
	// contains filtered or unexported fields
}

func NewProtobufRequestHandler added in v0.4.0

func NewProtobufRequestHandler(db datastore.Datastore, coordinator Coordinator, clusterConfig *ClusterConfiguration) *ProtobufRequestHandler

func (*ProtobufRequestHandler) HandleRequest added in v0.4.0

func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, conn net.Conn) error

func (*ProtobufRequestHandler) WriteResponse added in v0.4.0

func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *protocol.Response) error

type ProtobufServer added in v0.4.0

type ProtobufServer struct {
	// contains filtered or unexported fields
}

func NewProtobufServer added in v0.4.0

func NewProtobufServer(port string, requestHandler RequestHandler) *ProtobufServer

func (*ProtobufServer) Close added in v0.4.0

func (self *ProtobufServer) Close()

func (*ProtobufServer) ListenAndServe added in v0.4.0

func (self *ProtobufServer) ListenAndServe()

type RaftServer

type RaftServer struct {
	// contains filtered or unexported fields
}

The raftd server is a combination of the Raft server and an HTTP server which acts as the transport.

func NewRaftServer

func NewRaftServer(config *configuration.Configuration, clusterConfig *ClusterConfiguration) *RaftServer

Creates a new server.

func (*RaftServer) ActivateServer added in v0.0.6

func (s *RaftServer) ActivateServer(server *ClusterServer) error

func (*RaftServer) AddServer added in v0.0.6

func (s *RaftServer) AddServer(server *ClusterServer, insertIndex int) error

func (*RaftServer) AssignEngineAndCoordinator added in v0.4.0

func (s *RaftServer) AssignEngineAndCoordinator(engine queryRunner, coordinator *CoordinatorImpl) error

func (*RaftServer) ChangeDbUserPassword added in v0.4.0

func (s *RaftServer) ChangeDbUserPassword(db, username string, hash []byte) error

func (*RaftServer) Close

func (self *RaftServer) Close()

func (*RaftServer) CompactLog added in v0.4.0

func (s *RaftServer) CompactLog()

func (*RaftServer) CreateContinuousQuery added in v0.4.0

func (s *RaftServer) CreateContinuousQuery(db string, query string) error

func (*RaftServer) CreateDatabase

func (s *RaftServer) CreateDatabase(name string, replicationFactor uint8) error

func (*RaftServer) CreateRootUser

func (s *RaftServer) CreateRootUser() error

func (*RaftServer) DeleteContinuousQuery added in v0.4.0

func (s *RaftServer) DeleteContinuousQuery(db string, id uint32) error

func (*RaftServer) DropDatabase added in v0.0.2

func (s *RaftServer) DropDatabase(name string) error

func (*RaftServer) ForceLogCompaction added in v0.4.1

func (s *RaftServer) ForceLogCompaction() error

func (*RaftServer) HandleFunc

func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))

This is a hack around Gorilla mux not providing the correct net/http HandleFunc() interface.

func (*RaftServer) Join

func (s *RaftServer) Join(leader string) error

Joins to the leader of an existing cluster.

func (*RaftServer) ListenAndServe

func (s *RaftServer) ListenAndServe() error

func (*RaftServer) MovePotentialServer added in v0.0.6

func (s *RaftServer) MovePotentialServer(server *ClusterServer, insertIndex int) error

func (*RaftServer) ReplaceServer added in v0.0.6

func (s *RaftServer) ReplaceServer(oldServer *ClusterServer, replacement *ClusterServer) error

func (*RaftServer) SaveClusterAdminUser

func (s *RaftServer) SaveClusterAdminUser(u *clusterAdmin) error

func (*RaftServer) SaveDbUser

func (s *RaftServer) SaveDbUser(u *dbUser) error

func (*RaftServer) Serve added in v0.0.5

func (s *RaftServer) Serve(l net.Listener) error

func (*RaftServer) SetContinuousQueryTimestamp added in v0.4.0

func (s *RaftServer) SetContinuousQueryTimestamp(timestamp time.Time) error

type RequestHandler added in v0.4.0

type RequestHandler interface {
	HandleRequest(request *protocol.Request, conn net.Conn) error
}

type SaveClusterAdminCommand

type SaveClusterAdminCommand struct {
	User *clusterAdmin `json:"user"`
}

func NewSaveClusterAdminCommand

func NewSaveClusterAdminCommand(u *clusterAdmin) *SaveClusterAdminCommand

func (*SaveClusterAdminCommand) Apply

func (c *SaveClusterAdminCommand) Apply(server raft.Server) (interface{}, error)

func (*SaveClusterAdminCommand) CommandName

func (c *SaveClusterAdminCommand) CommandName() string

type SaveDbUserCommand

type SaveDbUserCommand struct {
	User *dbUser `json:"user"`
}

func NewSaveDbUserCommand

func NewSaveDbUserCommand(u *dbUser) *SaveDbUserCommand

func (*SaveDbUserCommand) Apply

func (c *SaveDbUserCommand) Apply(server raft.Server) (interface{}, error)

func (*SaveDbUserCommand) CommandName

func (c *SaveDbUserCommand) CommandName() string

type ServerState added in v0.0.2

type ServerState int
const (
	LoadingRingData ServerState = iota
	SendingRingData
	DeletingOldData
	Running
	Potential
)

type SetContinuousQueryTimestampCommand added in v0.4.0

type SetContinuousQueryTimestampCommand struct {
	Timestamp time.Time `json:"timestamp"`
}

func NewSetContinuousQueryTimestampCommand added in v0.4.0

func NewSetContinuousQueryTimestampCommand(timestamp time.Time) *SetContinuousQueryTimestampCommand

func (*SetContinuousQueryTimestampCommand) Apply added in v0.4.0

func (c *SetContinuousQueryTimestampCommand) Apply(server raft.Server) (interface{}, error)

func (*SetContinuousQueryTimestampCommand) CommandName added in v0.4.0

func (c *SetContinuousQueryTimestampCommand) CommandName() string

type UpdateServerStateCommand added in v0.0.5

type UpdateServerStateCommand struct {
	ServerId uint32
	State    ServerState
}

func NewUpdateServerStateCommand added in v0.0.5

func NewUpdateServerStateCommand(serverId uint32, state ServerState) *UpdateServerStateCommand

func (*UpdateServerStateCommand) Apply added in v0.0.5

func (c *UpdateServerStateCommand) Apply(server raft.Server) (interface{}, error)

func (*UpdateServerStateCommand) CommandName added in v0.0.5

func (c *UpdateServerStateCommand) CommandName() string

type UserManager

type UserManager interface {
	// Returns the user for the given db and that has the given
	// credentials, falling back to cluster admins
	AuthenticateDbUser(db, username, password string) (common.User, error)
	// Returns the cluster admin with the given credentials
	AuthenticateClusterAdmin(username, password string) (common.User, error)
	// Create a cluster admin user, it's an error if requester isn't a cluster admin
	CreateClusterAdminUser(request common.User, username string) error
	// Delete a cluster admin. Same restrictions as CreateClusterAdminUser
	DeleteClusterAdminUser(requester common.User, username string) error
	// Change cluster admin's password. It's an error if requester isn't a cluster admin
	ChangeClusterAdminPassword(requester common.User, username, password string) error
	// list cluster admins. only a cluster admin can list the other cluster admins
	ListClusterAdmins(requester common.User) ([]string, error)
	// Create a db user, it's an error if requester isn't a db admin or cluster admin
	CreateDbUser(request common.User, db, username string) error
	// Delete a db user. Same restrictions apply as in CreateDbUser
	DeleteDbUser(requester common.User, db, username string) error
	// Change db user's password. It's an error if requester isn't a cluster admin or db admin
	ChangeDbUserPassword(requester common.User, db, username, password string) error
	// list db users. only a cluster admin or the db admin can list the db users
	ListDbUsers(requester common.User, db string) ([]string, error)
	// make user a db admin for 'db'. It's an error if the requester
	// isn't a db admin or cluster admin or if user isn't a db user
	// for the given db
	SetDbAdmin(requester common.User, db, username string, isAdmin bool) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL