Documentation ¶
Overview ¶
Package processor defines an abstraction for serverless concurrent processing
Index ¶
- Constants
- func NewDataCorruption(msg string) error
- func NewPartialRetry(msg string, data interface{}) error
- type BaseReporter
- type Config
- func (c Config) AdjustScannerBuffer(scanner *bufio.Scanner)
- func (c Config) Deadline(ctx context.Context) time.Time
- func (c *Config) ExpandDestination(startTime time.Time) *config.Stream
- func (c Config) ExpandDestinationRotationURL(startTime time.Time) string
- func (c Config) ExpandDestinationURL(startTime time.Time) string
- func (c *Config) Init(ctx context.Context, fs afs.Service) error
- func (c *Config) InitWithNoLimit()
- func (c Config) LoaderDeadline(ctx context.Context) time.Time
- func (c *Config) Validate() error
- type DataCorruption
- type Field
- type Fields
- type Handler
- type HandlerReporter
- type PartialRetry
- type PostProcessor
- type PreProcessor
- type Processor
- type Reporter
- type Request
- type Response
- type Service
- type Sort
- type Sortables
- type Spec
- type StatusSet
- type Writer
Examples ¶
- BaseReporter.BaseResponse
- Service.Do
Constants ¶
const (
    //OnDoneDelete delete action
    OnDoneDelete = "delete"
    //OnDoneMove move action
    OnDoneMove = "move"
)
const (
    //Source types
    Parquet = "parquet"
    JSON    = "json"
    CSV     = "csv"
)
const (
    StatusOk    = "ok"
    StatusError = "error"

    StatusSetOk        = StatusSet(1)
    StatusSetError     = StatusSet(2)
    StatusSetRetriable = StatusSet(4)
    StatusSetCorrupted = StatusSet(8)
)
const (
RetryFragment = "-retry"
)
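Since the StatusSet values above are powers of two they act as bit flags; a minimal sketch of combining and testing them (assuming only that StatusSet has an integer underlying type, as the StatusSet(1) conversions suggest):

package main

import (
    "fmt"

    "github.com/viant/cloudless/data/processor"
)

func main() {
    // Combine independent outcomes into one status word.
    status := processor.StatusSetError | processor.StatusSetRetriable
    fmt.Println(status&processor.StatusSetRetriable != 0) // true: retriable bit is set
    fmt.Println(status&processor.StatusSetCorrupted != 0) // false: corrupted bit is not
}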
Variables ¶
This section is empty.
Functions ¶
func NewDataCorruption ¶
func NewDataCorruption(msg string) error
NewDataCorruption returns a data corruption error
func NewPartialRetry ¶
func NewPartialRetry(msg string, data interface{}) error
NewPartialRetry returns a partial retry error
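A Processor can return a PartialRetry error when only part of its input needs to be replayed; a hedged sketch (splitProcessor and its ";" convention are purely illustrative, and the data argument is assumed to be the payload written back to the retry stream, per the PartialRetry doc below):

package main

import (
    "context"
    "fmt"
    "strings"

    "github.com/viant/cloudless/data/processor"
)

// splitProcessor is a hypothetical processor that handles the part of a line
// before ";" and defers the remainder to the retry stream.
type splitProcessor struct{}

func (splitProcessor) Process(ctx context.Context, data interface{}, reporter processor.Reporter) error {
    line := string(data.([]byte))
    if i := strings.Index(line, ";"); i != -1 {
        // Only line[:i] was handled; send the rest back for a later retry.
        return processor.NewPartialRetry("deferred remainder", []byte(line[i+1:]))
    }
    return nil
}

func main() {
    err := splitProcessor{}.Process(context.Background(), []byte("done;pending"), nil)
    fmt.Println(err)
}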
Types ¶
type BaseReporter ¶
type BaseReporter struct {
    *Response
    // contains filtered or unexported fields
}
func (*BaseReporter) BaseResponse ¶
func (r *BaseReporter) BaseResponse() *Response
BaseResponse returns base response info
Example ¶
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "strings"
    "sync"

    "github.com/viant/afs"
    "github.com/viant/cloudless/data/processor"
)

// URLReporter represents a URL reporter
type URLReporter struct {
    processor.BaseReporter
    ByResponseCode map[int]int
    mutex          sync.Mutex
}

// NewURLReporter creates a URL reporter
func NewURLReporter() processor.Reporter {
    return &URLReporter{
        ByResponseCode: make(map[int]int),
        BaseReporter: processor.BaseReporter{
            Response: &processor.Response{Status: processor.StatusOk},
        },
    }
}

type HTTPProcessor struct {
    BaseURL string
}

func (p HTTPProcessor) Process(ctx context.Context, data interface{}, reporter processor.Reporter) error {
    urlReporter := reporter.(*URLReporter)
    URL := p.BaseURL + string(data.([]byte))
    request, err := http.NewRequestWithContext(ctx, http.MethodGet, URL, nil)
    if err != nil {
        return processor.NewDataCorruption(fmt.Sprintf("invalid request: %v", URL))
    }
    response, err := http.DefaultClient.Do(request)
    if err != nil {
        return err
    }
    defer response.Body.Close()
    urlReporter.mutex.Lock()
    defer urlReporter.mutex.Unlock()
    urlReporter.ByResponseCode[response.StatusCode]++
    return nil
}

func main() {
    ctx := context.Background()
    service := processor.New(&processor.Config{
        CorruptionURL: "mem://localhost/corrupted",
        RetryURL:      "mem://localhost/retry",
        FailedURL:     "mem://localhost/failed",
    }, afs.New(), &HTTPProcessor{BaseURL: "http://mydataexporter/enrich/?data="}, NewURLReporter)
    reporter := service.Do(ctx, processor.NewRequest(strings.NewReader("dGVzdCBpcyB0ZXN0\nYW5vdGhlciBvbmU="), nil, "mem://localhost/trigger/data.txt"))
    response, _ := json.Marshal(reporter)
    fmt.Printf("%s\n", response)
}
Output:
type Config ¶
type Config struct {
    DeadlineReductionMs int // Deadline typically comes from the Lambda ctx; max exec time == Deadline - DeadlineReductionMs
    LoaderDeadlineLagMs int // loader finishes earlier than workers to let the latter complete
    MaxRetries          int
    Concurrency         int
    DestinationURL      string // processed data destination URL; a template, e.g. $gs://$mybucket/$prefix/$a.dat
    DestinationCodec    string
    Destination         *config.Stream
    RetryURL            string // destination for the data to be retried
    FailedURL           string // destination for the data that has failed max retries
    CorruptionURL       string // destination for the corrupted data
    MaxExecTimeMs       int    // default execution time in ms, used when the context has no deadline
    OnDone              string // "move" (moves processed data to OnDoneURL) or "delete"
    OnDoneURL           string
    ReaderBufferSize    int    // if above zero, uses the afs Stream option
    BatchSize           int    // number of data lines passed to the processor (1 by default)
    Sort                Sort   // optional sorting config
    ScannerBufferMB     int    // increase if you see "bufio.Scanner: token too long"
    MetricPort          int    // if set, HTTP endpoint port to expose metrics
    RowTypeName         string // parquet/json row type
    OnMirrorURL         string // copy URL of the resource
}
Config represents processor configuration
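An illustrative configuration (all values are placeholders; which fields Validate actually requires is not documented here):

package main

import (
    "fmt"

    "github.com/viant/cloudless/data/processor"
)

func main() {
    cfg := &processor.Config{
        Concurrency:   8,                           // parallel workers
        MaxRetries:    3,                           // after this, data goes to FailedURL
        BatchSize:     1,                           // lines per Process call (the default)
        RetryURL:      "mem://localhost/retry",     // retriable data destination
        FailedURL:     "mem://localhost/failed",    // exhausted-retries destination
        CorruptionURL: "mem://localhost/corrupted", // corrupted data destination
        OnDone:        processor.OnDoneDelete,      // remove the source once processed
    }
    fmt.Println(cfg.Validate())
}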
func (Config) AdjustScannerBuffer ¶
func (c Config) AdjustScannerBuffer(scanner *bufio.Scanner)
func (*Config) ExpandDestination ¶
func (c *Config) ExpandDestination(startTime time.Time) *config.Stream
func (Config) ExpandDestinationRotationURL ¶
func (c Config) ExpandDestinationRotationURL(startTime time.Time) string
func (Config) ExpandDestinationURL ¶
func (c Config) ExpandDestinationURL(startTime time.Time) string
func (*Config) InitWithNoLimit ¶
func (c *Config) InitWithNoLimit()
InitWithNoLimit initialises config with no execution limit
func (Config) LoaderDeadline ¶
func (c Config) LoaderDeadline(ctx context.Context) time.Time
LoaderDeadline returns the loader deadline, which falls earlier than the processor deadline so that in-flight workers can complete
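The two deadlines derive from the context deadline via the Config fields above; the sketch below assumes Deadline subtracts DeadlineReductionMs and LoaderDeadline falls earlier still by LoaderDeadlineLagMs, per the field comments:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/viant/cloudless/data/processor"
)

func main() {
    cfg := processor.Config{
        DeadlineReductionMs: 500,  // workers stop 500ms before the ctx deadline
        LoaderDeadlineLagMs: 1000, // the loader stops earlier than the workers
    }
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    fmt.Println(cfg.Deadline(ctx))       // assumed: ctx deadline - DeadlineReductionMs
    fmt.Println(cfg.LoaderDeadline(ctx)) // assumed: earlier by LoaderDeadlineLagMs
}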
type DataCorruption ¶
type DataCorruption struct {
// contains filtered or unexported fields
}
DataCorruption represents a data corruption error
type Fields ¶
type Fields struct {
    Sort
    // contains filtered or unexported fields
}
Fields represents sort fields
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler represents a custom processor handler that allows creating a processor per request URL
func NewHandler ¶
NewHandler creates a custom handler processor (a dedicated processor can be created based on processor.Request)
type HandlerReporter ¶
HandlerReporter represents a handler reporter
type PartialRetry ¶
type PartialRetry struct {
// contains filtered or unexported fields
}
PartialRetry is a partial retry error that allows writing only partial data back to the retry stream
type PostProcessor ¶
PostProcessor is an optional post-processor interface
type PreProcessor ¶
type PreProcessor interface {
Pre(ctx context.Context, reporter Reporter) (context.Context, error)
}
PreProcessor is an optional preprocessor interface
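A sketch of a PreProcessor that seeds the context before processing starts (timingPreProcessor is illustrative; how the service discovers the interface, presumably by the Processor also implementing it, is not documented here):

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/viant/cloudless/data/processor"
)

type startKey string

// timingPreProcessor stashes a start time in the context for later use by Process.
type timingPreProcessor struct{}

func (timingPreProcessor) Pre(ctx context.Context, reporter processor.Reporter) (context.Context, error) {
    return context.WithValue(ctx, startKey("start"), time.Now()), nil
}

// Compile-time check that the interface is satisfied.
var _ processor.PreProcessor = timingPreProcessor{}

func main() {
    ctx, _ := timingPreProcessor{}.Pre(context.Background(), nil)
    fmt.Println(ctx.Value(startKey("start")) != nil) // true
}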
type Processor ¶
type Processor interface {
Process(ctx context.Context, data interface{}, reporter Reporter) error
}
Processor represents data processor
type Reporter ¶
type Reporter interface {
    //BaseResponse returns base response
    BaseResponse() *Response
}
Reporter represents an interface providing the processor response
func NewHandlerReporter ¶
NewHandlerReporter creates a handler reporter
type Request ¶
type Request struct {
    io.ReadCloser
    SourceType string
    io.ReaderAt
    RowType   reflect.Type
    Attrs     map[string]interface{}
    StartTime time.Time
    SourceURL string // incoming original filename URL
}
Request represents a processing request
func NewRequest ¶
NewRequest creates a processing request
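Typical construction mirrors the package examples (the second argument is nil in those examples; it is assumed here to correspond to the optional io.ReaderAt embedded in Request for random-access sources):

package main

import (
    "fmt"
    "strings"

    "github.com/viant/cloudless/data/processor"
)

func main() {
    // The source URL drives reporting and the derived retry/corruption URLs.
    req := processor.NewRequest(strings.NewReader("line1\nline2"), nil, "mem://localhost/trigger/data.txt")
    fmt.Println(req.SourceURL)
}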
func (*Request) Retry ¶
Retry extracts the retry count from the URL. It looks for two consecutive digits after the retry fragment, e.g. s3://bucket/prefix/filename-retry05.csv yields 5.
func (*Request) TransformSourceURL ¶
TransformSourceURL returns baseURL + sourceURL path
type Response ¶
type Response struct {
    Status           string
    Errors           []string `json:",omitempty"`
    StartTime        time.Time
    RuntimeMs        int
    SourceURL        string `json:",omitempty"`
    Destination      *config.Stream
    RetryURL         string `json:"-"` // destination for the data to be replayed
    CorruptionURL    string `json:"-"`
    Processed        int32 `json:",omitempty"`
    RetryErrors      int32 `json:",omitempty"`
    CorruptionErrors int32 `json:",omitempty"`
    RetriableErrors  int32 `json:",omitempty"`
    Loaded           int32 `json:",omitempty"`
    LoadTimeouts     int32 `json:",omitempty"`
    Batched          int32 `json:",omitempty"`
    Skipped          int32 `json:",omitempty"`
    // contains filtered or unexported fields
}
Response represents base processing response
type Service ¶
type Service struct {
    Config  *Config
    Metrics *gmetric.Service
    Processor
    // contains filtered or unexported fields
}
Service represents processing service
func New ¶
func New(config *Config, fs afs.Service, processor Processor, reporterProvider func() Reporter) *Service
New creates a data processing service
func NewWithMetrics ¶
func NewWithMetrics(config *Config, fs afs.Service, processor Processor, reporterProvider func() Reporter, metrics *gmetric.Service) *Service
NewWithMetrics creates a data processing service with a metrics service
func (*Service) Do ¶
Do starts service processing
Example ¶
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "strings"
    "sync/atomic"

    "github.com/viant/afs"
    "github.com/viant/cloudless/data/processor"
    "github.com/viant/toolbox"
)

type sumKeyType string

const sumKey = sumKeyType("sum")

// SumProcessor represents a sum processor
type SumProcessor struct{}

// Process adds each newline separated number to the running sum held in the context
func (p SumProcessor) Process(ctx context.Context, data interface{}, reporter processor.Reporter) error {
    tmpData := data.([]byte)
    if len(tmpData) == 0 {
        return nil
    }
    value := ctx.Value(sumKey)
    sum := value.(*int64)
    aNumber, err := toolbox.ToInt(string(tmpData))
    if err != nil {
        return processor.NewDataCorruption(fmt.Sprintf("invalid number: %s, %v", tmpData, err))
    }
    atomic.AddInt64(sum, int64(aNumber))
    return nil
}

func main() {
    service := processor.New(&processor.Config{
        CorruptionURL: "mem://localhost/corrupted",
        RetryURL:      "mem://localhost/retry",
        FailedURL:     "mem://localhost/failed",
    }, afs.New(), &SumProcessor{}, processor.NewReporter)
    sum := int64(0)
    ctx := context.WithValue(context.Background(), sumKey, &sum)
    reporter := service.Do(ctx, processor.NewRequest(strings.NewReader("1\n2\n3\n5\nasd\n373\n23"), nil, "mem://localhost/response/numbers.txt"))
    fmt.Printf("Sum: %v\n", sum) // prints Sum: 407
    response, _ := json.Marshal(reporter)
    fmt.Printf("%s\n", response)
    /* Prints:
    {
        "CorruptionErrors": 1,
        "CorruptionURL": "mem://localhost/corrupted/response/numbers.txt",
        "Errors": ["invalid number: asd, strconv.ParseInt: parsing \"asd\": invalid syntax"],
        "Loaded": 7,
        "Processed": 6,
        "RetriableErrors": 0,
        "RetryErrors": 0,
        "RetryURL": "mem://localhost/retry/response/numbers-retry01.txt",
        "RuntimeMs": 1,
        "Status": "ok"
    }
    */
}
Output:
func (*Service) StartMetricsEndpoint ¶
func (s *Service) StartMetricsEndpoint()
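A sketch wiring MetricPort to the metrics endpoint (noopProcessor is illustrative, and whether StartMetricsEndpoint blocks or serves in the background is not documented here):

package main

import (
    "context"

    "github.com/viant/afs"
    "github.com/viant/cloudless/data/processor"
)

type noopProcessor struct{}

func (noopProcessor) Process(ctx context.Context, data interface{}, reporter processor.Reporter) error {
    return nil
}

func main() {
    service := processor.New(&processor.Config{
        MetricPort: 8080, // HTTP port on which metrics are exposed
    }, afs.New(), noopProcessor{}, processor.NewReporter)
    service.StartMetricsEndpoint()
}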
type Sortables ¶
Sortables represents sortable items