Documentation ¶
Overview ¶
Package ingest provides data ingestion from various external sources into Kusto.
For more information on Kusto Data Ingestion, please see: https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/
Create a client ¶
Creating a client simply requires a *kusto.Client, the name of the database and the name of the table to be ingested into.
in, err := ingest.New(kustoClient, "database", "table")
if err != nil {
	panic("add error handling")
}
Ingestion from a local file ¶
Ingesting a local file simply requires passing the path to the file to be ingested:
if _, err := in.FromFile(ctx, "/path/to/a/local/file"); err != nil {
	panic("add error handling")
}
FromFile() accepts Unix path names on Unix platforms and Windows path names on Windows platforms. By default the file is not deleted after upload; pass the DeleteSource() option to delete it.
Ingestion from an Azure Blob Storage file ¶
This package will also accept ingestion from an Azure Blob Storage file:
if _, err := in.FromFile(ctx, "https://myaccount.blob.core.windows.net/$root/myblob"); err != nil {
	panic("add error handling")
}
This will ingest a file from Azure Blob Storage. Only https:// URIs are supported, and your storage account's domain name may differ from the one shown here.
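Since FromFile() takes either a local path or a blob URI in the same string argument, a caller that wants to validate input up front needs some way to tell them apart. The sketch below uses only net/url. `isBlobURI` is a hypothetical helper, not part of this package, and the package's own detection logic may differ. It deliberately checks only for an https scheme and a host, not a particular domain, because the domain can vary:

```go
package main

import (
	"fmt"
	"net/url"
)

// isBlobURI is a hypothetical helper: it reports whether path looks like a
// blob URI that FromFile() could accept. Only https:// URIs with a host
// qualify; anything else is treated as a local path.
func isBlobURI(path string) bool {
	u, err := url.Parse(path)
	if err != nil {
		return false
	}
	// No domain check: storage domains differ between clouds.
	return u.Scheme == "https" && u.Host != ""
}

func main() {
	for _, p := range []string{
		"https://myaccount.blob.core.windows.net/$root/myblob",
		"http://myaccount.blob.core.windows.net/$root/myblob",
		"/path/to/a/local/file",
	} {
		fmt.Printf("%-55s blob=%v\n", p, isBlobURI(p))
	}
}
```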
Ingestion from an io.Reader ¶
Sometimes you want to ingest a stream of data that you have in memory without writing it to disk. You can do this by streaming the data through an io.Reader.
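r, w := io.Pipe()
enc := json.NewEncoder(w)
go func() {
	for _, data := range dataSet {
		if err := enc.Encode(data); err != nil {
			// Hand the error to the reader side instead of a clean io.EOF.
			w.CloseWithError(err)
			return
		}
	}
	// Closing the writer is what makes the reader return io.EOF.
	w.Close()
}()
// A reader has no file extension, so the data format is set explicitly.
if _, err := in.FromReader(ctx, r, ingest.FileFormat(ingest.JSON)); err != nil {
	panic("add error handling")
}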
It is important to remember that FromReader() will terminate when it receives an io.EOF from the io.Reader. Use io.Readers that won't return io.EOF until the io.Writer is closed (such as io.Pipe).
Ingestion from a Stream ¶
Ingestion from a stream commits blocks of fully formed, encoded data (JSON, AVRO, ...) into Kusto:
if err := in.Stream(ctx, jsonEncodedData, ingest.JSON, "mappingName"); err != nil {
	panic("add error handling")
}
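Stream() expects its payload to be encoded already. For the JSON format (one record per line, as described under DataFormat), such a payload can be built with encoding/json, whose Encoder writes a newline after every value. A minimal sketch; the `event` type, its field names and `buildJSONPayload` are illustrative only, and the field names would need to match your mapping. Note that Stream is deprecated in favour of NewStreaming:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// event is a hypothetical row type; field names must match your mapping.
type event struct {
	Timestamp string  `json:"timestamp"`
	Value     float64 `json:"value"`
}

// buildJSONPayload encodes rows as newline-delimited JSON, one record per line.
func buildJSONPayload(rows []event) ([]byte, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for _, row := range rows {
		if err := enc.Encode(row); err != nil { // Encode appends '\n'
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

func main() {
	payload, err := buildJSONPayload([]event{
		{Timestamp: "2021-01-01T00:00:00Z", Value: 1.5},
		{Timestamp: "2021-01-01T00:01:00Z", Value: 2},
	})
	if err != nil {
		panic(err)
	}
	// payload is what would be passed as jsonEncodedData above.
	fmt.Print(string(payload))
}
```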
Ingestion with Status Reporting ¶
You can use the Kusto Go SDK to get table-based status reporting of ingestion operations. FromFile() and FromReader() return a *Result and an error. If the error is not nil, the operation failed locally. If the error is nil and the ReportResultToTable() option was used, you can wait on the channel returned by the Result's Wait() method for a final status: nil on success, or an error describing the failure.
Note: this feature is not suitable for high-rate ingestion and may slow down the ingestion operation.
status, err := ingestor.FromFile(ctx, "/path/to/file", ingest.ReportResultToTable())
if err != nil {
	// The ingestion command failed to be sent.
	// Handle the error and return without waiting on status.
}
err = <-status.Wait(ctx)
if err != nil {
	// The operation completed with an error.
	if ingest.IsRetryable(err) {
		// Handle retries.
	} else {
		// Inspect the failure.
		// statusCode, _ := ingest.GetIngestionStatus(err)
		// failureStatus, _ := ingest.GetIngestionFailureStatus(err)
	}
}
Index ¶
- Constants
- Variables
- func GetErrorCode(err error) (string, error)
- func IsRetryable(err error) bool
- func IsStatusRecord(err error) bool
- func StatusFromMapForTests(data map[string]interface{}) error
- type ClientScope
- type DataFormat
- type FailureStatusCode
- type FileOption
- func ClientRequestId(clientRequestId string) FileOption
- func Database(name string) FileOption
- func DeleteSource() FileOption
- func DontCompress() FileOption
- func FileFormat(et DataFormat) FileOption
- func FlushImmediately() FileOption
- func IfNotExists(ingestByTag string) FileOption
- func IgnoreSizeLimit() FileOption
- func IngestionMapping(mapping interface{}, mappingKind DataFormat) FileOption
- func IngestionMappingRef(refName string, mappingKind DataFormat) FileOption
- func ReportResultToTable() FileOption
- func SetCreationTime(t time.Time) FileOption
- func Table(name string) FileOption
- func Tags(tags []string) FileOption
- func ValidationPolicy(policy ValPolicy) FileOption
- type Ingestion
- func (i *Ingestion) Close() error
- func (i *Ingestion) FromFile(ctx context.Context, fPath string, options ...FileOption) (*Result, error)
- func (i *Ingestion) FromReader(ctx context.Context, reader io.Reader, options ...FileOption) (*Result, error)
- func (i *Ingestion) Stream(ctx context.Context, payload []byte, format DataFormat, mappingName string) error (deprecated)
- type Ingestor
- type Managed
- type Option
- type QueryClient
- type Result
- type SourceScope
- type StatusCode
- type Streaming
- type ValPolicy
- type ValidationImplication
- type ValidationOption
Examples ¶
Constants ¶
const (
	FromFile SourceScope = 1 << iota
	FromReader
	FromBlob
	QueuedClient ClientScope = 1 << iota
	StreamingClient
	ManagedClient
)
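Because SourceScope and ClientScope share one const block, iota keeps counting across both groups, so QueuedClient is 1<<3 (8) rather than 1. The standalone sketch below copies the block with local types (without the package's String() methods, so values print as numbers) to show the resulting values and how a bit mask, such as one returned by FileOption.SourceScopes(), can be tested. The `supports` helper is hypothetical:

```go
package main

import "fmt"

type SourceScope uint
type ClientScope uint

// Copied from the package's const block: iota increments on every line,
// including across the switch from SourceScope to ClientScope.
const (
	FromFile SourceScope = 1 << iota // 1
	FromReader                       // 2
	FromBlob                         // 4
	QueuedClient ClientScope = 1 << iota // 8
	StreamingClient                      // 16
	ManagedClient                        // 32
)

// supports is a hypothetical helper: it reports whether mask includes scope.
func supports(mask, scope SourceScope) bool {
	return mask&scope != 0
}

func main() {
	fmt.Println(FromFile, FromReader, FromBlob)               // 1 2 4
	fmt.Println(QueuedClient, StreamingClient, ManagedClient) // 8 16 32

	// An option usable with local files and readers, but not blobs.
	mask := FromFile | FromReader
	fmt.Println(supports(mask, FromReader), supports(mask, FromBlob)) // true false
}
```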
Variables ¶
var FileIsBlobErr = errors.ES(errors.OpIngestStream, errors.KClientArgs, "blobstore paths are not supported for streaming")
Functions ¶
func GetErrorCode ¶ added in v0.6.0
func GetErrorCode(err error) (string, error)
GetErrorCode extracts the error code from an ingestion error.
func IsRetryable ¶ added in v0.3.0
func IsRetryable(err error) bool
IsRetryable indicates whether there is any merit in retrying ingestion.
func IsStatusRecord ¶ added in v0.6.0
func IsStatusRecord(err error) bool
IsStatusRecord verifies that the given error is a status record.
func StatusFromMapForTests ¶ added in v0.6.0
func StatusFromMapForTests(data map[string]interface{}) error
StatusFromMapForTests converts a key-value map into an ingestion status record (returned as an error). This is useful for comparison in tests.
Types ¶
type ClientScope ¶ added in v0.6.0
type ClientScope uint
func (ClientScope) String ¶ added in v0.6.0
func (s ClientScope) String() string
type DataFormat ¶
type DataFormat = properties.DataFormat
DataFormat indicates what type of encoding format was used for source data. Not all options can be used in every method.
const (
	// DFUnknown indicates the EncodingType is not set.
	DFUnknown DataFormat = properties.DFUnknown
	// AVRO indicates the source is encoded in Apache Avro format.
	AVRO DataFormat = properties.AVRO
	// ApacheAVRO indicates the source is encoded in Apache avro2json format.
	ApacheAVRO DataFormat = properties.ApacheAVRO
	// CSV indicates the source is encoded in comma-separated values.
	CSV DataFormat = properties.CSV
	// JSON indicates the source is encoded as one or more lines, each containing a record in JavaScript Object Notation.
	JSON DataFormat = properties.JSON
	// MultiJSON indicates the source is encoded as a JSON array of individual records in JavaScript Object Notation.
	// Optionally, multiple documents can be concatenated.
	MultiJSON DataFormat = properties.MultiJSON
	// ORC indicates the source is encoded in Apache Optimized Row Columnar format.
	ORC DataFormat = properties.ORC
	// Parquet indicates the source is encoded in Apache Parquet format.
	Parquet DataFormat = properties.Parquet
	// PSV is a file containing pipe ("|") separated values.
	PSV DataFormat = properties.PSV
	// Raw is a text file that has only a single string value.
	Raw DataFormat = properties.Raw
	// SCSV is a file containing semicolon (";") separated values.
	SCSV DataFormat = properties.SCSV
	// SOHSV is a file containing SOH-separated values (ASCII codepoint 1).
	SOHSV DataFormat = properties.SOHSV
	// SStream indicates the source is encoded in Microsoft Cosmos Structured Streams format.
	SStream DataFormat = properties.SStream
	// TSV is a file containing tab-separated values ("\t").
	TSV DataFormat = properties.TSV
	// TSVE is a file containing escaped-tab-separated values ("\t").
	TSVE DataFormat = properties.TSVE
	// TXT is a text file with lines ending with "\n".
	TXT DataFormat = properties.TXT
	// W3CLogFile indicates the source is encoded using W3C Extended Log File format.
	W3CLogFile DataFormat = properties.W3CLogFile
	// SingleJSON indicates the source is a single JSON value; newlines are regular whitespace.
	SingleJSON DataFormat = properties.SingleJSON
)
note: any change here needs to be kept up to date with the properties version. I'm not a fan of having two copies, but I don't think it is worth moving to its own package to allow properties and ingest to both import without a cycle.
type FailureStatusCode ¶ added in v0.3.0
type FailureStatusCode string
FailureStatusCode indicates the status of failed ingestion attempts.
const (
	// Unknown represents an undefined or unset failure state.
	Unknown FailureStatusCode = "Unknown"
	// Permanent represents a failure state that will not benefit from a retry attempt.
	Permanent FailureStatusCode = "Permanent"
	// Transient represents a retryable failure state.
	Transient FailureStatusCode = "Transient"
	// Exhausted represents a retryable failure that has exhausted all retry attempts.
	Exhausted FailureStatusCode = "Exhausted"
)
func GetIngestionFailureStatus ¶ added in v0.3.0
func GetIngestionFailureStatus(err error) (FailureStatusCode, error)
GetIngestionFailureStatus extracts the ingestion failure code from an ingestion error
func (FailureStatusCode) IsRetryable ¶ added in v0.3.0
func (i FailureStatusCode) IsRetryable() bool
IsRetryable indicates whether there is any merit in retrying ingestion.
type FileOption ¶
type FileOption interface {
	fmt.Stringer

	SourceScopes() SourceScope
	ClientScopes() ClientScope
	Run(p *properties.All, clientType ClientScope, sourceType SourceScope) error
}
FileOption is an optional argument to FromFile().
func ClientRequestId ¶ added in v0.6.0
func ClientRequestId(clientRequestId string) FileOption
ClientRequestId is an identifier for the ingestion that can later be queried.
func Database ¶ added in v0.6.0
func Database(name string) FileOption
Database overrides the default database name.
func DeleteSource ¶
func DeleteSource() FileOption
DeleteSource deletes the source file once it has been uploaded to Kusto.
func DontCompress ¶ added in v0.6.0
func DontCompress() FileOption
DontCompress disables compression of the data before it is uploaded.
func FileFormat ¶
func FileFormat(et DataFormat) FileOption
FileFormat can be used to indicate what type of encoding is supported for the file. This is only needed if the file extension is not present. A file like: "input.json.gz" or "input.json" does not need this option, while "input" would. If an ingestion mapping is specified, there is no need to specify the file format.
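The extension rule above can be illustrated with a small helper. `formatFromPath` is hypothetical (it is not this package's implementation) and its extension table is a deliberately partial assumption. It strips a compression suffix such as .gz or .zip, maps the remaining extension, and returns "" in the cases where FileFormat() would be needed:

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// knownFormats is an illustrative, partial extension table.
var knownFormats = map[string]string{
	".csv":     "csv",
	".tsv":     "tsv",
	".json":    "json",
	".avro":    "avro",
	".parquet": "parquet",
	".orc":     "orc",
}

// formatFromPath guesses the data format from a file name. It returns ""
// when the format cannot be inferred, i.e. when FileFormat() is required.
func formatFromPath(path string) string {
	ext := strings.ToLower(filepath.Ext(path))
	if ext == ".gz" || ext == ".zip" {
		// Look past the compression suffix: "input.json.gz" -> ".json".
		path = strings.TrimSuffix(path, filepath.Ext(path))
		ext = strings.ToLower(filepath.Ext(path))
	}
	return knownFormats[ext]
}

func main() {
	for _, p := range []string{"input.json.gz", "input.json", "input"} {
		fmt.Printf("%q -> %q\n", p, formatFromPath(p))
	}
}
```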
func FlushImmediately ¶
func FlushImmediately() FileOption
FlushImmediately tells Kusto to flush on write.
func IfNotExists ¶
func IfNotExists(ingestByTag string) FileOption
IfNotExists provides a string value that, if specified, prevents ingestion from succeeding if the table already has data tagged with an ingest-by: tag with the same value. This ensures idempotent data ingestion. For more information see: https://docs.microsoft.com/en-us/azure/kusto/management/extents-overview#ingest-by-extent-tags
func IgnoreSizeLimit ¶
func IgnoreSizeLimit() FileOption
IgnoreSizeLimit ignores the size limit for data ingestion.
func IngestionMapping ¶
func IngestionMapping(mapping interface{}, mappingKind DataFormat) FileOption
IngestionMapping provides runtime mapping of the data being imported to the fields in the table. mapping will be JSON encoded, so it can be any type that can be JSON marshalled. If you pass a string or []byte, it will be interpreted as already being JSON encoded. mappingKind can only be: CSV, JSON, AVRO, Parquet or ORC. The mappingKind parameter will also automatically set the FileFormat option.
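The encoding rule for mapping (a string or []byte is taken as already-encoded JSON, anything else is marshalled) can be sketched as follows. `encodeMapping` and `columnMapping` are illustrative names, not this package's types. The column/datatype/Properties.Path layout is assumed from the Kusto JSON mapping format and should be checked against the mapping documentation for your table:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// columnMapping is an illustrative shape for one JSON mapping entry.
type columnMapping struct {
	Column     string            `json:"column"`
	DataType   string            `json:"datatype,omitempty"`
	Properties map[string]string `json:"Properties"`
}

// encodeMapping mirrors the documented rule: string and []byte values are
// passed through as already-encoded JSON; anything else is JSON-marshalled.
func encodeMapping(mapping interface{}) (string, error) {
	switch m := mapping.(type) {
	case string:
		return m, nil
	case []byte:
		return string(m), nil
	default:
		b, err := json.Marshal(m)
		if err != nil {
			return "", err
		}
		return string(b), nil
	}
}

func main() {
	s, err := encodeMapping([]columnMapping{
		{Column: "Name", DataType: "string", Properties: map[string]string{"Path": "$.name"}},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
	// [{"column":"Name","datatype":"string","Properties":{"Path":"$.name"}}]
}
```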
func IngestionMappingRef ¶
func IngestionMappingRef(refName string, mappingKind DataFormat) FileOption
IngestionMappingRef provides the name of a pre-created mapping for the data being imported to the fields in the table. mappingKind can only be: CSV, JSON, AVRO, Parquet or ORC. For more details, see: https://docs.microsoft.com/en-us/azure/kusto/management/create-ingestion-mapping-command The mappingKind parameter will also automatically set the FileFormat option.
func ReportResultToTable ¶ added in v0.3.0
func ReportResultToTable() FileOption
ReportResultToTable requests that the ingestion status be tracked in an Azure table. Note that table status reporting is not recommended for high-capacity ingestion, as it could slow ingestion down. In such cases, enable it only temporarily, for debugging failed ingestions.
func SetCreationTime ¶ added in v0.4.0
func SetCreationTime(t time.Time) FileOption
SetCreationTime overrides the data creation time against which retention policies are evaluated. If not set, the data creation time is considered to be the time of ingestion.
func Table ¶ added in v0.6.0
func Table(name string) FileOption
Table overrides the default table name.
func Tags ¶
func Tags(tags []string) FileOption
Tags are tags to be associated with the ingested data.
func ValidationPolicy ¶
func ValidationPolicy(policy ValPolicy) FileOption
ValidationPolicy uses a ValPolicy to set our ingestion data validation policy. If not set, no validation policy is used. For more information, see: https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/
type Ingestion ¶
type Ingestion struct {
// contains filtered or unexported fields
}
Ingestion provides data ingestion from external sources into Kusto.
func New ¶
func New(client QueryClient, db, table string, options ...Option) (*Ingestion, error)
New is a constructor for Ingestion.
func (*Ingestion) FromFile ¶
func (i *Ingestion) FromFile(ctx context.Context, fPath string, options ...FileOption) (*Result, error)
FromFile allows uploading a data file for Kusto from either a local path or a blobstore URI path. This method is thread-safe.
Example ¶
package main

import (
	"context"
	"time"

	"github.com/Azure/azure-kusto-go/kusto"
	"github.com/Azure/azure-kusto-go/kusto/ingest"
)

func main() {
	var err error

	kcsb := kusto.NewConnectionStringBuilder(`endpoint`).WithAadAppKey("clientID", "clientSecret", "tenantID")
	client, err := kusto.New(kcsb)
	if err != nil {
		// Do something
	}
	// Be sure to close the client when you're done. (Error handling omitted for brevity.)
	defer client.Close()

	ingestor, err := ingest.New(client, "database", "table")
	if err != nil {
		// Do something
	}
	// Closing the ingestor will not close the client (since the client may be used separately),
	// but it is still important to close the ingestor when you're done.
	defer ingestor.Close()

	// Set up a maximum time for completion of 10 minutes.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	// Upload our file WITHOUT status reporting.
	// When completed, delete the file on local storage we are uploading.
	_, err = ingestor.FromFile(ctx, "/path/to/file", ingest.DeleteSource())
	if err != nil {
		// The ingestion command failed to be sent. Do something.
	}

	// Upload our file WITH status reporting.
	// When completed, delete the file on local storage we are uploading.
	status, err := ingestor.FromFile(ctx, "/path/to/file", ingest.DeleteSource(), ingest.ReportResultToTable())
	if err != nil {
		// The ingestion command failed to be sent. Do something, then
		// return without waiting on status.
		return
	}

	err = <-status.Wait(ctx)
	if err != nil {
		// The operation completed with an error.
		if ingest.IsRetryable(err) {
			// Handle retries.
		} else {
			// Inspect the failure.
			// statusCode, _ := ingest.GetIngestionStatus(err)
			// failureStatus, _ := ingest.GetIngestionFailureStatus(err)
		}
	}
}
func (*Ingestion) FromReader ¶ added in v0.2.0
func (i *Ingestion) FromReader(ctx context.Context, reader io.Reader, options ...FileOption) (*Result, error)
FromReader allows uploading a data file for Kusto from an io.Reader. The content is uploaded to Blobstore and ingested after all data in the reader is processed. Content should not use compression as the content will be compressed with gzip. This method is thread-safe.
func (*Ingestion) Stream ¶ deprecated
func (i *Ingestion) Stream(ctx context.Context, payload []byte, format DataFormat, mappingName string) error
Deprecated: use a streaming ingest client (`ingest.NewStreaming`) instead. Stream takes a payload that is encoded in format with a server-stored mappingName, compresses it and uploads it to Kusto. The context object can be used with a timeout or cancel to limit the request time. For more information on mappings, see: https://docs.microsoft.com/en-us/azure/kusto/management/create-ingestion-mapping-command
type Managed ¶ added in v0.6.0
type Managed struct {
// contains filtered or unexported fields
}
func NewManaged ¶ added in v0.6.0
func NewManaged(client QueryClient, db, table string, options ...Option) (*Managed, error)
NewManaged is a constructor for Managed.
func (*Managed) FromReader ¶ added in v0.6.0
type Option ¶ added in v0.5.0
type Option func(s *Ingestion)
Option is an optional argument to New().
func WithStaticBuffer ¶ added in v0.5.0
WithStaticBuffer configures the ingest client to upload data to Kusto using a set of one or more static memory buffers with a fixed size.
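A fixed set of equally sized buffers is commonly managed as a pool whose Get blocks until a buffer is free, which bounds memory use at count × size bytes regardless of load. The sketch below illustrates that general idea with a buffered channel. `bufferPool` and its methods are hypothetical; this is not how WithStaticBuffer is implemented:

```go
package main

import "fmt"

// bufferPool hands out a fixed number of fixed-size buffers. Get blocks when
// all buffers are in use, so memory stays bounded at count*size bytes.
type bufferPool struct {
	free chan []byte
	size int
}

func newBufferPool(count, size int) *bufferPool {
	p := &bufferPool{free: make(chan []byte, count), size: size}
	for i := 0; i < count; i++ {
		p.free <- make([]byte, size)
	}
	return p
}

// Get waits for a free buffer.
func (p *bufferPool) Get() []byte { return <-p.free }

// Put returns a buffer, restoring its full length for the next user.
func (p *bufferPool) Put(b []byte) { p.free <- b[:p.size] }

func main() {
	pool := newBufferPool(2, 8)
	a := pool.Get()
	b := pool.Get()
	fmt.Println(len(a), len(b), len(pool.free)) // 8 8 0

	pool.Put(a[:3]) // a shortened slice is restored to full size on Put
	c := pool.Get()
	fmt.Println(len(c)) // 8

	pool.Put(b)
	pool.Put(c)
}
```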
type QueryClient ¶ added in v0.5.1
type QueryClient interface {
	io.Closer
	Auth() kusto.Authorization
	Endpoint() string
	Query(ctx context.Context, db string, query kusto.Stmt, options ...kusto.QueryOption) (*kusto.RowIterator, error)
	Mgmt(ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error)
	HttpClient() *http.Client
	ClientDetails() *kusto.ClientDetails
}
type Result ¶ added in v0.3.0
type Result struct {
// contains filtered or unexported fields
}
Result provides a way for users to track the state of ingestion jobs.
type SourceScope ¶ added in v0.6.0
type SourceScope uint
func (SourceScope) String ¶ added in v0.6.0
func (s SourceScope) String() string
type StatusCode ¶ added in v0.3.0
type StatusCode string
StatusCode is the ingestion status
const (
	// Pending status represents a temporary status.
	// Might change during the course of ingestion based on the
	// outcome of the data ingestion operation into Kusto.
	Pending StatusCode = "Pending"
	// Succeeded status represents a permanent status.
	// The data has been successfully ingested to Kusto.
	Succeeded StatusCode = "Succeeded"
	// Failed status represents a permanent status.
	// The data has not been ingested to Kusto.
	Failed StatusCode = "Failed"
	// Queued status represents a permanent status.
	// The data has been queued for ingestion & status tracking was not requested.
	// (This does not indicate that the ingestion was successful.)
	Queued StatusCode = "Queued"
	// Skipped status represents a permanent status.
	// No data was supplied for ingestion. The ingest operation was skipped.
	Skipped StatusCode = "Skipped"
	// PartiallySucceeded status represents a permanent status.
	// Part of the data was successfully ingested to Kusto, while other parts failed.
	PartiallySucceeded StatusCode = "PartiallySucceeded"
	// StatusRetrievalFailed means the client ran into trouble reading the status from the service.
	StatusRetrievalFailed StatusCode = "StatusRetrievalFailed"
	// StatusRetrievalCanceled means the user canceled the status check.
	StatusRetrievalCanceled StatusCode = "StatusRetrievalCanceled"
)
func GetIngestionStatus ¶ added in v0.3.0
func GetIngestionStatus(err error) (StatusCode, error)
GetIngestionStatus extracts the ingestion status code from an ingestion error
func (StatusCode) IsFinal ¶ added in v0.3.0
func (i StatusCode) IsFinal() bool
IsFinal returns true if the ingestion status is a final status, or false if the status is temporary
func (StatusCode) IsSuccess ¶ added in v0.3.0
func (i StatusCode) IsSuccess() bool
IsSuccess returns true if the status code is a final successful status code.
type Streaming ¶ added in v0.6.0
type Streaming struct {
// contains filtered or unexported fields
}
Streaming provides data ingestion from external sources into Kusto.
func NewStreaming ¶ added in v0.6.0
func NewStreaming(client QueryClient, db, table string) (*Streaming, error)
NewStreaming is the constructor for Streaming. More information can be found here: https://docs.microsoft.com/en-us/azure/kusto/management/create-ingestion-mapping-command
func (*Streaming) FromFile ¶ added in v0.6.0
func (i *Streaming) FromFile(ctx context.Context, fPath string, options ...FileOption) (*Result, error)
FromFile allows uploading a data file for Kusto from either a local path or a blobstore URI path. This method is thread-safe.
func (*Streaming) FromReader ¶ added in v0.6.0
func (i *Streaming) FromReader(ctx context.Context, reader io.Reader, options ...FileOption) (*Result, error)
FromReader allows uploading a data file for Kusto from an io.Reader. The content is uploaded to Blobstore and ingested after all data in the reader is processed. Content should not use compression as the content will be compressed with gzip. This method is thread-safe.
type ValPolicy ¶
type ValPolicy struct {
	// Options provides an option that will flag data that does not validate.
	Options ValidationOption `json:"ValidationOptions"`
	// Implications sets what to do when a policy option is violated.
	Implications ValidationImplication `json:"ValidationImplications"`
}
ValPolicy sets a policy for validating data as it is sent for ingestion. For more information, see: https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/
type ValidationImplication ¶
type ValidationImplication int8
ValidationImplication is a setting used to indicate what to do when a Validation Policy is violated. These are defined as constants within this package.
const (
	// FailIngestion indicates that any violation of the ValidationPolicy will cause the entire ingestion to fail.
	FailIngestion ValidationImplication = 0
	// IgnoreFailures indicates that failure of the ValidationPolicy will be ignored.
	IgnoreFailures ValidationImplication = 1
)
type ValidationOption ¶
type ValidationOption int8
ValidationOption is an option for validating the ingestion input data. These are defined as constants within this package.
const (
	// VOUnknown indicates that a ValidationOption was not set.
	VOUnknown ValidationOption = 0
	// SameNumberOfFields indicates that all records ingested must have the same number of fields.
	SameNumberOfFields ValidationOption = 1
	// IgnoreNonDoubleQuotedFields indicates that fields that do not have double quotes should be ignored.
	IgnoreNonDoubleQuotedFields ValidationOption = 2
)
Directories ¶
Path | Synopsis |
---|---|
internal/gzip | Package gzip provides a streaming object for taking in io.ReadCloser that is being written to and providing an io.ReadCloser that outputs the original content gzip compressed. |
internal/properties | Package properties provides Kusto REST properties that will need to be serialized and sent to Kusto based upon the type of ingestion we are doing. |
internal/queued | Package filesystem provides a client with the ability to import data into Kusto via a variety of filesystems such as local storage or blobstore. |
internal/resources | Package resources contains objects that are used to gather information about Kusto resources that are used during various ingestion methods. |