1.1. SeQuiLa ecosystem¶
SeQuiLa is a distributed, ANSI-SQL compliant solution for efficient genomic operations (currently available genomic intervals querying/processing and depth of coverage calculations) that is available as an additional Apache Spark package.
In runtime it extends SparkSQL Catalyst optimizer with custom execution strategies. It implements distributed map joins using interval forest for inner range join operations and distributed event-based algorithm for depth of coverage computations. Useful genomic transformations have been added as User Defined Functions/Aggregates and exposed to the SQL interface. Furthermore, SeQuiLa data sources for both BAM and ADAM file formats have been implemented.
It can be integrated with third-party tools using SparkSQL JDBC driver and with R using sparklyr package. SeQuiLa is also available as a Docker container and can be run locally or on a Hadoop cluster using Yet Another Resource Negotiator
1.2. Distributed interval joins¶
Consider datasets s1 and s2, storing genomic intervals, such as |s1|<|s2|. Let’s assume that their structure contains necessary genomic coordinates (chromosome, start position, end position) along with optional additional interval annotations (e.g. targetId).
Our goal is to efficiently perform query as shown below:
SELECT s2.targetId,count(*) FROM reads s1 JOIN targets s2 ON s1.chr=s2.chr AND s1.end>=s2.start AND s1.start<=s2.end GROUP BY targetId;
At it’s core SeQuiLa’s range joins are based on IntervalTree data structure. The main idea of the algorithm is to transform dataset s1 into a broadcastable structure of an interval forest (a hash map of interval trees, each representing one chromosome). The intervals from dataset s2 can be efficiently intersected with the constructed interval forest.
An interval tree is a tree data structure to hold intervals. It is a augmented, balanced red-black tree with low endpoint as node key and additional max value of any endpoint stored in subtree. Each node contains following fields: parent, left subtree, right subtree, color, low endpoint, high endpoint and max endpoint of subtree. It can be proved that this structure allows for correct interval insertion, deletion and search in \(O(lg n)\) time ([CLR])
Our implementation of IntervalTree is based on explanations in [CLR] although it is extended in the following ways:
data structure allows storing non-unique intervals
data structure allows storing in the tree nodes additional interval annotations if they fit into dedicated Spark driver’s memory fraction
Let’s presume that we have a cluster with a Spark driver, three worker nodes and that the tables are partitioned between worker nodes. When interval query is performed, all dataset’s \(s1\) partitions are sent to the driver node on which interval forest is constructed (for each chromosome a separate interval tree is created). The forest is subsequently sent back to worker nodes on which efficient interval operations based on interval trees are performed. Depending on the strategy chosen by rule-based optimizer different set of columns are stored in tree nodes
SeQuiLa package introduces a new rule based optimizer (RBO) that chooses most efficient join strategy based on input data statistics computed in the runtime. The first step of the algorithm is to obtain value of maxBroadcastSize parameter. It can set explicitly by the end user or computed as a fraction of the Apache Spark Driver memory. In the next step table row counts are computed and based on that table with the fewer rows is selected for constructing interval forest. This is the default approach - it can be overridden by setting spark.biodatageeks.rangejoin.useJoinOrder to true. In this scenario no row counts are computed and the right join table is used for creating interval forest. Such an strategy can be useful in situation when it is known upfront which table should be used for creating a broadcast structure. The final step of the optimization procedure is to estimate the row size and the size of the whole projected table. If it fits into dedicated Spark Driver’s memory (controlled by maxBroadcastSize parameter) the interval forest is augmented with all columns from s1 (SeQuiLa_it_all strategy) completing map-side join procedure in one stage. Otherwise an interval tree is used as an index for additional lookup step before the equi-shuffle-join operation between s1 and s2 (SeQuiLa_it_int strategy).
1.2.4. Custom interval structure¶
By default SeQuiLa uses Red-Black tree with intervals implemented in org.biodatageeks.sequila.rangejoins.methods.IntervalTree.IntervalTreeRedBlack<V> class. However, it is possible to provide a custom interval structure and set it using spark.biodatageeks.rangejoin.intervalHolderClass parameter. There are 2 prerequisites:
custom class must implement interface provided by the trait: org.biodatageeks.sequila.rangejoins.methods.base.BaseIntervalHolder[V]
node class used for storing intervals must extend org.biodatageeks.sequila.rangejoins.methods.base.BaseNode[V]
Both Scala and Java classes are supported. Please use default interval tree implementation for reference.
1.3. Distributed depth of coverage¶
Consider a set of aligned reads from a BAM file (read_set) for which we want to compute the depth of coverage.
Firstly, we provide a tabular view on the input data, through custom SQL-like data source. The data from the file can be queried using standard SELECT statements. Secondly, using our table-valued function, coverage for read_set can be calculated.
-- create table read_set CREATE TABLE read_set USING org.biodatageeks.org.biodatageeks.sequila.datasources.BAM.BAMDataSource OPTIONS (path '/data/samples/*.bam'); -- sample SQL query on read_set SELECT sample, contig, start, end, cigar, flag, sequence FROM read_set WHERE sample='sample1' AND flag=99 AND contig='chr3' -- calculate coverage on read_set SELECT contig, start, end, coverage FROM coverage('read_set','sample1','blocks') WHERE contig='chr3'
SeQuiLa-cov implements distributed event-based algorithm for coverage calculations. Event-based approach has better computational complexity than traditionally used pileup-based methods since it tracks only selected events of alignment blocks instead of analyzing each base of each alignment block.
The event-based algorithm allocates
events vector for each contig and subsequently iterates through the set of reads parsing it’s CIGAR string to determine continuous alignment blocks. For each block the value in the
events vector in the position corresponding to block start is being incremented and the value in the position corresponding to block end is being decremented. The depth of coverage for a specific locus is calculated using the cumulative sum of all elements in the
events vector preceding specified position.
The algorithm may produce three typically used coverage types: per-base coverage, which includes the coverage value for each genomic position separately, blocks which lists adjacent positions with equal coverage values are merged into single interval, and fixed-length windows coverage that generates set of equal-size, non-overlapping and tiling genomic ranges and outputs arithmetic mean of base coverage values for each region.
In the most general case, the algorithm can be used in a distributed environment. Let’s consider input data set,
read_set, of aligned sequencing reads sorted by genomic position from a BAM file partitioned into the n data slices (
In this setting each cluster node computes the coverage for the subset of data slices using the event-based method. For the i-th partition containing the set of reads (
read_set_i), the set of
events_i vectors for each chromosome is allocated and updated, based on the items from
read_set_i. After performing the CIGAR analysis and appropriate updates on the
events vectors, the cumulative sum is realized and stored in
partial_coverage vectors for each chromosome.
Note that the set of
partial_coverage_i vectors is distributed among the computation nodes. To calculate the final coverage for the input data set, an additional step of correction for overlaps between the partitions is required.
An overlap of length l between vectors adjacent
partial_coverage vectors may occur when l tailing genomic positions of preceding vector are the same as l heading genomic positions of consecutive vector.
If an overlap is identified then the coverage values from the preceding
l-length tail are added into the consecutive
partial_coverage head and subsequently the last
l elements of first
partial_coverage are removed. Once this correction step is completed, non-overlapping set of
coverage vectors are collected and yield the final coverage values for the whole input
The described above algorithm is simplified. In the more general case (specifically when dealing with long reads), overlaps may occur between any number of partitions (more than two) and the overlap length can be bigger than partition length.
1.3.3. Implementation and optimizations¶
The main characteristic of the described above algorithm is its ability to distribute data and calculations (such as BAM decompression and main coverage procedure) among the available computation nodes. Moreover, instead of simply performing full data reduction stage of the partial coverage vectors, our solution minimizes required data shuffling among cluster nodes by limiting it to the overlapping part of coverage vectors.
To efficiently access the data from a BAM file we have prepared a custom data source using Data Source API exposed by SparkSQL. Performance of the read operation benefits from the Intel Genomics Kernel Library (GKL) used for decompressing the BAM files chunks and from predicate push-down mechanism that filters out data at the earliest stage.
The implementation of the core coverage calculation algorithm aimed at minimizing, whenever possible memory footprint by using parsimonious data types, e.g.
Short type instead of
Integer, and efficient memory allocation strategy for large data structures, e.g. favoring static
Arrays over dynamic size
Additionally, to reduce the overhead of data shuffling between the worker nodes in the correction for the overlaps stage we used Spark’s shared variables accumulators and broadcast variables. Accumulator is used to gather information about the worker nodes’ coverage vector ranges and coverage vector tail values, that are subsequently read and processed by the driver. This information is then used to construct a broadcast variable distributed to the worker nodes in order to perform adequate trimming and summing operations on partial coverage vectors.
SeQuiLa-cov computation model supports fine-grained parallelism at user-defined partition size in contrary to the traditional, coarse-grained parallelization strategies that involve splitting input data at a contig level.