processor

package
v0.22.0 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 22, 2020 | License: Apache-2.0 | Imports: 23 | Imported by: 4

README

Cloud data processor

Motivation

While building serverless data processing solutions, our architecture engineering team noticed a pattern shared across many deliverables. For example, all input data uses a newline-delimited format and needs to be processed in parallel. Since we use a serverless runtime (Lambda/Cloud Functions), we need the ability to recover from errors, timeouts, and the maximum execution time imposed by the cloud provider. We also need to control the number of retries to avoid an explosion of cloud function invocations. While addressing all of these concerns, this project also reduces boilerplate code and increases productivity by maintaining the shared data flow logic in one place. The project is entry point agnostic, so it can run with any serverless architecture or even in stand-alone mode.

Introduction

Serverless Data Processing

In our system, the cloud serverless stack processes hundreds of terabytes of data daily. A typical data flow looks like the following:

  1. Crunching big data with the Magnus workflow system.
  2. Exporting the crunched data from BigQuery to Google Cloud Storage, or publishing real-time data to Google Cloud Storage, to trigger a Storage Mirror rule.
  3. Storage Mirror transfers data to S3 or splits it into smaller chunks that can be comfortably handled by a serverless runtime (Lambda/Cloud Function).
  4. Storage Mirror triggers a data processing event, by cloud storage upload or message bus push.
  5. Data processing takes place; typical processing includes:
    • Publishing data to Aerospike
    • Calling Data/Cloud/ML APIs to enrich the processed data
    • Taking data from a 3rd party source to load it into BigQuery
  6. Optionally, processed data is published to Google Storage to trigger BqTail ingestion rules.
  7. BqTail ingests data into BigQuery.

This project defines a generic data processing service.

Serverless resource restriction

Cloud serverless runtimes come with resource restrictions. Typical ones include:

  • max function duration
  • max function memory
  • max event size
  • rate limits (e.g. DNS lookups)
  • max throughput of incoming events

All of these have to be accounted for when implementing a data processor.

Assume you have 1M items to process, but your serverless function has only a fixed number of minutes allocated for execution. For this case, the data processor service lets you specify config.DeadlineReductionMs, which controls the maximum internal processing time, after which all unprocessed items are written to the dedicated retry destination.

Imagine that you keep retrying but never manage to process even one item because of a 3rd party datastore timeout. In that case, instead of spawning ever more serverless function instances, you want to cap the number of retries. Once the maximum number of retries is reached, the retry data is moved to the dedicated failed destination.
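The retry cap can be pictured as a small routing decision. The sketch below is an illustration under stated assumptions: retryName and nextDestination are hypothetical helpers, the "-retryNN" suffix follows the file names shown in this README's sample output, and treating "retries >= maxRetries" as exhausted is a guess at the boundary.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// retryName returns name with a "-retryNN" suffix for the given attempt,
// replacing any suffix left by an earlier attempt.
func retryName(name string, attempt int) string {
	ext := path.Ext(name)
	base := strings.TrimSuffix(name, ext)
	if i := strings.LastIndex(base, "-retry"); i != -1 {
		base = base[:i]
	}
	return fmt.Sprintf("%s-retry%02d%s", base, attempt, ext)
}

// nextDestination decides where unprocessed data goes after `retries`
// earlier attempts. Treating retries >= maxRetries as exhausted is an
// assumption of this sketch.
func nextDestination(name string, retries, maxRetries int, retryURL, failedURL string) string {
	if retries >= maxRetries {
		return failedURL + "/" + name
	}
	return retryURL + "/" + retryName(name, retries+1)
}

func main() {
	for retries := 0; retries <= 2; retries++ {
		fmt.Println(nextDestination("numbers.txt", retries, 2, "mem://localhost/retry", "mem://localhost/failed"))
	}
}
```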

Note that you can control the size or number of items in a data processor trigger event (storage/message bus) by using a Storage Mirror split rule, which divides the original large data file into many small chunks that are either uploaded to cloud storage or published to a message bus.

Error handling

When processing data, every valid item needs to be either processed or rewritten to the retry destination in case of a recoverable error or timeout. All errors except DataCorruption are recoverable; when you detect invalid data, just return a DataCorruption error.

aNumber, err := toolbox.ToInt(item) //expected int
if err != nil {
    return processor.NewDataCorruption(fmt.Sprintf("invalid number: %s, %v", item, err))
}

Sometimes data processing can end in an incomplete state. For example, for one data record the processor calls several independent APIs, and some calls fail while others succeed. In that case you can use a PartialRetry error, providing only the remaining portion of the record that still needs to be reprocessed.

Retry, failed, and corrupted data are written to dedicated locations, specified by:

  • config.RetryURL
  • config.FailedURL
  • config.CorruptionURL

Usage

Basic data processor

The following example shows how to implement a basic processor:

package mypackage

import (
	"encoding/json"
	"context"
	"fmt"
	"github.com/viant/afs"
	"github.com/viant/cloudless/data/processor"
	"github.com/viant/toolbox"
	"strings"
	"sync/atomic"
)

type sumKeyType string
const sumKey = sumKeyType("sum")

//SumProcessor represents sum processor
type SumProcessor struct{}

//Process sums supplied data
func (p SumProcessor) Process(ctx context.Context, data []byte, reporter processor.Reporter) error {
	if len(data) == 0 {
		return nil
	}
	value := ctx.Value(sumKey)
	sum := value.(*int64)
	aNumber, err := toolbox.ToInt(string(data))
	if err != nil {
		return processor.NewDataCorruption(fmt.Sprintf("invalid number: %s, %v", data, err))
	}
	atomic.AddInt64(sum, int64(aNumber))
	return nil
}

func ExampleService_Do() {
	service := processor.New(&processor.Config{
		RetryURL:      "mem://localhost/retry",
		CorruptionURL: "mem://localhost/corrupted",
		FailedURL:     "mem://localhost/failed",
		Concurrency:   5,
	}, afs.New(), &SumProcessor{}, processor.NewReporter)
	sum := int64(0)
	ctx := context.WithValue(context.Background(), sumKey, &sum)
	reporter := service.Do(ctx, processor.NewRequest(strings.NewReader("1\n2\n3\n5\nasd\n373\n23"),
		nil,
		"mem://localhost/data/numbers.txt"))
	fmt.Printf("Sum: %v\n", sum)
	//Prints sum 407
	data, _ := json.Marshal(reporter)
	fmt.Printf("%s\n", data)
	/* Prints
	{
		"CorruptionErrors": 1,
		"CorruptionURL": "mem://localhost/corrupted/data/numbers.txt",
		"Errors": [
			"invalid number: asd, strconv.ParseInt: parsing \"asd\": invalid syntax"
		],
		"Loaded": 7,
		"Processed": 6,
		"RetriableErrors": 0,
		"RetryErrors": 0,
		"RetryURL": "mem://localhost/retry/data/numbers-retry01.txt",
		"RuntimeMs": 1,
		"Status": "ok"
	}
	 */
}
Pre/Post data processor

A custom data processor can optionally implement Pre and Post methods, to initialise resources before the actual processing and to release resources and finalize data processing afterwards.

The following example initialises the sum and uploads the final sum to the destination URL:

//SumProcessor represents sum processor
type SumProcessor struct {
	fs afs.Service //used by Post to upload the final sum (github.com/viant/afs)
}

//Pre runs preprocessing logic
func (p *SumProcessor) Pre(ctx context.Context, reporter processor.Reporter) (context.Context, error) {
	var sum int32
	return context.WithValue(ctx, sumKey, &sum), nil
}

//Process sums supplied data
func (p *SumProcessor) Process(ctx context.Context, data []byte, reporter processor.Reporter) error {
	//...
	return nil
}

//Post uploads the final sum to the destination URL (file is github.com/viant/afs/file)
func (p *SumProcessor) Post(ctx context.Context, reporter processor.Reporter) error {
	destURL := reporter.BaseResponse().DestinationURL
	if destURL != "" {
		value := ctx.Value(sumKey)
		sum := value.(*int32)
		if err := p.fs.Upload(ctx, destURL, file.DefaultFileOsMode, strings.NewReader(fmt.Sprintf("%v", *sum))); err != nil {
			return err
		}
	}
	return nil
}

The following example shows a simple CSV-to-JSON transformer processor:

package mypackage
import (
	"bytes"
	"context"
	"encoding/csv"
	"fmt"
	"github.com/viant/cloudless/data/processor"
	"github.com/viant/cloudless/data/processor/destination"
	"github.com/viant/tapper/log"
	"github.com/viant/tapper/msg"
	"strings"
)

//Transformer transform csv into JSON
type Transformer struct {
	msgProvider *msg.Provider
}

//Pre initializes data logger for config.DestinationURL
func (p Transformer) Pre(ctx context.Context, reporter processor.Reporter) (context.Context, error) {
	return destination.NewDataLogger(ctx, reporter)
}

//Process transforms CSV into JSON
func (p Transformer) Process(ctx context.Context, data []byte, reporter processor.Reporter) error {
	if len(data) == 0 {
		return nil
	}
	csvReader := csv.NewReader(bytes.NewReader(data))
	record, err := csvReader.Read()
	if err == nil && len(record) < 3 {
		err = fmt.Errorf("invalid record size, expected 3 but had: %v", len(record))
	}
	if err != nil {
		return processor.NewDataCorruption(fmt.Sprintf("failed to read record: %s, %v", data, err))
	}
	message := p.msgProvider.NewMessage()
	defer message.Free()
	message.PutString("ID", record[0])
	message.PutString("Region", record[1])
	message.PutStrings("SegmentIDS", strings.Split(record[2], ","))
	logger := ctx.Value(destination.DataLoggerKey).(*log.Logger)
	return logger.Log(message)
}

//Post closes logger and finalizes data upload to the data destination
func (p Transformer) Post(ctx context.Context, reporter processor.Reporter) error {
	logger := ctx.Value(destination.DataLoggerKey).(*log.Logger)
	return logger.Close()
}

//NewTransformer creates a new transformer
func NewTransformer() *Transformer {
	return &Transformer{
		msgProvider: msg.NewProvider(16*1024, 20),
	}
}
Extending reporter

Reporter encapsulates the Response and processing metrics reported to the serverless standard output (CloudWatch/Stackdriver).

The following example shows how to extend BaseReporter with additional metrics.

//URLReporter represents URL reporter
type URLReporter struct {
	processor.BaseReporter
	ByResponseCode map[int]int
	mutex          sync.Mutex
}

//NewURLReporter creates a URL reporter
func NewURLReporter() processor.Reporter {
	return  &URLReporter{
		ByResponseCode: make(map[int]int),
		BaseReporter: processor.BaseReporter{
			Response: &processor.Response{Status: processor.StatusOk},
		},
	}
}

type HTTPProcessor struct {
	BaseURL string
}

func (p HTTPProcessor) Process(ctx context.Context, data []byte, reporter processor.Reporter) error {
	urlReporter := reporter.(*URLReporter)
	URL := p.BaseURL + string(data)
	request, err := http.NewRequestWithContext(ctx, http.MethodGet, URL, nil)
	if err != nil {
		return processor.NewDataCorruption(fmt.Sprintf("invalid request: %v", URL))
	}
	response, err := http.DefaultClient.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close() //release the connection once the status code is recorded
	urlReporter.mutex.Lock()
	defer urlReporter.mutex.Unlock()
	urlReporter.ByResponseCode[response.StatusCode]++
	return nil
}

func ExampleService_Do() {
	ctx := context.Background()
	service := processor.New(&processor.Config{
		CorruptionURL: "mem://localhost/corrupted",
		RetryURL:      "mem://localhost/retry",
		FailedURL:     "mem://localhost/failed",
	}, afs.New(), &HTTPProcessor{BaseURL: "http://mydataexporter/enrich/?data="}, NewURLReporter)
	reporter := service.Do(ctx, processor.NewRequest(strings.NewReader(""),
		nil,
		"mem://localhost/response/numbers.txt"))
	response, _ := json.Marshal(reporter)
	fmt.Printf("%s\n", response)
}

Configuration

The data processor service supports the following configuration options:

  • DeadlineReductionMs defines the time reserved for rewriting unprocessed data to the retry location, 1% of the event max execution time by default
  • MaxRetries defines max retries; once max retries is exceeded, retry data is written to the failed destination
  • Concurrency defines the number of goroutines running the processor.Process logic
  • DestinationURL optional data destination URL
  • RetryURL retry data destination; it should be the source for the data processor trigger event
  • FailedURL destination for data that has failed max retries (the original data is never lost, but manual intervention is required)
  • CorruptionURL destination for corrupted data (for manual inspection)
  • MaxExecTimeMs optional parameter for runtimes where the context does not come with a deadline

All configuration URLs support the following macro substitutions:

  • $UUID: expands to a random UUID
  • $TimePath: expands to request.StartTime in the following format: yyyy/MM/dd/hh
Known Limitation

  • Concurrency setting
  • EOF error
  • Memory exceeded limit

Adapters

Adapters provide a convenient way to create a data processing request from various cloud serverless events.

S3 Event
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/viant/afs"
	_ "github.com/viant/afsc/s3"
	"github.com/viant/cloudless/data/processor"
	"github.com/viant/cloudless/data/processor/adapter/aws"
)

func main() {
	lambda.Start(handleEvent)
}

func handleEvent(ctx context.Context, event aws.S3Event) error {
	fs := afs.New()
	var service processor.Service //... get service singleton instance
	request, err := event.NewRequest(ctx, fs, service.Config)
	if err != nil {
		return err
	}
	reporter := service.Do(ctx, request)
	JSON, _ := json.Marshal(reporter)
	fmt.Printf("%s\n", JSON)
	return nil
}

SQS Event
package main

import (
	"context"
	"encoding/json"
	"fmt"
	_ "github.com/viant/afsc/s3"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/viant/cloudless/data/processor"
	"github.com/viant/cloudless/data/processor/adapter/aws"
)

func main() {
	lambda.Start(handleEvent)
}

func handleEvent(ctx context.Context, event aws.SQSEvent) error {
	var service processor.Service //... get service singleton instance
	request, err := event.NewRequest()
	if err != nil {
		return err
	}
	reporter := service.Do(ctx, request)
	JSON, _ := json.Marshal(reporter)
	fmt.Printf("%s\n", JSON)
	return nil
}
Google Storage Event
package mypackage

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/viant/afs"
	_ "github.com/viant/afsc/gs"
	"github.com/viant/cloudless/data/processor"
	"github.com/viant/cloudless/data/processor/adapter/gcp"
)
//HandleEvent handles cloud function storage event
func HandleEvent(ctx context.Context, event gcp.GSEvent) error {
	fs := afs.New()
	var service processor.Service //... get service singleton instance
	request, err := event.NewRequest(ctx, fs, service.Config)
	if err != nil {
		return err
	}
	reporter := service.Do(ctx, request)
	JSON, _ := json.Marshal(reporter)
	fmt.Printf("%s\n", JSON)
	return nil
}
Google Pub/Sub Event
package mypackage

import (
	"context"
	"encoding/json"
	"fmt"
	_ "github.com/viant/afsc/gs"
	"github.com/viant/cloudless/data/processor"
	"github.com/viant/cloudless/data/processor/adapter/gcp"
)
//HandleEvent handles cloud function pubsub event
func HandleEvent(ctx context.Context, event gcp.PubSubMessage) error {
	var service processor.Service //... get service singleton instance
	request, err := event.NewRequest()
	if err != nil {
		return err
	}
	reporter := service.Do(ctx, request)
	JSON, _ := json.Marshal(reporter)
	fmt.Printf("%s\n", JSON)
	return nil
}

End to end testing

  • TODO add to the examples

Documentation

Overview

Package processor defines abstraction for serverless concurrent processing

Constants

const (
	//OnDoneDelete delete action
	OnDoneDelete = "delete"
	//OnDoneMove move action
	OnDoneMove = "move"
)
const (
	StatusOk           = "ok"
	StatusError        = "error"
	StatusSetOk        = StatusSet(1)
	StatusSetError     = StatusSet(2)
	StatusSetRetriable = StatusSet(4)
	StatusSetCorrupted = StatusSet(8)
)
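The StatusSet values are powers of two, which suggests they are meant to be combined as bit flags. The sketch below shows that pattern with a local stand-in type; statusSet, has, and describe are hypothetical names, and the describe output format is made up for illustration rather than taken from the package's String method.

```go
package main

import (
	"fmt"
	"strings"
)

// statusSet is a local stand-in for the package's StatusSet type.
type statusSet int

const (
	setOk        statusSet = 1
	setError     statusSet = 2
	setRetriable statusSet = 4
	setCorrupted statusSet = 8
)

// has reports whether flag is present in the set.
func (s statusSet) has(flag statusSet) bool { return s&flag != 0 }

// describe lists the flags in the set; this output format is made up for the
// sketch and is not the package's String() format.
func (s statusSet) describe() string {
	names := []string{"ok", "error", "retriable", "corrupted"}
	var present []string
	for i, name := range names {
		if s.has(statusSet(1) << uint(i)) {
			present = append(present, name)
		}
	}
	return strings.Join(present, "|")
}

func main() {
	status := setError | setRetriable
	fmt.Println(status.has(setRetriable), status.has(setCorrupted), status.describe())
}
```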

Variables

This section is empty.

Functions

func NewDataCorruption

func NewDataCorruption(msg string) error

NewDataCorruption returns data corruption error

func NewPartialRetry

func NewPartialRetry(msg string, data []byte) error

NewPartialRetry returns partial retry error

Types

type BaseReporter

type BaseReporter struct {
	*Response
	// contains filtered or unexported fields
}

func (*BaseReporter) BaseResponse

func (r *BaseReporter) BaseResponse() *Response

BaseResponse returns base response info

Example
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/viant/afs"
	"github.com/viant/cloudless/data/processor"
	"net/http"
	"strings"
	"sync"
)

// URLReporter represents URL reporter
type URLReporter struct {
	processor.BaseReporter
	ByResponseCode map[int]int
	mutex          sync.Mutex
}

// NewURLReporter creates a URL reporter
func NewURLReporter() processor.Reporter {
	return &URLReporter{
		ByResponseCode: make(map[int]int),
		BaseReporter: processor.BaseReporter{
			Response: &processor.Response{Status: processor.StatusOk},
		},
	}
}

type HTTPProcessor struct {
	BaseURL string
}

func (p HTTPProcessor) Process(ctx context.Context, data []byte, reporter processor.Reporter) error {
	urlReporter := reporter.(*URLReporter)
	URL := p.BaseURL + string(data)
	request, err := http.NewRequestWithContext(ctx, http.MethodGet, URL, nil)
	if err != nil {
		return processor.NewDataCorruption(fmt.Sprintf("invalid request: %v", URL))
	}
	response, err := http.DefaultClient.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close() // release the connection once the status code is recorded
	urlReporter.mutex.Lock()
	defer urlReporter.mutex.Unlock()
	urlReporter.ByResponseCode[response.StatusCode]++
	return nil
}

func main() {
	ctx := context.Background()
	service := processor.New(&processor.Config{
		CorruptionURL: "mem://localhost/corrupted",
		RetryURL:      "mem://localhost/retry",
		FailedURL:     "mem://localhost/failed",
	}, afs.New(), &HTTPProcessor{BaseURL: "http://mydataexporter/enrich/?data="}, NewURLReporter)
	reporter := service.Do(ctx, processor.NewRequest(strings.NewReader("dGVzdCBpcyB0ZXN0\nYW5vdGhlciBvbmU="),
		nil,
		"mem://localhost/trigger/data.txt"))
	response, _ := json.Marshal(reporter)
	fmt.Printf("%s\n", response)
}

type Config

type Config struct {
	DeadlineReductionMs int // Deadline typically comes from Lambda ctx. Max exec time == Deadline - DeadlineReductionMs
	LoaderDeadlineLagMs int // Loader will finish earlier than workers to let the latter complete
	MaxRetries          int
	Concurrency         int
	DestinationURL      string // Service processing data destination URL. This is a template, e.g. $gs://$mybucket/$prefix/$a.dat
	DestinationCodec    string
	RetryURL            string // destination for the data to be retried
	FailedURL           string // destination for the data that has failed max retries
	CorruptionURL       string // destination for the corrupted data
	MaxExecTimeMs       int    // default execution timeMs used when context does not come with deadline
	OnDone              string // move or delete (move moves data to the process URL, delete deletes it)
	OnDoneURL           string
	ReaderBufferSize    int  // if set above zero uses afs Stream option
	BatchSize           int  // number of data lines passed to processor (1 by default)
	Sort                Sort // optional sorting config
	ScannerBufferMB     int  // use in case you see bufio.Scanner: token too long
}

Config represents processor configuration

func (Config) AdjustScannerBuffer

func (c Config) AdjustScannerBuffer(scanner *bufio.Scanner)

func (Config) Deadline

func (c Config) Deadline(ctx context.Context) time.Time

Deadline returns max execution time for a Processor

func (Config) ExpandDestinationURL

func (c Config) ExpandDestinationURL(startTime time.Time) string

func (*Config) Init

func (c *Config) Init(ctx context.Context, fs afs.Service) error

Init sets default Config values

func (*Config) InitWithNoLimit

func (c *Config) InitWithNoLimit()

InitWithNoLimit initialises config with no execution limit

func (Config) LoaderDeadline

func (c Config) LoaderDeadline(ctx context.Context) time.Time

LoaderDeadline returns max loading time for a Processor; the loader finishes earlier than the workers to let the latter complete

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if Config is valid

type DataCorruption

type DataCorruption struct {
	// contains filtered or unexported fields
}

DataCorruption represents corruption error

func (*DataCorruption) Error

func (e *DataCorruption) Error() string

Error returns an error

type Field

type Field struct {
	Name      string
	Index     int
	IsNumeric bool
}

Field represents sort field definition

func (*Field) Value

func (f *Field) Value(data []byte, spec *Spec) interface{}

type Fields

type Fields struct {
	Sort
	// contains filtered or unexported fields
}

Fields

func (Fields) NKeys

func (f Fields) NKeys() int

NKeys returns 0, which tells the Decoder to decode all keys

func (*Fields) UnmarshalJSONObject

func (f *Fields) UnmarshalJSONObject(dec *gojay.Decoder, k string) error

UnmarshalJSONObject implements the gojay Unmarshaler

type PartialRetry

type PartialRetry struct {
	// contains filtered or unexported fields
}

PartialRetry partial retry error allows to write only partial data back to retry stream

func (*PartialRetry) Error

func (e *PartialRetry) Error() string

Error returns an error

type PostProcessor

type PostProcessor interface {
	Post(ctx context.Context, reporter Reporter) error
}

PostProcessor is an optional postprocessor interface

type PreProcessor

type PreProcessor interface {
	Pre(ctx context.Context, reporter Reporter) (context.Context, error)
}

PreProcessor is an optional preprocessor interface

type Processor

type Processor interface {
	Process(ctx context.Context, data []byte, reporter Reporter) error
}

Processor represents data processor

type Reporter

type Reporter interface {
	//BaseResponse returns base response
	BaseResponse() *Response
}

Reporter represents an interface providing processor response

func NewReporter

func NewReporter() Reporter

NewReporter returns a reporter

type Request

type Request struct {
	io.ReadCloser
	Attrs     map[string]interface{}
	StartTime time.Time
	SourceURL string //incoming original filename url
}

Request represents a processing request

func NewRequest

func NewRequest(reader io.Reader, attrs map[string]interface{}, sourceURL string) *Request

NewRequest creates a processing request

func (*Request) Retry

func (r *Request) Retry() int

Retry extracts the retry number from the URL. It looks for two consecutive digits, e.g. s3://bucket/prefix/filename-retry05.csv would yield 5
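One way to picture this extraction is a regular expression over the URL. The sketch below is an approximation under assumptions: retryCount and the "-retry" followed by two digits pattern are inferred from the example file name above, not copied from the package source.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// retryPattern matches the "-retryNN" suffix used in the example file name.
var retryPattern = regexp.MustCompile(`-retry(\d{2})`)

// retryCount returns the retry number embedded in sourceURL, or 0 when the
// URL carries no retry suffix.
func retryCount(sourceURL string) int {
	match := retryPattern.FindStringSubmatch(sourceURL)
	if match == nil {
		return 0
	}
	count, err := strconv.Atoi(match[1])
	if err != nil {
		return 0
	}
	return count
}

func main() {
	fmt.Println(retryCount("s3://bucket/prefix/filename-retry05.csv"))
	fmt.Println(retryCount("s3://bucket/prefix/filename.csv"))
}
```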

func (*Request) TransformSourceURL

func (r *Request) TransformSourceURL(baseURL string) string

TransformSourceURL returns baseURL + sourceURL path
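The sample output in the README shows this mapping: source mem://localhost/data/numbers.txt with base mem://localhost/corrupted becomes mem://localhost/corrupted/data/numbers.txt. A minimal sketch of that joining step, using a hypothetical transformSourceURL helper built on net/url, might look like this:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// transformSourceURL appends the path of sourceURL to baseURL.
func transformSourceURL(baseURL, sourceURL string) (string, error) {
	parsed, err := url.Parse(sourceURL)
	if err != nil {
		return "", err
	}
	return strings.TrimRight(baseURL, "/") + "/" + strings.TrimLeft(parsed.Path, "/"), nil
}

func main() {
	transformed, err := transformSourceURL("mem://localhost/corrupted", "mem://localhost/data/numbers.txt")
	if err != nil {
		panic(err)
	}
	fmt.Println(transformed)
}
```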

type Response

type Response struct {
	Status string

	Errors []string `json:",omitempty"`

	StartTime        time.Time
	RuntimeMs        int
	SourceURL        string `json:",omitempty"`
	DestinationURL   string `json:",omitempty"` // Service processing data destination URL. This is a template, e.g. $gs://$mybucket/$prefix/$a.dat
	DestinationCodec string `json:"-"`          //optional compression codec (i.e gzip)
	RetryURL         string `json:"-"`          // destination for the data to be replayed
	CorruptionURL    string `json:"-"`
	Processed        int32  `json:",omitempty"`
	RetryErrors      int32  `json:",omitempty"`
	CorruptionErrors int32  `json:",omitempty"`
	RetriableErrors  int32  `json:",omitempty"`
	Loaded           int32  `json:",omitempty"`
	LoadTimeouts     int32  `json:",omitempty"`
	Batched          int32  `json:",omitempty"`
	Skipped          int32  `json:",omitempty"`
	// contains filtered or unexported fields
}

Response represents base processing response

func (*Response) LogError

func (r *Response) LogError(err error)

LogError logs error

type Service

type Service struct {
	Config *Config

	Processor
	// contains filtered or unexported fields
}

Service represents processing service

func New

func New(config *Config, fs afs.Service, processor Processor, reporterProvider func() Reporter) *Service

New creates a processing service

func (*Service) Do

func (s *Service) Do(ctx context.Context, request *Request) Reporter

Do starts service processing

Example
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/viant/afs"
	"github.com/viant/cloudless/data/processor"
	"github.com/viant/toolbox"
	"strings"
	"sync/atomic"
)

type sumKeyType string

const sumKey = sumKeyType("sum")

// SumProcessor represents sum processor
type SumProcessor struct{}

// Process sums supplied data
func (p SumProcessor) Process(ctx context.Context, data []byte, reporter processor.Reporter) error {
	if len(data) == 0 {
		return nil
	}
	value := ctx.Value(sumKey)
	sum := value.(*int64)
	aNumber, err := toolbox.ToInt(string(data))
	if err != nil {
		return processor.NewDataCorruption(fmt.Sprintf("invalid number: %s, %v", data, err))
	}
	atomic.AddInt64(sum, int64(aNumber))
	return nil
}

func main() {
	service := processor.New(&processor.Config{
		CorruptionURL: "mem://localhost/corrupted",
		RetryURL:      "mem://localhost/retry",
		FailedURL:     "mem://localhost/failed",
	}, afs.New(), &SumProcessor{}, processor.NewReporter)
	sum := int64(0)
	ctx := context.WithValue(context.Background(), sumKey, &sum)
	reporter := service.Do(ctx, processor.NewRequest(strings.NewReader("1\n2\n3\n5\nasd\n373\n23"),
		nil,
		"mem://localhost/response/numbers.txt"))
	fmt.Printf("Sum: %v\n", sum)
	//Prints sum 407
	response, _ := json.Marshal(reporter)
	fmt.Printf("%s\n", response)
	/* Prints
	{
		"CorruptionErrors": 1,
		"CorruptionURL": "mem://localhost/corrupted/response/numbers.txt",
		"Errors": [
			"invalid number: asd, strconv.ParseInt: parsing \"asd\": invalid syntax"
		],
		"Loaded": 7,
		"Processed": 6,
		"RetriableErrors": 0,
		"RetryErrors": 0,
		"RetryURL": "mem://localhost/retry/response/numbers-retry01.txt",
		"RuntimeMs": 1,
		"Status": "ok"
	}
	*/
}

type Sort

type Sort struct {
	Spec
	By    []Field
	Batch bool //batches data by first sorted field
}

Sort represents configuration sort definition

func (Sort) Order

func (s Sort) Order(reader io.Reader, config *Config) (io.Reader, error)

Order orders the reader data

type Sortables

type Sortables struct {
	Sort
	Items [][]byte
}

Sortables represent sortable items

func (*Sortables) Len

func (s *Sortables) Len() int

Len is part of sort.Interface.

func (*Sortables) Less

func (s *Sortables) Less(srcIdx, destIdx int) bool

Less is part of sort.Interface

func (*Sortables) Swap

func (s *Sortables) Swap(i, j int)

Swap is part of sort.Interface.

type Spec

type Spec struct {
	Format    string
	Delimiter string
}

type StatusSet

type StatusSet int

StatusSet

func (StatusSet) String

func (s StatusSet) String() string

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer represents text data writer

func NewWriter

func NewWriter(URL string, fs afs.Service) *Writer

NewWriter creates a writer

func (*Writer) Close

func (w *Writer) Close() error

Close closes the writer if there are any writes

func (*Writer) Write

func (w *Writer) Write(ctx context.Context, data []byte) (err error)

Directories

Path             Synopsis
adapter
  aws
  gcp
subscriber
  gcp
