idk


README

idk: Ingest Development Kit

Integration tests

To run the tests, you will need to install the following dependencies:

  1. Docker
  2. Docker Compose
  3. Certstrap

In addition to these dependencies, you will need to be added to the moleculacorp Dockerhub account.

First, start the test environment. This is a docker-compose environment that includes Pilosa and a Confluent Kafka stack. Run the following to start those services:

make startup

To build and run the integration tests, run:

make test-run

An alternative command to use, if you're running tests locally and want human-friendly output, is:

make test-run-local

With that command you can also specify individual tests to run like this:

make test-run-local TCMD='-run=TestJustThisOne .'

Then to shut down the test environment, run:

make shutdown

You can run all of the previous commands by calling test-all:

make test-all

The previous command is equivalent to running the following:

make startup
sleep 30 # wait for services to come up
make test-run
make shutdown

To run an individual test, you can run the command directly using docker-compose. Note that you must run docker-compose build idk-test for docker to run the latest code. Modify the following as needed:

make startup
docker-compose build idk-test
docker-compose run idk-test /usr/local/go/bin/go test -count=1 -mod=vendor -run=TestCmdMainOne ./kafka

To shutdown and reset the environment:

make clean

Running dependencies locally (rather than in Docker): "make test-local"

This runs the tests locally rather than in Docker, so you have to be running the dependencies natively on your machine.

Run Pilosa with default config:

pilosa server

Run a second Pilosa like this:

pilosa server --config=pilosa-sec-test.conf

which runs Pilosa with TLS using the certs in testenv (run make testenv first if you haven't).

You also need to be running the Confluent stack, which, after you've installed it from Confluent's site, might look something like:

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home
confluent local destroy && confluent local start schema-registry

Or it might not! You may not need the first line, but if you have the wrong Java version by default, that's how you set it. The second line may change depending on what version of the Confluent stack you get. According to

confluent version

I'm running:

Version:     v0.212.0
Git Ref:     2b04985

Use the test-local make target:

make test-local

This sets a number of environment variables (which it prints when you run it), and should set them correctly if you follow the instructions above, but if you're running things on non-default ports you may need to tweak them.

CSV Ingester

  1. Make sure you're running Pilosa (localhost:10101 for these instructions).
  2. Run the CSV ingester against a sample file:

molecula-consumer-csv --primary-key-fields=asset_tag -i sample-index --files sample.csv

A sample.csv might look like this (the header encodes each field's source name, type, and type arguments; see HeaderToField below):

asset_tag__String,fan_time__RecordTime_2006-01-02,fan_val__String_F_YMD
ABCD,2019-01-02,70%
ABCD,2019-01-03,20%
ABCD,2019-01-04,30%
BEDF,2019-01-02,70%
BEDF,2019-01-05,90%
BEDF,2019-01-08,10%
BEDF,2019-01-08,20%
ABCD,2019-01-30,40%

Datagen

Datagen is an internal command-line tool to generate various application-specific datasets, and ingest them directly into Pilosa. After running make install, run datagen with no arguments to see a list of available "sources".

When running Datagen with local Docker stacks, make sure to add individual docker stacks to the /etc/hosts file:

127.0.0.1 kafka
127.0.0.1 pilosa
127.0.0.1 <docker stack>

ODBC Support

By default, the SQL ingester is not built with ODBC support. This is because it uses CGO with extra dependencies, and the resulting binaries are not portable. In order to build with ODBC support, it is necessary to install unixODBC on the system. Then run:

make bin/molecula-consumer-sql-odbc

Different Linux distros will store certain libraries in different locations. ODBC uses dynamic library loading to handle drivers, so full static linking of dependencies is not possible. It is therefore necessary to build on a system with the same distro and the same software versions as the target machine.

Documentation


Constants

const (
	Exists           = "-exists"
	ErrCommittingIDs = "committing IDs for batch"
)
const (
	Custom      = Unit("c")
	Day         = Unit("d")
	Hour        = Unit("h")
	Minute      = Unit("m")
	Second      = Unit("s")
	Millisecond = Unit("ms")
	Microsecond = Unit("us")
	Nanosecond  = Unit("ns")

	DefaultUnit = Second
)
const (
	MetricDeleterRowsAdded      = "deleter_rows_added_total"
	MetricIngesterRowsAdded     = "ingester_rows_added_total"
	MetricIngesterSchemaChanges = "ingester_schema_changes_total"
	MetricCommittedRecords      = "committed_records"
)
const DecimalPrecision = 18
const IDColumn = "id"

Variables

var (
	ErrNoFieldSpec      = errors.New("no field spec in this header")
	ErrInvalidFieldName = errors.New("field name must match [a-z][a-z0-9_-]{0,229}")
	ErrParsingEpoch     = "parsing epoch for "
	ErrDecodingConfig   = "decoding config for field "
)
var (
	// ErrSchemaChange is returned from Source.Record when the returned
	// record has a different schema from the previous record.
	ErrSchemaChange = errors.New("this record has a different schema from the previous record (or is the first one delivered). Please call Source.Schema() to fetch the schema in order to properly decode this record")

	// ErrFlush is returned from Source.Record when the Source wants to
	// signal that there may not be data for a while, so it's a good time
	// to make sure all data which has been received is ingested. The
	// record must be nil when ErrFlush is returned.
	ErrFlush = errors.New("the Source is requesting the batch be flushed")

	ErrFmtUnknownUnit      = "unknown unit %q, please choose from d/h/m/s/ms/us/ns"
	ErrIntOutOfRange       = errors.New("value provided for int field is out of range")
	ErrDecimalOutOfRange   = errors.New("value provided for decimal field is out of range")
	ErrTimestampOutOfRange = errors.New("value provided for timestamp field is out of range")
)
var (
	MinTimestampNano = time.Unix(-1<<32, 0).UTC()       // 1833-11-24T17:31:44Z
	MaxTimestampNano = time.Unix(1<<32, 0).UTC()        // 2106-02-07T06:28:16Z
	MinTimestamp     = time.Unix(-62135596799, 0).UTC() // 0001-01-01T00:00:01Z
	MaxTimestamp     = time.Unix(253402300799, 0).UTC() // 9999-12-31T23:59:59Z
)
var BuildTime = "not recorded"
var CounterCommittedRecords = prometheus.NewCounter(
	prometheus.CounterOpts{
		Namespace: "ingester",
		Name:      MetricCommittedRecords,
		Help:      "TODO",
	},
)
var CounterDeleterRowsAdded = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "ingester",
		Name:      MetricDeleterRowsAdded,
		Help:      "TODO",
	},
	[]string{
		"type",
	},
)
var CounterIngesterRowsAdded = prometheus.NewCounter(
	prometheus.CounterOpts{
		Namespace: "ingester",
		Name:      MetricIngesterRowsAdded,
		Help:      "TODO",
	},
)
var CounterIngesterSchemaChanges = prometheus.NewCounter(
	prometheus.CounterOpts{
		Namespace: "ingester",
		Name:      MetricIngesterSchemaChanges,
		Help:      "TODO",
	},
)
var Version = "v0.0.0"
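
The Prometheus counters above are plain collectors; nothing in this listing registers them with a registry. If you embed them in your own process, a minimal registration sketch (the idk import path is assumed, and the code tolerates the case where the ingester has already registered them) might look like:

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/molecula/featurebase/v3/idk" // import path assumed
)

func main() {
	for _, c := range []prometheus.Collector{
		idk.CounterCommittedRecords,
		idk.CounterIngesterRowsAdded,
		idk.CounterIngesterSchemaChanges,
		idk.CounterDeleterRowsAdded,
	} {
		// Register can fail if the collector is already registered
		// elsewhere; treat that as non-fatal here.
		if err := prometheus.Register(c); err != nil {
			log.Printf("register: %v", err)
		}
	}

	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9093", nil))
}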

Functions

func FieldsEqual

func FieldsEqual(f1, f2 Field) bool

FieldsEqual is used in testing to compare Fields. The pointers make it a bit tricky for IntField.

func GetHTTPClient

func GetHTTPClient(t *tls.Config) *http.Client

func GetTLSConfig

func GetTLSConfig(tlsConfig *TLSConfig, log logger.Logger) (TLSConfig *tls.Config, err error)

func GetTLSConfigFromConfluent

func GetTLSConfigFromConfluent(config *ConfluentCommand, log logger.Logger) (TLSConfig *tls.Config, err error)

func HasMutex

func HasMutex(fld Field) bool

HasMutex returns the Mutex value of a StringField or IDField, and false otherwise.

func NewKeypairReloader

func NewKeypairReloader(certPath, keyPath string, log logger.Logger) (*keypairReloader, error)

func NewStaticKeypair

func NewStaticKeypair(certPemData, keyPemData []byte, log logger.Logger) (*keypairReloader, error)

func ParseHeader

func ParseHeader(raw []byte) ([]Field, PathTable, error)

func QuantumOf

func QuantumOf(fld Field) string

QuantumOf returns Quantum of the Field.

func TTLOf

func TTLOf(fld Field) (time.Duration, error)

func TimestampToVal

func TimestampToVal(unit Unit, ts time.Time) int64

TimestampToVal takes a time unit and a time.Time and converts it to an integer value

func ValToTimestamp

func ValToTimestamp(unit string, val int64) (time.Time, error)

ValToTimestamp takes a time unit and an integer value and converts it to a time.Time.
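
A short sketch of how the two helpers relate (note that TimestampToVal takes a Unit while ValToTimestamp takes the unit as a string; the idk import path is assumed, and exact inverse behavior is an assumption):

package main

import (
	"fmt"
	"time"

	"github.com/molecula/featurebase/v3/idk" // import path assumed
)

func main() {
	ts := time.Date(2019, 1, 2, 0, 0, 0, 0, time.UTC)

	// Encode the time as an integer count of seconds.
	val := idk.TimestampToVal(idk.Second, ts)

	// Decode it again; the unit is passed as a plain string here ("s").
	back, err := idk.ValToTimestamp("s", val)
	if err != nil {
		panic(err)
	}
	// If the two helpers are inverses for whole-second values, this prints true.
	fmt.Println(back.Equal(ts))
}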

Types

type BoolField

type BoolField struct {
	NameVal     string
	DestNameVal string
}

func (BoolField) DestName

func (b BoolField) DestName() string

func (BoolField) Name

func (b BoolField) Name() string

func (BoolField) PilosafyVal

func (b BoolField) PilosafyVal(val interface{}) (interface{}, error)

type CacheConfig

type CacheConfig struct {
	CacheType pilosaclient.CacheType
	CacheSize int
}

CacheConfig - type (ranked, lru, none) and size.

func CacheConfigOf

func CacheConfigOf(f Field) CacheConfig

CacheConfigOf returns CacheConfig of the Field.

type ConfluentCommand

type ConfluentCommand struct {
	KafkaSecurityProtocol string   `help:"Protocol used to communicate with brokers (security.protocol)"`
	KafkaBootstrapServers []string `help:"Comma separated list of host:port pairs for Kafka."`

	KafkaSslCaLocation                      string `help:"File or directory path to CA certificate(s) for verifying the broker's key(ssl.ca.location)"`
	KafkaSslCertificateLocation             string `help:"Path to client's public key (PEM) used for authentication(ssl.certificate.location)"`
	KafkaSslKeyLocation                     string `help:"Path to client's private key (PEM) used for authentication(ssl.key.location)."`
	KafkaSslKeyPassword                     string `help:"Private key passphrase (for use with ssl.key.location and set_ssl_cert()(ssl.key.password))"`
	KafkaSslEndpointIdentificationAlgorithm string `` /* 130-byte string literal not displayed */
	KafkaEnableSslCertificateVerification   bool   `help:"(enable.ssl.certificate.verification)"`
	KafkaSocketTimeoutMs                    int    `help:"(socket.timeout.ms)"`
	KafkaSocketKeepaliveEnable              string `help:"The (socket.keepalive.enable) kafka consumer configuration"`

	KafkaClientId        string `help:"(client.id)"`
	KafkaDebug           string `` /* 184-byte string literal not displayed */
	KafkaMaxPollInterval string `` /* 159-byte string literal not displayed */
	KafkaSessionTimeout  string `` /* 136-byte string literal not displayed */
	KafkaGroupInstanceId string `help:"The (group.instance.id) kafka consumer configuration."`

	KafkaSaslUsername  string `help:"SASL authentication username (sasl.username)"`
	KafkaSaslPassword  string `help:"SASL authentication password (sasl.password)"`
	KafkaSaslMechanism string `help:"SASL mechanism to use for authentication.(sasl.mechanism)"`

	SchemaRegistryUsername string `help:"authenticaion key provided by confluent for schema registry."`
	SchemaRegistryPassword string `help:"authenticaion secret provided by confluent for schema registry."`
	SchemaRegistryURL      string `short:"g" help:"Location of Confluent Schema Registry. Must start with 'https://' if you want to use TLS."`

	KafkaConfiguration string `help:"json configuration for confluents ConfigMap will be applied first EXPERIMENTAL"`
}

type DateIntField

type DateIntField struct {
	NameVal     string
	DestNameVal string
	Layout      string
	Epoch       time.Time
	Unit        Unit
	CustomUnit  string
}

func (DateIntField) DestName

func (d DateIntField) DestName() string

func (DateIntField) Name

func (d DateIntField) Name() string

func (DateIntField) PilosafyVal

func (d DateIntField) PilosafyVal(val interface{}) (interface{}, error)

PilosafyVal for a DateIntField takes a time.Time and returns an int64 which represents the number of units from the epoch.

type DecimalField

type DecimalField struct {
	NameVal     string
	DestNameVal string
	Scale       int64
}

func (DecimalField) DestName

func (d DecimalField) DestName() string

func (DecimalField) Name

func (d DecimalField) Name() string

func (DecimalField) PilosafyVal

func (d DecimalField) PilosafyVal(val interface{}) (interface{}, error)

PilosafyVal for DecimalField always returns an int64. If the incoming value is anything but a float or string, we attempt to convert it to int64 and then scale it. Strings are parsed into floats, and all values are scaled by 10^Scale before being returned. Byte slices are assumed to represent the already-scaled value and are interpreted as int64.
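
For example, with Scale set to 2, the float 1.23 and the string "1.23" should both come back as the scaled integer 123. A minimal sketch of that expectation (the idk import path is assumed):

package main

import (
	"fmt"

	"github.com/molecula/featurebase/v3/idk" // import path assumed
)

func main() {
	d := idk.DecimalField{NameVal: "fan_val", DestNameVal: "fan_val", Scale: 2}

	v1, err := d.PilosafyVal(1.23) // float input: expected int64(123)
	if err != nil {
		panic(err)
	}
	v2, err := d.PilosafyVal("1.23") // string input: parsed as a float, then scaled
	if err != nil {
		panic(err)
	}
	fmt.Println(v1, v2)
}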

type DeleteSentinel

type DeleteSentinel int
const (
	DELETE_SENTINEL DeleteSentinel = 1
)

type ErrIDOffsetDesync

type ErrIDOffsetDesync struct {
	Requested uint64 `json:"requested"`
	// Base is the next lowest uncommitted offset for which
	// IDs may be reserved
	Base uint64 `json:"base"`
}

TODO: once pull/1559 is merged into the Pilosa main branch, there is no need to maintain a duplicate definition of ErrIDOffsetDesync here.

func (ErrIDOffsetDesync) Error

func (e ErrIDOffsetDesync) Error() string

type Field

type Field interface {
	Name() string
	DestName() string
	PilosafyVal(val interface{}) (interface{}, error) // TODO rename this
}

Field knows how to interpret values of different types and tells how they get indexed in Pilosa. Every field implementation should be a struct named like <something>Field, and have as members `NameVal string` and `DestNameVal string`, where NameVal contains the name of the field at the source, and DestNameVal contains the name of the field at the destination (pilosa)

Many Field implementations have a Quantum field which can be any valid Pilosa time quantum, e.g. "Y", "YMDH", "DH", etc. If Quantum is set to a valid quantum, the Pilosa field created for this field will be of type "time". Other fields which control field type will be ignored until/if Pilosa supports time+(othertype) fields.
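
Following those conventions, a minimal custom Field implementation might look like the sketch below. UpperField is hypothetical and not part of the package; it only illustrates the interface and naming convention:

package myfields

import (
	"fmt"
	"strings"
)

// UpperField is a hypothetical Field that upper-cases string values on
// their way into the batcher. Per the convention above, its first member
// is NameVal, followed by DestNameVal.
type UpperField struct {
	NameVal     string
	DestNameVal string
}

func (f UpperField) Name() string { return f.NameVal }

func (f UpperField) DestName() string {
	if f.DestNameVal == "" {
		return f.NameVal
	}
	return f.DestNameVal
}

func (f UpperField) PilosafyVal(val interface{}) (interface{}, error) {
	if val == nil {
		return nil, nil
	}
	s, ok := val.(string)
	if !ok {
		return nil, fmt.Errorf("UpperField expects a string, got %T", val)
	}
	return strings.ToUpper(s), nil
}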

func HeaderToField

func HeaderToField(headerField string, log logger.Logger) (field Field, _ error)

HeaderToField takes a header specification which looks like sourcename___destname__FieldType_Arg_Arg2 (note that sourcename and destname are separated by "___", triple underscore) and converts it to an idk Field like:

FieldTypeField {
    NameVal: sourcename,
    DestNameVal: destname,
    Thing1: Arg,
    Thing2: Arg2,
}

It does this using a variety of reflective magic. The unwritten rules are that all idk Fields must be structs and have their first member be `NameVal string`. The arguments are applied in order to exported fields.
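
For example, the header column fan_time__RecordTime_2006-01-02 from the CSV sample above has no triple underscore, so the source and destination names are presumably both fan_time, and the single argument becomes the Layout. A hedged sketch of calling HeaderToField directly (the import paths and the logger.NopLogger name are assumptions):

package main

import (
	"fmt"

	"github.com/molecula/featurebase/v3/idk"    // import path assumed
	"github.com/molecula/featurebase/v3/logger" // import path assumed
)

func main() {
	// Double underscore separates the name from the field type and its
	// arguments; with no triple underscore the source name is reused as
	// the destination name.
	field, err := idk.HeaderToField("fan_time__RecordTime_2006-01-02", logger.NopLogger)
	if err != nil {
		panic(err)
	}
	// Expected to be roughly idk.RecordTimeField{NameVal: "fan_time",
	// DestNameVal: "fan_time", Layout: "2006-01-02"}.
	fmt.Printf("%#v\n", field)
}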

type FieldType

type FieldType string
const (
	IDType               FieldType = "id"
	BoolType             FieldType = "bool"
	StringType           FieldType = "string"
	LookupTextType       FieldType = "lookuptext"
	IntType              FieldType = "int"
	ForeignKeyType       FieldType = "foreignkey"
	DecimalType          FieldType = "decimal"
	StringArrayType      FieldType = "stringarray"
	IDArrayType          FieldType = "idarray"
	DateIntType          FieldType = "dateint"
	RecordTimeType       FieldType = "recordtime"
	SignedIntBoolKeyType FieldType = "signedintboolkey"
	IgnoreType           FieldType = "ignore"
	TimestampType        FieldType = "timestamp"
)

type Fields

type Fields []Field

Fields is a list of Field, representing a schema.

func (Fields) ContainsBool

func (f Fields) ContainsBool() bool

ContainsBool returns true if at least one field in the list is a BoolField.

type IDAllocator

type IDAllocator interface {
	Next(context.Context, Record) (uint64, error)
	Reserve(context.Context, uint64) error
	Commit(context.Context) error
}

func NewRangeNexter

func NewRangeNexter(a RangeAllocator) (IDAllocator, error)

type IDArrayField

type IDArrayField struct {
	NameVal     string
	DestNameVal string

	// Quantum — see note about Quantum on "Field" interface.
	Quantum string

	TTL string

	*CacheConfig
}

func (IDArrayField) DestName

func (i IDArrayField) DestName() string

func (IDArrayField) Name

func (i IDArrayField) Name() string

func (IDArrayField) PilosafyVal

func (IDArrayField) PilosafyVal(val interface{}) (interface{}, error)

type IDField

type IDField struct {
	NameVal     string
	DestNameVal string

	// Mutex denotes whether we need to enforce that each record only
	// has a single value for this field. Put another way, says
	// whether a new value for this field be treated as adding an
	// additional value, or replacing the existing value (if there is
	// one).
	Mutex bool

	// Quantum — see note about Quantum on "Field" interface.
	Quantum string

	TTL string

	*CacheConfig
}

func (IDField) DestName

func (id IDField) DestName() string

func (IDField) Name

func (id IDField) Name() string

func (IDField) PilosafyVal

func (id IDField) PilosafyVal(val interface{}) (interface{}, error)

type IDRange

type IDRange struct {
	Start uint64
	End   uint64
}

IDRange is inclusive at Start and exclusive at End... like slices.

type IgnoreField

type IgnoreField struct{}

IgnoreField can be used when you wish not to process one of the input fields, but it is inconvenient to remove it ahead of time.

func (IgnoreField) DestName

func (IgnoreField) DestName() string

func (IgnoreField) Name

func (IgnoreField) Name() string

func (IgnoreField) PilosafyVal

func (IgnoreField) PilosafyVal(interface{}) (interface{}, error)

type IntField

type IntField struct {
	NameVal      string
	DestNameVal  string
	Min          *int64
	Max          *int64
	ForeignIndex string
}

IntField - if you add any new fields to this struct, please update the FieldsEqual function to accommodate.

func (IntField) DestName

func (i IntField) DestName() string

func (IntField) Name

func (i IntField) Name() string

func (IntField) PilosafyVal

func (i IntField) PilosafyVal(val interface{}) (interface{}, error)

type LocalRangeAllocator

type LocalRangeAllocator struct {
	// contains filtered or unexported fields
}

func (*LocalRangeAllocator) Get

func (a *LocalRangeAllocator) Get() (*IDRange, error)

func (*LocalRangeAllocator) Return

func (a *LocalRangeAllocator) Return(r *IDRange) error

type LookupBatcher

type LookupBatcher interface {
	AddFullRow([]interface{}) error
	Len() int
	Import() error
}

type LookupTextField

type LookupTextField struct {
	// NOTE this implements the Field interface for simplicity of implementation/API, but that interface is intended for data going into pilosa, while this is not.
	NameVal     string
	DestNameVal string
}

func (LookupTextField) DestName

func (s LookupTextField) DestName() string

func (LookupTextField) Name

func (s LookupTextField) Name() string

func (LookupTextField) PilosafyVal

func (s LookupTextField) PilosafyVal(val interface{}) (interface{}, error)

type Main

type Main struct {
	PilosaHosts              []string      `short:"p" help:"Alias for --featurebase-hosts. Will be deprecated in the next major release."`
	FeaturebaseHosts         []string      `short:"" help:"Comma separated list of host:port pairs for FeatureBase."`
	PilosaGRPCHosts          []string      `short:"" help:"Alias for --featurebase-grpc-hosts. Will be deprecated in the next major release."`
	FeaturebaseGRPCHosts     []string      `short:"" help:"Comma separated list of host:port pairs for FeatureBase's GRPC endpoint. Used by Kafka delete consumer."`
	BatchSize                int           `` /* 178-byte string literal not displayed */
	KeyTranslateBatchSize    int           `help:"Maximum number of keys to translate at a time."`
	BatchMaxStaleness        time.Duration `` /* 184-byte string literal not displayed */
	Index                    string        `short:"i" help:"Name of FeatureBase index."`
	LogPath                  string        `short:"l" help:"Log file to write to. Empty means stderr."`
	PrimaryKeyFields         []string      `` /* 188-byte string literal not displayed */
	IDField                  string        `` /* 168-byte string literal not displayed */
	AutoGenerate             bool          `short:"a" help:"Automatically generate IDs."`
	ExternalGenerate         bool          `short:"" help:"Use FeatureBase's ID generation (must be set alongside auto-generate)."`
	IDAllocKeyPrefix         string        `` /* 135-byte string literal not displayed */
	Concurrency              int           `` /* 252-byte string literal not displayed */
	CacheLength              uint64        `short:""  help:"Number of batches of ID mappings to cache."`
	PackBools                string        `` /* 130-byte string literal not displayed */
	Verbose                  bool          `short:"v" help:"Enable verbose logging."`
	Delete                   bool          `help:"If true, delete records rather than write them." flag:"-"`
	Pprof                    string        `short:"o" help:"host:port on which to listen for pprof"`
	Stats                    string        `short:"s" help:"host:port on which to host metrics"`
	ExpSplitBatchMode        bool          `` /* 213-byte string literal not displayed */
	AssumeEmptyPilosa        bool          `short:"u" help:"Alias for --assume-empty-featurebase. Will be deprecated in the next major release."`
	AssumeEmptyFeaturebase   bool          `` /* 366-byte string literal not displayed */
	WriteCSV                 string        `short:"" help:"Write data we're ingesting to a CSV file with the given name."`
	Namespace                string        `flag:"-"`
	DeleteIndex              bool          `` /* 181-byte string literal not displayed */
	DryRun                   bool          `short:"" help:"Dry run - just flag parsing."`
	TrackProgress            bool          `help:"Periodically print status updates on how many records have been sourced." short:""`
	OffsetMode               bool          `` /* 165-byte string literal not displayed */
	LookupDBDSN              string        `flag:"lookup-db-dsn" help:"Connection string for connecting to Lookup database."`
	LookupBatchSize          int           `help:"Number of records to batch before writing them to Lookup database."`
	AuthToken                string        `flag:"auth-token" help:"Authentication Token for FeatureBase"`
	CommitTimeout            time.Duration `help:"Maximum time before canceling commit."`
	AllowIntOutOfRange       bool          `help:"Allow ingest to continue when it encounters out of range integers in IntFields. (default false)"`
	AllowDecimalOutOfRange   bool          `help:"Allow ingest to continue when it encounters out of range decimals in DecimalFields. (default false)"`
	AllowTimestampOutOfRange bool          `help:"Allow ingest to continue when it encounters out of range timestamps in TimestampFields. (default false)"`
	SkipBadRows              int           `help:"If you fail to process the first n rows without processing one successfully, fail."`

	UseShardTransactionalEndpoint bool `` /* 216-byte string literal not displayed */

	MDSAddress     string             `short:"" help:"MDS address."`
	OrganizationID dax.OrganizationID `short:"" help:"auto-assigned organization ID"`
	DatabaseID     dax.DatabaseID     `short:"" help:"auto-assigned database ID"`
	TableName      dax.TableName      `short:"" help:"human friendly table name"`

	// TODO implement the auto-generated IDs... hopefully using Pilosa to manage it.
	TLS TLSConfig

	// NewSource must be set by the user of Main before calling
	// Main.Run. Main.Run will call this function "Concurrency" times. It
	// is the job of this function to ensure that the concurrent
	// sources which are started partition work appropriately. This is
	// typically set up (by convention) in the Source's package in
	// cmd.go
	NewSource func() (Source, error) `flag:"-"`

	NewImporterFn func() pilosacore.Importer `flag:"-"`

	SchemaManager SchemaManager       `flag:"-"`
	Qtbl          *dax.QualifiedTable `flag:"-"`

	// Future flags are used to represent features or functionality which is not
	// yet the default behavior, but will be in a future release.
	Future struct {
		// Rename, if true, will interact with a service called FeatureBase
		// instead of Pilosa.
		Rename bool `help:"Interact with FeatureBase instead of Pilosa."`
	}
	MaxMsgs uint64 `` /* 134-byte string literal not displayed */
	// contains filtered or unexported fields
}

Main holds all config for general ingest

func NewMain

func NewMain() *Main
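
As a hedged sketch of how a new integration wires into this: create a Main, set the ingest options you need, and provide NewSource before calling Run (the idk import path is assumed; newMySource is a hypothetical constructor for your own Source implementation):

package main

import (
	"log"

	"github.com/molecula/featurebase/v3/idk" // import path assumed
)

func main() {
	m := idk.NewMain()
	m.Index = "sample-index"
	m.PrimaryKeyFields = []string{"asset_tag"}
	m.BatchSize = 10000

	// NewSource is called Concurrency times; each Source returned should
	// claim its own partition of the work.
	m.NewSource = func() (idk.Source, error) {
		return newMySource() // hypothetical constructor for your Source
	}

	if err := m.Run(); err != nil {
		log.Fatal(err)
	}
}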

func (*Main) Log

func (m *Main) Log() logger.Logger

func (*Main) NewLookupClient

func (m *Main) NewLookupClient() (*PostgresClient, error)

NewLookupClient returns an instance of a lookupClient. This represents a somewhat generic interface to a separate data store; currently the only implementation uses Postgres. This is also used for testing.

func (*Main) PilosaClient

func (m *Main) PilosaClient() *pilosaclient.Client

func (*Main) Rename

func (m *Main) Rename()

func (*Main) Run

func (m *Main) Run() (err error)

func (*Main) SetLog

func (m *Main) SetLog(log logger.Logger)

func (*Main) Setup

func (m *Main) Setup() (onFinishRun func(), err error)

type Metadata

type Metadata interface {
	// SchemaMetadata returns a string representation of source-specific details
	// about the schema.
	SchemaMetadata() string
	SchemaSubject() string
	SchemaSchema() string
	SchemaVersion() int
	SchemaID() int
}

type NopLookupBatcher

type NopLookupBatcher struct{}

NopLookupBatcher is a no-op implementation of a LookupBatcher.

func (*NopLookupBatcher) AddFullRow

func (n *NopLookupBatcher) AddFullRow(row []interface{}) error

func (*NopLookupBatcher) Import

func (n *NopLookupBatcher) Import() error

func (*NopLookupBatcher) Len

func (n *NopLookupBatcher) Len() int

type OffsetStreamRecord

type OffsetStreamRecord interface {
	Record

	// StreamOffset returns the stream from which the record originated, and the offset of the record within that stream.
	StreamOffset() (key string, offset uint64)
}

OffsetStreamRecord is an extension of the record type which also tracks offsets within streams.

type PathTable

type PathTable [][]string

func (PathTable) FlatMap

func (t PathTable) FlatMap() map[string]int

func (PathTable) Lookup

func (t PathTable) Lookup(root interface{}, allowMissingFields bool) ([]interface{}, error)

type PostgresClient

type PostgresClient struct {
	DSN         string
	TableName   string
	ColumnNames []string
	// contains filtered or unexported fields
}

func NewPostgresClient

func NewPostgresClient(DSN, TableName string, log logger.Logger) (*PostgresClient, error)

NewPostgresClient creates a Lookup client for a Postgres backend. It creates the corresponding Postgres table if necessary.

func (*PostgresClient) Close

func (c *PostgresClient) Close() error

func (*PostgresClient) DropTable

func (c *PostgresClient) DropTable() error

DropTable drops the PostgresClient's table from the Postgres backend, for test cleanup.

func (*PostgresClient) ReadString

func (c *PostgresClient) ReadString(id uint64, column string) (string, error)

ReadString reads the string value for the specified column, from the row with the specified id.

func (*PostgresClient) RowExists

func (c *PostgresClient) RowExists(id uint64) (bool, error)

RowExists returns true if a row exists containing the specified id.

func (*PostgresClient) Setup

func (c *PostgresClient) Setup(columnNames []string) error

Setup primarily serves to add text columns to the Postgres table. Column names are not known until ingester.Run starts, so columns are created separately from table creation.

type PostgresUpsertBatcher

type PostgresUpsertBatcher struct {
	// contains filtered or unexported fields
}

PostgresUpsertBatcher does bulk imports using an INSERT ... ON CONFLICT statement, to handle duplicate-key updates.

func NewPostgresUpsertBatcher

func NewPostgresUpsertBatcher(client *PostgresClient, batchSize int) *PostgresUpsertBatcher

NewPostgresUpsertBatcher creates a LookupBatcher, with a Postgres backend, which uses the Upsert method (see the Import function for details) to batch imports.

func (*PostgresUpsertBatcher) AddFullRow

func (pg *PostgresUpsertBatcher) AddFullRow(row []interface{}) error

AddFullRow adds a row to the batch, which includes data for each column in the table. If the addition causes the batch to reach the predefined batch size, an import occurs. An alternative might be AddPartialRow, which accepts data for only some of the columns in the table. An AddPartialRow function would be able to handle any number of missing Lookup fields in received messages; AddFullRow can not.

func (*PostgresUpsertBatcher) Import

func (pg *PostgresUpsertBatcher) Import() error

Import sends a batch of row data to Postgres using an INSERT ... ON CONFLICT statement. The ON CONFLICT portion has no effect on the rows in the batch with new ids. For rows with ids that already exist in the Postgres table, the ON CONFLICT causes the new values for the data columns to overwrite the existing values.
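
A hedged end-to-end sketch of the lookup pieces above (the DSN, table name, column names, and row layout are placeholders; the import paths and logger.NopLogger are assumptions):

package main

import (
	"log"

	"github.com/molecula/featurebase/v3/idk"    // import path assumed
	"github.com/molecula/featurebase/v3/logger" // import path assumed
)

func main() {
	client, err := idk.NewPostgresClient(
		"postgres://user:pass@localhost:5432/lookup?sslmode=disable",
		"lookup_table",
		logger.NopLogger,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Setup adds the text columns once their names are known.
	if err := client.Setup([]string{"description"}); err != nil {
		log.Fatal(err)
	}

	batcher := idk.NewPostgresUpsertBatcher(client, 1000)
	// AddFullRow takes data for every column; the exact row layout is assumed here.
	if err := batcher.AddFullRow([]interface{}{uint64(1), "first record"}); err != nil {
		log.Fatal(err)
	}
	// Flush whatever remains; rows with existing ids are overwritten (upsert).
	if err := batcher.Import(); err != nil {
		log.Fatal(err)
	}
}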

func (*PostgresUpsertBatcher) Len

func (pg *PostgresUpsertBatcher) Len() int

Len returns the number of rows in the current batch, that have yet to be imported to the Postgres backend.

type ProgressTracker

type ProgressTracker struct {
	// contains filtered or unexported fields
}

ProgressTracker tracks the progress of record sourcing.

func (*ProgressTracker) Check

func (t *ProgressTracker) Check() uint64

Check the number of records that have been sourced so far.

func (*ProgressTracker) Track

func (t *ProgressTracker) Track(src Source) Source

Track record generation progress on a source. Wraps the source with a progress-tracking mechanism.

type RangeAllocator

type RangeAllocator interface {
	Get() (*IDRange, error)
	Return(*IDRange) error
}

func NewLocalRangeAllocator

func NewLocalRangeAllocator(shardWidth uint64) RangeAllocator

type Record

type Record interface {
	// Commit notifies the Source which produced this record that it
	// and any record which came before it have been completely
	// processed. The Source can then take any necessary action to
	// record which records have been processed, and restart from the
	// earliest unprocessed record in the event of a failure.
	Commit(ctx context.Context) error

	Data() []interface{}
}

type RecordTimeField

type RecordTimeField struct {
	NameVal     string
	DestNameVal string
	Layout      string // Layout tells how the time should be parsed. Defaults to RFC3339.
	// need a way to create other time fields in the record (add time/quantum to String, StringArray, ID, IDArray?)
	// do we need a way to have timefields in a record with independent times/values
	Epoch time.Time
	Unit  Unit
}

RecordTimeField applies to the whole record, but doesn't have a name (or quantum) of its own since it applies to any other time fields in the record.

func (RecordTimeField) DestName

func (r RecordTimeField) DestName() string

func (RecordTimeField) Name

func (r RecordTimeField) Name() string

func (RecordTimeField) PilosafyVal

func (r RecordTimeField) PilosafyVal(val interface{}) (interface{}, error)

PilosafyVal for RecordTimeField always returns a time.Time or nil.

type Recordizer

type Recordizer func(rawRec []interface{}, rec *pilosabatch.Row) error

type SchemaManager

type SchemaManager interface {
	StartTransaction(id string, timeout time.Duration, exclusive bool, requestTimeout time.Duration) (*pilosacore.Transaction, error)
	FinishTransaction(id string) (*pilosacore.Transaction, error)
	Schema() (*pilosaclient.Schema, error)
	SyncIndex(index *pilosaclient.Index) error
	DeleteIndex(index *pilosaclient.Index) error
	Status() (pilosaclient.Status, error)
	SetAuthToken(string)
}

SchemaManager is meant to be an interface for managing schema information; i.e. for interacting with a single source of truth for schema information, like the MDS Schemar. But... it currently contains methods which are not related to schema because the first goal was just to introduce an interface in ingest.go for any methods being called on *m.client. We don't want a FeatureBase client directly called from ingest, rather, we want to call these interface methods and allow for different implementations (such as an MDS implementation which uses the Schemar in MDS as opposed to a FeatureBase node or cluster).

var NopSchemaManager SchemaManager = &nopSchemaManager{}

NopSchemaManager is an implementation of the SchemaManager interface that doesn't do anything.

type SignedIntBoolKeyField

type SignedIntBoolKeyField struct {
	NameVal     string
	DestNameVal string
}

SignedIntBoolKeyField translates a signed integer value to a (rowID, bool) pair corresponding to the magnitude and sign of the original value. This may be used to specify whether a bool value is to be set (positive/true) or cleared (negative/false).
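
So, for instance, +5 would map to (rowID 5, true) and -5 to (rowID 5, false). A tiny illustrative sketch of that mapping (not the package's implementation; handling of zero is left out):

package example

// signedIntToBoolKey mirrors the mapping described above: the magnitude
// becomes the row ID and the sign becomes the bool (positive = set/true,
// negative = clear/false).
func signedIntToBoolKey(v int64) (rowID uint64, set bool) {
	if v < 0 {
		return uint64(-v), false
	}
	return uint64(v), true
}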

func (SignedIntBoolKeyField) DestName

func (b SignedIntBoolKeyField) DestName() string

func (SignedIntBoolKeyField) Name

func (b SignedIntBoolKeyField) Name() string

func (SignedIntBoolKeyField) PilosafyVal

func (SignedIntBoolKeyField) PilosafyVal(val interface{}) (interface{}, error)

type Source

type Source interface {
	// Record returns a data record, and an optional error. If the
	// error is ErrSchemaChange, then the record is valid, but one
	// should call Source.Schema to understand how each of its fields
	// should be interpreted.
	Record() (Record, error)

	// Schema returns a slice of Fields which applies to the most
	// recent Record returned from Source.Record. Every Field has a
	// name and a type, and depending on the concrete type of the
	// Field, may have other information which is relevant to how it
	// should be indexed.
	Schema() []Field

	Close() error
}

Source is an interface implemented by sources of data which can be ingested into Pilosa. Each Record returned from Record is described by the slice of Fields returned from Source.Schema directly after the call to Source.Record. If the error returned from Source.Record is nil, then the call to Schema which applied to the previous Record also applies to this Record. Source implementations are fundamentally not threadsafe (due to the interplay between Record and Schema).
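
A minimal hypothetical Source, backed by an in-memory slice, might look like the sketch below (the idk import path is assumed, and how the ingester treats end-of-data is an assumption since the interface does not document it):

package mysource

import (
	"context"
	"io"

	"github.com/molecula/featurebase/v3/idk" // import path assumed
)

// sliceRecord is a trivial Record wrapping one fixed row.
type sliceRecord struct{ data []interface{} }

// Commit is a no-op here; a real Source would checkpoint its position.
func (r sliceRecord) Commit(ctx context.Context) error { return nil }

func (r sliceRecord) Data() []interface{} { return r.data }

// sliceSource serves a fixed schema and a fixed set of rows; it is a
// hypothetical starting point, not part of the package.
type sliceSource struct {
	schema []idk.Field
	rows   [][]interface{}
	i      int
}

func (s *sliceSource) Schema() []idk.Field { return s.schema }

func (s *sliceSource) Close() error { return nil }

func (s *sliceSource) Record() (idk.Record, error) {
	if s.i >= len(s.rows) {
		// How the ingester treats io.EOF is an assumption; real sources
		// often block for more data or return ErrFlush when idle.
		return nil, io.EOF
	}
	rec := sliceRecord{data: s.rows[s.i]}
	s.i++
	if s.i == 1 {
		// The first record delivered reports ErrSchemaChange so the caller
		// knows to call Schema() before decoding it.
		return rec, idk.ErrSchemaChange
	}
	return rec, nil
}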

type StringArrayField

type StringArrayField struct {
	NameVal     string
	DestNameVal string

	// Quantum — see note about Quantum on "Field" interface.
	Quantum string

	TTL string

	*CacheConfig
}

func (StringArrayField) DestName

func (s StringArrayField) DestName() string

func (StringArrayField) Name

func (s StringArrayField) Name() string

func (StringArrayField) PilosafyVal

func (StringArrayField) PilosafyVal(val interface{}) (interface{}, error)

type StringField

type StringField struct {
	NameVal     string
	DestNameVal string

	// Mutex denotes whether we need to enforce that each record only
	// has a single value for this field. Put another way, says
	// whether a new value for this field be treated as adding an
	// additional value, or replacing the existing value (if there is
	// one).
	Mutex bool

	// Quantum — see note about Quantum on "Field" interface.
	Quantum string

	TTL string

	*CacheConfig
}

func (StringField) DestName

func (s StringField) DestName() string

func (StringField) Name

func (s StringField) Name() string

func (StringField) PilosafyVal

func (s StringField) PilosafyVal(val interface{}) (interface{}, error)

type TLSConfig

type TLSConfig struct {
	// CertificatePath contains the path to the certificate (.crt or .pem file)
	CertificatePath string `json:"certificate" help:"Path to certificate file, or literal PEM data."`
	// CertificateKeyPath contains the path to the certificate key (.key file)
	CertificateKeyPath string `json:"key" help:"Path to certificate key file, or literal PEM data."`
	// CACertPath is the path to a CA certificate (.crt or .pem file)
	CACertPath string `json:"ca-certificate" help:"Path to CA certificate file, or literal PEM data."`
	// SkipVerify disables verification of server certificates.
	SkipVerify bool `json:"skip-verify" help:"Disables verification of server certificates."`
	// EnableClientVerification enables verification of client TLS certificates (Mutual TLS)
	EnableClientVerification bool `json:"enable-client-verification" help:"Enable verification of client certificates."`
}

TLSConfig contains TLS configuration. *Path elements can be set with local file paths, OR with literal PEM data, which is detected automatically (naively). Passing cert data directly via environment variables is used for m-cloud deployment, secured via SSM. This makes the parameter names slightly inaccurate, but it avoids introducing yet another configuration parameter.
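
A hedged sketch of turning this configuration into a *tls.Config via GetTLSConfig and then an HTTP client via GetHTTPClient (the paths are placeholders; the import paths and logger.NopLogger are assumptions):

package main

import (
	"log"

	"github.com/molecula/featurebase/v3/idk"    // import path assumed
	"github.com/molecula/featurebase/v3/logger" // import path assumed
)

func main() {
	cfg := idk.TLSConfig{
		CertificatePath:    "testenv/certs/client.crt", // or literal PEM data
		CertificateKeyPath: "testenv/certs/client.key",
		CACertPath:         "testenv/certs/ca.crt",
	}
	tlsCfg, err := idk.GetTLSConfig(&cfg, logger.NopLogger)
	if err != nil {
		log.Fatal(err)
	}
	// GetHTTPClient wraps the tls.Config in an *http.Client.
	client := idk.GetHTTPClient(tlsCfg)
	_ = client
}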

type TimestampField

type TimestampField struct {
	NameVal     string
	DestNameVal string
	Layout      string
	Epoch       time.Time
	Unit        Unit
	Granularity string
}

func (TimestampField) DestName

func (d TimestampField) DestName() string

func (TimestampField) Name

func (d TimestampField) Name() string

func (TimestampField) PilosafyVal

func (t TimestampField) PilosafyVal(val interface{}) (interface{}, error)

PilosafyVal for TimestampField always returns an int or nil.

type Unit

type Unit string

func (Unit) Duration

func (u Unit) Duration() (time.Duration, error)

func (Unit) DurationFromValue

func (u Unit) DurationFromValue(val int64) (time.Duration, error)

func (Unit) IsCustom

func (u Unit) IsCustom() bool

func (Unit) ToNanos

func (u Unit) ToNanos() (int64, error)

ToNanos returns the number of nanoseconds per the given Unit.
