coordinator

package
v0.6.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2014 License: MIT Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// this is the key used for the persistent atomic ints for sequence numbers
	POINT_SEQUENCE_NUMBER_KEY = "p"

	// actual point sequence numbers will have the first part of the number
	// be a host id. This ensures that sequence numbers are unique across the cluster
	HOST_ID_OFFSET = uint64(10000)

	SHARDS_TO_QUERY_FOR_LIST_SERIES = 10
)
View Source
const (
	REQUEST_RETRY_ATTEMPTS = 2
	MAX_RESPONSE_SIZE      = MAX_REQUEST_SIZE
	MAX_REQUEST_TIME       = time.Second * 1200
	RECONNECT_RETRY_WAIT   = time.Millisecond * 100
)
View Source
const (
	DEFAULT_ROOT_PWD        = "root"
	DEFAULT_ROOT_PWD_ENVKEY = "INFLUXDB_INIT_PWD"
)
View Source
const KILOBYTE = 1024
View Source
const MAX_REQUEST_SIZE = MEGABYTE * 2
View Source
const (
	MAX_SIZE = 10 * MEGABYTE
)
View Source
const MEGABYTE = 1024 * KILOBYTE

Variables

View Source
var (
	BARRIER_TIME_MIN int64 = math.MinInt64
	BARRIER_TIME_MAX int64 = math.MaxInt64
)

Functions

func SendCommandToServer added in v0.5.12

func SendCommandToServer(url string, command raft.Command) (interface{}, error)

Types

type ChangeDbUserPassword added in v0.4.0

type ChangeDbUserPassword struct {
	Database string
	Username string
	Hash     string
}

func NewChangeDbUserPasswordCommand added in v0.4.0

func NewChangeDbUserPasswordCommand(db, username, hash string) *ChangeDbUserPassword

func (*ChangeDbUserPassword) Apply added in v0.4.0

func (c *ChangeDbUserPassword) Apply(server raft.Server) (interface{}, error)

func (*ChangeDbUserPassword) CommandName added in v0.4.0

func (c *ChangeDbUserPassword) CommandName() string

type ChangeDbUserPermissions added in v0.5.11

type ChangeDbUserPermissions struct {
	Database         string
	Username         string
	ReadPermissions  string
	WritePermissions string
}

func NewChangeDbUserPermissionsCommand added in v0.5.11

func NewChangeDbUserPermissionsCommand(db, username, readPermissions, writePermissions string) *ChangeDbUserPermissions

func (*ChangeDbUserPermissions) Apply added in v0.5.11

func (c *ChangeDbUserPermissions) Apply(server raft.Server) (interface{}, error)

func (*ChangeDbUserPermissions) CommandName added in v0.5.11

func (c *ChangeDbUserPermissions) CommandName() string

type ClusterConsensus added in v0.0.6

type ClusterConsensus interface {
	CreateDatabase(name string) error
	DropDatabase(name string) error
	CreateContinuousQuery(db string, query string) error
	DeleteContinuousQuery(db string, id uint32) error
	SaveClusterAdminUser(u *cluster.ClusterAdmin) error
	SaveDbUser(user *cluster.DbUser) error
	ChangeDbUserPassword(db, username string, hash []byte) error
	ChangeDbUserPermissions(db, username, readPermissions, writePermissions string) error
	AssignCoordinator(coordinator *CoordinatorImpl) error
	// When a cluster is turned on for the first time.
	CreateRootUser() error
	ForceLogCompaction() error
}

type ContinuousQueryWriter added in v0.5.0

type ContinuousQueryWriter struct {
	// contains filtered or unexported fields
}

func NewContinuousQueryWriter added in v0.5.0

func NewContinuousQueryWriter(yield func(*protocol.Series) error) *ContinuousQueryWriter

func (*ContinuousQueryWriter) Close added in v0.5.0

func (self *ContinuousQueryWriter) Close()

func (*ContinuousQueryWriter) Write added in v0.5.0

func (self *ContinuousQueryWriter) Write(series *protocol.Series) error

type Coordinator

type Coordinator interface {
	// Assumption about the returned data:
	//   1. For any given time series, the points returned are in order
	//   2. If the query involves more than one time series, there is no
	//      guarantee on the order in whic they are returned
	//   3. Data is filtered, i.e. where clause should be assumed to hold true
	//      for all the data points that are returned
	//   4. The end of a time series is signaled by returning a series with no data points
	//   5. TODO: Aggregation on the nodes
	WriteSeriesData(user common.User, db string, series []*protocol.Series) error
	DropDatabase(user common.User, db string) error
	CreateDatabase(user common.User, db string) error
	ForceCompaction(user common.User) error
	ListDatabases(user common.User) ([]*cluster.Database, error)
	DeleteContinuousQuery(user common.User, db string, id uint32) error
	CreateContinuousQuery(user common.User, db string, query string) error
	ListContinuousQueries(user common.User, db string) ([]*protocol.Series, error)

	// v2 clustering, based on sharding instead of the circular hash ring
	RunQuery(user common.User, db, query string, seriesWriter SeriesWriter) error
}

type CoordinatorImpl

type CoordinatorImpl struct {
	// contains filtered or unexported fields
}

func NewCoordinatorImpl

func NewCoordinatorImpl(config *configuration.Configuration, raftServer ClusterConsensus, clusterConfiguration *cluster.ClusterConfiguration) *CoordinatorImpl

func (*CoordinatorImpl) AuthenticateClusterAdmin

func (self *CoordinatorImpl) AuthenticateClusterAdmin(username, password string) (common.User, error)

func (*CoordinatorImpl) AuthenticateDbUser

func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error)

func (*CoordinatorImpl) ChangeClusterAdminPassword

func (self *CoordinatorImpl) ChangeClusterAdminPassword(requester common.User, username, password string) error

func (*CoordinatorImpl) ChangeDbUserPassword

func (self *CoordinatorImpl) ChangeDbUserPassword(requester common.User, db, username, password string) error

func (*CoordinatorImpl) ChangeDbUserPermissions added in v0.5.11

func (self *CoordinatorImpl) ChangeDbUserPermissions(requester common.User, db, username, readPermissions, writePermissions string) error

func (*CoordinatorImpl) CommitSeriesData added in v0.4.0

func (self *CoordinatorImpl) CommitSeriesData(db string, serieses []*protocol.Series, sync bool) error

func (*CoordinatorImpl) ConnectToProtobufServers added in v0.4.0

func (self *CoordinatorImpl) ConnectToProtobufServers(localRaftName string) error

func (*CoordinatorImpl) CreateClusterAdminUser

func (self *CoordinatorImpl) CreateClusterAdminUser(requester common.User, username, password string) error

func (*CoordinatorImpl) CreateContinuousQuery added in v0.4.0

func (self *CoordinatorImpl) CreateContinuousQuery(user common.User, db string, query string) error

func (*CoordinatorImpl) CreateDatabase

func (self *CoordinatorImpl) CreateDatabase(user common.User, db string) error

func (*CoordinatorImpl) CreateDbUser

func (self *CoordinatorImpl) CreateDbUser(requester common.User, db, username, password string, permissions ...string) error

func (*CoordinatorImpl) DeleteClusterAdminUser

func (self *CoordinatorImpl) DeleteClusterAdminUser(requester common.User, username string) error

func (*CoordinatorImpl) DeleteContinuousQuery added in v0.4.0

func (self *CoordinatorImpl) DeleteContinuousQuery(user common.User, db string, id uint32) error

func (*CoordinatorImpl) DeleteDbUser

func (self *CoordinatorImpl) DeleteDbUser(requester common.User, db, username string) error

func (*CoordinatorImpl) DropDatabase added in v0.0.2

func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error

func (*CoordinatorImpl) ForceCompaction added in v0.4.1

func (self *CoordinatorImpl) ForceCompaction(user common.User) error

func (*CoordinatorImpl) GetDbUser added in v0.5.0

func (self *CoordinatorImpl) GetDbUser(requester common.User, db string, username string) (common.User, error)

func (*CoordinatorImpl) InterpolateValuesAndCommit added in v0.5.0

func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string, series *protocol.Series, targetName string, assignSequenceNumbers bool) error

func (*CoordinatorImpl) ListClusterAdmins added in v0.0.2

func (self *CoordinatorImpl) ListClusterAdmins(requester common.User) ([]string, error)

func (*CoordinatorImpl) ListContinuousQueries added in v0.4.0

func (self *CoordinatorImpl) ListContinuousQueries(user common.User, db string) ([]*protocol.Series, error)

func (*CoordinatorImpl) ListDatabases added in v0.0.2

func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*cluster.Database, error)

func (*CoordinatorImpl) ListDbUsers added in v0.0.2

func (self *CoordinatorImpl) ListDbUsers(requester common.User, db string) ([]common.User, error)

func (*CoordinatorImpl) ProcessContinuousQueries added in v0.4.0

func (self *CoordinatorImpl) ProcessContinuousQueries(db string, series *protocol.Series)

func (*CoordinatorImpl) RunQuery added in v0.5.0

func (self *CoordinatorImpl) RunQuery(user common.User, database string, queryString string, seriesWriter SeriesWriter) (err error)

func (*CoordinatorImpl) SetDbAdmin

func (self *CoordinatorImpl) SetDbAdmin(requester common.User, db, username string, isAdmin bool) error

func (*CoordinatorImpl) WriteSeriesData

func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series []*protocol.Series) error

type CreateContinuousQueryCommand added in v0.4.0

type CreateContinuousQueryCommand struct {
	Database string `json:"database"`
	Query    string `json:"query"`
}

func NewCreateContinuousQueryCommand added in v0.4.0

func NewCreateContinuousQueryCommand(database string, query string) *CreateContinuousQueryCommand

func (*CreateContinuousQueryCommand) Apply added in v0.4.0

func (c *CreateContinuousQueryCommand) Apply(server raft.Server) (interface{}, error)

func (*CreateContinuousQueryCommand) CommandName added in v0.4.0

func (c *CreateContinuousQueryCommand) CommandName() string

type CreateDatabaseCommand

type CreateDatabaseCommand struct {
	Name string `json:"name"`
}

func NewCreateDatabaseCommand

func NewCreateDatabaseCommand(name string) *CreateDatabaseCommand

func (*CreateDatabaseCommand) Apply

func (c *CreateDatabaseCommand) Apply(server raft.Server) (interface{}, error)

func (*CreateDatabaseCommand) CommandName

func (c *CreateDatabaseCommand) CommandName() string

type CreateShardsCommand added in v0.5.0

type CreateShardsCommand struct {
	Shards []*cluster.NewShardData
}

func NewCreateShardsCommand added in v0.5.0

func NewCreateShardsCommand(shards []*cluster.NewShardData) *CreateShardsCommand

func (*CreateShardsCommand) Apply added in v0.5.0

func (c *CreateShardsCommand) Apply(server raft.Server) (interface{}, error)

func (*CreateShardsCommand) CommandName added in v0.5.0

func (c *CreateShardsCommand) CommandName() string

func (*CreateShardsCommand) Decode added in v0.5.9

func (c *CreateShardsCommand) Decode(r io.Reader) error

func (*CreateShardsCommand) Encode added in v0.5.9

func (c *CreateShardsCommand) Encode(w io.Writer) error

type DeleteContinuousQueryCommand added in v0.4.0

type DeleteContinuousQueryCommand struct {
	Database string `json:"database"`
	Id       uint32 `json:"id"`
}

func NewDeleteContinuousQueryCommand added in v0.4.0

func NewDeleteContinuousQueryCommand(database string, id uint32) *DeleteContinuousQueryCommand

func (*DeleteContinuousQueryCommand) Apply added in v0.4.0

func (c *DeleteContinuousQueryCommand) Apply(server raft.Server) (interface{}, error)

func (*DeleteContinuousQueryCommand) CommandName added in v0.4.0

func (c *DeleteContinuousQueryCommand) CommandName() string

type DropDatabaseCommand added in v0.0.2

type DropDatabaseCommand struct {
	Name string `json:"name"`
}

func NewDropDatabaseCommand added in v0.0.2

func NewDropDatabaseCommand(name string) *DropDatabaseCommand

func (*DropDatabaseCommand) Apply added in v0.0.2

func (c *DropDatabaseCommand) Apply(server raft.Server) (interface{}, error)

func (*DropDatabaseCommand) CommandName added in v0.0.2

func (c *DropDatabaseCommand) CommandName() string

type DropShardCommand added in v0.5.0

type DropShardCommand struct {
	ShardId   uint32
	ServerIds []uint32
}

func NewDropShardCommand added in v0.5.0

func NewDropShardCommand(id uint32, serverIds []uint32) *DropShardCommand

func (*DropShardCommand) Apply added in v0.5.0

func (c *DropShardCommand) Apply(server raft.Server) (interface{}, error)

func (*DropShardCommand) CommandName added in v0.5.0

func (c *DropShardCommand) CommandName() string

type InfluxChangeConnectionStringCommand added in v0.5.12

type InfluxChangeConnectionStringCommand struct {
	Name                     string `json:"name"`
	Force                    bool   `json:"force"`
	ConnectionString         string `json:"connectionString"`
	ProtobufConnectionString string `json:"protobufConnectionString"`
}

func (*InfluxChangeConnectionStringCommand) Apply added in v0.5.12

func (c *InfluxChangeConnectionStringCommand) Apply(server raft.Server) (interface{}, error)

func (*InfluxChangeConnectionStringCommand) CommandName added in v0.5.12

The name of the ChangeConnectionString command in the log

func (*InfluxChangeConnectionStringCommand) NodeName added in v0.5.12

type InfluxJoinCommand added in v0.4.0

type InfluxJoinCommand struct {
	Name                     string `json:"name"`
	ConnectionString         string `json:"connectionString"`
	ProtobufConnectionString string `json:"protobufConnectionString"`
}

func (*InfluxJoinCommand) Apply added in v0.4.0

func (c *InfluxJoinCommand) Apply(server raft.Server) (interface{}, error)

func (*InfluxJoinCommand) CommandName added in v0.4.0

func (c *InfluxJoinCommand) CommandName() string

The name of the Join command in the log

func (*InfluxJoinCommand) NodeName added in v0.4.0

func (c *InfluxJoinCommand) NodeName() string

type ProtobufClient added in v0.4.0

type ProtobufClient struct {
	// contains filtered or unexported fields
}

func NewProtobufClient added in v0.4.0

func NewProtobufClient(hostAndPort string, writeTimeout time.Duration) *ProtobufClient

func (*ProtobufClient) ClearRequests added in v0.6.3

func (self *ProtobufClient) ClearRequests()

func (*ProtobufClient) Close added in v0.4.0

func (self *ProtobufClient) Close()

func (*ProtobufClient) Connect added in v0.5.0

func (self *ProtobufClient) Connect()

func (*ProtobufClient) MakeRequest added in v0.4.0

func (self *ProtobufClient) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error

Makes a request to the server. If the responseStream chan is not nil it will expect a response from the server with a matching request.Id. The REQUEST_RETRY_ATTEMPTS constant of 3 and the RECONNECT_RETRY_WAIT of 100ms means that an attempt to make a request to a downed server will take 300ms to time out.

type ProtobufRequestHandler added in v0.4.0

type ProtobufRequestHandler struct {
	// contains filtered or unexported fields
}

func NewProtobufRequestHandler added in v0.4.0

func NewProtobufRequestHandler(coordinator Coordinator, clusterConfig *cluster.ClusterConfiguration) *ProtobufRequestHandler

func (*ProtobufRequestHandler) HandleRequest added in v0.4.0

func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, conn net.Conn) error

func (*ProtobufRequestHandler) WriteResponse added in v0.4.0

func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *protocol.Response) error

type ProtobufServer added in v0.4.0

type ProtobufServer struct {
	// contains filtered or unexported fields
}

func NewProtobufServer added in v0.4.0

func NewProtobufServer(port string, requestHandler RequestHandler) *ProtobufServer

func (*ProtobufServer) Close added in v0.4.0

func (self *ProtobufServer) Close()

func (*ProtobufServer) ListenAndServe added in v0.4.0

func (self *ProtobufServer) ListenAndServe()

type RaftServer

type RaftServer struct {
	// contains filtered or unexported fields
}

The raftd server is a combination of the Raft server and an HTTP server which acts as the transport.

func NewRaftServer

func NewRaftServer(config *configuration.Configuration, clusterConfig *cluster.ClusterConfiguration) *RaftServer

Creates a new server.

func (*RaftServer) AssignCoordinator added in v0.5.0

func (s *RaftServer) AssignCoordinator(coordinator *CoordinatorImpl) error

func (*RaftServer) ChangeConnectionString added in v0.5.12

func (s *RaftServer) ChangeConnectionString(raftName, protobufConnectionString, raftConnectionString string, forced bool) error

func (*RaftServer) ChangeDbUserPassword added in v0.4.0

func (s *RaftServer) ChangeDbUserPassword(db, username string, hash []byte) error

func (*RaftServer) ChangeDbUserPermissions added in v0.5.11

func (s *RaftServer) ChangeDbUserPermissions(db, username, readPermissions, writePermissions string) error

func (*RaftServer) Close

func (self *RaftServer) Close()

func (*RaftServer) CommittedAllChanges added in v0.5.7

func (s *RaftServer) CommittedAllChanges() bool

func (*RaftServer) CompactLog added in v0.4.0

func (s *RaftServer) CompactLog()

func (*RaftServer) CreateContinuousQuery added in v0.4.0

func (s *RaftServer) CreateContinuousQuery(db string, query string) error

func (*RaftServer) CreateDatabase

func (s *RaftServer) CreateDatabase(name string) error

func (*RaftServer) CreateRootUser

func (s *RaftServer) CreateRootUser() error

func (*RaftServer) CreateShards added in v0.5.0

func (self *RaftServer) CreateShards(shards []*cluster.NewShardData) ([]*cluster.ShardData, error)

func (*RaftServer) DeleteContinuousQuery added in v0.4.0

func (s *RaftServer) DeleteContinuousQuery(db string, id uint32) error

func (*RaftServer) DropDatabase added in v0.0.2

func (s *RaftServer) DropDatabase(name string) error

func (*RaftServer) DropShard added in v0.5.0

func (self *RaftServer) DropShard(id uint32, serverIds []uint32) error

func (*RaftServer) ForceLogCompaction added in v0.4.1

func (s *RaftServer) ForceLogCompaction() error

func (*RaftServer) GetRaftName added in v0.5.0

func (s *RaftServer) GetRaftName() string

func (*RaftServer) HandleFunc

func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))

This is a hack around Gorilla mux not providing the correct net/http HandleFunc() interface.

func (*RaftServer) Join

func (s *RaftServer) Join(leader string) error

Joins to the leader of an existing cluster.

func (*RaftServer) ListenAndServe

func (s *RaftServer) ListenAndServe() error

func (*RaftServer) SaveClusterAdminUser

func (s *RaftServer) SaveClusterAdminUser(u *cluster.ClusterAdmin) error

func (*RaftServer) SaveDbUser

func (s *RaftServer) SaveDbUser(u *cluster.DbUser) error

func (*RaftServer) Serve added in v0.0.5

func (s *RaftServer) Serve(l net.Listener) error

func (*RaftServer) SetContinuousQueryTimestamp added in v0.4.0

func (s *RaftServer) SetContinuousQueryTimestamp(timestamp time.Time) error

func (*RaftServer) StartProcessingContinuousQueries added in v0.5.5

func (s *RaftServer) StartProcessingContinuousQueries()

type RequestHandler added in v0.4.0

type RequestHandler interface {
	HandleRequest(request *protocol.Request, conn net.Conn) error
}

type SaveClusterAdminCommand

type SaveClusterAdminCommand struct {
	User *cluster.ClusterAdmin `json:"user"`
}

func NewSaveClusterAdminCommand

func NewSaveClusterAdminCommand(u *cluster.ClusterAdmin) *SaveClusterAdminCommand

func (*SaveClusterAdminCommand) Apply

func (c *SaveClusterAdminCommand) Apply(server raft.Server) (interface{}, error)

func (*SaveClusterAdminCommand) CommandName

func (c *SaveClusterAdminCommand) CommandName() string

type SaveDbUserCommand

type SaveDbUserCommand struct {
	User *cluster.DbUser `json:"user"`
}

func NewSaveDbUserCommand

func NewSaveDbUserCommand(u *cluster.DbUser) *SaveDbUserCommand

func (*SaveDbUserCommand) Apply

func (c *SaveDbUserCommand) Apply(server raft.Server) (interface{}, error)

func (*SaveDbUserCommand) CommandName

func (c *SaveDbUserCommand) CommandName() string

type SeriesWriter added in v0.5.0

type SeriesWriter interface {
	Write(*protocol.Series) error
	Close()
}

type SetContinuousQueryTimestampCommand added in v0.4.0

type SetContinuousQueryTimestampCommand struct {
	Timestamp time.Time `json:"timestamp"`
}

func NewSetContinuousQueryTimestampCommand added in v0.4.0

func NewSetContinuousQueryTimestampCommand(timestamp time.Time) *SetContinuousQueryTimestampCommand

func (*SetContinuousQueryTimestampCommand) Apply added in v0.4.0

func (c *SetContinuousQueryTimestampCommand) Apply(server raft.Server) (interface{}, error)

func (*SetContinuousQueryTimestampCommand) CommandName added in v0.4.0

func (c *SetContinuousQueryTimestampCommand) CommandName() string

type ShardAwareObject added in v0.5.0

type ShardAwareObject interface {
	GetShards(querySpec *parser.QuerySpec) []cluster.Shard
	// returns true if results from shards can just be ordered. false if the results are raw points that
	// need to be sent through the query engine
	CanCollateShards(querySpec *parser.QuerySpec) bool
	GetShardById(id uint32) cluster.Shard
	GetShardToWriteToBySeriesAndTime(db, series string, microsecondsEpoch int64) (cluster.Shard, error)
}

These are things that the Coordinator need (defined in Coordinator, will have to import cluster package)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL