Documentation ¶
Overview ¶
package all
This package is the canonical location for all migrations being made against the single shared kv.Store implementation used by InfluxDB (while it remains a single store).
The array all.Migrations contains the list of migration specifications that drives the ordered sequence of migration operations required to correctly configure the backing metadata store for InfluxDB.
This package is arranged like so:
- doc.go - this piece of documentation.
- all.go - definition of the Migrations array, referencing each of the named migrations defined in the numbered migration files (below).
- migration.go - an implementation of migration.Spec for convenience.
- 000X_migration_name.go (example) - N files containing the specific implementation of each migration enumerated in `all.go`.
- ...
Managing this list of files and all.go can be fiddly. There is a buildable CLI utility called `kvmigrate` in the `internal/cmd/kvmigrate` package. Its `create` command automatically creates a new migration in the expected location and appends it to the Migrations array in `all.go`.
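For example, a hypothetical invocation from the repository root (assuming `create` takes the new migration's name as an argument; consult the command's help for its actual usage):

	go run ./internal/cmd/kvmigrate create "add some new bucket"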
Index ¶
Constants ¶
This section is empty.
Variables ¶
var Migration0001_InitialMigration = kv.InitialMigration{}
Migration0001_InitialMigration contains all the buckets created before the advent of migrations in kv.
var Migration0002_AddURMByUserIndex = kv.NewIndexMigration(index.URMByUserIndexMapping, kv.WithIndexMigrationCleanup)
Migration0002_AddURMByUserIndex creates the URM by user index and populates missing entries based on the source.
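For reference, an index mapping such as index.URMByUserIndexMapping pairs a source bucket with an index bucket and a function that derives the indexed key from each source value. A minimal sketch, assuming the kv package's NewIndexMapping helper and its func([]byte) ([]byte, error) callback signature (both are assumptions, as neither is shown in this documentation):

var exampleURMByUserIndexMapping = kv.NewIndexMapping(
	[]byte("userresourcemappingsv1"),            // source bucket
	[]byte("userresourcemappingsbyuserindexv1"), // index bucket (name illustrative)
	func(v []byte) ([]byte, error) {
		// Derive the indexed (foreign) key: here, the mapping's user ID.
		var urm influxdb.UserResourceMapping
		if err := json.Unmarshal(v, &urm); err != nil {
			return nil, err
		}
		return urm.UserID.Encode()
	},
)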
var Migration0003_TaskOwnerIDUpMigration = UpOnlyMigration(
	"migrate task owner id",
	func(ctx context.Context, store kv.SchemaStore) error {
		// Collect the tasks that are missing an owner ID.
		var ownerlessTasks []*influxdb.Task
		err := store.View(ctx, func(tx kv.Tx) error {
			taskBucket, err := tx.Bucket(taskBucket)
			if err != nil {
				return influxdb.ErrUnexpectedTaskBucketErr(err)
			}

			c, err := taskBucket.ForwardCursor([]byte{})
			if err != nil {
				return influxdb.ErrUnexpectedTaskBucketErr(err)
			}

			for k, v := c.Next(); k != nil; k, v = c.Next() {
				kvTask := &kvTask{}
				if err := json.Unmarshal(v, kvTask); err != nil {
					return influxdb.ErrInternalTaskServiceError(err)
				}

				t := kvToInfluxTask(kvTask)
				if !t.OwnerID.Valid() {
					ownerlessTasks = append(ownerlessTasks, t)
				}
			}
			if err := c.Err(); err != nil {
				return err
			}

			return c.Close()
		})
		if err != nil {
			return err
		}

		// Attempt to populate the owner ID of each collected task.
		for _, t := range ownerlessTasks {
			err := store.Update(ctx, func(tx kv.Tx) error {
				taskKey, err := taskKey(t.ID)
				if err != nil {
					return err
				}

				b, err := tx.Bucket(taskBucket)
				if err != nil {
					return influxdb.ErrUnexpectedTaskBucketErr(err)
				}

				// First, try deriving the owner from the task's authorization.
				if !t.OwnerID.Valid() {
					v, err := b.Get(taskKey)
					if kv.IsNotFound(err) {
						return influxdb.ErrTaskNotFound
					}

					authType := struct {
						AuthorizationID influxdb.ID `json:"authorizationID"`
					}{}
					if err := json.Unmarshal(v, &authType); err != nil {
						return influxdb.ErrInternalTaskServiceError(err)
					}

					encodedID, err := authType.AuthorizationID.Encode()
					if err == nil {
						authBucket, err := tx.Bucket([]byte("authorizationsv1"))
						if err != nil {
							return err
						}

						a, err := authBucket.Get(encodedID)
						if err == nil {
							auth := &influxdb.Authorization{}
							if err := json.Unmarshal(a, auth); err != nil {
								return err
							}

							t.OwnerID = auth.GetUserID()
						}
					}
				}

				// Otherwise, fall back to the owner of the task's organization.
				if !t.OwnerID.Valid() {
					b, err := tx.Bucket([]byte("userresourcemappingsv1"))
					if err != nil {
						return err
					}

					id, err := t.OrganizationID.Encode()
					if err != nil {
						return err
					}

					cur, err := b.ForwardCursor(id, kv.WithCursorPrefix(id))
					if err != nil {
						return err
					}

					for k, v := cur.Next(); k != nil; k, v = cur.Next() {
						m := &influxdb.UserResourceMapping{}
						if err := json.Unmarshal(v, m); err != nil {
							return err
						}

						if m.ResourceID == t.OrganizationID && m.ResourceType == influxdb.OrgsResourceType && m.UserType == influxdb.Owner {
							t.OwnerID = m.UserID
							break
						}
					}
					if err := cur.Close(); err != nil {
						return err
					}
				}

				if !t.OwnerID.Valid() {
					return &influxdb.Error{
						Code: influxdb.EInternal,
						Msg:  "could not populate owner ID for task",
					}
				}

				// Persist the updated task.
				taskBytes, err := json.Marshal(t)
				if err != nil {
					return influxdb.ErrInternalTaskServiceError(err)
				}

				err = b.Put(taskKey, taskBytes)
				if err != nil {
					return influxdb.ErrUnexpectedTaskBucketErr(err)
				}
				return nil
			})
			if err != nil {
				return err
			}
		}
		return nil
	},
)
Migration0003_TaskOwnerIDUpMigration adds missing owner IDs to some legacy tasks.
var Migration0004_AddDbrpBuckets = migration.CreateBuckets(
"create DBRP buckets",
dbrpBucket,
dbrpIndexBucket,
dbrpDefaultBucket,
)
Migration0004_AddDbrpBuckets creates the buckets necessary for the DBRP Service to operate.
var Migration0005_AddPkgerBuckets = migration.CreateBuckets(
"create pkger stacks buckets",
pkgerStacksBucket,
pkgerStackIndexBucket,
)
Migration0005_AddPkgerBuckets creates the buckets necessary for the pkger service to operate.
var Migration0006_DeleteBucketSessionsv1 = migration.DeleteBuckets("delete sessionsv1 bucket", []byte("sessionsv1"))
Migration0006_DeleteBucketSessionsv1 removes the sessionsv1 bucket from the backing kv store.
var Migration0007_CreateMetaDataBucket = migration.CreateBuckets(
	"Create TSM metadata buckets",
	meta.BucketName,
)
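Migration0007_CreateMetaDataBucket creates the bucket (meta.BucketName) used to store TSM metadata.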
var Migration0008_LegacyAuthBuckets = migration.CreateBuckets(
	"Create Legacy authorization buckets",
	[]byte("legacy/authorizationsv1"),
	[]byte("legacy/authorizationindexv1"),
)
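Migration0008_LegacyAuthBuckets creates the buckets used to store legacy (1.x-style) authorizations and their index.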
var Migration0009_LegacyAuthPasswordBuckets = migration.CreateBuckets(
	"Create legacy auth password bucket",
	[]byte("legacy/authorizationPasswordv1"),
)
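Migration0009_LegacyAuthPasswordBuckets creates the bucket used to store passwords for legacy authorizations.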
var Migration0010_AddIndexTelegrafByOrg = kv.NewIndexMigration(telegraf.ByOrganizationIndexMapping, kv.WithIndexMigrationCleanup)
Migration0010_AddIndexTelegrafByOrg adds an index of telegraf configs by organization ID.
var Migration0011_PopulateDashboardsOwnerId = UpOnlyMigration(
	"populate dashboards owner id",
	func(ctx context.Context, store kv.SchemaStore) error {
		var urmBucket = []byte("userresourcemappingsv1")

		type userResourceMapping struct {
			UserID       influxdb.ID           `json:"userID"`
			UserType     influxdb.UserType     `json:"userType"`
			MappingType  influxdb.MappingType  `json:"mappingType"`
			ResourceType influxdb.ResourceType `json:"resourceType"`
			ResourceID   influxdb.ID           `json:"resourceID"`
		}

		// Collect the user resource mappings for dashboard owners.
		var mappings []*userResourceMapping
		if err := store.View(ctx, func(tx kv.Tx) error {
			bkt, err := tx.Bucket(urmBucket)
			if err != nil {
				return err
			}

			cursor, err := bkt.ForwardCursor(nil)
			if err != nil {
				return err
			}

			return kv.WalkCursor(ctx, cursor, func(_, v []byte) (bool, error) {
				var mapping userResourceMapping
				if err := json.Unmarshal(v, &mapping); err != nil {
					return false, err
				}

				if mapping.ResourceType == influxdb.DashboardsResourceType && mapping.UserType == influxdb.Owner {
					mappings = append(mappings, &mapping)
				}
				return true, nil
			})
		}); err != nil {
			return err
		}

		var dashboardsBucket = []byte("dashboardsv2")

		// dashboard represents all visual and query data for a dashboard.
		type dashboard struct {
			ID             influxdb.ID            `json:"id,omitempty"`
			OrganizationID influxdb.ID            `json:"orgID,omitempty"`
			Name           string                 `json:"name"`
			Description    string                 `json:"description"`
			Cells          []*influxdb.Cell       `json:"cells"`
			Meta           influxdb.DashboardMeta `json:"meta"`
			OwnerID        *influxdb.ID           `json:"owner,omitempty"`
		}

		var (
			batchSize = 100
			flush     = func(batch []*userResourceMapping) (err error) {
				ids := make([][]byte, len(batch))
				for i, urm := range batch {
					ids[i], err = urm.ResourceID.Encode()
					if err != nil {
						return
					}
				}

				return store.Update(ctx, func(tx kv.Tx) error {
					bkt, err := tx.Bucket(dashboardsBucket)
					if err != nil {
						return err
					}

					values, err := bkt.GetBatch(ids...)
					if err != nil {
						return err
					}

					for i, value := range values {
						var dashboard dashboard
						if err := json.Unmarshal(value, &dashboard); err != nil {
							return err
						}

						if dashboard.OwnerID != nil {
							fmt.Printf("dashboard %q already has owner %q\n", dashboard.ID, dashboard.OwnerID)
							continue
						}

						dashboard.OwnerID = &batch[i].UserID
						updated, err := json.Marshal(dashboard)
						if err != nil {
							return err
						}

						if err := bkt.Put(ids[i], updated); err != nil {
							return err
						}
					}
					return nil
				})
			}
		)

		// Write the updated dashboards back in batches.
		for i := 0; i < len(mappings); i += batchSize {
			end := i + batchSize
			if end > len(mappings) {
				end = len(mappings)
			}
			if err := flush(mappings[i:end]); err != nil {
				return err
			}
		}
		return nil
	},
)
Migration0011_PopulateDashboardsOwnerId backfills owner IDs on dashboards based on the presence of user resource mappings.
var Migration0012_DBRPByOrgIndex = kv.NewIndexMigration(dbrp.ByOrgIDIndexMapping, kv.WithIndexMigrationCleanup)
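Migration0012_DBRPByOrgIndex creates the DBRP by organization ID index and populates missing entries based on the source.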
var Migration0013_RepairDBRPOwnerAndBucketIDs = UpOnlyMigration(
	"repair DBRP owner and bucket IDs",
	func(ctx context.Context, store kv.SchemaStore) error {
		type oldStyleMapping struct {
			ID              influxdb.ID `json:"id"`
			Database        string      `json:"database"`
			RetentionPolicy string      `json:"retention_policy"`
			Default         bool        `json:"default"`

			// These 2 fields were renamed.
			OrganizationID influxdb.ID `json:"organization_id"`
			BucketID       influxdb.ID `json:"bucket_id"`
		}

		// Collect DBRPs that are using the old schema.
		var mappings []*oldStyleMapping
		if err := store.View(ctx, func(tx kv.Tx) error {
			bkt, err := tx.Bucket(dbrpBucket)
			if err != nil {
				return err
			}

			cursor, err := bkt.ForwardCursor(nil)
			if err != nil {
				return err
			}

			return kv.WalkCursor(ctx, cursor, func(_, v []byte) (bool, error) {
				var mapping oldStyleMapping
				if err := json.Unmarshal(v, &mapping); err != nil {
					return false, err
				}

				if mapping.OrganizationID.Valid() && mapping.BucketID.Valid() {
					mappings = append(mappings, &mapping)
				}
				return true, nil
			})
		}); err != nil {
			return err
		}

		type newStyleDbrpMapping struct {
			ID              influxdb.ID `json:"id"`
			Database        string      `json:"database"`
			RetentionPolicy string      `json:"retention_policy"`
			Default         bool        `json:"default"`

			// New names for the 2 renamed fields.
			OrganizationID influxdb.ID `json:"orgID"`
			BucketID       influxdb.ID `json:"bucketID"`
		}

		batchSize := 100
		writeBatch := func(batch []*oldStyleMapping) (err error) {
			ids := make([][]byte, len(batch))
			for i, mapping := range batch {
				ids[i], err = mapping.ID.Encode()
				if err != nil {
					return
				}
			}

			return store.Update(ctx, func(tx kv.Tx) error {
				bkt, err := tx.Bucket(dbrpBucket)
				if err != nil {
					return err
				}

				values, err := bkt.GetBatch(ids...)
				if err != nil {
					return err
				}

				for i, value := range values {
					var mapping newStyleDbrpMapping
					if err := json.Unmarshal(value, &mapping); err != nil {
						return err
					}

					if !mapping.OrganizationID.Valid() {
						mapping.OrganizationID = batch[i].OrganizationID
					}
					if !mapping.BucketID.Valid() {
						mapping.BucketID = batch[i].BucketID
					}

					updated, err := json.Marshal(mapping)
					if err != nil {
						return err
					}

					if err := bkt.Put(ids[i], updated); err != nil {
						return err
					}
				}
				return nil
			})
		}

		for i := 0; i < len(mappings); i += batchSize {
			end := i + batchSize
			if end > len(mappings) {
				end = len(mappings)
			}
			if err := writeBatch(mappings[i:end]); err != nil {
				return err
			}
		}
		return nil
	},
)
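Migration0013_RepairDBRPOwnerAndBucketIDs rewrites DBRP mappings that still carry the old organization_id and bucket_id JSON field names, copying their values into the renamed orgID and bucketID fields.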
var Migration0014_ReindexDBRPs = kv.NewIndexMigration(dbrp.ByOrgIDIndexMapping)
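Migration0014_ReindexDBRPs re-applies the DBRP by organization ID index migration, repopulating any index entries that may be missing (for example, for mappings repaired by Migration0013_RepairDBRPOwnerAndBucketIDs).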
var Migration0015_RecordShardGroupDurationsInBucketMetadata = UpOnlyMigration(
	"record shard group durations in bucket metadata",
	func(ctx context.Context, store kv.SchemaStore) error {
		type bucket struct {
			ID                  influxdb.ID   `json:"id,omitempty"`
			OrgID               influxdb.ID   `json:"orgID,omitempty"`
			Type                int           `json:"type"`
			Name                string        `json:"name"`
			Description         string        `json:"description"`
			RetentionPolicyName string        `json:"rp,omitempty"` // This to support v1 sources
			RetentionPeriod     time.Duration `json:"retentionPeriod"`
			CreatedAt           time.Time     `json:"createdAt"`
			UpdatedAt           time.Time     `json:"updatedAt"`

			// This is expected to be 0 for all buckets created before
			// we began tracking it in metadata.
			ShardGroupDuration time.Duration `json:"shardGroupDuration"`
		}

		bucketBucket := []byte("bucketsv1")

		// Collect buckets that need to be updated.
		var buckets []*bucket
		if err := store.View(ctx, func(tx kv.Tx) error {
			bkt, err := tx.Bucket(bucketBucket)
			if err != nil {
				return err
			}

			cursor, err := bkt.ForwardCursor(nil)
			if err != nil {
				return err
			}

			return kv.WalkCursor(ctx, cursor, func(_, v []byte) (bool, error) {
				var b bucket
				if err := json.Unmarshal(v, &b); err != nil {
					return false, err
				}

				if b.ShardGroupDuration == 0 {
					buckets = append(buckets, &b)
				}
				return true, nil
			})
		}); err != nil {
			return err
		}

		batchSize := 100
		writeBatch := func(batch []*bucket) (err error) {
			ids := make([][]byte, len(batch))
			for i, b := range batch {
				ids[i], err = b.ID.Encode()
				if err != nil {
					return err
				}
			}

			return store.Update(ctx, func(tx kv.Tx) error {
				bkt, err := tx.Bucket(bucketBucket)
				if err != nil {
					return err
				}

				values, err := bkt.GetBatch(ids...)
				if err != nil {
					return err
				}

				for i, value := range values {
					var b bucket
					if err := json.Unmarshal(value, &b); err != nil {
						return err
					}

					if b.ShardGroupDuration == 0 {
						b.ShardGroupDuration = meta.NormalisedShardDuration(0, b.RetentionPeriod)
					}

					updated, err := json.Marshal(b)
					if err != nil {
						return err
					}

					if err := bkt.Put(ids[i], updated); err != nil {
						return err
					}
				}
				return nil
			})
		}

		for i := 0; i < len(buckets); i += batchSize {
			end := i + batchSize
			if end > len(buckets) {
				end = len(buckets)
			}
			if err := writeBatch(buckets[i:end]); err != nil {
				return err
			}
		}
		return nil
	},
)
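Migration0015_RecordShardGroupDurationsInBucketMetadata backfills the shard group duration for buckets created before it was tracked in metadata, deriving a normalised value from each bucket's retention period.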
var Migrations = [...]migration.Spec{
	Migration0001_InitialMigration,
	Migration0002_AddURMByUserIndex,
	Migration0003_TaskOwnerIDUpMigration,
	Migration0004_AddDbrpBuckets,
	Migration0005_AddPkgerBuckets,
	Migration0006_DeleteBucketSessionsv1,
	Migration0007_CreateMetaDataBucket,
	Migration0008_LegacyAuthBuckets,
	Migration0009_LegacyAuthPasswordBuckets,
	Migration0010_AddIndexTelegrafByOrg,
	Migration0011_PopulateDashboardsOwnerId,
	Migration0012_DBRPByOrgIndex,
	Migration0013_RepairDBRPOwnerAndBucketIDs,
	Migration0014_ReindexDBRPs,
	Migration0015_RecordShardGroupDurationsInBucketMetadata,
}
Migrations contains all the migrations required for the kv store backing InfluxDB's metadata.
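To illustrate how this array is consumed, here is a minimal sketch of wiring it into the sibling kv/migration package's Migrator. The NewMigrator constructor and Up method are assumptions about that package's API and may differ:

func migrateUp(ctx context.Context, log *zap.Logger, store kv.SchemaStore) error {
	// Assumed API: NewMigrator orders the specs and tracks which have
	// been applied; Up applies any outstanding migrations in sequence.
	migrator, err := migration.NewMigrator(log, store, all.Migrations[:]...)
	if err != nil {
		return err
	}
	return migrator.Up(ctx)
}

Note that Migrations is an array rather than a slice, so it is sliced with [:] before being passed variadically.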
Functions ¶
This section is empty.
Types ¶
type Migration ¶
type Migration struct {
// contains filtered or unexported fields
}
Migration is a type which implements the migration package's Spec interface. It can be used to conveniently create migration specs for the all package.
func UpOnlyMigration ¶
func UpOnlyMigration(name string, up MigrationFunc) *Migration
UpOnlyMigration is a migration with an up function and a noop down function.
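For illustration, a hypothetical migration built with this helper. The migration, its name, and the bucket are made up, and it assumes kv.SchemaStore exposes a CreateBucket method (as suggested by the bucket-creation migrations above):

var Migration000X_AddWidgetsBucket = UpOnlyMigration(
	"add widgets bucket",
	func(ctx context.Context, store kv.SchemaStore) error {
		// Create a hypothetical bucket on the way up. The generated
		// down function is a noop, so this cannot be rolled back.
		return store.CreateBucket(ctx, []byte("widgetsv1"))
	},
)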
func (*Migration) MigrationName ¶
func (m *Migration) MigrationName() string
MigrationName returns the underlying name of the migration.
type MigrationFunc ¶
type MigrationFunc func(context.Context, kv.SchemaStore) error
MigrationFunc is a function which can be used as either an up or down operation.
Source Files ¶
- 0001_initial_migration.go
- 0002_urm_by_user_index.go
- 0003_task_owners.go
- 0004_add_dbrp_buckets.go
- 0005_add_pkger_buckets.go
- 0006_delete-bucket-sessionsv1.go
- 0007_CreateMetaDataBucket.go
- 0008_LegacyAuthBuckets.go
- 0009_LegacyAuthPasswordBuckets.go
- 0010_add-index-telegraf-by-org.go
- 0011_populate-dashboards-owner-id.go
- 0012_dbrp_by_org_index.go
- 0013_repair-DBRP-owner-and-bucket-IDs.go
- 0014_reindex-DBRPs.go
- 0015_record-shard-group-durations-in-bucket-metadata.go
- all.go
- doc.go
- migration.go