In the world of modern data engineering, the Databricks Lakehouse Platform simplifies the process of building reliable streaming and batch data pipelines. However, handling obscure or less common file formats still poses challenges when ingesting data into the Lakehouse. Upstream teams responsible for providing data decide how to store and transmit it, so standards vary across organizations. For instance, data engineers must sometimes work with CSVs whose schemas are open to interpretation, files whose names lack extensions, or proprietary formats that require custom readers. Sometimes simply asking "Can I get this data in Parquet instead?" solves the problem; other times a more creative approach is needed to build a performant pipeline.
One data engineering team at a large customer wanted to process the raw text of emails for cybersecurity use cases on Databricks. An upstream team provided these as compressed Tar files, each containing many email (.eml) files. In the customer's development environment, engineers devised a seemingly suitable solution: a PySpark UDF invoked the Python "tarfile" library to convert each Tar into an array of strings, and the native PySpark explode() function then returned a new row for each email in the array. This worked in testing, but when the team moved to production with much larger Tar files (up to 300MB of email files before Tarring), the pipeline began crashing clusters with out-of-memory errors. With a production target of 200 million emails per day, a more scalable solution was required.
There are a few simple methods for handling complex data transformations in Databricks, and in this case we can use mapInPandas() to map a single input row (e.g. a cloud storage path of a large Tar file) to multiple output rows (e.g. the contents of individual .eml text files). Introduced in Spark 3.0.0, mapInPandas() lets you efficiently apply an arbitrary Python-native function to batches of rows in a Spark DataFrame and yield more output rows than you received. This is exactly what this high-tech customer needed to "unpack" their compressed files into multiple usable rows containing the contents of each email, while avoiding the memory overhead of Spark UDFs.
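To make the pattern concrete, here is a minimal sketch of the unpacking step. It is an illustration rather than the customer's exact code: it assumes the Tar archives are loaded with Spark's binaryFile reader, and the function and column names (unpack_emails, source_path, email_id, email_text) are placeholders.

```python
import io
import tarfile
from typing import Iterator

import pandas as pd

def unpack_emails(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # Each input row holds the path and raw bytes of one Tar archive,
    # as produced by Spark's binaryFile reader.
    for batch in batches:
        for _, row in batch.iterrows():
            rows = []
            with tarfile.open(fileobj=io.BytesIO(row["content"])) as archive:
                for member in archive.getmembers():
                    if not member.isfile():
                        continue
                    rows.append({
                        "source_path": row["path"],
                        # One possible ID scheme: archive path + member name.
                        "email_id": f"{row['path']}::{member.name}",
                        "email_text": archive.extractfile(member)
                                             .read()
                                             .decode("utf-8", errors="replace"),
                    })
            # One input row (a Tar archive) fans out into many output rows
            # (one per .eml file inside it).
            yield pd.DataFrame(rows, columns=["source_path", "email_id", "email_text"])

# Placeholder storage path; binaryFile provides `path` and `content` columns.
tar_df = spark.read.format("binaryFile").load("s3://bucket/raw-email-tars/")

emails_df = tar_df.mapInPandas(
    unpack_emails,
    schema="source_path string, email_id string, email_text string",
)
```

In this sketch each archive is still unpacked one at a time, but the results are streamed back to Spark as batches rather than being packed into a single array column and exploded, which is where the earlier UDF approach ran into memory trouble.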
Now that we have the basics, let's see how the customer applied this approach to their scenario. The diagram below serves as a conceptual model of the architectural steps involved:
The end result is an analysis-ready Delta table, queryable from Databricks SQL or a notebook, that contains our email data along with an email_id column to uniquely identify each unpacked email.
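For example, assuming a hypothetical table name of bronze_emails, the unpacked data could be inspected from a notebook like this:

```python
# "bronze_emails" is a placeholder name for the Delta table produced by the pipeline.
emails = spark.read.table("bronze_emails")

# Peek at a few unpacked emails; display() is available in Databricks notebooks.
display(emails.select("email_id", "email_text").limit(10))
```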
The notebooks showcasing this solution contain the full mapInPandas() logic, as well as pipeline configuration settings. See them here.
With the approach described here, we have a scalable solution to process Tar email files at low latency for important business applications. Delta Live Tables can be quickly adjusted to match file arrival volumes, as we can switch a pipeline from continuous to triggered without any changes to the underlying code. While this example focused on the "bronze" layer of ingesting raw files from S3, this pipeline can be easily extended with cleansing, enrichment, and aggregation steps to make this valuable data source available to business users and machine learning applications.
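As a rough illustration of how such a pipeline might be wired together (table name, storage path, and the Auto Loader source are assumptions, not the customer's configuration), a Delta Live Tables table definition could look like the sketch below:

```python
import dlt

@dlt.table(
    name="bronze_emails",  # placeholder table name
    comment="Raw email text unpacked from Tar archives",
)
def bronze_emails():
    # Auto Loader incrementally discovers new Tar files in cloud storage;
    # unpack_emails is the mapInPandas function sketched earlier.
    tar_stream = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "binaryFile")
        .load("s3://bucket/raw-email-tars/")  # placeholder path
    )
    return tar_stream.mapInPandas(
        unpack_emails,
        schema="source_path string, email_id string, email_text string",
    )
```

Whether this runs continuously or on a triggered schedule is governed by the pipeline's settings (the continuous flag in the pipeline configuration) rather than by the table definition itself, which is why switching modes requires no code changes.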
More generally though, this mapInPandas() approach works well for any file-processing tasks that are otherwise challenging with Spark:
- A file such as file123 that is actually a file of type "tar", but was saved without a .tar.gz file extension

Find more examples of Delta Live Tables notebooks here, or see how customers are using DLT in production here.