Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var Backup = &cli.Command{ Name: "backup", Aliases: []string{"b"}, Usage: "backup time-series data from Amazon Timestream", Flags: []cli.Flag{ &cli.StringFlag{ Name: "region", Aliases: []string{"r"}, Usage: "AWS `REGION`", Value: "eu-west-1", }, &cli.StringFlag{ Name: "database", Aliases: []string{"db"}, Usage: "Amazon Timestream `DATABASE`", Value: "TestDB", }, &cli.StringFlag{ Name: "table", Aliases: []string{"tbl"}, Usage: "Amazon Timestream `TABLE`", Value: "IoT", }, &cli.StringFlag{ Name: "bucket", Aliases: []string{"b"}, Usage: "Amazon S3 `BUCKET_NAME`", Value: "test-playday-bucket", }, &cli.StringFlag{ Name: "column", Aliases: []string{"c"}, Usage: "Amazon Timestream `COLUMN` to partition time-series data", Value: "measure_name", }, &cli.StringFlag{ Name: "from", Usage: "Amazon Timestream `FROM` time value. Can be in absolute or relative (e.g 1m, 1h, 1d, 1w)", Value: "", DefaultText: "1 hour ago", }, &cli.StringFlag{ Name: "to", Usage: "Amazon Timestream `TO` time value. Can be in absolute or relative (e.g 1m, 1h, 1d, 1w)", Value: "", DefaultText: "Current date/time", }, &cli.Int64Flag{ Name: "rows", Usage: "Number of `ROWS` per chunk", Value: 1000, }, &cli.BoolFlag{ Name: "dry-run", Usage: "Do not create backup. Only runs query against Amazon Timestream", Value: false, }, &cli.BoolFlag{ Name: "verbose", Aliases: []string{"v"}, Usage: "Byakugan!!", Value: false, }, }, Action: func(c *cli.Context) error { logger, _ := zap.NewProduction() if c.Bool("verbose") { logger, _ = zap.NewDevelopment() } defer logger.Sync() sugar := logger.Sugar() isDryRun := c.Bool("dry-run") if isDryRun { sugar.Info("***** RUNNING IN DRY-RUN MODE *****") } region := c.String("region") sugar.Debugw("creating new aws session...", "region", region) sess, err := session.NewSession(&aws.Config{ Region: aws.String(region), }) if err != nil { sugar.Errorw("create new aws session", "error", err, "region", region) return fmt.Errorf("create new aws session: %v", err) } sugar.Debug("creating timestreamquery and s3uploader services...") querySvc := timestreamquery.New(sess) uploader := s3manager.NewUploader(sess) database := c.String("database") table := c.String("table") partitionColumn := c.String("column") currentTime := time.Now() to, err := now.Parse(c.String("to")) if err != nil { sugar.Debugw("parse 'to' as time.Time. parsing as duration...", "error", err, "to", c.String("to")) toDur, err := str2duration.ParseDuration(c.String("to")) if err != nil { sugar.Debugw("parse 'to' as time.Duration. using default...", "error", err, "to", c.String("to")) to = currentTime } else { to = currentTime.Add(-toDur) } } from, err := now.Parse(c.String("from")) if err != nil { sugar.Debugw("parse 'from' as time.Time. parsing as duration...", "error", err, "from", c.String("from")) fromDur, err := str2duration.ParseDuration(c.String("from")) if err != nil { sugar.Debugw("parse 'from' as time.Duration. using default...", "error", err, "from", c.String("from")) from = to.Add(-1 * time.Hour) } else { from = to.Add(-fromDur) } } sugar.Infow("time interval", "from", from.Format(queryTimeFormat), "to", to.Format(queryTimeFormat)) if from.After(to) { sugar.Errorw("'to' cannot be before 'from'", "from", from.Format(queryTimeFormat), "to", to.Format(queryTimeFormat)) return fmt.Errorf("'to' cannot be before 'from'") } if from.Equal(to) { sugar.Errorw("'to' cannot be equal to 'from'", "from", from.Format(queryTimeFormat), "to", to.Format(queryTimeFormat)) return fmt.Errorf("'to' cannot be equal to 'from'") } sql := fmt.Sprintf( "SELECT %s FROM \"%s\".\"%s\" WHERE time >= '%s' and time <= '%s' GROUP BY %s", partitionColumn, database, table, from.Format(queryTimeFormat), to.Format(queryTimeFormat), partitionColumn, ) sugar.Debugw("retrieving partition values...", "sql", sql) var ( nextToken *string = nil partitionValues []string ) for { partitionOutput, err := querySvc.QueryWithContext(c.Context, ×treamquery.QueryInput{ QueryString: aws.String(sql), NextToken: nextToken, MaxRows: aws.Int64(100), }) if err != nil { sugar.Errorw("retrieve partition values", "error", err, "sql", sql) return fmt.Errorf("retrieve partition values: %v", err) } for _, row := range partitionOutput.Rows { if row.Data[0].ScalarValue == nil { continue } partitionValues = append(partitionValues, *row.Data[0].ScalarValue) } nextToken = partitionOutput.NextToken if nextToken == nil { break } } sugar.Debugw("partition", "column", partitionColumn, "values", partitionValues) maxRows := c.Int64("rows") if maxRows > 1000 { sugar.Warnw("maxRows cannot exceed 1000. maxRows set to 1000.", "maxRows given", maxRows) maxRows = 1000 } errorsCh := make(chan error, 1000) var mu sync.Mutex totalRowsPerPartitions := make(map[string]int) sugar.Infow("backing up all data...", "from", from.Format(queryTimeFormat), "to", to.Format(queryTimeFormat), "partitions", partitionValues) startTimeAllPartition := time.Now() var wg sync.WaitGroup for _, partitionValue := range partitionValues { wg.Add(1) go func(partitionValue string) { defer wg.Done() mu.Lock() totalRowsPerPartitions[partitionValue] = 0 mu.Unlock() startTimeSinglePartition := time.Now() sql := fmt.Sprintf( "SELECT * FROM \"%s\".\"%s\" WHERE time >= '%s' and time <= '%s' AND %s = '%s' ORDER BY time DESC", database, table, from.Format(queryTimeFormat), to.Format(queryTimeFormat), partitionColumn, partitionValue, ) sugar.Debugw("query", "partition", partitionValue, "sql", sql) sugar.Infow("backing up data...", "partition", partitionValue) err = querySvc.QueryPagesWithContext(c.Context, ×treamquery.QueryInput{ QueryString: aws.String(sql), MaxRows: aws.Int64(maxRows), }, func(page *timestreamquery.QueryOutput, lastPage bool) bool { if len(page.Rows) == 0 { return true } inMemoryStore := bytes.NewBuffer([]byte{}) gzipWriter, _ := gzip.NewWriterLevel(inMemoryStore, gzip.BestCompression) var from, to time.Time var processedRows int for i, row := range page.Rows { processedRow := helpers.ProcessRowType(row.Data, page.ColumnInfo) if i == 0 { to, err = now.Parse(processedRow["time"].(string)) if err != nil { sugar.Warnw("time format to", "error", err, "to", c.String("to"), "partition", partitionValue) } } else if i == len(page.Rows)-1 { from, err = now.Parse(processedRow["time"].(string)) if err != nil { sugar.Warnw("time format from", "error", err, "from", c.String("from"), "partition", partitionValue) } } marshaledRow, err := json.Marshal(processedRow) if err != nil { sugar.Errorw("marshal data", "error", err, "partition", partitionValue, "from", from.Format(queryTimeFormat), "to", to.Format(queryTimeFormat)) errorsCh <- fmt.Errorf("marshal data: %v", err) continue } fmt.Fprintf(gzipWriter, "%s\n", marshaledRow) processedRows++ } gzipWriter.Close() mu.Lock() totalRowsPerPartitions[partitionValue] += processedRows mu.Unlock() if isDryRun { sugar.Infow("not uploading data due to dry run", "partition", partitionValue, "rows", processedRows) return true } bucket := c.String("bucket") checksum := crc32.ChecksumIEEE(inMemoryStore.Bytes()) key := fmt.Sprintf("%s/%s/%s/%s/%s_%s_%08x.log.gz", database, table, partitionValue, from.Format(dirTimeFormat), from.Format(filenameTimeFormat), to.Format(filenameTimeFormat), checksum) sugar.Debugw("uploading data...", "bucket", bucket, "key", key, "partition", partitionValue, "from", from.Format(queryTimeFormat), "to", to.Format(queryTimeFormat), "rows", processedRows) result, err := uploader.UploadWithContext(c.Context, &s3manager.UploadInput{ Bucket: aws.String(bucket), Key: aws.String(key), Body: inMemoryStore, }) if err != nil { sugar.Errorw("upload data", "error", err, "bucket", bucket, "key", key, "partition", partitionValue, "from", from.Format(queryTimeFormat), "to", to.Format(queryTimeFormat)) errorsCh <- fmt.Errorf("upload data: %v", err) return true } sugar.Debugw("uploaded data", "path", result.Location, "partition", partitionValue, "rows", processedRows) return true }, ) if err != nil { sugar.Errorw("query with partition value", "error", err, "partition", partitionValue, "from", from.Format(queryTimeFormat), "to", to.Format(queryTimeFormat)) errorsCh <- fmt.Errorf("query with partition value: %v", err) return } sugar.Infow("finished backing up data", "partition", partitionValue, "rows", totalRowsPerPartitions[partitionValue], "time taken", time.Since(startTimeSinglePartition).String()) }(partitionValue) } wg.Wait() close(errorsCh) for err := range errorsCh { if err != nil { return errors.New("encountered errors") } } sugar.Infow("finished backing up all data", "from", from.Format(queryTimeFormat), "to", to.Format(queryTimeFormat), "rows/partition", totalRowsPerPartitions, "time taken", time.Since(startTimeAllPartition).String()) return nil }, }
Functions ¶
This section is empty.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.