Parallelizing SAIGE Across Hundreds of Cores

As population genetics datasets grow exponentially, it is becoming impractical to work with genetic data without leveraging Apache Spark™. There are many ways to use Spark to derive novel insights into the role of genetic variation on disease processes. For example, Regeneron works directly on Spark SQL DataFrames, and the open-source Hail package can be used to scale common GWAS kernels using Apache Spark™ (Hail is available pre-installed on Databricks). However, there is a large body of tools outside of Spark that provides novel algorithms that help us understand genetic variants and phenotype associations. A recent list by OmicX identified several hundred bioinformatics tools that can be used in a genome-wide association study (GWAS) workflow. Recent work in GWAS method development has focused on tools like SAIGE and BoltLMM, which provide better statistical methods for dealing with biobank datasets.

While these tools are statistically powerful, they are also computationally intensive and designed for a single node compute architecture. At Databricks, we aim to leverage Spark to build a generic architecture to efficiently parallelize bioinformatics tools. This technique both accelerates these GWAS algorithms while unifying these methods with customers’ existing Spark and Delta-based architectures for storing and processing genomic data.

In this blog, we introduce our new Pipe Transformer, which integrates command-line tools with Apache Spark™ and Delta Lake. We will walk through the Pipe Transformer architecture before demonstrating how to use this API to parallelize SAIGE as an example of a custom GWAS workflow. This API enables parallelization of a broad range of genomics tools using Spark.

Architecture of the Pipe Transformer

Bioinformatics tools are written in many different languages, and the pipe transformer provides a simple API for Spark to run command-line tools.  For each partition of data in a Spark SQL DataFrame, the pipe transformer converts that data into a genomic-specific file format like VCF, streams this data into a running instance of a command-line tool, then converts the output of that tool back into a Spark SQL Row. To date, we have used this API under the hood for our Variant Effect Predictor annotation pipeline, as VEP is a command-line tool written in Perl that reads a VCF file and creates a new VCF file.

With one line of code, we can run a command with the pipe transformer. The command we want to run and the input/output file types are specified via option keys and values. We then pass a reference to the Spark SQL DataFrame and the options to the transformer, which then runs the command in parallel over the partitions of the input DataFrame. This simple abstraction for running commands in parallel is exposed in both Python and Scala.

Python: dbg.transform(“pipe”, df, key1=value1, key2=value2)

Scala: DBGenomics.transform(“pipe”, df, argMap)

For those familiar with the Spark ecosystem, this is similar to the RDD pipe() function. However, the pipe transformer has two important distinctions. Our pipe transformer works on DataFrames instead of RDDs. Additionally, the pipe transformer automatically formats input elements as either a text or binary file format, instead of requiring the user to create their own function to transform an RDD element into text. The pipe transformer currently supports text, CSV, and VCF. This makes it easy to run a bioinformatics tool like SAIGE that accepts VCF and outputs CSV.

Running SAIGE End-to-End

In the rest of this blog, we’ll focus on running the SAIGE tool in parallel on Apache Spark™. SAIGE is a tool with a very sensitive statistical model that can more accurately identify phenotypes that are correlated with genomic variants when working on biobank-scale datasets. Biobank datasets are generally pulled from a single population (e.g., a single hospital system or people from a single geographic area), may include many related individuals, and often include hundreds or thousands of phenotypes, many of which are present in only a small number of patients (e.g., a condition like epilepsy). The statistical models in tools like SAIGE or BoltLMM do a better job of efficiently incorporating sample relatedness and imbalanced case/control ratios compared to traditional GWAS methods, making these tools uniquely powerful when looking at biobank datasets.

To run SAIGE end-to-end, we built out a set of notebooks. To set up the workflow, we have notebooks that install SAIGE on a Databricks cluster, and prepares the phenotype data and the prior statistical models that SAIGE needs. The SAIGE installation notebook needs to be run once. The phenotype preparation and statistical model generation notebooks need to be run once per phenotype. We then have a single notebook that runs SAIGE in parallel on the Spark cluster, which we will focus on in the rest of this blog.

Running SAIGE through the Pipe Transformer

In this notebook, we will run SAIGE on the chromosome 22 genotypes from the 1,000 Genomes project. Since 1,000 Genomes does not have associated phenotype data, we generated a simulated phenotype (Type 2 Diabetes) using known genotype-phenotype associations from the GWAS Catalog.  We provide this test phenotype and genotype data built into the Databricks platform.  In this notebook, we define the SAIGE command, configure the pipe transformer, and then run the association test and save the associations as a new Delta Lake table.

As a preliminary, we define a bash script with three primary steps. The script creates a tabix-indexed and bgzipped VCF from the piped stdin of VCF rows from our input DataFrame, runs SAIGE’s R code, and writes the association test results CSV to stdout. The pipe transformer will be run on each partition of our DataFrame.

First, we load in the 1,000 Genomes VCF file using our VCF data source. This creates a DataFrame that includes information about the variant (chromosome and position, alleles, INFO fields), and the genotypes called at that site (including all FORMAT fields).

Once we’ve completed the setup, running SAIGE via the pipe command creates a new DataFrame that contains the association results from running SAIGE with the pipe transformer configured via an argument list. This defines which command to run cmd=cmd, the format of the data passed to the running command input_formatter=‘vcf’, the format of the data coming out of the running command output_formatter=’csv’; and also passes configuration options to the input and output formatters. The schema is automatically determined by looking at the header of the CSV file produced by SAIGE. In this command, we display the GWAS results in a tabular form before saving the results out as a Delta Lake.

This approach is flexible and can be applied to most command line bioinformatics tools. For instance, by adding a step that partitions genomic variants by gene, we can quickly adapt this workflow to run SAIGE-GENE. As we mentioned earlier in this blog, we are already using this workflow in our VEP annotation pipeline.

Performance and Scaling

To understand the performance of SAIGE via the pipe transformer, we ran the association testing phase of SAIGE on chromosome 22 of the Thousand Genomes Project. For the local run on an r4.xlarge machine, SAIGE took 43.61 minutes starting from a flat tabix-indexed and bgzipped VCF and ending with a flat CSV. For the pipe transformer, we ingested the variant data from a Delta Lake that was repartitioned to have the same number of partitions as there were cores in our cluster. We then ran SAIGE, and uploaded the results to a Delta Lake.

During our scale testing, we tested clusters with 1 to 16 r4.xlarge workers (4 to 64 cores), doubling the cluster size each time. The figure above plots the speedup of running SAIGE through the pipe transformer versus running SAIGE on a single node. On a single node, our workflow is 13% faster than naïvely running SAIGE. As we increase the cluster to 16 nodes, our workflow gets 13.5x faster than the naïve run , dropping to under 3 minutes of runtime. The runtime of the pipe transformer has overhead relative to the naïve run due to performing I/O to cloud storage, and converting between DataFrames and flat VCF or CSV files. These additional steps allow for embarrassing parallelism such that the user does not have to shard the input files or merge the output ones, while keeping all the data in a central Delta Lake.

まとめ

With the Pipe Transformer, it is easy to create a reusable and scalable GWAS pipeline. Variant data can be ingested from VCF or BGEN, QC filtering can be performed in Spark SQL, and pre-processed data can be saved to a Delta Lake. The association test is then parallelized and the results can then be uploaded to Delta Lake, making it easy to manage multiple runs and perform fast queries downstream on GWAS summary data.

For more detail on how to run SAIGE with the Pipe Transformer, see the documentation.

Try it!

Using the pipe transformer and Spark SQL, we have been able to develop an easy-to-use, fast, and scalable framework that enables running traditional single-node bioinformatics tools on Databricks (Azure | AWS). Learn more about our genomics solutions in the Databricks Unified Analytics for Genomics and try out a preview today.

Databricks 無料トライアル 使ってみる

ご登録