JDBC in Spark SQL
Apache Spark has a powerful built-in API for reading data from relational databases. Effectiveness and efficiency, following the usual Spark approach, are managed transparently.
The two basic concepts we have to know when dealing with such scenarios are:
Dataset: the Dataset API provides a type-safe, object-oriented programming interface. Datasets provide compile-time type safety, which means that production applications can be checked for errors before they are run, and they allow direct operations over user-defined classes.
DataFrame: a DataFrame is an alias for an untyped Dataset[Row]. It is a distributed collection of data organized into named columns, conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
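To make the distinction concrete, here is a minimal sketch (the Employee case class and the local SparkSession are illustrative assumptions, not part of the original example):

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Illustrative example: a typed Dataset vs. an untyped DataFrame.
case class Employee(empNo: Long, firstName: String, position: String)

val sparkSession = SparkSession.builder()
  .appName("jdbc-example")
  .master("local[*]")
  .getOrCreate()
import sparkSession.implicits._

// A DataFrame is an untyped Dataset[Row]: column names are only checked at runtime.
val employeesDf: DataFrame = Seq(
  Employee(1L, "Alice", "Manager"),
  Employee(2L, "Bob", "Engineer")
).toDF()

// Converting to a typed Dataset[Employee] gives compile-time checks on the fields.
val employeesDs: Dataset[Employee] = employeesDf.as[Employee]
employeesDs.filter(_.position == "Manager").show()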
Being conceptually similar to a table in a relational database, the DataFrame is the structure that will hold our RDBMS data:
val dataframe = sparkSession.read.jdbc(…)
Here is a description of the parameters:
- url: the JDBC database URL, of the form jdbc:subprotocol:subname.
- table: the name of the table in the external database.
- columnName: the name of a column of integral type that will be used for partitioning.
- lowerBound: the minimum value of columnName, used to decide the partition stride.
- upperBound: the maximum value of columnName, used to decide the partition stride.
- numPartitions: the number of partitions. This, along with lowerBound (inclusive) and upperBound (exclusive), forms the partition strides for the generated WHERE clause expressions used to split the column columnName evenly. When the input is less than 1, the number is set to 1.
- connectionProperties: JDBC database connection arguments, a list of arbitrary string tag/value pairs. Normally at least a "user" and "password" property should be included. "fetchsize" can be used to control the number of rows per fetch.
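As an example, a minimal connectionProperties object including the optional "fetchsize" hint might look like this (the credential values below are placeholders):

import java.util.Properties

// Placeholder credentials; replace with real values or load them from a secure store.
val connectionProperties = new Properties()
connectionProperties.put("user", "<username>")
connectionProperties.put("password", "<password>")
// Optional: fetch 1000 rows per round trip from the database.
connectionProperties.put("fetchsize", "1000")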
DataFrameReader provides several flavors of the JDBC connection:
Read data from JDBC
The first reading option loads data from a database table. This uses a single JDBC connection to pull the table into the Spark environment.
def jdbc(url: String, table: String, properties: Properties)
val jdbcHostname = "<hostname>"
val jdbcPort = 3306
val jdbcDatabase = "<database>"

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"

// Create a Properties() object to hold the parameters.
import java.util.Properties

val connectionProperties = new Properties()
val jdbcUsername = "<username>"
val jdbcPassword = "<password>"
connectionProperties.put("user", s"$jdbcUsername")
connectionProperties.put("password", s"$jdbcPassword")

val driverClass = "com.mysql.jdbc.Driver"
connectionProperties.setProperty("driver", driverClass)

val employees_table = sparkSession.read.jdbc(jdbcUrl, "employees", connectionProperties)
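Once loaded, the employees_table DataFrame behaves like any other; a quick sanity check (not part of the original snippet) might be:

// Inspect the schema and a few rows of the loaded table.
employees_table.printSchema()
employees_table.show(5)
println(s"Row count: ${employees_table.count()}")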
Configure partitioning in Spark SQL
Correctly balanced partitions help to improve application performance. Ideally, each executor would work on a similar subset of data. To configure that in Spark SQL with RDBMS connections, we must define four options when building the DataFrameReader: the partition column, the upper and lower bounds, and the desired number of partitions. At first glance this seems simple, but after writing some code, each of these options deserves an explanation:
- partitionColumn – as the name indicates, it defines the name of the column whose data will be used to partition rows. One important prerequisite: the column must be of numeric (integer or decimal) type.
- numPartitions – no surprises here, it defines the desired number of partitions. "Desired" is the important word to keep in mind. To see why, consider the simplified check sketched just below.
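The following is a paraphrased sketch of that check (it mirrors the logic used when Spark builds the partitioning information in org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation, but it is not the exact source; the literal values are only there to make the effect visible):

// If the range covered by the bounds is smaller than the requested number of partitions,
// the effective number of partitions is reduced so each partition spans at least one value.
val requestedPartitions = 10L
val lowerBound = 0L
val upperBound = 5L

val effectivePartitions =
  if (upperBound - lowerBound >= requestedPartitions) requestedPartitions
  else upperBound - lowerBound

println(s"Effective partitions: $effectivePartitions") // 5, not the requested 10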
As you can see, under some circumstances the number of partitions can be lower than the number requested.
- lowerBound and upperBound – as you saw above, these two parameters have at least an influence on the number of partitions. But that is not their only purpose. In addition, they are used to define the slices containing the partitioned data. These boundaries generate the stride, which specifies how many rows of a given range of partition column values can be kept within a single partition. To understand it better, let's take a look at the simplified algorithm coming from the org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation#columnPartition(partitioning: JDBCPartitioningInfo) method:
stride = (upper_bound / partitions_number) - (lower_bound / partitions_number)
partition_nr = 0
while (partition_nr < partitions_number)
  generate WHERE clause: partition_column IS NULL OR partition_column < stride
    if: partition_nr == 0
  or generate WHERE clause: partition_column >= stride AND partition_column < next_stride
    if: partition_nr > 0 AND partition_nr < partitions_number - 1
  or generate WHERE clause: partition_column >= stride
    if: partition_nr > 0 AND partition_nr == partitions_number - 1

where next_stride is calculated, after computing the left side of the WHERE clause, by next_stride += stride
Thus, if we define the number of partitions as 5, the lower bound as 0 and the upper bound as 20, Spark SQL will retrieve the data with the following queries:
stride = (20 / 5) - (0 / 5) = 4

SELECT * FROM my_table WHERE partition_column IS NULL OR partition_column < 4
SELECT * FROM my_table WHERE partition_column >= 4 AND partition_column < 8
SELECT * FROM my_table WHERE partition_column >= 8 AND partition_column < 12
SELECT * FROM my_table WHERE partition_column >= 12 AND partition_column < 16
SELECT * FROM my_table WHERE partition_column >= 16
As you can see, the above queries generate 5 partitions of data, containing respectively the values (null-3), (4-7), (8-11), (12-15) and (16 and more).
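For readers who want to experiment, the following standalone sketch reproduces the stride logic described above outside of Spark (an illustration only, not the actual JDBCRelation code):

// Generates the WHERE clauses that split a numeric column into numPartitions strides.
def partitionWhereClauses(column: String,
                          lowerBound: Long,
                          upperBound: Long,
                          numPartitions: Int): Seq[String] = {
  val stride = upperBound / numPartitions - lowerBound / numPartitions
  var currentValue = lowerBound
  (0 until numPartitions).map { i =>
    val lower = if (i != 0) Some(s"$column >= $currentValue") else None
    currentValue += stride
    val upper = if (i != numPartitions - 1) Some(s"$column < $currentValue") else None
    (lower, upper) match {
      case (Some(l), Some(u)) => s"$l AND $u"
      case (None, Some(u))    => s"$column IS NULL OR $u"
      case (Some(l), None)    => l
      case (None, None)       => "1 = 1"
    }
  }
}

// Reproduces the example above: bounds 0 and 20, 5 partitions.
partitionWhereClauses("partition_column", 0L, 20L, 5).foreach(println)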
def jdbc(
    url: String,
    table: String,
    columnName: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    connectionProperties: Properties)
val df = sparkSession.read.jdbc(
  jdbcUrl,
  "employees",
  columnName = "emp_no",
  lowerBound = 1L,
  upperBound = 100000L,
  numPartitions = 100,
  connectionProperties = connectionProperties)
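A quick way to confirm how the data was split (an extra check, not part of the original example):

// The resulting DataFrame should have the requested number of partitions,
// unless Spark reduced it as explained above.
println(s"Partitions: ${df.rdd.getNumPartitions}")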
Predicate pushdown in Spark SQL
Predicate pushdown is a logical optimization rule that consists of sending filtering operations directly to the data source. In the case of an RDBMS, for instance, it translates into executing the "WHERE ..." clause directly at the database level. This optimization reduces the amount of loaded data and takes advantage of the query optimizations (e.g. RDBMS indexes) defined at the data source level.
def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties)
val df = sparkSession.read.jdbc(
  jdbcUrl,
  "employees",
  predicates = Array("position = 'Manager'"),
  connectionProperties = connectionProperties)
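Predicate pushdown also kicks in automatically when filters are applied to a JDBC-backed DataFrame; the physical plan shows which filters were sent to the database (a quick check added for illustration, reusing the employees_table DataFrame loaded earlier):

// Filters applied to a JDBC DataFrame are pushed down to the database when possible.
// The physical plan printed by explain() lists them under "PushedFilters".
val managers = employees_table.filter(employees_table("position") === "Manager")
managers.explain()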