Documentation
Overview ¶
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- type Batch
- type BatchMaker
- type Destination
- type ErrorMessage
- type Field
- func Create(fieldType string, value interface{}) (*Field, error)
- func CreateBigFloatField(value big.Float) (*Field, error)
- func CreateBigIntField(value big.Int) (*Field, error)
- func CreateBoolField(value bool) (*Field, error)
- func CreateByteArrayField(value []byte) (*Field, error)
- func CreateByteField(value byte) (*Field, error)
- func CreateDateTimeField(value time.Time) (*Field, error)
- func CreateDoubleField(value float64) (*Field, error)
- func CreateField(value interface{}) (*Field, error)
- func CreateFieldFromSDCField(value interface{}) (*Field, error)
- func CreateFileRefField(value FileRef) (*Field, error)
- func CreateFloatField(value float32) (*Field, error)
- func CreateFloatListField(listFloatValue []float64) (*Field, error)
- func CreateInteger32Field(value int32) (*Field, error)
- func CreateIntegerField(value int) (*Field, error)
- func CreateListField(listValue []interface{}) (*Field, error)
- func CreateListFieldWithListOfFields(listFields []*Field) *Field
- func CreateListMapField(listMapValue *linkedhashmap.Map) (*Field, error)
- func CreateListMapFieldWithMapOfFields(mapFields *linkedhashmap.Map) *Field
- func CreateLongField(value int64) (*Field, error)
- func CreateLongFieldU64(value uint64) (*Field, error)
- func CreateMapField(mapValue map[string]interface{}) (*Field, error)
- func CreateMapFieldWithMapOfFields(mapFields map[string]*Field) *Field
- func CreateMapListField(listValue []map[string]interface{}) (*Field, error)
- func CreateShortField(value int8) (*Field, error)
- func CreateStringField(value string) (*Field, error)
- func CreateStringListField(listStringValue []string) (*Field, error)
- func CreateUInteger16Field(value uint16) (*Field, error)
- func CreateUInteger32Field(value uint32) (*Field, error)
- type FileRef
- type Header
- type Origin
- type Processor
- type Record
- type Service
- type Stage
- type StageContext
Constants ¶
const (
	EventRecordHeaderType              = "sdc.event.type"
	EventRecordHeaderVersion           = "sdc.event.version"
	EventRecordHeaderCreationTimestamp = "sdc.event.creation_timestamp"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batch ¶
Batch is the interface that wraps the basic Batch method.
GetSourceOffset returns the initial offset of the current batch. This return value should be treated as an opaque value as it is source dependent.
GetRecords returns an iterator with all the records in the batch for the current stage. Every time this method is called it returns a new iterator with all records in the batch.
type BatchMaker ¶
BatchMaker is the interface that wraps the basic methods for adding records to the pipeline. Data Collector Edge origin stages receive an instance of a BatchMaker to write the records they create or process to the pipeline.
GetLanes returns the available lane names (stream names) for the stage.
AddRecord adds a record to the BatchMaker.
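As a sketch of how a stage might interact with a BatchMaker, the following uses local stand-in types (the `record` struct and `simpleBatchMaker` below are illustrative assumptions, not part of this package):

```go
package main

// record is a local stand-in for this package's Record interface,
// reduced to the fields the example needs.
type record struct {
	sourceId string
	value    interface{}
}

// simpleBatchMaker is a hypothetical in-memory BatchMaker that collects
// records per output lane (stream).
type simpleBatchMaker struct {
	lanes   []string
	records map[string][]record
}

func newSimpleBatchMaker(lanes []string) *simpleBatchMaker {
	return &simpleBatchMaker{lanes: lanes, records: map[string][]record{}}
}

// GetLanes returns the available lane (stream) names, mirroring BatchMaker.
func (b *simpleBatchMaker) GetLanes() []string { return b.lanes }

// AddRecord adds a record to the first output lane.
func (b *simpleBatchMaker) AddRecord(r record) {
	lane := b.lanes[0]
	b.records[lane] = append(b.records[lane], r)
}
```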
type Destination ¶
Destination is a Data Collector Edge destination stage. Destination stages receive records from origin stages and write them to an external system.
Write method - When running a pipeline, the Data Collector Edge calls this method on the Destination stage to write a batch of records to an external system.
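A hypothetical destination following this contract might look like the sketch below; the `Batch` and `Record` types here are minimal local stand-ins for the interfaces documented on this page, and the destination "writes" to stdout instead of a real external system:

```go
package main

import "fmt"

// Record and Batch are local stand-ins, reduced to what the example needs.
type Record interface{ Value() interface{} }
type Batch interface{ GetRecords() []Record }

// logDestination is a hypothetical Destination that writes each record
// in the batch to stdout and counts what it wrote.
type logDestination struct {
	written int
}

// Write receives a batch of records and writes them out.
func (d *logDestination) Write(batch Batch) error {
	for _, r := range batch.GetRecords() {
		fmt.Println(r.Value())
		d.written++
	}
	return nil
}

// memBatch and strRecord are trivial in-memory implementations used to
// demonstrate Write.
type memBatch struct{ recs []Record }

func (b memBatch) GetRecords() []Record { return b.recs }

type strRecord string

func (s strRecord) Value() interface{} { return string(s) }
```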
type ErrorMessage ¶
type Field ¶
type Field struct {
	Type  string
	Value interface{}
}
func CreateBoolField ¶
func CreateByteArrayField ¶
func CreateByteField ¶
func CreateDoubleField ¶
func CreateField ¶
func CreateFieldFromSDCField ¶
func CreateFileRefField ¶
func CreateFloatField ¶
func CreateFloatListField ¶
func CreateInteger32Field ¶
func CreateIntegerField ¶
func CreateListField ¶
func CreateListMapField ¶
func CreateListMapField(listMapValue *linkedhashmap.Map) (*Field, error)
func CreateListMapFieldWithMapOfFields ¶
func CreateListMapFieldWithMapOfFields(mapFields *linkedhashmap.Map) *Field
func CreateLongField ¶
func CreateLongFieldU64 ¶
func CreateMapField ¶
func CreateMapListField ¶
func CreateShortField ¶
func CreateStringField ¶
func CreateStringListField ¶
func CreateUInteger16Field ¶
func CreateUInteger32Field ¶
func (*Field) GetValueAsFloat ¶
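The constructors above all return a Field tagged with a type name. The following local sketch shows the general shape of such a constructor and of a numeric accessor like GetValueAsFloat; the "STRING" and "DOUBLE" type-tag strings are assumptions for illustration, not taken from this page:

```go
package main

import "errors"

// Field mirrors the struct documented above: a type tag plus a value.
type Field struct {
	Type  string
	Value interface{}
}

// createStringField sketches what a constructor like CreateStringField
// might do; the "STRING" tag is an illustrative assumption.
func createStringField(value string) (*Field, error) {
	return &Field{Type: "STRING", Value: value}, nil
}

// getValueAsFloat sketches a GetValueAsFloat-style accessor: it converts
// common numeric value types to float64 and rejects everything else.
func (f *Field) getValueAsFloat() (float64, error) {
	switch v := f.Value.(type) {
	case float64:
		return v, nil
	case float32:
		return float64(v), nil
	case int:
		return float64(v), nil
	case int64:
		return float64(v), nil
	default:
		return 0, errors.New("field value is not numeric")
	}
}
```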
type Header ¶
type Header interface {
	GetStageCreator() string
	GetSourceId() string
	GetTrackingId() string
	GetPreviousTrackingId() string
	GetStagesPath() string
	GetErrorDataCollectorId() string
	GetErrorPipelineName() string
	GetErrorMessage() string
	GetErrorStage() string
	GetErrorTimestamp() int64
	GetAttributeNames() []string
	GetAttributes() map[string]string
	GetAttribute(name string) interface{}
	SetAttribute(name string, value string)
}
Header represents the metadata about the record.
type Origin ¶
type Origin interface {
Produce(lastSourceOffset *string, maxBatchSize int, batchMaker BatchMaker) (*string, error)
}
Origin is a Data Collector Edge origin stage. Origin stages consume data from an external system, creating records that can be processed by processor or destination stages.
Produce method - When running a pipeline, the Data Collector Edge calls this method on the Origin stage to obtain a batch of records for processing. Origin stages should not block indefinitely within this method if there is no data; they should have an internal timeout after which they produce an empty batch. Doing so lets the rest of the pipeline know that the pipeline is still healthy even though no data is coming, potentially allowing notifications to external systems.

lastSourceOffset is the offset returned by the previous call to this method, or nil if this method is being called for the first time ever. maxBatchSize is the requested maximum batch size a single call to this method should produce. batchMaker - records created by the Origin stage must be added to the BatchMaker for them to be available to the rest of the pipeline.

Returns the offset for the next call to this method. If nil is returned, the Origin stage has fully processed the data, no more data is to be expected, and the pipeline should finish once the current batch is fully processed. Returns an error if the Origin had a problem while consuming data or creating records.
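The offset contract above can be sketched with a hypothetical origin that emits the integers 0..limit-1, encodes its position as a string offset, and returns a nil offset once all data is produced (the `BatchMaker` interface here is a local stand-in reduced to one method):

```go
package main

import "strconv"

// BatchMaker is a local stand-in for the package interface.
type BatchMaker interface{ AddRecord(value interface{}) }

// counterOrigin is a hypothetical Origin that emits the integers
// 0..limit-1 and then signals end-of-data with a nil offset.
type counterOrigin struct {
	limit int
}

// Produce emits up to maxBatchSize records starting at lastSourceOffset.
// It returns the offset for the next call, or nil when all data has
// been produced.
func (o *counterOrigin) Produce(lastSourceOffset *string, maxBatchSize int, batchMaker BatchMaker) (*string, error) {
	start := 0
	if lastSourceOffset != nil {
		n, err := strconv.Atoi(*lastSourceOffset)
		if err != nil {
			return nil, err
		}
		start = n
	}
	i := start
	for ; i < o.limit && i < start+maxBatchSize; i++ {
		batchMaker.AddRecord(i)
	}
	if i >= o.limit {
		return nil, nil // fully processed: pipeline can finish
	}
	next := strconv.Itoa(i)
	return &next, nil
}

// sliceBatchMaker collects records in a slice, for demonstration.
type sliceBatchMaker struct{ records []interface{} }

func (b *sliceBatchMaker) AddRecord(value interface{}) {
	b.records = append(b.records, value)
}
```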
type Processor ¶
type Processor interface {
Process(batch Batch, batchMaker BatchMaker) error
}
Processor is a Data Collector Edge processor stage. Processor stages receive records from an origin or other processor stages, perform operations on the records, and write them out so they can be processed by other processor or destination stages.
Process method - When running a pipeline, the Data Collector Edge calls this method on the Processor stage with a batch of records to process. Parameter batch - the batch of records to process. Parameter batchMaker - records created by the Processor stage must be added to the BatchMaker for them to be available to the rest of the pipeline.
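As an illustration, here is a hypothetical pass-through processor that upper-cases string records; the `Batch` and `BatchMaker` interfaces are local stand-ins reduced to what the example needs, not the full interfaces documented above:

```go
package main

import "strings"

// Local stand-ins for the package's Batch and BatchMaker interfaces,
// reduced to string records for simplicity.
type Batch interface{ GetRecords() []string }
type BatchMaker interface{ AddRecord(value string) }

// upperProcessor is a hypothetical Processor that upper-cases every
// record in the batch and writes the results to the BatchMaker.
type upperProcessor struct{}

func (p upperProcessor) Process(batch Batch, batchMaker BatchMaker) error {
	for _, r := range batch.GetRecords() {
		batchMaker.AddRecord(strings.ToUpper(r))
	}
	return nil
}

// sliceBatch and sliceBatchMaker are trivial in-memory implementations
// used to demonstrate Process.
type sliceBatch struct{ recs []string }

func (b sliceBatch) GetRecords() []string { return b.recs }

type sliceBatchMaker struct{ out []string }

func (b *sliceBatchMaker) AddRecord(value string) { b.out = append(b.out, value) }
```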
type Record ¶
type Record interface {
	GetHeader() Header
	Get(fieldPath ...string) (*Field, error)
	Set(field *Field) *Field
	SetField(fieldPath string, field *Field) (*Field, error)
	GetFieldPaths() map[string]bool
	Delete(fieldPath string) (*Field, error)
	Clone() Record
}
Record represents the unit of data Data Collector Edge pipelines process.
GetHeader method returns the metadata header of the record.
Get method returns the field at the given field path, or the root data field of the record when called without arguments.
type Service ¶
type Service interface {
	Init(stageContext StageContext) []validation.Issue
	Destroy() error
}
type Stage ¶
type Stage interface {
	Init(stageContext StageContext) []validation.Issue
	Destroy() error
}
Stage is the base interface for Data Collector Edge stages implementations defining their common context and lifecycle.
Init method initializes the stage. This method is called once when the pipeline is being initialized, before processing any data. If the stage returns an empty list of ConfigIssue then the stage is considered ready to process data. Otherwise the stage is considered misconfigured, or to have a problem, and not ready to process data, aborting the pipeline initialization.
Destroy method destroys the stage. It should be used to release any resources held by the stage after initialization or processing. This method is called once when the pipeline is being shut down. After this method is called, the stage will not be called to process any more data. This method is also called after a failed initialization, to allow releasing any resources created before the initialization failed.
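The lifecycle can be sketched with a hypothetical stage that validates its configuration in Init and releases its resources in Destroy. The `issue` type below is a local stand-in for validation.Issue, and the StageContext parameter is omitted for brevity:

```go
package main

// issue is a local stand-in for validation.Issue.
type issue struct{ message string }

// fileStage is a hypothetical stage that needs a non-empty path to run.
type fileStage struct {
	path string
	open bool
}

// Init validates configuration; an empty issue list means the stage is
// ready to process data, a non-empty one aborts pipeline initialization.
func (s *fileStage) Init() []issue {
	var issues []issue
	if s.path == "" {
		issues = append(issues, issue{message: "path must not be empty"})
		return issues
	}
	s.open = true // acquire resources only when configuration is valid
	return issues
}

// Destroy releases resources; it is safe to call even after a failed Init.
func (s *fileStage) Destroy() error {
	s.open = false
	return nil
}
```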
type StageContext ¶
type StageContext interface {
	// If we plan to support ELs later, we should remove and provide in build support for this
	GetResolvedValue(configValue interface{}) (interface{}, error)
	CreateRecord(recordSourceId string, value interface{}) (Record, error)
	CreateEventRecord(recordSourceId string, value interface{}, eventType string, eventVersion int) (Record, error)
	GetMetrics() metrics.Registry
	ToError(err error, record Record)
	ToEvent(record Record)
	ReportError(err error)
	GetOutputLanes() []string
	Evaluate(value string, configName string, ctx context.Context) (interface{}, error)
	IsErrorStage() bool
	CreateConfigIssue(error string, optional ...interface{}) validation.Issue
	GetService(serviceName string) (Service, error)
	IsPreview() bool
	GetPipelineParameters() map[string]interface{}
	SetStop()
	IsStopped() bool
}
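As an illustration of the error-handling methods, a stage typically routes an unusable record through ToError rather than failing the whole batch. The sketch below uses a minimal local stand-in that implements only that subset of StageContext; in the real interface ToError takes a Record, not a string:

```go
package main

import "errors"

var errEmpty = errors.New("empty record")

// errorSink is a minimal local stand-in for the error-handling subset
// of StageContext: records and errors sent to it are collected.
type errorSink struct {
	errorRecords []string
	stageErrors  []error
}

// ToError routes a single bad record to the pipeline's error handling.
func (c *errorSink) ToError(err error, record string) {
	c.errorRecords = append(c.errorRecords, record)
}

// ReportError reports a stage-level error not tied to one record.
func (c *errorSink) ReportError(err error) {
	c.stageErrors = append(c.stageErrors, err)
}

// processRecord sketches typical usage: send unusable records to
// ToError instead of aborting the batch.
func processRecord(ctx *errorSink, record string) {
	if record == "" {
		ctx.ToError(errEmpty, record)
		return
	}
	// ... normal processing would happen here ...
}
```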