{ "cells": [ { "cell_type": "markdown", "id": "e5905a69", "metadata": {}, "source": [ "# CSE6242 - HW3 - Q1" ] }, { "cell_type": "markdown", "id": "09289981", "metadata": {}, "source": [ "Pyspark Imports" ] }, { "cell_type": "code", "execution_count": 1, "id": "139318cb", "metadata": {}, "outputs": [], "source": [ "### DO NOT MODIFY THIS CELL ###\n", "import pyspark\n", "from pyspark.sql import SQLContext\n", "from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, round, coalesce\n", "from pyspark.sql.functions import *" ] }, { "cell_type": "markdown", "id": "3fd9e0f8", "metadata": {}, "source": [ "Initialize PySpark Context" ] }, { "cell_type": "code", "execution_count": 2, "id": "b0c18c6c", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "23/10/18 14:54:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n", "23/10/18 14:54:42 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.\n", "23/10/18 14:54:42 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.\n", "23/10/18 14:54:42 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.\n", "23/10/18 14:54:42 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.\n", "/usr/local/lib/python3.9/dist-packages/pyspark/sql/context.py:113: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.\n", " warnings.warn(\n" ] } ], "source": [ "### DO NOT MODIFY THIS CELL ###\n", "sc = pyspark.SparkContext(appName=\"HW3-Q1\")\n", "sqlContext = SQLContext(sc)" ] }, { "cell_type": "markdown", "id": "d68ae314", "metadata": {}, "source": [ "Define function for loading data" ] }, { "cell_type": "code", "execution_count": 3, "id": "7e5bbdda", "metadata": {}, "outputs": [], "source": [ "### DO NOT MODIFY THIS CELL ###\n", "def load_data():\n", " df = sqlContext.read.option(\"header\",True) \\\n", " .csv(\"yellow_tripdata_2019-01_short.csv\")\n", " return df" ] }, { "cell_type": "markdown", "id": "0d52409d", "metadata": {}, "source": [ "### Q1.a" ] }, { "cell_type": "markdown", "id": "e43f6e00", "metadata": {}, "source": [ "Perform data casting to clean incoming dataset" ] }, { "cell_type": "code", "execution_count": 4, "id": "11f801b4", "metadata": {}, "outputs": [], "source": [ "def clean_data(df):\n", " '''\n", " input: df a dataframe\n", " output: df a dataframe with the all the original columns\n", " '''\n", " \n", " # START YOUR CODE HERE ---------\n", " from pyspark.sql.types import StructField, StructType, IntegerType, TimestampType, FloatType, StringType\n", "\n", " df = df.withColumn(\"passenger_count\", df[\"passenger_count\"].cast(IntegerType()))\n", " df = df.withColumn(\"total_amount\", df[\"total_amount\"].cast(FloatType()))\n", " df = df.withColumn(\"tip_amount\", df[\"tip_amount\"].cast(FloatType()))\n", " df = df.withColumn(\"trip_distance\", df[\"trip_distance\"].cast(FloatType()))\n", " df = df.withColumn(\"fare_amount\", df[\"fare_amount\"].cast(FloatType()))\n", " df = df.withColumn(\"tpep_pickup_datetime\", df[\"tpep_pickup_datetime\"].cast(TimestampType()))\n", " df = df.withColumn(\"tpep_dropoff_datetime\", df[\"tpep_dropoff_datetime\"].cast(TimestampType()))\n", "\n", " # END YOUR CODE HERE -----------\n", " \n", " return df" ] }, { 
"cell_type": "code", "execution_count": 5, "id": "58f423ea-b1e3-4942-95c0-2c5e7924e5c5", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- passenger_count: integer (nullable = true)\n", " |-- total_amount: float (nullable = true)\n", " |-- tip_amount: float (nullable = true)\n", " |-- trip_distance: float (nullable = true)\n", " |-- fare_amount: float (nullable = true)\n", " |-- tpep_pickup_datetime: timestamp (nullable = true)\n", " |-- tpep_pickup_datetime: timestamp (nullable = true)\n", "\n" ] } ], "source": [ "df = load_data()\n", "df = clean_data(df)\n", "df.select(['passenger_count', 'total_amount', 'tip_amount', 'trip_distance', 'fare_amount', 'tpep_pickup_datetime', 'tpep_pickup_datetime']).printSchema()" ] }, { "cell_type": "markdown", "id": "d4f565d0", "metadata": {}, "source": [ "### Q1.b" ] }, { "cell_type": "markdown", "id": "72b4f712", "metadata": {}, "source": [ "Find rate per person for based on how many passengers travel between pickup and dropoff locations. " ] }, { "cell_type": "code", "execution_count": 6, "id": "4e115152", "metadata": {}, "outputs": [], "source": [ "def common_pair(df):\n", " '''\n", " input: df a dataframe\n", " output: df a dataframe with following columns:\n", " - PULocationID\n", " - DOLocationID\n", " - passenger_count\n", " - per_person_rate\n", " \n", " per_person_rate is the total_amount per person for a given pair.\n", " \n", " '''\n", " \n", " # START YOUR CODE HERE ---------\n", " from pyspark.sql import Window\n", "\n", " partition_cols = ['PULocationID','DOLocationID']\n", "\n", " group_by_result = df.groupBy(partition_cols).count()\n", " # group_by_result.show()\n", "\n", " # Filter out any trips that have the same pick-up and drop-off location. \n", " df_temp = df.filter((df.PULocationID != df.DOLocationID))\n", " # group_by_result_difference_location.show()\n", "\n", " # # [4 pts] You will be modifying the function common_pair. \n", " # # Return the top 10 pickup-dropoff location pairs that have the highest number of total passengers who have traveled between them. \n", " # # Sort the location pairs by total passengers. \n", " df_temp = df_temp.withColumn(\"passenger_count\", sum(\"passenger_count\").over(Window.partitionBy(*partition_cols)))\n", " \n", " # # For each location pair, also compute \n", " # # the average amount per passenger over all trips (name this per_person_rate), utilizing total_amount.\n", " df_temp = df_temp.withColumn(\"total_amount_partition\", sum(\"total_amount\").over(Window.partitionBy(*partition_cols)))\n", " df_temp = df_temp.withColumn(\"per_person_rate\",col(\"total_amount_partition\")/col(\"passenger_count\"))\n", " \n", " # # For pairs with the same total passengers, \n", " # # sort them in descending order of per_person_rate.\n", " # # Rename the column for total passengers to passenger_count. 
\n", " df_temp = df_temp.select(['PULocationID','DOLocationID','passenger_count','per_person_rate']).distinct()\n", " df_joined = group_by_result.join(df_temp, partition_cols)\n", " df_joined = df_joined.orderBy(['passenger_count','per_person_rate'], ascending=False).limit(10)\n", " df_output = df_joined.drop('count')\n", " # END YOUR CODE HERE -----------\n", "\n", " return df_output\n" ] }, { "cell_type": "code", "execution_count": 7, "id": "859262a0-fa16-48d9-ac59-6ee74ff77381", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------+------------+---------------+------------------+\n", "|PULocationID|DOLocationID|passenger_count| per_person_rate|\n", "+------------+------------+---------------+------------------+\n", "| 246| 162| 30| 0.9|\n", "| 151| 239| 30|0.6666666666666666|\n", "| 107| 181| 3|0.6666666666666666|\n", "| 113| 90| 3|0.6666666666666666|\n", "| 116| 42| 3|0.6666666666666666|\n", "| 138| 50| 3|0.6666666666666666|\n", "| 141| 234| 3|0.6666666666666666|\n", "| 144| 261| 3|0.6666666666666666|\n", "| 161| 170| 3|0.6666666666666666|\n", "| 170| 141| 3|0.6666666666666666|\n", "+------------+------------+---------------+------------------+\n", "\n" ] } ], "source": [ "common_pair(df).show()" ] }, { "cell_type": "markdown", "id": "127574ab", "metadata": {}, "source": [ "### Q1.c" ] }, { "cell_type": "markdown", "id": "36a8fd27", "metadata": {}, "source": [ "Find trips which trip distances generate the highest tip percentage." ] }, { "cell_type": "code", "execution_count": 8, "id": "376c981c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------+------------------+\n", "|trip_distance| tip_percent|\n", "+-------------+------------------+\n", "| 0.7| 30.90909177606756|\n", "| 0.55|30.000000733595627|\n", "| 1.2|28.095238549368723|\n", "| 3.7|27.407407760620117|\n", "| 6.3|26.511627019837846|\n", "| 0.3|26.499998569488525|\n", "| 0.6| 24.54545497894287|\n", "| 1.42|23.999999119685246|\n", "| 2.8|21.666666666666668|\n", "| 8.7|20.724637957586758|\n", "| 1.9|19.047619047619047|\n", "| 2.1|17.000000476837158|\n", "| 1.5| 11.37931018040098|\n", "| 12.3|10.526315789473685|\n", "| 1.3| 8.928571428571429|\n", "+-------------+------------------+\n", "\n" ] } ], "source": [ "def distance_with_most_tip(df):\n", " '''\n", " input: df a dataframe\n", " output: df a dataframe with following columns:\n", " - trip_distance\n", " - tip_percent\n", " \n", " trip_percent is the percent of tip out of fare_amount\n", " \n", " '''\n", " \n", " # START YOUR CODE HERE ---------\n", " # You will be modifying the function distance_with_most_tip . \n", " # Filter the data for trips having fares (fare_amount) \n", " # greater than $2.00 and \n", " # a trip distance (trip_distance) greater than 0. \n", " filtered_df = df.where((df.fare_amount > 2) & (df.trip_distance > 0))\n", " \n", " # Calculate the tip percent (tip_amount * 100 / fare_amount) for each trip. \n", " # Round all trip distances up to the closest mile and find the average tip_percent for each trip_distance.\n", " filtered_df = filtered_df.groupBy('trip_distance').agg((mean('tip_amount')*100)/mean('fare_amount'))\n", " \n", " # Sort the result in descending order of tip_percent to obtain the top 15 trip distances which tip the most generously. \n", " # Rename \n", " # the column for rounded trip distances to trip_distance, and \n", " # the column for average tip percents tip_percent . 
\n", " new_cols = ['trip_distance', 'tip_percent']\n", " df = filtered_df.toDF(*new_cols) \n", " df = df.sort(df.tip_percent.desc()).limit(15)\n", " \n", "\n", " # END YOUR CODE HERE -----------\n", " \n", " return df\n", "distance_with_most_tip(df).show()" ] }, { "cell_type": "markdown", "id": "f0172fe6", "metadata": {}, "source": [ "### Q1.d" ] }, { "cell_type": "markdown", "id": "4613c906", "metadata": {}, "source": [ "Determine the average speed at different times of day." ] }, { "cell_type": "code", "execution_count": 9, "id": "abff9e24", "metadata": { "scrolled": true }, "outputs": [ { "ename": "PySparkValueError", "evalue": "[CANNOT_DETERMINE_TYPE] Some of types cannot be determined after inferring.", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mPySparkValueError\u001b[0m Traceback (most recent call last)", "Cell \u001b[0;32mIn[9], line 100\u001b[0m\n\u001b[1;32m 96\u001b[0m \u001b[38;5;66;03m# END YOUR CODE HERE -----------\u001b[39;00m\n\u001b[1;32m 98\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m df_output\n\u001b[0;32m--> 100\u001b[0m \u001b[43mtime_with_most_traffic\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdf\u001b[49m\u001b[43m)\u001b[49m\u001b[38;5;241m.\u001b[39mshow()\n", "Cell \u001b[0;32mIn[9], line 95\u001b[0m, in \u001b[0;36mtime_with_most_traffic\u001b[0;34m(df)\u001b[0m\n\u001b[1;32m 92\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m (AM_PM \u001b[38;5;241m==\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mPM\u001b[39m\u001b[38;5;124m\"\u001b[39m):\n\u001b[1;32m 93\u001b[0m time_data[\u001b[38;5;28mint\u001b[39m(time_int)][\u001b[38;5;241m2\u001b[39m] \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mfloat\u001b[39m(average_speed)\n\u001b[0;32m---> 95\u001b[0m df_output \u001b[38;5;241m=\u001b[39m \u001b[43mspark\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcreateDataFrame\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdata\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mtime_data\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mschema\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43m[\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mtime_of_day\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mam_avg_speed\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mpm_avg_speed\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m]\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 96\u001b[0m \u001b[38;5;66;03m# END YOUR CODE HERE -----------\u001b[39;00m\n\u001b[1;32m 98\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m df_output\n", "File \u001b[0;32m/usr/local/lib/python3.9/dist-packages/pyspark/sql/session.py:1443\u001b[0m, in \u001b[0;36mSparkSession.createDataFrame\u001b[0;34m(self, data, schema, samplingRatio, verifySchema)\u001b[0m\n\u001b[1;32m 1438\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m has_pandas \u001b[38;5;129;01mand\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(data, pd\u001b[38;5;241m.\u001b[39mDataFrame):\n\u001b[1;32m 1439\u001b[0m \u001b[38;5;66;03m# Create a DataFrame from pandas DataFrame.\u001b[39;00m\n\u001b[1;32m 1440\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28msuper\u001b[39m(SparkSession, \u001b[38;5;28mself\u001b[39m)\u001b[38;5;241m.\u001b[39mcreateDataFrame( \u001b[38;5;66;03m# type: ignore[call-overload]\u001b[39;00m\n\u001b[1;32m 1441\u001b[0m data, schema, 
samplingRatio, verifySchema\n\u001b[1;32m 1442\u001b[0m )\n\u001b[0;32m-> 1443\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_create_dataframe\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 1444\u001b[0m \u001b[43m \u001b[49m\u001b[43mdata\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mschema\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43msamplingRatio\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mverifySchema\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;66;43;03m# type: ignore[arg-type]\u001b[39;49;00m\n\u001b[1;32m 1445\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n", "File \u001b[0;32m/usr/local/lib/python3.9/dist-packages/pyspark/sql/session.py:1485\u001b[0m, in \u001b[0;36mSparkSession._create_dataframe\u001b[0;34m(self, data, schema, samplingRatio, verifySchema)\u001b[0m\n\u001b[1;32m 1483\u001b[0m rdd, struct \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_createFromRDD(data\u001b[38;5;241m.\u001b[39mmap(prepare), schema, samplingRatio)\n\u001b[1;32m 1484\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m-> 1485\u001b[0m rdd, struct \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_createFromLocal\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mmap\u001b[39;49m\u001b[43m(\u001b[49m\u001b[43mprepare\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mdata\u001b[49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mschema\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1486\u001b[0m \u001b[38;5;28;01massert\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_jvm \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[1;32m 1487\u001b[0m jrdd \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_jvm\u001b[38;5;241m.\u001b[39mSerDeUtil\u001b[38;5;241m.\u001b[39mtoJavaArray(rdd\u001b[38;5;241m.\u001b[39m_to_java_object_rdd())\n", "File \u001b[0;32m/usr/local/lib/python3.9/dist-packages/pyspark/sql/session.py:1093\u001b[0m, in \u001b[0;36mSparkSession._createFromLocal\u001b[0;34m(self, data, schema)\u001b[0m\n\u001b[1;32m 1090\u001b[0m data \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mlist\u001b[39m(data)\n\u001b[1;32m 1092\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m schema \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;129;01mor\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(schema, (\u001b[38;5;28mlist\u001b[39m, \u001b[38;5;28mtuple\u001b[39m)):\n\u001b[0;32m-> 1093\u001b[0m struct \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_inferSchemaFromList\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdata\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mnames\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mschema\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1094\u001b[0m converter \u001b[38;5;241m=\u001b[39m _create_converter(struct)\n\u001b[1;32m 1095\u001b[0m tupled_data: Iterable[Tuple] \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mmap\u001b[39m(converter, data)\n", "File \u001b[0;32m/usr/local/lib/python3.9/dist-packages/pyspark/sql/session.py:969\u001b[0m, in \u001b[0;36mSparkSession._inferSchemaFromList\u001b[0;34m(self, data, names)\u001b[0m\n\u001b[1;32m 955\u001b[0m schema \u001b[38;5;241m=\u001b[39m reduce(\n\u001b[1;32m 
956\u001b[0m _merge_type,\n\u001b[1;32m 957\u001b[0m (\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 966\u001b[0m ),\n\u001b[1;32m 967\u001b[0m )\n\u001b[1;32m 968\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m _has_nulltype(schema):\n\u001b[0;32m--> 969\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m PySparkValueError(\n\u001b[1;32m 970\u001b[0m error_class\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mCANNOT_DETERMINE_TYPE\u001b[39m\u001b[38;5;124m\"\u001b[39m,\n\u001b[1;32m 971\u001b[0m message_parameters\u001b[38;5;241m=\u001b[39m{},\n\u001b[1;32m 972\u001b[0m )\n\u001b[1;32m 973\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m schema\n", "\u001b[0;31mPySparkValueError\u001b[0m: [CANNOT_DETERMINE_TYPE] Some of types cannot be determined after inferring." ] } ], "source": [ "def time_with_most_traffic(df):\n", " '''\n", " input: df a dataframe\n", " output: df a dataframe with following columns:\n", " - time_of_day\n", " - am_avg_speed\n", " - pm_avg_speed\n", " \n", " trip_percent is the percent of tip out of fare_amount\n", " \n", " '''\n", " \n", " # START YOUR CODE HERE ---------\n", " from pyspark.sql.functions import dayofweek,date_format, unix_timestamp,format_number\n", " from pyspark.sql.types import DecimalType\n", " from pyspark.sql import Window\n", " \n", " # You will be modifying the function time_with_most_traffic to determine which hour of the day has the most traffic. \n", " \n", " df = df.withColumn('time_of_day',date_format(col(\"tpep_pickup_datetime\"),\"K\"))\n", " df = df.withColumn('AM_PM',date_format(col(\"tpep_pickup_datetime\"),\"a\"))\n", " df = df.withColumn('diff_in_seconds',col(\"tpep_dropoff_datetime\").cast(\"long\") - col('tpep_pickup_datetime').cast(\"long\"))\n", "\n", " partition_cols = ['time_of_day','AM_PM']\n", " \n", " # Calculate the traffic for a particular hour using the average speed of all taxi trips which began during that hour. \n", " # Calculate the average speed as the average trip_distance divided by the average trip time, as distance per hour.\n", " group_by_result = df.groupBy(partition_cols).count()\n", " \n", " df = group_by_result.join(df, partition_cols)\n", " \n", " df = df.withColumn(\"trip_distance_sum\", sum(\"trip_distance\").over(Window.partitionBy(*partition_cols)))\n", " df = df.withColumn(\"average_trip_distance\",col(\"trip_distance_sum\")/col(\"count\"))\n", "\n", " df = df.withColumn(\"trip_time_sum_s\", sum(\"diff_in_seconds\").over(Window.partitionBy(*partition_cols)))\n", " df = df.withColumn(\"average_trip_time_s\",col(\"trip_time_sum_s\")/col(\"count\"))\n", "\n", " df = df.withColumn(\"average_speed\",col(\"average_trip_distance\")/col(\"average_trip_time_s\"))\n", "\n", " df = df.select(['time_of_day','AM_PM','average_speed']).distinct()\n", " df = group_by_result.join(df, partition_cols)\n", " \n", " # A day with low average speed indicates high levels of traffic. \n", " # The average speed may be 0, indicating very high levels of traffic.\n", "\n", " # Additionally, you must separate the hours into AM and PM, with hours 0:00-11:59 being AM, and hours 12:00-23:59 being PM. \n", " # Convert these times to the 12 hour time, so you can match the below output. 
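\n", " # (Note on the date_format pattern letters used above: 'K' gives the hour within the half-day\n", " # as 0-11 and 'a' gives the AM/PM marker, which is why the rows below are keyed on 0-11.)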
\n", " # For example, \n", " # the row with 1 as time of day, \n", " # should show the average speed between 1 am and 2 am in the am_avg_speed column, \n", " # and between 1 pm and 2pm in the pm_avg_speed column.\n", "\n", " # Use date_format along with the appropriate pattern letters to format the time of day so that it matches the example output below. \n", " \n", " # Your final table should \n", " # contain values sorted from 0-11 for time_of_day. \n", " # There may be data missing for a time of day, and it may be null for am_avg_speed or pm_avg_speed. \n", " # If an hour has no data for am or pm, there may be missing rows. \n", " # Do not include any additional rows for times of day which are not represented in the data. \n", "\n", " df = df.select(['time_of_day','AM_PM','average_speed']).limit(15)\n", " \n", " from pyspark.sql import SparkSession\n", " spark = SparkSession.builder.getOrCreate()\n", "\n", " time_data = [\n", " [0,None,None],\n", " [1,None,None],\n", " [2,None,None],\n", " [3,None,None],\n", " [4,None,None],\n", " [5,None,None],\n", " [6,None,None],\n", " [7,None,None],\n", " [8,None,None],\n", " [9,None,None],\n", " [10,None,None],\n", " [11,None,None]\n", " ]\n", " \n", " for i in range(0,len(time_data)):\n", " time_int_wanted = time_data[i][0]\n", " \n", " for j in range(0, len(df.collect())):\n", " time_int = df.collect()[j][0]\n", " AM_PM = df.collect()[j][1]\n", " average_speed = df.collect()[j][2]\n", "\n", " if (time_int == str(time_int_wanted)):\n", " if (AM_PM ==\"AM\"):\n", " time_data[int(time_int)][1] = float(average_speed)\n", " if (AM_PM ==\"PM\"):\n", " time_data[int(time_int)][2] = float(average_speed)\n", "\n", " df_output = spark.createDataFrame(data=time_data, schema=[\"time_of_day\",\"am_avg_speed\",\"pm_avg_speed\"])\n", " # END YOUR CODE HERE -----------\n", " \n", " return df_output\n", "\n", "time_with_most_traffic(df).show()" ] }, { "cell_type": "markdown", "id": "34cbd7b9", "metadata": {}, "source": [ "### The below cells are for you to investigate your solutions and will not be graded\n", "## Ensure they are commented out prior to submitting to Gradescope to avoid errors" ] }, { "cell_type": "code", "execution_count": null, "id": "bf9abefb", "metadata": {}, "outputs": [], "source": [ "# df = load_data()\n", "# df = clean_data(df)" ] }, { "cell_type": "code", "execution_count": null, "id": "cfa96f41", "metadata": {}, "outputs": [], "source": [ "# common_pair(df).show()" ] }, { "cell_type": "code", "execution_count": null, "id": "8e42b46a", "metadata": {}, "outputs": [], "source": [ "# distance_with_most_tip(df).show()" ] }, { "cell_type": "code", "execution_count": null, "id": "43e80dba-7407-4c3a-ba27-6cba3d90d21c", "metadata": {}, "outputs": [], "source": [ "# time_with_most_traffic(df).show()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.2" } }, "nbformat": 4, "nbformat_minor": 5 }