Building Incremental Data Pipelines Using Delta Lake

Royal Cyber Inc.
Aug 12, 2022

Data engineers are often asked to move large volumes of organizational data from a source to a new destination. While this may seem an easy job, things get complicated when existing data is updated or new data arrives and has to land in the same data lake. The challenge lies in figuring out which files have already been transferred or processed and which are new and still need to be moved. In such cases, data pipelines that can handle incremental loads are very helpful, and technologies like Delta Lake become relevant for building them. Let’s dive deeper into this topic.

What is Delta Lake by Databricks?

Delta Lake is a storage layer that sits on top of a data lake and brings advanced features such as ACID transactions. It uses Apache Spark as its core execution engine and significantly enhances the performance, scalability, and reliability of a data lake. The underlying data lake can be hosted on any major cloud provider, such as AWS, Microsoft Azure, or Google Cloud Platform.

Traditional data lakes lack a built-in mechanism that helps data engineers organize their data the way they prefer. Delta Lake by Databricks not only brings structure to one’s data but also enables its users to build incremental pipelines and enjoy smart features.

In this blog, we’ll walk you through the steps of building an incremental data pipeline on Delta Lake, which the Auto Loader feature in Databricks makes possible. So, let’s quickly go over some basics before we begin creating the pipeline.

How can Auto Loader help you?

Auto Loader enables incremental movement of data by keeping track of a backlog of files that still need to be processed. It lets the pipeline detect how much data, and which data, has already been processed. This ultimately enables the lake to distinguish between existing data and newly arrived files that still need processing.

Auto Loader works much like a file watcher of the kind commonly used on AWS. A file watcher continuously monitors a directory and generates notifications when new files arrive. Auto Loader follows the same principle: it automatically tells newly arrived files apart from those already processed. One can verify the changes by comparing the source data with the data in the destination.
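The bookkeeping idea can be illustrated with a toy, pure-Python sketch. This is not how Auto Loader is implemented (Auto Loader keeps its state in the stream's checkpoint location); the class, method, and file names below are hypothetical and only show how remembering processed files lets a pipeline pick up new arrivals and nothing else.

```python
class ProcessedFileTracker:
    """Toy stand-in for a checkpoint: remembers which files were already ingested."""

    def __init__(self):
        self._seen = set()

    def new_files(self, listing):
        """Return the files in `listing` that have not been processed yet, sorted."""
        return sorted(set(listing) - self._seen)

    def mark_processed(self, files):
        """Record files as processed so that later runs skip them."""
        self._seen.update(files)


tracker = ProcessedFileTracker()

# First run: everything in the (hypothetical) source directory is new.
first_batch = tracker.new_files(["US_videos.csv", "CA_videos.csv"])
tracker.mark_processed(first_batch)

# Second run: only the newly arrived file is picked up.
second_batch = tracker.new_files(["US_videos.csv", "CA_videos.csv", "GB_videos.csv"])
tracker.mark_processed(second_batch)

print(first_batch)
print(second_batch)
```

The second listing still contains the two old files, yet only the new one is returned, which is the behavior an incremental pipeline relies on.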

Why do you need to create a Databricks Job?

When we are dealing with incremental data pipelines, it gets tough if we perform all the processing and transformations in a single notebook. To get the job done efficiently, we often need to create smaller tasks (each of which can be a Python notebook or a Java library) and combine them into a complete data pipeline.

In a Databricks job, each task can depend on the task before it, and these dependencies define the order of execution. You don’t need third-party platforms like Airflow to orchestrate the tasks; Databricks is a unified solution that lets you orchestrate your entire pipeline within its own system. You can create these tasks with the Databricks UI, the CLI, or Python.
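To picture how task dependencies translate into a run order, here is a small sketch using Python's standard `graphlib` module. Databricks resolves the ordering itself; the task names are illustrative assumptions, not names taken from the pipeline in this post.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (names are illustrative).
dependencies = {
    "bronze_ingest": set(),
    "silver_transform": {"bronze_ingest"},
    "gold_aggregate": {"silver_transform"},
}

# static_order() yields every task only after all of its dependencies.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```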

Creating Incremental Data Pipelines

Now, let’s move toward implementation and create the architecture for the pipeline within Databricks.

We’ll begin by ingesting data into the bronze table. It contains the raw data gathered from diverse sources. Data Collection, as shown below, is the bronze-level notebook. It is our starting point; we just have to pull the data and store it in a target table.

Below, the “cloudFiles” format represents Auto Loader in Databricks. Adding these lines tells the Spark engine to use Auto Loader to detect new files and process only those that have not been processed already.

You can also see some pre-processing options in the image above. For instance, “delimiter” specifies the column separator, which matters when files are not comma-separated (e.g., tab-separated files), and “multiLine” indicates that some column values in our CSV file span more than one line.
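A minimal sketch of such an Auto Loader read could look like the following. The function names, the `header` option, and the path arguments are assumptions made for illustration; `spark` is the session that a Databricks notebook provides.

```python
def bronze_reader_options(schema_location, delimiter=","):
    """Build the Auto Loader options for the CSV source (values are assumptions)."""
    return {
        "cloudFiles.format": "csv",                    # format of the incoming files
        "cloudFiles.schemaLocation": schema_location,  # where the inferred schema is tracked
        "header": "true",                              # assumed: first line holds column names
        "delimiter": delimiter,                        # column separator
        "multiLine": "true",                           # allow values that span several lines
    }


def read_bronze_stream(spark, source_path, schema_location):
    """Return a streaming DataFrame that picks up only unprocessed files."""
    return (
        spark.readStream.format("cloudFiles")
        .options(**bronze_reader_options(schema_location))
        .load(source_path)
    )
```

Keeping the options in a plain dictionary makes it easy to reuse the same reader for the JSON metadata by swapping `cloudFiles.format`.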

Our path, in this particular instance, points to Azure Data Lake, and our YouTube delta directory contains two folders.

The ‘data’ folder contains video data in CSV format.

The ‘metadata’ folder contains data in JSON format. This is the raw data which will be processed and refined by the silver notebook.

We set a trigger when we want to read the data in a triggered fashion. The alternative is continuous execution, in which the notebook keeps running until you stop it manually. With the “trigger(availableNow=True)” setting, you instruct your Databricks notebook to fetch all the data that has not been processed yet, process it, and shut down automatically once the task is done.

The ‘checkpointLocation’, as shown below, records how much data has been processed and which data still needs to be processed. The ‘mergeSchema’ option makes sure that differing schemas can be handled, since you cannot be sure what schemas your raw data contains or how they may change over time.
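Put together, the write side of the bronze notebook might be sketched as follows. The function name and path arguments are hypothetical; the options and the trigger are the ones discussed above.

```python
def write_bronze(df, target_path, checkpoint_path):
    """Write the stream to a Delta path, process the backlog once, then stop."""
    return (
        df.writeStream.format("delta")
        .option("checkpointLocation", checkpoint_path)  # tracks progress between runs
        .option("mergeSchema", "true")                  # tolerate schema changes in raw data
        .trigger(availableNow=True)                     # process all pending data, then shut down
        .start(target_path)                             # write to an explicit storage path
    )
```

Because progress lives in the checkpoint location, rerunning the same call later processes only the files that arrived since the previous run.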

Moreover, we have used external tables here, as indicated by the path, because even if you accidentally drop an external table, the underlying data files are not lost.

All the above steps are repeated for the metadata too; the only difference is that we’ve set the file format to JSON instead of CSV.

When working with the silver layer, you must clean your data first. The transformation process then removes null values and standardizes the data. Also, if you are planning to create and run a pipeline later, you need not run the data_bronze notebook just yet.

Nevertheless, we have run the notebook in the image below to demonstrate the schema.

As you may note, our schema is not in the format required by the business. For example, the ‘trending_date’ column should be stored as a date rather than as a plain string.

The following code transforms our raw data into a structured form. We have enforced the schema by casting the columns into the format required by the business.

We have imported the PySpark functions module under the alias F. Check the image below.
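A schema-enforcement step of this kind might be sketched as below. It uses Spark SQL expression strings with `selectExpr`, which correspond to calls such as `F.to_date` and `F.col(...).cast(...)` in the column API. All column names other than `trending_date`, and the assumed source date pattern `yy.dd.MM`, are illustrative guesses rather than details confirmed by this post.

```python
# Hypothetical column list; adjust names and types to the real bronze schema.
TYPED_COLUMNS = [
    "video_id",
    "to_date(trending_date, 'yy.dd.MM') AS trending_date",  # assumed source pattern
    "CAST(views AS BIGINT) AS views",
    "CAST(likes AS BIGINT) AS likes",
    "CAST(category_id AS INT) AS category_id",
    "country_code",
]

# Rows missing any of these (assumed) key columns are dropped.
REQUIRED_COLUMNS = ["video_id", "trending_date"]


def enforce_schema(bronze_df):
    """Cast raw string columns to typed columns and drop rows missing key values."""
    return bronze_df.selectExpr(*TYPED_COLUMNS).dropna(subset=REQUIRED_COLUMNS)
```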

In the same manner, we have also parsed the metadata files, which were previously in a nested form.

Here’s how we have parsed it.
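As a rough sketch of flattening nested JSON, assume each metadata file holds a top-level `items` array whose elements carry an `id` and a nested `snippet.title`. That structure, the function name, and the output column names are assumptions for illustration only.

```python
def flatten_metadata(metadata_df):
    """Turn one row per JSON file into one row per category entry."""
    return (
        metadata_df
        .selectExpr("explode(items) AS item")        # one row per element of the array
        .selectExpr(
            "CAST(item.id AS INT) AS category_id",   # nested field becomes a top-level column
            "item.snippet.title AS category_title",
        )
    )
```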

We are creating only temporary views because this is merely an intermediate step. If you register a table instead of creating a temporary view, it will incur an extra cost on your storage layer.

As a final step, we are merging the clean data and metadata tables.
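The temporary views and the merge can be sketched together as follows. The view names, the join key `category_id`, and the choice of an inner join are assumptions; the actual notebook may join on different columns.

```python
# Join the cleaned video rows with their category titles (assumed join key).
MERGE_QUERY = """
    SELECT v.*, c.category_title
    FROM videos_clean AS v
    JOIN categories_clean AS c
      ON v.category_id = c.category_id
"""


def merge_clean_data(spark, videos_df, categories_df):
    """Register both inputs as temporary views and join them with Spark SQL."""
    # Temporary views live only in the Spark session; nothing is written to storage.
    videos_df.createOrReplaceTempView("videos_clean")
    categories_df.createOrReplaceTempView("categories_clean")
    return spark.sql(MERGE_QUERY)
```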

After merging the two, we’ll store the data in the following table so it can easily be used later. The term ‘writeStream’ means that we run our pipeline in a streaming fashion, i.e., new data is processed incrementally and merged into the already processed data in one go.

Below, we are aggregating the data for the gold transformation by reading data from the silver layer and applying the groupBy. We group the data according to the country code to generate country-specific statistics.

The same is done for another category, i.e., Popular Video Categories. Also, during aggregation, we register the table in the Databricks metastore and create it as an external table, as it is intended for production purposes.
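A gold-level aggregation along these lines might look like the sketch below. It assumes a batch read of the silver data and hypothetical column names (`country_code`, `views`, `likes`, `video_id`); supplying a `path` option together with `saveAsTable` registers the result as an external table in the metastore.

```python
def build_country_stats(spark, silver_path, gold_path, table_name):
    """Aggregate silver data per country and register it as an external Delta table."""
    silver_df = spark.read.format("delta").load(silver_path)

    # Dictionary form of agg(): column name -> aggregate function name.
    stats_df = silver_df.groupBy("country_code").agg(
        {"views": "sum", "likes": "sum", "video_id": "count"}
    )

    (
        stats_df.write.format("delta")
        .mode("overwrite")
        .option("path", gold_path)   # explicit path makes the table external
        .saveAsTable(table_name)     # registers the table in the metastore
    )
    return stats_df
```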

You can see the created pipeline below. To create one of your own, you just have to click the “job” option on the upper right side of the screen and press “create job.”

Now, name your task, choose the type (notebook in this case), and provide the notebook path. You can use an existing cluster or create a new one, in which case you might have to pay for the resources consumed, billed per second of usage. You can also set specific parameters on your pipeline, for instance, a timestamp.
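The same settings can also be written down as a job definition instead of being clicked together in the UI. The sketch below builds a payload in the general shape of a Databricks Jobs API create request; the job name, notebook paths, cluster ID placeholder, and parameter value are all hypothetical.

```python
import json


def notebook_task(task_key, notebook_path, cluster_id, depends_on=None, parameters=None):
    """Describe one notebook task and, optionally, the tasks it must wait for."""
    task = {
        "task_key": task_key,
        "existing_cluster_id": cluster_id,
        "notebook_task": {
            "notebook_path": notebook_path,
            "base_parameters": parameters or {},
        },
    }
    if depends_on:
        task["depends_on"] = [{"task_key": key} for key in depends_on]
    return task


CLUSTER_ID = "<existing-cluster-id>"  # placeholder, not a real cluster ID

job_spec = {
    "name": "youtube_incremental_pipeline",  # hypothetical job name
    "tasks": [
        notebook_task("bronze", "/Pipelines/data_bronze", CLUSTER_ID,
                      parameters={"timestamp": "2022-08-12"}),
        notebook_task("silver", "/Pipelines/data_silver", CLUSTER_ID, depends_on=["bronze"]),
        notebook_task("gold", "/Pipelines/data_gold", CLUSTER_ID, depends_on=["silver"]),
    ],
}

print(json.dumps(job_spec, indent=2))
```

Each `depends_on` entry names the upstream task, so the silver notebook starts only after bronze succeeds and gold only after silver.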

This is the flow of our data through the bronze, silver, and gold stages, in that order.

Let’s evaluate the results. As you can see below, we have processed data related to three countries. Now, we’ll ingest new data and run the pipeline again.

To do that, we have to manually upload a CSV file and its metadata. Once the new data is uploaded, Databricks will identify it with the help of Auto Loader.

We can either run the pipeline manually or schedule it for a specific date and time.
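For the scheduled option, a job definition can carry a schedule block based on a Quartz cron expression. The helper below is a hypothetical convenience function that builds such a block for a once-a-day run; the hour and time zone are example values.

```python
def daily_schedule(hour, minute=0, timezone_id="UTC"):
    """Build a schedule block that fires once a day at the given time."""
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute must be 0-59")
    return {
        # Quartz fields: seconds minutes hours day-of-month month day-of-week
        "quartz_cron_expression": f"0 {minute} {hour} * * ?",
        "timezone_id": timezone_id,
        "pause_status": "UNPAUSED",
    }


schedule = daily_schedule(hour=6)
print(schedule["quartz_cron_expression"])
```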

For now, we have triggered the pipeline manually. You can see the job running in the image below. If you click on the job, it will show you which cell is currently running and which ones have finished.

After the bronze stage, the silver one will start running. You can see the details of its run too. You can also click further to see which specific query is running. The silver stage may take some time as it merges data from two different sources.

The final notebook runs after the silver stage. Here, the aggregation begins running. You can see the duration on the left side of the image. A green tick appears on the left side as a status once the job has run successfully.

When you query the table again, you will see data for GBR, which we did not have previously. Auto Loader automatically detected the new data and merged it into the already processed data without contaminating it in any way.

This final step concludes our task.

Conclusion

In this blog, we discussed building incremental data pipelines using Delta Lake by Databricks. If you have any queries regarding the process or the platform, Royal Cyber’s data engineering and data science team can help you with them.

Author bio:

Hassan Sherwani is the Head of Data Analytics and Data Science at Royal Cyber. He holds a PhD in IT and data analytics and has acquired a decade’s worth of experience in the IT industry, startups, and academia. Hassan is also gaining hands-on experience in machine (deep) learning for the energy, retail, banking, law, telecom, and automotive sectors as part of his professional development endeavors.

Royal Cyber Inc.

Royal Cyber Inc. is one of North America’s leading technology solutions providers, based in Naperville, IL. We have grown and transformed over the past 20+ years.