Documentation ¶
Overview ¶
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func AddStageToStagePath(header *HeaderImpl, stageInstanceName string)
- func CreateRecordId(prefix string, counter int) string
- func CreateTrackingId(header *HeaderImpl)
- type BaseStage
- func (b *BaseStage) Destroy() error
- func (b *BaseStage) GetDataGeneratorService() (dataformats.DataFormatGeneratorService, error)
- func (b *BaseStage) GetDataParserService() (dataformats.DataFormatParserService, error)
- func (b *BaseStage) GetStageConfig() *StageConfiguration
- func (b *BaseStage) GetStageContext() api.StageContext
- func (b *BaseStage) Init(stageContext api.StageContext) []validation.Issue
- type BuildInfo
- type Config
- type ConfigDefinition
- type ErrorSink
- func (e *ErrorSink) ClearErrorRecordsAndMessages()
- func (e *ErrorSink) GetErrorMessages() map[string][]api.ErrorMessage
- func (e *ErrorSink) GetErrorRecords() map[string][]api.Record
- func (e *ErrorSink) GetStageErrorMessages(stageIns string) []api.ErrorMessage
- func (e *ErrorSink) GetStageErrorRecords(stageIns string) []api.Record
- func (e *ErrorSink) GetTotalErrorMessages() int64
- func (e *ErrorSink) GetTotalErrorRecords() int64
- func (e *ErrorSink) ReportError(stageIns string, errorMessage api.ErrorMessage)
- func (e *ErrorSink) ToError(stageIns string, record api.Record)
- type EventSink
- type HeaderImpl
- func (h *HeaderImpl) GetAttribute(name string) interface{}
- func (h *HeaderImpl) GetAttributeNames() []string
- func (h *HeaderImpl) GetAttributes() map[string]string
- func (h *HeaderImpl) GetErrorDataCollectorId() string
- func (h *HeaderImpl) GetErrorMessage() string
- func (h *HeaderImpl) GetErrorPipelineName() string
- func (h *HeaderImpl) GetErrorStage() string
- func (h *HeaderImpl) GetErrorTimestamp() int64
- func (h *HeaderImpl) GetPreviousTrackingId() string
- func (h *HeaderImpl) GetSourceId() string
- func (h *HeaderImpl) GetSourceRecord() api.Record
- func (h *HeaderImpl) GetStageCreator() string
- func (h *HeaderImpl) GetStagesPath() string
- func (h *HeaderImpl) GetTrackingId() string
- func (h *HeaderImpl) SetAttribute(name string, value string)
- func (h *HeaderImpl) SetErrorDataCollectorId(errorDataCollectorId string)
- func (h *HeaderImpl) SetErrorMessage(errorMessage string)
- func (h *HeaderImpl) SetErrorPipelineName(errorPipelineName string)
- func (h *HeaderImpl) SetErrorStageInstance(errorStageInstance string)
- func (h *HeaderImpl) SetErrorTimeStamp(timeStamp int64)
- func (h *HeaderImpl) SetPreviousTrackingId(previousTrackingId string)
- func (h *HeaderImpl) SetSourceId(sourceId string)
- func (h *HeaderImpl) SetSourceRecord(sourceRecord api.Record)
- func (h *HeaderImpl) SetStageCreator(stageCreator string)
- func (h *HeaderImpl) SetStagesPath(stagesPath string)
- func (h *HeaderImpl) SetTrackingId(trackingId string)
- type ModelDefinition
- type PathElement
- type PipelineConfiguration
- type PipelineEnvelope
- type PipelineFragmentConfiguration
- type PipelineInfo
- type PipelineState
- type RecordImpl
- func (r *RecordImpl) Clone() api.Record
- func (r *RecordImpl) Delete(fieldPath string) (*api.Field, error)
- func (r *RecordImpl) Get(fieldPath ...string) (*api.Field, error)
- func (r *RecordImpl) GetFieldPaths() map[string]bool
- func (r *RecordImpl) GetHeader() api.Header
- func (r *RecordImpl) Set(field *api.Field) *api.Field
- func (r *RecordImpl) SetField(fieldPath string, field *api.Field) (*api.Field, error)
- type RuntimeInfo
- type ServiceConfiguration
- type ServiceDefinition
- type SourceOffset
- type StageConfiguration
- type StageContextImpl
- func (s *StageContextImpl) CreateConfigIssue(error string, optional ...interface{}) validation.Issue
- func (s *StageContextImpl) CreateEventRecord(recordSourceId string, value interface{}, eventType string, eventVersion int) (api.Record, error)
- func (s *StageContextImpl) CreateRecord(recordSourceId string, value interface{}) (api.Record, error)
- func (s *StageContextImpl) Evaluate(value string, configName string, ctx context.Context) (interface{}, error)
- func (s *StageContextImpl) GetMetrics() metrics.Registry
- func (s *StageContextImpl) GetOutputLanes() []string
- func (s *StageContextImpl) GetParameterValue(paramName string) interface{}
- func (s *StageContextImpl) GetPipelineParameters() map[string]interface{}
- func (s *StageContextImpl) GetResolvedValue(configValue interface{}) (interface{}, error)
- func (s *StageContextImpl) GetService(serviceName string) (api.Service, error)
- func (s *StageContextImpl) IsErrorStage() bool
- func (s *StageContextImpl) IsPreview() bool
- func (s *StageContextImpl) IsStopped() bool
- func (s *StageContextImpl) ReportError(err error)
- func (s *StageContextImpl) SetStop()
- func (s *StageContextImpl) ToError(err error, record api.Record)
- func (s *StageContextImpl) ToEvent(record api.Record)
- type StageDefinition
Constants ¶
const ( HeaderXAppAuthToken = "X-SS-App-Auth-Token" HeaderXAppComponentId = "X-SS-App-Component-Id" HeaderXUserAuthToken = "X-SS-User-Auth-Token" HeaderXRestCall = "X-Requested-By" HeaderContentType = "Content-Type" ApplicationJson = "application/json" HeaderXRestCallValue = "true" HttpPost = "POST" )
const ( ROOT = "ROOT" MAP = "MAP" LIST = "LIST" InvalidFieldPath = "Invalid fieldPath '%s' at char '%d'" InvalidFieldPathReason = "Invalid fieldPath '%s' at char position '%d' (%s)" ReasonEmptyFieldName = "field name can't be empty" ReasonInvalidStart = "field path needs to start with '[' or '/'" ReasonNotANumber = "only numbers and '*' allowed between '[' and ']'" ReasonQuotes = "quotes are not properly closed" InvalidFieldPathNumber = "Invalid fieldPath '%s' at char '%d' ('%s' needs to be a number or '*')" )
const ( PipelineConfigSchemaVersion = 6 PipelineConfigVersion = 10 ErrorRecordPolicyOriginal = "ORIGINAL_RECORD" ErrorRecordPolicyStage = "STAGE_RECORD" FragmentSourceStageName = "com_streamsets_pipeline_stage_origin_fragment_FragmentSource" FragmentProcessorStageName = "com_streamsets_pipeline_stage_processor_fragment_FragmentProcessor" FragmentTargetStageName = "com_streamsets_pipeline_stage_destination_fragment_FragmentTarget" ConfFragmentId = "conf.fragmentId" ConfFragmentInstanceId = "conf.fragmentInstanceId" )
const ( EDITED = "EDITED" // pipeline job has been create/modified, didn't run since the creation/modification STARTING = "STARTING" // pipeline job starting (initialization) START_ERROR = "START_ERROR" // pipeline job failed while start (during initialization) RUNNING = "RUNNING" // pipeline job running RUNNING_ERROR = "RUNNING_ERROR" // pipeline job failed while running (calling destroy on pipeline) RUN_ERROR = "RUN_ERROR" // pipeline job failed while running (done) FINISHING = "FINISHING" // pipeline job finishing (source reached end, returning NULL offset) (calling destroy on pipeline) FINISHED = "FINISHED" // pipeline job finished RETRY = "RETRY" // pipeline job retrying STOPPING = "STOPPING" // pipeline job has been manually stopped (calling destroy on pipeline) STOPPED = "STOPPED" // pipeline job has been manually stopped (done) )
const ( CurrentOffsetVersion = 2 PollSourceOffsetKey = "$com.streamsets.sdc2go.pollsource.offset$" )
const ( ConfigDefTagName = "ConfigDef" ConfigDefBeanTagName = "ConfigDefBean" ListBeanModelTagName = "ListBeanModel" PredicateModelTagName = "PredicateModel" EvaluationExplicit = "EXPLICIT" EvaluationImplicit = "IMPLICIT" )
const (
EdgeIdFile = "/data/edge.id"
)
const StageConfig = "STAGE_CONFIG"
Variables ¶
var ( BuiltBy string BuiltDate string BuiltRepoSha string SourceMd5Checksum string Version string )
var RootPathElement = &PathElement{ Type: ROOT, Name: "", Idx: 0, }
Functions ¶
func AddStageToStagePath ¶
func AddStageToStagePath(header *HeaderImpl, stageInstanceName string)
func CreateRecordId ¶
func CreateTrackingId ¶
func CreateTrackingId(header *HeaderImpl)
Types ¶
type BaseStage ¶
type BaseStage struct {
// contains filtered or unexported fields
}
func (*BaseStage) GetDataGeneratorService ¶
func (b *BaseStage) GetDataGeneratorService() (dataformats.DataFormatGeneratorService, error)
func (*BaseStage) GetDataParserService ¶
func (b *BaseStage) GetDataParserService() (dataformats.DataFormatParserService, error)
func (*BaseStage) GetStageConfig ¶
func (b *BaseStage) GetStageConfig() *StageConfiguration
func (*BaseStage) GetStageContext ¶
func (b *BaseStage) GetStageContext() api.StageContext
func (*BaseStage) Init ¶
func (b *BaseStage) Init(stageContext api.StageContext) []validation.Issue
type BuildInfo ¶
type BuildInfo struct { BuiltBy string `json:"builtBy"` BuiltDate string `json:"builtDate"` BuiltRepoSha string `json:"builtRepoSha"` SourceMd5Checksum string `json:"sourceMd5Checksum"` Version string `json:"version"` }
func NewBuildInfo ¶
type ConfigDefinition ¶
type ErrorSink ¶
type ErrorSink struct {
// contains filtered or unexported fields
}
func NewErrorSink ¶
func NewErrorSink() *ErrorSink
func (*ErrorSink) ClearErrorRecordsAndMessages ¶
func (e *ErrorSink) ClearErrorRecordsAndMessages()
After each batch call this function to clear current batch error messages/records
func (*ErrorSink) GetErrorMessages ¶
func (e *ErrorSink) GetErrorMessages() map[string][]api.ErrorMessage
func (*ErrorSink) GetStageErrorMessages ¶
func (e *ErrorSink) GetStageErrorMessages(stageIns string) []api.ErrorMessage
func (*ErrorSink) GetStageErrorRecords ¶
func (*ErrorSink) GetTotalErrorMessages ¶
func (*ErrorSink) GetTotalErrorRecords ¶
func (*ErrorSink) ReportError ¶
func (e *ErrorSink) ReportError(stageIns string, errorMessage api.ErrorMessage)
type EventSink ¶
type EventSink struct {
// contains filtered or unexported fields
}
func NewEventSink ¶
func NewEventSink() *EventSink
func (*EventSink) ClearEventRecords ¶
func (e *EventSink) ClearEventRecords()
type HeaderImpl ¶
type HeaderImpl struct { StageCreator string `json:"stageCreator"` SourceId string `json:"sourceId"` StagesPath string `json:"stagesPath"` TrackingId string `json:"trackingId"` PreviousTrackingId string `json:"previousTrackingId"` ErrorDataCollectorId string `json:"errorDataCollectorId"` ErrorPipelineName string `json:"errorPipelineName"` ErrorStageInstance string `json:"errorStage"` ErrorMessage string `json:"errorMessage"` ErrorTimestamp int64 `json:"errorTimestamp"` Attributes map[string]interface{} `json:"values"` // contains filtered or unexported fields }
func (*HeaderImpl) GetAttribute ¶
func (h *HeaderImpl) GetAttribute(name string) interface{}
func (*HeaderImpl) GetAttributeNames ¶
func (h *HeaderImpl) GetAttributeNames() []string
func (*HeaderImpl) GetAttributes ¶
func (h *HeaderImpl) GetAttributes() map[string]string
func (*HeaderImpl) GetErrorDataCollectorId ¶
func (h *HeaderImpl) GetErrorDataCollectorId() string
func (*HeaderImpl) GetErrorMessage ¶
func (h *HeaderImpl) GetErrorMessage() string
func (*HeaderImpl) GetErrorPipelineName ¶
func (h *HeaderImpl) GetErrorPipelineName() string
func (*HeaderImpl) GetErrorStage ¶
func (h *HeaderImpl) GetErrorStage() string
func (*HeaderImpl) GetErrorTimestamp ¶
func (h *HeaderImpl) GetErrorTimestamp() int64
func (*HeaderImpl) GetPreviousTrackingId ¶
func (h *HeaderImpl) GetPreviousTrackingId() string
func (*HeaderImpl) GetSourceId ¶
func (h *HeaderImpl) GetSourceId() string
func (*HeaderImpl) GetSourceRecord ¶
func (h *HeaderImpl) GetSourceRecord() api.Record
func (*HeaderImpl) GetStageCreator ¶
func (h *HeaderImpl) GetStageCreator() string
func (*HeaderImpl) GetStagesPath ¶
func (h *HeaderImpl) GetStagesPath() string
func (*HeaderImpl) GetTrackingId ¶
func (h *HeaderImpl) GetTrackingId() string
func (*HeaderImpl) SetAttribute ¶
func (h *HeaderImpl) SetAttribute(name string, value string)
func (*HeaderImpl) SetErrorDataCollectorId ¶
func (h *HeaderImpl) SetErrorDataCollectorId(errorDataCollectorId string)
func (*HeaderImpl) SetErrorMessage ¶
func (h *HeaderImpl) SetErrorMessage(errorMessage string)
func (*HeaderImpl) SetErrorPipelineName ¶
func (h *HeaderImpl) SetErrorPipelineName(errorPipelineName string)
func (*HeaderImpl) SetErrorStageInstance ¶
func (h *HeaderImpl) SetErrorStageInstance(errorStageInstance string)
func (*HeaderImpl) SetErrorTimeStamp ¶
func (h *HeaderImpl) SetErrorTimeStamp(timeStamp int64)
func (*HeaderImpl) SetPreviousTrackingId ¶
func (h *HeaderImpl) SetPreviousTrackingId(previousTrackingId string)
func (*HeaderImpl) SetSourceId ¶
func (h *HeaderImpl) SetSourceId(sourceId string)
func (*HeaderImpl) SetSourceRecord ¶
func (h *HeaderImpl) SetSourceRecord(sourceRecord api.Record)
func (*HeaderImpl) SetStageCreator ¶
func (h *HeaderImpl) SetStageCreator(stageCreator string)
func (*HeaderImpl) SetStagesPath ¶
func (h *HeaderImpl) SetStagesPath(stagesPath string)
func (*HeaderImpl) SetTrackingId ¶
func (h *HeaderImpl) SetTrackingId(trackingId string)
type ModelDefinition ¶
type ModelDefinition struct {
ConfigDefinitionsMap map[string]*ConfigDefinition
}
type PathElement ¶
func CreateListElement ¶
func CreateListElement(idx int) PathElement
func CreateMapElement ¶
func CreateMapElement(name string) PathElement
func ParseFieldPath ¶
func ParseFieldPath(fieldPath string, isSingleQuoteEscaped bool) ([]PathElement, error)
type PipelineConfiguration ¶
type PipelineConfiguration struct { SchemaVersion int `json:"schemaVersion"` Version int `json:"version"` PipelineId string `json:"pipelineId"` Title string `json:"title"` Description string `json:"description"` UUID string `json:"uuid"` Configuration []Config `json:"configuration"` UiInfo map[string]interface{} `json:"uiInfo"` Stages []*StageConfiguration `json:"stages"` ErrorStage *StageConfiguration `json:"errorStage"` TestOriginStage *StageConfiguration `json:"testOriginStage"` StatsAggregatorStage *StageConfiguration `json:"statsAggregatorStage"` Previewable bool `json:"previewable"` Info PipelineInfo `json:"info"` Metadata map[string]interface{} `json:"metadata"` Fragments []*PipelineFragmentConfiguration `json:"fragments"` }
func (*PipelineConfiguration) ProcessFragmentStages ¶
func (p *PipelineConfiguration) ProcessFragmentStages()
type PipelineEnvelope ¶
type PipelineEnvelope struct { PipelineConfig PipelineConfiguration `json:"pipelineConfig"` PipelineRules map[string]interface{} `json:"pipelineRules"` LibraryDefinitions map[string]interface{} `json:"libraryDefinitions"` }
type PipelineFragmentConfiguration ¶
type PipelineFragmentConfiguration struct { SchemaVersion int `json:"schemaVersion"` Version int `json:"version"` PipelineId string `json:"fragmentId"` FragmentInstanceId string `json:"fragmentInstanceId"` Title string `json:"title"` Description string `json:"description"` UUID string `json:"uuid"` Configuration []Config `json:"configuration"` UiInfo map[string]interface{} `json:"uiInfo"` Stages []*StageConfiguration `json:"stages"` TestOriginStage *StageConfiguration `json:"testOriginStage"` Previewable bool `json:"previewable"` Info PipelineInfo `json:"info"` Metadata map[string]interface{} `json:"metadata"` Fragments []*PipelineFragmentConfiguration `json:"fragments"` }
type PipelineInfo ¶
type PipelineInfo struct { PipelineId string `json:"pipelineId"` Title string `json:"title"` Description string `json:"description"` Created int64 `json:"created"` LastModified int64 `json:"lastModified"` Creator string `json:"creator"` LastModifier string `json:"lastModifier"` LastRev string `json:"lastRev"` UUID string `json:"uuid"` Valid bool `json:"valid"` Metadata map[string]interface{} `json:"metadata"` Name string `json:"name"` SdcVersion string `json:"sdcVersion"` SdcId string `json:"sdcId"` }
type PipelineState ¶
type RecordImpl ¶
type RecordImpl struct {
// contains filtered or unexported fields
}
func (*RecordImpl) Clone ¶
func (r *RecordImpl) Clone() api.Record
func (*RecordImpl) GetFieldPaths ¶
func (r *RecordImpl) GetFieldPaths() map[string]bool
func (*RecordImpl) GetHeader ¶
func (r *RecordImpl) GetHeader() api.Header
type RuntimeInfo ¶
type RuntimeInfo struct { ID string BaseDir string HttpUrl string DPMEnabled bool AppAuthToken string }
func NewRuntimeInfo ¶
func NewRuntimeInfo(httpUrl string, baseDir string) (*RuntimeInfo, error)
type ServiceConfiguration ¶
type ServiceConfiguration struct { Service string `json:"service"` ServiceVersion interface{} `json:"serviceVersion"` // interface{} is for backward compatible - SDC-10526 Configuration []Config `json:"configuration"` }
func (ServiceConfiguration) GetConfigurationMap ¶
func (s ServiceConfiguration) GetConfigurationMap() map[string]Config
type ServiceDefinition ¶
type ServiceDefinition struct { Name string Version string ConfigDefinitionsMap map[string]*ConfigDefinition }
type SourceOffset ¶
func GetDefaultOffset ¶
func GetDefaultOffset() SourceOffset
type StageConfiguration ¶
type StageConfiguration struct { InstanceName string `json:"instanceName"` Library string `json:"library"` StageName string `json:"stageName"` StageVersion string `json:"stageVersion"` Configuration []Config `json:"configuration"` Services []*ServiceConfiguration `json:"services"` UiInfo map[string]interface{} `json:"uiInfo"` InputLanes []string `json:"inputLanes"` OutputLanes []string `json:"outputLanes"` EventLanes []string `json:"eventLanes"` }
func (StageConfiguration) GetConfigurationMap ¶
func (s StageConfiguration) GetConfigurationMap() map[string]Config
func (StageConfiguration) GetOutputAndEventLanes ¶
func (s StageConfiguration) GetOutputAndEventLanes() []string
type StageContextImpl ¶
type StageContextImpl struct { StageConfig *StageConfiguration Parameters map[string]interface{} Metrics metrics.Registry ErrorSink *ErrorSink EventSink *EventSink ErrorStage bool ErrorRecordPolicy string Services map[string]api.Service ElContext context.Context // contains filtered or unexported fields }
func NewStageContext ¶
func NewStageContext( stageConfig *StageConfiguration, resolvedParameters map[string]interface{}, metricRegistry metrics.Registry, errorSink *ErrorSink, errorStage bool, errorRecordPolicy string, services map[string]api.Service, elContext context.Context, eventSink *EventSink, isPreview bool, ) (*StageContextImpl, error)
func (*StageContextImpl) CreateConfigIssue ¶
func (s *StageContextImpl) CreateConfigIssue(error string, optional ...interface{}) validation.Issue
optional argument, first optional argument is configGroup, second optional argument- configName
func (*StageContextImpl) CreateEventRecord ¶
func (*StageContextImpl) CreateRecord ¶
func (s *StageContextImpl) CreateRecord(recordSourceId string, value interface{}) (api.Record, error)
func (*StageContextImpl) GetMetrics ¶
func (s *StageContextImpl) GetMetrics() metrics.Registry
func (*StageContextImpl) GetOutputLanes ¶
func (s *StageContextImpl) GetOutputLanes() []string
func (*StageContextImpl) GetParameterValue ¶
func (s *StageContextImpl) GetParameterValue(paramName string) interface{}
func (*StageContextImpl) GetPipelineParameters ¶
func (s *StageContextImpl) GetPipelineParameters() map[string]interface{}
func (*StageContextImpl) GetResolvedValue ¶
func (s *StageContextImpl) GetResolvedValue(configValue interface{}) (interface{}, error)
func (*StageContextImpl) GetService ¶
func (s *StageContextImpl) GetService(serviceName string) (api.Service, error)
func (*StageContextImpl) IsErrorStage ¶
func (s *StageContextImpl) IsErrorStage() bool
func (*StageContextImpl) IsPreview ¶
func (s *StageContextImpl) IsPreview() bool
func (*StageContextImpl) IsStopped ¶
func (s *StageContextImpl) IsStopped() bool
func (*StageContextImpl) ReportError ¶
func (s *StageContextImpl) ReportError(err error)
func (*StageContextImpl) SetStop ¶
func (s *StageContextImpl) SetStop()
func (*StageContextImpl) ToEvent ¶
func (s *StageContextImpl) ToEvent(record api.Record)
type StageDefinition ¶
type StageDefinition struct { Name string Library string Version string ConfigDefinitionsMap map[string]*ConfigDefinition }