influxdb

package module
v0.9.0-rc9.0...-73a5747 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2015 License: MIT Imports: 27 Imported by: 0

README

InfluxDB Circle CI

An Open-Source, Distributed, Time Series Database

InfluxDB v0.9.0 is now in the RC phase. If you're building a new project, please build against master or the most current RC instead of using v0.8.8.

InfluxDB is an open source distributed time series database with no external dependencies. It's useful for recording metrics, events, and performing analytics.

Features

  • Built-in [HTTP API] (http://influxdb.com/docs/v0.9/concepts/reading_and_writing_data.html) so you don't have to write any server side code to get up and running.
  • Clustering is supported out of the box, so that you can scale horizontally to handle your data.
  • Simple to install and manage, and fast to get data in and out.
  • It aims to answer queries in real-time. That means every data point is indexed as it comes in and is immediately available in queries that should return in < 100ms.

Getting Started

The following directions apply only to the 0.9.0 release or building from the source on master.

Building

You don't need to build the project to use it - you can use any of our pre-built packages to install InfluxDB. That's the recommended way to get it running. However, if you want to contribute to the core of InfluxDB, you'll need to build. For those adventurous enough, you can follow along on our docs.

Starting InfluxDB
  • service influxdb start if you have installed InfluxDB using an official Debian or RPM package.
  • $GOPATH/bin/influxd if you have built InfluxDB from source.
Creating your first database
curl -G 'http://localhost:8086/query' --data-urlencode "q=CREATE DATABASE mydb"
Insert some data
curl -H "Content-Type: application/json" http://localhost:8086/write -d '
{
    "database": "mydb",
    "retentionPolicy": "default",
    "points": [
        {
            "timestamp": "2014-11-10T23:00:00Z",
            "name": "cpu",
             "tags": {
                 "region":"uswest",
                 "host": "server01"
            },
             "fields":{
                 "value": 100
            }
         }
      ]
}'
Query for the data
curl -G http://localhost:8086/query?pretty=true \
--data-urlencode "db=mydb" --data-urlencode "q=SELECT * FROM cpu"

Documentation

Index

Constants

View Source
const (
	// DefaultContinuousQueryCheckTime is how frequently the broker will ask a data node
	// in the cluster to run any continuous queries that should be run.
	DefaultContinuousQueryCheckTime = 1 * time.Second

	// DefaultDataNodeTimeout is how long the broker will wait before timing out on a data node
	// that it has requested process continuous queries.
	DefaultDataNodeTimeout = 1 * time.Second

	// DefaultFailureSleep is how long the broker will sleep before trying the next data node in
	// the cluster if the current data node failed to respond
	DefaultFailureSleep = 100 * time.Millisecond
)
View Source
const (
	// DefaultRootPassword is the password initially set for the root user.
	// It is also used when reseting the root user's password.
	DefaultRootPassword = "root"

	// DefaultRetentionPolicyName is the name of a databases's default shard space.
	DefaultRetentionPolicyName = "default"

	// DefaultSplitN represents the number of partitions a shard is split into.
	DefaultSplitN = 1

	// DefaultReplicaN represents the number of replicas data is written to.
	DefaultReplicaN = 1

	// DefaultShardDuration is the time period held by a shard.
	DefaultShardDuration = 7 * (24 * time.Hour)

	// DefaultShardRetention is the length of time before a shard is dropped.
	DefaultShardRetention = 7 * (24 * time.Hour)

	// BroadcastTopicID is the topic used for all metadata.
	BroadcastTopicID = uint64(0)

	// When planning a select statement, passing zero tells it not to chunk results. Only applies to raw queries
	NoChunkingSize = 0
)
View Source
const (
	MAX_MAP_RESPONSE_SIZE = 1024 * 1024 * 1024
)

Variables

View Source
var (
	// ErrServerOpen is returned when opening an already open server.
	ErrServerOpen = errors.New("server already open")

	// ErrServerClosed is returned when closing an already closed server.
	ErrServerClosed = errors.New("server already closed")

	// ErrPathRequired is returned when opening a server without a path.
	ErrPathRequired = errors.New("path required")

	// ErrUnableToJoin is returned when a server cannot join a cluster.
	ErrUnableToJoin = errors.New("unable to join")

	// ErrDataNodeURLRequired is returned when creating a data node without a URL.
	ErrDataNodeURLRequired = errors.New("data node url required")

	// ErrNoDataNodeAvailable is returned when there are no data nodes available
	ErrNoDataNodeAvailable = errors.New("data node not available")

	// ErrDataNodeExists is returned when creating a duplicate data node.
	ErrDataNodeExists = errors.New("data node exists")

	// ErrDataNodeNotFound is returned when dropping a non-existent data node or
	// attempting to join another data node when no data nodes exist yet
	ErrDataNodeNotFound = errors.New("data node not found")

	// ErrDataNodeRequired is returned when using a blank data node id.
	ErrDataNodeRequired = errors.New("data node required")

	// ErrDatabaseNameRequired is returned when creating a database without a name.
	ErrDatabaseNameRequired = errors.New("database name required")

	// ErrDatabaseExists is returned when creating a duplicate database.
	ErrDatabaseExists = errors.New("database exists")

	// ErrDatabaseRequired is returned when using a blank database name.
	ErrDatabaseRequired = errors.New("database required")

	// ErrClusterAdminExists is returned when creating a duplicate admin.
	ErrClusterAdminExists = errors.New("cluster admin exists")

	// ErrClusterAdminNotFound is returned when deleting a non-existent admin.
	ErrClusterAdminNotFound = errors.New("cluster admin not found")

	// ErrUserExists is returned when creating a duplicate user.
	ErrUserExists = errors.New("user exists")

	// ErrUserNotFound is returned when deleting a non-existent user.
	ErrUserNotFound = errors.New("user not found")

	// ErrUsernameRequired is returned when using a blank username.
	ErrUsernameRequired = errors.New("username required")

	// ErrInvalidUsername is returned when using a username with invalid characters.
	ErrInvalidUsername = errors.New("invalid username")

	// ErrRetentionPolicyExists is returned when creating a duplicate shard space.
	ErrRetentionPolicyExists = errors.New("retention policy exists")

	// ErrRetentionPolicyNotFound is returned when deleting a non-existent shard space.
	ErrRetentionPolicyNotFound = errors.New("retention policy not found")

	// ErrRetentionPolicyNameRequired is returned using a blank shard space name.
	ErrRetentionPolicyNameRequired = errors.New("retention policy name required")

	// ErrRetentionPolicyMinDuration is returned when creating replication policy with a duration smaller than RetenionPolicyMinDuration.
	ErrRetentionPolicyMinDuration = fmt.Errorf("retention policy duration needs to be at least %s", retentionPolicyMinDuration)

	// ErrDefaultRetentionPolicyNotFound is returned when using the default
	// policy on a database but the default has not been set.
	ErrDefaultRetentionPolicyNotFound = errors.New("default retention policy not found")

	// ErrShardNotFound is returned when attempting to access a non-existent shard
	ErrShardNotFound = errors.New("shard not found")

	// ErrShardNotLocal is returned when a server attempts to access a shard that is not local
	ErrShardNotLocal = errors.New("shard not local")

	// ErrShardShortRead returned when the number of bytes read from a shard is less than expected.
	ErrShardShortRead = errors.New("shard read returned insufficient data")

	// ErrInvalidPointBuffer is returned when a buffer containing data for writing is invalid
	ErrInvalidPointBuffer = errors.New("invalid point buffer")

	// ErrReadAccessDenied is returned when a user attempts to read
	// data that he or she does not have permission to read.
	ErrReadAccessDenied = errors.New("read access denied")

	// ErrReadWritePermissionsRequired is returned when required read/write permissions aren't provided.
	ErrReadWritePermissionsRequired = errors.New("read/write permissions required")

	// ErrInvalidQuery is returned when executing an unknown query type.
	ErrInvalidQuery = errors.New("invalid query")

	// ErrMeasurementNameRequired is returned when a point does not contain a name.
	ErrMeasurementNameRequired = errors.New("measurement name required")

	// ErrFieldsRequired is returned when a point does not any fields.
	ErrFieldsRequired = errors.New("fields required")

	// ErrFieldOverflow is returned when too many fields are created on a measurement.
	ErrFieldOverflow = errors.New("field overflow")

	// ErrFieldTypeConflict is returned when a new field already exists with a different type.
	ErrFieldTypeConflict = errors.New("field type conflict")

	// ErrFieldNotFound is returned when a field cannot be found.
	ErrFieldNotFound = errors.New("field not found")

	// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
	// there is no mapping for.
	ErrFieldUnmappedID = errors.New("field ID not mapped")

	// ErrSeriesNotFound is returned when looking up a non-existent series by database, name and tags
	ErrSeriesNotFound = errors.New("series not found")

	// ErrSeriesExists is returned when attempting to set the id of a series by database, name and tags that already exists
	ErrSeriesExists = errors.New("series already exists")

	// ErrNotExecuted is returned when a statement is not executed in a query.
	// This can occur when a previous statement in the same query has errored.
	ErrNotExecuted = errors.New("not executed")

	// ErrInvalidGrantRevoke is returned when a statement requests an invalid
	// privilege for a user on the cluster or a database.
	ErrInvalidGrantRevoke = errors.New("invalid privilege requested")

	// ErrContinuousQueryExists is returned when creating a duplicate continuous query.
	ErrContinuousQueryExists = errors.New("continuous query already exists")

	// ErrContinuousQueryNotFound is returned when dropping a nonexistent continuous query.
	ErrContinuousQueryNotFound = errors.New("continuous query not found")
)
View Source
var BcryptCost = 10

BcryptCost is the cost associated with generating password with Bcrypt. This setting is lowered during testing to improve test suite performance.

Functions

func ErrDatabaseNotFound

func ErrDatabaseNotFound(name string) error

func ErrMeasurementNotFound

func ErrMeasurementNotFound(name string) error

func Errorf

func Errorf(format string, a ...interface{}) (err error)

func HashPassword

func HashPassword(password string) ([]byte, error)

HashPassword generates a cryptographically secure hash for password. Returns an error if the password is invalid or a hash cannot be generated.

func NopWriteToCloser

func NopWriteToCloser(w io.WriterTo) interface {
	io.WriterTo
	io.Closer
}

NopWriteToCloser returns an io.WriterTo that implements io.Closer.

Types

type Balancer

type Balancer interface {
	// Next returns the next DataNode according to the balancing method
	// or nil if there are no nodes available
	Next() *DataNode
}

Balancer represents a load-balancing algorithm for a set of DataNodes

func NewDataNodeBalancer

func NewDataNodeBalancer(dataNodes []*DataNode) Balancer

NewDataNodeBalancer create a shuffled, round-robin balancer so that multiple instances will return nodes in randomized order and each each returned DataNode will be repeated in a cycle

type Broker

type Broker struct {
	*messaging.Broker

	// variables to control when to trigger processing and when to timeout
	TriggerInterval     time.Duration
	TriggerTimeout      time.Duration
	TriggerFailurePause time.Duration
	// contains filtered or unexported fields
}

Broker represents an InfluxDB specific messaging broker.

func NewBroker

func NewBroker() *Broker

NewBroker returns a new instance of a Broker with default values.

func (*Broker) Close

func (b *Broker) Close() error

Close closes the broker.

func (*Broker) RunContinuousQueryLoop

func (b *Broker) RunContinuousQueryLoop()

RunContinuousQueryLoop starts running continuous queries on a background goroutine.

type BuildDiagnostics

type BuildDiagnostics struct {
	Version    string
	CommitHash string
}

BuildDiagnostics capture basic build version information.

func (*BuildDiagnostics) AsRow

func (b *BuildDiagnostics) AsRow(measurement string, tags map[string]string) *influxql.Row

AsRow returns the BuildDiagnostics object as an InfluxQL row.

type ContinuousQuery

type ContinuousQuery struct {
	Query string `json:"query"`
	// contains filtered or unexported fields
}

ContinuousQuery represents a query that exists on the server and processes each incoming event.

func NewContinuousQuery

func NewContinuousQuery(q string) (*ContinuousQuery, error)

NewContinuousQuery returns a ContinuousQuery object with a parsed influxql.CreateContinuousQueryStatement

type DataNode

type DataNode struct {
	ID  uint64
	URL *url.URL

	// OfflineUntil is the time when the DataNode will no longer be consider down
	OfflineUntil time.Time
	// contains filtered or unexported fields
}

DataNode represents a data node in the cluster.

func (*DataNode) Down

func (d *DataNode) Down()

Down marks the DataNode as offline for a period of time. Each successive call to Down will exponentially extend the offline time with a maximum offline time of 5 minutes.

func (*DataNode) Up

func (d *DataNode) Up()

Up marks this DataNode as online if it was currently down

type ErrAuthorize

type ErrAuthorize struct {
	// contains filtered or unexported fields
}

ErrAuthorize represents an authorization error.

func (ErrAuthorize) Error

func (e ErrAuthorize) Error() string

Error returns the text of the error.

type Field

type Field struct {
	ID   uint8             `json:"id,omitempty"`
	Name string            `json:"name,omitempty"`
	Type influxql.DataType `json:"type,omitempty"`
}

Field represents a series field.

type FieldCodec

type FieldCodec struct {
	// contains filtered or unexported fields
}

FieldCodec providecs encoding and decoding functionality for the fields of a given Measurement. It is a distinct type to avoid locking writes on this node while potentially long-running queries are executing.

It is not affected by changes to the Measurement object after codec creation.

func NewFieldCodec

func NewFieldCodec(m *Measurement) *FieldCodec

NewFieldCodec returns a FieldCodec for the given Measurement. Must be called with a RLock that protects the Measurement.

func (*FieldCodec) DecodeByID

func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error)

DecodeByID scans a byte slice for a field with the given ID, converts it to its expected type, and return that value.

func (*FieldCodec) DecodeFields

func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error)

DecodeFields decodes a byte slice into a set of field ids and values.

func (*FieldCodec) DecodeFieldsWithNames

func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error)

DecodeFieldsWithNames decodes a byte slice into a set of field names and values

func (*FieldCodec) EncodeFields

func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)

EncodeFields converts a map of values with string keys to a byte slice of field IDs and values.

If a field exists in the codec, but its type is different, an error is returned. If a field is not present in the codec, the system panics.

func (*FieldCodec) FieldByName

func (f *FieldCodec) FieldByName(name string) *Field

FieldByName returns the field by its name. It will return a nil if not found

type Fields

type Fields []*Field

Fields represents a list of fields.

type GoDiagnostics

type GoDiagnostics struct {
	GoMaxProcs   int
	NumGoroutine int
	Version      string
}

GoDiagnostics captures basic information about the runtime.

func NewGoDiagnostics

func NewGoDiagnostics() *GoDiagnostics

NewGoDiagnostics returns a GoDiagnostics object.

func (*GoDiagnostics) AsRow

func (g *GoDiagnostics) AsRow(measurement string, tags map[string]string) *influxql.Row

AsRow returns the GoDiagnostic object as an InfluxQL row.

type Int

type Int struct {
	// contains filtered or unexported fields
}

Int representes a 64-bit signed integer which can be updated atomically.

func NewInt

func NewInt(v int64) *Int

NewInt returns a new Int

func (*Int) Add

func (i *Int) Add(delta int64)

Add atomically adds the given delta to the Int.

type LocalMapper

type LocalMapper struct {
	// contains filtered or unexported fields
}

LocalMapper implements the influxql.Mapper interface for running map tasks over a shard that is local to this server

func (*LocalMapper) Begin

func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int) error

Begin will set up the mapper to run the map function for a given aggregate call starting at the passed in time

func (*LocalMapper) Close

func (l *LocalMapper) Close()

Close closes the LocalMapper.

func (*LocalMapper) IsEmpty

func (l *LocalMapper) IsEmpty(tmax int64) bool

IsEmpty returns true if either all cursors are nil or all cursors are past the passed in max time

func (*LocalMapper) Next

func (l *LocalMapper) Next() (seriesID uint64, timestamp int64, value interface{})

Next returns the next matching timestamped value for the LocalMapper.

func (*LocalMapper) NextInterval

func (l *LocalMapper) NextInterval() (interface{}, error)

NextInterval will get the time ordered next interval of the given interval size from the mapper. This is a forward only operation from the start time passed into Begin. Will return nil when there is no more data to be read. If this is a raw query, interval should be the max time to hit in the query

func (*LocalMapper) Open

func (l *LocalMapper) Open() error

Open opens the LocalMapper.

type MapResponse

type MapResponse struct {
	Err       string `json:",omitempty"`
	Data      []byte
	Completed bool `json:",omitempty"`
}

Responses get streamed back to the remote mapper from the remote machine that runs a local mapper

type Matcher

type Matcher struct {
	IsRegex bool
	Name    string
}

Matcher can match either a Regex or plain string.

func (*Matcher) Matches

func (m *Matcher) Matches(name string) bool

Matches returns true of the name passed in matches this Matcher.

type Measurement

type Measurement struct {
	Name   string   `json:"name,omitempty"`
	Fields []*Field `json:"fields,omitempty"`
	// contains filtered or unexported fields
}

Measurement represents a collection of time series in a database. It also contains in memory structures for indexing tags. These structures are accessed through private methods on the Measurement object. Generally these methods are only accessed from Index, which is responsible for ensuring go routine safe access.

func NewMeasurement

func NewMeasurement(name string) *Measurement

NewMeasurement allocates and initializes a new Measurement.

func (*Measurement) Field

func (m *Measurement) Field(id uint8) *Field

Field returns a field by id.

func (*Measurement) FieldByName

func (m *Measurement) FieldByName(name string) *Field

FieldByName returns a field by name.

func (*Measurement) HasTagKey

func (m *Measurement) HasTagKey(k string) bool

HasTagKey returns true if at least one eries in this measurement has written a value for the passed in tag key

type Measurements

type Measurements []*Measurement

Measurements represents a list of *Measurement.

func (Measurements) Len

func (a Measurements) Len() int

func (Measurements) Less

func (a Measurements) Less(i, j int) bool

func (Measurements) Swap

func (a Measurements) Swap(i, j int)

type MemoryDiagnostics

type MemoryDiagnostics struct {
	Alloc        int64
	TotalAlloc   int64
	Sys          int64
	Lookups      int64
	Mallocs      int64
	Frees        int64
	HeapAlloc    int64
	HeapSys      int64
	HeapIdle     int64
	HeapInUse    int64
	HeapReleased int64
	HeapObjects  int64
	PauseTotalNs int64
	NumGC        int64
}

MemoryDiagnostics captures Go memory stats.

func NewMemoryDiagnostics

func NewMemoryDiagnostics() *MemoryDiagnostics

NewMemoryDiagnostics returns a MemoryDiagnostics object.

func (*MemoryDiagnostics) AsRow

func (m *MemoryDiagnostics) AsRow(measurement string, tags map[string]string) *influxql.Row

AsRow returns the MemoryDiagnostics object as an InfluxQL row.

type MessagingClient

type MessagingClient interface {
	Open(path string) error
	Close() error

	// Retrieves or sets the current list of broker URLs.
	URLs() []url.URL
	SetURLs([]url.URL)

	// Publishes a message to the broker.
	Publish(m *messaging.Message) (index uint64, err error)

	// Conn returns an open, streaming connection to a topic.
	Conn(topicID uint64) MessagingConn
	CloseConn(topicID uint64) error
}

MessagingClient represents the client used to connect to brokers.

func NewMessagingClient

func NewMessagingClient(dataURL url.URL) MessagingClient

NewMessagingClient returns an instance of MessagingClient.

type MessagingConn

type MessagingConn interface {
	Open(index uint64, streaming bool) error
	C() <-chan *messaging.Message
}

MessagingConn represents a streaming connection to a single broker topic.

type Point

type Point struct {
	Name      string
	Tags      map[string]string
	Timestamp time.Time
	Fields    map[string]interface{}
}

Point defines the values that will be written to the database

func NormalizeBatchPoints

func NormalizeBatchPoints(bp client.BatchPoints) ([]Point, error)

NormalizeBatchPoints returns a slice of Points, created by populating individual points within the batch, which do not have timestamps or tags, with the top-level values.

type RemoteMapper

type RemoteMapper struct {
	Call            string   `json:",omitempty"`
	Database        string   `json:",omitempty"`
	MeasurementName string   `json:",omitempty"`
	TMin            int64    `json:",omitempty"`
	TMax            int64    `json:",omitempty"`
	SeriesIDs       []uint64 `json:",omitempty"`
	ShardID         uint64   `json:",omitempty"`
	Filters         []string `json:",omitempty"`
	WhereFields     []*Field `json:",omitempty"`
	SelectFields    []*Field `json:",omitempty"`
	SelectTags      []string `json:",omitempty"`
	Limit           int      `json:",omitempty"`
	Offset          int      `json:",omitempty"`
	Interval        int64    `json:",omitempty"`
	ChunkSize       int      `json:",omitempty"`
	// contains filtered or unexported fields
}

RemoteMapper implements the influxql.Mapper interface. The engine uses the remote mapper to pull map results from shards that only exist on other servers in the cluster.

func (*RemoteMapper) Begin

func (m *RemoteMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int) error

Begin sends a request to the remote server to start streaming map results

func (*RemoteMapper) CallExpr

func (m *RemoteMapper) CallExpr() (*influxql.Call, error)

CallExpr will parse the Call string into an expression or return nil

func (*RemoteMapper) Close

func (m *RemoteMapper) Close()

Close the response body

func (*RemoteMapper) FilterExprs

func (m *RemoteMapper) FilterExprs() []influxql.Expr

FilterExprs will parse the filter strings and return any expressions. This array will be the same size as the SeriesIDs array with each element having a filter (which could be nil)

func (*RemoteMapper) NextInterval

func (m *RemoteMapper) NextInterval() (interface{}, error)

NextInterval is part of the mapper interface. In this case we read the next chunk from the remote mapper

func (*RemoteMapper) Open

func (m *RemoteMapper) Open() error

Open is a no op, real work is done starting with Being

func (*RemoteMapper) SetFilters

func (m *RemoteMapper) SetFilters(filters []influxql.Expr)

SetFilters will convert the given arrray of filters into filters that can be marshaled and sent to the remote system

type Response

type Response struct {
	Results []*Result
	Err     error
}

Response represents a list of statement results.

func (*Response) Error

func (r *Response) Error() error

Error returns the first error from any statement. Returns nil if no errors occurred on any statements.

func (Response) MarshalJSON

func (r Response) MarshalJSON() ([]byte, error)

MarshalJSON encodes a Response struct into JSON.

func (*Response) UnmarshalJSON

func (r *Response) UnmarshalJSON(b []byte) error

UnmarshalJSON decodes the data into the Response struct

type Result

type Result struct {
	// StatementID is just the statement's position in the query. It's used
	// to combine statement results if they're being buffered in memory.
	StatementID int `json:"-"`
	Series      influxql.Rows
	Err         error
}

Result represents a resultset returned from a single statement.

func (*Result) MarshalJSON

func (r *Result) MarshalJSON() ([]byte, error)

MarshalJSON encodes the result into JSON.

func (*Result) UnmarshalJSON

func (r *Result) UnmarshalJSON(b []byte) error

UnmarshalJSON decodes the data into the Result struct

type RetentionPolicies

type RetentionPolicies []*RetentionPolicy

RetentionPolicies represents a list of retention policies.

func (RetentionPolicies) Len

func (a RetentionPolicies) Len() int

func (RetentionPolicies) Less

func (a RetentionPolicies) Less(i, j int) bool

func (RetentionPolicies) Swap

func (a RetentionPolicies) Swap(i, j int)

type RetentionPolicy

type RetentionPolicy struct {
	// Unique name within database. Required.
	Name string `json:"name"`

	// Length of time to keep data around. A zero duration means keep the data forever.
	Duration time.Duration `json:"duration"`

	// Length of time to create shard groups in.
	ShardGroupDuration time.Duration `json:"shardGroupDuration"`

	// The number of copies to make of each shard.
	ReplicaN uint32 `json:"replicaN"`
	// contains filtered or unexported fields
}

RetentionPolicy represents a policy for creating new shards in a database and how long they're kept around for.

func NewRetentionPolicy

func NewRetentionPolicy(name string) *RetentionPolicy

NewRetentionPolicy returns a new instance of RetentionPolicy with defaults set.

func (*RetentionPolicy) MarshalJSON

func (rp *RetentionPolicy) MarshalJSON() ([]byte, error)

MarshalJSON encodes a retention policy to a JSON-encoded byte slice.

func (*RetentionPolicy) UnmarshalJSON

func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error

UnmarshalJSON decodes a JSON-encoded byte slice to a retention policy.

type RetentionPolicyUpdate

type RetentionPolicyUpdate struct {
	Name     *string        `json:"name,omitempty"`
	Duration *time.Duration `json:"duration,omitempty"`
	ReplicaN *uint32        `json:"replicaN,omitempty"`
}

RetentionPolicyUpdate represents retention policy fields that need to be updated.

type Series

type Series struct {
	ID   uint64
	Tags map[string]string
	// contains filtered or unexported fields
}

Series belong to a Measurement and represent unique time series in a database

type Server

type Server struct {
	Logger     *log.Logger
	WriteTrace bool // Detailed logging of write path

	// Retention policy settings
	RetentionAutoCreate bool

	// continuous query settings
	RecomputePreviousN     int
	RecomputeNoOlderThan   time.Duration
	ComputeRunsPerInterval int
	ComputeNoMoreThan      time.Duration

	// Build information.
	Version    string
	CommitHash string
	// contains filtered or unexported fields
}

Server represents a collection of metadata and raw metric data.

func NewServer

func NewServer() *Server

NewServer returns a new instance of Server.

func (*Server) AdminUserExists

func (s *Server) AdminUserExists() bool

AdminUserExists returns whether at least 1 admin-level user exists.

func (*Server) Authenticate

func (s *Server) Authenticate(username, password string) (*User, error)

Authenticate returns an authenticated user by username. If any error occurs, or the authentication credentials are invalid, an error is returned.

func (*Server) Authorize

func (s *Server) Authorize(u *User, q *influxql.Query, database string) error

Authorize user u to execute query q on database. database can be "" for queries that do not require a database. If u is nil, this means authorization is disabled.

func (*Server) Begin

func (s *Server) Begin() (influxql.Tx, error)

Begin returns an unopened transaction associated with the server.

func (*Server) BrokerURLs

func (s *Server) BrokerURLs() []url.URL

func (*Server) Client

func (s *Server) Client() MessagingClient

Client retrieves the current messaging client.

func (*Server) Close

func (s *Server) Close() error

Close shuts down the server.

func (*Server) ContinuousQueries

func (s *Server) ContinuousQueries(database string) []*ContinuousQuery

ContinuousQueries returns a list of all continuous queries.

func (*Server) CopyMetastore

func (s *Server) CopyMetastore(w io.Writer) error

CopyMetastore writes the underlying metastore data file to a writer.

func (*Server) CopyShard

func (s *Server) CopyShard(w io.Writer, shardID uint64) error

CopyShard writes the requested shard to a writer.

func (*Server) CreateContinuousQuery

func (s *Server) CreateContinuousQuery(q *influxql.CreateContinuousQueryStatement) error

CreateContinuousQuery creates a continuous query.

func (*Server) CreateDataNode

func (s *Server) CreateDataNode(u *url.URL) error

CreateDataNode creates a new data node with a given URL.

func (*Server) CreateDatabase

func (s *Server) CreateDatabase(name string) error

CreateDatabase creates a new database.

func (*Server) CreateDatabaseIfNotExists

func (s *Server) CreateDatabaseIfNotExists(name string) error

CreateDatabaseIfNotExists creates a new database if, and only if, it does not exist already.

func (*Server) CreateRetentionPolicy

func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) error

CreateRetentionPolicy creates a retention policy for a database.

func (*Server) CreateRetentionPolicyIfNotExists

func (s *Server) CreateRetentionPolicyIfNotExists(database string, rp *RetentionPolicy) error

CreateRetentionPolicyIfNotExists creates a retention policy for a database.

func (*Server) CreateShardGroupIfNotExists

func (s *Server) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) error

CreateShardGroupIfNotExists creates the shard group for a retention policy for the interval a timestamp falls into.

func (*Server) CreateSnapshotWriter

func (s *Server) CreateSnapshotWriter() (*SnapshotWriter, error)

CreateSnapshotWriter returns a writer for the current snapshot.

func (*Server) CreateUser

func (s *Server) CreateUser(username, password string, admin bool) error

CreateUser creates a user on the server.

func (*Server) DataNode

func (s *Server) DataNode(id uint64) *DataNode

DataNode returns a data node by id.

func (*Server) DataNodeByURL

func (s *Server) DataNodeByURL(u *url.URL) *DataNode

DataNodeByURL returns a data node by url.

func (*Server) DataNodes

func (s *Server) DataNodes() (a []*DataNode)

DataNodes returns a list of data nodes.

func (*Server) DataNodesByID

func (s *Server) DataNodesByID(ids []uint64) []*DataNode

DataNodesByID returns the data nodes matching the passed ids

func (*Server) DatabaseExists

func (s *Server) DatabaseExists(name string) bool

DatabaseExists returns true if a database exists.

func (*Server) Databases

func (s *Server) Databases() (a []string)

Databases returns a sorted list of all database names.

func (*Server) DefaultRetentionPolicy

func (s *Server) DefaultRetentionPolicy(database string) (*RetentionPolicy, error)

DefaultRetentionPolicy returns the default retention policy for a database. Returns an error if the database doesn't exist.

func (*Server) DeleteDataNode

func (s *Server) DeleteDataNode(id uint64) error

DeleteDataNode deletes an existing data node.

func (*Server) DeleteRetentionPolicy

func (s *Server) DeleteRetentionPolicy(database, name string) error

DeleteRetentionPolicy removes a retention policy from a database.

func (*Server) DeleteShardGroup

func (s *Server) DeleteShardGroup(database, policy string, shardID uint64) error

DeleteShardGroup deletes the shard group identified by shardID.

func (*Server) DeleteUser

func (s *Server) DeleteUser(username string) error

DeleteUser removes a user from the server.

func (*Server) DiagnosticsAsRows

func (s *Server) DiagnosticsAsRows() []*influxql.Row

DiagnosticsAsRows returns diagnostic information about the server, as a slice of InfluxQL rows.

func (*Server) DropContinuousQuery

func (s *Server) DropContinuousQuery(q *influxql.DropContinuousQueryStatement) error

DropContinuousQuery dropsoa continuous query.

func (*Server) DropDatabase

func (s *Server) DropDatabase(name string) error

DropDatabase deletes an existing database.

func (*Server) DropMeasurement

func (s *Server) DropMeasurement(database, name string) error

DropMeasurement drops a given measurement from a database.

func (*Server) DropSeries

func (s *Server) DropSeries(database string, seriesByMeasurement map[string][]uint64) error

DropSeries deletes from an existing series.

func (*Server) EnforceRetentionPolicies

func (s *Server) EnforceRetentionPolicies()

EnforceRetentionPolicies ensures that data that is aging-out due to retention policies is removed from the server.

func (*Server) ExecuteQuery

func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User, chunkSize int) (chan *Result, error)

ExecuteQuery executes an InfluxQL query against the server. If the user isn't authorized to access the database an error will be returned. It sends results down the passed in chan and closes it when done. It will close the chan on the first statement that throws an error.

func (*Server) ID

func (s *Server) ID() uint64

ID returns the data node id for the server. Returns zero if the server is closed or the server has not joined a cluster.

func (*Server) Index

func (s *Server) Index() uint64

Index returns the index for the server.

func (*Server) Initialize

func (s *Server) Initialize(u url.URL) error

Initialize creates a new data node and initializes the server's id to the latest.

func (*Server) Join

func (s *Server) Join(u *url.URL, joinURL *url.URL) error

Join creates a new data node in an existing cluster, copies the metastore, and initializes the ID.

func (*Server) MeasurementNames

func (s *Server) MeasurementNames(database string) []string

MeasurementNames returns a list of all measurements for the specified database.

func (*Server) NormalizeMeasurement

func (s *Server) NormalizeMeasurement(m *influxql.Measurement, defaultDatabase string) error

NormalizeMeasurement inserts the default database or policy into all measurement names.

func (*Server) NormalizeStatement

func (s *Server) NormalizeStatement(stmt influxql.Statement, defaultDatabase string) (err error)

NormalizeStatement adds a default database and policy to the measurements in statement.

func (*Server) Open

func (s *Server) Open(path string, client MessagingClient) error

Open initializes the server from a given path.

func (*Server) Path

func (s *Server) Path() string

Path returns the path used when opening the server. Returns an empty string when the server is closed.

func (*Server) ReadSeries

func (s *Server) ReadSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time) (map[string]interface{}, error)

ReadSeries reads a single point from a series in the database. It is used for debug and test only.

func (*Server) RetentionPolicies

func (s *Server) RetentionPolicies(database string) ([]*RetentionPolicy, error)

RetentionPolicies returns a list of retention polocies for a database. Returns an error if the database doesn't exist.

func (*Server) RetentionPolicy

func (s *Server) RetentionPolicy(database, name string) (*RetentionPolicy, error)

RetentionPolicy returns a retention policy by name. Returns an error if the database doesn't exist.

func (*Server) RetentionPolicyExists

func (s *Server) RetentionPolicyExists(database, retention string) bool

RetentionPolicyExists returns true if a retention policy exists for a given database.

func (*Server) RunContinuousQueries

func (s *Server) RunContinuousQueries() error

RunContinuousQueries will run any continuous queries that are due to run and write the results back into the database

func (*Server) SetAuthenticationEnabled

func (s *Server) SetAuthenticationEnabled(enabled bool)

SetAuthenticationEnabled turns on or off server authentication

func (*Server) SetDefaultRetentionPolicy

func (s *Server) SetDefaultRetentionPolicy(database, name string) error

SetDefaultRetentionPolicy sets the default policy to write data into and query from on a database.

func (*Server) SetPrivilege

func (s *Server) SetPrivilege(p influxql.Privilege, username string, dbname string) error

SetPrivilege grants / revokes a privilege to a user.

func (*Server) Shard

func (s *Server) Shard(id uint64) *Shard

Shard returns a shard by ID.

func (*Server) ShardGroupPreCreate

func (s *Server) ShardGroupPreCreate(checkInterval time.Duration)

ShardGroupPreCreate ensures that future shard groups and shards are created and ready for writing is removed from the server.

func (*Server) ShardGroups

func (s *Server) ShardGroups(database string) ([]*ShardGroup, error)

ShardGroups returns a list of all shard groups for a database. Returns an error if the database doesn't exist.

func (*Server) StartLocalMapper

func (s *Server) StartLocalMapper(rm *RemoteMapper) (*LocalMapper, error)

StartLocalMapper will create a local mapper for the passed in remote mapper

func (*Server) StartReportingLoop

func (s *Server) StartReportingLoop(clusterID uint64) chan struct{}

StartReportingLoop starts the anonymous usage reporting loop for a given cluster ID.

func (*Server) StartRetentionPolicyEnforcement

func (s *Server) StartRetentionPolicyEnforcement(checkInterval time.Duration) error

StartRetentionPolicyEnforcement launches retention policy enforcement.

func (*Server) StartSelfMonitoring

func (s *Server) StartSelfMonitoring(database, retention string, interval time.Duration) error

StartSelfMonitoring starts a goroutine which monitors the InfluxDB server itself and stores the results in the specified database at a given interval.

func (*Server) StartShardGroupsPreCreate

func (s *Server) StartShardGroupsPreCreate(checkInterval time.Duration) error

StartShardGroupsPreCreate launches shard group pre-create to avoid write bottlenecks.

func (*Server) Sync

func (s *Server) Sync(topicID, index uint64) error

Sync blocks until a given index (or a higher index) has been applied. Returns any error associated with the command.

func (*Server) URL

func (s *Server) URL() url.URL

func (*Server) UpdateRetentionPolicy

func (s *Server) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error

UpdateRetentionPolicy updates an existing retention policy on a database.

func (*Server) UpdateUser

func (s *Server) UpdateUser(username, password string) error

UpdateUser updates an existing user on the server.

func (*Server) User

func (s *Server) User(name string) *User

User returns a user by username Returns nil if the user does not exist.

func (*Server) UserCount

func (s *Server) UserCount() int

UserCount returns the number of users.

func (*Server) Users

func (s *Server) Users() (a []*User)

Users returns a list of all users, sorted by name.

func (*Server) WriteSeries

func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (idx uint64, err error)

WriteSeries writes series data to the database. Returns the messaging index the data was written to.

type Shard

type Shard struct {
	ID          uint64   `json:"id,omitempty"`
	DataNodeIDs []uint64 `json:"nodeIDs,omitempty"` // owners
	// contains filtered or unexported fields
}

Shard represents the logical storage for a given time range. The instance on a local server may contain the raw data in "store" if the shard is assigned to the server's data node id.

func (*Shard) HasDataNodeID

func (s *Shard) HasDataNodeID(id uint64) bool

HasDataNodeID return true if the data node owns the shard.

func (*Shard) Index

func (s *Shard) Index() uint64

Index returns the highest Raft index processed by this shard. Shard RLock held during execution.

type ShardGroup

type ShardGroup struct {
	ID        uint64    `json:"id,omitempty"`
	StartTime time.Time `json:"startTime,omitempty"`
	EndTime   time.Time `json:"endTime,omitempty"`
	Shards    []*Shard  `json:"shards,omitempty"`
}

ShardGroup represents a group of shards created for a single time range.

func (*ShardGroup) Contains

func (sg *ShardGroup) Contains(min, max time.Time) bool

Contains return whether the shard group contains data for the time between min and max

func (*ShardGroup) Duration

func (sg *ShardGroup) Duration() time.Duration

Duration returns the duration between the shard group's start and end time.

func (*ShardGroup) ShardBySeriesID

func (sg *ShardGroup) ShardBySeriesID(seriesID uint64) *Shard

ShardBySeriesID returns the shard that a series is assigned to in the group.

type Shards

type Shards []*Shard

Shards represents a list of shards.

type Snapshot

type Snapshot struct {
	Files []SnapshotFile `json:"files"`
}

Snapshot represents the state of the Server at a given time.

func ReadFileSnapshot

func ReadFileSnapshot(path string) (*Snapshot, error)

ReadFileSnapshot returns a Snapshot for a given base snapshot path. This snapshot merges all incremental backup snapshots as well.

func (*Snapshot) Diff

func (s *Snapshot) Diff(other *Snapshot) *Snapshot

Diff returns a Snapshot of files that are newer in s than other.

func (*Snapshot) Index

func (s *Snapshot) Index() uint64

Index returns the highest index across all files.

func (*Snapshot) Merge

func (s *Snapshot) Merge(other *Snapshot) *Snapshot

Merge returns a Snapshot that combines s with other. Only the newest file between the two snapshots is returned.

type SnapshotFile

type SnapshotFile struct {
	Name  string `json:"name"`  // filename
	Size  int64  `json:"size"`  // file size
	Index uint64 `json:"index"` // highest index applied
}

SnapshotFile represents a single file in a Snapshot.

type SnapshotFileWriter

type SnapshotFileWriter interface {
	io.WriterTo
	io.Closer
}

SnapshotFileWriter is the interface used for writing a file to a snapshot.

type SnapshotFiles

type SnapshotFiles []SnapshotFile

SnapshotFiles represents a sortable list of snapshot files.

func (SnapshotFiles) Len

func (p SnapshotFiles) Len() int

func (SnapshotFiles) Less

func (p SnapshotFiles) Less(i, j int) bool

func (SnapshotFiles) Swap

func (p SnapshotFiles) Swap(i, j int)

type SnapshotReader

type SnapshotReader struct {
	// contains filtered or unexported fields
}

SnapshotReader reads a snapshot from a Reader. This type is not safe for concurrent use.

func NewSnapshotReader

func NewSnapshotReader(r io.Reader) *SnapshotReader

NewSnapshotReader returns a new SnapshotReader reading from r.

func (*SnapshotReader) Next

func (sr *SnapshotReader) Next() (SnapshotFile, error)

Next returns the next file in the snapshot.

func (*SnapshotReader) Read

func (sr *SnapshotReader) Read(b []byte) (n int, err error)

Read reads the current entry in the snapshot.

func (*SnapshotReader) Snapshot

func (sr *SnapshotReader) Snapshot() (*Snapshot, error)

Snapshot returns the snapshot meta data.

type SnapshotWriter

type SnapshotWriter struct {
	// The snapshot to write from.
	// Removing files from the snapshot after creation will cause those files to be ignored.
	Snapshot *Snapshot

	// Writers for each file by filename.
	// Writers will be closed as they're processed and will close by the end of WriteTo().
	FileWriters map[string]SnapshotFileWriter
}

SnapshotWriter writes a snapshot and the underlying files to disk as a tar archive.

func NewSnapshotWriter

func NewSnapshotWriter() *SnapshotWriter

NewSnapshotWriter returns a new instance of SnapshotWriter.

func (*SnapshotWriter) Close

func (sw *SnapshotWriter) Close() error

Close closes all file writers on the snapshot.

func (*SnapshotWriter) WriteTo

func (sw *SnapshotWriter) WriteTo(w io.Writer) (n int64, err error)

WriteTo writes the snapshot to the writer. File writers are closed as they are written. This function will always return n == 0.

type SnapshotsReader

type SnapshotsReader struct {
	// contains filtered or unexported fields
}

SnapshotsReader reads from a collection of snapshots. Only files with the highest index are read from the reader. This type is not safe for concurrent use.

func NewSnapshotsReader

func NewSnapshotsReader(readers ...io.Reader) *SnapshotsReader

NewSnapshotsReader returns a new SnapshotsReader reading from a list of readers.

func OpenFileSnapshotsReader

func OpenFileSnapshotsReader(path string) (*SnapshotsReader, []io.Closer, error)

OpenFileSnapshotsReader returns a SnapshotsReader based on the path of the base snapshot. Returns the underlying files which need to be closed separately.

func (*SnapshotsReader) Next

func (ssr *SnapshotsReader) Next() (SnapshotFile, error)

Next returns the next file in the reader.

func (*SnapshotsReader) Read

func (ssr *SnapshotsReader) Read(b []byte) (n int, err error)

Read reads the current entry in the reader.

func (*SnapshotsReader) Snapshot

func (ssr *SnapshotsReader) Snapshot() (*Snapshot, error)

Snapshot returns the combined snapshot from all readers.

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

Stats represents a collection of metrics, as key-value pairs.

func NewStats

func NewStats(name string) *Stats

NewStats returns a Stats object with the given name.

func (*Stats) Add

func (s *Stats) Add(key string, delta int64)

Add adds delta to the stat indiciated by key.

func (*Stats) Diff

func (s *Stats) Diff(other *Stats) *Stats

Diff returns the difference between two sets of stats. The result is undefined if the two Stats objects do not contain the same keys.

func (*Stats) Get

func (s *Stats) Get(key string) int64

Get returns a value for a given key.

func (*Stats) Inc

func (s *Stats) Inc(key string)

Inc simply increments the given key by 1.

func (*Stats) Name

func (s *Stats) Name() string

Name returns the name of the Stats object.

func (*Stats) Set

func (s *Stats) Set(key string, v int64)

Set sets a value for the given key.

func (*Stats) Snapshot

func (s *Stats) Snapshot() *Stats

Snapshot returns a copy of the stats object. Addition and removal of stats keys is blocked during the created of the snapshot, but existing entries may be concurrently updated.

func (*Stats) String

func (s *Stats) String() string

func (*Stats) Walk

func (s *Stats) Walk(f func(string, int64))

Walk calls f for each entry in the stats. The stats are locked during the walk but existing entries may be concurrently updated.

type SystemDiagnostics

type SystemDiagnostics struct {
	Hostname string
	PID      int
	OS       string
	Arch     string
	NumCPU   int
}

SystemDiagnostics captures basic machine data.

func NewSystemDiagnostics

func NewSystemDiagnostics() *SystemDiagnostics

NewSystemDiagnostics returns a SystemDiagnostics object.

func (*SystemDiagnostics) AsRow

func (s *SystemDiagnostics) AsRow(measurement string, tags map[string]string) *influxql.Row

AsRow returns the GoDiagnostic object as an InfluxQL row.

type TagFilter

type TagFilter struct {
	Op    influxql.Token
	Key   string
	Value string
	Regex *regexp.Regexp
}

TagFilter represents a tag filter when looking up other tags or measurements.

type User

type User struct {
	Name       string                        `json:"name"`
	Hash       string                        `json:"hash"`
	Privileges map[string]influxql.Privilege `json:"privileges"` // db name to privilege
	Admin      bool                          `json:"admin,omitempty"`
}

User represents a user account on the system. It can be given read/write permissions to individual databases.

func (*User) Authenticate

func (u *User) Authenticate(password string) error

Authenticate returns nil if the password matches the user's password. Returns an error if the password was incorrect.

func (*User) Authorize

func (u *User) Authorize(privilege influxql.Privilege, database string) bool

Authorize returns true if the user is authorized and false if not.

Directories

Path Synopsis
cmd
Package influxql implements a parser for the InfluxDB query language.
Package influxql implements a parser for the InfluxDB query language.
Package messaging implements a distributed, raft-backed messaging system.
Package messaging implements a distributed, raft-backed messaging system.
Package raft implements a streaming version of the Raft protocol.
Package raft implements a streaming version of the Raft protocol.
tests
The uuid package can be used to generate and parse universally unique identifiers, a standardized format in the form of a 128 bit number.
The uuid package can be used to generate and parse universally unique identifiers, a standardized format in the form of a 128 bit number.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL