coordinator

package
v0.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 4, 2014 License: MIT Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// this is the key used for the persistent atomic ints for sequence numbers
	POINT_SEQUENCE_NUMBER_KEY = "p"

	// actual point sequence numbers will have the first part of the number
	// be a host id. This ensures that sequence numbers are unique across the cluster
	HOST_ID_OFFSET = uint64(10000)

	SHARDS_TO_QUERY_FOR_LIST_SERIES = 10
)
View Source
const (
	REQUEST_RETRY_ATTEMPTS = 2
	MAX_RESPONSE_SIZE      = MAX_REQUEST_SIZE
	MAX_REQUEST_TIME       = time.Second * 1200
	RECONNECT_RETRY_WAIT   = time.Millisecond * 100
)
View Source
const (
	DEFAULT_ROOT_PWD = "root"
)
View Source
const KILOBYTE = 1024
View Source
const MAX_REQUEST_SIZE = MEGABYTE * 2
View Source
const (
	MAX_SIZE = 10 * MEGABYTE
)
View Source
const MEGABYTE = 1024 * KILOBYTE

Variables

View Source
var (
	BARRIER_TIME_MIN int64 = math.MinInt64
	BARRIER_TIME_MAX int64 = math.MaxInt64
)
View Source
var VALID_NAMES *regexp.Regexp

Usernames and db names should match this regex.

Functions

This section is empty.

Types

type AddPotentialServerCommand added in v0.0.5

type AddPotentialServerCommand struct {
	Server *cluster.ClusterServer
}

func NewAddPotentialServerCommand added in v0.0.5

func NewAddPotentialServerCommand(s *cluster.ClusterServer) *AddPotentialServerCommand

func (*AddPotentialServerCommand) Apply added in v0.0.5

func (c *AddPotentialServerCommand) Apply(server raft.Server) (interface{}, error)

func (*AddPotentialServerCommand) CommandName added in v0.0.5

func (c *AddPotentialServerCommand) CommandName() string

type ChangeDbUserPassword added in v0.4.0

type ChangeDbUserPassword struct {
	Database string
	Username string
	Hash     string
}

func NewChangeDbUserPasswordCommand added in v0.4.0

func NewChangeDbUserPasswordCommand(db, username, hash string) *ChangeDbUserPassword

func (*ChangeDbUserPassword) Apply added in v0.4.0

func (c *ChangeDbUserPassword) Apply(server raft.Server) (interface{}, error)

func (*ChangeDbUserPassword) CommandName added in v0.4.0

func (c *ChangeDbUserPassword) CommandName() string

type ClusterConsensus added in v0.0.6

type ClusterConsensus interface {
	CreateDatabase(name string, replicationFactor uint8) error
	DropDatabase(name string) error
	CreateContinuousQuery(db string, query string) error
	DeleteContinuousQuery(db string, id uint32) error
	SaveClusterAdminUser(u *cluster.ClusterAdmin) error
	SaveDbUser(user *cluster.DbUser) error
	ChangeDbUserPassword(db, username string, hash []byte) error

	// an insert index of -1 will append to the end of the ring
	AddServer(server *cluster.ClusterServer, insertIndex int) error
	// only servers that are in a Potential state can be moved around in the ring
	MovePotentialServer(server *cluster.ClusterServer, insertIndex int) error
	/*
		Activate tells the cluster to start sending writes to this node.
		The node will also make requests to the other servers to backfill any
		  data they should have
		Once the new node updates its state to "Running" the other servers will
		  delete all of the data that they no longer have to keep from the ring
	*/
	ActivateServer(server *cluster.ClusterServer) error

	// Efficient method to have a potential server take the place of a running (or downed)
	// server. The replacement must have a state of "Potential" for this to work.
	ReplaceServer(oldServer *cluster.ClusterServer, replacement *cluster.ClusterServer) error

	AssignCoordinator(coordinator *CoordinatorImpl) error

	// When a cluster is turned on for the first time.
	CreateRootUser() error

	ForceLogCompaction() error
}

type ContinuousQueryWriter added in v0.5.0

type ContinuousQueryWriter struct {
	// contains filtered or unexported fields
}

func NewContinuousQueryWriter added in v0.5.0

func NewContinuousQueryWriter(yield func(*protocol.Series) error) *ContinuousQueryWriter

func (*ContinuousQueryWriter) Close added in v0.5.0

func (self *ContinuousQueryWriter) Close()

func (*ContinuousQueryWriter) Write added in v0.5.0

func (self *ContinuousQueryWriter) Write(series *protocol.Series) error

type Coordinator

type Coordinator interface {
	// Assumption about the returned data:
	//   1. For any given time series, the points returned are in order
	//   2. If the query involves more than one time series, there is no
	//      guarantee on the order in which they are returned
	//   3. Data is filtered, i.e. where clause should be assumed to hold true
	//      for all the data points that are returned
	//   4. The end of a time series is signaled by returning a series with no data points
	//   5. TODO: Aggregation on the nodes
	WriteSeriesData(user common.User, db string, series *protocol.Series) error
	DropDatabase(user common.User, db string) error
	CreateDatabase(user common.User, db string, replicationFactor uint8) error
	ForceCompaction(user common.User) error
	ListDatabases(user common.User) ([]*cluster.Database, error)
	DeleteContinuousQuery(user common.User, db string, id uint32) error
	CreateContinuousQuery(user common.User, db string, query string) error
	ListContinuousQueries(user common.User, db string) ([]*protocol.Series, error)

	// v2 clustering, based on sharding instead of the circular hash ring
	RunQuery(user common.User, db, query string, seriesWriter SeriesWriter) error
}

type CoordinatorImpl

type CoordinatorImpl struct {
	// contains filtered or unexported fields
}

func NewCoordinatorImpl

func NewCoordinatorImpl(config *configuration.Configuration, raftServer ClusterConsensus, clusterConfiguration *cluster.ClusterConfiguration) *CoordinatorImpl

func (*CoordinatorImpl) AuthenticateClusterAdmin

func (self *CoordinatorImpl) AuthenticateClusterAdmin(username, password string) (common.User, error)

func (*CoordinatorImpl) AuthenticateDbUser

func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error)

func (*CoordinatorImpl) ChangeClusterAdminPassword

func (self *CoordinatorImpl) ChangeClusterAdminPassword(requester common.User, username, password string) error

func (*CoordinatorImpl) ChangeDbUserPassword

func (self *CoordinatorImpl) ChangeDbUserPassword(requester common.User, db, username, password string) error

func (*CoordinatorImpl) CommitSeriesData added in v0.4.0

func (self *CoordinatorImpl) CommitSeriesData(db string, series *protocol.Series) error

func (*CoordinatorImpl) ConnectToProtobufServers added in v0.4.0

func (self *CoordinatorImpl) ConnectToProtobufServers(localConnectionString string) error

func (*CoordinatorImpl) CreateClusterAdminUser

func (self *CoordinatorImpl) CreateClusterAdminUser(requester common.User, username, password string) error

func (*CoordinatorImpl) CreateContinuousQuery added in v0.4.0

func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string, query string) error

func (*CoordinatorImpl) CreateDatabase

func (self *CoordinatorImpl) CreateDatabase(user common.User, db string, replicationFactor uint8) error

func (*CoordinatorImpl) CreateDbUser

func (self *CoordinatorImpl) CreateDbUser(requester common.User, db, username, password string) error

func (*CoordinatorImpl) DeleteClusterAdminUser

func (self *CoordinatorImpl) DeleteClusterAdminUser(requester common.User, username string) error

func (*CoordinatorImpl) DeleteContinuousQuery added in v0.4.0

func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string, id uint32) error

func (*CoordinatorImpl) DeleteDbUser

func (self *CoordinatorImpl) DeleteDbUser(requester common.User, db, username string) error

func (*CoordinatorImpl) DropDatabase added in v0.0.2

func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error

func (*CoordinatorImpl) ForceCompaction added in v0.4.1

func (self *CoordinatorImpl) ForceCompaction(user common.User) error

func (*CoordinatorImpl) GetDbUser added in v0.5.0

func (self *CoordinatorImpl) GetDbUser(requester common.User, db string, username string) (common.User, error)

func (*CoordinatorImpl) InterpolateValuesAndCommit added in v0.5.0

func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string, series *protocol.Series, targetName string, assignSequenceNumbers bool) error

func (*CoordinatorImpl) ListClusterAdmins added in v0.0.2

func (self *CoordinatorImpl) ListClusterAdmins(requester common.User) ([]string, error)

func (*CoordinatorImpl) ListContinuousQueries added in v0.4.0

func (self *CoordinatorImpl) ListContinuousQueries(user common.User, db string) ([]*protocol.Series, error)

func (*CoordinatorImpl) ListDatabases added in v0.0.2

func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*cluster.Database, error)

func (*CoordinatorImpl) ListDbUsers added in v0.0.2

func (self *CoordinatorImpl) ListDbUsers(requester common.User, db string) ([]common.User, error)

func (*CoordinatorImpl) ProcessContinuousQueries added in v0.4.0

func (self *CoordinatorImpl) ProcessContinuousQueries(db string, series *protocol.Series)

func (*CoordinatorImpl) RunQuery added in v0.5.0

func (self *CoordinatorImpl) RunQuery(user common.User, database string, queryString string, seriesWriter SeriesWriter) (err error)

func (*CoordinatorImpl) SetDbAdmin

func (self *CoordinatorImpl) SetDbAdmin(requester common.User, db, username string, isAdmin bool) error

func (*CoordinatorImpl) WriteSeriesData

func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series *protocol.Series) error

type CreateContinuousQueryCommand added in v0.4.0

type CreateContinuousQueryCommand struct {
	Database string `json:"database"`
	Query    string `json:"query"`
}

func NewCreateContinuousQueryCommand added in v0.4.0

func NewCreateContinuousQueryCommand(database string, query string) *CreateContinuousQueryCommand

func (*CreateContinuousQueryCommand) Apply added in v0.4.0

func (c *CreateContinuousQueryCommand) Apply(server raft.Server) (interface{}, error)

func (*CreateContinuousQueryCommand) CommandName added in v0.4.0

func (c *CreateContinuousQueryCommand) CommandName() string

type CreateDatabaseCommand

type CreateDatabaseCommand struct {
	Name              string `json:"name"`
	ReplicationFactor uint8  `json:"replicationFactor"`
}

func NewCreateDatabaseCommand

func NewCreateDatabaseCommand(name string, replicationFactor uint8) *CreateDatabaseCommand

func (*CreateDatabaseCommand) Apply

func (c *CreateDatabaseCommand) Apply(server raft.Server) (interface{}, error)

func (*CreateDatabaseCommand) CommandName

func (c *CreateDatabaseCommand) CommandName() string

type CreateShardsCommand added in v0.5.0

type CreateShardsCommand struct {
	Shards []*cluster.NewShardData
}

func NewCreateShardsCommand added in v0.5.0

func NewCreateShardsCommand(shards []*cluster.NewShardData) *CreateShardsCommand

func (*CreateShardsCommand) Apply added in v0.5.0

func (c *CreateShardsCommand) Apply(server raft.Server) (interface{}, error)

func (*CreateShardsCommand) CommandName added in v0.5.0

func (c *CreateShardsCommand) CommandName() string

type DeleteContinuousQueryCommand added in v0.4.0

type DeleteContinuousQueryCommand struct {
	Database string `json:"database"`
	Id       uint32 `json:"id"`
}

func NewDeleteContinuousQueryCommand added in v0.4.0

func NewDeleteContinuousQueryCommand(database string, id uint32) *DeleteContinuousQueryCommand

func (*DeleteContinuousQueryCommand) Apply added in v0.4.0

func (c *DeleteContinuousQueryCommand) Apply(server raft.Server) (interface{}, error)

func (*DeleteContinuousQueryCommand) CommandName added in v0.4.0

func (c *DeleteContinuousQueryCommand) CommandName() string

type DropDatabaseCommand added in v0.0.2

type DropDatabaseCommand struct {
	Name string `json:"name"`
}

func NewDropDatabaseCommand added in v0.0.2

func NewDropDatabaseCommand(name string) *DropDatabaseCommand

func (*DropDatabaseCommand) Apply added in v0.0.2

func (c *DropDatabaseCommand) Apply(server raft.Server) (interface{}, error)

func (*DropDatabaseCommand) CommandName added in v0.0.2

func (c *DropDatabaseCommand) CommandName() string

type DropShardCommand added in v0.5.0

type DropShardCommand struct {
	ShardId   uint32
	ServerIds []uint32
}

func NewDropShardCommand added in v0.5.0

func NewDropShardCommand(id uint32, serverIds []uint32) *DropShardCommand

func (*DropShardCommand) Apply added in v0.5.0

func (c *DropShardCommand) Apply(server raft.Server) (interface{}, error)

func (*DropShardCommand) CommandName added in v0.5.0

func (c *DropShardCommand) CommandName() string

type InfluxJoinCommand added in v0.4.0

type InfluxJoinCommand struct {
	Name                     string `json:"name"`
	ConnectionString         string `json:"connectionString"`
	ProtobufConnectionString string `json:"protobufConnectionString"`
}

func (*InfluxJoinCommand) Apply added in v0.4.0

func (c *InfluxJoinCommand) Apply(server raft.Server) (interface{}, error)

func (*InfluxJoinCommand) CommandName added in v0.4.0

func (c *InfluxJoinCommand) CommandName() string

The name of the Join command in the log

func (*InfluxJoinCommand) NodeName added in v0.4.0

func (c *InfluxJoinCommand) NodeName() string

type ProtobufClient added in v0.4.0

type ProtobufClient struct {
	// contains filtered or unexported fields
}

func NewProtobufClient added in v0.4.0

func NewProtobufClient(hostAndPort string, writeTimeout time.Duration) *ProtobufClient

func (*ProtobufClient) Close added in v0.4.0

func (self *ProtobufClient) Close()

func (*ProtobufClient) Connect added in v0.5.0

func (self *ProtobufClient) Connect()

func (*ProtobufClient) MakeRequest added in v0.4.0

func (self *ProtobufClient) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error

Makes a request to the server. If the responseStream chan is not nil it will expect a response from the server with a matching request.Id. With REQUEST_RETRY_ATTEMPTS set to 2 and RECONNECT_RETRY_WAIT set to 100ms, an attempt to make a request to a downed server will take roughly 200ms (attempts × wait) to time out.

type ProtobufRequestHandler added in v0.4.0

type ProtobufRequestHandler struct {
	// contains filtered or unexported fields
}

func NewProtobufRequestHandler added in v0.4.0

func NewProtobufRequestHandler(coordinator Coordinator, clusterConfig *cluster.ClusterConfiguration) *ProtobufRequestHandler

func (*ProtobufRequestHandler) HandleRequest added in v0.4.0

func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, conn net.Conn) error

func (*ProtobufRequestHandler) WriteResponse added in v0.4.0

func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *protocol.Response) error

type ProtobufServer added in v0.4.0

type ProtobufServer struct {
	// contains filtered or unexported fields
}

func NewProtobufServer added in v0.4.0

func NewProtobufServer(port string, requestHandler RequestHandler) *ProtobufServer

func (*ProtobufServer) Close added in v0.4.0

func (self *ProtobufServer) Close()

func (*ProtobufServer) ListenAndServe added in v0.4.0

func (self *ProtobufServer) ListenAndServe()

type RaftServer

type RaftServer struct {
	// contains filtered or unexported fields
}

The raftd server is a combination of the Raft server and an HTTP server which acts as the transport.

func NewRaftServer

func NewRaftServer(config *configuration.Configuration, clusterConfig *cluster.ClusterConfiguration) *RaftServer

Creates a new server.

func (*RaftServer) ActivateServer added in v0.0.6

func (s *RaftServer) ActivateServer(server *cluster.ClusterServer) error

func (*RaftServer) AddServer added in v0.0.6

func (s *RaftServer) AddServer(server *cluster.ClusterServer, insertIndex int) error

func (*RaftServer) AssignCoordinator added in v0.5.0

func (s *RaftServer) AssignCoordinator(coordinator *CoordinatorImpl) error

func (*RaftServer) ChangeDbUserPassword added in v0.4.0

func (s *RaftServer) ChangeDbUserPassword(db, username string, hash []byte) error

func (*RaftServer) Close

func (self *RaftServer) Close()

func (*RaftServer) CompactLog added in v0.4.0

func (s *RaftServer) CompactLog()

func (*RaftServer) CreateContinuousQuery added in v0.4.0

func (s *RaftServer) CreateContinuousQuery(db string, query string) error

func (*RaftServer) CreateDatabase

func (s *RaftServer) CreateDatabase(name string, replicationFactor uint8) error

func (*RaftServer) CreateRootUser

func (s *RaftServer) CreateRootUser() error

func (*RaftServer) CreateShards added in v0.5.0

func (self *RaftServer) CreateShards(shards []*cluster.NewShardData) ([]*cluster.ShardData, error)

func (*RaftServer) DeleteContinuousQuery added in v0.4.0

func (s *RaftServer) DeleteContinuousQuery(db string, id uint32) error

func (*RaftServer) DropDatabase added in v0.0.2

func (s *RaftServer) DropDatabase(name string) error

func (*RaftServer) DropShard added in v0.5.0

func (self *RaftServer) DropShard(id uint32, serverIds []uint32) error

func (*RaftServer) ForceLogCompaction added in v0.4.1

func (s *RaftServer) ForceLogCompaction() error

func (*RaftServer) GetRaftName added in v0.5.0

func (s *RaftServer) GetRaftName() string

func (*RaftServer) HandleFunc

func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))

This is a hack around Gorilla mux not providing the correct net/http HandleFunc() interface.

func (*RaftServer) Join

func (s *RaftServer) Join(leader string) error

Joins the leader of an existing cluster.

func (*RaftServer) ListenAndServe

func (s *RaftServer) ListenAndServe() error

func (*RaftServer) MovePotentialServer added in v0.0.6

func (s *RaftServer) MovePotentialServer(server *cluster.ClusterServer, insertIndex int) error

func (*RaftServer) ReplaceServer added in v0.0.6

func (s *RaftServer) ReplaceServer(oldServer *cluster.ClusterServer, replacement *cluster.ClusterServer) error

func (*RaftServer) SaveClusterAdminUser

func (s *RaftServer) SaveClusterAdminUser(u *cluster.ClusterAdmin) error

func (*RaftServer) SaveDbUser

func (s *RaftServer) SaveDbUser(u *cluster.DbUser) error

func (*RaftServer) Serve added in v0.0.5

func (s *RaftServer) Serve(l net.Listener) error

func (*RaftServer) SetContinuousQueryTimestamp added in v0.4.0

func (s *RaftServer) SetContinuousQueryTimestamp(timestamp time.Time) error

func (*RaftServer) StartProcessingContinuousQueries added in v0.5.5

func (s *RaftServer) StartProcessingContinuousQueries()

type RequestHandler added in v0.4.0

type RequestHandler interface {
	HandleRequest(request *protocol.Request, conn net.Conn) error
}

type SaveClusterAdminCommand

type SaveClusterAdminCommand struct {
	User *cluster.ClusterAdmin `json:"user"`
}

func NewSaveClusterAdminCommand

func NewSaveClusterAdminCommand(u *cluster.ClusterAdmin) *SaveClusterAdminCommand

func (*SaveClusterAdminCommand) Apply

func (c *SaveClusterAdminCommand) Apply(server raft.Server) (interface{}, error)

func (*SaveClusterAdminCommand) CommandName

func (c *SaveClusterAdminCommand) CommandName() string

type SaveDbUserCommand

type SaveDbUserCommand struct {
	User *cluster.DbUser `json:"user"`
}

func NewSaveDbUserCommand

func NewSaveDbUserCommand(u *cluster.DbUser) *SaveDbUserCommand

func (*SaveDbUserCommand) Apply

func (c *SaveDbUserCommand) Apply(server raft.Server) (interface{}, error)

func (*SaveDbUserCommand) CommandName

func (c *SaveDbUserCommand) CommandName() string

type SeriesWriter added in v0.5.0

type SeriesWriter interface {
	Write(*protocol.Series) error
	Close()
}

type SetContinuousQueryTimestampCommand added in v0.4.0

type SetContinuousQueryTimestampCommand struct {
	Timestamp time.Time `json:"timestamp"`
}

func NewSetContinuousQueryTimestampCommand added in v0.4.0

func NewSetContinuousQueryTimestampCommand(timestamp time.Time) *SetContinuousQueryTimestampCommand

func (*SetContinuousQueryTimestampCommand) Apply added in v0.4.0

func (c *SetContinuousQueryTimestampCommand) Apply(server raft.Server) (interface{}, error)

func (*SetContinuousQueryTimestampCommand) CommandName added in v0.4.0

func (c *SetContinuousQueryTimestampCommand) CommandName() string

type ShardAwareObject added in v0.5.0

type ShardAwareObject interface {
	GetShards(querySpec *parser.QuerySpec) []cluster.Shard
	// returns true if results from shards can just be ordered. false if the results are raw points that
	// need to be sent through the query engine
	CanCollateShards(querySpec *parser.QuerySpec) bool
	GetShardById(id uint32) cluster.Shard
	GetShardToWriteToBySeriesAndTime(db, series string, microsecondsEpoch int64) (cluster.Shard, error)
}

These are things that the Coordinator needs (defined in Coordinator, will have to import cluster package)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL