# CSE6242 - HW3 - Q1

Pyspark Imports

In [1]:
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, round, coalesce

Initialize PySpark Context

In [2]:
### DO NOT MODIFY THIS CELL ###
# Create the SparkContext for this notebook, registered under the app name "HW3-Q1".
sc = pyspark.SparkContext(appName="HW3-Q1")
# SQLContext built on top of the SparkContext; load_data() uses it to read the CSV.
sqlContext = SQLContext(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/20 05:31:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/20 05:31:40 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/10/20 05:31:40 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/10/20 05:31:40 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/10/20 05:31:40 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


Define function for loading data

In [3]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    """Read the sample trip CSV into a DataFrame, using its first row as the header."""
    reader = sqlContext.read.option("header", True)
    return reader.csv("yellow_tripdata_2019-01_short.csv")

### Q1.a

Perform data casting to clean incoming dataset

In [4]:
def clean_data(df):
    '''
    Cast the columns used by the later questions to numeric / timestamp types.

    input: df a dataframe
    output: df a dataframe with all the original columns, where
            passenger_count is an integer, the amount and distance columns
            are floats and the pickup / drop-off columns are timestamps
    '''

    # START YOUR CODE HERE ---------
    # Spark SQL type name to cast each column to; columns not listed here
    # are left untouched.
    target_types = {
        "passenger_count": "int",
        "total_amount": "float",
        "tip_amount": "float",
        "trip_distance": "float",
        "fare_amount": "float",
        "tpep_pickup_datetime": "timestamp",
        "tpep_dropoff_datetime": "timestamp",
    }

    # withColumn with an existing name replaces the column in place, so the
    # column set and order stay the same.
    for column_name, type_name in target_types.items():
        df = df.withColumn(column_name, df[column_name].cast(type_name))
    # END YOUR CODE HERE -----------

    return df

### Q1.b

Find rate per person for based on how many passengers travel between pickup and dropoff locations. 

In [5]:
def common_pair(df):
    '''
    Return the ten pickup/drop-off pairs that carried the most passengers.

    input: df a dataframe (expected to be the output of clean_data, so that
           passenger_count and total_amount are numeric)
    output: df a dataframe with following columns:
            - PULocationID
            - DOLocationID
            - passenger_count
            - per_person_rate

    passenger_count is the sum of passenger_count over all trips of a pair.
    per_person_rate is the total_amount per person for a given pair, i.e.
    sum(total_amount) / sum(passenger_count).
    Trips whose pickup and drop-off location are identical are ignored.
    Rows are sorted by passenger_count, then per_person_rate, both descending,
    and only the first 10 are kept.
    '''

    # START YOUR CODE HERE ---------
    # Namespaced import: `from pyspark.sql.functions import sum` would shadow
    # the Python builtin inside this function.
    from pyspark.sql import functions as F

    df = (
        df.filter(F.col("PULocationID") != F.col("DOLocationID"))
        # One aggregation per pair replaces the per-row window sums followed
        # by distinct(); the result is the same single row per pair.
        .groupBy("PULocationID", "DOLocationID")
        .agg(
            F.sum("passenger_count").alias("passenger_count"),
            (F.sum("total_amount") / F.sum("passenger_count")).alias("per_person_rate"),
        )
        .orderBy(F.col("passenger_count").desc(), F.col("per_person_rate").desc())
        .limit(10)
    )
    # END YOUR CODE HERE -----------

    return df

### Q1.c

Find trips which trip distances generate the highest tip percentage.

In [6]:
def distance_with_most_tip(df):
    '''
    Return the 15 rounded-up trip distances with the highest average tip percent.

    input: df a dataframe (expected to be the output of clean_data, so that
           fare_amount, tip_amount and trip_distance are numeric)
    output: df a dataframe with following columns:
            - trip_distance
            - tip_percent

    Only trips with fare_amount > 2 and trip_distance > 0 are considered.
    trip_distance is the trip distance rounded up to the next whole number,
    as decimal(8,0).
    tip_percent is the percent of tip out of fare_amount
    (tip_amount * 100 / fare_amount), averaged over all trips that share the
    same rounded trip_distance. Trips whose tip percent is null are left out
    of the average instead of being counted as a 0 % tip.
    Rows are sorted by tip_percent in descending order.
    '''

    # START YOUR CODE HERE ---------
    # Namespaced import: avoids shadowing the builtins `sum` and `max`.
    from pyspark.sql import functions as F

    per_trip = df.where(
        (F.col("fare_amount") > 2) & (F.col("trip_distance") > 0)
    ).select(
        # Round the distance up; the cast keeps the decimal(8,0) output type.
        F.ceil(F.col("trip_distance")).cast("decimal(8,0)").alias("trip_distance"),
        ((F.col("tip_amount") * 100) / F.col("fare_amount")).alias("tip_percent"),
    )

    # A single grouped average replaces the former count + window sum +
    # self-join + distinct sequence.
    df = (
        per_trip.groupBy("trip_distance")
        .agg(F.avg("tip_percent").alias("tip_percent"))
        .orderBy(F.col("tip_percent").desc())
        .limit(15)
    )
    # END YOUR CODE HERE -----------

    return df


### Q1.d

Determine the average speed at different times of day.

In [7]:
def time_with_most_traffic(df):
    '''
    Return the average trip speed for every hour of the 12-hour clock.

    input: df a dataframe (expected to be the output of clean_data, so that
           the pickup / drop-off columns are timestamps and trip_distance is
           numeric)
    output: df a dataframe with following columns:
            - time_of_day
            - am_avg_speed
            - pm_avg_speed

    time_of_day is the pickup hour on a 12-hour clock (0-11).
    am_avg_speed / pm_avg_speed are the average trip_distance divided by the
    average trip duration in hours, over the trips that started in that hour
    before / after noon. A cell is null when no trip started in that hour.
    Both speed columns are always present, even if one half of the day has
    no trips at all. Rows are sorted by time_of_day in ascending order.
    '''

    # START YOUR CODE HERE ---------
    # Namespaced import: avoids shadowing the builtin `sum`.
    from pyspark.sql import functions as F

    pickup_hour = F.hour("tpep_pickup_datetime")

    # Subtract epoch seconds rather than casting a timestamp difference:
    # casting an interval to long is not supported on every Spark release.
    trip_seconds = (
        F.col("tpep_dropoff_datetime").cast("long")
        - F.col("tpep_pickup_datetime").cast("long")
    )

    per_trip = (
        # Trips without a pickup time cannot be assigned to an hour.
        df.where(F.col("tpep_pickup_datetime").isNotNull())
        .select(
            (pickup_hour % 12).alias("time_of_day"),
            F.when(pickup_hour < 12, "AM").otherwise("PM").alias("am_pm"),
            F.col("trip_distance"),
            (trip_seconds / 3600).alias("trip_time_h"),
        )
    )

    # Average the distance and the duration first, then divide, so that the
    # speed is (average distance) / (average time) per hour slot.
    per_slot = per_trip.groupBy("time_of_day", "am_pm").agg(
        (F.avg("trip_distance") / F.avg("trip_time_h")).alias("average_speed")
    )

    df = (
        per_slot.groupBy("time_of_day")
        # Explicit pivot values guarantee both output columns exist and
        # spare Spark the extra job that discovers the distinct values.
        .pivot("am_pm", ["AM", "PM"])
        .agg(F.first("average_speed"))
        .withColumnRenamed("AM", "am_avg_speed")
        .withColumnRenamed("PM", "pm_avg_speed")
        .orderBy("time_of_day")
    )
    # END YOUR CODE HERE -----------

    return df


### The below cells are for you to investigate your solutions and will not be graded
## Ensure they are commented out prior to submitting to Gradescope to avoid errors

In [8]:
# Scratch cell for manual inspection only; comment it out before submitting.
# Load the CSV, then cast the columns the question functions operate on.
df = load_data()
df = clean_data(df)

# Print the result of each question's function.
common_pair(df).show()
distance_with_most_tip(df).show()
time_with_most_traffic(df).show()


+------------+------------+---------------+------------------+
|PULocationID|DOLocationID|passenger_count|   per_person_rate|
+------------+------------+---------------+------------------+
|         239|         238|             62|  4.26274198870505|
|         237|         236|             60| 4.482500068346659|
|         263|         141|             52|3.4190384974846473|
|         161|         236|             42| 5.368571440378825|
|         148|          79|             42| 4.711904752822149|
|         142|         238|             39|  5.05487182812813|
|         141|         236|             37| 4.355675723101641|
|         239|         143|             37| 4.252162224537617|
|         239|         142|             35| 3.817714350564139|
|          79|         170|             34| 6.394705884596881|
+------------+------------+---------------+------------------+

+-------------+------------------+
|trip_distance|       tip_percent|
+-------------+------------------+
|           