{ "cells": [ { "cell_type": "markdown", "id": "e5905a69", "metadata": {}, "source": [ "# CSE6242 - HW3 - Q1" ] }, { "cell_type": "markdown", "id": "09289981", "metadata": {}, "source": [ "Pyspark Imports" ] }, { "cell_type": "code", "execution_count": 1, "id": "139318cb", "metadata": {}, "outputs": [], "source": [ "### DO NOT MODIFY THIS CELL ###\n", "import pyspark\n", "from pyspark.sql import SQLContext\n", "from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, round, coalesce" ] }, { "cell_type": "markdown", "id": "3fd9e0f8", "metadata": {}, "source": [ "Initialize PySpark Context" ] }, { "cell_type": "code", "execution_count": 2, "id": "b0c18c6c", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "23/10/20 05:31:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n", "23/10/20 05:31:40 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.\n", "23/10/20 05:31:40 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.\n", "23/10/20 05:31:40 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.\n", "23/10/20 05:31:40 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.\n", "/usr/local/lib/python3.9/dist-packages/pyspark/sql/context.py:113: FutureWarning: Deprecated in 3.0.0. 
Use SparkSession.builder.getOrCreate() instead.\n", " warnings.warn(\n" ] } ], "source": [ "### DO NOT MODIFY THIS CELL ###\n", "sc = pyspark.SparkContext(appName=\"HW3-Q1\")\n", "sqlContext = SQLContext(sc)" ] }, { "cell_type": "markdown", "id": "d68ae314", "metadata": {}, "source": [ "Define function for loading data" ] }, { "cell_type": "code", "execution_count": 3, "id": "7e5bbdda", "metadata": {}, "outputs": [], "source": [ "### DO NOT MODIFY THIS CELL ###\n", "def load_data():\n", " df = sqlContext.read.option(\"header\",True) \\\n", " .csv(\"yellow_tripdata_2019-01_short.csv\")\n", " return df" ] }, { "cell_type": "markdown", "id": "0d52409d", "metadata": {}, "source": [ "### Q1.a" ] }, { "cell_type": "markdown", "id": "e43f6e00", "metadata": {}, "source": [ "Perform data casting to clean incoming dataset" ] }, { "cell_type": "code", "execution_count": 4, "id": "11f801b4", "metadata": {}, "outputs": [], "source": [ "def clean_data(df):\n", " '''\n", " input: df a dataframe\n", " output: df a dataframe with the all the original columns\n", " '''\n", " \n", " # START YOUR CODE HERE ---------\n", " from pyspark.sql.types import StructField, StructType, IntegerType, TimestampType, FloatType, StringType\n", "\n", " df = df.withColumn(\"passenger_count\", df[\"passenger_count\"].cast(IntegerType()))\n", " df = df.withColumn(\"total_amount\", df[\"total_amount\"].cast(FloatType()))\n", " df = df.withColumn(\"tip_amount\", df[\"tip_amount\"].cast(FloatType()))\n", " df = df.withColumn(\"trip_distance\", df[\"trip_distance\"].cast(FloatType()))\n", " df = df.withColumn(\"fare_amount\", df[\"fare_amount\"].cast(FloatType()))\n", " df = df.withColumn(\"tpep_pickup_datetime\", df[\"tpep_pickup_datetime\"].cast(TimestampType()))\n", " df = df.withColumn(\"tpep_dropoff_datetime\", df[\"tpep_dropoff_datetime\"].cast(TimestampType()))\n", " # END YOUR CODE HERE -----------\n", " \n", " return df" ] }, { "cell_type": "markdown", "id": "d4f565d0", "metadata": {}, "source": [ 
"### Q1.b" ] }, { "cell_type": "markdown", "id": "72b4f712", "metadata": {}, "source": [ "Find rate per person for based on how many passengers travel between pickup and dropoff locations. " ] }, { "cell_type": "code", "execution_count": 5, "id": "4e115152", "metadata": {}, "outputs": [], "source": [ "def common_pair(df):\n", " '''\n", " input: df a dataframe\n", " output: df a dataframe with following columns:\n", " - PULocationID\n", " - DOLocationID\n", " - passenger_count\n", " - per_person_rate\n", " \n", " per_person_rate is the total_amount per person for a given pair.\n", " \n", " '''\n", " \n", " # START YOUR CODE HERE ---------\n", " from pyspark.sql import Window\n", " from pyspark.sql.functions import sum\n", "\n", " partition_cols = ['PULocationID','DOLocationID']\n", " group_by_result = df.groupBy(partition_cols).count()\n", " df = df.filter((df.PULocationID != df.DOLocationID))\n", " df = df.withColumn(\"passenger_count_sum\", sum(\"passenger_count\").over(Window.partitionBy(*partition_cols)))\n", " df = df.withColumn(\"total_amount_sum\", sum(\"total_amount\").over(Window.partitionBy(*partition_cols)))\n", " df = df.withColumn(\"per_person_rate\", col(\"total_amount_sum\")/col(\"passenger_count_sum\"))\n", " df = df.select(\n", " ['PULocationID','DOLocationID', 'passenger_count_sum','per_person_rate']\n", " ).distinct().orderBy(['passenger_count_sum','per_person_rate'], ascending=False).limit(10)\n", " df = df.withColumnRenamed('passenger_count_sum', 'passenger_count')\n", " # END YOUR CODE HERE -----------\n", " \n", " return df" ] }, { "cell_type": "markdown", "id": "127574ab", "metadata": {}, "source": [ "### Q1.c" ] }, { "cell_type": "markdown", "id": "36a8fd27", "metadata": {}, "source": [ "Find trips which trip distances generate the highest tip percentage." 
] }, { "cell_type": "code", "execution_count": 6, "id": "376c981c", "metadata": {}, "outputs": [], "source": [ "def distance_with_most_tip(df):\n", " '''\n", " input: df a dataframe\n", " output: df a dataframe with following columns:\n", " - trip_distance\n", " - tip_percent\n", " \n", " trip_percent is the percent of tip out of fare_amount\n", " '''\n", " \n", " # START YOUR CODE HERE ---------\n", " from pyspark.sql import Window\n", " from pyspark.sql.functions import col, ceil, sum, max\n", " \n", " df = df.where((df.fare_amount > 2) & (df.trip_distance > 0))\n", " \n", " # # Calculate the tip percent (tip_amount * 100 / fare_amount) for each trip. \n", " df = df.withColumn(\"tip_percent\", (col('tip_amount') * 100) / col('fare_amount'))\n", " \n", " # # Round all trip distances up to the closest mile and find the average tip_percent for each trip_distance.\n", " df = df.withColumn(\"trip_distance_round_up\", ceil(col('trip_distance')).cast('decimal(8,0)'))\n", "\n", " partition_cols = ['trip_distance_round_up']\n", " group_by_result = df.groupBy(partition_cols).count()\n", " \n", " df = df.withColumn(\"tip_percent_sum\", sum(\"tip_percent\").over(Window.partitionBy(*partition_cols)))\n", " df_joined = group_by_result.join(df, partition_cols)\n", " df = df_joined.withColumn(\"average_tip_percent\", col('tip_percent_sum') / col('count'))\n", " \n", " df = df.select(['trip_distance_round_up','tip_percent_sum','count','average_tip_percent']).distinct()\n", " df = df.select(['trip_distance_round_up','average_tip_percent'])\n", " df = df.withColumnRenamed(\"average_tip_percent\",\"tip_percent\").withColumnRenamed(\"trip_distance_round_up\",\"trip_distance\")\n", " df = df.orderBy(['tip_percent'], ascending=False).limit(15)\n", " # END YOUR CODE HERE -----------\n", " \n", " return df\n" ] }, { "cell_type": "markdown", "id": "f0172fe6", "metadata": {}, "source": [ "### Q1.d" ] }, { "cell_type": "markdown", "id": "4613c906", "metadata": {}, "source": [ "Determine the 
average speed at different times of day." ] }, { "cell_type": "code", "execution_count": 7, "id": "abff9e24", "metadata": {}, "outputs": [], "source": [ "def time_with_most_traffic(df):\n", " '''\n", " input: df a dataframe\n", " output: df a dataframe with following columns:\n", " - time_of_day\n", " - am_avg_speed\n", " - pm_avg_speed\n", " \n", " am_avg_speed and pm_avg_speed are the average speeds (distance per hour) of trips that began in that hour, for AM and PM respectively\n", " \n", " '''\n", " \n", " # START YOUR CODE HERE ---------\n", " from pyspark.sql.functions import date_format\n", " from pyspark.sql import Window\n", " from pyspark.sql.functions import ceil,sum\n", "\n", " # You will be modifying the function time_with_most_traffic to determine which hour of the day has the most traffic. \n", " \n", " df = df.withColumn('time_of_day_str', date_format(col(\"tpep_pickup_datetime\"),\"K\"))\n", " df = df.withColumn('12_hour_time', col('time_of_day_str').cast(\"int\"))\n", " \n", " df = df.withColumn('AM_PM', date_format(col(\"tpep_pickup_datetime\"),\"a\"))\n", " df = df.withColumn('trip_time_h',(col(\"tpep_dropoff_datetime\") - col('tpep_pickup_datetime') ).cast(\"long\")/3600)\n", " \n", " # Calculate the traffic for a particular hour using the average speed of all taxi trips which began during that hour. 
\n", " # Calculate the average speed as the average trip_distance divided by the average trip time, as distance per hour.\n", " \n", " partition_cols = ['12_hour_time','AM_PM']\n", " group_by_result = df.groupBy(partition_cols).count()\n", " df = group_by_result.join(df, partition_cols)\n", " \n", " df = df.withColumn(\"trip_distance_sum\", sum(\"trip_distance\").over(Window.partitionBy(*partition_cols)))\n", " df = df.withColumn(\"average_trip_distance\", col('trip_distance_sum') / col('count'))\n", " \n", " df = df.withColumn(\"trip_time_h_sum\", sum(\"trip_time_h\").over(Window.partitionBy(*partition_cols)))\n", " df = df.withColumn(\"average_trip_time\", col('trip_time_h_sum') / col('count'))\n", " \n", " df = df.withColumn(\"average_speed\", col('average_trip_distance') / col('average_trip_time'))\n", " # df.select(['VendorID','12_hour_time','AM_PM','trip_time_h_sum','average_trip_time','average_trip_distance','average_speed','count']).show()\n", " \n", " df = df.select(['12_hour_time','AM_PM','average_speed']).distinct()\n", " df = df.groupBy('12_hour_time').pivot('AM_PM').sum(\"average_speed\")\n", " df = df.withColumnRenamed(\"AM\",\"am_avg_speed\").withColumnRenamed(\"PM\",\"pm_avg_speed\").withColumnRenamed(\"12_hour_time\",\"time_of_day\")\n", " df = df.orderBy(['time_of_day'])\n", " # END YOUR CODE HERE -----------\n", " \n", " return df\n" ] }, { "cell_type": "markdown", "id": "34cbd7b9", "metadata": {}, "source": [ "### The below cells are for you to investigate your solutions and will not be graded\n", "## Ensure they are commented out prior to submitting to Gradescope to avoid errors" ] }, { "cell_type": "code", "execution_count": 8, "id": "bf9abefb", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------+------------+---------------+------------------+\n", "|PULocationID|DOLocationID|passenger_count| per_person_rate|\n", "+------------+------------+---------------+------------------+\n", "| 239| 238| 62| 
4.26274198870505|\n", "| 237| 236| 60| 4.482500068346659|\n", "| 263| 141| 52|3.4190384974846473|\n", "| 161| 236| 42| 5.368571440378825|\n", "| 148| 79| 42| 4.711904752822149|\n", "| 142| 238| 39| 5.05487182812813|\n", "| 141| 236| 37| 4.355675723101641|\n", "| 239| 143| 37| 4.252162224537617|\n", "| 239| 142| 35| 3.817714350564139|\n", "| 79| 170| 34| 6.394705884596881|\n", "+------------+------------+---------------+------------------+\n", "\n", "+-------------+------------------+\n", "|trip_distance| tip_percent|\n", "+-------------+------------------+\n", "| 1|17.129815971513313|\n", "| 2|15.815527155632552|\n", "| 17|15.796441782308916|\n", "| 20| 15.11240992123345|\n", "| 3|14.886705727113446|\n", "| 6|14.579695131601051|\n", "| 5|14.245405861990653|\n", "| 4|13.831569507473274|\n", "| 9|13.814476557648435|\n", "| 8|12.072596772433315|\n", "| 19|11.952632334985276|\n", "| 10|11.880490518902954|\n", "| 7| 10.80057562837643|\n", "| 21|10.739019886973427|\n", "| 18|10.696822158448429|\n", "+-------------+------------------+\n", "\n", "+-----------+------------------+-------------------+\n", "|time_of_day| am_avg_speed| pm_avg_speed|\n", "+-----------+------------------+-------------------+\n", "| 0| 9.377696196631234| NULL|\n", "| 1|10.845483413697353| 5.125214305177561|\n", "| 3| NULL| 0.0|\n", "| 4| NULL| 0.0|\n", "| 5| NULL| 0.5137660239764732|\n", "| 6| NULL| 9.989847870647605|\n", "| 7| NULL|0.18415305490417713|\n", "| 8| NULL| 0.5183127622697896|\n", "| 10| NULL| 0.6147483972627696|\n", "| 11| NULL| 4.650958285207579|\n", "+-----------+------------------+-------------------+\n", "\n" ] } ], "source": [ "df = load_data()\n", "df = clean_data(df)\n", "\n", "common_pair(df).show()\n", "distance_with_most_tip(df).show()\n", "time_with_most_traffic(df).show()\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, 
"file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.2" } }, "nbformat": 4, "nbformat_minor": 5 }