Using Delta Live Tables and automation to ensure quality Feature Store pipelines
By Chang Shi Lim and Jeanne Choo
The GitHub repository for this blog post can be found here
Overview
Training quality machine learning models depends on input data being of high quality — data should be free from errors and updated reliably. In this blog post, we address how to use Delta Live Tables (DLT) to ensure the quality of our Feature Store data pipelines. Firstly, we walk through how DLT can be used as we move data through our multi-hop architecture (from bronze tables, to silver, and then to gold). Then, we show how this setup can be automated and productionized using the Databricks Terraform Provider and Github Actions, thus helping reduce manual work for Data Engineers and Data Scientists alike.
Why clean data matters
Data that comes from upstream sources into feature tables can be incomplete and messy. This is particularly the case when the data sources output information in real-time and at high volume. For instance, clickstream data providers might provide a best-effort sample of clicks in real-time, and then update the dataset with late-arriving data only a day later. Because machine learning models’ performance and business metrics can be affected by erroneous data, Data Engineers and Data Scientists need suitable tools to check the health of their input data pipelines.
Furthermore, correctness issues, when not fixed early in the ETL pipeline, can have escalating consequences for downstream models. For example, inappropriate day-of-week values for certain rows may not seem like a make-or-break issue during data ingestion. However, if that corrupt data is used to train many models that are subsequently pushed to production, the downstream effects on business decisions may be severe.
What data errors do we often encounter during feature engineering?
Upstream data sources can be broken. For example, an IoT device breaks down and stops sending data, leading to null values. In other cases, if data formats are not standardized between systems, the same data can be represented in different ways, such as day-of-week being expressed as both Monday and 1.
There are also smaller changes that may be harder to detect. For instance:
- A column type that changes from Int to Double
- Precision changes, for example data that used to be published in units of seconds changing to milliseconds, leading to reported values being off by an order of magnitude
- Casing changes, for example input data being recorded as “monday”, “Monday” and “MONDAY”
These errors can lead to data inputs that do not make sense or that have to be reconciled. Usually, these errors, if known upfront, can be mitigated by conducting row-level assertions on the dataset, for example specifying that all day-of-week values have to be between 0 and 6. When these errors are detected, there are several options. We can drop the bad rows, copy the data into a separate location for further investigation, or fail the pipeline.
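As a preview of the DLT expectations covered later in this post, the sketch below shows the first two options (dropping bad rows and quarantining them for investigation); the table name events_bronze, the column day_of_week, and the derived table names are hypothetical.
import dlt

# Drop rows whose day-of-week value falls outside the valid 0-6 range
# (swapping expect_or_drop for expect_or_fail would instead stop the pipeline)
@dlt.table(name="events_clean")
@dlt.expect_or_drop("valid_day_of_week", "day_of_week BETWEEN 0 AND 6")
def events_clean():
    return dlt.read("events_bronze")

# Copy the violating rows into a separate table for further investigation
@dlt.table(name="events_quarantine")
def events_quarantine():
    return (dlt.read("events_bronze")
            .where("day_of_week NOT BETWEEN 0 AND 6 OR day_of_week IS NULL"))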
Use case:
We show what an example setup for feature engineering might look like. For our dataset and model, we make use of the Instacart Market Basket Analysis data from Kaggle to predict what previously purchased products a customer might reorder. From this dataset, we might create input features to our model such as:
- Percentage of items in a user’s basket that are reorders
- Average order size per user
- Average days between orders for each user
Predicting reorders can bring business benefits in a few ways. A reorder prediction model can be used to provide customers with more relevant product recommendations when they are browsing an e-store. Additionally, marketing teams can tailor discount offers or campaigns for a customer based on what they are likely to order repeatedly.
Tools:
- Delta Live Tables for running production-grade ETL pipelines and performing data checks
- Databricks Repos for managing code
- Databricks Terraform Provider for provisioning DLT and Databricks Workflows
- GitHub Actions for CI/CD
Data organization
As part of Databricks Lakehouse best practices, our Feature Store data pipeline will follow a multi-hop medallion architecture (see image below for a graphical explanation). With a medallion architecture, data is ingested into the Bronze layer before being progressively cleansed and transformed through the successive Silver and Gold layers. By the time data reaches the Gold layer, it should be ready for end users to consume.
Step 1: Moving data from bronze to silver tables
After ingesting raw data into our bronze layer, we can start cleaning our data and, as we move data from the bronze to silver layer, enforcing data constraints.
For example, NULL values may not be useful for downstream models and should be dropped or imputed. Rather than having a data scientist write data cleaning code at the modeling stage, we can act early, at the bronze layer, where we screen for NULL values and either drop them or output them into an ancillary table for follow-up. By doing this, our pipelines can deliver accurate and reliable data to downstream consumers.
Data checks can be specified using DLT expectations, which allow us to place constraints on our dataset, for example stating that a column should not contain NULL values.
@dlt.expect_all({"aisle_validity_check": "aisle IS NOT NULL", "aisle_id_validity_check": "aisle_id IS NOT NULL"})
Note that these constraints are defined at the row level. At this point, we do not validate values on a column-level basis, for example by checking the summary statistics of numerical columns. Later in the pipeline, we will cover how column-level checks and summary statistics on a dataset can be generated.
As a best practice, we can make our expectations portable and reusable by storing our rules externally in CSV format, and then reading in this file when we want to apply our expectations. As shown in the code snippet below, we write a function that reads our CSV file, filters it for the relevant tag, and returns a dictionary with {"name": "constraint"} key-value pairs.
Note that we can also define how stringent we want to be with our expectations. We can specify expect if we want to retain the records that violate our expectations, expect_or_drop if we want to drop those rows instead, or expect_or_fail to stop our pipeline execution. The strictness of our expectations depends on the potential severity of the issues that would be caused by an expectation not being met.
import pyspark.sql.functions as F

expectations_filepath = "dbfs:/FileStore/feature_store_data_quality/instacart_data/instacart_dlt_constraints.csv"
expectations_df = spark.read.option("header", True).csv(expectations_filepath)

def get_rules(tag, expectations_df):
    # Collect all rules matching the given tag into a {name: constraint} dictionary
    rules = {}
    for row in expectations_df.filter(F.col("tag") == tag).collect():
        rules[row["name"]] = row["constraint"]
    return rules
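The returned dictionary can then be passed to expect_all when defining a table. In the sketch below, the table name, tag value, and source file path are illustrative and may differ from the accompanying repository:
import dlt

# Apply every rule tagged "aisle_bronze" in the CSV file to this bronze table
@dlt.table(name="aisle_bronze")
@dlt.expect_all(get_rules("aisle_bronze", expectations_df))
def aisle_bronze():
    return spark.read.option("header", True).csv(
        "dbfs:/FileStore/feature_store_data_quality/instacart_data/aisles.csv")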
Finally, at this step, we join the different data sources together into a silver table so we can work with one table for subsequent data processing.
@dlt.table(name="orders_joined_silver")
def orders_joined_silver():
order = dlt.read("order_bronze")
order_product = dlt.read("order_product_bronze")
product = dlt.read("product_bronze")
return (order
.join(order_product, how="inner", on="order_id")
.join(product, how="inner", on="product_id"))
Step 3: Feature Engineering
Now our data is cleaned and resides in our silver table.
Using this data, we can start to compute the features that will form the inputs to our model. These features capture user behavior and can be used to predict reorders.
Several helpful practices we can use:
- To make unit testing easy, functions that take in raw data and transform it into features should accept a DataFrame as input and return a DataFrame as output (see the sketch after this list).
- Our feature engineering logic should be self-contained. This means that ideally one function creates one or a set of related features rather than all features at once. Also, any variables that are created when the function is executed will not exist after execution ends.
- By keeping these two points in mind, we can more easily define unit tests for our feature engineering logic later.
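Following these practices, a feature function might look like the minimal sketch below: a pure function from DataFrame to DataFrame that can be exercised in a unit test with a small, hand-built DataFrame. The column names follow the Instacart schema, and the exact implementation in the repository may differ.
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def compute_user_reorder_pct(orders_df: DataFrame) -> DataFrame:
    # Percentage of items bought by each user that are reorders
    return (orders_df
            .groupBy("user_id")
            .agg((F.sum("reordered") / F.count("product_id") * 100)
                 .alias("pct_reordered")))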
In this example, we calculate three features that may be useful to our model:
- what percentage of items bought by a user are reorders,
- average number of days between orders for a particular user, and
- average order size for a particular user.
This is what our DLT pipeline looks like for this stage:
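In code, a condensed sketch of the feature table for this stage might look like the following, assuming a helper function such as compute_user_reorder_pct above is importable; the table names are illustrative.
import dlt

@dlt.table(name="user_features_gold")
def user_features_gold():
    # Derive per-user features from the cleaned, joined silver table;
    # a fuller version would compute all three features and join them on user_id
    orders = dlt.read("orders_joined_silver")
    return compute_user_reorder_pct(orders)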
Step 4: Write features to Feature Store
As the final step in our workflow, we write the table containing our features into a Feature Store table. By having features inside the Feature Store, data scientists have a common repository of features that they can select from when building their models. Additionally, the lineage of a feature (both upstream and downstream) can be tracked from the Feature Store UI.
At the time of writing, tables output from a DLT pipeline have to be written to the Feature Store in a separate step.
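A minimal sketch of that separate step using the Feature Store Python client is shown below; the database, table, and key names are illustrative.
from databricks.feature_store import FeatureStoreClient

fs = FeatureStoreClient()
feature_table_name = "feature_store_demo.user_features"

# Read the gold table produced by the DLT pipeline
features_df = spark.read.table("feature_store_demo.user_features_gold")

# Create the feature table if it does not exist yet
if not spark.catalog.tableExists(feature_table_name):
    fs.create_table(
        name=feature_table_name,
        primary_keys=["user_id"],
        schema=features_df.schema,
        description="Per-user reorder features derived from the Instacart dataset",
    )

# Upsert the latest feature values
fs.write_table(name=feature_table_name, df=features_df, mode="merge")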
To tie together all these steps, a Databricks multi-task workflow can be used.
Step 5: Automating the entire process
After the feature engineering workflow is developed, it’s time to automate the deployment of the workflow into a Databricks workspace through a CI/CD process. Databricks has many tools available to support this, e.g. Databricks CLI, REST APIs, and the Databricks Terraform provider.
In this blog post, we provide an example to show how GitHub Actions can make use of Databricks Repos, Databricks CLI and Terraform to deploy the feature engineering workflow. However, the general workflow is applicable to other CI/CD tools such as Azure DevOps or Jenkins.
Databricks Repos
Databricks Repos provides a convenient way to sync a Databricks workspace with a remote Git repository. This makes it convenient for a CI/CD agent to promote code from one workspace to another (e.g. committing notebooks developed in a Dev workspace and loading them into a Staging workspace for testing).
The GitHub Actions logic for this project is as follows:
- Initialize and set up the GitHub Actions runner.
- Using Databricks CLI, create a Databricks Repo for the project if it doesn't exist yet.
- Using Databricks CLI, update the Databricks Repo with the latest commit from the remote Git repository.
- With the latest files available in the workspace's Databricks Repo, Terraform can now deploy Databricks resources (e.g. DLT pipelines and Jobs) by referencing the files located inside the Repo.
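Our example drives the Repos steps with the Databricks CLI; as an alternative illustration, the same create-or-update logic can be expressed in Python against the Repos REST API. The workspace URL, token, repo path, and Git URL below are placeholders.
import requests

host = "https://<workspace-url>"
headers = {"Authorization": "Bearer <personal-access-token>"}
repo_path = "/Repos/ci-cd/feature_store_data_quality"

# Look up the repo by path; create it from the remote Git repository if absent
existing = requests.get(f"{host}/api/2.0/repos", headers=headers,
                        params={"path_prefix": repo_path}).json().get("repos", [])
if existing:
    repo_id = existing[0]["id"]
else:
    repo_id = requests.post(f"{host}/api/2.0/repos", headers=headers, json={
        "url": "https://github.com/<org>/<repo>.git",
        "provider": "gitHub",
        "path": repo_path,
    }).json()["id"]

# Pull the latest commit of the tracked branch into the workspace repo
requests.patch(f"{host}/api/2.0/repos/{repo_id}", headers=headers,
               json={"branch": "main"})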
Why Terraform?
While Terraform is usually associated with platform administrators managing infrastructure, ML engineers can also take advantage of the convenience Terraform provides for managing complex machine learning pipelines.
For example, to update an already deployed Databricks Job, an ML engineer would previously have had to orchestrate multiple API calls in the CI/CD pipeline to:
- List all current jobs and retrieve the “job_id” for the job with the matching name.
- With the “job_id”, perform an update to the job with the new requirements.
With Terraform's state file, this is simplified because such information is tracked automatically. Instead of orchestrating multiple API calls, the ML engineer just defines the new requirements for the Job and Terraform handles the update seamlessly.
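To make the contrast concrete, the sketch below shows roughly what that manual orchestration looks like against the Jobs 2.1 REST API; the workspace URL, token, and job name are placeholders, and pagination and error handling are omitted.
import requests

host = "https://<workspace-url>"
headers = {"Authorization": "Bearer <personal-access-token>"}

# 1. List current jobs and retrieve the job_id for the job with the matching name
jobs = requests.get(f"{host}/api/2.1/jobs/list", headers=headers).json().get("jobs", [])
job_id = next(j["job_id"] for j in jobs
              if j["settings"]["name"] == "instacart_feature_engineering")

# 2. With the job_id, overwrite the job definition with the new requirements
new_settings = {
    "name": "instacart_feature_engineering",
    # ... updated task and cluster definitions go here ...
}
requests.post(f"{host}/api/2.1/jobs/reset", headers=headers,
              json={"job_id": job_id, "new_settings": new_settings})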
Main Module
The Terraform project's structure is straightforward and follows what a typical Terraform project looks like. The structure is as follows:
The main.tf file is where Databricks resources are defined. The script does the following:
- Obtaining the list of notebooks found in the app/notebooks folder.
- Calling the DLT module to create DLT pipelines based on the notebooks found in the app/dlt folder.
- With these, creating a Databricks Job that will execute these notebooks and DLT pipelines.
The vars.tf file is a typical Terraform variables file. Most variables are quite self-explanatory. A special case is the variable task_dependency, where each key represents a Databricks Job task and the value represents the task it depends on. The first task has no dependency, hence its value is defined as an empty string.
For instance, 00_instacart_get_data is the first task, so it has no dependency, while 01_instacart_ingest_bronze depends on 00_instacart_get_data.
Supporting Modules
There are 2 modules used in this project: Delta Live Tables (DLT) and Databricks Jobs.
DLT Module
The dlt module is where a DLT resource is defined. The output of this module consists of:
- task_resource_id: ID of the DLT pipeline created
- task_resource_name: Name of the DLT pipeline created
- task_type: Type of task created (in this case, DLT)
These outputs will be used by the main module when creating a Databricks Job.
Job Module
The job module is where a Databricks Job resource is defined. Based on the variable task_lists, this module dynamically creates tasks of various kinds (for now, either DLT or notebook). Task dependencies are determined by the variable task_dependency.
By using Terraform, we can automate the provisioning of the DLT and Jobs resources that we need. Additionally, when our code changes and we want to apply CI/CD to manage these changes, we can include the terraform init and terraform apply steps inside our GitHub Actions workflow. By using Terraform with a CI/CD tool such as GitHub Actions, our DLT pipelines and Jobs are updated upon a code change without manual effort.
Summary
In this blog post, we covered the importance of clean data and how this can be achieved with Databricks. Using a reorder prediction use case, we also shared how Terraform can be used in a CI/CD process to deploy feature engineering workflows in the form of DLT and Databricks Jobs. To get started, visit the GitHub repository that we have created for this blog post.
We hope that these examples will be helpful for anyone who is designing and setting up their MLOps pipeline in Databricks. Stay tuned to this space as we are continuously looking to enhance the overall experience of performing feature engineering in Databricks.