Documentation ¶
Index ¶
- type DataFrame
- type DataFrameReader
- type DataFrameWriter
- type GroupedData
- func (gd *GroupedData) Agg(ctx context.Context, exprs ...column.Column) (DataFrame, error)
- func (gd *GroupedData) Avg(ctx context.Context, cols ...string) (DataFrame, error)
- func (gd *GroupedData) Count(ctx context.Context) (DataFrame, error)
- func (gd *GroupedData) Max(ctx context.Context, cols ...string) (DataFrame, error)
- func (gd *GroupedData) Mean(ctx context.Context, cols ...string) (DataFrame, error)
- func (gd *GroupedData) Min(ctx context.Context, cols ...string) (DataFrame, error)
- func (gd *GroupedData) Pivot(ctx context.Context, pivotCol string, pivotValues []any) (*GroupedData, error)
- func (gd *GroupedData) Sum(ctx context.Context, cols ...string) (DataFrame, error)
- type ResultCollector
- type SparkSession
- type SparkSessionBuilder
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DataFrame ¶
type DataFrame interface {
    // PlanId returns the plan id of the data frame.
    PlanId() int64
    // Alias creates a new DataFrame with the specified subquery alias.
    Alias(ctx context.Context, alias string) DataFrame
    // Cache persists the DataFrame with the default storage level.
    Cache(ctx context.Context) error
    // Coalesce returns a new DataFrame that has exactly numPartitions partitions.
    //
    // Similar to coalesce defined on an RDD, this operation results in a
    // narrow dependency, e.g. if you go from 1000 partitions to 100 partitions,
    // there will not be a shuffle; instead each of the 100 new partitions will
    // claim 10 of the current partitions. If a larger number of partitions is
    // requested, it will stay at the current number of partitions.
    //
    // However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
    // this may result in your computation taking place on fewer nodes than
    // you like (e.g. one node in the case of numPartitions = 1). To avoid this,
    // you can call Repartition(). This will add a shuffle step, but means the
    // current upstream partitions will be executed in parallel (per whatever
    // the current partitioning is).
    Coalesce(ctx context.Context, numPartitions int) DataFrame
    // Columns returns the list of column names of the DataFrame.
    Columns(ctx context.Context) ([]string, error)
    // Corr calculates the correlation of two columns of a DataFrame as a double value.
    // Currently only supports the Pearson Correlation Coefficient.
    Corr(ctx context.Context, col1, col2 string) (float64, error)
    CorrWithMethod(ctx context.Context, col1, col2 string, method string) (float64, error)
    // Count returns the number of rows in the DataFrame.
    Count(ctx context.Context) (int64, error)
    // Cov calculates the sample covariance for the given columns, specified by
    // their names, as a double value.
    Cov(ctx context.Context, col1, col2 string) (float64, error)
    // Collect returns the data rows of the current data frame.
    Collect(ctx context.Context) ([]types.Row, error)
    // CreateTempView creates or replaces a temporary view.
    CreateTempView(ctx context.Context, viewName string, replace, global bool) error
    // CreateOrReplaceTempView creates a temporary view, replacing an existing view of the same name.
    CreateOrReplaceTempView(ctx context.Context, viewName string) error
    // CreateGlobalTempView creates a global temporary view.
    CreateGlobalTempView(ctx context.Context, viewName string) error
    // CreateOrReplaceGlobalTempView creates a global temporary view, replacing an existing view of the same name.
    CreateOrReplaceGlobalTempView(ctx context.Context, viewName string) error
    // CrossJoin joins the current DataFrame with another DataFrame using the cross product.
    CrossJoin(ctx context.Context, other DataFrame) DataFrame
    // CrossTab computes a pair-wise frequency table of the given columns, also known as a
    // contingency table.
    // The first column of each row will be the distinct values of `col1` and the column names
    // will be the distinct values of `col2`. The name of the first column will be `$col1_$col2`.
    // Pairs that have no occurrences will have zero as their counts.
    CrossTab(ctx context.Context, col1, col2 string) DataFrame
    // Cube creates a multi-dimensional cube for the current DataFrame using
    // the specified columns, so we can run aggregations on them.
    Cube(ctx context.Context, cols ...column.Convertible) *GroupedData
    // Describe computes basic statistics for numeric and string columns.
    // This includes count, mean, stddev, min, and max.
    Describe(ctx context.Context, cols ...string) DataFrame
    // Distinct returns a new DataFrame containing the distinct rows in this DataFrame.
    Distinct(ctx context.Context) DataFrame
    // Drop returns a new DataFrame that drops the specified list of columns.
    Drop(ctx context.Context, columns ...column.Convertible) (DataFrame, error)
    // DropByName returns a new DataFrame that drops the specified list of columns by name.
    DropByName(ctx context.Context, columns ...string) (DataFrame, error)
    // DropDuplicates returns a new DataFrame that contains only the unique rows from this DataFrame.
    DropDuplicates(ctx context.Context, columns ...string) (DataFrame, error)
    // ExceptAll is similar to Subtract but does not perform the distinct operation.
    ExceptAll(ctx context.Context, other DataFrame) DataFrame
    // Explain returns the string explain plan for the current DataFrame according to the explainMode.
    Explain(ctx context.Context, explainMode utils.ExplainMode) (string, error)
    // Filter filters the data frame by a column condition.
    Filter(ctx context.Context, condition column.Convertible) (DataFrame, error)
    // FilterByString filters the data frame by a string condition.
    FilterByString(ctx context.Context, condition string) (DataFrame, error)
    // First returns the first row of the DataFrame.
    First(ctx context.Context) (types.Row, error)
    FreqItems(ctx context.Context, cols ...string) DataFrame
    FreqItemsWithSupport(ctx context.Context, support float64, cols ...string) DataFrame
    // GetStorageLevel returns the storage level of the data frame.
    GetStorageLevel(ctx context.Context) (*utils.StorageLevel, error)
    // GroupBy groups the DataFrame by the specified columns so that aggregation
    // can be performed on them. See GroupedData for all the available aggregate functions.
    GroupBy(cols ...column.Convertible) *GroupedData
    // Head is an alias for Limit.
    Head(ctx context.Context, limit int32) ([]types.Row, error)
    // Intersect performs the set intersection of two data frames and only returns distinct rows.
    Intersect(ctx context.Context, other DataFrame) DataFrame
    // IntersectAll performs the set intersection of two data frames and returns all rows.
    IntersectAll(ctx context.Context, other DataFrame) DataFrame
    // IsEmpty returns true if the DataFrame is empty.
    IsEmpty(ctx context.Context) (bool, error)
    // Join joins the current DataFrame with another DataFrame on the specified
    // column, using the specified join type.
    Join(ctx context.Context, other DataFrame, on column.Convertible, joinType utils.JoinType) (DataFrame, error)
    // Limit applies a limit on the DataFrame.
    Limit(ctx context.Context, limit int32) DataFrame
    // Offset returns a new DataFrame by skipping the first `offset` rows.
    Offset(ctx context.Context, offset int32) DataFrame
    // OrderBy is an alias for Sort.
    OrderBy(ctx context.Context, columns ...column.Convertible) (DataFrame, error)
    Persist(ctx context.Context, storageLevel utils.StorageLevel) error
    RandomSplit(ctx context.Context, weights []float64) ([]DataFrame, error)
    // Repartition re-partitions a data frame.
    Repartition(ctx context.Context, numPartitions int, columns []string) (DataFrame, error)
    // RepartitionByRange re-partitions a data frame by range partition.
    RepartitionByRange(ctx context.Context, numPartitions int, columns ...column.Convertible) (DataFrame, error)
    // Rollup creates a multi-dimensional rollup for the current DataFrame using
    // the specified columns, so we can run aggregation on them.
    Rollup(ctx context.Context, cols ...column.Convertible) *GroupedData
    // SameSemantics returns true if the other DataFrame has the same semantics.
    SameSemantics(ctx context.Context, other DataFrame) (bool, error)
    // Show uses WriteResult to write the data frames to the console output.
    Show(ctx context.Context, numRows int, truncate bool) error
    // Schema returns the schema for the current data frame.
    Schema(ctx context.Context) (*types.StructType, error)
    // Select projects a list of columns from the DataFrame.
    Select(ctx context.Context, columns ...column.Convertible) (DataFrame, error)
    // SelectExpr projects a list of columns from the DataFrame by string expressions.
    SelectExpr(ctx context.Context, exprs ...string) (DataFrame, error)
    // SemanticHash returns the semantic hash of the data frame. The semantic hash can be used to
    // understand whether the semantic operations are similar.
    SemanticHash(ctx context.Context) (int32, error)
    // Sort returns a new DataFrame sorted by the specified columns.
    Sort(ctx context.Context, columns ...column.Convertible) (DataFrame, error)
    // Subtract subtracts the other DataFrame from the current DataFrame, and
    // only returns distinct rows.
    Subtract(ctx context.Context, other DataFrame) DataFrame
    // Summary computes the specified statistics for the current DataFrame and returns it
    // as a new DataFrame. Available statistics are: "count", "mean", "stddev", "min", "max" and
    // arbitrary percentiles specified as a percentage (e.g., "75%"). If no statistics are given,
    // this function computes "count", "mean", "stddev", "min", "25%", "50%", "75%", "max".
    Summary(ctx context.Context, statistics ...string) DataFrame
    // Tail returns the last `limit` rows as a list of Row.
    Tail(ctx context.Context, limit int32) ([]types.Row, error)
    // Take is an alias for Limit.
    Take(ctx context.Context, limit int32) ([]types.Row, error)
    // ToArrow returns the Arrow representation of the DataFrame.
    ToArrow(ctx context.Context) (*arrow.Table, error)
    // Union is an alias for UnionAll.
    Union(ctx context.Context, other DataFrame) DataFrame
    // UnionAll returns a new DataFrame containing the union of rows in this and another DataFrame.
    UnionAll(ctx context.Context, other DataFrame) DataFrame
    // UnionByName performs a SQL union operation on two dataframes but reorders the schema
    // according to the matching columns. If columns are missing, it will throw an error.
    UnionByName(ctx context.Context, other DataFrame) DataFrame
    // UnionByNameWithMissingColumns performs a SQL union operation on two dataframes but reorders the schema
    // according to the matching columns. Missing columns are supported.
    UnionByNameWithMissingColumns(ctx context.Context, other DataFrame) DataFrame
    // Unpersist resets the storage level for this data frame, and if necessary removes it
    // from server-side caches.
    Unpersist(ctx context.Context) error
    // WithColumn returns a new DataFrame by adding a column or replacing the
    // existing column that has the same name. The column expression must be an
    // expression over this DataFrame; attempting to add a column from some other
    // DataFrame will raise an error.
    //
    // Note: This method introduces a projection internally. Therefore, calling it multiple
    // times, for instance, via loops in order to add multiple columns can generate big
    // plans which can cause performance issues and even a StackOverflowException.
    // To avoid this, use Select with multiple columns at once.
    WithColumn(ctx context.Context, colName string, col column.Convertible) (DataFrame, error)
    WithColumns(ctx context.Context, alias ...column.Alias) (DataFrame, error)
    // WithColumnRenamed returns a new DataFrame by renaming an existing column.
    // This is a no-op if the schema doesn't contain the given column name.
    WithColumnRenamed(ctx context.Context, existingName, newName string) (DataFrame, error)
    // WithColumnsRenamed returns a new DataFrame by renaming multiple existing columns.
    WithColumnsRenamed(ctx context.Context, colsMap map[string]string) (DataFrame, error)
    // WithMetadata returns a new DataFrame with the specified metadata for each of the columns.
    WithMetadata(ctx context.Context, metadata map[string]string) (DataFrame, error)
    WithWatermark(ctx context.Context, eventTime string, delayThreshold string) (DataFrame, error)
    Where(ctx context.Context, condition string) (DataFrame, error)
    // Writer returns a data frame writer, which can be used to save the data frame to supported storage.
    Writer() DataFrameWriter
    // Write is an alias for Writer.
    // Deprecated: Use Writer.
    Write() DataFrameWriter
    // WriteResult streams the data frames to a result collector.
    WriteResult(ctx context.Context, collector ResultCollector, numRows int, truncate bool) error
}
DataFrame is a wrapper for a data frame, representing a distributed collection of data rows.
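A minimal sketch of typical DataFrame use, assuming the module import path github.com/apache/spark-connect-go/v35/spark/sql (the path and package name sql are assumptions about this module's layout) and an already-built SparkSession (see SparkSessionBuilder below). Only methods documented on this page are used:

package sketch

import (
    "context"

    "github.com/apache/spark-connect-go/v35/spark/sql"
)

// filterAndShow builds a DataFrame with SQL, filters it by a string
// condition, and prints a few rows to the console.
func filterAndShow(ctx context.Context, spark sql.SparkSession) error {
    df, err := spark.Sql(ctx, "SELECT id, id % 3 AS bucket FROM range(100)")
    if err != nil {
        return err
    }
    // FilterByString accepts a SQL boolean expression.
    filtered, err := df.FilterByString(ctx, "bucket = 0")
    if err != nil {
        return err
    }
    // Show writes at most 10 rows; truncate=false keeps wide values intact.
    return filtered.Show(ctx, 10, false)
}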
func NewDataFrame ¶
NewDataFrame creates a new DataFrame
type DataFrameReader ¶
type DataFrameReader interface {
    // Format specifies data format (data source type) for the underlying data, e.g. parquet.
    Format(source string) DataFrameReader
    // Load reads the underlying data and returns a data frame.
    Load(path string) (DataFrame, error)
    // Table reads a table from the underlying data source.
    Table(name string) (DataFrame, error)
    Option(key, value string) DataFrameReader
}
DataFrameReader supports reading data from storage and returning a data frame. TODO: needs to implement other methods like Schema(), and also "strongly typed" reading (e.g. Parquet(), Orc(), Csv(), etc.).
func NewDataframeReader ¶
func NewDataframeReader(session *sparkSessionImpl) DataFrameReader
NewDataframeReader creates a new DataFrameReader
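A hedged sketch of the reader chain above, reusing the assumed import path; the Parquet path is a placeholder, and mergeSchema is a standard Spark Parquet data source option rather than something specific to this package:

package sketch

import "github.com/apache/spark-connect-go/v35/spark/sql"

// readParquet loads a Parquet dataset through the generic Format/Option/Load chain.
func readParquet(spark sql.SparkSession) (sql.DataFrame, error) {
    return spark.Read().
        Format("parquet").
        Option("mergeSchema", "true").
        Load("/tmp/people.parquet")
}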
type DataFrameWriter ¶
type DataFrameWriter interface {
    // Mode specifies saving mode for the data, e.g. Append, Overwrite, ErrorIfExists.
    Mode(saveMode string) DataFrameWriter
    // Format specifies data format (data source type) for the underlying data, e.g. parquet.
    Format(source string) DataFrameWriter
    // Save writes the data frame to the given path.
    Save(ctx context.Context, path string) error
}
DataFrameWriter supports writing data frame to storage.
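A short usage sketch, assuming a DataFrame obtained as above; "overwrite" is one of the standard Spark save modes, and the output path is a placeholder:

package sketch

import (
    "context"

    "github.com/apache/spark-connect-go/v35/spark/sql"
)

// saveParquet writes df as Parquet, replacing any existing data at the path.
func saveParquet(ctx context.Context, df sql.DataFrame) error {
    return df.Writer().
        Mode("overwrite").
        Format("parquet").
        Save(ctx, "/tmp/people_out")
}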
type GroupedData ¶
type GroupedData struct {
// contains filtered or unexported fields
}
func (*GroupedData) Agg ¶
func (gd *GroupedData) Agg(ctx context.Context, exprs ...column.Column) (DataFrame, error)
Agg computes aggregates and returns the result as a DataFrame. The aggregate expressions are passed as column.Column arguments.
func (*GroupedData) Count ¶
func (gd *GroupedData) Count(ctx context.Context) (DataFrame, error)
Count computes the count of rows for each group.
func (*GroupedData) Pivot ¶
func (gd *GroupedData) Pivot(ctx context.Context, pivotCol string, pivotValues []any) (*GroupedData, error)
Pivot pivots a column of the current DataFrame and performs the specified aggregation. The values in pivotValues become the columns of the resulting output.
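A grouped-aggregation sketch using GroupBy and Count from above. functions.Col is assumed to live in a sibling functions package and to return a value satisfying column.Convertible; if your version differs, any column.Convertible works in its place:

package sketch

import (
    "context"

    "github.com/apache/spark-connect-go/v35/spark/sql"
    "github.com/apache/spark-connect-go/v35/spark/sql/functions"
)

// countPerBucket groups rows by the bucket column and counts each group.
func countPerBucket(ctx context.Context, spark sql.SparkSession) (sql.DataFrame, error) {
    df, err := spark.Sql(ctx, "SELECT id, id % 3 AS bucket FROM range(100)")
    if err != nil {
        return nil, err
    }
    // GroupBy returns *GroupedData; Count yields one row per distinct bucket.
    return df.GroupBy(functions.Col("bucket")).Count(ctx)
}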
type ResultCollector ¶
type ResultCollector interface {
    // WriteRow receives a single row from the data frame.
    WriteRow(values []any)
}
ResultCollector receives a stream of result rows
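A minimal implementation sketch; the type name rowPrinter is illustrative:

package sketch

import "fmt"

// rowPrinter is a hypothetical ResultCollector that prints every row it receives.
type rowPrinter struct{}

func (rowPrinter) WriteRow(values []any) {
    fmt.Println(values...)
}

It can then be passed to DataFrame.WriteResult, e.g. df.WriteResult(ctx, rowPrinter{}, 20, false) to stream up to 20 untruncated rows.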
type SparkSession ¶
type SparkSession interface {
    // Read returns a DataFrameReader for reading data from storage.
    Read() DataFrameReader
    // Sql executes a SQL query and returns the result as a DataFrame.
    Sql(ctx context.Context, query string) (DataFrame, error)
    // Stop closes the session.
    Stop() error
    // Table returns the named table as a DataFrame.
    Table(name string) (DataFrame, error)
    // CreateDataFrameFromArrow creates a DataFrame from an Arrow table.
    CreateDataFrameFromArrow(ctx context.Context, data arrow.Table) (DataFrame, error)
    // CreateDataFrame creates a DataFrame from rows of Go values and a schema.
    CreateDataFrame(ctx context.Context, data [][]any, schema *types.StructType) (DataFrame, error)
    // Config returns the runtime configuration of the session.
    Config() client.RuntimeConfig
}
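A round-trip sketch combining session and DataFrame methods from this page; the view name numbers is arbitrary:

package sketch

import (
    "context"

    "github.com/apache/spark-connect-go/v35/spark/sql"
)

// tempViewRoundTrip registers a query result as a temp view, then reads it back by name.
func tempViewRoundTrip(ctx context.Context, spark sql.SparkSession) (sql.DataFrame, error) {
    df, err := spark.Sql(ctx, "SELECT * FROM range(10)")
    if err != nil {
        return nil, err
    }
    if err := df.CreateOrReplaceTempView(ctx, "numbers"); err != nil {
        return nil, err
    }
    return spark.Table("numbers")
}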
type SparkSessionBuilder ¶
type SparkSessionBuilder struct {
// contains filtered or unexported fields
}
func NewSessionBuilder ¶
func NewSessionBuilder() *SparkSessionBuilder
NewSessionBuilder creates a new session builder for starting a new Spark session.
func (*SparkSessionBuilder) Build ¶
func (s *SparkSessionBuilder) Build(ctx context.Context) (SparkSession, error)
func (*SparkSessionBuilder) Remote ¶
func (s *SparkSessionBuilder) Remote(connectionString string) *SparkSessionBuilder
Remote sets the connection string for the remote connection.
func (*SparkSessionBuilder) WithChannelBuilder ¶
func (s *SparkSessionBuilder) WithChannelBuilder(cb channel.Builder) *SparkSessionBuilder
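An end-to-end sketch tying the builder to the rest of the API; sc://localhost:15002 assumes a Spark Connect server on its default port:

package main

import (
    "context"
    "log"

    "github.com/apache/spark-connect-go/v35/spark/sql"
)

func main() {
    ctx := context.Background()

    // Build a session against the assumed local endpoint.
    spark, err := sql.NewSessionBuilder().
        Remote("sc://localhost:15002").
        Build(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer spark.Stop()

    df, err := spark.Sql(ctx, "SELECT 'hello' AS greeting")
    if err != nil {
        log.Fatal(err)
    }
    // Print the single result row without truncation.
    if err := df.Show(ctx, 1, false); err != nil {
        log.Fatal(err)
    }
}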