SeccoSQL (Separate communication from computation) is an experimental distributed SQL engine on Spark for processing complex SQL/Graph queries. It adopts the optimization framework that separates communication from computation, proposed in "Parallel Query Processing: To Separate Communication from Computation (SIGMOD 2022)", along with many other state-of-the-art query optimization/execution techniques (e.g., GHD-based join optimization, multi-way shuffle, multi-way join, etc.).
- Overview
- QuickStart
- Dependency
- Installation
- Reference
- Contact
SeccoSQL is a new distributed SQL engine for processing complex SQL/Graph queries. A full description is in our manuscript. A brief overview is as follows.
An existing SQL engine optimizes an SQL query by arranging relational algebra operators to reduce the total cost, where each operator involves (i) distributing partitioned data to computing nodes by communication, and (ii) computing locally on those nodes. This kind of parallelization is also called intra-operator parallelism: communication and computation are handled inside an operator and are not separable. As a result, the optimizer reasons about communication and computation only implicitly, which makes it difficult to avoid large intermediate results and hence to reduce the communication cost.
In SeccoSQL, we deal with communication and computation explicitly. To do so, we separate communication from computation using several new operators proposed in this paper.
- pair operator (⊗): pair the partitions of a relation 𝑅 with the partitions of a relation 𝑆, where a partition is specified by a hash function.
- merge operator ($\tilde{\cup}$): collect all partial results from computing nodes as they are.
- local computation operator ($\tilde{op}$): performs local computation on pairs at a computing node, as $\tilde{op}$ for any RA (relational algebra) operator op.
With the pair operator defined, we can explicitly deal with communication to deliver pairs of partitions to computing nodes. With local computation defined, we can perform local computation on pairs.
In short, with pair, merge and local computation, we are able to explicitly specify communication and computation for RA operators. And, with communication and computation being explicitly specified, we make it possible to “move” the communication and computation inside the RA plan in a very flexible way.
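The interplay of the three operators can be sketched with plain Scala collections standing in for distributed data. This is only an illustration under simplifying assumptions, not SeccoSQL's implementation: the object `PairMergeSketch` and its functions `partition`, `pair`, `localJoin`, `merge`, and `join` are hypothetical names, relations are binary, and pairing simply matches partitions with the same hash id.

```scala
// Illustrative sketch only: plain Scala collections stand in for distributed data.
// All names here are hypothetical and are not part of the SeccoSQL API.
object PairMergeSketch {
  type Row = (Int, Int)                 // a binary tuple
  type Partitions = Map[Int, Seq[Row]]  // partition id -> rows

  // Hash-partition a relation on the attribute selected by `key`.
  def partition(rel: Seq[Row], n: Int)(key: Row => Int): Partitions =
    rel.groupBy(r => java.lang.Math.floorMod(key(r), n))

  // Pair: match partition i of R with partition i of S (communication only).
  def pair(r: Partitions, s: Partitions): Seq[(Seq[Row], Seq[Row])] =
    (r.keySet intersect s.keySet).toSeq.sorted.map(i => (r(i), s(i)))

  // Local computation: equi-join R(a, b) with S(b, c) on one pair (computation only).
  def localJoin(p: (Seq[Row], Seq[Row])): Seq[(Int, Int, Int)] =
    for ((a, b) <- p._1; (b2, c) <- p._2 if b == b2) yield (a, b, c)

  // Merge: collect the partial results as they are.
  def merge[T](parts: Seq[Seq[T]]): Seq[T] = parts.flatten

  // A join expressed as pair -> local computation -> merge.
  def join(r: Seq[Row], s: Seq[Row], n: Int): Seq[(Int, Int, Int)] =
    merge(pair(partition(r, n)(_._2), partition(s, n)(_._1)).map(localJoin))
}
```

Because `pair` and `localJoin` are separate steps in this sketch, a plan is free to place other work between them, which is the flexibility the separation is meant to expose.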
Beyond that, SeccoSQL leverages recent research on query optimization and query execution, such as GHD-based join optimization, Multiway Shuffle, and Multiway Join.
SeccoSQL is built on Spark and is designed to run as a library, just like SparkSQL, but it is intended for complex SQL/Graph queries on which SparkSQL does not perform well. Behind the scenes, SeccoSQL has a structure very similar to SparkSQL's.
- Parser
- Expression
- Catalog
- Codegen
- Optimizer
- Planner
- Storage
SeccoSQL supports multiple ways to query. It supports SQL and Cypher for querying data declaratively. It also supports a DataFrame API, which allows users to construct a query step by step. Note that SeccoSQL is mainly used as an experimental platform for testing new ideas, so language support for SQL and Cypher is limited; only basic SQL and Cypher features should be expected. Given the parsed query, SeccoSQL optimizes it in two steps: it first optimizes the query as an RA expression, and then, on top of that, it performs the separation and reorders communication and computation to further optimize the query.
Note: SeccoSQL is currently very unstable and contains many bugs. It will improve gradually. For benchmarking purposes, you can check Secco-SIGMOD, which is the Secco version that comes with the benchmark mentioned in the SIGMOD paper. It has limited capability, but it is much more stable.
The main object to manipulate in Secco is Dataset, which is just like the Dataset in SparkSQL. Dataset defines relational algebra operators (e.g., select, project, join) that transform the dataset.
The main entry point of Secco is SeccoSession, where you can create a Dataset, register a Dataset in the Catalog, get a Dataset from the Catalog, and issue SQL queries.
An example is shown below.
// Obtain SeccoSession via singleton.
val dlSession = SeccoSession.currentSession
/** --- SQL API --- */
val sqlString =
"""
|select A, B
|from R1
|where A < B
|""".stripMargin
val ds = dlSession.sql(sqlString)
/** --- DataFrame API --- */
// get dataset R1.
val ds1 = dlSession.table("R1")
// Construct an RA expression via the DataFrame-like API.
val ds2 = ds1.select("A < B")
// Explain the query execution of ds1 and ds2. This shows the parsed plan, analyzed plan, optimized plan, and execution plan.
ds1.explain()
ds2.explain()
For more usage, please check the classes org.apache.spark.secco.SeccoSession and org.apache.spark.secco.Dataset, which contain comments to guide you in using the system. We recommend using the Dataset API instead of the SQL API, as the SQL API currently has some bugs and is disabled for now.
The dependencies are handled by the build.sbt in SeccoSQL. If you want to add SeccoSQL to your own project, you may need to include the following lines in your SBT file.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.7" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.7" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-graphx" % "2.4.7" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.7" % "provided"
//libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.2"
// Test
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.3" % "test"
// Util
libraryDependencies += "net.sf.trove4j" % "trove4j" % "3.0.3"
libraryDependencies += "it.unimi.dsi" % "fastutil" % "8.1.0"
// Math Optimizer
libraryDependencies += "com.joptimizer" % "joptimizer" % "5.0.0"
// Graph Processing
libraryDependencies += "org.jgrapht" % "jgrapht-core" % "1.3.0"
// Args Parsing
libraryDependencies += "com.github.scopt" %% "scopt" % "4.0.0-RC2"
// Configuration
libraryDependencies += "com.typesafe" % "config" % "1.4.0"
// Better Printing
libraryDependencies += "com.lihaoyi" %% "pprint" % "0.5.4"
// Metering
libraryDependencies += "com.storm-enroute" %% "scalameter-core" % "0.7"
Currently, you can download the project, add SeccoSQL to your own project, and add the dependencies to your SBT file.
In the future, SeccoSQL will be published to Maven, which will allow you to import SeccoSQL with one line such as libraryDependencies += "secco" %% "SeccoSQL" % "0.1-alpha"
We give a reference list of new query optimization and query execution techniques implemented in SeccoSQL. Also, we explain why we used such techniques.
Why separate communication from computation?
Separating communication from computation lets us reorder communication and computation in a fine-grained way, which allows us to delay the generation of large intermediate results.
Why GHD-based Join Optimization?
GHD-based join optimization is very effective for complex queries whose join hypergraph contains cycles. Such complex queries occur frequently in graph queries.
Why Aggregation Push-Down over GHD?
Pushing aggregation down further reduces the intermediate results generated.
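To illustrate why push-down shrinks intermediate results, here is a minimal sketch for a single join rather than a full GHD. It computes COUNT(*) over R(a, b) joined with S(b, c) in two ways: by materializing the join first, and by counting per join key on each side and combining the counts. The object and function names are hypothetical and are not taken from SeccoSQL.

```scala
// Illustrative sketch only; names are hypothetical, not part of the SeccoSQL API.
object AggPushDownSketch {
  type Row = (Int, Int)

  // Baseline: materialize R(a, b) join S(b, c), then count the joined tuples.
  def countAfterJoin(r: Seq[Row], s: Seq[Row]): Long =
    (for ((_, b) <- r; (b2, _) <- s if b == b2) yield 1L).sum

  // Pushed down: count per join key on each side first, then combine the counts.
  // The intermediate data is one counter per distinct key instead of one tuple per match.
  def countPushedDown(r: Seq[Row], s: Seq[Row]): Long = {
    val rCounts = r.groupBy(_._2).map { case (b, rows) => b -> rows.size.toLong }
    val sCounts = s.groupBy(_._1).map { case (b, rows) => b -> rows.size.toLong }
    rCounts.iterator.map { case (b, n) => n * sCounts.getOrElse(b, 0L) }.sum
  }
}
```

Both functions return the same count; the second never builds the joined tuples.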
Why Worst-case Optimal Join?
Worst-case optimal join is a new kind of join algorithm that is very effective at handling join queries that contain cycles.
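The attribute-at-a-time style used by worst-case optimal joins can be sketched on the triangle query Q(a, b, c) = R(a, b), S(b, c), T(a, c). The code below binds one attribute at a time by intersecting candidate sets, instead of joining two relations at a time. It is a simplified illustration with hypothetical names, uses in-memory hash sets, and is not tuned to meet the worst-case bound in practice.

```scala
// Illustrative sketch only; names are hypothetical, not part of the SeccoSQL API.
object GenericJoinSketch {
  type Edge = (Int, Int)

  // Index a binary relation as: first attribute -> set of second attributes.
  private def index(rel: Seq[Edge]): Map[Int, Set[Int]] =
    rel.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2).toSet }

  // Intersect by scanning the smaller set and probing the larger one.
  private def intersect(x: Set[Int], y: Set[Int]): Set[Int] =
    if (x.size <= y.size) x.filter(y) else y.filter(x)

  // Triangle query Q(a, b, c) = R(a, b), S(b, c), T(a, c).
  def triangles(r: Seq[Edge], s: Seq[Edge], t: Seq[Edge]): Seq[(Int, Int, Int)] = {
    val (ri, si, ti) = (index(r), index(s), index(t))
    for {
      a <- intersect(ri.keySet, ti.keySet).toSeq.sorted // a must appear in R and T
      b <- intersect(ri(a), si.keySet).toSeq.sorted     // b extends a in R and appears in S
      c <- intersect(si(b), ti(a)).toSeq.sorted         // c extends b in S and a in T
    } yield (a, b, c)
  }
}
```

No binary intermediate result such as R joined with S is ever materialized; each candidate binding is checked against all relations that mention the attribute.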
Why caching in worst-case optimal join?
Worst-case optimal join sometimes incurs duplicate computation. Caching further improves the speed of worst-case optimal join in the average case.
Why HyperCube Shuffle?
HyperCube shuffle is a way of shuffling multiple relations at a time, which is also worst-case optimal.
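As a rough sketch of the idea, again on the triangle query R(a, b), S(b, c), T(a, c): the computing nodes are arranged as a pa x pb x pc cube, each attribute is hashed along its own dimension, and a tuple is replicated along the dimension of the attribute it does not contain. The names `HyperCubeSketch`, `Shares`, and `cellsForR/S/T` are hypothetical, and the modulo hash is chosen only for readability.

```scala
// Illustrative sketch only; names are hypothetical, not part of the SeccoSQL API.
object HyperCubeSketch {
  // Shares: number of hash buckets per attribute; the cube has pa * pb * pc cells.
  final case class Shares(pa: Int, pb: Int, pc: Int)
  type Cell = (Int, Int, Int)

  private def h(v: Int, buckets: Int): Int = java.lang.Math.floorMod(v, buckets)

  // R(a, b) has no c attribute, so it is replicated along the c dimension.
  def cellsForR(a: Int, b: Int, s: Shares): Seq[Cell] =
    (0 until s.pc).map(k => (h(a, s.pa), h(b, s.pb), k))

  // S(b, c) has no a attribute, so it is replicated along the a dimension.
  def cellsForS(b: Int, c: Int, s: Shares): Seq[Cell] =
    (0 until s.pa).map(i => (i, h(b, s.pb), h(c, s.pc)))

  // T(a, c) has no b attribute, so it is replicated along the b dimension.
  def cellsForT(a: Int, c: Int, s: Shares): Seq[Cell] =
    (0 until s.pb).map(j => (h(a, s.pa), j, h(c, s.pc)))
}
```

The three tuples of any triangle (a, b, c) meet in exactly one cell, (h(a), h(b), h(c)), so all relations are shuffled once and each cell can then run a local multi-way join.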
Here is a road map for SeccoSQL in the short term.
- Fix bugs in the optimizer when handling equi-joins
- Fix the bugs in codegen
- Publish SeccoSQL to Maven