top of page

Transform data using Spark in Synapse Analytics

  • Writer: Harini Mallawaarachchi
    Harini Mallawaarachchi
  • Dec 16, 2023
  • 1 min read

Updated: Dec 18, 2023


Data engineers often use Spark notebooks as one of their preferred tools to perform extract, transform, and load (ETL) or extract, load, and transform (ELT) activities that transform data from one format or structure to another.

In this exercise, you'll use a Spark notebook in Azure Synapse Analytics to transform data in files.


Before you start

You'll need an Azure subscription in which you have administrative-level access.


Review the Apache Spark in Azure Synapse Analytics Core Concepts article in the Azure Synapse Analytics documentation.


Use a Spark notebook to transform data

  1. After the deployment script has completed, in the Azure portal, go to the dp203-xxxxxxx resource group that it created, and notice that this resource group contains your Synapse workspace, a Storage account for your data lake, and an Apache Spark pool.

  2. Select your Synapse workspace, and in its Overview page, in the Open Synapse Studio card, select Open to open Synapse Studio in a new browser tab; signing in if prompted.

  3. On the left side of Synapse Studio, use the ›› icon to expand the menu - this reveals the different pages within Synapse Studio that you'll use to manage resources and perform data analytics tasks.

  4. On the Manage page, select the Apache Spark pools tab and note that a Spark pool with a name similar to spark*xxxxxxx* has been provisioned in the workspace.

  5. On the Data page, view the Linked tab and verify that your workspace includes a link to your Azure Data Lake Storage Gen2 storage account, which should have a name similar to synapse*xxxxxxx* (Primary - datalake*xxxxxxx*).

  6. Expand your storage account and verify that it contains a file system container named files (Primary).

  7. Select the files container, and note that it contains folders named data and synapse. The synapse folder is used by Azure Synapse, and the data folder contains the data files you are going to query.

  8. Open the data folder and observe that it contains .csv files for three years of sales data.

  9. Right-click any of the files and select Preview to see the data it contains. Note that the files contain a header row, so you can select the option to display column headers.

  10. Close the preview. Then download the Spark Transform.ipynb from https://raw.githubusercontent.com/MicrosoftLearning/dp-203-azure-data-engineer/master/Allfiles/labs/06/notebooks/Spark%20Transform.ipynb

  11. Then on Develop page, expand Notebooks click on the + Import options


  1. Select the file you just downloaded and saved as Spark Transfrom.ipynb.

  2. Attach the notebook to your spark*xxxxxxx* Spark pool.


  1. Review the notes in the notebook and run the code cells.



Transform data by using Spark

Apache Spark provides a distributed data processing platform that you can use to perform complex data transformations at scale.


Load source data

Let's start by loading some historical sales order data into a dataframe.

Review the code in the cell below, which loads the sales order from all of the csv files within the data directory. Then click the button to the left of the cell to run it.

Note: The first time you run a cell in a notebook, the Spark pool must be started; which can take several minutes.

order_details = spark.read.csv('/data/*.csv', header=True, inferSchema=True)
display(order_details.limit(5))



Transform the data structure

The source data includes a CustomerName field, that contains the customer's first and last name. Let's modify the dataframe to separate this field into separate FirstName and LastName fields.


from pyspark.sql.functions import split, col

# Create the new FirstName and LastName fields
transformed_df = order_details.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))

# Remove the CustomerName field
transformed_df = transformed_df.drop("CustomerName")

display(transformed_df.limit(5))

The code above creates a new dataframe with the CustomerName field removed and two new FirstName and LastName fields.

You can use the full power of the Spark SQL library to transform the data by filtering rows, deriving, removing, renaming columns, and any applying other required data modifications.

Save the transformed data

After making the required changes to the data, you can save the results in a supported file format.

Note: Commonly, Parquet format is preferred for data files that you will use for further analysis or ingestion into an analytical store. Parquet is a very efficient format that is supported by most large scale data analytics systems. In fact, sometimes your data transformation requirement may simply be to convert data from another format (such as CSV) to Parquet!

Use the following code to save the transformed dataframe in Parquet format (Overwriting the data if it already exists).


transformed_df.write.mode("overwrite").parquet('/transformed_data/orders.parquet')
print ("Transformed data saved!")

In the files tab (which should still be open above), navigate to the root files container and verify that a new folder named transformed_data has been created, containing a file named orders.parquet. Then return to this notebook.


Partition data

A common way to optimize performance when dealing with large volumes of data is to partition the data files based on one or more field values. This can significant improve performance and make it easier to filter data.

Use the following cell to derive new Year and Month fields and then save the resulting data in Parquet format, partitioned by year and month.


from pyspark.sql.functions import year, month, col

dated_df = transformed_df.withColumn("Year", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate")))
display(dated_df.limit(5))dated_df.write.partitionBy("Year","Month").mode("overwrite").parquet("/partitioned_data")

print ("Transformed data saved!")


In the files tab (which should still be open above), navigate to the root files container and verify that a new folder named partitioned_data has been created, containing a hierachy of folders in the format Year=NNNN / Month=N, each containing a .parquet file for the orders placed in the corresponding year and month. Then return to this notebook.

You can read this data into a dataframe from any folder in the hierarchy, using explicit values or wildcards for partitioning fields. For example, use the following code to get the sales orders placed in 2020 for all months.



orders_2020 = spark.read.parquet('/partitioned_data/Year=2020/Month=*')
display(orders_2020.limit(5))

Note that the partitioning columns specified in the file path are omitted in the resulting dataframe.



Use SQL to transform data

Spark is a very flexible platform, and the SQL library that provides the dataframe also enables you to work with data using SQL semantics. You can query and transform data in dataframes by using SQL queries, and persist the results as tables - which are metadata abstractions over files.

First, use the following code to save the original sales orders data (loaded from CSV files) as a table. Technically, this is an external table because the path parameter is used to specify where the data files for the table are stored (an internal table is stored in the system storage for the Spark metastore and managed automatically).


order_details.write.saveAsTable('sales_orders', format='parquet', mode='overwrite', path='/sales_orders_table')

In the files tab (which should still be open above), navigate to the root files container and verify that a new folder named sales_orders_table has been created, containing parquet files for the table data. Then return to this notebook.

Now that the table has been created, you can use SQL to transform it. For example, the following code derives new Year and Month columns and then saves the results as a partitioned external table.


sql_transform = spark.sql("SELECT *, YEAR(OrderDate) AS Year, MONTH(OrderDate) AS Month FROM sales_orders")
display(sql_transform.limit(5))
sql_transform.write.partitionBy("Year","Month").saveAsTable('transformed_orders', format='parquet', mode='overwrite', path='/transformed_orders_table')

In the files tab (which should still be open above), navigate to the root files container and verify that a new folder named transformed_orders_table has been created, containing a hierachy of folders in the format Year=NNNN / Month=N, each containing a .parquet file for the orders placed in the corresponding year and month. Then return to this notebook.

Essentially you've performed the same data transformation into partitioned parquet files as s before, but by using SQL instead of native dataframe methods.

You can read this data into a dataframe from any folder in the hierarchy as before, but because the data files are also abstracted by a table in the metastore, you can query the data directly using SQL.



%%sql


SELECT * FROM transformed_orders

WHERE Year = 2021

    AND Month = 1



Because these are external tables, you can drop the tables from the metastore without deleting the files - so the transfomed data remains available for other downstream data analytics or ingestion processes.


%%sql

DROP TABLE transformed_orders;
DROP TABLE sales_orders;




Recent Posts

See All

Comments


bottom of page