DNA Sequences Preprocessing Using PySpark Library
Updated 05/08/2024
1. Overview
2. Splice-junction Gene Sequences Dataset
3. Custom PySpark DataFrame ‘shape’ Attribute
4. PySpark DataFrame Loading
5. Preprocessing PySpark Functions
6. PySpark Transform Pipeline
7. Conclusions
1. Overview
Distributed Computing is a field of computer science that deals with designing and implementing systems where multiple computers work together to achieve a common goal. Instead of relying on a single powerful machine, distributed computing harnesses the computational power of multiple interconnected computers, often referred to as nodes or hosts, to solve complex problems or perform tasks efficiently.
In distributed computing, tasks are typically divided into smaller sub-tasks, which are then distributed among the nodes in the network. These nodes can communicate with each other to coordinate their efforts, share data, and synchronize their actions. Distributed computing systems can be designed to operate in various architectures, such as client-server models, peer-to-peer networks, or clusters.
PySpark is the Python API for Apache Spark, a powerful open-source distributed computing system designed for big data processing and analytics. Spark provides an easy-to-use interface for parallel processing large datasets across a cluster of computers. PySpark allows developers to write Spark applications using Python, leveraging the rich ecosystem of Python libraries while taking advantage of Spark’s distributed computing capabilities. It’s widely used for tasks such as data cleaning, transformation, analysis, Machine Learning (ML), and more, especially in environments where large-scale data processing is required.
Spark MLlib is the ML library within Apache Spark, an open-source distributed computing system. MLlib provides a scalable set of machine learning algorithms and tools that can be seamlessly integrated into Spark applications. It offers various functionalities for tasks such as classification, regression, clustering, collaborative filtering, dimensionality reduction, and more. MLlib is designed to work efficiently with large-scale data sets and leverages Spark’s distributed computing framework to perform computations in parallel across a cluster of machines. This allows for faster processing of machine learning tasks compared to traditional single-machine solutions.
In Spark MLlib, a Transformer is an abstraction that covers both feature transformers and learned models. Technically, a Transformer implements a ‘transform()’ method, which converts one DataFrame into another, generally by appending one or more columns. Feature transformers can be grouped into categories such as text transformations, categorical feature transformations, and continuous numerical transformations. For example:
- A feature transformer might take a DataFrame, read a column (e.g., text), map it into a new column (e.g., feature vectors), and output a new DataFrame with the mapped column appended.
- A learning model might take a DataFrame, read the column containing feature vectors, predict the label for each feature vector, and output a new DataFrame with predicted labels appended as a column.
Based on the advanced DNA sequence preprocessing ETL algorithm presented in Advanced DNA Sequences Preprocessing for Deep Learning Networks, this paper shows how to implement the same logic using the PySpark library. Some of the latest best practices for Machine Learning algorithms applied to genomics in the Life Sciences have been published on Medium.com in the paper Machine Learning Applications in Genomics Life Sciences by Ernest Bonat, Ph.D.
2. Splice-junction Gene Sequences Dataset
The splice-junction gene sequences dataset contains information about splice junctions taken from GenBank 64.1. According to the task description, the regions of a gene that are removed during RNA splicing are called introns, while the regions that are used to generate mRNA are called exons. The junctions between them are called splice junctions. These junctions are points on a DNA sequence at which ‘superfluous’ DNA is removed during the process of protein creation in higher organisms. There are two kinds of splice junctions: exon-intron junctions and intron-exon junctions. Each DNA sequence in this dataset is 60 nucleotides long and belongs to one of three classes: “EI” (exon-intron junction), “IE” (intron-exon junction), and “N” (neither EI nor IE). There are 767 sequences with the EI label, 768 with the IE label, and 1655 with the N label. The task is to recognize, given a DNA sequence, the boundaries between exons (the parts of the DNA sequence retained after splicing) and introns (the parts of the DNA sequence that are spliced out).
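The class counts above imply an imbalanced dataset, which is worth quantifying before any resampling step. A minimal sketch, using only the counts quoted above (the variable names are illustrative):

```python
# Class counts quoted in the dataset description above.
class_counts = {"EI": 767, "IE": 768, "N": 1655}

total = sum(class_counts.values())

# Share of each class as a percentage of all sequences, rounded to one decimal.
class_share = {
    label: round(100 * count / total, 1)
    for label, count in class_counts.items()
}

print(total)        # 3190
print(class_share)  # {'EI': 24.0, 'IE': 24.1, 'N': 51.9}
```

The N class makes up about half of the data, roughly twice the size of each junction class. This is the kind of imbalance that the SMOTE step later in this paper addresses.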
3. Custom PySpark DataFrame ‘shape’ Attribute
Unlike the Pandas DataFrame, the PySpark DataFrame does not provide a ‘shape’ attribute. This attribute can be created with a simple ‘sdf_shape()’ function, as shown below. Note that ‘count()’ is a Spark action, so reading the shape runs a job over the whole dataset.
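import traceback as tb

# module-level function; add @staticmethod if it is moved into a class
def sdf_shape(sdf):
    """Determine the dataframe shape as (# rows, # columns).

    args:
        sdf (dataframe): pyspark dataframe
    returns:
        tuple: dataframe shape, or None if it cannot be computed
    """
    try:
        # count() is a Spark action, so it runs a job over the whole dataset
        return (sdf.count(), len(sdf.columns))
    except Exception:
        tb.print_exc()
        return None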
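The new ‘shape’ attribute is attached to the DataFrame class with the single line of code below. Wrapping the function in ‘property()’ makes ‘sdf.shape’ return the tuple directly instead of a method that still has to be called. That’s all.
pyspark.sql.dataframe.DataFrame.shape = property(sdf_shape)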
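The difference between attaching a plain function and attaching a property to a class can be checked without a Spark cluster. The sketch below uses ‘FakeFrame’, a hypothetical stand-in class that only mimics ‘count()’ and ‘columns’; it is not part of PySpark.

```python
class FakeFrame:
    """Tiny stand-in for a PySpark DataFrame (hypothetical, for illustration only)."""

    def __init__(self, rows, columns):
        self._rows = rows
        self.columns = columns

    def count(self):
        return len(self._rows)


def sdf_shape(sdf):
    # Same idea as the function above: (# rows, # columns).
    return (sdf.count(), len(sdf.columns))


# Plain function: attribute access yields a bound method that must be called.
FakeFrame.shape_method = sdf_shape
# property(): attribute access runs the function and yields the tuple.
FakeFrame.shape = property(sdf_shape)

frame = FakeFrame(
    rows=[("EI", "ACGT"), ("N", "TTGA")],
    columns=["dna_class", "dna_sequence"],
)
print(frame.shape_method())  # (2, 2)
print(frame.shape)           # (2, 2)
```

With the property in place, ‘frame.shape’ reads like the Pandas attribute, while the plain-function version needs the extra parentheses.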
4. PySpark DataFrame Loading
To load data into a PySpark DataFrame object, the ‘read’ property of the Spark session is used; it returns a DataFrameReader. The ‘etl_pyspark_dna_load()’ function defined below sets the DataFrame ‘shape’ attribute, creates a Spark session, and loads a CSV file into a DataFrame. It returns the DataFrame and the Spark session for use in later code. It’s a good practice to reuse this loading function wherever it’s needed. It can be added to a generic module or a class and called as a simple static method. Docstring comments and error handling code have been implemented. Don’t forget that!
import os
import traceback as tb

import pyspark
from pyspark.sql import SparkSession

# module-level function; add @staticmethod if it is moved into a class
def etl_pyspark_dna_load(csv_path_folder, dna_sequence_csv_name, sdf_shape_function):
    """Create a spark session and a dataframe by loading a csv file.

    args:
        csv_path_folder (string): path of the folder that holds the csv file
        dna_sequence_csv_name (string): dna sequence csv file name
        sdf_shape_function (function): custom shape function
    returns:
        tuple: dataframe and spark session (both None if loading fails)
    """
    sdf, spark_session = None, None
    try:
        # property() lets sdf.shape return the tuple without a call
        pyspark.sql.dataframe.DataFrame.shape = property(sdf_shape_function)
        spark_session = SparkSession.builder.appName("etl_pyspark_dna_preprocessing").getOrCreate()
        dna_sequence_csv_path = os.path.join(csv_path_folder, dna_sequence_csv_name)
        # the first csv line is read as the header (column names)
        sdf = spark_session.read.format("csv").option("header", "true").load(dna_sequence_csv_path)
    except Exception:
        tb.print_exc()
    return sdf, spark_session
Here is an example of calling the ‘etl_pyspark_dna_load()’ function and returning the DataFrame and the Spark session.
csv_path_folder = r"csv_path_folder"
dna_sequence_csv_name = "dna_sequence_csv_name.csv"
sdf, spark_session = etl_pyspark_dna_load(csv_path_folder, dna_sequence_csv_name, sdf_shape)
print(sdf.shape)
Result:
(3190, 2)
5. Preprocessing PySpark Functions
Follow the ETL algorithm’s logic presented in Advanced DNA Sequences Preprocessing for Deep Learning Networks. Here are the required PySpark functions.
# 1. call the function etl_pyspark_dna_cleanup()
sdf = etl_pyspark_dna_cleanup(sdf, spark_session)
print(sdf.shape)
Result:
(3169, 2) # the 21 rows with DNA sequence string errors were removed (3190 - 21 = 3169).
As you can see, this first cleanup function takes the DataFrame and the Spark session as arguments. Both arguments are needed to chain the ‘transform()’ calls shown at the end of this paper. The cleanup function code is shown below.
# module-level function; add @staticmethod if it is moved into a class
def etl_pyspark_dna_cleanup(sdf, spark_session):
    """Keep only the rows whose dna sequence is valid.

    args:
        sdf (object): dataframe
        spark_session (object): spark session
    returns:
        object: dataframe with the valid rows only
    """
    try:
        dna_class_list = []
        dna_sequence_list = []
        # collect() brings every row to the driver; acceptable for a small dataset
        for row in sdf.collect():
            # is_dna() is assumed to be defined elsewhere and to return True
            # for a valid dna sequence string
            if is_dna(row.dna_sequence):
                dna_class_list.append(row.dna_class)
                dna_sequence_list.append(row.dna_sequence)
        # rebuild the dataframe from the valid rows, keeping the column names
        sdf = spark_session.createDataFrame(data=list(zip(dna_class_list, dna_sequence_list)), schema=sdf.columns)
    except Exception:
        tb.print_exc()
    return sdf
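The cleanup function relies on an ‘is_dna()’ helper that is not listed in this paper. A minimal sketch of what such a helper could look like is given below, assuming that a valid sequence contains only the four unambiguous bases A, C, G, and T; the actual helper may accept a different alphabet.

```python
import re

# Assumption: a valid sequence contains only the four unambiguous bases.
_DNA_PATTERN = re.compile(r"[ACGT]+")


def is_dna(dna_sequence):
    """Return True if the string is non-empty and made only of A, C, G, T."""
    if not isinstance(dna_sequence, str):
        return False
    # Ignore surrounding whitespace and letter case before matching.
    return _DNA_PATTERN.fullmatch(dna_sequence.strip().upper()) is not None


print(is_dna("CCAGCTGCATCACAGGAGGCCAGCGAGCAGG"))  # True
print(is_dna("ACGTNACGT"))                        # False
```

For larger datasets, the same character class could in principle be applied with a column filter such as ‘rlike’, so that rows are checked on the executors rather than collected on the driver.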
Here are the 21 bad DNA sequence strings found in the original CSV dataset file.
Below are the ETL steps for processes 2, 3, and 4. After step 4, the final DataFrame is created and saved as a CSV file using the ‘etl_pyspark_dna_create_csv()’ function.
# 2. call the function etl_pyspark_dna_label_encoder()
sdf = etl_pyspark_dna_label_encoder(sdf, spark_session)
print(sdf.shape)
Result:
(3169, 61)
# 3. call the function etl_pyspark_dna_smote()
sdf = etl_pyspark_dna_smote(sdf, spark_session)
print(sdf.shape)
Result:
(4926, 61)
# 4. call the function etl_pyspark_dna_deep_learning_network()
sdf = etl_pyspark_dna_deep_learning_network(sdf, spark_session)
print(sdf.shape)
Result:
(4926, 2)
# call the function etl_pyspark_dna_create_csv()
etl_pyspark_dna_csv_name = "etl_pyspark_dna_deep_learning_network.csv"
dna_csv_column_name = ["dna_class", "dna_sequence"]
etl_pyspark_dna_create_csv(csv_path_folder, etl_pyspark_dna_csv_name, dna_csv_column_name)
Here are 10 rows of the ‘etl_pyspark_dna_deep_learning_network.csv’ created final file.
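The shapes reported in steps 2 and 4 are consistent with splitting each 60-nucleotide sequence into one integer-coded column per position (1 class column + 60 position columns = 61) and joining the positions back into a single string at the end. The sketch below illustrates that round trip in plain Python. The A/C/G/T to 0/1/2/3 mapping and the function names are assumptions, not the code used by the ETL functions.

```python
# Hypothetical mapping; the real encoder may assign different codes.
BASE_TO_CODE = {"A": 0, "C": 1, "G": 2, "T": 3}
CODE_TO_BASE = {code: base for base, code in BASE_TO_CODE.items()}


def encode_sequence(dna_sequence):
    """Turn a DNA string into a list with one integer code per position."""
    return [BASE_TO_CODE[base] for base in dna_sequence]


def decode_sequence(codes):
    """Join integer codes back into a DNA string."""
    return "".join(CODE_TO_BASE[code] for code in codes)


sequence = "ACGT" * 15  # 60 nucleotides, like the sequences in this dataset
row = ["EI"] + encode_sequence(sequence)  # class label + 60 position codes

print(len(row))                              # 61
print(decode_sequence(row[1:]) == sequence)  # True
```

Numeric columns are what an oversampling method such as SMOTE needs as input, which would explain why the encoding step comes before it.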
6. PySpark Transform Pipeline
Now that all the ETL PySpark DNA functions are developed and running, the custom transformation functions can be chained with the DataFrame ‘transform()’ method to create a pipeline that runs them one by one, as shown below. Simple and good!
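def etl_pyspark_dna_pipeline(sdf, spark_session):
    """pyspark dna transform pipeline

    args:
        sdf (dataframe): dna pyspark dataframe
        spark_session (object): spark session
    returns:
        dataframe: preprocessed dna pyspark dataframe
    """
    try:
        # each transform() call passes spark_session on to the step function;
        # extra arguments to transform() require PySpark 3.3.0 or later
        sdf = sdf.transform(etl_pyspark_dna_cleanup, spark_session) \
            .transform(etl_pyspark_dna_label_encoder, spark_session) \
            .transform(etl_pyspark_dna_smote, spark_session) \
            .transform(etl_pyspark_dna_deep_learning_network, spark_session)
    except Exception:
        tb.print_exc()
    # return the result so the caller can use the transformed dataframe
    return sdf
sdf = etl_pyspark_dna_pipeline(sdf, spark_session)
print(sdf.shape)
Result:
(4926, 2)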
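Chaining ‘transform()’ calls amounts to feeding the output of each step into the next one. The same composition can be written as a fold over a list of step functions, which makes it easy to add, remove, or reorder steps. The sketch below is plain Python with hypothetical stand-in steps that operate on a list, so it runs without Spark; ‘run_steps’ is an illustrative name, not a PySpark API.

```python
from functools import reduce


def run_steps(data, steps, *args):
    """Apply each step to the result of the previous one, passing extra args along."""
    return reduce(lambda current, step: step(current, *args), steps, data)


# Stand-in steps (hypothetical): each takes the data plus a shared context argument.
def to_upper(sequences, context):
    return [s.upper() for s in sequences]


def drop_invalid(sequences, context):
    # Keep only sequences whose characters all belong to the allowed alphabet.
    return [s for s in sequences if set(s) <= set(context["alphabet"])]


context = {"alphabet": "ACGT"}
result = run_steps(["acgt", "ACNT", "TTGA"], [to_upper, drop_invalid], context)
print(result)  # ['ACGT', 'TTGA']
```

Because ‘run_steps’ only calls the functions it is given, the same pattern should also work with step functions that take a DataFrame and a Spark session, such as the ETL functions above.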
7. Conclusions
1. Develop and test the PySpark preprocessing functions first, and then apply the chaining transformation functions to run the pipeline process.
2. A generic load function was created to return the initial DataFrame and the used Spark session.
3. Based on the Pandas DataFrame shape attribute, a new one was developed for PySpark DataFrame.