{ "cells": [ { "cell_type": "markdown", "id": "e5905a69", "metadata": {}, "source": [ "# CSE6242 - HW3 - Q1" ] }, { "cell_type": "markdown", "id": "09289981", "metadata": {}, "source": [ "Pyspark Imports" ] }, { "cell_type": "code", "execution_count": 1, "id": "139318cb", "metadata": {}, "outputs": [], "source": [ "### DO NOT MODIFY THIS CELL ###\n", "import pyspark\n", "from pyspark.sql import SQLContext\n", "from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, round, coalesce" ] }, { "cell_type": "markdown", "id": "3fd9e0f8", "metadata": {}, "source": [ "Initialize PySpark Context" ] }, { "cell_type": "code", "execution_count": 2, "id": "b0c18c6c", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "23/10/19 06:47:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n", "23/10/19 06:47:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.\n", "23/10/19 06:47:52 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.\n", "23/10/19 06:47:52 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.\n", "23/10/19 06:47:52 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.\n", "23/10/19 06:47:52 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.\n", "23/10/19 06:47:52 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.\n", "/usr/local/lib/python3.9/dist-packages/pyspark/sql/context.py:113: FutureWarning: Deprecated in 3.0.0. 
Use SparkSession.builder.getOrCreate() instead.\n", " warnings.warn(\n" ] } ], "source": [ "### DO NOT MODIFY THIS CELL ###\n", "sc = pyspark.SparkContext(appName=\"HW3-Q1\")\n", "sqlContext = SQLContext(sc)" ] },
{ "cell_type": "markdown", "id": "d68ae314", "metadata": {}, "source": [ "Define function for loading data" ] },
{ "cell_type": "code", "execution_count": 3, "id": "7e5bbdda", "metadata": {}, "outputs": [], "source": [ "### DO NOT MODIFY THIS CELL ###\n", "def load_data():\n", " df = sqlContext.read.option(\"header\",True) \\\n", " .csv(\"yellow_tripdata_2019-01_short.csv\")\n", " return df" ] },
{ "cell_type": "markdown", "id": "0d52409d", "metadata": {}, "source": [ "### Q1.a" ] },
{ "cell_type": "markdown", "id": "e43f6e00", "metadata": {}, "source": [ "Perform data casting to clean the incoming dataset" ] },
{ "cell_type": "code", "execution_count": 4, "id": "11f801b4", "metadata": {}, "outputs": [], "source": [ "def clean_data(df):\n", " '''\n", " input: df a dataframe\n", " output: df a dataframe with all the original columns\n", " '''\n", " \n", " # START YOUR CODE HERE ---------\n", " from pyspark.sql.types import StructField, StructType, IntegerType, TimestampType, FloatType, StringType\n", "\n", " df = df.withColumn(\"passenger_count\", df[\"passenger_count\"].cast(IntegerType()))\n", " df = df.withColumn(\"total_amount\", df[\"total_amount\"].cast(FloatType()))\n", " df = df.withColumn(\"tip_amount\", df[\"tip_amount\"].cast(FloatType()))\n", " df = df.withColumn(\"trip_distance\", df[\"trip_distance\"].cast(FloatType()))\n", " df = df.withColumn(\"fare_amount\", df[\"fare_amount\"].cast(FloatType()))\n", " df = df.withColumn(\"tpep_pickup_datetime\", df[\"tpep_pickup_datetime\"].cast(TimestampType()))\n", " df = df.withColumn(\"tpep_dropoff_datetime\", df[\"tpep_dropoff_datetime\"].cast(TimestampType()))\n", "\n", " # END YOUR CODE HERE -----------\n", " \n", " return df" ] },
{ "cell_type": "markdown", "id": "d4f565d0", "metadata": {}, "source": [ "### Q1.b" ] },
{ "cell_type": "markdown", "id": "72b4f712", "metadata": {}, "source": [ "Find the rate per person based on how many passengers travel between pickup and dropoff locations." ] },
{ "cell_type": "code", "execution_count": 5, "id": "4e115152", "metadata": {}, "outputs": [], "source": [ "def common_pair(df):\n", " '''\n", " input: df a dataframe\n", " output: df a dataframe with the following columns:\n", " - PULocationID\n", " - DOLocationID\n", " - passenger_count\n", " - per_person_rate\n", " \n", " per_person_rate is the total_amount per person for a given pair.\n", " \n", " '''\n", " \n", " # START YOUR CODE HERE ---------\n", "\n", " \n", " \n", " # END YOUR CODE HERE -----------\n", " \n", " return df" ] },
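{ "cell_type": "markdown", "id": "q1b-sketch-md", "metadata": {}, "source": [ "The next cell is an ungraded illustration, not part of the assignment template: a minimal sketch of how `common_pair` might be approached. It assumes trips are grouped by `PULocationID`/`DOLocationID`, that `passenger_count` and `total_amount` are summed per pair, and that results are ordered by `passenger_count` and then `per_person_rate` (both descending) and limited to the top 10; these choices are inferred from the commented checks at the bottom of the notebook, not from stated requirements. Comment it out or remove it before submitting to Gradescope." ] },
{ "cell_type": "code", "execution_count": null, "id": "q1b-sketch", "metadata": {}, "outputs": [], "source": [ "# Ungraded sketch for Q1.b (assumptions noted in the markdown above) - remove before submitting\n", "from pyspark.sql.functions import sum as sum_, desc, col\n", "\n", "def common_pair_sketch(df):\n", "    # Total passengers and revenue for each pickup/dropoff pair\n", "    pairs = df.groupBy(\"PULocationID\", \"DOLocationID\") \\\n", "        .agg(sum_(\"passenger_count\").alias(\"passenger_count\"),\n", "             sum_(\"total_amount\").alias(\"total_amount\"))\n", "    # per_person_rate: total amount collected for the pair divided by total passengers\n", "    pairs = pairs.withColumn(\"per_person_rate\", col(\"total_amount\") / col(\"passenger_count\"))\n", "    # Assumed ordering and limit: busiest pairs first, ties broken by rate, top 10\n", "    return pairs.orderBy(desc(\"passenger_count\"), desc(\"per_person_rate\")) \\\n", "        .select(\"PULocationID\", \"DOLocationID\", \"passenger_count\", \"per_person_rate\") \\\n", "        .limit(10)\n", "\n", "# Example usage (uses load_data/clean_data defined above):\n", "# common_pair_sketch(clean_data(load_data())).show()" ] },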
{ "cell_type": "markdown", "id": "127574ab", "metadata": {}, "source": [ "### Q1.c" ] },
{ "cell_type": "markdown", "id": "36a8fd27", "metadata": {}, "source": [ "Find which trip distances generate the highest tip percentage. (An ungraded sketch of one possible approach is included with the investigation cells at the end of the notebook.)" ] },
{ "cell_type": "code", "execution_count": 6, "id": "376c981c", "metadata": {}, "outputs": [], "source": [ "def distance_with_most_tip(df):\n", " '''\n", " input: df a dataframe\n", " output: df a dataframe with the following columns:\n", " - trip_distance\n", " - tip_percent\n", " \n", " tip_percent is the percent of tip out of fare_amount\n", " '''\n", " \n", " # START YOUR CODE HERE ---------\n", "\n", " # END YOUR CODE HERE -----------\n", " \n", " return df\n" ] },
{ "cell_type": "markdown", "id": "f0172fe6", "metadata": {}, "source": [ "### Q1.d" ] },
{ "cell_type": "markdown", "id": "4613c906", "metadata": {}, "source": [ "Determine the average speed at different times of day." ] },
{ "cell_type": "code", "execution_count": 7, "id": "abff9e24", "metadata": {}, "outputs": [], "source": [ "def time_with_most_traffic(df):\n", " '''\n", " input: df a dataframe\n", " output: df a dataframe with the following columns:\n", " - time_of_day\n", " - am_avg_speed\n", " - pm_avg_speed\n", " \n", " am_avg_speed and pm_avg_speed are the average speeds (distance per hour) for trips\n", " that started at that time_of_day in the AM and PM, respectively.\n", " \n", " '''\n", " \n", " # START YOUR CODE HERE ---------\n", " from pyspark.sql.functions import date_format\n", " from pyspark.sql import Window\n", " from pyspark.sql.functions import ceil,sum\n", " df_q1d = df.select('*')\n", " \n", " # You will be modifying the function time_with_most_traffic to determine which hour of the day has the most traffic. \n", " \n", " df_q1d = df_q1d.withColumn('time_of_day_str',date_format(col(\"tpep_pickup_datetime\"),\"K\"))\n", " df_q1d = df_q1d.withColumn('12_hour_time', col('time_of_day_str').cast(\"int\"))\n", " \n", " df_q1d = df_q1d.withColumn('AM_PM',date_format(col(\"tpep_pickup_datetime\"),\"a\"))\n", " df_q1d = df_q1d.withColumn('trip_time_h',(col(\"tpep_dropoff_datetime\").cast(\"long\") - col('tpep_pickup_datetime').cast(\"long\"))/3600)\n", " \n", " # Calculate the traffic for a particular hour using the average speed of all taxi trips which began during that hour. \n",
 " # Calculate the average speed as the average trip_distance divided by the average trip time, as distance per hour.\n", " \n", " partition_cols = ['12_hour_time','AM_PM']\n", " group_by_result = df_q1d.groupBy(partition_cols).count()\n", " \n", " df_q1d = group_by_result.join(df_q1d, partition_cols)\n", "\n", " df_q1d = df_q1d.withColumn(\"trip_distance_sum\", sum(\"trip_distance\").over(Window.partitionBy(*partition_cols)))\n", " df_q1d = df_q1d.withColumn(\"average_trip_distance\",col(\"trip_distance_sum\")/col(\"count\"))\n", "\n", " df_q1d = df_q1d.withColumn(\"trip_time_sum\", sum(\"trip_time_h\").over(Window.partitionBy(*partition_cols)))\n", " df_q1d = df_q1d.withColumn(\"average_trip_time\",col(\"trip_time_sum\")/col(\"count\"))\n", "\n", " df_q1d = df_q1d.withColumn(\"average_speed\",col(\"average_trip_distance\")/col(\"average_trip_time\"))\n", "\n", " df_q1d = df_q1d.select(['12_hour_time','AM_PM','average_speed']).distinct()\n", "\n", " \n", " # Pivot AM/PM into separate columns so each hour of the day has its AM and PM average speeds side by side\n", " df_q1d = df_q1d.groupBy('12_hour_time').pivot('AM_PM').sum(\"average_speed\")\n", " df_q1d = df_q1d.withColumnRenamed(\"AM\",\"am_avg_speed\").withColumnRenamed(\"PM\",\"pm_avg_speed\").withColumnRenamed(\"12_hour_time\",\"time_of_day\")\n", " df_q1d = df_q1d.orderBy(['time_of_day'], ascending=True)\n", " # df.show()\n", " \n", " # df.select(['12_hour_time','AM_PM','trip_time_h','average_trip_time','trip_distance','average_trip_distance','average_speed']).show()\n", " df = df_q1d\n", " # END YOUR CODE HERE -----------\n", " \n", " return df\n" ] },
{ "cell_type": "markdown", "id": "34cbd7b9", "metadata": {}, "source": [ "### The cells below are for you to investigate your solutions and will not be graded\n", "## Ensure they are commented out prior to submitting to Gradescope to avoid errors" ] },
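{ "cell_type": "markdown", "id": "q1c-sketch-md", "metadata": {}, "source": [ "Ungraded sketch for Q1.c (referenced from the question above): one possible way `distance_with_most_tip` could relate trip distance to tip percentage. It assumes trips with non-positive fares are dropped, `trip_distance` is rounded to the nearest whole mile, `tip_percent` is 100 * total tips / total fares per rounded distance, rows are sorted by `tip_percent` descending, and only the top 15 distances are kept; all of this is inferred from the commented checks below, not from stated requirements. Comment it out or remove it before submitting to Gradescope." ] },
{ "cell_type": "code", "execution_count": null, "id": "q1c-sketch", "metadata": {}, "outputs": [], "source": [ "# Ungraded sketch for Q1.c (assumptions noted in the markdown above) - remove before submitting\n", "from pyspark.sql.functions import sum as sum_, desc, col, round as round_\n", "\n", "def distance_with_most_tip_sketch(df):\n", "    # Drop trips with a non-positive fare, then round the distance to whole miles (assumptions)\n", "    rounded = df.filter(col(\"fare_amount\") > 0) \\\n", "        .withColumn(\"trip_distance\", round_(col(\"trip_distance\"), 0))\n", "    # tip_percent: tips as a percentage of fares for each rounded distance\n", "    per_distance = rounded.groupBy(\"trip_distance\") \\\n", "        .agg((sum_(\"tip_amount\") / sum_(\"fare_amount\") * 100).alias(\"tip_percent\"))\n", "    # Assumed ordering and limit: highest tip percentage first, top 15 distances\n", "    return per_distance.orderBy(desc(\"tip_percent\")).limit(15)\n", "\n", "# Example usage (uses load_data/clean_data defined above):\n", "# distance_with_most_tip_sketch(clean_data(load_data())).show()" ] },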
{ "cell_type": "code", "execution_count": 8, "id": "bf9abefb", "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------+------------------+-------------------+\n", "|time_of_day| am_avg_speed| pm_avg_speed|\n", "+-----------+------------------+-------------------+\n", "| 0| 9.377696196631234| NULL|\n", "| 1|10.845483413697353| 5.125214305177561|\n", "| 3| NULL| 0.0|\n", "| 4| NULL| 0.0|\n", "| 5| NULL| 0.5137660239764732|\n", "| 6| NULL| 9.989847870647605|\n", "| 7| NULL|0.18415305490417713|\n", "| 8| NULL| 0.5183127622697896|\n", "| 10| NULL| 0.6147483972627696|\n", "| 11| NULL| 4.650958285207579|\n", "+-----------+------------------+-------------------+\n", "\n", "checking table:\n", "First row : True True True\n", "1 row : True True True\n", "2 row : True True True\n", "3 row : True True True\n", "4 row : True True True\n", "5 row : True True True\n", "6 row : True True True\n", "7 row : True True True\n", "8 row : True True True\n", "9 row : True True True\n" ] } ], "source": [ "df = load_data()\n", "df = clean_data(df)\n", "# common_pair(df)\n", "# distance_with_most_tip(df)\n", "# time_with_most_traffic(df)\n", "\n", "# # # q1b\n", "# q1b_df = common_pair(df)\n", "# # q1b_df.show()\n", "# print(\"First row :\",q1b_df.collect()[0][0] == '239') \n", "# print(\"First row :\",q1b_df.collect()[0][1] == '238') \n", "# print(\"First row :\",q1b_df.collect()[0][2] == 62) \n", "# print(\"First row :\",q1b_df.collect()[0][3] == 4.26274198870505) \n", "\n", "# print(\"4th row :\",q1b_df.collect()[4][0] == '148') \n", "# print(\"4th row :\",q1b_df.collect()[4][1] == '79') \n", "# print(\"4th row :\",q1b_df.collect()[4][2] == 42) \n", "# print(\"4th row :\",q1b_df.collect()[4][3] == 4.711904752822149) \n", "\n", "# print(\"9th row :\",q1b_df.collect()[9][0] == '79') \n", "# print(\"9th row :\",q1b_df.collect()[9][1] == '170') \n", "# print(\"9th row :\",q1b_df.collect()[9][2] == 34) \n", "# print(\"9th row :\",q1b_df.collect()[9][3] == 6.394705884596881) \n", "\n", "# # # q1c\n", "# q1c_df = distance_with_most_tip(df)\n", "# # q1c_df.show()\n", "# print(\"First row :\",q1c_df.collect()[0][0] == 1) \n", "# print(\"First row :\",q1c_df.collect()[0][1] == 17.129815971513313) \n", "\n", "# print(\"Mid row :\",q1c_df.collect()[6][0] == 5) \n", "# print(\"Mid row :\",q1c_df.collect()[6][1] == 14.245405861990653) \n", "\n", "# print(\"Last row :\",q1c_df.collect()[14][0] == 18) \n", "# print(\"Last row :\",q1c_df.collect()[14][1] == 10.696822158448429) \n", "\n", "# # # q1d\n", "q1d_df = time_with_most_traffic(df)\n", "q1d_df.show()\n", "print('checking table:')\n", "print(\"First row :\",q1d_df.collect()[0][0] == 0 ,q1d_df.collect()[0][1] == 9.377696196631234, q1d_df.collect()[0][2] == None) \n", "print(\"1 row :\" ,q1d_df.collect()[1][0] == 1 ,q1d_df.collect()[1][1] == 10.845483413697353, q1d_df.collect()[1][2] == 5.125214305177561) \n", "print(\"2 row :\" ,q1d_df.collect()[2][0] == 3 ,q1d_df.collect()[2][1] == None, q1d_df.collect()[2][2] == 0.0) \n", "print(\"3 row :\" ,q1d_df.collect()[3][0] == 4 ,q1d_df.collect()[3][1] == None, q1d_df.collect()[3][2] == 0.0) \n", "print(\"4 row :\" ,q1d_df.collect()[4][0] == 5 ,q1d_df.collect()[4][1] == None, q1d_df.collect()[4][2] == 0.5137660239764732) \n", "print(\"5 row :\" ,q1d_df.collect()[5][0] == 6 ,q1d_df.collect()[5][1] == None, q1d_df.collect()[5][2] == 9.989847870647605) \n", "print(\"6 row :\" ,q1d_df.collect()[6][0] == 7 ,q1d_df.collect()[6][1] == None, q1d_df.collect()[6][2] == 0.18415305490417713) \n", "print(\"7 row :\" ,q1d_df.collect()[7][0] == 8 ,q1d_df.collect()[7][1] == None, q1d_df.collect()[7][2] == 0.5183127622697896) \n", "print(\"8 row :\" ,q1d_df.collect()[8][0] == 10 ,q1d_df.collect()[8][1] == None, q1d_df.collect()[8][2] == 0.6147483972627696) \n", "print(\"9 row :\" ,q1d_df.collect()[9][0] == 11 ,q1d_df.collect()[9][1] == None, q1d_df.collect()[9][2] == 4.650958285207579) \n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.2" } }, "nbformat": 4, "nbformat_minor": 5 }