Files
tunmnlu/task_3/Skeleton/Q1/_submit/q1d/q1.ipynb
louiscklaw 9035c1312b update,
2025-02-01 02:09:32 +08:00

427 lines
15 KiB
Plaintext

{
"cells": [
{
"cell_type": "markdown",
"id": "e5905a69",
"metadata": {},
"source": [
"# CSE6242 - HW3 - Q1"
]
},
{
"cell_type": "markdown",
"id": "09289981",
"metadata": {},
"source": [
"Pyspark Imports"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "139318cb",
"metadata": {},
"outputs": [],
"source": [
"### DO NOT MODIFY THIS CELL ###\n",
"import pyspark\n",
"from pyspark.sql import SQLContext\n",
"from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, round, coalesce"
]
},
{
"cell_type": "markdown",
"id": "3fd9e0f8",
"metadata": {},
"source": [
"Initialize PySpark Context"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "b0c18c6c",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
"23/10/20 05:31:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
"23/10/20 05:31:40 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.\n",
"23/10/20 05:31:40 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.\n",
"23/10/20 05:31:40 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.\n",
"23/10/20 05:31:40 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.\n",
"/usr/local/lib/python3.9/dist-packages/pyspark/sql/context.py:113: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.\n",
" warnings.warn(\n"
]
}
],
"source": [
"### DO NOT MODIFY THIS CELL ###\n",
"sc = pyspark.SparkContext(appName=\"HW3-Q1\")\n",
"sqlContext = SQLContext(sc)"
]
},
{
"cell_type": "markdown",
"id": "d68ae314",
"metadata": {},
"source": [
"Define function for loading data"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "7e5bbdda",
"metadata": {},
"outputs": [],
"source": [
"### DO NOT MODIFY THIS CELL ###\n",
"def load_data():\n",
" df = sqlContext.read.option(\"header\",True) \\\n",
" .csv(\"yellow_tripdata_2019-01_short.csv\")\n",
" return df"
]
},
{
"cell_type": "markdown",
"id": "0d52409d",
"metadata": {},
"source": [
"### Q1.a"
]
},
{
"cell_type": "markdown",
"id": "e43f6e00",
"metadata": {},
"source": [
"Perform data casting to clean incoming dataset"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "11f801b4",
"metadata": {},
"outputs": [],
"source": [
"def clean_data(df):\n",
" '''\n",
" input: df a dataframe\n",
" output: df a dataframe with the all the original columns\n",
" '''\n",
" \n",
" # START YOUR CODE HERE ---------\n",
" from pyspark.sql.types import StructField, StructType, IntegerType, TimestampType, FloatType, StringType\n",
"\n",
" df = df.withColumn(\"passenger_count\", df[\"passenger_count\"].cast(IntegerType()))\n",
" df = df.withColumn(\"total_amount\", df[\"total_amount\"].cast(FloatType()))\n",
" df = df.withColumn(\"tip_amount\", df[\"tip_amount\"].cast(FloatType()))\n",
" df = df.withColumn(\"trip_distance\", df[\"trip_distance\"].cast(FloatType()))\n",
" df = df.withColumn(\"fare_amount\", df[\"fare_amount\"].cast(FloatType()))\n",
" df = df.withColumn(\"tpep_pickup_datetime\", df[\"tpep_pickup_datetime\"].cast(TimestampType()))\n",
" df = df.withColumn(\"tpep_dropoff_datetime\", df[\"tpep_dropoff_datetime\"].cast(TimestampType()))\n",
" # END YOUR CODE HERE -----------\n",
" \n",
" return df"
]
},
{
"cell_type": "markdown",
"id": "d4f565d0",
"metadata": {},
"source": [
"### Q1.b"
]
},
{
"cell_type": "markdown",
"id": "72b4f712",
"metadata": {},
"source": [
"Find rate per person for based on how many passengers travel between pickup and dropoff locations. "
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "4e115152",
"metadata": {},
"outputs": [],
"source": [
"def common_pair(df):\n",
" '''\n",
" input: df a dataframe\n",
" output: df a dataframe with following columns:\n",
" - PULocationID\n",
" - DOLocationID\n",
" - passenger_count\n",
" - per_person_rate\n",
" \n",
" per_person_rate is the total_amount per person for a given pair.\n",
" \n",
" '''\n",
" \n",
" # START YOUR CODE HERE ---------\n",
" from pyspark.sql import Window\n",
" from pyspark.sql.functions import sum\n",
"\n",
" partition_cols = ['PULocationID','DOLocationID']\n",
" group_by_result = df.groupBy(partition_cols).count()\n",
" df = df.filter((df.PULocationID != df.DOLocationID))\n",
" df = df.withColumn(\"passenger_count_sum\", sum(\"passenger_count\").over(Window.partitionBy(*partition_cols)))\n",
" df = df.withColumn(\"total_amount_sum\", sum(\"total_amount\").over(Window.partitionBy(*partition_cols)))\n",
" df = df.withColumn(\"per_person_rate\", col(\"total_amount_sum\")/col(\"passenger_count_sum\"))\n",
" df = df.select(\n",
" ['PULocationID','DOLocationID', 'passenger_count_sum','per_person_rate']\n",
" ).distinct().orderBy(['passenger_count_sum','per_person_rate'], ascending=False).limit(10)\n",
" df = df.withColumnRenamed('passenger_count_sum', 'passenger_count')\n",
" # END YOUR CODE HERE -----------\n",
" \n",
" return df"
]
},
{
"cell_type": "markdown",
"id": "127574ab",
"metadata": {},
"source": [
"### Q1.c"
]
},
{
"cell_type": "markdown",
"id": "36a8fd27",
"metadata": {},
"source": [
"Find trips which trip distances generate the highest tip percentage."
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "376c981c",
"metadata": {},
"outputs": [],
"source": [
"def distance_with_most_tip(df):\n",
" '''\n",
" input: df a dataframe\n",
" output: df a dataframe with following columns:\n",
" - trip_distance\n",
" - tip_percent\n",
" \n",
" trip_percent is the percent of tip out of fare_amount\n",
" '''\n",
" \n",
" # START YOUR CODE HERE ---------\n",
" from pyspark.sql import Window\n",
" from pyspark.sql.functions import col, ceil, sum, max\n",
" \n",
" df = df.where((df.fare_amount > 2) & (df.trip_distance > 0))\n",
" \n",
" # # Calculate the tip percent (tip_amount * 100 / fare_amount) for each trip. \n",
" df = df.withColumn(\"tip_percent\", (col('tip_amount') * 100) / col('fare_amount'))\n",
" \n",
" # # Round all trip distances up to the closest mile and find the average tip_percent for each trip_distance.\n",
" df = df.withColumn(\"trip_distance_round_up\", ceil(col('trip_distance')).cast('decimal(8,0)'))\n",
"\n",
" partition_cols = ['trip_distance_round_up']\n",
" group_by_result = df.groupBy(partition_cols).count()\n",
" \n",
" df = df.withColumn(\"tip_percent_sum\", sum(\"tip_percent\").over(Window.partitionBy(*partition_cols)))\n",
" df_joined = group_by_result.join(df, partition_cols)\n",
" df = df_joined.withColumn(\"average_tip_percent\", col('tip_percent_sum') / col('count'))\n",
" \n",
" df = df.select(['trip_distance_round_up','tip_percent_sum','count','average_tip_percent']).distinct()\n",
" df = df.select(['trip_distance_round_up','average_tip_percent'])\n",
" df = df.withColumnRenamed(\"average_tip_percent\",\"tip_percent\").withColumnRenamed(\"trip_distance_round_up\",\"trip_distance\")\n",
" df = df.orderBy(['tip_percent'], ascending=False).limit(15)\n",
" # END YOUR CODE HERE -----------\n",
" \n",
" return df\n"
]
},
{
"cell_type": "markdown",
"id": "f0172fe6",
"metadata": {},
"source": [
"### Q1.d"
]
},
{
"cell_type": "markdown",
"id": "4613c906",
"metadata": {},
"source": [
"Determine the average speed at different times of day."
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "abff9e24",
"metadata": {},
"outputs": [],
"source": [
"def time_with_most_traffic(df):\n",
" '''\n",
" input: df a dataframe\n",
" output: df a dataframe with following columns:\n",
" - time_of_day\n",
" - am_avg_speed\n",
" - pm_avg_speed\n",
" \n",
" trip_percent is the percent of tip out of fare_amount\n",
" \n",
" '''\n",
" \n",
" # START YOUR CODE HERE ---------\n",
" from pyspark.sql.functions import date_format\n",
" from pyspark.sql import Window\n",
" from pyspark.sql.functions import ceil,sum\n",
"\n",
" # You will be modifying the function time_with_most_traffic to determine which hour of the day has the most traffic. \n",
" \n",
" df = df.withColumn('time_of_day_str', date_format(col(\"tpep_pickup_datetime\"),\"K\"))\n",
" df = df.withColumn('12_hour_time', col('time_of_day_str').cast(\"int\"))\n",
" \n",
" df = df.withColumn('AM_PM', date_format(col(\"tpep_pickup_datetime\"),\"a\"))\n",
" df = df.withColumn('trip_time_h',(col(\"tpep_dropoff_datetime\") - col('tpep_pickup_datetime') ).cast(\"long\")/3600)\n",
" \n",
" # Calculate the traffic for a particular hour using the average speed of all taxi trips which began during that hour. \n",
" # Calculate the average speed as the average trip_distance divided by the average trip time, as distance per hour.\n",
" \n",
" partition_cols = ['12_hour_time','AM_PM']\n",
" group_by_result = df.groupBy(partition_cols).count()\n",
" df = group_by_result.join(df, partition_cols)\n",
" \n",
" df = df.withColumn(\"trip_distance_sum\", sum(\"trip_distance\").over(Window.partitionBy(*partition_cols)))\n",
" df = df.withColumn(\"average_trip_distance\", col('trip_distance_sum') / col('count'))\n",
" \n",
" df = df.withColumn(\"trip_time_h_sum\", sum(\"trip_time_h\").over(Window.partitionBy(*partition_cols)))\n",
" df = df.withColumn(\"average_trip_time\", col('trip_time_h_sum') / col('count'))\n",
" \n",
" df = df.withColumn(\"average_speed\", col('average_trip_distance') / col('average_trip_time'))\n",
" # df.select(['VendorID','12_hour_time','AM_PM','trip_time_h_sum','average_trip_time','average_trip_distance','average_speed','count']).show()\n",
" \n",
" df = df.select(['12_hour_time','AM_PM','average_speed']).distinct()\n",
" df = df.groupBy('12_hour_time').pivot('AM_PM').sum(\"average_speed\")\n",
" df = df.withColumnRenamed(\"AM\",\"am_avg_speed\").withColumnRenamed(\"PM\",\"pm_avg_speed\").withColumnRenamed(\"12_hour_time\",\"time_of_day\")\n",
" df = df.orderBy(['time_of_day'])\n",
" # END YOUR CODE HERE -----------\n",
" \n",
" return df\n"
]
},
{
"cell_type": "markdown",
"id": "34cbd7b9",
"metadata": {},
"source": [
"### The below cells are for you to investigate your solutions and will not be graded\n",
"## Ensure they are commented out prior to submitting to Gradescope to avoid errors"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "bf9abefb",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------------+------------+---------------+------------------+\n",
"|PULocationID|DOLocationID|passenger_count| per_person_rate|\n",
"+------------+------------+---------------+------------------+\n",
"| 239| 238| 62| 4.26274198870505|\n",
"| 237| 236| 60| 4.482500068346659|\n",
"| 263| 141| 52|3.4190384974846473|\n",
"| 161| 236| 42| 5.368571440378825|\n",
"| 148| 79| 42| 4.711904752822149|\n",
"| 142| 238| 39| 5.05487182812813|\n",
"| 141| 236| 37| 4.355675723101641|\n",
"| 239| 143| 37| 4.252162224537617|\n",
"| 239| 142| 35| 3.817714350564139|\n",
"| 79| 170| 34| 6.394705884596881|\n",
"+------------+------------+---------------+------------------+\n",
"\n",
"+-------------+------------------+\n",
"|trip_distance| tip_percent|\n",
"+-------------+------------------+\n",
"| 1|17.129815971513313|\n",
"| 2|15.815527155632552|\n",
"| 17|15.796441782308916|\n",
"| 20| 15.11240992123345|\n",
"| 3|14.886705727113446|\n",
"| 6|14.579695131601051|\n",
"| 5|14.245405861990653|\n",
"| 4|13.831569507473274|\n",
"| 9|13.814476557648435|\n",
"| 8|12.072596772433315|\n",
"| 19|11.952632334985276|\n",
"| 10|11.880490518902954|\n",
"| 7| 10.80057562837643|\n",
"| 21|10.739019886973427|\n",
"| 18|10.696822158448429|\n",
"+-------------+------------------+\n",
"\n",
"+-----------+------------------+-------------------+\n",
"|time_of_day| am_avg_speed| pm_avg_speed|\n",
"+-----------+------------------+-------------------+\n",
"| 0| 9.377696196631234| NULL|\n",
"| 1|10.845483413697353| 5.125214305177561|\n",
"| 3| NULL| 0.0|\n",
"| 4| NULL| 0.0|\n",
"| 5| NULL| 0.5137660239764732|\n",
"| 6| NULL| 9.989847870647605|\n",
"| 7| NULL|0.18415305490417713|\n",
"| 8| NULL| 0.5183127622697896|\n",
"| 10| NULL| 0.6147483972627696|\n",
"| 11| NULL| 4.650958285207579|\n",
"+-----------+------------------+-------------------+\n",
"\n"
]
}
],
"source": [
"df = load_data()\n",
"df = clean_data(df)\n",
"\n",
"common_pair(df).show()\n",
"distance_with_most_tip(df).show()\n",
"time_with_most_traffic(df).show()\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.2"
}
},
"nbformat": 4,
"nbformat_minor": 5
}