428 lines
15 KiB
Plaintext
428 lines
15 KiB
Plaintext
{
|
|
"cells": [
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "e5905a69",
|
|
"metadata": {},
|
|
"source": [
|
|
"# CSE6242 - HW3 - Q1"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "09289981",
|
|
"metadata": {},
|
|
"source": [
|
|
"Pyspark Imports"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 1,
|
|
"id": "139318cb",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"### DO NOT MODIFY THIS CELL ###\n",
|
|
"import pyspark\n",
|
|
"from pyspark.sql import SQLContext\n",
|
|
"from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, round, coalesce"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "3fd9e0f8",
|
|
"metadata": {},
|
|
"source": [
|
|
"Initialize PySpark Context"
|
|
]
|
|
},
|
|
{
|

"cell_type": "code",
|

"execution_count": 2,
|

"id": "b0c18c6c",
|

"metadata": {},
|

"outputs": [
|

{
|

"name": "stderr",
|

"output_type": "stream",
|

"text": [
|

"Setting default log level to \"WARN\".\n",
|

"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
|

"23/10/19 06:47:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
|

"23/10/19 06:47:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.\n",
|

"23/10/19 06:47:52 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.\n",
|

"23/10/19 06:47:52 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.\n",
|

"23/10/19 06:47:52 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.\n",
|

"23/10/19 06:47:52 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.\n",
|

"23/10/19 06:47:52 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.\n",
|

"/usr/local/lib/python3.9/dist-packages/pyspark/sql/context.py:113: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.\n",
|

" warnings.warn(\n"
|

]
|

}
|

],
|

"source": [
|

"### DO NOT MODIFY THIS CELL ###\n",
|

"# Create the Spark entry points; load_data() reads the CSV through sqlContext.\n",
|

"# SQLContext is deprecated since Spark 3.0 (see the FutureWarning in this cell's output).\n",
|

"sc = pyspark.SparkContext(appName=\"HW3-Q1\")\n",
|

"sqlContext = SQLContext(sc)"
|

]
|

},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "d68ae314",
|
|
"metadata": {},
|
|
"source": [
|
|
"Define function for loading data"
|
|
]
|
|
},
|
|
{
|

"cell_type": "code",
|

"execution_count": 3,
|

"id": "7e5bbdda",
|

"metadata": {},
|

"outputs": [],
|

"source": [
|

"### DO NOT MODIFY THIS CELL ###\n",
|

"def load_data():\n",
|

" '''\n",
|

" Read yellow_tripdata_2019-01_short.csv (path relative to the working\n",
|

" directory), taking column names from the header row. No schema or\n",
|

" inferSchema option is set, so every column is read as a string;\n",
|

" clean_data() casts the columns later steps need. Uses the global sqlContext.\n",
|

" '''\n",
|

" df = sqlContext.read.option(\"header\",True) \\\n",
|

" .csv(\"yellow_tripdata_2019-01_short.csv\")\n",
|

" return df"
|

]
|

},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "0d52409d",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Q1.a"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "e43f6e00",
|
|
"metadata": {},
|
|
"source": [
|
|
"Perform data casting to clean incoming dataset"
|
|
]
|
|
},
|
|
{
|

"cell_type": "code",
|

"execution_count": 4,
|

"id": "11f801b4",
|

"metadata": {},
|

"outputs": [],
|

"source": [
|

"def clean_data(df):\n",
|

"    '''\n",
|

"    Cast the string columns read from the CSV to the types later steps need.\n",
|

"\n",
|

"    input: df a dataframe\n",
|

"    output: df a dataframe with all the original columns, in the same order, where\n",
|

"            - passenger_count is an integer\n",
|

"            - total_amount, tip_amount, trip_distance, fare_amount are floats\n",
|

"            - tpep_pickup_datetime, tpep_dropoff_datetime are timestamps\n",
|

"    '''\n",
|

"\n",
|

"    # START YOUR CODE HERE ---------\n",
|

"    from pyspark.sql.types import IntegerType, TimestampType, FloatType\n",
|

"\n",
|

"    # Column -> target Spark type, applied in this (insertion) order.\n",
|

"    target_types = {\n",
|

"        'passenger_count': IntegerType(),\n",
|

"        'total_amount': FloatType(),\n",
|

"        'tip_amount': FloatType(),\n",
|

"        'trip_distance': FloatType(),\n",
|

"        'fare_amount': FloatType(),\n",
|

"        'tpep_pickup_datetime': TimestampType(),\n",
|

"        'tpep_dropoff_datetime': TimestampType(),\n",
|

"    }\n",
|

"\n",
|

"    # withColumn on an existing name replaces that column in place,\n",
|

"    # so the original column order is kept.\n",
|

"    for name, spark_type in target_types.items():\n",
|

"        df = df.withColumn(name, df[name].cast(spark_type))\n",
|

"    # END YOUR CODE HERE -----------\n",
|

"\n",
|

"    return df"
|

]
|

},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "d4f565d0",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Q1.b"
|
|
]
|
|
},
|
|
{
|

"cell_type": "markdown",
|

"id": "72b4f712",
|

"metadata": {},
|

"source": [
|

"Find the rate per person based on how many passengers travel between pickup and dropoff locations."
|

]
|

},
|
|
{
|

"cell_type": "code",
|

"execution_count": 5,
|

"id": "4e115152",
|

"metadata": {},
|

"outputs": [],
|

"source": [
|

"def common_pair(df):\n",
|

" '''\n",
|

" Per-person rate for pickup (PULocationID) / dropoff (DOLocationID) pairs.\n",
|

"\n",
|

" input: df a dataframe\n",
|

" output: df a dataframe with following columns:\n",
|

" - PULocationID\n",
|

" - DOLocationID\n",
|

" - passenger_count\n",
|

" - per_person_rate\n",
|

" \n",
|

" per_person_rate is the total_amount per person for a given pair.\n",
|

" \n",
|

" '''\n",
|

" \n",
|

" # START YOUR CODE HERE ---------\n",
|

" # TODO(review): not implemented yet - df is returned unchanged, so the\n",
|

" # columns listed in the docstring are not produced. Presumably\n",
|

" # per_person_rate = sum(total_amount) / sum(passenger_count) per pair;\n",
|

" # confirm filtering and ordering against the assignment spec.\n",
|

"\n",
|

" \n",
|

" \n",
|

" # END YOUR CODE HERE -----------\n",
|

" \n",
|

" return df"
|

]
|

},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "127574ab",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Q1.c"
|
|
]
|
|
},
|
|
{
|

"cell_type": "markdown",
|

"id": "36a8fd27",
|

"metadata": {},
|

"source": [
|

"Find which trip distances generate the highest tip percentage."
|

]
|

},
|
|
{
|

"cell_type": "code",
|

"execution_count": 6,
|

"id": "376c981c",
|

"metadata": {},
|

"outputs": [],
|

"source": [
|

"def distance_with_most_tip(df):\n",
|

" '''\n",
|

" Tip percentage by trip distance.\n",
|

"\n",
|

" input: df a dataframe\n",
|

" output: df a dataframe with following columns:\n",
|

" - trip_distance\n",
|

" - tip_percent\n",
|

" \n",
|

" tip_percent is the percent of tip out of fare_amount\n",
|

" '''\n",
|

" \n",
|

" # START YOUR CODE HERE ---------\n",
|

" # TODO(review): not implemented yet - df is returned unchanged, so the\n",
|

" # trip_distance / tip_percent output described above is not produced.\n",
|

"\n",
|

" # END YOUR CODE HERE -----------\n",
|

" \n",
|

" return df\n"
|

]
|

},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "f0172fe6",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Q1.d"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "4613c906",
|
|
"metadata": {},
|
|
"source": [
|
|
"Determine the average speed at different times of day."
|
|
]
|
|
},
|
|
{
|

"cell_type": "code",
|

"execution_count": 7,
|

"id": "abff9e24",
|

"metadata": {},
|

"outputs": [],
|

"source": [
|

"def time_with_most_traffic(df):\n",
|

"    '''\n",
|

"    Average taxi speed for each hour of the day, split into AM and PM columns.\n",
|

"\n",
|

"    input: df a dataframe as returned by clean_data (tpep_pickup_datetime and\n",
|

"           tpep_dropoff_datetime cast to timestamps, trip_distance to float)\n",
|

"    output: df a dataframe with following columns:\n",
|

"            - time_of_day   hour on the 12-hour clock, 0-11 (date pattern 'K')\n",
|

"            - am_avg_speed  average speed of trips picked up in that AM hour\n",
|

"            - pm_avg_speed  average speed of trips picked up in that PM hour\n",
|

"            ordered by time_of_day ascending. An hour with no AM (or no PM)\n",
|

"            pickups has null in that column.\n",
|

"\n",
|

"    The speed for an hour is the average trip_distance divided by the average\n",
|

"    trip duration in hours, over all trips whose pickup falls in that hour.\n",
|

"    '''\n",
|

"\n",
|

"    # START YOUR CODE HERE ---------\n",
|

"    from pyspark.sql import functions as F\n",
|

"\n",
|

"    pickup = F.col('tpep_pickup_datetime')\n",
|

"    dropoff = F.col('tpep_dropoff_datetime')\n",
|

"\n",
|

"    # Bucket each trip by the 12-hour-clock hour ('K' -> 0-11) and the AM/PM\n",
|

"    # marker ('a') of its pickup time; trip_time_h is the duration in hours.\n",
|

"    trips = (\n",
|

"        df\n",
|

"        .withColumn('12_hour_time', F.date_format(pickup, 'K').cast('int'))\n",
|

"        .withColumn('AM_PM', F.date_format(pickup, 'a'))\n",
|

"        .withColumn('trip_time_h', (dropoff.cast('long') - pickup.cast('long')) / 3600)\n",
|

"        # Trips without a pickup time fall in no bucket; drop them explicitly.\n",
|

"        .dropna(subset=['12_hour_time', 'AM_PM'])\n",
|

"    )\n",
|

"\n",
|

"    # One aggregation per bucket instead of count + self-join + two window sums.\n",
|

"    # Both averages divide by the bucket's row count (all rows), so a null\n",
|

"    # trip_distance / trip_time_h adds nothing to the sum but still counts.\n",
|

"    n_trips = F.count(F.lit(1))\n",
|

"    speed_per_bucket = (\n",
|

"        trips\n",
|

"        .groupBy('12_hour_time', 'AM_PM')\n",
|

"        .agg(\n",
|

"            (F.sum('trip_distance') / n_trips).alias('average_trip_distance'),\n",
|

"            (F.sum('trip_time_h') / n_trips).alias('average_trip_time'),\n",
|

"        )\n",
|

"        .withColumn('average_speed', F.col('average_trip_distance') / F.col('average_trip_time'))\n",
|

"    )\n",
|

"\n",
|

"    # Listing the pivot values skips the extra Spark job that would otherwise\n",
|

"    # discover them, and guarantees both AM and PM columns exist even when the\n",
|

"    # data has only one of them. Each (hour, AM/PM) cell holds a single value.\n",
|

"    df = (\n",
|

"        speed_per_bucket\n",
|

"        .groupBy('12_hour_time')\n",
|

"        .pivot('AM_PM', ['AM', 'PM'])\n",
|

"        .sum('average_speed')\n",
|

"        .withColumnRenamed('AM', 'am_avg_speed')\n",
|

"        .withColumnRenamed('PM', 'pm_avg_speed')\n",
|

"        .withColumnRenamed('12_hour_time', 'time_of_day')\n",
|

"        .orderBy('time_of_day')\n",
|

"    )\n",
|

"    # END YOUR CODE HERE -----------\n",
|

"\n",
|

"    return df\n"
|

]
|

},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "34cbd7b9",
|
|
"metadata": {},
|
|
"source": [
|
|
"### The below cells are for you to investigate your solutions and will not be graded\n",
|
|
"## Ensure they are commented out prior to submitting to Gradescope to avoid errors"
|
|
]
|
|
},
|
|
{
|

"cell_type": "code",
|

"execution_count": 8,
|

"id": "bf9abefb",
|

"metadata": {
|

"scrolled": true
|

},
|

"outputs": [
|

{
|

"name": "stdout",
|

"output_type": "stream",
|

"text": [
|

"+-----------+------------------+-------------------+\n",
|

"|time_of_day| am_avg_speed| pm_avg_speed|\n",
|

"+-----------+------------------+-------------------+\n",
|

"| 0| 9.377696196631234| NULL|\n",
|

"| 1|10.845483413697353| 5.125214305177561|\n",
|

"| 3| NULL| 0.0|\n",
|

"| 4| NULL| 0.0|\n",
|

"| 5| NULL| 0.5137660239764732|\n",
|

"| 6| NULL| 9.989847870647605|\n",
|

"| 7| NULL|0.18415305490417713|\n",
|

"| 8| NULL| 0.5183127622697896|\n",
|

"| 10| NULL| 0.6147483972627696|\n",
|

"| 11| NULL| 4.650958285207579|\n",
|

"+-----------+------------------+-------------------+\n",
|

"\n",
|

"checkint table:\n",
|

"First row : True True True\n",
|

"1 row : True True True\n",
|

"2 row : True True True\n",
|

"3 row : True True True\n",
|

"4 row : True True True\n",
|

"5 row : True True True\n",
|

"6 row : True True True\n",
|

"7 row : True True True\n",
|

"8 row : True True True\n",
|

"9 row : True True True\n"
|

]
|

}
|

],
|

"source": [
|

"df = load_data()\n",
|

"df = clean_data(df)\n",
|

"# common_pair(df)\n",
|

"# distance_with_most_tip(df)\n",
|

"# time_with_most_traffic(df)\n",
|

"\n",
|

"# # # q1b\n",
|

"# q1b_df = common_pair(df)\n",
|

"# # q1b_df.show()\n",
|

"# print(\"First row :\",q1b_df.collect()[0][0] == '239') \n",
|

"# print(\"First row :\",q1b_df.collect()[0][1] == '238') \n",
|

"# print(\"First row :\",q1b_df.collect()[0][2] == 62) \n",
|

"# print(\"First row :\",q1b_df.collect()[0][3] == 4.26274198870505) \n",
|

"\n",
|

"# print(\"4th row :\",q1b_df.collect()[4][0] == '148') \n",
|

"# print(\"4th row :\",q1b_df.collect()[4][1] == '79') \n",
|

"# print(\"4th row :\",q1b_df.collect()[4][2] == 42) \n",
|

"# print(\"4th row :\",q1b_df.collect()[4][3] == 4.711904752822149) \n",
|

"\n",
|

"# print(\"9th row :\",q1b_df.collect()[9][0] == '79') \n",
|

"# print(\"9th row :\",q1b_df.collect()[9][1] == '170') \n",
|

"# print(\"9th row :\",q1b_df.collect()[9][2] == 34) \n",
|

"# print(\"9th row :\",q1b_df.collect()[9][3] == 6.394705884596881) \n",
|

"\n",
|

"# # # q1c\n",
|

"# q1c_df = distance_with_most_tip(df)\n",
|

"# # q1c_df.show()\n",
|

"# print(\"First row :\",q1c_df.collect()[0][0] == 1) \n",
|

"# print(\"First row :\",q1c_df.collect()[0][1] == 17.129815971513313) \n",
|

"\n",
|

"# print(\"Mid row :\",q1c_df.collect()[6][0] == 5) \n",
|

"# print(\"Mid row :\",q1c_df.collect()[6][1] == 14.245405861990653) \n",
|

"\n",
|

"# print(\"Last row :\",q1c_df.collect()[14][0] == 18) \n",
|

"# print(\"Last row :\",q1c_df.collect()[14][1] == 10.696822158448429) \n",
|

"\n",
|

"# # # q1d\n",
|

"q1d_df = time_with_most_traffic(df)\n",
|

"q1d_df.show()\n",
|

"\n",
|

"# Expected (time_of_day, am_avg_speed, pm_avg_speed) for each row, in order.\n",
|

"q1d_expected = [\n",
|

"    (0, 9.377696196631234, None),\n",
|

"    (1, 10.845483413697353, 5.125214305177561),\n",
|

"    (3, None, 0.0),\n",
|

"    (4, None, 0.0),\n",
|

"    (5, None, 0.5137660239764732),\n",
|

"    (6, None, 9.989847870647605),\n",
|

"    (7, None, 0.18415305490417713),\n",
|

"    (8, None, 0.5183127622697896),\n",
|

"    (10, None, 0.6147483972627696),\n",
|

"    (11, None, 4.650958285207579),\n",
|

"]\n",
|

"\n",
|

"# Nothing is cached, so every collect() re-runs the whole Spark pipeline;\n",
|

"# collect once instead of three times per checked row.\n",
|

"q1d_rows = q1d_df.collect()\n",
|

"print('checkint table:')\n",
|

"for i, expected in enumerate(q1d_expected):\n",
|

"    label = \"First row :\" if i == 0 else f\"{i} row :\"\n",
|

"    print(label, *(q1d_rows[i][j] == expected[j] for j in range(3)))\n"
|

]
|

}
|
|
],
|
|
"metadata": {
|
|
"kernelspec": {
|
|
"display_name": "Python 3 (ipykernel)",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"codemirror_mode": {
|
|
"name": "ipython",
|
|
"version": 3
|
|
},
|
|
"file_extension": ".py",
|
|
"mimetype": "text/x-python",
|
|
"name": "python",
|
|
"nbconvert_exporter": "python",
|
|
"pygments_lexer": "ipython3",
|
|
"version": "3.9.2"
|
|
}
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 5
|
|
}
|