Accurately Building Genomic Cohorts at Scale with Delta Lake and Spark SQL

This is the second post in our “Genomic Analysis at Scale”  series.  In our first postwe explored a simple problem: how to provide real-time aggregates when sequencing large volumes of genomes. We solved this problem by using Delta Lake and a streaming pipeline built using Spark SQL. In this blog, we focus on the more advanced process of joint genotyping, which involves merging variant calls from many individuals into a single view of a population. This is one of the most common and complex problems in genomics.


At Databricks we have leveraged innovations in distributed computation, storage, and cloud infrastructure and applied them to genomics to help solve problems that have hindered the ability for organizations to perform joint-genotyping, the “N + 1” problem, and the challenge of scaling to population-level cohorts. Our Unified Analytics Platform for Genomics provides an optimized pipeline that scales to massive clusters and thousands of samples with a single click. In this blog, we explore how to apply those innovations to joint genotyping.

Before we dive into joint genotyping, first let’s discuss why people do large scale sequencing. Most people are familiar with the genetic data produced by 23andMe or AncestryDNA. These tests use genotyping arrays, which read a fixed number of variants in the genome, typically ~1,000,000 well-known variants which occur commonly in the normal human population. With sequencing, we get an unbiased picture of all the variants an individual has, whether they are variants we’ve seen many times before in healthy humans or variants that we’ve never seen before that contribute to or protect against diseases. Figure 1 demonstrates the difference between these two approaches.

 

Figure 1: This diagram illustrates the difference between variation data produced by genotype arrays (left) and by sequencing (middle) followed by joint genotyping (right). Genotyping arrays are restricted to “read” a fixed number of known variants, but guarantee a genotype for every sample at every variant. In sequencing, we are able to discover variants that are so rare that they only exist in a single individual, but determining if a novel variant is truly unique in this person or just hard to detect with current technology is a non-trivial problem.

While sequencing provides much higher resolution, we encounter a problem when trying to examine the effect of a genetic variant across many patients. Since a genotyping array measures the same variants across all samples, looking across many individuals is a straightforward proposition: all variants have been measured across all individuals. When working with sequencing data, we have a trickier proposition: if we saw a variant in patient 1, but didn’t see that variant in patient 2, what does that tell us? Did patient 2 not have an allele of that variant? Alternatively, when we sequenced patient 2, did an error occur that caused the sequencer to not read the variant we are interested in?

Joint genotyping addresses this problem in three separate ways:

  1. Combining evidence from multiple samples enables us to rescue variants which do not meet strict statistical significance to be detected accurately in a single sample
  2. As the accuracy of your predictions at each site in the human genome increases, you are better able to model sequencing errors and filter spurious variants
  3. Joint genotyping provides a common variant representation across all samples that simplifies asking whether a variant in individual X is also present in individual Y

Accurately Identifying Genetic Variants at Scale with Joint Genotyping

Joint genotyping works by pooling data together from all of the individuals in our study when computing the likelihood for each individual’s genotype. This provides us a uniform representation of how many copies of each variant are present in each individual, a key stepping stone for looking at the link between a genetic variant and disease. When we compute these new likelihoods, we are also able to compute a prior probability distribution for a given variant appearing in a population, which we can use to disambiguate borderline variant calls.

For a more concrete example, table 1 shows the precision and recall statistics for indel (insertions/deletions) and single-nucleotide variants (SNVs) for the sample HG002 called via the GATK variant calling pipeline compared to the Genome-in-a-Bottle (GIAB) high-confidence variant calls in high-confidence regions.

Table 1: Variant calling accuracy for HG002, processed as a single sample

Recall Precision
Indel 96.25% 98.32%
SNV 99.72% 99.40%

As a contrast, table 2 shows improvement for indel precision and recall and SNV recall when we jointly call variants across HG002 and two relatives (HG003 and HG004). The halving of this error rate is significant, especially for clinical applications.

Table 2: Variant calling accuracy for HG002 following joint genotyping with HG003 and HG004

Recall Precision
Indel 98.21% 98.98%
SNV 99.78% 99.34%

Originally, joint genotyping was performed directly from the raw sequencing data for all individuals, but as studies have grown to petabytes in size, this approach has become impractical. Modern approaches start from a genome variant call file (gVCF), a tab-delimited file containing all variants seen in a single sample, and information about the quality of the sequencing data at every position where no variant was seen. While the gVCF-based approach touches less data than looking at the sequences, a moderately sized project can still have tens of terabytes of gVCF data. This gVCF-based approach eliminates the need to go back to the raw reads but still requires all N+1 gVCFs to be reprocessed when jointly calling variants with a new sample added to the cohort. Our approach uses Delta Lakes to enable incrementally squaring-off the N+1 samples in the cohort, while parallelizing regenotyping using Apache SparkTM.

Challenges Calling Variants at a Population level

Despite the importance of joint variant calling, bioinformatics teams often defer this step because the existing infrastructure around GATK4 makes these workloads hard to run and even harder to scale. The default implementation of the GATK4’s joint genotyping algorithm is single threaded, and scaling this implementation relies on manually parallelizing the joint genotyping kernel using a workflow language and runners like WDL and Cromwell. While GATK4 has support for a Spark-based HaplotypeCaller, it does not support running GenotypeGVCFs parallelized using Spark. Additionally, for scalability, the GATK4 best practice joint genotyping workflow relies on storing data in GenomicsDB. Unfortunately, GenomicsDB has limited support for cloud storage systems like AWS S3 or Azure Blob Storage, and studies have demonstrated that the new GenomicsDB workflow is slower than the old CombineGVCFs/GenotypeGVCFs workflow on some large datasets.

Our Solution

The Unified Analytics Platform for Genomics’ Joint Genotyping Pipeline (Azure | AWS) provides a solution for these common needs. Figure 2 shows the computational architecture of the joint genotyping pipeline. This pipeline is provided as a notebook (Azure | AWS) that can be called as a Databricks job, the joint variant calling pipeline is simple to run: the user simply needs to provide their input files and output directory. When the pipeline runs, it starts by appending the input gVCF data via our VCF reader Azure | AWS) to Delta Lake. Delta Lake provides inexpensive incremental updates, which makes it cheap to add an N+1th sample into an existing cohort. When the pipeline runs, it uses Spark SQL to bin the variant calls. The joint variant calling algorithm then runs in parallel over each bin, scaling linearly with the number of variants.

Figure 2: The computational flow of the Databricks joint genotyping pipeline. In stage 1, the gVCF data is ingested into a Delta Lake columnar store in a scheme partitioned by a genomic bin. This Delta Lake table can be incrementally updated as new samples arrive. In stage 2, we then load the variants and reference models from the Delta Lake tables and directly run the core re-genotyping algorithm from the GATK4’s GenotypeGVCFs tool. The final squared off genotype matrix is saved to Delta Lake by default, but can also be written out as VCF.

The parallel joint variant calling step is implemented through Spark SQL, using a similar architecture to our DNASeq and TNSeq pipelines. Specifically, we bin all of the input genotypes/reference models from the gVCF files into contiguous regions of the reference genome. Within each bin, we then sort the data by reference position and sample ID. We then directly invoke the joint genotyping algorithm from the GATK4’s GenotypeGVCFs tool over the sorted iterator for the genomic bin. We then save this data out to a Delta table, and optionally as a VCF file. For more detail on the technical implementation, see our Spark Summit 2019 talk.

Benchmarking

To benchmark our approach, we used the low-coverage WGS data from the 1000 Genomes project for scale testing, and data from the Genome-in-a-Bottle consortium for accuracy benchmarking. To generate input gVCF files, we aligned and called variants using our DNASeq pipeline. Figure 3 demonstrates that our approach is efficiently scalable with both dataset and cluster size. With this architecture, we are able to jointly call variants across the 2,504 sample whole genome sequencing data from the 1000 Genomes Project in 79 hours on 13 c5.9xlarge machines. To date, we have worked with customers to scale this pipeline across projects with more than 3,000 whole genome samples.

Figure 3: Strong scaling (left) was evaluated by holding the input data constant at 10 samples and increasing the number of executors. Weak scaling (right) was evaluated by holding the cluster size fixed at 13 i3.8xlarge workers and increasing the number of gVCFs processed.

Running with default settings, the pipeline is highly concordant with the GATK4 joint variant calling pipeline on the HG002, HG003 and HG004 trio. Table 3 describes concordance at the variant and genotype level when comparing our pipeline against the “ground truth” of the GATK4 WDL workflow for joint genotyping. Variant concordance implies that the same variant was called across both tools; a variant called by our joint genotyper only is a false positive, while a variant called only by the GATK GenotypeGVCFs workflow is a false negative. Genotype concordance is computed across all variants that were called by both tools. A genotype call is treated as a false positive relative to the GATK if the count of called alternate alleles increased in our pipeline, and as a false negative if the count of called alternate alleles decreased.

Table 3: Concordance Statistics Comparing Open-source GATK4  vs. Databricks for Joint Genotyping Workflows

Precision Recall
Variant 99.9985% 99.9982%
Genotype 99.9988% 99.9992%

The discordant calls generally occur at locations in the genome that have a high number of observed alleles in the input gVCF files. At these sites, the GATK discards some alternate alleles to reduce the cost of re-genotyping. However, the alleles eliminated in this process depends on the sequence that variants are read. Our approach is to list the alternate alleles in the lexicographical order of the sample names prior to pruning. This approach ensures that the output variant calls are consistent given the same sample names and variant sites, regardless of how the computation is parallelized.

Of additional note, our joint genotyping implementation exposes a configuration option (Azure | AWS) which “squares off” and re-genotypes the input data without adjusting the genotypes by a prior probability which is computed from the data. This feature was requested by customers who are concerned about the deflation of rare variants caused by the prior probability model used in the GATK4.

Try it!

Databricks 無料トライアル 使ってみる

ご登録