ds2bq

v2.1.1+incompatible
Published: Jan 7, 2018 License: MIT Imports: 22 Imported by: 5

README

ds2bq

Import Datastore backup into BigQuery & Clean up old Datastore backup information.

How does it work?

  1. Set up Google Cloud Storage Object Change Notification (OCN).
  2. Set up Datastore Scheduled Backups.
  3. When the cron job creates a backup, receive the webhook and import the data into BigQuery.
    • appengine (backup cron) -> GCS object (sends notification by webhook) -> appengine (import into bq)
  4. Clean up the backup files on GCS (by lifecycle) and their metadata on Datastore (by cron).

Setup

Coding

See the example directory.

Prepare
Environment variables & account

We will use the environment variables below. Change them to match your own settings.

$ SERVICE_ACCOUNT_NAME=gcs-objectchangenotification
$ APP_ID=foobar
$ BACKUP_BUCKET=foobar-datastore-backups
$ API_ENDPOINT=https://foobar.appspot.com/api/gcs/object-change-notification
$ echo ${SERVICE_ACCOUNT_NAME} ${APP_ID} ${BACKUP_BUCKET} ${API_ENDPOINT}

We will run some commands on the local machine. Set up the gcloud command to authenticate as the service account.

$ gcloud auth activate-service-account ${SERVICE_ACCOUNT_NAME}@${APP_ID}.iam.gserviceaccount.com --key-file <downloaded secret key file path>
$ gcloud auth list

GCS OCN setup

https://cloud.google.com/storage/docs/object-change-notification

You MUST save the execution log.

$ gsutil acl ch -u ${APP_ID}@appspot.gserviceaccount.com:O gs://${BACKUP_BUCKET}
$ gsutil notification watchbucket ${API_ENDPOINT} gs://${BACKUP_BUCKET}
Watching bucket gs://foobar-datastore-backups/ with application URL https://foobar.appspot.com/api/gcs/object-change-notification ...
Successfully created watch notification channel.
Watch channel identifier: XXXXX
Canonicalized resource identifier: YYYYYY
Client state token: None

If you want to stop receiving notifications, you can stop the channel.

$ gsutil notification stopchannel XXXXX YYYYYY

As far as we know, these parameters cannot be retrieved again with any command.

GCS lifecycle setup

https://cloud.google.com/storage/docs/managing-lifecycles

Set the expiration age to the same value as DatastoreManagementService#ExpireDuration (Go code).

$ cat additional-settings.json
{
  "lifecycle": {
    "rule": [
      {
        "action": {
          "type": "Delete"
        },
        "condition": {
          "age": 30
        }
      }
    ]
  }
}
$ gsutil lifecycle get gs://${BACKUP_BUCKET} > bucket-lifecycle.json
# merge JSON manually
$ gsutil lifecycle set bucket-lifecycle.json gs://${BACKUP_BUCKET}
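
The manual merge step could also be scripted. The sketch below is one possible approach, not part of ds2bq: parseRules and mergeLifecycle are hypothetical helpers, and it assumes the current configuration is valid JSON in either the {"rule": [...]} or the {"lifecycle": {"rule": [...]}} form. It simply appends the additional rules without removing duplicates.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// lifecycleConfig is the inner object that carries the rule array.
type lifecycleConfig struct {
	Rule []json.RawMessage `json:"rule"`
}

// wrappedConfig accepts both the wrapped and the bare form of the document.
type wrappedConfig struct {
	Lifecycle *lifecycleConfig  `json:"lifecycle,omitempty"`
	Rule      []json.RawMessage `json:"rule,omitempty"`
}

// parseRules returns the rules of doc, whichever form it uses.
func parseRules(doc []byte) ([]json.RawMessage, error) {
	var w wrappedConfig
	if err := json.Unmarshal(doc, &w); err != nil {
		return nil, err
	}
	if w.Lifecycle != nil {
		return w.Lifecycle.Rule, nil
	}
	return w.Rule, nil
}

// mergeLifecycle appends the rules of extra to the rules of current and
// returns the result in the wrapped form.
func mergeLifecycle(current, extra []byte) ([]byte, error) {
	cur, err := parseRules(current)
	if err != nil {
		return nil, fmt.Errorf("current config: %v", err)
	}
	add, err := parseRules(extra)
	if err != nil {
		return nil, fmt.Errorf("additional config: %v", err)
	}
	rules := make([]json.RawMessage, 0, len(cur)+len(add))
	rules = append(rules, cur...)
	rules = append(rules, add...)
	merged := wrappedConfig{Lifecycle: &lifecycleConfig{Rule: rules}}
	return json.MarshalIndent(merged, "", "  ")
}

func main() {
	current := []byte(`{"rule": [{"action": {"type": "Delete"}, "condition": {"age": 365}}]}`)
	extra := []byte(`{"lifecycle": {"rule": [{"action": {"type": "Delete"}, "condition": {"age": 30}}]}}`)
	out, err := mergeLifecycle(current, extra)
	if err != nil {
		fmt.Println("merge failed:", err)
		return
	}
	fmt.Println(string(out))
}
```

Rules are kept as raw JSON so that fields this sketch does not know about survive the merge unchanged.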

Documentation

Constants

This section is empty.

Variables

var ErrInvalidID = errors.New("invalid id")

ErrInvalidID is the error value for an invalid ID.

var ErrInvalidState = errors.New("invalid state")

ErrInvalidState is the error value for an invalid state.

Functions

func DeleteBackupTaskHandlerFunc

func DeleteBackupTaskHandlerFunc(queueName string) http.HandlerFunc

DeleteBackupTaskHandlerFunc returns an http.HandlerFunc that removes all child entities of the AEBackupInformation or AEDatastoreAdminOperation kinds.

func DeleteOldBackupAPIHandlerFunc

func DeleteOldBackupAPIHandlerFunc(queueName, path string) http.HandlerFunc

DeleteOldBackupAPIHandlerFunc returns an http.HandlerFunc that delegates to the task queue. The path is the one for DeleteOldBackupTask.

func DeleteOldBackupTaskHandlerFunc

func DeleteOldBackupTaskHandlerFunc(queueName, path string, expireAfter time.Duration) http.HandlerFunc

DeleteOldBackupTaskHandlerFunc returns an http.HandlerFunc that adds tasks to delete old AEBackupInformation entities. The path is the one for DeleteBackupTask.

func ExecQuery

func ExecQuery(c context.Context, q *datastore.Query, ldr QueryListLoader) error

ExecQuery executes the query q using the given QueryListLoader.

func ImportBigQueryHandleFunc

func ImportBigQueryHandleFunc(datasetID string) http.HandlerFunc

ImportBigQueryHandleFunc returns an http.HandlerFunc that imports a GCSObject into BigQuery.

func ReceiveOCN

func ReceiveOCN(c context.Context, obj *GCSObject, queueName, path string) error

ReceiveOCN processes the payload of an Object Change Notification.

func ReceiveOCNHandleFunc

func ReceiveOCNHandleFunc(bucketName, queueName, path string, kindNames []string) http.HandlerFunc

ReceiveOCNHandleFunc returns a http.HandlerFunc that receives OCN. The path is for

Types

type AEBackupEntityTypeInfo

type AEBackupEntityTypeInfo struct {
	Kind       string                            `json:"kind"`
	Properties []*AEBackupEntityTypeInfoProperty `json:"properties"`
}

AEBackupEntityTypeInfo is mapped to a field of the AEBackupInformationKindTypeInfo type.

type AEBackupEntityTypeInfoProperty

type AEBackupEntityTypeInfoProperty struct {
	EmbeddedEntities []string `json:"embedded_entities"` // The type was unknown, so this is a placeholder.
	IsRepeated       bool     `json:"is_repeated"`
	Name             string   `json:"name"`
	PrimitiveTypes   []int    `json:"primitive_types"`
}

AEBackupEntityTypeInfoProperty is mapped to a field of the AEBackupInformationKindTypeInfo type.

type AEBackupInformation

type AEBackupInformation struct {
	Kind string `goon:"kind,_AE_Backup_Information" json:"-"`
	// In rare cases this kind has no ParentKey. That may happen when it was restored from a backup created by another app.
	ParentKey     *datastore.Key `json:"-" datastore:"-" goon:"parent"` // AEDatastoreAdminOperation
	ID            int64          `datastore:"-" goon:"id"`
	ActiveJobs    []string       `datastore:"active_jobs"`
	CompleteTime  time.Time      `datastore:"complete_time"`
	CompletedJobs []string       `datastore:"completed_jobs"`
	Destination   string         `datastore:"destination"` // This field is probably a string.
	Filesystem    string         `datastore:"filesystem"`
	GSHandle      string         `datastore:"gs_handle"`
	Kinds         []string       `datastore:"kinds"`
	Name          string         `datastore:"name"`
	OriginalApp   string         `datastore:"original_app"`
	StartTime     time.Time      `datastore:"start_time"`

	AEBackupInformationKindFilesList []*AEBackupInformationKindFiles `datastore:"-"`
}

AEBackupInformation is mapped to the _AE_Backup_Information kind. +qbg

func (*AEBackupInformation) FetchChildren

func (entity *AEBackupInformation) FetchChildren(c context.Context) error

FetchChildren gathers the children and fills the fields.

type AEBackupInformationDeleteReq

type AEBackupInformationDeleteReq struct {
	Key string `json:"key"`
}

AEBackupInformationDeleteReq is the request for deleting a Datastore backup.

func DecodeAEBackupInformationDeleteReq

func DecodeAEBackupInformationDeleteReq(r io.Reader) (*AEBackupInformationDeleteReq, error)

DecodeAEBackupInformationDeleteReq decodes an AEBackupInformationDeleteReq from r.

type AEBackupInformationKindFiles

type AEBackupInformationKindFiles struct {
	Kind      string         `goon:"kind,_AE_Backup_Information_Kind_Files" json:"-"`
	ParentKey *datastore.Key `json:"-" datastore:"-" goon:"parent"` // AEBackupInformation
	ID        string         `datastore:"-" goon:"id"`
	Files     []string       `datastore:"files"`
}

AEBackupInformationKindFiles is mapped to the _AE_Backup_Information_Kind_Files kind. +qbg

func (*AEBackupInformationKindFiles) FetchChildren

func (entity *AEBackupInformationKindFiles) FetchChildren(c context.Context) error

FetchChildren gathers the children and fills the fields.

type AEBackupInformationKindTypeInfo

type AEBackupInformationKindTypeInfo struct {
	Kind               string                  `goon:"kind,_AE_Backup_Information_Kind_Type_Info" json:"-"`
	ParentKey          *datastore.Key          `json:"-" datastore:"-" goon:"parent"` // AEBackupKind
	ID                 string                  `datastore:"-" goon:"id"`
	EntityTypeInfo     string                  `datastore:"entity_type_info" json:"-"`
	EntityTypeInfoJSON *AEBackupEntityTypeInfo `datastore:"-" json:"entityTypeInfo"`
	IsPartial          bool                    `datastore:"is_partial"`
}

AEBackupInformationKindTypeInfo is mapped to the _AE_Backup_Information_Kind_Type_Info kind. +qbg

func (*AEBackupInformationKindTypeInfo) FetchChildren

func (entity *AEBackupInformationKindTypeInfo) FetchChildren(c context.Context) error

FetchChildren gathers the children and fills the fields.

type AEBackupInformationListLoader

type AEBackupInformationListLoader struct {
	List     []*AEBackupInformation
	Req      ReqListBase
	RespList *RespListBase
}

AEBackupInformationListLoader implements QueryListLoader.

func (*AEBackupInformationListLoader) Append

func (ldr *AEBackupInformationListLoader) Append(v interface{}) error

Append appends an instance to the internal list.

func (*AEBackupInformationListLoader) LoadInstance

func (ldr *AEBackupInformationListLoader) LoadInstance(c context.Context, key *datastore.Key) (interface{}, error)

LoadInstance loads an instance from Datastore.

func (*AEBackupInformationListLoader) PostProcess

func (ldr *AEBackupInformationListLoader) PostProcess(c context.Context) error

PostProcess post-processes the internal list.

func (*AEBackupInformationListLoader) ReqListBase

func (ldr *AEBackupInformationListLoader) ReqListBase() ReqListBase

ReqListBase returns the internally stored ReqListBase.

func (*AEBackupInformationListLoader) RespListBase

func (ldr *AEBackupInformationListLoader) RespListBase() *RespListBase

RespListBase returns the internally stored *RespListBase.

type AEBackupKind

type AEBackupKind struct {
	Kind      string         `goon:"kind,Kind" json:"-"`
	ParentKey *datastore.Key `json:"-" datastore:"-" goon:"parent"` // AEBackupInformation
	ID        string         `datastore:"-" goon:"id"`

	AEBackupInformationKindTypeInfoList []*AEBackupInformationKindTypeInfo `datastore:"-"`
}

AEBackupKind is mapped to the _AE_DatastoreAdmin_Operation/_AE_Backup_Information/Kind kind. +qbg

func (*AEBackupKind) FetchChildren

func (entity *AEBackupKind) FetchChildren(c context.Context) error

FetchChildren gathers the children and fills the fields.

type AEDatastoreAdminOperation

type AEDatastoreAdminOperation struct {
	Kind          string    `goon:"kind,_AE_DatastoreAdmin_Operation" json:"-"`
	ID            int64     `datastore:"-" goon:"id"`
	ActiveJobIDs  []string  `datastore:"active_job_ids"`
	ActiveJobs    int       `datastore:"active_jobs"`
	CompletedJobs int       `datastore:"completed_jobs"`
	Description   string    `datastore:"description"`
	LastUpdated   time.Time `datastore:"last_updated"`
	ServiceJobID  string    `datastore:"service_job_id"` // The type was unknown, so this is a placeholder.
	Status        string    `datastore:"status"`
	StatusInfo    string    `datastore:"status_info"`

	AEBackupInformationList []*AEBackupInformation `datastore:"-"`
	AEBackupKindList        []*AEBackupKind        `datastore:"-"`
}

AEDatastoreAdminOperation is mapped to the _AE_DatastoreAdmin_Operation kind. +qbg

func (*AEDatastoreAdminOperation) FetchChildren

func (entity *AEDatastoreAdminOperation) FetchChildren(c context.Context) error

FetchChildren gathers the children and fills the fields.

type AEDatastoreAdminOperationListLoader

type AEDatastoreAdminOperationListLoader struct {
	List     []*AEDatastoreAdminOperation
	Req      ReqListBase
	RespList *RespListBase
}

AEDatastoreAdminOperationListLoader implements QueryListLoader.

func (*AEDatastoreAdminOperationListLoader) Append

func (ldr *AEDatastoreAdminOperationListLoader) Append(v interface{}) error

Append appends an instance to the internal list.

func (*AEDatastoreAdminOperationListLoader) LoadInstance

func (ldr *AEDatastoreAdminOperationListLoader) LoadInstance(c context.Context, key *datastore.Key) (interface{}, error)

LoadInstance loads an instance from Datastore.

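func (*AEDatastoreAdminOperationListLoader) PostProcess

func (ldr *AEDatastoreAdminOperationListLoader) PostProcess(c context.Context) error

PostProcess post-processes the internal list.

func (*AEDatastoreAdminOperationListLoader) ReqListBase

func (ldr *AEDatastoreAdminOperationListLoader) ReqListBase() ReqListBase

ReqListBase returns the internally stored ReqListBase.

func (*AEDatastoreAdminOperationListLoader) RespListBase

func (ldr *AEDatastoreAdminOperationListLoader) RespListBase() *RespListBase

RespListBase returns the internally stored *RespListBase.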

type AEDatastoreStore

type AEDatastoreStore struct{}

AEDatastoreStore provides methods for handling Datastore backup information.

func (*AEDatastoreStore) DeleteAEBackupInformationAndRelatedData

func (store *AEDatastoreStore) DeleteAEBackupInformationAndRelatedData(c context.Context, key *datastore.Key) error

DeleteAEBackupInformationAndRelatedData removes all child entities of the AEBackupInformation or AEDatastoreAdminOperation kinds.

func (*AEDatastoreStore) GetAEBackupInformation

func (store *AEDatastoreStore) GetAEBackupInformation(c context.Context, parentKey *datastore.Key, id int64) (*AEBackupInformation, error)

GetAEBackupInformation returns the AEBackupInformation specified by id.

func (*AEDatastoreStore) GetAEDatastoreAdminOperation

func (store *AEDatastoreStore) GetAEDatastoreAdminOperation(c context.Context, id int64) (*AEDatastoreAdminOperation, error)

GetAEDatastoreAdminOperation returns the AEDatastoreAdminOperation specified by id.

func (*AEDatastoreStore) ListAEBackupInformation

func (store *AEDatastoreStore) ListAEBackupInformation(c context.Context, req *ReqListBase) ([]*AEBackupInformation, *RespListBase, error)

ListAEBackupInformation returns a list of AEBackupInformation.

func (*AEDatastoreStore) ListAEDatastoreAdminOperation

func (store *AEDatastoreStore) ListAEDatastoreAdminOperation(c context.Context, req *ReqListBase) ([]*AEDatastoreAdminOperation, *RespListBase, error)

ListAEDatastoreAdminOperation returns a list of AEDatastoreAdminOperation.

type DatastoreExportService

type DatastoreExportService interface {
	Export(c context.Context, outputGCSPrefix string, entityFilter *EntityFilter) (*dsapi.GoogleLongrunningOperation, error)
}

DatastoreExportService serves the Datastore Export API function.

func NewDatastoreExportService

func NewDatastoreExportService() DatastoreExportService

NewDatastoreExportService returns a ready-to-use DatastoreExportService.

type DatastoreManagementService

type DatastoreManagementService interface {
	SetupWithUconSwagger(swPlugin *swagger.Plugin)
	HandlePostTQ(c context.Context, req *Noop) (*Noop, error)
	HandlePostDeleteList(c context.Context, r *http.Request, req *ReqListBase) (*Noop, error)
	HandleDeleteAEBackupInformation(c context.Context, r *http.Request, req *AEBackupInformationDeleteReq) (*Noop, error)
}

DatastoreManagementService serves Datastore management APIs.

func NewDatastoreManagementService

func NewDatastoreManagementService(opts ...ManagementOption) DatastoreManagementService

NewDatastoreManagementService returns a ready-to-use DatastoreManagementService.

type EntityFilter

type EntityFilter struct {
	Kinds           []string `json:"kinds,omitempty"`
	NamespaceIds    []string `json:"namespaceIds,omitempty"`
	ForceSendFields []string `json:"-"`
	NullFields      []string `json:"-"`
}

EntityFilter is the entity condition for an export.
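
As a small illustration of the struct tags above, the sketch below marshals an EntityFilter with encoding/json. The struct is copied locally so that the example is self-contained; encodeFilter is a hypothetical helper, not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EntityFilter is a local copy of the documented struct.
type EntityFilter struct {
	Kinds           []string `json:"kinds,omitempty"`
	NamespaceIds    []string `json:"namespaceIds,omitempty"`
	ForceSendFields []string `json:"-"`
	NullFields      []string `json:"-"`
}

// encodeFilter marshals a filter that selects only the given kinds.
func encodeFilter(kinds ...string) (string, error) {
	b, err := json.Marshal(&EntityFilter{Kinds: kinds})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encodeFilter("Article", "User")
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s) // prints {"kinds":["Article","User"]}
}
```

Because of omitempty, an unset NamespaceIds is left out of the JSON, and the two fields tagged "-" are never serialized.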

type GCSHeader

type GCSHeader struct {
	ChannelID     string
	ClientToken   string
	ResourceID    string
	ResourceState string
	ResourceURI   string
}

GCSHeader holds the headers of an OCN request. See https://cloud.google.com/storage/docs/object-change-notification

func NewGCSHeader

func NewGCSHeader(r *http.Request) *GCSHeader

NewGCSHeader returns the header from r.

type GCSObject

type GCSObject struct {
	ID             string    `json:"id"`
	SelfLink       string    `json:"selfLink"`
	Name           string    `json:"name"`
	Bucket         string    `json:"bucket"`
	Generation     string    `json:"generation"`
	MetaGeneration string    `json:"metageneration"`
	ContentType    string    `json:"contentType"`
	Updated        time.Time `json:"updated"`
	Size           int64     `json:"size,string"`
	Md5Hash        string    `json:"md5Hash"`
	MediaLink      string    `json:"mediaLink"`
	Crc32c         string    `json:"crc32c"`
	Etag           string    `json:"etag"`
	TimeCreated    time.Time `json:"timeCreated"`
	TimeDeleted    time.Time `json:"timeDeleted"`
}

GCSObject is the JSON data received from GCS OCN.

func DecodeGCSObject

func DecodeGCSObject(r io.Reader) (*GCSObject, error)

DecodeGCSObject decodes a GCSObject from r.

func (*GCSObject) ExtractKindName

func (obj *GCSObject) ExtractKindName() string

ExtractKindName extracts kind name from the object name.
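
How the kind name is extracted is not described here. The sketch below shows one plausible approach under a stated assumption: that per-kind backup files are named "<handle>.<Kind>.backup_info". Both that naming pattern and the kindFromObjectName helper are assumptions for illustration, not the package's confirmed behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// backupInfoSuffix is the assumed suffix of backup metadata files.
const backupInfoSuffix = ".backup_info"

// kindFromObjectName returns the kind segment of an object name of the
// assumed form "<handle>.<Kind>.backup_info". It returns "" when the name
// does not match that form.
func kindFromObjectName(name string) string {
	if !strings.HasSuffix(name, backupInfoSuffix) {
		return ""
	}
	trimmed := strings.TrimSuffix(name, backupInfoSuffix)
	i := strings.LastIndex(trimmed, ".")
	if i < 0 {
		// No kind segment, for example "<handle>.backup_info".
		return ""
	}
	return trimmed[i+1:]
}

func main() {
	fmt.Println(kindFromObjectName("abc123.Article.backup_info"))
	fmt.Println(kindFromObjectName("abc123.backup_info") == "")
}
```

Returning an empty string for non-matching names lets a caller skip objects that are not per-kind metadata files.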

func (*GCSObject) IsImportTarget

func (obj *GCSObject) IsImportTarget(c context.Context, r *http.Request, bucketName string, kindNames []string) bool

IsImportTarget reports whether the GCSObject is an import target.

func (*GCSObject) IsRequiredKind

func (obj *GCSObject) IsRequiredKind(requires []string) bool

IsRequiredKind reports whether the GCSObject is related to one of the required kinds.

func (*GCSObject) ToBQJobReq

func (obj *GCSObject) ToBQJobReq() *GCSObjectToBQJobReq

ToBQJobReq creates a new GCSObjectToBQJobReq from the object.

type GCSObjectToBQJobReq

type GCSObjectToBQJobReq struct {
	Bucket      string    `json:"bucket"`
	FilePath    string    `json:"filePath"`
	KindName    string    `json:"kindName"`
	TimeCreated time.Time `json:"TimeCreated"`
}

GCSObjectToBQJobReq is the request for importing an OCN-notified object into BigQuery.

func DecodeGCSObjectToBQJobReq

func DecodeGCSObjectToBQJobReq(r io.Reader) (*GCSObjectToBQJobReq, error)

DecodeGCSObjectToBQJobReq decodes a GCSObjectToBQJobReq from r.

type GCSWatcherOption

type GCSWatcherOption interface {
	// contains filtered or unexported methods
}

GCSWatcherOption provides an option value for GCSWatcherService.

func GCSWatcherWithAfterContext

func GCSWatcherWithAfterContext(f func(c context.Context) (GCSWatcherOption, error)) GCSWatcherOption

GCSWatcherWithAfterContext allows a GCSWatcherOption to be produced once a context is available.

func GCSWatcherWithBackupBucketName

func GCSWatcherWithBackupBucketName(bucketName string) GCSWatcherOption

GCSWatcherWithBackupBucketName provides the bucket name of the Datastore backup target.

func GCSWatcherWithDatasetID

func GCSWatcherWithDatasetID(id string) GCSWatcherOption

GCSWatcherWithDatasetID provides the BigQuery dataset ID.

func GCSWatcherWithQueueName

func GCSWatcherWithQueueName(queueName string) GCSWatcherOption

GCSWatcherWithQueueName provides the TaskQueue queue name.

func GCSWatcherWithTargetKindNames

func GCSWatcherWithTargetKindNames(names ...string) GCSWatcherOption

GCSWatcherWithTargetKindNames provides the names of the target kinds to insert into BigQuery.

func GCSWatcherWithTargetKinds

func GCSWatcherWithTargetKinds(targets ...interface{}) GCSWatcherOption

GCSWatcherWithTargetKinds provides the target kinds to insert into BigQuery. Each interface{} value is processed by the Kind method of *goon.Goon.

func GCSWatcherWithURLs

func GCSWatcherWithURLs(apiURL, tqURL string) GCSWatcherOption

GCSWatcherWithURLs provides the API endpoint URLs.

type GCSWatcherService

type GCSWatcherService interface {
	SetupWithUcon()
	HandleOCN(c context.Context, r *http.Request, obj *GCSObject) error
	HandleBackupToBQJob(c context.Context, req *GCSObjectToBQJobReq) error
}

GCSWatcherService serves GCS Object Change Notification receiving APIs.

func NewGCSWatcherService

func NewGCSWatcherService(opts ...GCSWatcherOption) (GCSWatcherService, error)

NewGCSWatcherService returns a ready-to-use GCSWatcherService.
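
The constructor takes a variadic list of option values, which is the functional options pattern. The sketch below reproduces the pattern with local, hypothetical names (watcherConfig, watcherOption, withBucketName and so on) so that it runs without App Engine; it is not the package's implementation, and the "default" queue name is an assumed default.

```go
package main

import (
	"errors"
	"fmt"
)

// watcherConfig collects the values set by options.
type watcherConfig struct {
	bucketName string
	datasetID  string
	queueName  string
	kindNames  []string
}

// watcherOption has only an unexported method, so options can be created
// solely through the constructor functions below.
type watcherOption interface {
	apply(*watcherConfig) error
}

type optionFunc func(*watcherConfig) error

func (f optionFunc) apply(c *watcherConfig) error { return f(c) }

func withBucketName(name string) watcherOption {
	return optionFunc(func(c *watcherConfig) error {
		if name == "" {
			return errors.New("bucket name must not be empty")
		}
		c.bucketName = name
		return nil
	})
}

func withDatasetID(id string) watcherOption {
	return optionFunc(func(c *watcherConfig) error {
		c.datasetID = id
		return nil
	})
}

func withKindNames(names ...string) watcherOption {
	return optionFunc(func(c *watcherConfig) error {
		c.kindNames = append(c.kindNames, names...)
		return nil
	})
}

// newWatcherConfig applies the options in order and stops at the first error.
func newWatcherConfig(opts ...watcherOption) (*watcherConfig, error) {
	c := &watcherConfig{queueName: "default"} // assumed default for the sketch
	for _, o := range opts {
		if err := o.apply(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	cfg, err := newWatcherConfig(
		withBucketName("foobar-datastore-backups"),
		withDatasetID("datastore_imports"),
		withKindNames("Article", "User"),
	)
	if err != nil {
		fmt.Println("config failed:", err)
		return
	}
	fmt.Println(cfg.bucketName, cfg.datasetID, cfg.queueName, cfg.kindNames)
}
```

Returning an error from the constructor, as NewGCSWatcherService does, gives each option a way to reject invalid values at setup time.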

type ManagementOption

type ManagementOption interface {
	// contains filtered or unexported methods
}

ManagementOption provides an option value for datastoreManagementService.

func ManagementWithExpireDuration

func ManagementWithExpireDuration(expireAfter time.Duration) ManagementOption

ManagementWithExpireDuration provides the expiration duration of backup information. The default expiration duration is 30 days.

func ManagementWithQueueName

func ManagementWithQueueName(queueName string) ManagementOption

ManagementWithQueueName provides the TaskQueue queue name.

func ManagementWithURLs

func ManagementWithURLs(apiDeleteBackupURL, deleteOldBackupURL, deleteUnitOfBackupURL string) ManagementOption

ManagementWithURLs provides the API endpoint URLs.

type Noop

type Noop struct {
}

Noop is an empty request and response type.

type Plugger

type Plugger interface {
	Plugin() Plugin
}

Plugger supplies a Plugin component.

type Plugin

type Plugin interface {
	Init(typeName string)
	Ancestor(ancestor *datastore.Key)
	KeysOnly()
	Start(cur datastore.Cursor)
	Offset(offset int)
	Limit(limit int)
	Filter(name, op string, value interface{})
	Asc(name string)
	Desc(name string)
}

Plugin supplies hook points for query construction.

type QueryListLoader

type QueryListLoader interface {
	LoadInstance(c context.Context, key *datastore.Key) (interface{}, error)
	Append(v interface{}) error
	PostProcess(c context.Context) error
	ReqListBase() ReqListBase
	RespListBase() *RespListBase
}

QueryListLoader hosts the construction of an entity list.
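
The interface suggests a load, append, post-process cycle. The sketch below shows one plausible reading of that cycle with a simplified, hypothetical interface: string keys stand in for *datastore.Key, the context arguments are omitted, and runLoader is a guess at how a function like ExecQuery might drive a loader. None of this is confirmed by the package source.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// listLoader is a simplified stand-in for QueryListLoader.
type listLoader interface {
	LoadInstance(key string) (interface{}, error)
	Append(v interface{}) error
	PostProcess() error
}

// nameLoader loads names from an in-memory map and collects them in List.
type nameLoader struct {
	store map[string]string
	List  []string
}

func (l *nameLoader) LoadInstance(key string) (interface{}, error) {
	v, ok := l.store[key]
	if !ok {
		return nil, errors.New("no entity for key " + key)
	}
	return v, nil
}

func (l *nameLoader) Append(v interface{}) error {
	s, ok := v.(string)
	if !ok {
		return fmt.Errorf("unexpected type %T", v)
	}
	l.List = append(l.List, s)
	return nil
}

// PostProcess sorts the collected names.
func (l *nameLoader) PostProcess() error {
	sort.Strings(l.List)
	return nil
}

// runLoader loads and appends one instance per key, then post-processes.
func runLoader(keys []string, ldr listLoader) error {
	for _, k := range keys {
		v, err := ldr.LoadInstance(k)
		if err != nil {
			return err
		}
		if err := ldr.Append(v); err != nil {
			return err
		}
	}
	return ldr.PostProcess()
}

func main() {
	ldr := &nameLoader{store: map[string]string{"k1": "weekly", "k2": "daily"}}
	if err := runLoader([]string{"k1", "k2"}, ldr); err != nil {
		fmt.Println("load failed:", err)
		return
	}
	fmt.Println(ldr.List)
}
```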

type ReqListBase

type ReqListBase struct {
	Limit  int    `json:"limit" endpoints:"d=10" swagger:",in=query,d=10"`
	Offset int    `json:"offset" swagger:",in=query"`
	Cursor string `json:"cursor" swagger:",in=query"`
}

ReqListBase represents a list query request.

func DecodeReqListBase

func DecodeReqListBase(r io.Reader) (*ReqListBase, error)

DecodeReqListBase decodes a ReqListBase from r.

type RespListBase

type RespListBase struct {
	Cursor string `json:"cursor,omitempty" swagger:",in=query"`
}

RespListBase represents a list query response.

Directories

Path Synopsis
example
