all

package
v2.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 13, 2021 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

package all

This package is the canonical location for all migrations being made against the single shared kv.Store implementation used by InfluxDB (while it remains a single store).

The array all.Migrations contains the list of migration specifications which drives the serial set of migration operations required to correctly configure the backing metadata store for InfluxDB.

This package is arranged like so:

doc.go - this piece of documentation.
all.go - definition of the Migrations array referencing each of the named migrations in the numbered migration files (below).
migration.go - an implementation of migration.Spec for convenience.
000X_migration_name.go (example) - N files containing the specific implementation of each migration enumerated in `all.go`.
...

Managing this list of files and all.go can be fiddly. There is a buildable CLI utility called `kvmigrate` in the `internal/cmd/kvmigrate` package. It has a `create` command which automatically creates a new migration in the expected location and appends it to the Migrations array in all.go.

Index

Constants

This section is empty.

Variables

View Source
// Migration0001_InitialMigration is a kv.InitialMigration value. Per the
// package documentation it covers the buckets that were created before
// migrations were introduced in kv.
var Migration0001_InitialMigration = kv.InitialMigration{}

Migration0001_InitialMigration contains all the buckets created before the time of migrations in kv

Migration0002_AddURMByUserIndex creates the URM by user index and populates missing entries based on the source.

View Source
// Migration0003_TaskOwnerIDUpMigration adds missing owner IDs to legacy tasks.
//
// It runs in two passes:
//
//  1. A read-only transaction walks the task bucket and collects every task
//     whose OwnerID does not report Valid().
//  2. Each collected task is repaired in its own update transaction. The owner
//     is taken from the user of the task's authorization (looked up in the
//     "authorizationsv1" bucket) when that lookup succeeds; otherwise from the
//     first organization-owner mapping found for the task's organization in
//     the "userresourcemappingsv1" bucket.
//
// If neither lookup yields a valid owner, the migration stops with an
// EInternal error. Any store, cursor or encoding error also aborts it.
var Migration0003_TaskOwnerIDUpMigration = UpOnlyMigration(
	"migrate task owner id",
	func(ctx context.Context, store kv.SchemaStore) error {
		var ownerlessTasks []*influxdb.Task

		// Pass 1 (read-only): collect every stored task without a valid owner.
		err := store.View(ctx, func(tx kv.Tx) error {
			tasks, err := tx.Bucket(taskBucket)
			if err != nil {
				return influxdb.ErrUnexpectedTaskBucketErr(err)
			}

			c, err := tasks.ForwardCursor([]byte{})
			if err != nil {
				return influxdb.ErrUnexpectedTaskBucketErr(err)
			}

			for k, v := c.Next(); k != nil; k, v = c.Next() {
				stored := &kvTask{}
				if err := json.Unmarshal(v, stored); err != nil {
					// Release the cursor before bailing out; the decode
					// error is the one worth reporting.
					_ = c.Close()
					return influxdb.ErrInternalTaskServiceError(err)
				}

				if t := kvToInfluxTask(stored); !t.OwnerID.Valid() {
					ownerlessTasks = append(ownerlessTasks, t)
				}
			}
			if err := c.Err(); err != nil {
				_ = c.Close()
				return err
			}

			return c.Close()
		})
		if err != nil {
			return err
		}

		// Pass 2: repair each ownerless task in its own update transaction.
		for _, t := range ownerlessTasks {
			err := store.Update(ctx, func(tx kv.Tx) error {
				key, err := taskKey(t.ID)
				if err != nil {
					return err
				}
				b, err := tx.Bucket(taskBucket)
				if err != nil {
					return influxdb.ErrUnexpectedTaskBucketErr(err)
				}

				// First choice: the user that owns the task's authorization.
				if !t.OwnerID.Valid() {
					v, err := b.Get(key)
					if kv.IsNotFound(err) {
						return influxdb.ErrTaskNotFound
					}
					if err != nil {
						// Any other read failure must not be mistaken for a
						// task that merely lacks an authorization.
						return influxdb.ErrUnexpectedTaskBucketErr(err)
					}

					// Only the authorization ID is needed from the raw record.
					authType := struct {
						AuthorizationID influxdb.ID `json:"authorizationID"`
					}{}
					if err := json.Unmarshal(v, &authType); err != nil {
						return influxdb.ErrInternalTaskServiceError(err)
					}

					// Best effort: an authorization ID that cannot be encoded
					// or found simply falls through to the org-owner lookup.
					encodedID, err := authType.AuthorizationID.Encode()
					if err == nil {
						authBucket, err := tx.Bucket([]byte("authorizationsv1"))
						if err != nil {
							return err
						}

						a, err := authBucket.Get(encodedID)
						if err == nil {
							auth := &influxdb.Authorization{}
							if err := json.Unmarshal(a, auth); err != nil {
								return err
							}

							t.OwnerID = auth.GetUserID()
						}
					}
				}

				// Second choice: the first owner mapping of the task's organization.
				if !t.OwnerID.Valid() {
					urms, err := tx.Bucket([]byte("userresourcemappingsv1"))
					if err != nil {
						return err
					}

					id, err := t.OrganizationID.Encode()
					if err != nil {
						return err
					}

					cur, err := urms.ForwardCursor(id, kv.WithCursorPrefix(id))
					if err != nil {
						return err
					}

					for k, v := cur.Next(); k != nil; k, v = cur.Next() {
						m := &influxdb.UserResourceMapping{}
						if err := json.Unmarshal(v, m); err != nil {
							_ = cur.Close()
							return err
						}
						if m.ResourceID == t.OrganizationID && m.ResourceType == influxdb.OrgsResourceType && m.UserType == influxdb.Owner {
							t.OwnerID = m.UserID
							break
						}
					}
					// A cursor failure would otherwise look like "no owner found".
					if err := cur.Err(); err != nil {
						_ = cur.Close()
						return err
					}

					if err := cur.Close(); err != nil {
						return err
					}
				}

				if !t.OwnerID.Valid() {
					return &influxdb.Error{
						Code: influxdb.EInternal,
						Msg:  "could not populate owner ID for task",
					}
				}

				taskBytes, err := json.Marshal(t)
				if err != nil {
					return influxdb.ErrInternalTaskServiceError(err)
				}

				if err := b.Put(key, taskBytes); err != nil {
					return influxdb.ErrUnexpectedTaskBucketErr(err)
				}
				return nil
			})
			if err != nil {
				return err
			}
		}
		return nil
	},
)

Migration0003_TaskOwnerIDUpMigration adds missing owner IDs to some legacy tasks

View Source
// Migration0004_AddDbrpBuckets is the "create DBRP buckets" migration: it is
// built with migration.CreateBuckets for dbrpBucket, dbrpIndexBucket and
// dbrpDefaultBucket.
var Migration0004_AddDbrpBuckets = migration.CreateBuckets("create DBRP buckets", dbrpBucket, dbrpIndexBucket, dbrpDefaultBucket)

Migration0004_AddDbrpBuckets creates the buckets necessary for the DBRP Service to operate.

View Source
// Migration0005_AddPkgerBuckets is the "create pkger stacks buckets"
// migration: it is built with migration.CreateBuckets for pkgerStacksBucket
// and pkgerStackIndexBucket.
var Migration0005_AddPkgerBuckets = migration.CreateBuckets("create pkger stacks buckets", pkgerStacksBucket, pkgerStackIndexBucket)

Migration0005_AddPkgerBuckets creates the buckets necessary for the pkger service to operate.

View Source
// Migration0006_DeleteBucketSessionsv1 is the "delete sessionsv1 bucket"
// migration: it is built with migration.DeleteBuckets for the "sessionsv1"
// bucket.
var Migration0006_DeleteBucketSessionsv1 = migration.DeleteBuckets(
	"delete sessionsv1 bucket",
	[]byte("sessionsv1"),
)

Migration0006_DeleteBucketSessionsv1 removes the sessionsv1 bucket from the backing kv store.

View Source
// Migration0007_CreateMetaDataBucket is the "Create TSM metadata buckets"
// migration: it is built with migration.CreateBuckets for meta.BucketName.
var Migration0007_CreateMetaDataBucket = migration.CreateBuckets(
	"Create TSM metadata buckets",
	meta.BucketName,
)
View Source
// Migration0008_LegacyAuthBuckets is the "Create Legacy authorization buckets"
// migration: it is built with migration.CreateBuckets for the
// "legacy/authorizationsv1" and "legacy/authorizationindexv1" buckets.
var Migration0008_LegacyAuthBuckets = migration.CreateBuckets(
	"Create Legacy authorization buckets",
	[]byte("legacy/authorizationsv1"),
	[]byte("legacy/authorizationindexv1"),
)
View Source
// Migration0009_LegacyAuthPasswordBuckets is the "Create legacy auth password
// bucket" migration: it is built with migration.CreateBuckets for the
// "legacy/authorizationPasswordv1" bucket.
var Migration0009_LegacyAuthPasswordBuckets = migration.CreateBuckets(
	"Create legacy auth password bucket",
	[]byte("legacy/authorizationPasswordv1"),
)

Migration0010_AddIndexTelegrafByOrg adds the index telegraf configs by organization ID

View Source
// Migration0011_PopulateDashboardsOwnerId backfills owner IDs on dashboards
// based on the presence of user resource mappings.
//
// It first reads every entry of the "userresourcemappingsv1" bucket and keeps
// the mappings whose resource type is a dashboard and whose user type is
// owner. The referenced dashboards in "dashboardsv2" are then rewritten in
// batches of 100, one update transaction per batch, setting the owner to the
// mapping's user ID. Dashboards that already carry an owner are left
// untouched (a message is printed), so the first owner mapping seen for a
// dashboard wins. Any store or encoding error aborts the migration.
var Migration0011_PopulateDashboardsOwnerId = UpOnlyMigration("populate dashboards owner id", func(ctx context.Context, store kv.SchemaStore) error {
	var urmBucket = []byte("userresourcemappingsv1")
	type userResourceMapping struct {
		UserID       influxdb.ID           `json:"userID"`
		UserType     influxdb.UserType     `json:"userType"`
		MappingType  influxdb.MappingType  `json:"mappingType"`
		ResourceType influxdb.ResourceType `json:"resourceType"`
		ResourceID   influxdb.ID           `json:"resourceID"`
	}

	// Collect the dashboard owner mappings in a read-only transaction.
	var mappings []*userResourceMapping
	if err := store.View(ctx, func(tx kv.Tx) error {
		bkt, err := tx.Bucket(urmBucket)
		if err != nil {
			return err
		}

		cursor, err := bkt.ForwardCursor(nil)
		if err != nil {
			return err
		}

		return kv.WalkCursor(ctx, cursor, func(_, v []byte) (bool, error) {
			var mapping userResourceMapping
			if err := json.Unmarshal(v, &mapping); err != nil {
				return false, err
			}

			if mapping.ResourceType == influxdb.DashboardsResourceType &&
				mapping.UserType == influxdb.Owner {
				mappings = append(mappings, &mapping)
			}

			return true, nil
		})
	}); err != nil {
		return err
	}

	var dashboardsBucket = []byte("dashboardsv2")
	// dashboard represents all visual and query data for a dashboard.
	type dashboard struct {
		ID             influxdb.ID            `json:"id,omitempty"`
		OrganizationID influxdb.ID            `json:"orgID,omitempty"`
		Name           string                 `json:"name"`
		Description    string                 `json:"description"`
		Cells          []*influxdb.Cell       `json:"cells"`
		Meta           influxdb.DashboardMeta `json:"meta"`
		OwnerID        *influxdb.ID           `json:"owner,omitempty"`
	}

	const batchSize = 100

	// flush writes the owner of every mapping in batch onto its dashboard
	// within a single update transaction.
	flush := func(batch []*userResourceMapping) error {
		ids := make([][]byte, len(batch))
		for i, urm := range batch {
			id, err := urm.ResourceID.Encode()
			if err != nil {
				return err
			}
			ids[i] = id
		}

		return store.Update(ctx, func(tx kv.Tx) error {
			bkt, err := tx.Bucket(dashboardsBucket)
			if err != nil {
				return err
			}

			values, err := bkt.GetBatch(ids...)
			if err != nil {
				return err
			}

			// values was read before any Put in this transaction, so a
			// dashboard with several owner mappings in the same batch would
			// look ownerless each time. Track what has been assigned so the
			// first mapping wins, as it does across batches.
			assigned := make(map[string]struct{}, len(batch))

			for i, value := range values {
				// NOTE(review): GetBatch is assumed to yield an empty value
				// for keys it cannot find - confirm against the kv package.
				// A mapping that points at a dashboard which no longer exists
				// has nothing to update and must not abort the migration.
				if len(value) == 0 {
					continue
				}

				var dashboard dashboard
				if err := json.Unmarshal(value, &dashboard); err != nil {
					return err
				}

				if dashboard.OwnerID != nil {
					fmt.Printf("dashboard %q already has owner %q", dashboard.ID, dashboard.OwnerID)
					continue
				}

				key := string(ids[i])
				if _, done := assigned[key]; done {
					continue
				}

				dashboard.OwnerID = &batch[i].UserID

				updated, err := json.Marshal(dashboard)
				if err != nil {
					return err
				}

				// Keep iterating after a successful Put: returning here would
				// leave the rest of the batch without an owner.
				if err := bkt.Put(ids[i], updated); err != nil {
					return err
				}
				assigned[key] = struct{}{}
			}

			return nil
		})
	}

	for i := 0; i < len(mappings); i += batchSize {
		end := i + batchSize
		if end > len(mappings) {
			end = len(mappings)
		}

		if err := flush(mappings[i:end]); err != nil {
			return err
		}
	}

	return nil
})

Migration0011_PopulateDashboardsOwnerId backfills owner IDs on dashboards based on the presence of user resource mappings

View Source
// Migration0013_RepairDBRPOwnerAndBucketIDs rewrites DBRP mappings that were
// stored with the old JSON field names for the organization and bucket IDs.
//
// Every entry of dbrpBucket is decoded with the old field names
// ("organization_id", "bucket_id"); entries where both IDs report Valid() are
// collected. The collected entries are then re-read in batches of 100, decoded
// with the new field names ("orgID", "bucketID"), given the old values for
// whichever new field is not valid, and written back in the new encoding.
var Migration0013_RepairDBRPOwnerAndBucketIDs = UpOnlyMigration(
	"repair DBRP owner and bucket IDs",
	func(ctx context.Context, store kv.SchemaStore) error {
		const batchSize = 100

		type oldStyleMapping struct {
			ID              influxdb.ID `json:"id"`
			Database        string      `json:"database"`
			RetentionPolicy string      `json:"retention_policy"`
			Default         bool        `json:"default"`

			// These 2 fields were renamed.
			OrganizationID influxdb.ID `json:"organization_id"`
			BucketID       influxdb.ID `json:"bucket_id"`
		}

		type newStyleDbrpMapping struct {
			ID              influxdb.ID `json:"id"`
			Database        string      `json:"database"`
			RetentionPolicy string      `json:"retention_policy"`
			Default         bool        `json:"default"`

			// New names for the 2 renamed fields.
			OrganizationID influxdb.ID `json:"orgID"`
			BucketID       influxdb.ID `json:"bucketID"`
		}

		// Gather the DBRPs that still carry IDs under the old field names.
		var legacy []*oldStyleMapping
		collectErr := store.View(ctx, func(tx kv.Tx) error {
			dbrps, err := tx.Bucket(dbrpBucket)
			if err != nil {
				return err
			}

			cur, err := dbrps.ForwardCursor(nil)
			if err != nil {
				return err
			}

			return kv.WalkCursor(ctx, cur, func(_, raw []byte) (bool, error) {
				m := new(oldStyleMapping)
				if err := json.Unmarshal(raw, m); err != nil {
					return false, err
				}

				if !m.OrganizationID.Valid() || !m.BucketID.Valid() {
					return true, nil
				}

				legacy = append(legacy, m)
				return true, nil
			})
		})
		if collectErr != nil {
			return collectErr
		}

		// rewrite re-encodes one batch of mappings inside a single update
		// transaction.
		rewrite := func(batch []*oldStyleMapping) error {
			keys := make([][]byte, len(batch))
			for n, old := range batch {
				key, err := old.ID.Encode()
				if err != nil {
					return err
				}
				keys[n] = key
			}

			return store.Update(ctx, func(tx kv.Tx) error {
				dbrps, err := tx.Bucket(dbrpBucket)
				if err != nil {
					return err
				}

				stored, err := dbrps.GetBatch(keys...)
				if err != nil {
					return err
				}

				for n := range stored {
					var current newStyleDbrpMapping
					if err := json.Unmarshal(stored[n], &current); err != nil {
						return err
					}

					// Only fill in what the new-style fields are missing.
					if !current.OrganizationID.Valid() {
						current.OrganizationID = batch[n].OrganizationID
					}
					if !current.BucketID.Valid() {
						current.BucketID = batch[n].BucketID
					}

					encoded, err := json.Marshal(current)
					if err != nil {
						return err
					}

					if err := dbrps.Put(keys[n], encoded); err != nil {
						return err
					}
				}

				return nil
			})
		}

		for pending := legacy; len(pending) > 0; {
			n := batchSize
			if len(pending) < n {
				n = len(pending)
			}
			if err := rewrite(pending[:n]); err != nil {
				return err
			}
			pending = pending[n:]
		}

		return nil
	},
)
View Source
// Migration0014_ReindexDBRPs is the index migration that kv.NewIndexMigration
// builds from dbrp.ByOrgIDIndexMapping.
var Migration0014_ReindexDBRPs = kv.NewIndexMigration(dbrp.ByOrgIDIndexMapping)
View Source
// Migration0015_RecordShardGroupDurationsInBucketMetadata stores a shard group
// duration on every bucket record that does not have one yet.
//
// Every entry of the "bucketsv1" bucket is decoded; those whose
// ShardGroupDuration is 0 are collected. The collected entries are then
// re-read in batches of 100 and, where the duration is still 0, it is set to
// the result of meta.NormalisedShardDuration(0, RetentionPeriod) before the
// record is written back.
var Migration0015_RecordShardGroupDurationsInBucketMetadata = UpOnlyMigration(
	"record shard group durations in bucket metadata",
	func(ctx context.Context, store kv.SchemaStore) error {
		const batchSize = 100

		type bucket struct {
			ID                  influxdb.ID   `json:"id,omitempty"`
			OrgID               influxdb.ID   `json:"orgID,omitempty"`
			Type                int           `json:"type"`
			Name                string        `json:"name"`
			Description         string        `json:"description"`
			RetentionPolicyName string        `json:"rp,omitempty"` // This to support v1 sources
			RetentionPeriod     time.Duration `json:"retentionPeriod"`
			CreatedAt           time.Time     `json:"createdAt"`
			UpdatedAt           time.Time     `json:"updatedAt"`

			// This is expected to be 0 for all buckets created before
			// we began tracking it in metadata.
			ShardGroupDuration time.Duration `json:"shardGroupDuration"`
		}
		bucketBucket := []byte("bucketsv1")

		// Gather the buckets that have no shard group duration recorded.
		var pendingBuckets []*bucket
		collectErr := store.View(ctx, func(tx kv.Tx) error {
			bkt, err := tx.Bucket(bucketBucket)
			if err != nil {
				return err
			}

			cur, err := bkt.ForwardCursor(nil)
			if err != nil {
				return err
			}

			return kv.WalkCursor(ctx, cur, func(_, raw []byte) (bool, error) {
				rec := new(bucket)
				if err := json.Unmarshal(raw, rec); err != nil {
					return false, err
				}
				if rec.ShardGroupDuration != 0 {
					return true, nil
				}

				pendingBuckets = append(pendingBuckets, rec)
				return true, nil
			})
		})
		if collectErr != nil {
			return collectErr
		}

		// rewrite updates one batch of buckets inside a single update
		// transaction.
		rewrite := func(batch []*bucket) error {
			keys := make([][]byte, len(batch))
			for n, rec := range batch {
				key, err := rec.ID.Encode()
				if err != nil {
					return err
				}
				keys[n] = key
			}

			return store.Update(ctx, func(tx kv.Tx) error {
				bkt, err := tx.Bucket(bucketBucket)
				if err != nil {
					return err
				}

				stored, err := bkt.GetBatch(keys...)
				if err != nil {
					return err
				}

				for n := range stored {
					var current bucket
					if err := json.Unmarshal(stored[n], &current); err != nil {
						return err
					}

					// Re-check on the freshly read record before deriving a value.
					if current.ShardGroupDuration == 0 {
						current.ShardGroupDuration = meta.NormalisedShardDuration(0, current.RetentionPeriod)
					}

					encoded, err := json.Marshal(current)
					if err != nil {
						return err
					}
					if err := bkt.Put(keys[n], encoded); err != nil {
						return err
					}
				}

				return nil
			})
		}

		for remaining := pendingBuckets; len(remaining) > 0; {
			n := batchSize
			if len(remaining) < n {
				n = len(remaining)
			}
			if err := rewrite(remaining[:n]); err != nil {
				return err
			}
			remaining = remaining[n:]
		}

		return nil
	},
)

Migrations contains all the migrations required for the entirety of the kv store backing influxdb's metadata.

Functions

func Up

func Up(ctx context.Context, logger *zap.Logger, store kv.SchemaStore) error

Up is a convenience method which creates a migrator for all migrations and calls Up on it.

Types

type Migration

type Migration struct {
	// contains filtered or unexported fields
}

Migration is a type which implements the migration package's Spec interface. It can be used to conveniently create migration specs for the all package.

func UpOnlyMigration

func UpOnlyMigration(name string, up MigrationFunc) *Migration

UpOnlyMigration is a migration with an up function and a noop down function

func (*Migration) Down

func (m *Migration) Down(ctx context.Context, store kv.SchemaStore) error

Down delegates to the underlying anonymous down migration function

func (*Migration) MigrationName

func (m *Migration) MigrationName() string

MigrationName returns the underlying name of the migration

func (*Migration) Up

func (m *Migration) Up(ctx context.Context, store kv.SchemaStore) error

Up delegates to the underlying anonymous up migration function

type MigrationFunc

type MigrationFunc func(context.Context, kv.SchemaStore) error

MigrationFunc is a function which can be used as either an up or down operation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL