Editor’s note: Suman Debnath is a speaker for ODSC APAC this August 22-23. Be sure to check out his talk, “Build Classification and Regression Models with Spark on AWS,” there!

In data science, choosing the right tools can significantly shape the outcome of your machine learning projects. Greetings to all data science enthusiasts! I am fortunate to be speaking at the upcoming ODSC APAC conference on August 22, 2023. My talk covers building classification and regression models with PySpark on AWS.

Understanding the Session

In this engaging and interactive session, we will delve into PySpark MLlib, an invaluable resource in the field of machine learning, and explore how various classification algorithms can be implemented using AWS Glue/EMR as our platform.

Our focus will be hands-on, with an emphasis on the practical application and understanding of essential machine learning concepts. Attendees will be introduced to a variety of machine learning algorithms, placing a spotlight on logistic regression, a potent supervised learning technique for solving binary classification problems.

But this session goes beyond just concepts and algorithms. We will also navigate through critical data preprocessing techniques, essential for creating effective machine learning models. By the session’s conclusion, participants will acquire skills to handle missing values, modify column data types, and divide their data into training and testing datasets. This hands-on experience will all take place within the versatile AWS Glue/EMR environment.

What Will You Gain?

This session is designed to help participants gain an in-depth understanding of:

  • PySpark MLlib
  • Unsupervised learning techniques
  • Various types of classification algorithms
  • Implementation of logistic regression classifiers
  • Data preprocessing with PySpark on AWS, using AWS Glue and Amazon EMR
  • Model building with PySpark on AWS

If you’re a data engineer, data scientist, or machine learning enthusiast looking to get started with Machine Learning with Apache Spark on AWS, this session is perfect for you.

Now, let’s give you a taste of what’s in store (the GitHub code repository can be found here).

We selected a dataset comprising 20,057 dish names, each detailed with 680 columns that characterize the ingredient list, the nutritional content, and the dish’s category. Our collective objective here is to predict whether a dish is a dessert. This is a straightforward and mostly clear-cut question – most of us can likely classify a dish as a dessert or not simply by reading its name, which makes it an excellent candidate for a simple ML model.

Step 1: Importing the Necessary Libraries

The first step is to import the libraries we need: the PySpark SQL functions and types, plus the feature transformers from pyspark.ml.feature.

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import Imputer, MinMaxScaler, VectorAssembler

Step 2: Data Preprocessing and EDA (Exploratory Data Analysis)

We load the CSV dataset of food recipes using Spark’s read.csv function. The inferSchema parameter is set to True to infer the data types of the columns, and header is set to True to use the first row as headers.

# Loading the data
dataset = 's3://fcc-spark-example/dataset/2023/recipes_dataset/epi_r.csv'
food = (
          spark
              .read
              .csv(dataset, inferSchema=True, header=True)
      )
     
# Sanitizing the column names
def sanitize_column_name(name):
  answer = name
  for i, j in ((" ", "_"), ("-", "_"), ("/", "_"), ("&", "and")):
      answer = answer.replace(i, j)
  return "".join(
      [
          char
          for char in answer
          if char.isalpha() or char.isdigit() or char == "_"
      ]
  )
food = food.toDF(*[sanitize_column_name(name) for name in food.columns])

This part of the script sanitizes the column names by replacing spaces, dashes, and slashes with underscores, and ampersands with the word “and”. It then removes any remaining character that is not a letter, a digit, or an underscore.
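To see what the sanitizer does to individual names, here is a small sketch that runs without Spark. The function body repeats the logic shown above so the snippet is self-contained; the raw column names are hypothetical and chosen only to exercise each rule.

```python
def sanitize_column_name(name):
    """Same logic as the function above, repeated so this snippet runs alone."""
    answer = name
    for old, new in ((" ", "_"), ("-", "_"), ("/", "_"), ("&", "and")):
        answer = answer.replace(old, new)
    # Keep only letters, digits, and underscores.
    return "".join(
        char
        for char in answer
        if char.isalpha() or char.isdigit() or char == "_"
    )


# Hypothetical raw column names (not taken from the dataset).
examples = ["#cakeweek", "22-minute meals", "mac & cheese", "soup/stew"]
sanitized = {name: sanitize_column_name(name) for name in examples}

for raw, clean in sanitized.items():
    print(f"{raw!r} -> {clean!r}")
```

Note that the ampersand becomes “and” rather than an underscore, and that a leading “#” is simply dropped.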

# Filtering the data
food = food.where(
  (
      F.col("cakeweek").isin([0.0, 1.0])
      | F.col("cakeweek").isNull()
  )
  & (
      F.col("wasteless").isin([0.0, 1.0])
      | F.col("wasteless").isNull()
  )
)

Here we filter the data to keep only the rows where the cakeweek and wasteless columns have values of 0.0 or 1.0, or are null.

# Defining identifier, continuous, target, and binary columns
IDENTIFIERS = ["title"]
CONTINUOUS_COLUMNS = [
  "rating",
  "calories",
  "protein",
  "fat",
  "sodium",
]
TARGET_COLUMN = ["dessert"]
BINARY_COLUMNS = [
  x
  for x in food.columns
  if x not in CONTINUOUS_COLUMNS
  and x not in TARGET_COLUMN
  and x not in IDENTIFIERS
]

In this section, we define which columns are identifiers, continuous variables, target variables, and binary variables.

# Handling missing values
food = food.dropna(
  how="all",
  subset=[x for x in food.columns if x not in IDENTIFIERS],
)
food = food.dropna(subset=TARGET_COLUMN)
food = food.fillna(0.0, subset=BINARY_COLUMNS)

We handle missing values by dropping rows that have all nulls (excluding identifier columns), dropping rows with nulls in the target column, and filling nulls in binary columns with 0.0.

# Converting string numbers to float and capping continuous variables
from typing import Optional

@F.udf(T.BooleanType())
def is_a_number(value: Optional[str]) -> bool:
  if not value:
      return True
  try:
      _ = float(value)
  except ValueError:
      return False
  return True
for column in ["rating", "calories"]:
  food = food.where(is_a_number(F.col(column)))
  food = food.withColumn(column, F.col(column).cast(T.DoubleType()))
maximum = {
  "calories": 3203.0,
  "protein": 173.0,
  "fat": 207.0,
  "sodium": 5661.0,
}
for k, v in maximum.items():
  food = food.withColumn(
      k,
      F.when(F.isnull(F.col(k)), F.col(k)).otherwise(
          F.least(F.col(k), F.lit(v))
      ),
  )

In this part, we create a user-defined function is_a_number to check if a string can be converted to a float. We use this function to filter out non-numeric values in the “rating” and “calories” columns and then cast them to double type.

We then cap the values of continuous variables “calories”, “protein”, “fat”, and “sodium” at specified maximums to handle possible outliers.
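The two steps above can be sketched in plain Python, without Spark, to make the row-level behavior easier to follow. This is only an illustration: the sample values are hypothetical, the helper name cap is ours, and treating an empty string as missing after the cast is an assumption about Spark's default (non-ANSI) casting behavior.

```python
from typing import List, Optional


def is_a_number(value: Optional[str]) -> bool:
    """Plain-Python version of the UDF: missing values pass, bad strings fail."""
    if not value:
        return True
    try:
        float(value)
    except ValueError:
        return False
    return True


def cap(value: Optional[float], maximum: float) -> Optional[float]:
    """Mirror of the F.when / F.least expression: keep nulls, clip the rest."""
    if value is None:
        return None
    return min(value, maximum)


# Hypothetical raw values for the "calories" column.
raw_calories: List[Optional[str]] = ["426.0", None, "", "Cucumber", "30111218.0"]

# Step 1: drop rows whose value cannot be parsed as a number.
kept = [v for v in raw_calories if is_a_number(v)]

# Step 2: cast to float (assumption: empty strings become missing values).
as_floats = [float(v) if v else None for v in kept]

# Step 3: cap at the maximum used in the article for "calories".
capped = [cap(v, 3203.0) for v in as_floats]
print(capped)
```

Missing values survive both steps untouched, which is why the imputation step later in the walkthrough is still needed.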

# Calculating the sum of each binary column
inst_sum_of_binary_columns = [
  F.sum(F.col(x)).alias(x) for x in BINARY_COLUMNS
]
# Selecting the sums of binary columns and converting the result to a dictionary
sum_of_binary_columns = (
  food.select(*inst_sum_of_binary_columns).head().asDict()
)
# Counting the total number of rows
num_rows = food.count()
# Identifying the rare features
too_rare_features = [
  k
  for k, v in sum_of_binary_columns.items()
  if v < 10 or v > (num_rows - 10)
]
# Excluding the rare features from the binary columns
BINARY_COLUMNS = list(set(BINARY_COLUMNS) - set(too_rare_features))

Next, we calculate the sum of each binary column and convert the result to a dictionary. Then, we identify the “rare” features (those that are true in fewer than 10 rows, or true in all but fewer than 10 rows) and remove them from our binary columns.
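As a rough illustration of the threshold, the same filter can be applied to an ordinary dictionary. The counts, the row total, and the constant name MIN_COUNT below are hypothetical; only the comparison itself comes from the code above.

```python
# Hypothetical number of rows in which each binary column equals 1.0.
sum_of_binary_columns = {
    "vegetarian": 6500,
    "cake": 700,
    "leftovers": 4,        # true in fewer than 10 rows
    "weeknight": 19995,    # true in all but 5 rows
}
num_rows = 20000  # hypothetical row count
MIN_COUNT = 10    # hypothetical name for the threshold of 10 used above

too_rare_features = sorted(
    k
    for k, v in sum_of_binary_columns.items()
    if v < MIN_COUNT or v > (num_rows - MIN_COUNT)
)
kept_features = sorted(set(sum_of_binary_columns) - set(too_rare_features))

print("dropped:", too_rare_features)
print("kept:", kept_features)
```

A column that is almost always 0 or almost always 1 carries very little signal, so both extremes are removed by the same rule.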

# Creating new features
food = food.withColumn(
  "protein_ratio", F.col("protein") * 4 / F.col("calories")
).withColumn(
  "fat_ratio", F.col("fat") * 9 / F.col("calories")
)
# Handling missing values in the new features
food = food.fillna(0.0, subset=["protein_ratio", "fat_ratio"])
# Adding new features to the continuous columns
CONTINUOUS_COLUMNS += ["protein_ratio", "fat_ratio"]

Here, we create new features “protein_ratio” and “fat_ratio” that represent the ratio of protein and fat to calories, respectively. We fill missing values in these new features with 0.0 and add them to our continuous columns.
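The multipliers 4 and 9 are the approximate calories per gram of protein and fat, so each ratio estimates the share of a recipe's calories that comes from that nutrient. Here is a worked example in plain Python with hypothetical numbers. The helper name calorie_ratio is ours, and returning 0.0 for missing or zero calories mirrors the fillna step, assuming Spark's default behavior of yielding null on division by zero.

```python
from typing import Optional

KCAL_PER_GRAM_PROTEIN = 4  # multiplier used in the article
KCAL_PER_GRAM_FAT = 9      # multiplier used in the article


def calorie_ratio(
    grams: Optional[float], kcal_per_gram: int, calories: Optional[float]
) -> float:
    """Share of total calories contributed by one nutrient (0.0 if unknown)."""
    if grams is None or not calories:
        return 0.0
    return grams * kcal_per_gram / calories


# Hypothetical recipe: 400 calories, 20 g protein, 10 g fat.
protein_ratio = calorie_ratio(20.0, KCAL_PER_GRAM_PROTEIN, 400.0)  # 80 / 400
fat_ratio = calorie_ratio(10.0, KCAL_PER_GRAM_FAT, 400.0)          # 90 / 400

# Hypothetical recipe with no calorie information.
unknown_ratio = calorie_ratio(20.0, KCAL_PER_GRAM_PROTEIN, None)

print(protein_ratio, fat_ratio, unknown_ratio)
```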

# Imputing missing values in the continuous columns
OLD_COLS = ["calories", "protein", "fat", "sodium"]
NEW_COLS = ["calories_i", "protein_i", "fat_i", "sodium_i"]
imputer = Imputer(
  strategy="mean",
  inputCols=OLD_COLS,
  outputCols=NEW_COLS,
)
imputer_model = imputer.fit(food)
# Updating the continuous columns
CONTINUOUS_COLUMNS = (
  list(set(CONTINUOUS_COLUMNS) - set(OLD_COLS)) + NEW_COLS
)
# Applying the imputer model to the data
food = imputer_model.transform(food)

In this section, we impute missing values in the “calories”, “protein”, “fat”, and “sodium” columns with their mean values using Spark’s Imputer. We then update our list of continuous columns to include the imputed ones.
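Mean imputation itself is simple: compute the mean of the observed values in a column and use it wherever a value is missing. The sketch below shows the idea in plain Python on a hypothetical “sodium” column; it is an illustration of the strategy, not a reimplementation of Spark's Imputer.

```python
from typing import List, Optional


def impute_mean(values: List[Optional[float]]) -> List[float]:
    """Replace missing entries with the mean of the observed entries."""
    observed = [v for v in values if v is not None]
    mean = sum(observed) / len(observed)
    return [mean if v is None else v for v in values]


# Hypothetical column with one missing value.
sodium = [100.0, None, 300.0, 200.0]
sodium_i = impute_mean(sodium)
print(sodium_i)
```

As in the Spark code, the original column is left alone and the imputed values go into a new column with an “_i” suffix.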

# Assembling continuous features into a single vector
CONTINUOUS_NB = [x for x in CONTINUOUS_COLUMNS if "ratio" not in x]
continuous_assembler = VectorAssembler(
  inputCols=CONTINUOUS_NB, outputCol="continuous"
)
food_features = continuous_assembler.transform(food)

Next, we use the VectorAssembler to assemble our continuous features, excluding the two ratio features, into a single vector column “continuous.”

# Scaling the continuous features
continuous_scaler = MinMaxScaler(
  inputCol="continuous",
  outputCol="continuous_scaled",
)
food_features = continuous_scaler.fit(food_features).transform(
  food_features
)

Finally, we scale the continuous features to the range [0, 1] using the MinMaxScaler, fitting it to our data and transforming our data. At this point, our dataset is ready for machine learning tasks!
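For intuition, min-max scaling to [0, 1] maps each value x of a feature to (x - min) / (max - min), where min and max are taken over that feature alone. Here is a minimal plain-Python sketch on hypothetical values. The function name is ours, and the 0.5 returned for a constant feature follows the behavior documented for Spark's MinMaxScaler with the default [0, 1] range.

```python
from typing import List


def min_max_scale(values: List[float]) -> List[float]:
    """Rescale one feature to the range [0, 1]."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # A constant feature has no spread; use the midpoint of [0, 1].
        return [0.5 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


# Hypothetical values for one continuous feature.
calories_i = [0.0, 50.0, 200.0]
scaled = min_max_scale(calories_i)
print(scaled)
```

In the Spark version the same rescaling is applied independently to each position of the “continuous” vector.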

Now we are ready to perform the Machine Learning training job.

Step 3: Train, Test, and Evaluate Model

Once the data is processed and transformed, we can split it into a training set and a testing set, train the model, and evaluate its performance using various metrics. In this section, we build an ML pipeline from the same estimators we used during feature preparation and then add the modeling step.

from pyspark.ml import Pipeline
import pyspark.ml.feature as MF
imputer = MF.Imputer(
  strategy="mean",
  inputCols=["calories", "protein", "fat", "sodium"],
  outputCols=["calories_i", "protein_i", "fat_i", "sodium_i"],
)
continuous_assembler = MF.VectorAssembler(
  inputCols=["rating", "calories_i", "protein_i", "fat_i", "sodium_i"],
  outputCol="continuous",
)
continuous_scaler = MF.MinMaxScaler(
  inputCol="continuous",
  outputCol="continuous_scaled",
)
food_pipeline = Pipeline(
  stages=[imputer, continuous_assembler, continuous_scaler]
)

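We can now assemble the final dataset by combining the binary columns, the scaled continuous vector, and the two ratio features into a single vector column named features.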

preml_assembler = MF.VectorAssembler(
  inputCols=BINARY_COLUMNS
  + ["continuous_scaled"]
  + ["protein_ratio", "fat_ratio"],
  outputCol="features",
)
food_pipeline.setStages(
  [imputer, continuous_assembler, continuous_scaler, preml_assembler]
)
food_pipeline_model = food_pipeline.fit(food)
food_features = food_pipeline_model.transform(food)

Our data frame is ready for machine learning! We have a number of records, each with:

  • A target (or label) column, dessert, containing a binary value (1.0 if the recipe is a dessert, 0.0 otherwise)
  • A vector of features, called features, containing all the information we want to train our machine learning model with

We can display the title, label, and feature vector of the first 30 records:

food_features.select("title", "dessert", "features").show(30, truncate=30)

Let’s now train an ML model using a LogisticRegression classifier:

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(
  featuresCol="features", labelCol="dessert", predictionCol="prediction"
)
food_pipeline.setStages(
  [
      imputer,
      continuous_assembler,
      continuous_scaler,
      preml_assembler,
      lr,
  ]
)
# Splitting our data frame for training and testing
train, test = food.randomSplit([0.7, 0.3], 13)
train.cache()
food_pipeline_model = food_pipeline.fit(train)
results = food_pipeline_model.transform(test)

Let us now evaluate the model and look at the confusion matrix:

results.select("prediction", "rawPrediction", "probability").show(3, False)
# Creating a confusion matrix for our model using pivot()
results.groupby("dessert").pivot("prediction").count().show()

Finally, we can calculate the precision and recall of our model:

lr_model = food_pipeline_model.stages[-1]
metrics = lr_model.evaluate(results.select("title", "dessert", "features"))
print(f"Model precision: {metrics.precisionByLabel[1]}")
print(f"Model recall: {metrics.recallByLabel[1]}")
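Precision and recall for the dessert class can also be read directly off the confusion matrix. The sketch below uses made-up counts purely to show the arithmetic; they are not results from this model, and the variable names are ours.

```python
# Hypothetical confusion-matrix counts for the "dessert" class (label 1.0).
# These are NOT results from the model in this article.
true_positive = 900    # desserts predicted as desserts
false_positive = 100   # non-desserts predicted as desserts
false_negative = 300   # desserts predicted as non-desserts
true_negative = 4700   # non-desserts predicted as non-desserts

# Precision: of everything predicted as dessert, how much really is dessert?
precision = true_positive / (true_positive + false_positive)

# Recall: of all real desserts, how many did the model find?
recall = true_positive / (true_positive + false_negative)

print(f"Model precision: {precision}")
print(f"Model recall: {recall}")
```

With these counts, precision is 0.9 and recall is 0.75: the predictions labeled dessert are usually right, but a quarter of the real desserts are missed.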

Please note that the full script has been simplified for the purpose of this tutorial. For a comprehensive understanding of the practical applications, including a detailed code walkthrough from data preparation to model deployment, please join us at the ODSC APAC conference 2023.

This brief tutorial has given you a glimpse of what will be covered in the ODSC session. By attending the session, you’ll get to explore these topics in greater depth and understand the intricacies of PySpark MLlib. The primary objective is to empower data science enthusiasts and professionals to harness the full potential of Spark MLlib in their machine learning projects.

Remember, the key to mastering any skill lies in consistent learning and practical implementation. So, buckle up and get ready to dive deeper into the fascinating world of machine learning with Spark on AWS at the ODSC conference. Looking forward to seeing you there!

About the Author:

Suman Debnath is a Principal Developer Advocate (Data Engineering) at Amazon Web Services, primarily focusing on Data Engineering, Data Analysis, and Machine Learning. He is passionate about large-scale distributed systems and is an avid fan of Python. His background is in storage performance and tool development, where he has developed various performance benchmarking and monitoring tools.