PySpark on EMR


Transform Data with EMR

This is an optional module. In this module, we are going to use Amazon EMR to submit PySpark jobs that read the raw data, perform some transformations and aggregations, and persist the results back to S3.
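The workshop's emr_pyspark.py script is not reproduced in this guide, but a minimal sketch of this kind of transform-and-aggregate job might look like the following. The raw input path and column names are illustrative placeholders, not the workshop's actual values; only the bucket argument and the emr-processed-data output folder come from the steps below.

# Hypothetical sketch of a transform + aggregate job submitted as an EMR step.
# The raw input path and column names are placeholders.
import sys

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == "__main__":
    bucket = sys.argv[1]  # the S3 bucket name passed as the step argument
    spark = SparkSession.builder.appName("transform-raw-data").getOrCreate()

    # Read the raw data ingested in the earlier modules (placeholder path)
    raw = spark.read.json(f"s3://{bucket}/data/raw/")

    # Example aggregation: count records per category
    agg = raw.groupBy("some_category_column").agg(F.count("*").alias("record_count"))

    # Persist the results back to S3 as Parquet
    agg.write.mode("overwrite").parquet(f"s3://{bucket}/data/emr-processed-data/")

    spark.stop()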

Architecture Diagram


Pre-requisites

Complete the previous modules:

  • Ingest and Storage
  • Catalog Data

Copy the script to S3

In this step, we will navigate to the S3 Console and create a couple of folders to be used for the EMR step.

  • Go to the S3 Console
  • Add the PySpark script:
    • Open yourname-analytics-workshop-bucket
      • Click Create folder
        • Create a new folder called scripts
        • Click Save
    • Open scripts
      • Download this file locally: emr_pyspark.py
      • In the S3 console, click Upload:
        • Click Add files and upload the emr_pyspark.py file you just downloaded
        • Click Upload
  • Create a folder for EMR logs:
    • Open yourname-analytics-workshop-bucket
      • Click Create folder
        • Create a new folder to hold the EMR logs
        • Click Save

Create EMR cluster and add step

In this step, we will create an EMR cluster and submit a Spark step.

  • Go to the EMR console
  • Click on Create cluster
  • Continue with Create cluster - Quick options
    • Provide a name for the cluster: analytics-workshop-transformer
    • Check the Logging option and provide the S3 folder you created earlier for the EMR logs
      • Change the Launch mode to Step execution
      • Under Add steps select Spark application from the drop down
        • Click on Configure
        • In the popup window, leave the Spark-submit options as blank
        • For the Application location, select the location of the emr_pyspark.py script that was uploaded to the scripts folder earlier
        • Click on Select
        • Under Arguments, enter the name of your s3 bucket
        • For Action on failure, select Terminate cluster from the drop down
        • Click on Add to complete EMR Create Cluster
      • Leave the Use AWS Glue Data Catalog for table metadata option unchecked
      • Confirm the Software configuration is set to
        • Release: emr-x.x
        • Applications: Hadoop x.x.x, Spark x.x.x
        • The versions here will be set to the latest version by default
      • Confirm the Hardware configuration defaults are
        • Instance type: m5.xlarge
        • Number of Instances: 3 (1 master and 2 core nodes)
      • Leave the Security and access configurations to default
      • Click on Create cluster to complete.

    Check the status of the Transform Job run on EMR

    • The EMR cluster will take 6-8 minutes to get provisioned, and another minute or so to complete the Spark step execution.
    • The Cluster will be terminated after the Spark job has been executed.
    • To check the status of the job, click on the Cluster name: analytics-workshop-transformer
      • Go to the Steps tab
      • Here you should see two items: Spark application and Setup hadoop debugging
      • The status of the Spark application should change from Pending to Running to Completed
      • Once the Spark job run is complete the EMR cluster will be terminated
      • Under EMR > Cluster, you will see the Status of the cluster as Terminated with an All steps completed message.

In case of any issues executing the Spark job, you will see the Status of the cluster as Terminated with errors (Step failure). In case of failures, you can browse through the EMR logs to understand the root cause of the job failure.


    Validate - Transformed / Processed data has arrived in S3

    Let’s go ahead and confirm that the EMR transform job has created the data set in the S3 console:

    • Click - yourname-analytics-workshop-bucket > data
    • Open the new folder emr-processed-data:
      • Ensure that .parquet files are created in this folder.

    Rerun the Glue Crawler

    • Go to the Glue console
    • On the left panel, click on Crawlers
      • Select the crawler created in the previous module: AnalyticsworkshopCrawler, and click Run crawler
    • You should see the Status change to Starting
      • Wait a few minutes for the crawler run to complete
      • The crawler should display Tables added as 1

    You can go to the databases section on the left and confirm that emr_processed_data table has been added.


    You will now be able to query the results of the EMR job using Amazon Athena in the next module.

    Source: https://intro-to-analytics-on-aws.workshop.aws/en/lab-guide/emr.html

    Getting Started with PySpark on AWS EMR

    Data Pipelines with PySpark and AWS EMR is a multi-part series. This is part 1 of 2. Check out part 2 if you’re looking for guidance on how to run a data pipeline as a production job.

    1. Getting Started with PySpark on AWS EMR (this article)
    2. Production Data Processing with PySpark on AWS EMR (up next)

    If you have been following business and technology trends over the past decade, you’re likely aware that the amount of data organizations are generating has skyrocketed. Businesses are eager to use all of this data to gain insights and improve processes; however, “big data” means big challenges.

    Entirely new technologies had to be invented to handle larger and larger datasets. These new technologies include the offerings of cloud computing service providers like Amazon Web Services (AWS) and open-source large-scale data processing engines like Apache Spark. As the amount of data generated continues to soar, aspiring data scientists who can use these “big data” tools will stand out from their peers in the market.

    In this guide, I will teach you how to get started processing data using PySpark on an Amazon EMR cluster. This tutorial is for current and aspiring data scientists who are familiar with Python but beginners at using Spark.

    Spark is great for processing large datasets for everyday data science tasks like exploratory data analysis and feature engineering. It can also be used to implement many popular machine learning algorithms at scale.

    1. A brief overview of Spark, Amazon S3 and EMR
    2. Creating a cluster on Amazon EMR
    3. Connecting to our cluster through a Jupyter notebook
    4. Loading data from Amazon S3

    Spark

    From the docs, “Apache Spark is a unified analytics engine for large-scale data processing.” Spark’s engine allows you to parallelize large data processing tasks on a distributed cluster. A Spark cluster contains a master node that acts as the central coordinator and several worker nodes that handle the tasks doled out by the master node.

    We’ll be using Python in this guide, but Spark developers can also use Scala or Java. The pyspark.sql module contains syntax that users of Pandas and SQL will find familiar, and Spark’s machine learning library (MLlib) can be used to implement many popular machine learning models.

    Spark uses lazy evaluation, which means it doesn’t do any work until you ask for a result. This way, the engine can decide the most optimal way to execute your DAG (directed acyclic graph, the list of operations you’ve specified). When I define an operation, such as a filter, Spark adds the operation to my DAG but doesn’t execute it. Once I ask for a result, such as a count, Spark executes my filter and any other operations I specify.
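    As a small illustration of lazy evaluation, nothing below runs on the cluster until the final count() is called; the DataFrame here is generated on the fly just for the example.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("lazy-eval-demo").getOrCreate()

    # A million-row DataFrame with an 'even' flag column
    df = spark.range(1_000_000).withColumn("even", F.col("id") % 2 == 0)

    # Transformation only: Spark records the filter in the DAG but does no work yet
    evens = df.filter(F.col("even"))

    # Action: asking for a result triggers execution of the filter above
    print(evens.count())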

    A quick note before we proceed: using distributed cloud technologies can be frustrating. At first, you’ll likely find Spark error messages to be incomprehensible and difficult to debug. I encourage you to stick with it! Read the errors. Learn what parts are informative and google it. I can’t promise that you’ll eventually stop banging your head on the keyboard, but it will get easier. It wouldn’t be a great way to differentiate yourself from others if there wasn’t a learning curve!

    Amazon S3

    Amazon S3 (Simple Storage Service) is an easy and relatively cheap way to store a large amount of data securely. A typical Spark workflow is to read data from an S3 bucket or another source, perform some transformations, and write the processed data back to another S3 bucket.

    Amazon EMR

    Amazon EMR (Elastic MapReduce) is a big data platform that synchronizes multiple nodes into a scalable cluster that can process large amounts of data. As mentioned above, we submit our jobs to the master node of our cluster, which figures out the optimal way to run them. The master node then doles out tasks to the worker nodes accordingly.

    First things first, create an AWS account and sign in to the console. I recommend taking the time now to create an IAM user and delete your root access keys.

    UPDATE: I’ve created an AWS Quick Setup Guide walking you through how to create IAM users and roles, create an S3 bucket, and configure the AWS CLI.

    I’ll be using the region US West (Oregon) for this tutorial. You can change your region with the drop-down in the top right:

    Warning on AWS expenses: You’ll need to provide a credit card to create your account. To keep costs minimal, don’t forget to terminate your EMR cluster after you are done using it. For this guide, we’ll be using m5.xlarge instances, which at the time of writing cost $0.192 per hour. Also, there is a small monthly charge to host data on Amazon S3 — this cost will go up with the amount of data you host. To avoid continuing costs, delete your bucket after using it.

    Store config files on Amazon S3

    To install useful packages on all of the nodes of our cluster, we’ll need to create a bootstrap script file (shown below) and add it to a bucket on S3.

    #!/bin/bash
    sudo pip install -U \
    matplotlib \
    pandas

    Navigate to S3 by searching for it using the “Find Services” search box in the console:

    Click “Create Bucket”, fill in the “Bucket name” field, and click “Create”:

    Click “Upload”, then “Add files”, and open the bootstrap file you created. Click “Upload” to upload the file.

    Create a key pair file

    Navigate to EC2 from the homepage of your console:

    Select “Key Pairs”

    Click “Create Key Pair” then enter a name and click “Create”.

    Your key pair file should download automatically. Store it in a directory you’ll remember; I keep mine in a dedicated local directory. Be sure to keep this file out of your GitHub repos, or any other public places, to keep your AWS resources more secure.

    Create a cluster on Amazon EMR

    Navigate to EMR from your console, click “Create Cluster”, then “Go to advanced options”.

    Make the following selections, choosing the latest release from the “Release” dropdown and checking “Spark”, then click “Next”.

    Select the “Default in us-west-2a” option from the “EC2 Subnet” dropdown, change your instance types to m5.xlarge to use the latest generation of general-purpose instances, then click “Next”.

    Name your cluster, add a custom bootstrap action, then click “Next”. The script location of your bootstrap action will be the S3 file path where you uploaded the bootstrap script earlier in the tutorial. Your bootstrap action will install the packages you specified on each node in your cluster.

    Select the key pair you created earlier and click “Create cluster”. Your cluster will take a few minutes to start, but once it reaches “Waiting”, you are ready to move on to the next step — connecting to your cluster with a Jupyter notebook.

    Connect to your cluster with a Jupyter notebook

    Navigate to “Notebooks” in the left panel. Click “Create notebook” and follow the steps below.

    Name your notebook and choose the cluster you just created.

    Once your notebook is “Ready”, click “Open”. You’re now ready to start running Spark on the cloud!

    Connect to data sources on S3

    In the first cell of your notebook, import the packages you intend to use. For example:

    from pyspark.sql import functions as F

    You should get the following output:

    Note: a SparkSession is automatically defined in the notebook as spark; you will have to define this yourself when creating scripts to submit as Spark jobs.
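    For example, a standalone script that you later submit with spark-submit has to create the session explicitly; a minimal sketch:

    from pyspark.sql import SparkSession

    # In an EMR notebook, `spark` already exists; in a script you create it yourself.
    spark = SparkSession.builder \
        .appName("my-pyspark-job") \
        .getOrCreate()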

    Next, let’s import some data from S3. We’ll use data Amazon has made available in a public bucket. Let’s look at the Amazon Customer Reviews Dataset. In particular, let’s look at book reviews:

    input_bucket = 's3://amazon-reviews-pds'
    input_path = '/parquet/product_category=Books/*.parquet'
    df = spark.read.parquet(input_bucket + input_path)
    df.show()

    The * wildcard syntax in the path tells Spark to read all of the files in that bucket directory.

    I’ll be coming out with a tutorial on data wrangling with the PySpark DataFrame API shortly, but for now, check out this excellent cheat sheet from DataCamp to get started.
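    As a small taste of what that wrangling looks like, here is the kind of aggregation you might run on the reviews DataFrame loaded above. The column names (product_title, star_rating) are my assumption about this dataset’s schema; verify them with df.printSchema() before running.

    from pyspark.sql import functions as F

    # Assumed column names; confirm with df.printSchema() first.
    top_books = (
        df.groupBy("product_title")
          .agg(
              F.count("*").alias("num_reviews"),
              F.avg("star_rating").alias("avg_rating"),
          )
          .filter(F.col("num_reviews") > 1000)
          .orderBy(F.desc("avg_rating"))
    )

    top_books.show(10, truncate=False)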

    Once you’ve tested your PySpark code in a Jupyter notebook, move it to a script and create a production data processing workflow with Spark and the AWS Command Line Interface. Then, you’re ready to schedule your Spark job on Airflow.

    Thank you for reading! Please let me know if you liked the article or if you have any critiques. If this guide was useful to you, be sure to follow me so you won’t miss any of my future articles.

    If you need help with a data project or want to say hi, connect with me and send me a message on LinkedIn. Cheers!

    Source: https://towardsdatascience.com/getting-started-with-pyspark-on-amazon-emr-c85154b6b921

    Making Sense of Big Data

    Have you ever run into a situation where your computer simply fails to process the kind of data you were trying to work with? I know I have. Hmmm, so you are probably dealing with big data that is too large and complex to be processed by a single machine. Well, what do we mean by big data? How much data is considered big data? We could argue endlessly about that, so let’s not do that here. Instead, let’s just say that you have a large enough dataset and your computer is struggling to process it. Hopefully, there are no flames or smoke coming out of your machine. Jokes aside, this is a very common problem, and the most common approach to solving it is to process such large datasets on a distributed computing platform. Apache Spark is an open-source parallel computing framework designed to enable the processing of such large datasets on a cluster of computers.

    There are a few different providers of distributed computing platforms to choose from, and some popular choices include Cloudera, Hortonworks, Databricks, Amazon AWS, and Microsoft Azure. In this article, I will show you how to set up a distributed computing platform for your needs on the AWS cloud, in particular Amazon EMR. We will be using PySpark, which is the Python API to Apache Spark.

    Before proceeding to set up PySpark on Amazon EMR, make sure you have completed the following:

    1. Create an account on AWS
    2. Create EC2 key pair
    3. Create EC2 security group

    Step 1: Find Amazon EMR in AWS Services

    Now that you have an account, browse through the AWS services and find EMR. See below for an example.

    Step 2: Create cluster

    Once in EMR, find the button where it says “Create cluster” and click on it.

    Step 3: Go to advanced options

    After clicking on “Create cluster”, you will see a page like this; click on “Go to advanced options”.

    Step 4: Software and steps

    For the EMR version, I select emr-5.31.0 as it is the latest (non-beta) version at the time of this posting. Then I select Hadoop and Spark, as those are the only two other applications I need pre-installed on the cluster to do my labeling job. Once these options are selected, there is no need to change the other options, so go ahead and click on Next.

    Step 5: Hardware selection

    In this section, you will choose the type of hardware for the master and worker nodes of the cluster. First, on the upper part of the page, select “Uniform instance groups” for the Instance group configuration.

    Then scroll down to find the part of the page that looks as follows and select the hardware type. Ideally, you would want a reasonable amount of CPU and memory in your worker nodes. I select m5.xlarge for both the master and core/worker nodes, as they are sufficient for the kind of preprocessing I intend to do with the size of the dataset I have. Depending on the dataset and what you want to achieve, you can choose the right amount of CPU and memory. Notice that I am requesting 2 instances of core/worker nodes; you can adjust this number of instances as per your needs for the task at hand. You will also see task nodes displayed underneath the core/worker nodes; you won’t need them for this exercise, so they can be ignored.

    Having done the above selections, leave the rest of the options as default and press Next.

    Step 6: General cluster settings

    In this section, select a name for your cluster. You can give it whatever name you want. It is up to you whether to select Logging or not; I usually turn it off, as it stores each and every EMR log in S3, which tends to occupy space. Best practice, however, is to leave it on.

    Leave the rest of the options on this page in their default settings and click Next.

    Step 7: Configure Security

    This section is very important! There are two things you will configure in this section:

    1. EC2 key pair
    2. Security groups

    Select the EC2 key pair and the security group you have previously created and finally click on “Create cluster”

    Now you will see a page like below as your new EMR cluster gets created

    Once created, the status of your cluster will change from “Starting” to “Waiting” which means your cluster is now ready for use.

    Step 8: Create a notebook instance on EMR

    Now you need a Jupyter notebook to use PySpark to work with the master node of your newly created cluster. To do that, click on “Notebooks” on the left banner and then click on “Create notebook”.

    On the next page, give a name to your notebook and click on “Choose” under Cluster, then find and select your newly created cluster. Finally, choose a location on your S3 to associate with your notebook for reading and writing data. Remember to leave the “AWS service role” at its default value. Then click on “Create notebook”, and voila.

    Now you have a Jupyter notebook that is connected to the master node of your newly created EMR cluster which came preloaded with Hadoop and Spark. You are ready to start processing your big data using PySpark on Amazon EMR cluster.

    WARNING: Remember to stop your notebook and terminate your cluster when you are done with it!!! Otherwise, you will incur a huge cost in your next AWS bill!!!

    Enjoy crunching through your big data using the powerful Apache Spark on Amazon EMR cluster!

    If you found this article useful, I’d love to get your feedback in the form of a clap or comment below. And let’s connect on LinkedIn

    Thank you!

    Source: https://towardsdatascience.com/data-science-at-scale-with-pyspark-on-amazon-emr-cluster-622a0f4534ed
    Running Spark applications using Scala and Python on EMR Cluster

    Pyspark Job submission

    A Python interpreter is bundled in the EMR containers Spark image that is used to run the Spark job. Python code and dependencies can be provided with the options below.

    Python code self-contained in a single .py file

    To start with, in the simplest scenario, the example below shows how to submit a pi.py file that is self-contained and doesn't need any other dependencies.

    Python file from S3

    Request:
    The pi.py used in the request payload below is from the Spark examples.
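    For reference, a self-contained script in the spirit of Spark's pi.py example, not the exact file from the Spark repository, looks roughly like this:

    # Self-contained Monte Carlo estimate of pi, modeled on Spark's examples/pi.py.
    import random
    import sys

    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        spark = SparkSession.builder.appName("PythonPi").getOrCreate()
        partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
        n = 100000 * partitions

        def inside(_):
            x, y = random.random(), random.random()
            return 1 if x * x + y * y <= 1 else 0

        count = spark.sparkContext.parallelize(range(n), partitions) \
            .map(inside) \
            .reduce(lambda a, b: a + b)
        print("Pi is roughly %f" % (4.0 * count / n))
        spark.stop()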

    Python file from mounted volume

    In the example below, pi.py is placed in a mounted volume. An FSx for Lustre filesystem is mounted as a Persistent Volume on the driver pod and is referenced by a local file prefix. For more information on how to mount FSx for Lustre, see EMR-Containers-integration-with-FSx-for-Lustre.

    This approach can be used to provide Spark application code and dependencies for execution. A Persistent Volume mounted to the driver and executor pods lets you access the application code and dependencies with a local file prefix.

    Python code with dependencies¶

    List of .py files

    This is not a scalable approach, as the number of dependent files can grow large, and you also need to manually specify all of the transitive dependencies.

    Upload dependentFunc.py and py-files-pi.py to S3; a sketch of both files is shown below.
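    The contents of the two files are not shown here, but the pattern is a main script that imports a helper module shipped alongside it via --py-files; a hypothetical sketch of both:

    # dependentFunc.py: helper module shipped to the executors via --py-files (hypothetical contents)
    def multiply(a, b):
        return a * b


    # py-files-pi.py: main script that imports the helper (hypothetical contents)
    from pyspark.sql import SparkSession

    import dependentFunc  # resolvable because dependentFunc.py was passed with --py-files

    if __name__ == "__main__":
        spark = SparkSession.builder.appName("py-files-pi").getOrCreate()
        rdd = spark.sparkContext.parallelize(range(10))
        print(rdd.map(lambda x: dependentFunc.multiply(x, 2)).collect())
        spark.stop()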

    Request:

    Bundled as a zip file

    In this approach, all the dependent Python files are bundled as a zip file. Each folder should have an __init__.py file, as documented in zip python dependencies. The zip should be created at the top folder level, using the -r option.

    dependentFunc.py from the earlier example has been bundled as pyspark-packaged-dependency-src.zip. Upload this file to an S3 location.

    Request:

    Bundled as a .egg file

    Create a folder structure as in the below screenshot with the code from the previous example -

    Steps to create .egg file
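    The referenced steps and screenshots are not reproduced here, but building an .egg typically revolves around a small setup.py at the top of the folder structure; a hypothetical sketch, built with python setup.py bdist_egg (or python setup.py bdist_wheel for the .whl section below):

    # setup.py: placed at the top of the folder structure (hypothetical sketch).
    # Build with:  python setup.py bdist_egg    -> dist/*.egg
    #        or:   python setup.py bdist_wheel  -> dist/*.whl
    from setuptools import setup, find_packages

    setup(
        name="pyspark_packaged_dependency",
        version="0.1",
        packages=find_packages(),
    )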

    Upload to a S3 location

    Request:

    Bundled as a .whl file

    Create a folder structure as in the below screenshot with the code from the previous example - py-files-zip-pi.py, dependentFunc.py

    Steps to create .whl file

    Upload to a s3 location

    Request:

    Bundled as a .pex file

    pex is a library for generating .pex (Python EXecutable) files, which are executable Python environments. PEX files can be created as below.

    To read more about PEX: PEX, PEX documentation, Tips on PEX, pex packaging for pyspark.

    Approach 1: Using Persistent Volume - FSx for Lustre cluster

    Upload the .pex file to an S3 location that is mapped to an FSx for Lustre cluster. The file can be placed on any Kubernetes persistent volume and mounted to the driver pod and executor pod.
    Request: the script used in the request below is from the Spark examples.

    Approach 2: Using Custom Pod Templates

    Upload the .pex file to an S3 location. Create custom pod templates for the driver and executor pods. Custom pod templates allow running a command through initContainers before the main application container is created. In this case, the command will download the .pex file to a local path on the driver and executor pods.

    Note: This approach is only supported for release image 5.33.0 and later or 6.3.0 and later.

    Sample driver pod template YAML file:

    Sample executor pod template YAML file:

    Replace the initContainer's image with the respective release label's container image; in this case, we are using the image for the release in use. Upload the driver and executor custom pod templates to S3.

    Request: the script used in the request below is from the Spark examples.

    Point to note:
    PEX files don't have the Python interpreter bundled with them. Using the PEX environment variables, we pass in the Python interpreter installed in the Spark driver and executor Docker image.

    pex vs conda-pack: A pex file contains only dependent Python packages, not a Python interpreter, while a conda-pack environment includes a Python interpreter as well, so with the same Python packages a conda-pack environment is much larger than a pex file. A conda-pack environment is a tar.gz file and needs to be decompressed before being used, while a pex file can be used directly. If a suitable Python interpreter already exists, pex is a better option than conda-pack. However, conda-pack is the ONLY CHOICE if you need a specific version of the Python interpreter that does not exist and you do not have permission to install one (e.g., when you need to use a specific version of the Python interpreter with an enterprise PySpark cluster). If the pex file or conda-pack environment needs to be distributed to machines on demand, there is some overhead before running your application. With the same Python packages, a conda-pack environment has larger overhead/latency than the pex file, as the conda-pack environment is usually much larger and needs to be decompressed before being used.

    For more information - Tips on PEX

    Bundled as a tar.gz file with conda-pack

    conda-pack for Spark: install conda through Miniconda, open a new terminal, and execute the commands below.

    Upload the tar.gz file to an S3 location that is mapped to an FSx for Lustre cluster. The file can be placed on any Kubernetes persistent volume and mounted to the driver pod and executor pod. Alternatively, the S3 path for the tar.gz file can also be passed using --py-files.

    Request:

    The above request doesn't work with Spark on Kubernetes.

    Bundled as a virtual env

    This will not work with Spark on Kubernetes. This feature only works with YARN in cluster mode. In this implementation for YARN, the dependencies will be installed from the repository for every driver and executor. This may not be a scalable model, as per SPARK-25433. The recommended solution is to pass in the dependencies as a PEX file.

    Import of Dynamic Modules (.pyd, .so)

    Import of dynamic modules (.pyd, .so) is disallowed when they are bundled as a zip file.

    Steps to create a .so file
    example.c

    example.h

    Upload to a S3 location.

    pyspark code to be executed - py_c_call.py
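    The py_c_call.py code itself is not included here. One way such a compiled .so can be invoked from PySpark is via ctypes, as in the hypothetical sketch below; the add_numbers function name and the ctypes approach are assumptions for illustration, not necessarily what the original py_c_call.py does. It assumes example.so has been distributed to the executors (for instance with spark.files), so it is present in each executor's working directory.

    # Hypothetical py_c_call.py: call a C function from example.so on the executors via ctypes.
    import ctypes

    from pyspark.sql import SparkSession


    def call_c(rows):
        # example.so is loaded from the executor's working directory, where it was
        # distributed by Spark; add_numbers is an assumed export of the library.
        lib = ctypes.CDLL("./example.so")
        for row in rows:
            yield (row[0], lib.add_numbers(row[0], 10))


    if __name__ == "__main__":
        spark = SparkSession.builder.appName("py-c-call").getOrCreate()
        rdd = spark.sparkContext.parallelize([(i,) for i in range(10)])
        print(rdd.mapPartitions(call_c).collect())
        spark.stop()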

    Request:

    Configuration of interest:
    The spark.files configuration distributes the .so file to the working directory of all executors.
    Dynamic modules (.pyd, .so) can also be imported by bundling them within .egg (SPARK-6764), .whl, and .pex files.

    Source: https://aws.github.io/aws-emr-containers-best-practices/submit-applications/docs/spark/pyspark/


    Programmatic Ponderings

    According to AWS, Amazon Elastic MapReduce (Amazon EMR) is a Cloud-based big data platform for processing vast amounts of data using common open-source tools such as Apache Spark, Hive, HBase, Flink, Hudi, Zeppelin, Jupyter, and Presto. Using Amazon EMR, data analysts, engineers, and scientists are free to explore, process, and visualize data. EMR takes care of provisioning, configuring, and tuning the underlying compute clusters, allowing you to focus on running analytics.

    Image for post

    Users interact with EMR in a variety of ways, depending on their specific requirements. For example, you might create a transient EMR cluster, execute a series of data analytics jobs using Spark, Hive, or Presto, and immediately terminate the cluster upon job completion. You only pay for the time the cluster is up and running. Alternatively, for time-critical workloads or continuously high volumes of jobs, you could choose to create one or more persistent, highly available EMR clusters. These clusters automatically scale compute resources horizontally, including EC2 Spot instances, to meet processing demands, maximizing performance and cost-efficiency.

    With EMR, individuals and teams can also use notebooks, including EMR Notebooks, based on JupyterLab, the web-based interactive development environment for Jupyter notebooks for ad-hoc data analytics. Apache Zeppelin is also available to collaborate and interactively explore, process, and visualize data. With EMR notebooks and the EMR API, users can programmatically execute a notebook without the need to interact with the EMR console, referred to as headless execution.

    AWS currently offers 5.x and 6.x versions of Amazon EMR. Each major and minor release of Amazon EMR offers incremental versions of nearly 25 different, popular open-source big-data applications to choose from, which Amazon EMR will install and configure when the cluster is created. One major difference between EMR versions relevant to this post is EMR 6.x’s support for the latest Hadoop and Spark 3.x frameworks. The latest Amazon EMR releases are Amazon EMR Release 6.2.0 and Amazon EMR Release 5.32.0.

    In the following series of posts, we will focus on the options available to interact with Amazon EMR using the Python API for Apache Spark, known as PySpark. We will divide the methods for accessing PySpark on EMR into two categories: PySpark applications and notebooks. We will explore both interactive and automated patterns for running PySpark applications (Python scripts) and PySpark-based notebooks. In this first post, I will cover the first four PySpark Application Methods listed below. In part two, I will cover Amazon Managed Workflows for Apache Airflow (Amazon MWAA), and in part three, the use of notebooks.

    PySpark Application Methods

    1. Add Job Flow Steps: Remote execution of EMR Steps on an existing EMR cluster using the add_job_flow_steps method;
    2. EMR Master Node: Remote execution over SSH of PySpark applications using spark-submit on an existing EMR cluster’s Master node;
    3. Run Job Flow: Remote execution of EMR Steps on a newly created long-lived or auto-terminating EMR cluster using the run_job_flow method;
    4. AWS Step Functions: Remote execution of EMR Steps using AWS Step Functions on an existing or newly created long-lived or auto-terminating EMR cluster;
    5. Apache Airflow: Remote execution of EMR Steps using the recently released Amazon MWAA on an existing or newly created long-lived or auto-terminating EMR cluster (see part two of this series);

    Notebook Methods

    1. EMR Notebooks for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Jupyter Notebooks on an existing EMR cluster;
    2. Headless Execution of EMR Notebooks: Headless execution of notebooks from an existing EMR cluster or newly created auto-terminating cluster;
    3. Apache Zeppelin for Ad-hoc Analytics: Interactive, ad-hoc analytics and machine learning using Zeppelin notebooks on an existing EMR cluster;

    Note that wherever the AWS SDK for Python (boto3) is used in this post, we can substitute the AWS CLI or AWS Tools for PowerShell. Typically, these commands and Python scripts would be run as part of a DevOps or DataOps deployment workflow, using CI/CD platforms like AWS CodePipeline, Jenkins, Harness, CircleCI, Travis CI, or Spinnaker.

    To prepare the AWS EMR environment for this post, we need to perform a few preliminary tasks.

    1. Download a copy of this post’s GitHub repository;
    2. Download three Kaggle datasets and organize locally;
    3. Create an Amazon EC2 key pair;
    4. Upload the EMR bootstrap script and create the CloudFormation Stack;
    5. Allow your IP address access to the EMR Master node on port 22;
    6. Upload CSV data files and PySpark applications to S3;
    7. Crawl the raw data and create a Data Catalog using AWS Glue;

    Step 1: GitHub Repository

    Using the following git clone command, download a copy of this post’s GitHub repository to your local environment.

    git clone --branch main --single-branch --depth 1 --no-tags \
    https://github.com/garystafford/emr-demo.git

    Step 2: Kaggle Datasets

    Kaggle is a well-known data science resource with 50,000 public datasets and 400,000 public notebooks. We will be using three Kaggle datasets in this post. You will need to join Kaggle to access these free datasets. Download the following three Kaggle datasets as CSV files. Since we are working with (moderately) big data, the total size of the datasets will be approximately 1 GB.

    1. Movie Ratings: https://www.kaggle.com/rounakbanik/the-movies-dataset
    2. Bakery: https://www.kaggle.com/sulmansarwar/transactions-from-a-bakery
    3. Stocks: https://www.kaggle.com/timoboz/stock-data-dow-jones

    Organize the (38) downloaded CSV files within the locally cloned GitHub repository, exactly as shown below. We will upload these files to Amazon S3 in the proceeding step.

    In this post, we will be using three different datasets. However, if you want to limit the potential costs associated with big data analytics on AWS, you can choose to limit job submissions to only one or two of the datasets. For example, the bakery and stocks datasets are fairly small yet effectively demonstrate most EMR features. In contrast, the movie rating dataset has nearly 27 million rows of ratings data, which starts to demonstrate the power of EMR and PySpark for big data.

    Step 3: Amazon EC2 key pair

    According to AWS, a key pair, consisting of a private key and a public key, is a set of security credentials that you use to prove your identity when connecting to an [EC2] instance. Amazon EC2 stores the public key, and you store the private key. To SSH into the EMR cluster, you will need an Amazon key pair. If you do not have an existing Amazon EC2 key pair, create one now. The easiest way to create a key pair is from the AWS Management Console.

    Image for post

    Your private key is automatically downloaded when you create a key pair in the console. Store your private key somewhere safe. If you use an SSH client on a macOS or Linux computer to connect to EMR, use the following chmod command to set the correct permissions of your private key file so that only you can read it.

    chmod 0400 /path/to/my-key-pair.pem

    Step 4: Bootstrap Script and CloudFormation Stack

    The bulk of the resources used as part of this demonstration are created using a CloudFormation stack. The CloudFormation template that creates the stack is included in the repository. Please review all resources and understand the cost and security implications before continuing.

    There is also a JSON-format CloudFormation parameters file containing values for all but two of the parameters in the CloudFormation template. The two parameters not in the parameter file are the name of the EC2 key pair you just created and the bootstrap bucket’s name. Both will be passed along with the CloudFormation template using the Python script, create_cfn_stack.py. For each type of environment, such as Development, Test, and Production, you could have a separate CloudFormation parameters file with different configurations.

    The template will create approximately (39) AWS resources, including a new AWS VPC, a public subnet, an internet gateway, route tables, a 3-node EMR v6.2.0 cluster, a series of Amazon S3 buckets, AWS Glue data catalog, AWS Glue crawlers, several Systems Manager Parameter Store parameters, and so forth.

    The CloudFormation template includes the location of the EMR bootstrap script located on Amazon S3. Before creating the CloudFormation stack, the Python script creates an S3 bootstrap bucket and copies the bootstrap script from the local project repository to the S3 bucket. The bootstrap script will be used to install additional packages on EMR cluster nodes, which are required by our PySpark applications. The script also sets the default AWS Region.

    From the GitHub repository’s local copy, run the following command, which will execute a Python script to create the bootstrap bucket, copy the bootstrap script, and provision the CloudFormation stack. You will need to pass the name of your EC2 key pair to the script as a command-line argument.

    python3 ./scripts/create_cfn_stack.py \
        --environment dev \
        --ec2-key-name <my-key-pair-name>

    The CloudFormation template should create a CloudFormation stack, as shown below.

    Image for post

    Step 5: SSH Access to EMR

    For this demonstration, we will need access to the new EMR cluster’s Master EC2 node, using SSH and your key pair, on port 22. The easiest way to add a new inbound rule to the correct AWS Security Group is to use the AWS Management Console. First, find your EC2 Security Group named ElasticMapReduce-master.

    Image for post

    Then, add a new Inbound rule for SSH (port 22) from your IP address, as shown below.

    Image for post

    Alternately, you could use the AWS CLI or AWS SDK to create a new security group ingress rule.

    export EMR_MASTER_SG_ID=$(aws ec2 describe-security-groups | \
        jq -r '.SecurityGroups[] | select(.GroupName=="ElasticMapReduce-master").GroupId')

    aws ec2 authorize-security-group-ingress \
        --group-id ${EMR_MASTER_SG_ID} \
        --protocol tcp \
        --port 22 \
        --cidr $(curl ipinfo.io/ip)/32

    Step 6: Raw Data and PySpark Apps to S3

    As part of the CloudFormation stack, we now have several new Amazon S3 buckets within our AWS Account. The naming conventions and intended usage of these buckets follow common organizational patterns for data lakes. The data buckets use the common naming convention of raw, processed, and analyzed data in reference to the data stored within them. We also use a widely used, corresponding naming convention of ‘bronze’, ‘silver’, and ‘gold’ when referring to these data buckets as parameters.

    > aws s3api list-buckets | \
        jq -r '.Buckets[] | select(.Name | startswith("emr-demo-")).Name'

    emr-demo-raw-123456789012-us-east-1
    emr-demo-processed-123456789012-us-east-1
    emr-demo-analyzed-123456789012-us-east-1
    emr-demo-work-123456789012-us-east-1
    emr-demo-logs-123456789012-us-east-1
    emr-demo-glue-db-123456789012-us-east-1
    emr-demo-bootstrap-123456789012-us-east-1

    There is a raw data bucket (aka bronze) that will contain the original CSV files. There is a processed data bucket (aka silver) that will contain data that might have had any number of actions applied: data cleansing, obfuscation, data transformation, file format changes, file compression, and data partitioning. Finally, there is an analyzed data bucket (aka gold) that has the results of the data analysis. We also have a work bucket that holds the PySpark applications, a logs bucket that holds EMR logs, and a glue-db bucket to hold the Glue Data Catalog metadata.

    Whenever we submit PySpark jobs to EMR, the PySpark application files and data will always be accessed from Amazon S3. From the GitHub repository’s local copy, run the following command, which will execute a Python script to upload the approximately (38) Kaggle dataset CSV files to the raw S3 data bucket.

    python3 ./scripts/upload_csv_files_to_s3.py

    Next, run the following command, which will execute a Python script to upload a series of PySpark application files to the work S3 bucket.

    python3 ./scripts/upload_apps_to_s3.py

    Step 7: Crawl Raw Data with Glue

    The last preliminary step to prepare the EMR demonstration environment is to catalog the raw CSV data into an AWS Glue data catalog database, using one of the two Glue Crawlers we created. The three Kaggle datasets’ data will reside in Amazon S3, while their schema and metadata will reside within tables in the Glue data catalog database, emr_demo. When we eventually query the data from our PySpark applications, we will be querying the Glue data catalog’s database tables, which reference the underlying data in S3.
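    For example, once the cluster is configured to use the Glue Data Catalog as its Spark/Hive metastore, a PySpark application can query those catalog tables with Spark SQL; a minimal sketch, using the emr_demo database and the raw_bakery table created by the crawler:

    from pyspark.sql import SparkSession

    # Assumes the EMR cluster is configured to use the Glue Data Catalog as its
    # metastore; the table references the raw CSV data stored in S3.
    spark = SparkSession.builder \
        .appName("query-glue-catalog") \
        .enableHiveSupport() \
        .getOrCreate()

    df = spark.sql("SELECT * FROM emr_demo.raw_bakery LIMIT 10")
    df.show()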

    From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the raw data’s schema and metadata information into the Glue data catalog database, emr_demo.

    python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-raw

    Once the crawler is finished, from the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo, all prefixed with raw_. The tables hold metadata and schema information for the three CSV-format Kaggle datasets loaded into S3.

    Image for post

    Alternately, we can use the get-tables AWS CLI command to review the tables.

    > aws glue get-tables --database emr_demo | \
        jq -r '.TableList[] | select(.Name | startswith("raw_")).Name'

    raw_bakery
    raw_credits_csv
    raw_keywords_csv
    raw_links_csv
    raw_links_small_csv
    raw_movies_metadata_csv
    raw_ratings_csv
    raw_ratings_small_csv
    raw_stocks

    Let’s explore four methods to run PySpark applications on EMR.

    Image for post

    1. Add Job Flow Steps to an Existing EMR Cluster

    We will start by looking at running PySpark applications using EMR Steps. According to AWS, we can use Amazon EMR steps to submit work to the Spark framework installed on an EMR cluster. The EMR step for PySpark uses a spark-submit command. According to Spark’s documentation, the spark-submit script, located in Spark’s bin directory, is used to launch applications on a [EMR] cluster. A typical spark-submit command we will be using resembles the following example. This command runs a PySpark application located in S3, bakery_sales_ssm.py.

    spark-submit --deploy-mode cluster --master yarn \
        --conf spark.yarn.submit.waitAppCompletion=true \
        s3a://emr-demo-work-123456789012-us-east-1/analyze/bakery_sales_ssm.py

    We will target the existing EMR cluster created by CloudFormation earlier to execute our PySpark applications using EMR Steps. We have two sets of PySpark applications. The first set of three PySpark applications will transform the raw CSV-format datasets into Apache Parquet, a more efficient file format for big data analytics. Alternately, for your workflows, you might prefer AWS Glue ETL Jobs, as opposed to PySpark on EMR, to perform nearly identical data processing tasks. The second set of four PySpark applications perform data analysis tasks on the data.

    There are two versions of each PySpark application. Files with the _ssm suffix use the AWS Systems Manager (SSM) Parameter Store service to obtain dynamic parameter values at runtime on EMR. Corresponding non-SSM applications require those same parameter values to be passed on the command line when they are submitted to Spark. Therefore, those PySpark applications are not tightly coupled to the SSM Parameter Store. We will use the _ssm versions of the scripts in this post’s demonstration.

    We will start by executing the three PySpark processing applications. They will convert the CSV data to Parquet. Below, we see an example of one of the PySpark applications we will run, bakery_csv_to_parquet_ssm.py. The PySpark application will convert the Bakery Sales dataset’s CSV file to Parquet and write it to S3.

    #!/usr/bin/env python3
    # Process raw CSV data and output Parquet
    # Author: Gary A. Stafford (November 2020)

    import os

    import boto3
    from ec2_metadata import ec2_metadata
    from pyspark.sql import SparkSession

    os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
    ssm_client = boto3.client('ssm')


    def main():
        params = get_parameters()

        spark = SparkSession \
            .builder \
            .appName("bakery-csv-to-parquet") \
            .getOrCreate()

        convert_to_parquet(spark, "bakery", params)


    def convert_to_parquet(spark, file, params):
        df_bakery = spark.read \
            .format("csv") \
            .option("header", "true") \
            .option("delimiter", ",") \
            .option("inferSchema", "true") \
            .load(f"s3a://{params['bronze_bucket']}/bakery/{file}.csv")

        write_parquet(df_bakery, params)


    def write_parquet(df_bakery, params):
        df_bakery.write \
            .format("parquet") \
            .save(f"s3a://{params['silver_bucket']}/bakery/", mode="overwrite")


    def get_parameters():
        params = {
            'bronze_bucket': ssm_client.get_parameter(Name='/emr_demo/bronze_bucket')['Parameter']['Value'],
            'silver_bucket': ssm_client.get_parameter(Name='/emr_demo/silver_bucket')['Parameter']['Value']
        }

        return params


    if __name__ == "__main__":
        main()

    The three PySpark data processing applications’ spark-submit commands are defined in a separate JSON-format file, a snippet of which is shown below. The same goes for the four analytics applications.

    [
      {
        "Name": "Bakery CSV to Parquet",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
          "Jar": "command-runner.jar",
          "Args": [
            "spark-submit",
            "--deploy-mode",
            "cluster",
            "--master",
            "yarn",
            "--conf",
            "spark.yarn.submit.waitAppCompletion=true",
            "s3a://{{ work_bucket }}/process/bakery_csv_to_parquet_ssm.py"
          ]
        }
      },
      {
        "Name": "Stocks CSV to Parquet",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
          "Jar": "command-runner.jar",
          "Args": [
            "spark-submit",
            "--deploy-mode",
            "cluster",
            "--master",
            "yarn",
            "--conf",
            "spark.yarn.submit.waitAppCompletion=true",
            "s3a://{{ work_bucket }}/process/stocks_csv_to_parquet_ssm.py"
          ]
        }
      }
    ]

    Using this pattern of decoupling the Spark job command and arguments from the execution code, we can define and submit any number of Steps without changing the Python script, add_job_flow_steps.py, shown below. Note the add_job_flow_steps call, where the Steps are injected into the method’s parameters.

    #!/usr/bin/env python3
    # Purpose: Submit a variable number of Steps defined in a separate JSON file
    # Author: Gary A. Stafford (November 2020)

    import argparse
    import json
    import logging
    import os

    import boto3
    from botocore.exceptions import ClientError

    logging.basicConfig(format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)

    ssm_client = boto3.client('ssm')
    emr_client = boto3.client('emr')


    def main():
        args = parse_args()
        params = get_parameters()
        steps = get_steps(params, args.job_type)
        add_job_flow_steps(params['cluster_id'], steps)


    def add_job_flow_steps(cluster_id, steps):
        """Add Steps to an existing EMR cluster"""
        try:
            response = emr_client.add_job_flow_steps(
                JobFlowId=cluster_id,
                Steps=steps
            )
            print(f'Response: {response}')
        except ClientError as e:
            logging.error(e)
            return False
        return True


    def get_steps(params, job_type):
        """
        Load EMR Steps from a separate JSON-format file and substitutes tags for SSM parameter values
        """
        dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
        file = open(f'{dir_path}/job_flow_steps/job_flow_steps_{job_type}.json', 'r')
        steps = json.load(file)
        new_steps = []
        for step in steps:
            step['HadoopJarStep']['Args'] = list(
                map(lambda st: str.replace(st, '{{ work_bucket }}', params['work_bucket']), step['HadoopJarStep']['Args']))
            new_steps.append(step)
        return new_steps


    def get_parameters():
        """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
        params = {
            'work_bucket': ssm_client.get_parameter(Name='/emr_demo/work_bucket')['Parameter']['Value'],
            'cluster_id': ssm_client.get_parameter(Name='/emr_demo/cluster_id')['Parameter']['Value']
        }
        return params


    def parse_args():
        """Parse argument values from command-line"""
        parser = argparse.ArgumentParser(description='Arguments required for script.')
        parser.add_argument('-t', '--job-type', required=True, choices=['process', 'analyze'],
                            help='process or analysis')
        args = parser.parse_args()
        return args


    if __name__ == '__main__':
        main()

    The Python script used for this task takes advantage of AWS Systems Manager Parameter Store parameters. The parameters were placed in the Parameter Store, within the /emr_demo path, by CloudFormation. We will reference these parameters in several scripts throughout the post.

    > aws ssm get-parameters-by-path --path '/emr_demo' | \
        jq -r '.Parameters[] | {Name: .Name, Value: .Value}'
    {
    "Name": "/emr_demo/bootstrap_bucket",
    "Value": "emr-demo-bootstrap-123456789012-us-east-1"
    }
    {
    "Name": "/emr_demo/ec2_key_name",
    "Value": "emr-demo-123456789012-us-east-1"
    }
    {
    "Name": "/emr_demo/ec2_subnet_id",
    "Value": "subnet-06aa61f790a932b32"
    }
    {
    "Name": "/emr_demo/emr_ec2_role",
    "Value": "EMR_EC2_DemoRole"
    }
    {
    "Name": "/emr_demo/emr_role",
    "Value": "EMR_DemoRole"
    }
    {
    "Name": "/emr_demo/gold_bucket",
    "Value": "emr-demo-analyzed-123456789012-us-east-1"
    }
    {
    "Name": "/emr_demo/silver_bucket",
    "Value": "emr-demo-processed-123456789012-us-east-1"
    }
    {
    "Name": "/emr_demo/sm_log_group_arn",
    "Value": "arn:aws:logs:us-east-1:123456789012:log-group:EmrDemoStateMachineLogGroup:*"
    }
    {
    "Name": "/emr_demo/sm_role_arn",
    "Value": "arn:aws:iam::123456789012:role/State_ExecutionRole"
    }
    {
    "Name": "/emr_demo/work_bucket",
    "Value": "emr-demo-work-123456789012-us-east-1"
    }
    {
    "Name": "/emr_demo/bronze_bucket",
    "Value": "emr-demo-raw-123456789012-us-east-1"
    }
    {
    "Name": "/emr_demo/cluster_id",
    "Value": "j-3J44BFJXNVSCT"
    }
    {
    "Name": "/emr_demo/glue_db_bucket",
    "Value": "emr-demo-logs-123456789012-us-east-1"
    }
    {
    "Name": "/emr_demo/logs_bucket",
    "Value": "emr-demo-logs-123456789012-us-east-1"
    }
    {
    "Name": "/emr_demo/vpc_id",
    "Value": "vpc-01d4151396c119d3a"
    }

    From the GitHub repository’s local copy, run the following command, which will execute a Python script to load the three spark-submit commands from the JSON-format file and run the PySpark processing applications on the existing EMR cluster.

    python3 ./scripts/add_job_flow_steps.py --job-type process

    While the three Steps are running concurrently, the view from the Amazon EMR Console’s Cluster Steps tab should look similar to the example below.

    Image for post

    Once the three Steps have been completed, we should note three sub-directories in the processed (silver) data bucket containing Parquet-format files.

    Image for post

    Of special note is the Stocks dataset, which has been converted to Parquet and partitioned by stock symbol. According to AWS, by partitioning your data, we can restrict the amount of data scanned by each query by specifying filters based on the partition, thus improving performance and reducing cost.
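    The partitioned write that produces this layout is a one-liner in the processing application; a sketch, in the style of the bakery script above, where the symbol column name is an assumption about the stocks dataset's schema:

    # Write the stocks DataFrame to S3 as Parquet, partitioned by stock symbol.
    # The 'symbol' column name is assumed; check the dataset's actual schema.
    df_stocks.write \
        .partitionBy("symbol") \
        .parquet(f"s3a://{params['silver_bucket']}/stocks/", mode="overwrite")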

    Image for post

    Lastly, the movie ratings dataset has been divided into sub-directories, based on the schema of each table. Each sub-directory contains Parquet files specific to that unique schema.

    Image for post

    Crawl Processed Data with Glue

    Similar to the raw data earlier, catalog the newly processed Parquet data into the same AWS Glue data catalog database using one of the two Glue Crawlers we created. As with the raw data, the processed data will reside in the Amazon S3 processed data bucket, while its schemas and metadata will reside within tables in the Glue data catalog database, emr_demo.

    From the GitHub repository’s local copy, run the following command, which will execute a Python script to run the Glue Crawler and catalog the processed data’s schema and metadata information into the Glue data catalog database, emr_demo.

    python3 ./scripts/crawl_raw_data.py --crawler-name emr-demo-processed

    Once the crawler has finished successfully, using the AWS Console, we should see a series of nine tables in the Glue data catalog database, emr_demo, all prefixed with processed_. The tables represent the three Kaggle datasets’ contents converted to Parquet and correspond to the equivalent tables with the raw_ prefix.

    Image for post

    Alternately, we can use the get-tables AWS CLI command to review the tables.

    > aws glue get-tables --database emr_demo | \
        jq -r '.TableList[] | select(.Name | startswith("processed_")).Name'

    processed_bakery
    processed_credits
    processed_keywords
    processed_links
    processed_links_small
    processed_movies_metadata
    processed_ratings
    processed_ratings_small
    processed_stocks

    2. Run PySpark Jobs from EMR Master Node

    Next, we will explore how to execute PySpark applications remotely on the Master node of the EMR cluster using spark-submit and SSH. Although this method may be optimal for certain use cases as opposed to using the EMR SDK, remote SSH execution does not scale as well in my opinion due to a lack of automation, and it exposes some potential security risks.

    There are four PySpark applications in the GitHub repository. For this part of the demonstration, we will just submit the bakery_sales_ssm.py application. This application will perform a simple analysis of the bakery sales data. While the other three PySpark applications use AWS Glue, the bakery_sales_ssm.py application reads data directly from the processed data S3 bucket.

    The application writes its results into the analyzed data S3 bucket, in both Parquet and CSV formats. The CSV file is handy for business analysts and other non-technical stakeholders who might wish to import the results of the analysis into Excel or business applications.


    Earlier, we created an inbound rule to allow your IP address to access the Master node on port 22. From the EMR Console’s Cluster Summary tab, note the command necessary to SSH into the Master node of the EMR cluster.

    The Python script, submit_spark_ssh.py, shown below, will submit the PySpark job to the EMR Master Node, using paramiko, a Python implementation of SSHv2. The script replicates the same functionality as the shell-based SSH command above to execute a remote command on the EMR Master Node. The spark-submit command is in the submit_job function, below.

    #!/usr/bin/env python3
    # Purpose: Submit Spark job to EMR Master Node
    # Author: Gary A. Stafford (December 2020)
    # Usage Example: python3 ./submit_spark_ssh.py \
    #   --ec2-key-path ~/.ssh/emr-demo-123456789012-us-east-1.pem

    import argparse
    import logging

    import boto3
    from paramiko import SSHClient, AutoAddPolicy

    logging.basicConfig(format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)

    ssm_client = boto3.client('ssm')


    def main():
        args = parse_args()
        params = get_parameters()
        submit_job(params['master_public_dns'], 'hadoop', args.ec2_key_path, params['work_bucket'])


    def submit_job(master_public_dns, username, ec2_key_path, work_bucket):
        """Submit job to EMR Master Node"""
        ssh = SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(AutoAddPolicy())
        ssh.connect(hostname=master_public_dns, username=username, key_filename=ec2_key_path)

        stdin_, stdout_, stderr_ = ssh.exec_command(
            command=f"""
                spark-submit --deploy-mode cluster --master yarn \
                --conf spark.yarn.submit.waitAppCompletion=true \
                s3a://{work_bucket}/analyze/bakery_sales_ssm.py"""
        )

        stdout_lines = ''
        while not stdout_.channel.exit_status_ready():
            if stdout_.channel.recv_ready():
                stdout_lines = stdout_.readlines()
        logging.info(' '.join(map(str, stdout_lines)))

        ssh.close()


    def get_parameters():
        """Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
        params = {
            'master_public_dns': ssm_client.get_parameter(Name='/emr_demo/master_public_dns')['Parameter']['Value'],
            'work_bucket': ssm_client.get_parameter(Name='/emr_demo/work_bucket')['Parameter']['Value']
        }
        return params


    def parse_args():
        """Parse argument values from command-line"""
        parser = argparse.ArgumentParser(description='Arguments required for script.')
        parser.add_argument('-e', '--ec2-key-path', required=True, help='EC2 Key Path')
        args = parser.parse_args()
        return args


    if __name__ == '__main__':
        main()

    From the GitHub repository’s local copy, run the following command, which will execute a Python script to submit the job. The script requires one input parameter, which is the path to your EC2 key pair (e.g., ~/.ssh/emr-demo-123456789012-us-east-1.pem).

    python3 ./scripts/submit_spark_ssh.py \
        --ec2-key-path <path-to-your-ec2-key-pair>

    The spark-submit command will be executed remotely on the EMR cluster’s Master node over SSH. All variables in the command are filled in ahead of time with values retrieved from the SSM Parameter Store.

    Image for post

    Monitoring Spark Jobs

    We set spark.yarn.submit.waitAppCompletion to true. According to Spark’s documentation, this property controls whether the client waits to exit in YARN cluster mode until the application is completed. If set to true, the client process will stay alive, reporting the application’s status. Otherwise, the client process will exit after submission. We can watch the job’s progress from the terminal.


    We can also use the YARN Timeline Server and the Spark History Server in addition to the terminal. Links to both are shown on both the EMR Console’s Cluster ‘Summary’ and ‘Application user interfaces’ tabs. Unlike other EMR application web interfaces, using port forwarding, also known as creating an SSH tunnel, is not required for the YARN Timeline Server or the Spark History Server.


    YARN Timeline Server

Below, we see the job we submitted running in the YARN Timeline Server, which also includes useful tools such as access to configuration, local logs, server stacks, and server metrics.


The YARN Timeline Server allows us to drill down into individual jobs and view their logs, which are ideal for troubleshooting failed jobs.


    Spark History Server

    You can also view the PySpark application we submitted from the Master node using the Spark History Server. Below, we see completed Spark applications (aka Spark jobs) in the Spark History Server.


    Below, we see more details about our Spark job using the Spark History Server.


    We can even see visual representations of each Spark job’s Directed Acyclic Graph (DAG).


    3. Run Job Flow on an Auto-Terminating EMR Cluster

The next option for running PySpark applications on EMR is to create a short-lived, auto-terminating EMR cluster using the boto3 EMR client's run_job_flow method. We will create a new EMR cluster, run a series of Steps (PySpark applications), and then auto-terminate the cluster. This is a cost-effective way to run PySpark applications on demand.

We will create a second 3-node EMR v6.2.0 cluster to demonstrate this method, using Amazon EC2 Spot instances for all of the cluster's Master and Core nodes. Unlike the first, long-lived, more general-purpose EMR cluster, we will deploy only the Spark application to this cluster, since Spark is the only application the Steps require.

Using the run_job_flow method, we will execute the four PySpark data analysis applications. The applications' spark-submit commands are defined in a separate JSON-format file. As with the previous script, decoupling the Spark job commands and arguments from the execution code means we can define and submit any number of Steps without changing the Python execution script. Also like the previous script, this one retrieves parameter values from the SSM Parameter Store.

#!/usr/bin/env python3

# Purpose: Create a new EMR cluster and submits a variable
#          number of Steps defined in a separate JSON file
# Author: Gary A. Stafford (November 2020)

import argparse
import json
import logging
import os

import boto3
from botocore.exceptions import ClientError

from scripts.parameters import parameters

logging.basicConfig(format='[%(asctime)s] %(levelname)s - %(message)s', level=logging.INFO)

emr_client = boto3.client('emr')


def main():
    args = parse_args()
    params = parameters.get_parameters()
    steps = get_steps(params, args.job_type)
    run_job_flow(params, steps)


def run_job_flow(params, steps):
    """Create EMR cluster, run Steps, and then terminate cluster"""
    try:
        response = emr_client.run_job_flow(
            Name='demo-cluster-run-job-flow',
            LogUri=f's3n://{params["logs_bucket"]}',
            ReleaseLabel='emr-6.2.0',
            Instances={
                'InstanceFleets': [
                    {
                        'Name': 'MASTER',
                        'InstanceFleetType': 'MASTER',
                        'TargetSpotCapacity': 1,
                        'InstanceTypeConfigs': [
                            {
                                'InstanceType': 'm5.xlarge',
                            },
                        ]
                    },
                    {
                        'Name': 'CORE',
                        'InstanceFleetType': 'CORE',
                        'TargetSpotCapacity': 2,
                        'InstanceTypeConfigs': [
                            {
                                'InstanceType': 'r5.2xlarge',
                            },
                        ],
                    },
                ],
                'Ec2KeyName': params['ec2_key_name'],
                'KeepJobFlowAliveWhenNoSteps': False,
                'TerminationProtected': False,
                # ... (the remainder of the script is truncated in this excerpt)
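The Steps passed to run_job_flow are standard EMR Step definitions. As a hedged illustration only (not the author's actual JSON entries, which are not reproduced here), a single spark-submit Step for the earlier bakery_sales_ssm.py script might look like the following Python structure:

# Hypothetical Step entry; bucket name is a placeholder.
example_step = {
    "Name": "bakery-sales-analysis",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        # command-runner.jar lets an EMR Step run arbitrary commands such as spark-submit
        "Jar": "command-runner.jar",
        "Args": [
            "spark-submit",
            "--deploy-mode", "cluster",
            "--master", "yarn",
            "s3a://<work-bucket>/analyze/bakery_sales_ssm.py",
        ],
    },
}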
Source: https://programmaticponderings.com/2020/12/02/running-pyspark-applications-on-amazon-emr-methods-for-interacting-with-pyspark-and-automating-job-submissions-on-amazon-emr/

    How do I configure Amazon EMR to run a PySpark job using Python 3.4 or 3.6?

    Python 3.4 or 3.6 is installed on my Amazon EMR cluster instances, but Spark is running Python 2.7. How do I upgrade Spark to Python 3.4 or 3.6?

    Short description

    In most Amazon EMR release versions, cluster instances and system applications use different Python versions by default:

    • Amazon EMR release versions 4.6.0-5.19.0: Python 3.4 is installed on the cluster instances. Python 2.7 is the system default.
    • Amazon EMR release versions 5.20.0 and later: Python 3.6 is installed on the cluster instances. For 5.20.0-5.29.0, Python 2.7 is the system default. For Amazon EMR version 5.30.0 and later, Python 3 is the system default.

    To upgrade the Python version that PySpark uses, point the PYSPARK_PYTHON environment variable for the spark-env classification to the directory where Python 3.4 or 3.6 is installed.

    Resolution

    On a running cluster

    Amazon EMR release version 5.21.0 and later

    Submit a reconfiguration request with a configuration object similar to the following:
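The exact object from the original article is not reproduced in this copy; as a minimal sketch, the configuration is the spark-env classification exporting PYSPARK_PYTHON, shown here as a Python structure and assuming Python 3 is installed at /usr/bin/python3:

# spark-env classification that points PySpark at Python 3
# (adjust the path to wherever Python 3.4/3.6 is installed on your cluster)
spark_env_python3 = [
    {
        "Classification": "spark-env",
        "Configurations": [
            {
                "Classification": "export",
                "Properties": {
                    "PYSPARK_PYTHON": "/usr/bin/python3"
                }
            }
        ]
    }
]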

    Amazon EMR release version 4.6.0-5.20.x

    1.    Connect to the master node using SSH.

2.    Run a command that sets PYSPARK_PYTHON to the Python 3 location (for example, by appending an export to /etc/spark/conf/spark-env.sh).

3.    Run the pyspark command to confirm that PySpark is using the correct Python version.

The output shows that PySpark is now using the same Python version that is installed on the cluster instances.

    Spark uses the new configuration for the next PySpark job.

    On a new cluster
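The original article's example for this case is not reproduced in this copy. A minimal, hedged boto3 sketch of the idea is shown below: apply the same spark-env classification at cluster creation time (cluster name, release label, instance types, and roles are placeholder assumptions; spark_env_python3 is the object shown above).

import boto3

emr = boto3.client('emr')

# Launch a new cluster with PYSPARK_PYTHON already configured.
response = emr.run_job_flow(
    Name='pyspark-python3-demo',                    # placeholder name
    ReleaseLabel='emr-5.29.0',                      # a release where Python 2.7 is still the default
    Applications=[{'Name': 'Hadoop'}, {'Name': 'Spark'}],
    Configurations=spark_env_python3,               # the spark-env classification from above
    Instances={
        'InstanceGroups': [
            {'InstanceRole': 'MASTER', 'InstanceType': 'm5.xlarge', 'InstanceCount': 1},
            {'InstanceRole': 'CORE', 'InstanceType': 'm5.xlarge', 'InstanceCount': 2},
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
    },
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)
print(response['JobFlowId'])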


Source: https://aws.amazon.com/premiumsupport/knowledge-center/emr-pyspark-python-3x/


    Building a Big Data Pipeline with PySpark and Amazon EMR on EC2 Spot Fleets and On-Demand Instances

If you are a data scientist ready to take the next step in your career and become an applied scientist, you must leave behind school projects that involve working with small datasets. The true nature of an applied scientist is knowing how to take advantage of computing at massive scale, and of the resources available to analyze large datasets in a cost-effective way. You must begin to learn the technologies used to process large datasets, which is where data engineering skills become relevant to taking the next step in your career. This change also involves new responsibilities, such as:

1. Choosing a cloud computing provider
2. Creating scalable and cost-effective architectures
3. Defining a strategy to monitor your expenses and resources
4. Tuning performance
5. Staying up to date with technologies that enable cost-effective cloud computing

The goal of this project is to offer an AWS EMR template that you can use quickly when your analysis involves working with millions of records. The template can easily be adapted to the size of your project, so you do not have to build everything from the beginning and can focus on writing PySpark code.

    Architecture - Automate Word Cloud


    Data sources

To reproduce the effect of working with a large dataset, we use the Amazon Customer Reviews Dataset, counting the different words that appear in the titles of purchased books and creating word clouds for each year. This project does not focus on a specific analysis; its objective is to create a big data pipeline and connect its tasks using an AWS EMR cluster. You could use this same template for other types of projects or analyses.

    Infrastructure as Code (IaC) in AWS

This project manages everything as infrastructure as code using boto3. The only thing you need to focus on is writing your PySpark tasks (steps) and the logic for how these steps communicate, using the file steps.json. Together, these pieces make up your big data pipeline.

    File structure

    IAC files

Since the entire infrastructure is created by code, several files were modified to build this project. You can find the original source in the Amazon documentation, Python Code Samples for Amazon EMR; the files modified in this project are:

1. ec2.py: creates the security groups in our VPC.
2. iam.py: creates the two associated roles: AmazonElasticMapReduceRole and AmazonElasticMapReduceforEC2Role.
3. s3.py: controls and manages the initial configuration that our S3 bucket needs (scripts, logs, configuration files, etc.).
4. poller.py: polls a function's status every N seconds until it reaches a specified status (see the sketch after this list).
5. emr.py: contains the functions to create an EMR cluster and add steps to it using boto3.
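As an illustration of the polling pattern described for poller.py (a generic sketch, not the repository's actual code):

import time

def poll_until(get_status, target_status, interval_seconds=30, timeout_seconds=1800):
    """Call get_status() every interval_seconds until it returns target_status."""
    waited = 0
    while waited < timeout_seconds:
        status = get_status()
        if status == target_status:
            return status
        time.sleep(interval_seconds)
        waited += interval_seconds
    raise TimeoutError(f"Status {target_status!r} not reached within {timeout_seconds} seconds")

# Usage (hypothetical helper): poll_until(lambda: get_cluster_state(cluster_id), 'WAITING')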

    Main process

1. emr_process.py: this file wraps the basic functions of the boto3 library; the difference is that it is modified to interpret and understand our configuration files and project template. It exposes the following functions (see the sketch after this list):

      • create_cluster
      • add_steps
      • execute_steps
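For reference, a minimal boto3 sketch of the add_steps idea (this is not the repository's emr_process.py; the cluster ID and S3 path are placeholders), submitting a single PySpark script as an EMR Step:

import boto3

emr = boto3.client('emr')

def add_pyspark_step(cluster_id, script_s3_path, step_name='pyspark-step'):
    """Add a single spark-submit Step to an existing EMR cluster and return its Step ID."""
    response = emr.add_job_flow_steps(
        JobFlowId=cluster_id,
        Steps=[
            {
                'Name': step_name,
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': ['spark-submit', '--deploy-mode', 'cluster', script_s3_path],
                },
            }
        ],
    )
    return response['StepIds'][0]

# Example (placeholder values):
# step_id = add_pyspark_step('j-XXXXXXXXXXXX', 's3://<your-bucket>/scripts/pyspark_grouping_words.py')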


    Configuration files

1. cluster-ec2-spot-fleet.json: this file contains everything related to the fleet of EC2 Spot and On-Demand instances, including special configuration for Spark and YARN. The property yarn.nodemanager.resource.memory-mb is the amount of physical memory, in MB, that can be allocated to YARN containers on each node (the remainder is reserved for non-YARN processes). It was configured following the rule below (a worked example follows this list):

executor memory + memory overhead < yarn.nodemanager.resource.memory-mb

2. bootstrap-action.sh: the first step of the pipeline uses spark-nlp; the lines recommended in its documentation were added to the bootstrap file, along with other needed packages.

3. steps.json: this file contains all the logic for connecting the output of one step to the input of another. If you want to adapt this template for another type of project, all of your steps and the logic of how they communicate must be defined in this file.

    If the property input_dependency_from_output_step is 1, then the output of the previous step will be the input of the current step
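A short worked example of the sizing rule above (the numbers are illustrative assumptions, not the repository's actual settings):

# Illustrative sizing check for an r5.2xlarge core node (64 GiB RAM), assuming
# YARN is allowed to allocate 54 GiB of it to containers.
yarn_nodemanager_resource_memory_mb = 54 * 1024   # yarn.nodemanager.resource.memory-mb

executor_memory_mb = 20 * 1024                    # spark.executor.memory = 20g (assumed)
# spark.executor.memoryOverhead defaults to max(384 MB, 10% of executor memory)
memory_overhead_mb = max(384, int(0.10 * executor_memory_mb))

per_executor_mb = executor_memory_mb + memory_overhead_mb
assert per_executor_mb < yarn_nodemanager_resource_memory_mb  # the rule from the text
print(f"{per_executor_mb} MB per executor fits within {yarn_nodemanager_resource_memory_mb} MB of YARN memory")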

    PySpark code

1. pyspark_preprocessing_text.py: uses spark-nlp to preprocess the text of the titles of books purchased on Amazon, extracting the relevant words from each title into a new column called exploded_text.
2. pyspark_grouping_words.py: groups all the words found in book titles by year; its output is a dataset of word counts per year, which becomes the input of the next step (a minimal sketch of this step follows the list).
3. generate_clouds.py: receives the output of the previous step as input, generates word clouds by year, and puts each word cloud image in the project's S3 bucket.
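A minimal PySpark sketch of the grouping idea (illustrative only, not the repository's exact code; the column names year and exploded_text follow the descriptions above, and the S3 paths are placeholders):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("grouping-words").getOrCreate()

# Output of the preprocessing step: one relevant word per row in 'exploded_text', plus 'year'.
words = spark.read.parquet("s3://<your-bucket>/output/preprocessed/")   # placeholder path

# Count how often each word appears in book titles, per year.
words_by_year = (
    words
    .groupBy("year", "exploded_text")
    .count()
    .orderBy(F.desc("count"))
)

words_by_year.write.mode("overwrite").parquet("s3://<your-bucket>/output/words_by_year/")  # placeholder path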


    Running the example

If you do not want to read the steps below, you can watch this YouTube video, which explains how to run the example step by step.

    Steps to follow:

• Create a new user in AWS with AdministratorAccess* and get your security credentials.
• Go to this URL: AWS CLI, and configure your AWS credentials on your local machine.
• Install Git Bash for Windows; once installed, open Git Bash and clone this repository, which will download all the files needed.

Now, let's create a new AWS EMR cluster with the name: wittline

The step above will return a new cluster ID; in this case it was "j-39DQEKGDZO6C0". The next step uploads the steps for the current cluster job to the project's S3 bucket.

Once the steps have been uploaded, you can execute them. This action reads the steps from the project's S3 bucket and submits them to the cluster in the correct order.

Once the execution of the three steps has completed, you can download the generated word clouds using the following command:

The name of the S3 bucket created in this case is: wittline-1624387595519505700

    Remember to terminate your cluster and choose Yes to delete all resources created

    Evolution of the word clouds from the titles of books purchased on Amazon USA (1995 - 2015)


    Contributing and Feedback

Any ideas or feedback about this repository? Or do you need help deploying it?

    Contact me on | Twitter | LinkedIn | Medium

    Authors

    License

    This project is licensed under the terms of the Apache License.

Source: https://github.com/Wittline/pyspark-on-aws-emr

