Documentation ¶
Index ¶
- Constants
- Variables
- func FieldsEqual(f1, f2 Field) bool
- func GetHTTPClient(t *tls.Config) *http.Client
- func GetTLSConfig(tlsConfig *TLSConfig, log logger.Logger) (TLSConfig *tls.Config, err error)
- func GetTLSConfigFromConfluent(config *ConfluentCommand, log logger.Logger) (TLSConfig *tls.Config, err error)
- func HasMutex(fld Field) bool
- func NewKeypairReloader(certPath, keyPath string, log logger.Logger) (*keypairReloader, error)
- func NewStaticKeypair(certPemData, keyPemData []byte, log logger.Logger) (*keypairReloader, error)
- func ParseHeader(raw []byte) ([]Field, PathTable, error)
- func QuantumOf(fld Field) string
- func TTLOf(fld Field) (time.Duration, error)
- func TimestampToVal(unit Unit, ts time.Time) int64
- func ValToTimestamp(unit string, val int64) (time.Time, error)
- type BoolField
- type CacheConfig
- type ConfluentCommand
- type DateIntField
- type DecimalField
- type DeleteSentinel
- type ErrIDOffsetDesync
- type Field
- type FieldType
- type Fields
- type IDAllocator
- type IDArrayField
- type IDField
- type IDRange
- type IgnoreField
- type IntField
- type LocalRangeAllocator
- type LookupBatcher
- type LookupTextField
- type Main
- func (m *Main) Log() logger.Logger
- func (m *Main) NewLookupClient() (*PostgresClient, error)
- func (m *Main) PilosaClient() *pilosaclient.Client
- func (m *Main) Rename()
- func (m *Main) Run() (err error)
- func (m *Main) SetBasic()
- func (m *Main) SetLog(log logger.Logger)
- func (m *Main) Setup() (onFinishRun func(), err error)
- type Metadata
- type NopLookupBatcher
- type OffsetStreamRecord
- type PathTable
- type PostgresClient
- type PostgresUpsertBatcher
- type ProgressTracker
- type RangeAllocator
- type RawField
- type Record
- type RecordTimeField
- type Recordizer
- type SchemaManager
- type SignedIntBoolKeyField
- type Source
- type StringArrayField
- type StringField
- type TLSConfig
- type TimestampField
- type Unit
Constants ¶
const ( Exists = "-exists" ErrCommittingIDs = "committing IDs for batch" )
const ( Custom = Unit("c") Day = Unit("d") Hour = Unit("h") Minute = Unit("m") Second = Unit("s") Millisecond = Unit("ms") Microsecond = Unit("us") Nanosecond = Unit("ns") DefaultUnit = Second )
const ( MetricDeleterRowsAdded = "deleter_rows_added_total" MetricIngesterRowsAdded = "ingester_rows_added_total" MetricIngesterSchemaChanges = "ingester_schema_changes_total" MetricCommittedRecords = "committed_records" )
const DecimalPrecision = 18
const IDColumn = "id"
Variables ¶
var ( ErrNoFieldSpec = errors.New("no field spec in this header") ErrInvalidFieldName = errors.New("field name must match [a-z][a-z0-9Θ_-]{0,229}") ErrParsingEpoch = "parsing epoch for " ErrDecodingConfig = "decoding config for field " )
var ( // ErrSchemaChange is returned from Source.Record when the returned // record has a different schema from the previous record. ErrSchemaChange = errors.New("this record has a different schema from the previous record (or is the first one delivered). Please call Source.Schema() to fetch the schema in order to properly decode this record") // ErrFlush is returned from Source.Record when the Source wants to // signal that there may not be data for a while, so it's a good time // to make sure all data which has been received is ingested. The // record must be nil when ErrFlush is returned. ErrFlush = errors.New("the Source is requesting the batch be flushed") ErrFmtUnknownUnit = "unknown unit %q, please choose from d/h/m/s/ms/us/ns" ErrIntOutOfRange = errors.New("value provided for int field is out of range") ErrDecimalOutOfRange = errors.New("value provided for decimal field is out of range") ErrTimestampOutOfRange = errors.New("value provided for timestamp field is out of range") )
var ( MinTimestampNano = time.Unix(-1<<32, 0).UTC() // 1833-11-24T17:31:44Z MaxTimestampNano = time.Unix(1<<32, 0).UTC() // 2106-02-07T06:28:16Z MinTimestamp = time.Unix(-62135596799, 0).UTC() // 0001-01-01T00:00:01Z MaxTimestamp = time.Unix(253402300799, 0).UTC() // 9999-12-31T23:59:59Z )
var BuildTime = "not recorded"
var CounterCommittedRecords = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "ingester", Name: MetricCommittedRecords, Help: "TODO", }, )
var CounterDeleterRowsAdded = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "ingester", Name: MetricDeleterRowsAdded, Help: "TODO", }, []string{ "type", }, )
var CounterIngesterRowsAdded = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "ingester", Name: MetricIngesterRowsAdded, Help: "TODO", }, )
var CounterIngesterSchemaChanges = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "ingester", Name: MetricIngesterSchemaChanges, Help: "TODO", }, )
var Version = "v0.0.0"
Functions ¶
func FieldsEqual ¶
FieldsEqual is used in testing to compare Fields. The pointers make it a bit tricky for IntField.
func GetTLSConfig ¶
func NewKeypairReloader ¶
func NewStaticKeypair ¶
func TimestampToVal ¶
TimestampToVal takes a time unit and a time.Time and converts it to an integer value
Types ¶
type BoolField ¶
func (BoolField) PilosafyVal ¶
type CacheConfig ¶
type CacheConfig struct { CacheType pilosaclient.CacheType CacheSize int }
CacheConfig - type (ranked, lru, none) and size.
func CacheConfigOf ¶
func CacheConfigOf(f Field) CacheConfig
CacheConfigOf returns CacheConfig of the Field.
type ConfluentCommand ¶
type ConfluentCommand struct { KafkaSecurityProtocol string `help:"Protocol used to communicate with brokers (security.protocol)"` KafkaBootstrapServers []string `help:"Comma separated list of host:port pairs for Kafka."` KafkaSslCaLocation string `help:"File or directory path to CA certificate(s) for verifying the broker's key(ssl.ca.location)"` KafkaSslCertificateLocation string `help:"Path to client's public key (PEM) used for authentication(ssl.certificate.location)"` KafkaSslKeyLocation string `help:"Path to client's private key (PEM) used for authentication(ssl.key.location)."` KafkaSslKeyPassword string `help:"Private key passphrase (for use with ssl.key.location and set_ssl_cert()(ssl.key.password))"` KafkaSslEndpointIdentificationAlgorithm string `` /* 130-byte string literal not displayed */ KafkaEnableSslCertificateVerification bool `help:"(enable.ssl.certificate.verification)"` KafkaSocketTimeoutMs int `help:"(socket.timeout.ms)"` KafkaSocketKeepaliveEnable string `help:"The (socket.keepalive.enable) kafka consumer configuration"` KafkaClientId string `help:"(client.id)"` KafkaDebug string `` /* 184-byte string literal not displayed */ KafkaMaxPollInterval int `` /* 159-byte string literal not displayed */ KafkaSessionTimeout int `` /* 136-byte string literal not displayed */ KafkaGroupInstanceId string `help:"The (group.instance.id) kafka consumer configuration."` KafkaSaslUsername string `help:"SASL authentication username (sasl.username)"` KafkaSaslPassword string `help:"SASL authentication password (sasl.password)"` KafkaSaslMechanism string `help:"SASL mechanism to use for authentication.(sasl.mechanism)"` SchemaRegistryUsername string `help:"authenticaion key provided by confluent for schema registry."` SchemaRegistryPassword string `help:"authenticaion secret provided by confluent for schema registry."` SchemaRegistryURL string `short:"g" help:"Location of Confluent Schema Registry. Must start with 'https://' if you want to use TLS."` KafkaConfiguration string `help:"json configuration for confluents ConfigMap will be applied first EXPERIMENTAL"` }
type DateIntField ¶
type DateIntField struct { NameVal string DestNameVal string Layout string Epoch time.Time Unit Unit CustomUnit string }
func (DateIntField) DestName ¶
func (d DateIntField) DestName() string
func (DateIntField) Name ¶
func (d DateIntField) Name() string
func (DateIntField) PilosafyVal ¶
func (d DateIntField) PilosafyVal(val interface{}) (interface{}, error)
PilosafyVal for a DateIntField takes a time.Time and int64 which represents the number units from the epoch.
type DecimalField ¶
func (DecimalField) DestName ¶
func (d DecimalField) DestName() string
func (DecimalField) Name ¶
func (d DecimalField) Name() string
func (DecimalField) PilosafyVal ¶
func (d DecimalField) PilosafyVal(val interface{}) (interface{}, error)
PilosafyVal for DecimalField always returns an int64. If the incoming value is anything but a float or string we attempt to convert to int64 and then scale it. Strings are attempted to be parsed into floats, and all values are scaled by the 10^scale before being returned. Byte slices are assumed to represent the already scaled value and are interpreted as int64.
type ErrIDOffsetDesync ¶
type ErrIDOffsetDesync struct { Requested uint64 `json:"requested"` // Base is the next lowest uncommitted offset for which // IDs may be reserved Base uint64 `json:"base"` }
ErrIDOffsetDesync is an error used when IDs are reserved at offsets which have already been committed. TODO, once pull/1559 is merged into pilosa main branch no need to maintain a duplicate definition of ErrIDOffsetDesync here
func (ErrIDOffsetDesync) Error ¶
func (e ErrIDOffsetDesync) Error() string
type Field ¶
type Field interface { Name() string DestName() string PilosafyVal(val interface{}) (interface{}, error) // TODO rename this }
Field knows how to interpret values of different types and tells how they get indexed in Pilosa. Every field implementation should be a struct named like <something>Field, and have as members `NameVal string` and `DestNameVal string`, where NameVal contains the name of the field at the source, and DestNameVal contains the name of the field at the destination (pilosa)
Many Field implementations have a Quantum field which can be any valid Pilosa time quantum, e.g. "Y", "YMDH", "DH", etc. If Quantum is set to a valid quantum, the Pilosa field created for this field will be of type "time". Other fields which control field type will be ignored until/if Pilosa supports time+(othertype) fields.
func HeaderToField ¶
HeaderToField takes a header specification which looks like sourcename___destname__FieldType_Arg_Arg2 (note that sourcename and destname are separated by "___", triple underscore) and converts it to an idk Field like:
FieldTypeField { NameVal: sourcename, DestNameVal: destname, Thing1: Arg, Thing2: Arg2, }
It does this using a variety of reflective magic. The unwritten rules are that all idk Fields must be structs and have their first member be `NameVal string`. The arguments are applied in order to exported fields.
type FieldType ¶
type FieldType string
const ( IDType FieldType = "id" BoolType FieldType = "bool" StringType FieldType = "string" LookupTextType FieldType = "lookuptext" IntType FieldType = "int" ForeignKeyType FieldType = "foreignkey" DecimalType FieldType = "decimal" StringArrayType FieldType = "stringarray" IDArrayType FieldType = "idarray" DateIntType FieldType = "dateint" RecordTimeType FieldType = "recordtime" SignedIntBoolKeyType FieldType = "signedintboolkey" IgnoreType FieldType = "ignore" TimestampType FieldType = "timestamp" )
type Fields ¶
type Fields []Field
Fields is a list of Field, representing a schema.
func (Fields) ContainsBool ¶
ContainsBool returns true if at least one field in the list is a BoolField.
type IDAllocator ¶
type IDAllocator interface { Next(context.Context, Record) (uint64, error) Reserve(context.Context, uint64) error Commit(context.Context) error }
func NewRangeNexter ¶
func NewRangeNexter(a RangeAllocator) (IDAllocator, error)
type IDArrayField ¶
type IDArrayField struct { NameVal string DestNameVal string // Quantum — see note about Quantum on "Field" interface. Quantum string TTL string *CacheConfig }
func (IDArrayField) DestName ¶
func (i IDArrayField) DestName() string
func (IDArrayField) Name ¶
func (i IDArrayField) Name() string
func (IDArrayField) PilosafyVal ¶
func (IDArrayField) PilosafyVal(val interface{}) (interface{}, error)
type IDField ¶
type IDField struct { NameVal string DestNameVal string // Mutex denotes whether we need to enforce that each record only // has a single value for this field. Put another way, says // whether a new value for this field be treated as adding an // additional value, or replacing the existing value (if there is // one). Mutex bool // Quantum — see note about Quantum on "Field" interface. Quantum string TTL string *CacheConfig }
func (IDField) PilosafyVal ¶
type IgnoreField ¶
type IgnoreField struct{}
IgnoreField can be used when you wish not to process one of the input fields, but it is inconvenient to remove it ahead of time.
func (IgnoreField) DestName ¶
func (IgnoreField) DestName() string
func (IgnoreField) Name ¶
func (IgnoreField) Name() string
func (IgnoreField) PilosafyVal ¶
func (IgnoreField) PilosafyVal(interface{}) (interface{}, error)
type IntField ¶
type IntField struct { NameVal string DestNameVal string Min *int64 Max *int64 ForeignIndex string }
IntField - if you add any new fields to this struct, please update the FieldsEqual function to accomodate.
func (IntField) PilosafyVal ¶
type LocalRangeAllocator ¶
type LocalRangeAllocator struct {
// contains filtered or unexported fields
}
func (*LocalRangeAllocator) Get ¶
func (a *LocalRangeAllocator) Get() (*IDRange, error)
func (*LocalRangeAllocator) Return ¶
func (a *LocalRangeAllocator) Return(r *IDRange) error
type LookupBatcher ¶
type LookupTextField ¶
type LookupTextField struct { // NOTE this implements the Field interface for simplicity of implementation/API, but that interface is intended for data going into pilosa, while this is not. NameVal string DestNameVal string }
func (LookupTextField) DestName ¶
func (s LookupTextField) DestName() string
func (LookupTextField) Name ¶
func (s LookupTextField) Name() string
func (LookupTextField) PilosafyVal ¶
func (s LookupTextField) PilosafyVal(val interface{}) (interface{}, error)
type Main ¶
type Main struct { PilosaHosts []string `short:"p" help:"Alias for --featurebase-hosts. Will be deprecated in the next major release."` FeaturebaseHosts []string `short:"" help:"Comma separated list of host:port pairs for FeatureBase."` PilosaGRPCHosts []string `short:"" help:"Alias for --featurebase-grpc-hosts. Will be deprecated in the next major release."` FeaturebaseGRPCHosts []string `short:"" help:"Comma separated list of host:port pairs for FeatureBase's GRPC endpoint. Used by Kafka delete consumer."` BatchSize int `` /* 178-byte string literal not displayed */ KeyTranslateBatchSize int `help:"Maximum number of keys to translate at a time."` BatchMaxStaleness time.Duration `` /* 184-byte string literal not displayed */ Index string `short:"i" help:"Name of FeatureBase index."` LogPath string `short:"l" help:"Log file to write to. Empty means stderr."` PrimaryKeyFields []string `` /* 188-byte string literal not displayed */ IDField string `` /* 168-byte string literal not displayed */ AutoGenerate bool `short:"a" help:"Automatically generate IDs."` ExternalGenerate bool `short:"" help:"Use FeatureBase's ID generation (must be set alongside auto-generate)."` IDAllocKeyPrefix string `` /* 135-byte string literal not displayed */ Concurrency int `` /* 252-byte string literal not displayed */ CacheLength uint64 `short:"" help:"Number of batches of ID mappings to cache."` PackBools string `` /* 130-byte string literal not displayed */ Verbose bool `short:"v" help:"Enable verbose logging."` Delete bool `help:"If true, delete records rather than write them." flag:"-"` Pprof string `short:"o" help:"host:port on which to listen for pprof"` Stats string `short:"s" help:"host:port on which to host metrics"` ExpSplitBatchMode bool `` /* 213-byte string literal not displayed */ AssumeEmptyPilosa bool `short:"u" help:"Alias for --assume-empty-featurebase. Will be deprecated in the next major release."` AssumeEmptyFeaturebase bool `` /* 366-byte string literal not displayed */ WriteCSV string `short:"" help:"Write data we're ingesting to a CSV file with the given name."` Namespace string `flag:"-"` DeleteIndex bool `` /* 181-byte string literal not displayed */ DryRun bool `short:"" help:"Dry run - just flag parsing."` TrackProgress bool `help:"Periodically print status updates on how many records have been sourced." short:""` OffsetMode bool `` /* 165-byte string literal not displayed */ LookupDBDSN string `flag:"lookup-db-dsn" help:"Connection string for connecting to Lookup database."` LookupBatchSize int `help:"Number of records to batch before writing them to Lookup database."` AuthToken string `flag:"auth-token" help:"Authentication Token for FeatureBase"` CommitTimeout time.Duration `help:"Maximum time before canceling commit."` AllowIntOutOfRange bool `help:"Allow ingest to continue when it encounters out of range integers in IntFields. (default false)"` AllowDecimalOutOfRange bool `help:"Allow ingest to continue when it encounters out of range decimals in DecimalFields. (default false)"` AllowTimestampOutOfRange bool `help:"Allow ingest to continue when it encounters out of range timestamps in TimestampFields. (default false)"` SkipBadRows int `help:"If you fail to process the first n rows without processing one successfully, fail."` UseShardTransactionalEndpoint bool `` /* 216-byte string literal not displayed */ ControllerAddress string `short:"" help:"Controller address."` OrganizationID dax.OrganizationID `short:"" help:"auto-assigned organization ID"` DatabaseID dax.DatabaseID `short:"" help:"auto-assigned database ID"` TableName dax.TableName `short:"" help:"human friendly table name"` // TODO implement the auto-generated IDs... hopefully using Pilosa to manage it. TLS TLSConfig // NewSource must be set by the user of Main before calling // Main.Run. Main.Run will call this function "Concurrency" times. It // is the job of this function to ensure that the concurrent // sources which are started partition work appropriately. This is // typically set up (by convention) in the Source's package in // cmd.go NewSource func() (Source, error) `flag:"-"` NewImporterFn func() pilosacore.Importer `flag:"-"` Batcher pilosabatch.Batcher `flag:"-"` SchemaManager SchemaManager `flag:"-"` Qtbl *dax.QualifiedTable `flag:"-"` // Future flags are used to represent features or functionality which is not // yet the default behavior, but will be in a future release. Future struct { // Rename, if true, will interact with a service called FeatureBase // instead of Pilosa. Rename bool `help:"Interact with FeatureBase instead of Pilosa."` } MaxMsgs uint64 `` /* 134-byte string literal not displayed */ // contains filtered or unexported fields }
Main holds all config for general ingest
func (*Main) NewLookupClient ¶
func (m *Main) NewLookupClient() (*PostgresClient, error)
NewLookupClient returns an instance of a lookupClient. This represents a somewhat generic interface to a separate data store; currently the only implementation uses Postgres. This is also used for testing.
func (*Main) PilosaClient ¶
func (m *Main) PilosaClient() *pilosaclient.Client
type NopLookupBatcher ¶
type NopLookupBatcher struct{}
NopLookupBatcher is a no-op implementation of a LookupBatcher.
func (*NopLookupBatcher) AddFullRow ¶
func (n *NopLookupBatcher) AddFullRow(row []interface{}) error
func (*NopLookupBatcher) Import ¶
func (n *NopLookupBatcher) Import() error
func (*NopLookupBatcher) Len ¶
func (n *NopLookupBatcher) Len() int
type OffsetStreamRecord ¶
type OffsetStreamRecord interface { Record // StreamOffset returns the stream from which the record originated, and the offset of the record within that stream. StreamOffset() (key string, offset uint64) }
OffsetStreamRecord is an extension of the record type which also tracks offsets within streams.
type PostgresClient ¶
type PostgresClient struct { DSN string TableName string ColumnNames []string // contains filtered or unexported fields }
func NewPostgresClient ¶
func NewPostgresClient(DSN, TableName string, log logger.Logger) (*PostgresClient, error)
NewPostgresClient creates a Lookup client for a Postgres backend. It creates the corresponding Postgres table if necessary.
func (*PostgresClient) Close ¶
func (c *PostgresClient) Close() error
func (*PostgresClient) DropTable ¶
func (c *PostgresClient) DropTable() error
DropTable drops the PostgresClient's table from the Postgres backend, for test cleanup.
func (*PostgresClient) ReadString ¶
func (c *PostgresClient) ReadString(id uint64, column string) (string, error)
ReadString reads the string value for the specified column, from the row with the specified id.
func (*PostgresClient) RowExists ¶
func (c *PostgresClient) RowExists(id uint64) (bool, error)
RowExists returns true if a row exists containing the specified id.
func (*PostgresClient) Setup ¶
func (c *PostgresClient) Setup(columnNames []string) error
Setup primarily serves to add text columns to the Postgres table. Column names are not known until ingester.Run starts, so columns are created separately from table creation.
type PostgresUpsertBatcher ¶
type PostgresUpsertBatcher struct {
// contains filtered or unexported fields
}
PostgresUpsertBatcher does bulk imports using an INSERT ... ON CONFLICT statement, to handle duplicate-key updates.
func NewPostgresUpsertBatcher ¶
func NewPostgresUpsertBatcher(client *PostgresClient, batchSize int) *PostgresUpsertBatcher
NewPostgresUpsertBatcher creates a LookupBatcher, with a Postgres backend, which uses the Upsert method (see the Import function for details) to batch imports.
func (*PostgresUpsertBatcher) AddFullRow ¶
func (pg *PostgresUpsertBatcher) AddFullRow(row []interface{}) error
AddFullRow adds a row to the batch, which includes data for each column in the table. If the addition causes the batch to reach the predefined batch size, an import occurs. An alternative might be AddPartialRow, which accepts data for only some of the columns in the table. An AddPartialRow function would be able to handle any number of missing Lookup fields in received messages; AddFullRow can not.
func (*PostgresUpsertBatcher) Import ¶
func (pg *PostgresUpsertBatcher) Import() error
Import sends a batch of row data to Postgres using an INSERT ... ON CONFLICT statement. The ON CONFLICT portion has no effect on the rows in the batch with new ids. For rows with ids that already exist in the Postgres table, the ON CONFLICT causes the new values for the data columns to overwrite the existing values.
func (*PostgresUpsertBatcher) Len ¶
func (pg *PostgresUpsertBatcher) Len() int
Len returns the number of rows in the current batch, that have yet to be imported to the Postgres backend.
type ProgressTracker ¶
type ProgressTracker struct {
// contains filtered or unexported fields
}
ProgressTracker tracks the progress of record sourcing.
func (*ProgressTracker) Check ¶
func (t *ProgressTracker) Check() uint64
Check the number of records that have been sourced so far.
func (*ProgressTracker) Track ¶
func (t *ProgressTracker) Track(src Source) Source
Track record generation progress on a source. Wraps the source with a progress-tracking mechanism.
type RangeAllocator ¶
func NewLocalRangeAllocator ¶
func NewLocalRangeAllocator(shardWidth uint64) RangeAllocator
type RawField ¶ added in v3.34.0
type RawField struct { Name string `json:"name"` Path []string `json:"path"` Type string `json:"type"` Config json.RawMessage }
RawField is used in cases where header fields are configured as json, typically read from a file. But this type is also used by the kafka runner in fbsql.
type Record ¶
type Record interface { // Commit notifies the Source which produced this record that it // and any record which came before it have been completely // processed. The Source can then take any necessary action to // record which records have been processed, and restart from the // earliest unprocessed record in the event of a failure. Commit(ctx context.Context) error Data() []interface{} Schema() interface{} }
type RecordTimeField ¶
type RecordTimeField struct { NameVal string DestNameVal string Layout string // Layout tells how the time should be parsed. Defaults to RFC3339. // need a way to create other time fields in the record (add time/quantum to String, StringArray, ID, IDArray?) // do we need a way to have timefields in a record with independent times/values Epoch time.Time Unit Unit }
RecordTimeField applies to whole record, but doesn't have a name (or quantum) of its own since it applies to any other time fields in the record.
func (RecordTimeField) DestName ¶
func (r RecordTimeField) DestName() string
func (RecordTimeField) Name ¶
func (r RecordTimeField) Name() string
func (RecordTimeField) PilosafyVal ¶
func (r RecordTimeField) PilosafyVal(val interface{}) (interface{}, error)
PilosafyVal for RecordTimeField always returns a time.Time or nil.
type Recordizer ¶
type Recordizer func(rawRec []interface{}, rec *pilosabatch.Row) error
type SchemaManager ¶
type SchemaManager interface { StartTransaction(id string, timeout time.Duration, exclusive bool, requestTimeout time.Duration) (*pilosacore.Transaction, error) FinishTransaction(id string) (*pilosacore.Transaction, error) Schema() (*pilosaclient.Schema, error) SyncIndex(index *pilosaclient.Index) error DeleteIndex(index *pilosaclient.Index) error Status() (pilosaclient.Status, error) SetAuthToken(string) }
SchemaManager is meant to be an interface for managing schema information; i.e. for interacting with a single source of truth for schema information, like the Serverless Schemar. But... it currently contains methods which are not related to schema because the first goal was just to introduce an interface in ingest.go for any methods being called on *m.client. We don't want a FeatureBase client directly called from ingest, rather, we want to call these interface methods and allow for different implementations (such as a Serverless implementation which uses the Schemar in Serverless as opposed to a FeatureBase node or cluster).
var NopSchemaManager SchemaManager = &nopSchemaManager{}
NopSchemaManager is an implementation of the SchemaManager interface that doesn't do anything.
type SignedIntBoolKeyField ¶
SignedIntBoolKeyField translates a signed integer value to a (rowID, bool) pair corresponding to the magnitude and sign of the original value. This may be used to specify whether a bool value is to be set (positive/true) or cleared (negative/false).
func (SignedIntBoolKeyField) DestName ¶
func (b SignedIntBoolKeyField) DestName() string
func (SignedIntBoolKeyField) Name ¶
func (b SignedIntBoolKeyField) Name() string
func (SignedIntBoolKeyField) PilosafyVal ¶
func (SignedIntBoolKeyField) PilosafyVal(val interface{}) (interface{}, error)
type Source ¶
type Source interface { // Record returns a data record, and an optional error. If the // error is ErrSchemaChange, then the record is valid, but one // should call Source.Schema to understand how each of its fields // should be interpreted. Record() (Record, error) // Schema returns a slice of Fields which applies to the most // recent Record returned from Source.Record. Every Field has a // name and a type, and depending on the concrete type of the // Field, may have other information which is relevant to how it // should be indexed. Schema() []Field Close() error }
Source is an interface implemented by sources of data which can be ingested into Pilosa. Each Record returned from Record is described by the slice of Fields returned from Source.Schema directly after the call to Source.Record. If the error returned from Source.Record is nil, then the call to Schema which applied to the previous Record also applies to this Record. Source implementations are fundamentally not threadsafe (due to the interplay between Record and Schema).
type StringArrayField ¶
type StringArrayField struct { NameVal string DestNameVal string // Quantum — see note about Quantum on "Field" interface. Quantum string TTL string *CacheConfig }
func (StringArrayField) DestName ¶
func (s StringArrayField) DestName() string
func (StringArrayField) Name ¶
func (s StringArrayField) Name() string
func (StringArrayField) PilosafyVal ¶
func (StringArrayField) PilosafyVal(val interface{}) (interface{}, error)
type StringField ¶
type StringField struct { NameVal string DestNameVal string // Mutex denotes whether we need to enforce that each record only // has a single value for this field. Put another way, says // whether a new value for this field be treated as adding an // additional value, or replacing the existing value (if there is // one). Mutex bool // Quantum — see note about Quantum on "Field" interface. Quantum string TTL string *CacheConfig }
func (StringField) DestName ¶
func (s StringField) DestName() string
func (StringField) Name ¶
func (s StringField) Name() string
func (StringField) PilosafyVal ¶
func (s StringField) PilosafyVal(val interface{}) (interface{}, error)
type TLSConfig ¶
type TLSConfig struct { // CertificatePath contains the path to the certificate (.crt or .pem file) CertificatePath string `json:"certificate" help:"Path to certificate file, or literal PEM data."` // CertificateKeyPath contains the path to the certificate key (.key file) CertificateKeyPath string `json:"key" help:"Path to certificate key file, or literal PEM data."` // CACertPath is the path to a CA certificate (.crt or .pem file) CACertPath string `json:"ca-certificate" help:"Path to CA certificate file, or literal PEM data."` // SkipVerify disables verification of server certificates. SkipVerify bool `json:"skip-verify" help:"Disables verification of server certificates."` // EnableClientVerification enables verification of client TLS certificates (Mutual TLS) EnableClientVerification bool `json:"enable-client-verification" help:"Enable verification of client certificates."` }
TLSConfig contains TLS configuration. *Path elements can be set with local file paths, OR with literal PEM data, which is detected automatically (naively). Passing cert data directly via environment variables is used for m-cloud deployment, secured via SSM. This makes the parameter names slightly inaccurate, but it avoids introducing yet another configuration parameter.
type TimestampField ¶
type TimestampField struct { NameVal string DestNameVal string Layout string Epoch time.Time Unit Unit Granularity string }
func (TimestampField) DestName ¶
func (d TimestampField) DestName() string
func (TimestampField) Name ¶
func (d TimestampField) Name() string
func (TimestampField) PilosafyVal ¶
func (t TimestampField) PilosafyVal(val interface{}) (interface{}, error)
PilosafyVal for TimestampField always returns an int or nil.