Migrating Spark applications

How to refactor Spark 2 workloads to Spark 3 during the upgrade or migration process, as Spark 2 has been removed from Cloudera Private Cloud.

Introduction

The purpose of this document is to gather all information required to carry out a Spark application migration between different versions.

The necessary steps largely depend on the source and target Spark versions: major version changes require considerable effort, while minor and maintenance version changes mostly require only small configuration adjustments, or none at all.

Major version migration

Migration between major versions requires considerable effort and must take many factors into account.

This documentation focuses on migrating applications from Spark 2 to Spark 3, the two major versions currently supported by Cloudera on different versions of Cloudera Private Cloud.

Java versions

Cloudera currently supports three major JDK versions:

  • 8
  • 11
  • 17

Refer to Support Matrix for a list of supported versions of Java.

Scala versions

As Cloudera supports only Spark 2 applications compiled with Scala 2.11 and Spark 3 applications compiled with Scala 2.12, a major version change always requires:

  1. recompiling Spark Scala applications with Scala 2.12,
  2. adjusting the dependencies to use the Spark 3 binaries provided by Cloudera in the public Maven repository and the Scala 2.12 versions of third-party libraries, as illustrated in the build sketch below.

Scala version changes can also require source code changes; refer to the Scala documentation for details.
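
For example, a minimal sbt build definition for a Spark 3 application could look like the following sketch. The version strings are placeholders that must be replaced with the Spark 3 build matching your target Cloudera Private Cloud release, and the repository coordinates should be verified against the Cloudera public Maven repository.

    // build.sbt -- illustrative sketch; replace the placeholder versions with the
    // Spark 3 binaries that match your Cloudera Private Cloud release.
    name := "my-spark-app"

    scalaVersion := "2.12.18" // Spark 3 applications must be compiled with Scala 2.12

    // Cloudera public Maven repository providing the Spark 3 binaries
    resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

    libraryDependencies ++= Seq(
      // Spark itself is provided by the cluster at runtime
      "org.apache.spark" %% "spark-core" % "[***SPARK 3 VERSION***]" % "provided",
      "org.apache.spark" %% "spark-sql"  % "[***SPARK 3 VERSION***]" % "provided",
      // Third-party libraries must also be their Scala 2.12 builds; the %% operator
      // appends the Scala binary version (_2.12) to the artifact name automatically
      "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5"
    )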

Python versions

The supported versions of Python can change between Spark versions. Refer to the table below for details on the supported Python versions for each Spark version, and follow the Python documentation to adjust your application.

Spark version | Minimum supported Python version | Maximum supported Python version
3.5.0 | 3.8 | 3.11
3.4.0 | 3.7 | 3.11
3.3.2 | 3.7 | 3.10
3.3.0 | 3.7 | 3.10
3.2.3 | 3.6 | 3.9
2.4.8 | 2.7/3.4 | 3.7

Spark commands

Cloudera supports multiple versions of Spark, depending on the version of Cloudera Private Cloud clusters. The general (unversioned) Apache Spark commands (spark-submit, pyspark, etc.) can point to different versions based on the cluster version.

  1. The original commands always point to the earliest available version of Spark in the distribution.

    For example, the spark-submit command points to Spark 2 in version 7.1.9 SP1, but points to Spark 3 in version 7.3.1.

  2. Other available Spark 3 versions can be used via versioned commands.

    For example, the spark3-submit command points to Spark 3 in all versions.

Spark connectors

Spark 3 connectors are supported only from certain Cloudera Private Cloud and CDS versions onward.

If your application uses Spark 2 connectors, take the corresponding Spark 3 connector availability into account when choosing the minimum Cloudera Private Cloud version to upgrade to.

  • Hive Warehouse Connector for Spark 3 is supported from:
    • Cloudera Private Cloud version 7.1.8
    • Cloudera Distribution of Spark (CDS) version 3.3.0
  • HBase connector for Spark 3 is supported from:
    • Cloudera Private Cloud version 7.1.7
    • Cloudera Distribution of Spark (CDS) version 3.2
  • Phoenix connector for Spark 3 is supported from:
    • Cloudera Private Cloud version 7.1.8
    • Cloudera Distribution of Spark (CDS) version 3.3.0
  • Oozie for Spark 3 is supported from:
    • Cloudera Private Cloud version 7.1.9
    • Cloudera Distribution of Spark (CDS) version 3.3.2
  • Solr for Spark 3 is supported from:
    • Cloudera Private Cloud version 7.1.9
    • Cloudera Distribution of Spark (CDS) version 3.3.2
  • Spark Schema Registry is supported from:
    • Cloudera Private Cloud version 7.1.9 SP1
    • Cloudera Distribution of Spark (CDS) version 3.3.2

Logging

Since Apache Spark 3 has transitioned from log4j to log4j2, you need to adjust the logging library and/or logging configuration used in your application.
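
If your application only adjusts log verbosity, one version-agnostic option is to rely on Spark's own API instead of calling the logging backend directly. The following is a minimal sketch; the object and application names are illustrative. Custom log4j.properties files, on the other hand, typically need to be converted to the log4j2 format when moving to Spark 3.

    import org.apache.spark.sql.SparkSession

    object LogLevelExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("log-level-example")
          .getOrCreate()

        // setLogLevel works on both Spark 2 and Spark 3 and delegates to whichever
        // logging backend (log4j 1.x or log4j2) the Spark distribution ships with.
        spark.sparkContext.setLogLevel("WARN")

        spark.range(10).count()
        spark.stop()
      }
    }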

Third-party libraries

When migrating between versions, ensure that your third-party runtime dependencies are compatible with the target Spark version.

Spark behavior changes

As Apache Spark evolves, its behavior can change between major and minor versions, but in many cases legacy configuration options are provided to restore the old behavior.

As configurations can be defined at multiple levels, restoring the old behavior might require changing the application itself, the commands or scripts that start the application, and/or the default Spark configurations defined on the cluster.
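
For illustration, the same legacy key (spark.sql.legacy.timeParserPolicy is used here only as an example) can be supplied at any of these levels; the sketch below shows the in-application options, while the comments list the command-line and cluster-level equivalents.

    import org.apache.spark.sql.SparkSession

    // Option 1: set the legacy flag when the SparkSession is built
    val spark = SparkSession.builder()
      .appName("legacy-config-example")
      .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
      .getOrCreate()

    // Option 2: SQL configurations can also be changed at runtime for the current session
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

    // Option 3: outside the application, the same key can be passed on submit:
    //   spark3-submit --conf spark.sql.legacy.timeParserPolicy=LEGACY ...
    // or defined in the cluster-wide Spark defaults managed in Cloudera Manager.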

Spark 2 to Spark 3 workload refactoring

The following list summarizes the most important behavior changes from Spark 2 to Spark 3, and gives examples on how to refactor the Spark 2 application to become Spark 3 compatible.

The list is not exhaustive; refer to the Apache Spark migration guides for the complete list.

Spark Core

Spark Core language/syntactic-level changes

Spark 2 | Spark 3 | Refactor action
TaskContext.isRunningLocally | Deprecated method, removed. | Remove TaskContext.isRunningLocally if used in code.
shuffleBytesWritten and shuffleRecordsWritten (ShuffleWriteMetrics class) | bytesWritten and recordsWritten (org.apache.spark.status.api.v1.OutputMetrics class) | Use bytesWritten and recordsWritten, available in the org.apache.spark.status.api.v1.OutputMetrics class.
org.apache.spark.Accumulator class | org.apache.spark.util.AccumulatorV2 | Replace org.apache.spark.Accumulator with org.apache.spark.util.AccumulatorV2.
For non-struct types (e.g. int, string, array), Dataset.groupByKey results in a grouped dataset whose key attribute is wrongly named value. | For non-struct types (e.g. int, string, array), Dataset.groupByKey results in a grouped dataset whose key attribute is named key. | Rename the value attribute used in your logic to key. To preserve the old behavior, set spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue to true.
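
A minimal sketch of the Dataset.groupByKey change (the dataset contents are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("groupByKey-example").getOrCreate()
    import spark.implicits._

    val words = Seq("a", "b", "a", "c").toDS()
    val counts = words.groupByKey(identity).count()

    // Spark 2 exposed the non-struct grouping column as "value";
    // Spark 3 names the same column "key", so references must be updated:
    counts.filter($"key" === "a").show()       // Spark 3
    // counts.filter($"value" === "a").show()  // Spark 2 equivalent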

Spark SQL

Spark SQL language/syntactic-level changes

Spark 2 | Spark 3 | Refactor action
The path option is overwritten if one path parameter is passed to DataFrameReader.load(), DataFrameWriter.save(), DataStreamReader.load(), or DataStreamWriter.start(). | The path option cannot coexist with path parameter(s) passed to DataFrameReader.load(), DataFrameWriter.save(), DataStreamReader.load(), or DataStreamWriter.start(). | Remove the path option if it is the same as the path parameter, or add it to the load() parameters if you do want to read multiple paths. To ignore this check, set spark.sql.legacy.pathOptionBehavior.enabled to true.
count(tblName.*) works. | An exception is thrown if count(tblName.*) is used to get the number of records in a table. | Refactor the code to use count(*), or expand the columns manually (for example, count(col1, col2)). To restore the old behavior, set spark.sql.legacy.allowStarWithSingleTableIdentifierInCount to true.
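
The following sketch illustrates both refactorings; the table name and CSV path reuse the sales example from the storage section later in this document and should be treated as placeholders.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("sql-syntax-example").getOrCreate()

    // count(tblName.*): fails with an exception on Spark 3
    // spark.sql("SELECT count(sales.*) FROM default.sales_spark_3 sales").show()
    // Refactored for Spark 3:
    spark.sql("SELECT count(*) FROM default.sales_spark_3").show()

    // path option and path parameter can no longer be combined on Spark 3
    // spark.read.format("csv").option("path", "/tmp/sales.csv").load("/tmp/sales.csv")  // fails
    // Refactored: pass the location only once
    val df = spark.read.format("csv").load("/tmp/sales.csv")
    df.show()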

Spark SQL configuration-level changes

Spark 2 | Spark 3 | Refactor action
The SET command works for SparkConf entries. | An AnalysisException is thrown if the SET command is used to modify SparkConf entries. | Remove SET commands for SparkConf entries from your code. You can set SparkConf values at the cluster level by entering them in the cluster's Spark configuration and restarting the cluster. To disable the check, set spark.sql.legacy.setCommandRejectsSparkCoreConfs to false.
The second argument of the date_add function (num_days) can be a fraction, as it is cast to Int internally. | The second argument of the date_add function (num_days) must be an integer; otherwise an AnalysisException is thrown. | Make sure the code always passes an integer as the second argument to the date_add and date_sub functions.
Fractional and string types are allowed as the third argument of percentile_approx (accuracy), as it is cast to Int internally. | The third argument of percentile_approx (accuracy) must be an integer; otherwise an AnalysisException is thrown. | Make sure the code always passes an integer as the third argument to the percentile_approx function.
Hash expressions can be applied to MapType elements. | Hash expressions are prohibited on MapType elements. | If a hash expression is applied to a map type, refactor the code to remove it, or set spark.sql.legacy.allowHashOnMapType to true.
A map can be created with duplicate keys via built-in functions such as CreateMap, StringToMap, and map_from_arrays. | Spark throws a RuntimeException when duplicate keys are found in a map. Map values with duplicate keys can still be read from data sources that do not enforce the check (for example, Parquet). | Remove duplicate keys passed to the built-in map-creating functions, or set spark.sql.mapKeyDedupPolicy to LAST_WIN so that map keys are deduplicated.
The resulting date is adjusted in add_months when the original date is the last day of a month. For example, adding a month to 2019-02-28 results in 2019-03-31. | The add_months function does not adjust the resulting date to the last day of the month when the original date is the last day of a month. For example, adding a month to 2019-02-28 results in 2019-03-28. | Adjust the code according to your logic if required.
Multiple from-to units are allowed in an interval literal. | Interval literals with multiple from-to units are not allowed. | Remove multiple from-to units from interval literals and adjust the code according to your logic if required.
A Dataset query succeeds even if it contains an ambiguous column reference caused by a self join. | A Dataset query fails if it contains an ambiguous column reference caused by a self join, because Spark cannot resolve Dataset column references that point to tables being self joined, and df1("a") is exactly the same as df2("a") in Spark. | Use aliases. For example: df2.as("purchases").join(df1.as("devices"), col("devices.key1") === col("purchases.key2")).show()
Invalid time zone IDs are silently ignored and replaced by the GMT time zone. | Invalid time zone IDs are rejected, and Spark throws java.time.DateTimeException. | Correct the code to use valid time zone IDs.
java.text.SimpleDateFormat is used for parsing and formatting timestamp and date strings, and the supported patterns are described in SimpleDateFormat. | java.time.DateTimeFormatter is used under the hood for parsing and formatting timestamp and date strings, and strict input checking is performed. | Refactor the code to use patterns accepted by the new parser, or set spark.sql.legacy.timeParserPolicy to LEGACY to restore the old behavior, or set it to CORRECTED to use the new parser and treat unparsable input as invalid datetime strings.
The datetime pattern letter F is aligned to week of month, which represents the count of weeks within the month where weeks start on a fixed day of the week. | The datetime pattern letter F is aligned to day of week in month, which represents the count of days within the period of a week where the weeks are aligned to the start of the month. | Refactor the code to accommodate the new behavior of pattern letter F.
A SparkContext can be created in executors. | An exception is thrown when a SparkContext is created in an executor. | Refactor the code to remove the creation of SparkContext in executors, or allow it by setting spark.executor.allowSparkContext to true.
The TRANSFORM operator supports aliases in inputs. | The TRANSFORM operator does not support aliases in inputs. | Refactor the code to remove aliases from the inputs.
Loading and saving timestamps from and to Parquet files does not fail if the timestamps are before 1900-01-01 00:00:00Z. | Loading and saving timestamps from and to Parquet files fails if the timestamps are before 1900-01-01 00:00:00Z. | Ensure that input reads do not contain timestamps before 1900-01-01 00:00:00Z. Alternatively, set spark.sql.parquet.int96RebaseModeInWrite to CORRECTED to write the datetime values as-is.
The Char(n) type is handled inconsistently, depending on whether the table is partitioned or not. | Upstream Spark 3 introduced the spark.sql.legacy.charVarcharAsString configuration, but it does not solve all incompatibilities. | Cloudera introduced a new configuration, spark.cloudera.legacy.charVarcharLegacyPadding, to keep full compatibility.
Row field names are sorted alphabetically when constructed with named arguments for Python versions 3.6 and above. | Row field names are no longer sorted alphabetically. | To enable sorted fields by default, as in Spark 2.4, set the environment variable PYSPARK_ROW_FIELD_SORTING_ENABLED to true for both the executors and the driver.
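
A short sketch of the stricter argument checking for date_add and percentile_approx described above; the literal values are illustrative only.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("sql-behavior-example").getOrCreate()

    // date_add: Spark 2 silently cast a fractional num_days to Int;
    // Spark 3 throws an AnalysisException, so pass a whole number.
    // spark.sql("SELECT date_add(CAST('2019-02-28' AS DATE), 1.5)").show()  // fails on Spark 3
    spark.sql("SELECT date_add(CAST('2019-02-28' AS DATE), 1)").show()

    // percentile_approx: the accuracy argument must likewise be an integer.
    // spark.sql("SELECT percentile_approx(x, 0.5, 100.0) FROM VALUES (1), (2), (3) AS t(x)").show()  // fails
    spark.sql("SELECT percentile_approx(x, 0.5, 100) FROM VALUES (1), (2), (3) AS t(x)").show()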

Spark SQL property-level changes

Spark 2 | Spark 3 | Refactor action
When there is a nested CTE with a conflicting name, outer CTE definitions take precedence. | When there is a nested CTE with a conflicting name, Spark throws an AnalysisException by default and forces users to choose the specific substitution order they want. If spark.sql.legacy.ctePrecedencePolicy is set to CORRECTED (recommended), inner CTE definitions take precedence over outer definitions. If spark.sql.legacy.ctePrecedencePolicy is set to LEGACY, outer CTE definitions take precedence over inner definitions. | Set spark.sql.legacy.ctePrecedencePolicy to CORRECTED.
Type conversions during table insertion are allowed as long as they are valid casts. | Type coercion is performed as per the ANSI SQL standard. | Ensure that type coercion is performed as per the ANSI SQL standard. Alternatively, set spark.sql.storeAssignmentPolicy to LEGACY to restore the previous behavior.
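
A minimal sketch of the nested CTE case described above; the CTE names and literal values are illustrative.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("cte-example").getOrCreate()

    // Recommended setting: inner CTE definitions take precedence
    spark.conf.set("spark.sql.legacy.ctePrecedencePolicy", "CORRECTED")

    // With CORRECTED this returns 2 (the inner t); with LEGACY it would return 1
    // (the outer t); with the Spark 3 default, an AnalysisException is thrown.
    spark.sql(
      """WITH t AS (SELECT 1 AS x),
        |     t2 AS (
        |       WITH t AS (SELECT 2 AS x)
        |       SELECT x FROM t
        |     )
        |SELECT x FROM t2""".stripMargin).show()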

Spark storage location configuration changes

To execute workloads in Cloudera Public Cloud, modify the local data storage locations to cloud storage (for example, from HDFS to an S3 bucket).

The following example shows a sample workload; the modified data location appears in the load data statement.

Spark 3.2 (HDFS):

    spark-shell
    scala> spark.sql("CREATE TABLE IF NOT EXISTS default.sales_spark_3(Region string, Country string,Item_Type string,Sales_Channel string,Order_Priority string,Order_Date date,Order_ID int,Ship_Date date,Units_sold string,Unit_Price string,Unit_cost string,Total_revenue string,Total_Cost string,Total_Profit string) row format delimited fields terminated by ','")
    scala> spark.sql("load data local inpath '/tmp/sales.csv' into table default.sales_spark_3")
    scala> spark.sql("select count(*) from default.sales_spark_3").show()

Spark 3.2 (S3):

    spark-shell
    scala> spark.sql("CREATE TABLE IF NOT EXISTS default.sales_spark_3(Region string, Country string,Item_Type string,Sales_Channel string,Order_Priority string,Order_Date date,Order_ID int,Ship_Date date,Units_sold string,Unit_Price string,Unit_cost string,Total_revenue string,Total_Cost string,Total_Profit string) row format delimited fields terminated by ','")
    scala> spark.sql("load data inpath 's3://[*** BUCKET ***]/sales.csv' into table default.sales_spark_3")
    scala> spark.sql("select count(*) from default.sales_spark_3").show()

Unsupported features

Unsupported Spark 3 features in Cloudera Private Cloud.

ZSTD compression in ORC data source

The spark.sql.orc.compression.codec configuration does not accept the zstd value.

Apache Jira: SPARK-33978

spark.hadoopRDD.ignoreEmptySplits

The spark.hadoopRDD.ignoreEmptySplits setting causes issues in HBase TableInputFormat.

Apache Jira: SPARK-34809

LDAP authentication for livy-server

Due to open CVEs in the Apache Directory Server dependency, LDAP-based authentication is not supported in the Livy server.

Apache Jira: LIVY-356

Thrift LDAP authentication, based on ldapurl, basedn, and domain

Due to open CVEs in the Apache Directory Server dependency, LDAP-based authentication is not supported in the Livy Thrift server.

Apache Jira: LIVY-678

For more information, see Unsupported Apache Spark Features.

Post-migration checklist

Benchmark testing

After all post-migration configuration is complete, run benchmark tests on the new Apache Spark version.

Troubleshooting

Troubleshoot failed or slow-performing workloads by analyzing the application event and driver logs, and fine-tune the workloads for better performance.