Documentation
¶
Overview ¶
Package processor defines an abstraction for serverless concurrent processing
Index ¶
Examples ¶
Constants ¶
const ( //StatusOk represents successful processing status StatusOk = "ok" //StatusError represents error processing status StatusError = "error" )
Variables ¶
This section is empty.
Functions ¶
func NewDataCorruption ¶
NewDataCorruption returns a data corruption error
Types ¶
type BaseReporter ¶
type BaseReporter struct { *Response // contains filtered or unexported fields }
func (*BaseReporter) BaseResponse ¶
func (r *BaseReporter) BaseResponse() *Response
BaseResponse returns base response info
Example ¶
package main import ( "context" "encoding/json" "fmt" "github.com/viant/afs" "github.com/viant/cloudless/data/processor" "net/http" "strings" "sync" ) // URLReporter represents URL reporter type URLReporter struct { processor.BaseReporter ByResponseCode map[int]int mutex sync.Mutex } // NewURLReporter represents URL reporeter func NewURLReporter() processor.Reporter { return &URLReporter{ ByResponseCode: make(map[int]int), BaseReporter: processor.BaseReporter{ Response: &processor.Response{Status: processor.StatusOk}, }, } } type HTTPProcessor struct { BaseURL string } func (p HTTPProcessor) Process(ctx context.Context, data []byte, reporter processor.Reporter) error { urlReporter := reporter.(*URLReporter) URL := p.BaseURL + string(data) request, err := http.NewRequestWithContext(ctx, http.MethodGet, URL, nil) if err != nil { return processor.NewDataCorruption(fmt.Sprintf("invalid request: %v", URL)) } response, err := http.DefaultClient.Do(request) if err != nil { return err } urlReporter.mutex.Lock() defer urlReporter.mutex.Unlock() urlReporter.ByResponseCode[response.StatusCode]++ return nil } func main() { ctx := context.Background() service := processor.New(&processor.Config{ CorruptionURL: "mem://localhost/corrupted", RetryURL: "mem://localhost/retry", FailedURL: "mem://localhost/failed", }, afs.New(), &HTTPProcessor{BaseURL: "http://mydataexporter/enrich/?data="}, NewURLReporter) reporter := service.Do(ctx, processor.NewRequest(strings.NewReader("dGVzdCBpcyB0ZXN0\nYW5vdGhlciBvbmU="), nil, "mem://localhost/trigger/data.txt")) response, _ := json.Marshal(reporter) fmt.Printf("%s\n", response) }
Output:
type Config ¶
type Config struct { DeadlineReductionMs int // Deadline typically comes from Lambda ctx. Max exec time == Deadline - DeadlineReductionMs LoaderDeadlineLagMs int // Loader will finish earlier than workers to let the latter complete MaxRetries int Concurrency int DestinationURL string // Service processing data destination URL. This is a template, e.g. $gs://$mybucket/$prefix/$a.dat DestinationCodec string RetryURL string // destination for the data to be retried FailedURL string // destination for the data that has failed max retires CorruptionURL string // destination for the corrupted data MaxExecTimeMs int // default execution timeMs used when context does not come with deadline }
Config represents processor configuration
func (Config) ExpandDestinationURL ¶
func (Config) LoaderDeadline ¶
LoaderDeadline returns max execution time for a Processor's loader
type DataCorruption ¶
type DataCorruption struct {
// contains filtered or unexported fields
}
DataCorruption represents corruption error
type PostProcessor ¶
PostProcessor is an optional postprocessor interface
type PreProcessor ¶
type PreProcessor interface {
Pre(ctx context.Context, reporter Reporter) (context.Context, error)
}
PreProcessor is an optional preprocessor interface
type Reporter ¶
type Reporter interface { //BaseResponse returns base response BaseResponse() *Response }
Reporter represents an interface providing processor response
type Request ¶
type Request struct { io.ReadCloser Attrs map[string]interface{} StartTime time.Time SourceURL string //incoming original filename url }
Request represents a processing request
func NewRequest ¶
NewRequest creates a processing request
func (*Request) Retry ¶
Retry extracts the retry number from the URL. It looks for two consecutive digits, e.g.: s3://bucket/prefix/filename-retry05.csv would extract the number 5
func (*Request) TransformSourceURL ¶
TransformSourceURL returns baseURL + sourceURL path
type Response ¶
type Response struct { Status string Errors []string `json:",omitempty"` RuntimeMs int DestinationURL string `json:",omitempty"` // Service processing data destination URL. This is a template, e.g. $gs://$mybucket/$prefix/$a.dat DestinationCodec string `json:",omitempty"` //optional compression codec (i.e gzip) RetryURL string `json:",omitempty"` // destination for the data to be replayed CorruptionURL string `json:",omitempty"` Processed int32 `json:",omitempty"` RetryErrors int32 `json:",omitempty"` CorruptionErrors int32 `json:",omitempty"` ProcessingErrors int32 `json:",omitempty"` Loaded int32 `json:",omitempty"` // contains filtered or unexported fields }
Response represents base processing response
type Service ¶
type Service struct { Processor // contains filtered or unexported fields }
Service represents processing service
func New ¶
func New(config *Config, fs afs.Service, processor Processor, reporterProvider func() Reporter) *Service
New creates a processing service
func (*Service) Do ¶
Do starts service processing
Example ¶
package main import ( "context" "encoding/json" "fmt" "github.com/viant/afs" "github.com/viant/cloudless/data/processor" "github.com/viant/toolbox" "strings" "sync/atomic" ) type sumKeyType string const sumKey = sumKeyType("sum") // SumProcessor represents sum processor type SumProcessor struct{} // Process sums coma separated numbers func (p SumProcessor) Process(ctx context.Context, data []byte, reporter processor.Reporter) error { if len(data) == 0 { return nil } value := ctx.Value(sumKey) sum := value.(*int64) aNumber, err := toolbox.ToInt(string(data)) if err != nil { return processor.NewDataCorruption(fmt.Sprintf("invalid number: %s, %v", data, err)) } atomic.AddInt64(sum, int64(aNumber)) return nil } func main() { service := processor.New(&processor.Config{ CorruptionURL: "mem://localhost/corrupted", RetryURL: "mem://localhost/retry", FailedURL: "mem://localhost/failed", }, afs.New(), &SumProcessor{}, processor.NewReporter) sum := int64(0) ctx := context.WithValue(context.Background(), sumKey, &sum) reporter := service.Do(ctx, processor.NewRequest(strings.NewReader("1\n2\n3\n5\nasd\n373\n23"), nil, "mem://localhost/response/numbers.txt")) fmt.Printf("Sum: %v\n", sum) //Prints sum 407 response, _ := json.Marshal(reporter) fmt.Printf("%s\n", response) /* Prints { "CorruptionErrors": 1, "CorruptionURL": "mem://localhost/corrupted/response/numbers.txt", "Errors": [ "invalid number: asd, strconv.ParseInt: parsing \"asd\": invalid syntax" ], "Loaded": 7, "Processed": 6, "ProcessingErrors": 0, "RetryErrors": 0, "RetryURL": "mem://localhost/retry/response/numbers-retry01.txt", "RuntimeMs": 1, "Status": "ok" } */ }
Output: