The advent of genome-wide association studies (GWAS) in the late 2000s enabled scientists to begin to understand the causes of complex diseases such as diabetes and Crohn’s disease at their most fundamental level. However, academic bioinformatics tools to perform GWAS have not kept pace with the growth of genomic data, which has been doubling globally every seven months.
Given the scale of the challenge and the importance of genomics to the future of healthcare, at Databricks we have dedicated an engineering team to develop extensible Spark-native implementations of workflows such as GWAS, which leverage the high-performance big-data store, Delta Lake, and log runs with MLflow. Combining these three technologies with a library we have developed in-house to enable customers to work with genomic data solves the challenges that we have seen our customers face when working with population-scale genomic data.
This tooling includes an architecture that allows users to ingest genomics data directly from flat file formats such as bed, VCF, or BGEN, into Delta Lake. In this blog, we focus on moving common association testing kernels into Spark SQL, streamlining the running of common tests such as genome-wide linear regression. In our next blog, we will generalize this process by using the pipe-transformer parallelize any single-node bioinformatics tool with Apache Spark™, starting with the GWAS tool SAIGE.
Here we showcase how to run and end-to-end GWAS workflow in a single notebook using the publicly available 1,000 genomes dataset, producing the results in figure 1. We used associated variants from the GWAS catalog to generate a synthetic body-mass index (BMI) phenotype (since the 1000 Genomes project did not capture phenotypes). This notebook is written in Python, but can also be implemented in R, Scala and SQL.
Figure 1. Databricks dashboard showing key results from a GWAS on simulated data based on the 1000 genomes dataset.
Ingest 1,000 Genomes Data into Delta Lake
To start, we will load in the 1,000 Genomes VCF file as a Spark SQL DataFrame and calculate summary statistics. Our schema is an intuitive representation of genomic variants that is consistent across both VCF and BGEN data.
# Reading Databricks Delta version of the VCF file spark.read.format("com.databricks.vcf"). \ option("splitToBiallelic", "true"). \ option("flattenInfoFields", "false"). \ load(vcf_path). \ selectExpr("*", "expand_struct(call_summary_stats(genotypes))", "expand_struct(hardy_weinberg(genotypes))"). \ write. \ format("delta"). \ save(delta_path)
Figure 2. Databricks’
display() command showing VCF file in a Spark DataFrame
The 1,000 Genomes dataset contains whole genome sequencing data, and thus includes many rare variants. By running a count query on the dataset, we find that there are more than 80 million variants. Let’s go ahead and log this metric to MLflow.
# VCF count num_variants = spark.read.format("delta").load(delta_path).count() mlflow.log_metric("Number Variants pre-QC", num_variants) num_variants # Output 81271745
Perform quality control
In our genomics library, we have added quality control functions that compute common statistics across the genotypes at a single variant, as well as across all of the samples in a single callset. Here we are going to filter variants that are not in Hardy-Weinberg equilibrium (“pValueHwe”), which is a population genetics statistic that can be used to assess if variants have been correctly genotyped. We will exclude rare variants based on allele frequency.
spark.read.format("delta"). \ load(delta_path). \ where((col("alleleFrequencies").getItem(0) >= allele_freq_cutoff) & (col("alleleFrequencies").getItem(0) <= (1.0 - allele_freq_cutoff)) & (col("pValueHwe") >= hwe_cutoff)). \ write. \ format("delta"). \ save(delta_qc_path)
Figure 3. Histogram of Hardy-Weinberg Equilibrium P values
Control for ancestry
Population structure can confound genotype-phenotype association analyses. To control for differing ancestry between participants in the study, here we calculate principal components (PCs), which are provided as covariates to the regression kernel. Spark supports singular value decomposition (SVD) through the Spark MLLib DistributedMatrix API, and SVD can be used to calculate PCs from the transpose of the genotypes matrix. We have introduced an API in Spark that makes it easy to build a DistributedMatrix from a DataFrame, and use this to run SVD and get our PCs.
vectorized = spark.read.format("delta"). \ load(delta_qc_path). \ selectExpr("array_to_sparse_vector(genotype_states(genotypes)) as features").cache() matrix = RowMatrix(MLUtils.convertVectorColumnsFromML(vectorized, "features").rdd.map(lambda x: x.features)) pcs = matrix.computeSVD(num_pcs) pcs_df = spark.createDataFrame(pcs.V.toArray().tolist(), ["pc" + str(i) for i in range(num_pcs)])
After running PCA, we get back a dense matrix of PCs per sample, that we will pass as covariates to the regression analysis. The next steps will extract out only the sampleId and the principal components. This allows us to join against the 1,000 Genomes sample metadata file to label each sample with their super-population.
display() command, we can view the clusters of our components within the following scatterplot.
Figure 4. Principal Component Analysis with Super Population Labelling:
EUR = European, EAS = East Asian, AMR = Admixed American, SAS = South Asian, AFR = African
Ingest Phenotype Data
For this genome-wide association study, we will be using simulated BMI phenotypic data to associate with the genotypes. Similar to the ingestion of our genotype data, we will ingest the BMI data by reading our sample Parquet data.
# Ingest normalized phenotype data phenotypes_path = "dbfs:/databricks-datasets/genomics/1000G/phenotypes.normalized" bmiPhenotype = spark.read. \ format("parquet"). \ load(phenotype_path). \ withColumnRenamed("values", "phenotype_values") # View BMI data display(bmiPhenotype.selectExpr("explode(phenotype_values) AS bmi"))
You can visualize the BMI histogram from the preceding
Figure 5. BMI histogram
Running the Genome-Wide Association Study
Now we have performed the necessary quality control and data extraction, transformation and loading (ETL), the next phase of our solution is to run our GWAS by performing the following tasks:
- Mapping the genotypes, phenotypes, and principal components together (using
- Calculate the GWAS statistics by running linear regression.
- Build a new Apache Spark DataFrame (
gwas_df) that contains the GWAS statistics.
# Map variants to GWAS via cross-joins between genotypes, phenotypes, and principle components covariates = spark.read.format("delta").load(principal_components_path) phenotypeAndCovariates = bmiPhenotype.crossJoin(covariates) genotypes = spark.read.format("delta").load(delta_qc_path) genotypes.crossJoin(phenotypeAndCovariates). \ selectExpr("contigName", "start", "phenotype", \ "expand_struct(linear_regression_gwas(genotype_states(genotypes), phenotype_values, covariates))"). \ write. \ format("delta"). \ save(gwas_results_path) # Display data display(spark.read.format("delta").load(gwas_results_path))
Figure 6. Spark DataFrame of GWAS results
display() command allows us to sanity check the results. Next we can convert our PySpark DataFrame to R thus allowing us to use the
qqman package to visualize the results across the genome with a Manhattan plot.
# Extract out GWAS results (and alias various column names) gwas_results <- select(gwas_df, c(cast(alias(gwas_df$contigName, "CHR"), "double"), alias(gwas_df$start, "BP"), "P")) # Convert from a Spark DataFrame to an R DataFrame gwas_results_rdf <- as.data.frame(gwas_results) # Install packages necessary for Manhattan plot install.packages("qqman", repos="http://cran.us.r-project.org") library(qqman) # Create Manhatatan plot of GWAS results and log to MLflow png('/databricks/driver/manhattan.png') manhattan(gwas_results_rdf, col = c("#228b22", "#6441A5"), chrlabs = NULL, suggestiveline = -log10(1e-05), genomewideline = -log10(5e-08), highlight = NULL, logp = TRUE, annotatePval = NULL, ylim=c(0,17)) dev.off() mlflow.log_artifact('/databricks/driver/manhattan.png')
Figure 7. GWAS Manhattan Plot
As you can see from our genome-wide association study, for our 1000 genomes simulated data, there are several loci associated with BMI clustered on chromosome 2. In fact, these are the loci whose known associations with BMI were used to simulate our BMI phenotype.
# Execute QQ plot qq(gwas_results_rdf$P)
We can also check that we have successfully controlled for ancestry by making a quantile-quantile (QQ) plot. In this case, the deviation from expected represents true associations.
Figure 8. GWAS QQ Plot
Finally, we have logged parameters, metrics and plots associated with this GWAS run using MLflow, enabling tracking, monitoring and reproducing of analyses.
Summarizing the Analysis
In this blog, we have demonstrated an end-to-end GWAS workflow using Apache Spark, Delta Lake, and MLflow. Whether you are validating the accuracy of a genotyping assay in clinical use like Sanford Health, or performing a meta-analysis of GWAS results for target identification like Regeneron, the Databricks platform makes it easy to extend analyses and build downstream exploratory visualizations through our built-in dashboarding functionality or through our optimized connectors to BI tools like Tableau and PowerBI which can enable non-coding bench scientists and clinicians to rapidly explore large datasets.
Figure 9. MLflow tracking of each run enables reproducibility of experiments
By robustly engineering an end-to-end GWAS workflow, scientists can move away from ad hoc analysis on flat files, to scalable and reproducible computational frameworks in production. Furthermore by reading VCF data as a Spark DataSource into Delta Lake, data scientists can now integrate tabular phenotypes, Electronic Health Record (EHR) extracts, images, real-world evidence and lab values under a unified framework. Try it yourself today by downloading the Engineering population scale GWAS Databricks notebook.