# CSE6242 - HW3 - Q1

Pyspark Imports

In [1]:
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, round, coalesce
from pyspark.sql.functions import *

Initialize PySpark Context

In [2]:
### DO NOT MODIFY THIS CELL ###
sc = pyspark.SparkContext(appName="HW3-Q1")
sqlContext = SQLContext(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/18 13:38:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Define function for loading data

In [3]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    df = sqlContext.read.option("header",True) \
     .csv("yellow_tripdata_2019-01_short.csv")
    return df

### Q1.a

Perform data casting to clean incoming dataset

In [4]:
def clean_data(df):
    '''
    input: df a dataframe
    output: df a dataframe with the all the original columns
    '''
    
    # START YOUR CODE HERE ---------
    from pyspark.sql.types import StructField, StructType, IntegerType, TimestampType, FloatType, StringType

    df = df.withColumn("passenger_count", df["passenger_count"].cast(IntegerType()))
    df = df.withColumn("total_amount", df["total_amount"].cast(FloatType()))
    df = df.withColumn("tip_amount", df["tip_amount"].cast(FloatType()))
    df = df.withColumn("trip_distance", df["trip_distance"].cast(FloatType()))
    df = df.withColumn("fare_amount", df["fare_amount"].cast(FloatType()))
    df = df.withColumn("tpep_pickup_datetime", df["tpep_pickup_datetime"].cast(TimestampType()))
    df = df.withColumn("tpep_dropoff_datetime", df["tpep_dropoff_datetime"].cast(TimestampType()))

    # END YOUR CODE HERE -----------
    
    return df

### Q1.b

Find rate per person for based on how many passengers travel between pickup and dropoff locations. 

In [6]:
def common_pair(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - PULocationID
            - DOLocationID
            - passenger_count
            - per_person_rate
            
    per_person_rate is the total_amount per person for a given pair.
    
    '''
    
    # START YOUR CODE HERE ---------
    from pyspark.sql import Window

    partition_cols = ['PULocationID','DOLocationID']

    group_by_result = df.groupBy(partition_cols).count()
    # group_by_result.show()

    # Filter out any trips that have the same pick-up and drop-off location. 
    df_temp = df.filter((df.PULocationID != df.DOLocationID))
    # group_by_result_difference_location.show()

    # # [4 pts] You will be modifying the function common_pair. 
    # # Return the top 10 pickup-dropoff location pairs that have the highest number of total passengers who have traveled between them. 
    # # Sort the location pairs by total passengers. 
    df_temp = df_temp.withColumn("passenger_count", sum("passenger_count").over(Window.partitionBy(*partition_cols)))
    
    # # For each location pair, also compute 
    #     # the average amount per passenger over all trips (name this per_person_rate), utilizing total_amount.
    df_temp = df_temp.withColumn("total_amount_partition", sum("total_amount").over(Window.partitionBy(*partition_cols)))
    df_temp = df_temp.withColumn("per_person_rate",col("total_amount_partition")/col("passenger_count"))
    
    # # For pairs with the same total passengers, 
    #     # sort them in descending order of per_person_rate.
    # # Rename the column for total passengers to passenger_count. 
    df_temp = df_temp.select(['PULocationID','DOLocationID','passenger_count','per_person_rate']).distinct()
    df_joined  = group_by_result.join(df_temp, partition_cols)
    df_joined = df_joined.orderBy(['passenger_count','per_person_rate'], ascending=False).limit(10)
    df_output = df_joined.drop('count')
    # END YOUR CODE HERE -----------

    return df_output


### Q1.c

Find trips which trip distances generate the highest tip percentage.

In [8]:
def distance_with_most_tip(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - trip_distance
            - tip_percent
            
    trip_percent is the percent of tip out of fare_amount
    
    '''
    
    # START YOUR CODE HERE ---------
    # You will be modifying the function distance_with_most_tip . 
    # Filter the data for trips having fares (fare_amount) 
        # greater than $2.00 and 
        # a trip distance (trip_distance) greater than 0. 
    filtered_df = df.where((df.fare_amount > 2) & (df.trip_distance > 0))
        
    # Calculate the tip percent (tip_amount * 100 / fare_amount) for each trip. 
    # Round all trip distances up to the closest mile and find the average tip_percent for each trip_distance.
    filtered_df = filtered_df.groupBy('trip_distance').agg((mean('tip_amount')*100)/mean('fare_amount'))
    
    # Sort the result in descending order of tip_percent to obtain the top 15 trip distances which tip the most generously. 
    # Rename 
    #   the column for rounded trip distances to trip_distance, and 
    #   the column for average tip percents tip_percent . 
    new_cols = ['trip_distance', 'tip_percent']
    df = filtered_df.toDF(*new_cols) 
    df = df.sort(df.tip_percent.desc()).limit(15)

    # END YOUR CODE HERE -----------
    
    return df

+-------------+------------------+
|trip_distance|       tip_percent|
+-------------+------------------+
|          0.1| 59.45578229670622|
|         2.58| 59.33333331463384|
|         4.69|54.054054054054056|
|         3.46|51.736263652424235|
|         5.44|43.499999046325684|
|         6.39|40.341465647627665|
|         8.58| 38.83333206176758|
|         9.27|36.328358436698345|
|         2.17| 36.04761872972761|
|        13.05| 35.88888910081651|
|         4.53|              35.0|
|         0.26| 34.28571564810617|
|         6.66| 34.19047650836763|
|         3.82| 33.53043514749278|
|         2.99| 33.39130360147227|
+-------------+------------------+



### Q1.d

Determine the average speed at different times of day.

In [9]:
def time_with_most_traffic(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - time_of_day
            - am_avg_speed
            - pm_avg_speed
            
    trip_percent is the percent of tip out of fare_amount
    
    '''
    
    # START YOUR CODE HERE ---------
    # from pyspark.sql.functions import dayofweek,date_format, unix_timestamp,format_number
    # from pyspark.sql.types import DecimalType
    # from pyspark.sql import Window
    
    # # You will be modifying the function time_with_most_traffic to determine which hour of the day has the most traffic. 
    
    # df = df.withColumn('time_of_day',date_format(col("tpep_pickup_datetime"),"K"))
    # df = df.withColumn('AM_PM',date_format(col("tpep_pickup_datetime"),"a"))
    # df = df.withColumn('diff_in_seconds',col("tpep_dropoff_datetime").cast("long") - col('tpep_pickup_datetime').cast("long"))

    # partition_cols = ['time_of_day','AM_PM']
    
    # # Calculate the traffic for a particular hour using the average speed of all taxi trips which began during that hour. 
    # # Calculate the average speed as the average trip_distance divided by the average trip time, as distance per hour.
    # group_by_result = df.groupBy(partition_cols).count()
    
    # df  = group_by_result.join(df, partition_cols)
    
    # df = df.withColumn("trip_distance_sum", sum("trip_distance").over(Window.partitionBy(*partition_cols)))
    # df = df.withColumn("average_trip_distance",col("trip_distance_sum")/col("count"))

    # df = df.withColumn("trip_time_sum_s", sum("diff_in_seconds").over(Window.partitionBy(*partition_cols)))
    # df = df.withColumn("average_trip_time_s",col("trip_time_sum_s")/col("count"))

    # df = df.withColumn("average_speed",col("average_trip_distance")/col("average_trip_time_s"))

    # df = df.select(['time_of_day','AM_PM','average_speed']).distinct()
    # df  = group_by_result.join(df, partition_cols)
    
    # # A day with low average speed indicates high levels of traffic. 
    # # The average speed may be 0, indicating very high levels of traffic.

    # # Additionally, you must separate the hours into AM and PM, with hours 0:00-11:59 being AM, and hours 12:00-23:59 being PM. 
    # # Convert these times to the 12 hour time, so you can match the below output. 
    # # For example, 
    #   # the row with 1 as time of day, 
    #     # should show the average speed between 1 am and 2 am in the am_avg_speed column, 
    #     # and between 1 pm and 2pm in the pm_avg_speed column.

    # # Use date_format along with the appropriate pattern letters to format the time of day so that it matches the example output below. 
    
    # # Your final table should 
    #   # contain values sorted from 0-11 for time_of_day. 
    #   # There may be data missing for a time of day, and it may be null for am_avg_speed or pm_avg_speed. 
    #   # If an hour has no data for am or pm, there may be missing rows. 
    #     # Do not include any additional rows for times of day which are not represented in the data. 

    # df = df.select(['time_of_day','AM_PM','average_speed']).limit(15)
    
    # from pyspark.sql import SparkSession
    # spark = SparkSession.builder.getOrCreate()

    # time_data = [
    #     [0,None,None],
    #     [1,None,None],
    #     [2,None,None],
    #     [3,None,None],
    #     [4,None,None],
    #     [5,None,None],
    #     [6,None,None],
    #     [7,None,None],
    #     [8,None,None],
    #     [9,None,None],
    #     [10,None,None],
    #     [11,None,None]
    # ]
                                  
    # for i in range(0,len(time_data)):
    #     time_int_wanted = time_data[i][0]
    
    #     for j in range(0, len(df.collect())):
    #         time_int = df.collect()[j][0]
    #         AM_PM = df.collect()[j][1]
    #         average_speed = df.collect()[j][2]

    #         if (time_int == str(time_int_wanted)):
    #             if (AM_PM =="AM"):
    #                 time_data[int(time_int)][1] = float(average_speed)
    #             if (AM_PM =="PM"):
    #                 time_data[int(time_int)][2] = float(average_speed)

    # df_output = spark.createDataFrame(data=time_data, schema=["time_of_day","am_avg_speed","pm_avg_speed"])
    # END YOUR CODE HERE -----------
    
    return df_output

+-----------+--------------------+--------------------+
|time_of_day|        am_avg_speed|        pm_avg_speed|
+-----------+--------------------+--------------------+
|          0|0.002604915610175...|                NULL|
|          1|0.003012634281582599|0.001423670640327...|
|          2|                NULL|                NULL|
|          3|                NULL|                 0.0|
|          4|                NULL|                 0.0|
|          5|                NULL|1.427127844379092E-4|
|          6|                NULL|0.002774957741846557|
|          7|                NULL|5.115362636227142E-5|
|          8|                NULL|1.439757672971638E-4|
|          9|                NULL|                NULL|
|         10|                NULL|1.707634436841026...|
|         11|                NULL|0.001291932857002...|
+-----------+--------------------+--------------------+



### The below cells are for you to investigate your solutions and will not be graded
## Ensure they are commented out prior to submitting to Gradescope to avoid errors

In [10]:
# df = load_data()
# df = clean_data(df)

In [11]:
# common_pair(df).show()

In [12]:
# distance_with_most_tip(df).show()

In [13]:
# time_with_most_traffic(df).show()