In [1]:
%rm -rf google_output.txt

In [2]:
import os,sys, csv

In [3]:
# Column indices of each row read from the csv file
# COL_DATE: the day of trading
# COL_OPEN: the stock price at the beginning of the trading day
# COL_HIGH: the highest price the stock achieved on the trading day
# COL_LOW: the lowest price the stock achieved on the trading day
# COL_CLOSE: the stock price at the end of the trading day
# COL_ADJ_CLOSE: the adjusted closing price of the trading day (reflecting the stock's value after accounting for any corporate actions like dividends, stock splits and new stock offerings)
# COL_VOLUME: the total number of shares traded on the trading day
COL_DATE=0
COL_OPEN=1
COL_HIGH=2
COL_LOW=3
COL_CLOSE=4
COL_ADJ_CLOSE=5
COL_VOLUME=6

# derived per-day columns (positions after the csv columns, appended at a middle stage)
COL_TOTAL_SALE_OF_DAY=7
COL_MONTH_ONLY=8

# Column indices of each row in monthly_averages_list (index 0 is the YYYY-MM string).
# COL_EMA is defined once only: an earlier duplicate assignment (COL_EMA=9) in the
# derived-column group was always overwritten by the value below.
COL_MONTHLY_AVERAGE_PRICE=1
COL_EMA=2

In [4]:
# get_data_list(csv_file_name)
# This function has one parameter, namely csv_file_name. 
# When the function is called, you need to pass along a CSV file name which is used inside the function to open and read the CSV
# file. 
# After reading each row, it will be split into a list. The list will then be appended into a main
# list (a list of lists), namely data_list. The data_list will be returned at the end of the
# function.
def get_data_list(csv_file_name):
  '''Read a csv file into a list of rows.

  Args:
    csv_file_name: path of the csv file to read.

  Returns:
    A list of lists: one inner list of strings per row of the file, in file
    order (every row is returned, including the first one).

  Raises:
    SystemExit: with exit status 1 when the file cannot be opened, decoded or
      parsed. The underlying error is printed before exiting.
  '''
  try:
    # newline='' leaves line-ending handling to the csv module, so quoted
    # fields containing line breaks are parsed correctly
    with open(csv_file_name, newline='') as csvfile:
      return list(csv.reader(csvfile, delimiter=',', quotechar='"'))
  except (OSError, ValueError, csv.Error) as e:
    # OSError: missing/unreadable file; ValueError: bad path or undecodable
    # bytes (UnicodeError is a ValueError); csv.Error: malformed csv content
    print('error during reading csv file ')
    print(e)
    print('exitting...')
    # non-zero status so the failure is not reported as a successful exit
    sys.exit(1)

In [5]:
# get_monthly_averages(data_list)
# This function has one parameter, namely data_list. You need to pass the data_list
# generated by the get_data_list() function as the argument to this function and then
# calculate the monthly average prices of the stock. The average monthly prices are calculated in
# the following way. 
# 
# 1. Suppose the volume and adjusted closing price of a trading day are V1 and C1, respectively. 
# 2. The total sale of that day equals V1 x C1. 
# 3. Now, suppose the volume and adjusted closing price of another trading day are V2 and C2, respectively. 
# 4. The average of these two trading days is the sum of the total sales divided by the total volume:
# 
#                        Average price = (V1 x C1 + V2 x C2) / (V1 + V2)
# 
# To average a whole month, you need to 
#   - add up the total sales (V1 x C1 + V2 x C2 + ... + Vn x Cn) for each day and 
#   - divide it by the sum of all volumes (V1 + V2 + ... + Vn) where n is the number of trading days in the month.
# A 2-item list, holding the date (year and month only, YYYY-MM) and the average for that month,
# will be generated for each month. The list for each month will be appended to a main list,
# namely monthly_averages_list. The monthly_averages_list will be returned at the end of the function.

def get_monthly_averages(data_list):
  '''Calculate the volume-weighted monthly average prices of the stock.

  For each month:

      average price = sum(volume x adjusted close) / sum(volume)

  taken over every trading day of that month.

  Args:
    data_list: rows as returned by get_data_list(). The first row is skipped
      as the header. Every other non-empty row must hold a date string at
      COL_DATE (its first 7 characters are used as the YYYY-MM key) and
      values convertible to float at COL_VOLUME and COL_ADJ_CLOSE.
      The rows are only read, never modified.

  Returns:
    A list of [YYYY-MM, average price] lists, sorted by month.
    An empty list when there are no data rows.

  Raises:
    ValueError: if a volume or adjusted close is not numeric.
    ZeroDivisionError: if the total volume of a month is zero.
  '''
  # month (YYYY-MM) -> [total sale, total volume], filled in one pass over
  # the rows instead of re-filtering the whole list once per month
  totals_by_month = {}

  for row in data_list[1:]:
    if not row:
      # csv.reader yields an empty list for a blank line; nothing to add
      continue

    volume = float(row[COL_VOLUME])
    month = row[COL_DATE][0:7]

    month_totals = totals_by_month.setdefault(month, [0, 0])
    # V x C: the total sale of the day
    month_totals[0] += volume * float(row[COL_ADJ_CLOSE])
    month_totals[1] += volume

  # average price = total sale / total volume, reported in month order
  monthly_averages_list = []
  for month in sorted(totals_by_month):
    total_sale, total_volume = totals_by_month[month]
    monthly_averages_list.append([month, total_sale / total_volume])

  return monthly_averages_list

In [6]:
# get_moving_averages(monthly_averages_list)
# This function has one parameter, namely monthly_averages_list. You need to pass the
# monthly_averages_list generated by get_monthly_averages() as the argument
# to this function and then calculate the 5-month exponential moving average (EMA) stock prices.
# In general, the EMA for a particular month can be calculated by the following formula:
# 
#     EMA = (Monthly average price – previous month’s EMA) x smoothing constant + previous month’s EMA
# 
# where
# 
#     smoothing constant = 2 / (number of time periods in months + 1)
# 
# Worked example of the same formulas for a 20-period EMA (this notebook uses 5 months):
# Initial SMA = 20-period sum / 20
# Multiplier = (2 / (Time periods + 1) ) = (2 / (20 + 1) ) = 0.0952 (9.52%)
# EMA = {Close – EMA(previous day)} x multiplier + EMA(previous day).
def get_moving_averages(monthly_averages_list, periods=5):
  '''Calculate the exponential moving average (EMA) of the monthly prices.

  The first `periods` months cannot have an EMA of their own, so each of
  them is given the simple moving average (SMA) of those months. Every later
  month is calculated from the month before it:

      EMA = (monthly average price - previous EMA) x smoothing constant + previous EMA
      smoothing constant = 2 / (periods + 1)

  Args:
    monthly_averages_list:
      [ [YYYY-MM, monthly average price],
        [YYYY-MM, monthly average price],
        ...]
      in chronological order. The list and its rows are not modified, so the
      function can be called again on the same input.
    periods: number of months in the EMA window (default 5).

  Returns:
    A new list
      [ [YYYY-MM, monthly average price, EMA],
        [YYYY-MM, monthly average price, EMA],
        ...]
    with one row per input row. An empty input gives an empty list. When
    fewer than `periods` months are available, every month gets the SMA of
    the months that are available.

  Raises:
    ValueError: if periods is less than 1.
  '''
  if periods < 1:
    raise ValueError('periods must be at least 1')
  if not monthly_averages_list:
    return []

  # the seed EMA shared by the first `periods` months is their SMA
  seed_prices = [row[1] for row in monthly_averages_list[:periods]]
  seed_EMA = sum(seed_prices) / len(seed_prices)

  # smoothing constant = 2 / (number of time periods in months + 1)
  smoothing_constant = 2 / (periods + 1)

  averages_with_EMA = []
  previous_month_EMA = seed_EMA
  for index, row in enumerate(monthly_averages_list):
    monthly_average_price = row[1]
    if index < periods:
      EMA = seed_EMA
    else:
      EMA = (monthly_average_price - previous_month_EMA) * smoothing_constant + previous_month_EMA

    # build a fresh row rather than appending to the caller's row
    averages_with_EMA.append([row[0], monthly_average_price, EMA])
    previous_month_EMA = EMA

  return averages_with_EMA


In [7]:
def format_date_string(yyyy_mm):
  '''turn the csv date key "YYYY-MM" into "MM-YYYY"

  the input must contain exactly one '-', otherwise the unpacking below
  raises ValueError
  '''
  year, month = yyyy_mm.split('-')
  return f'{month}-{year}'


In [8]:
def write_output_file(filename_to_write, monthly_averages_list_w_ema, report_name):
  '''Fill the report template with the best and worst months and write it to a file.

  The first 5 rows are left out of the ranking; only rows from the 6th
  onwards are compared. When several months share the best (or worst) EMA,
  all of them are listed, separated by commas.

  input:
    filename_to_write: txt file name with path to be written to
      (an existing file is overwritten)
    monthly_averages_list_w_ema: list of [YYYY-MM, monthly average price, EMA] rows
    report_name: report name to be written to report

  raises:
    ValueError: if the list has no rows after the first 5, so there is
      nothing to rank
  '''

  RESULT_TEMPLATE='''
# The best month for ^report_name^:
# ^best_month^, ^best_EMA^

# The worst month for ^report_name^:
# ^worst_month^, ^worst_EMA^
  '''.strip()

  # number of leading rows excluded from the ranking
  SKIPPED_MONTHS = 5

  ranked_rows = monthly_averages_list_w_ema[SKIPPED_MONTHS:]
  if not ranked_rows:
    raise ValueError('no months left to rank after skipping the first %d' % SKIPPED_MONTHS)

  # get the max (best) EMA and the month(s) that reached it
  best_EMA = max(row[2] for row in ranked_rows)
  best_months = [format_date_string(row[0]) for row in ranked_rows if row[2] == best_EMA]

  # get the min (worst) EMA and the month(s) that reached it
  worst_EMA = min(row[2] for row in ranked_rows)
  worst_months = [format_date_string(row[0]) for row in ranked_rows if row[2] == worst_EMA]

  # assemble the output string; report_name goes in last, as before
  result_string = (
    RESULT_TEMPLATE
    .replace('^best_month^', ','.join(best_months))
    .replace('^best_EMA^', '%.2f' % best_EMA)
    .replace('^worst_month^', ','.join(worst_months))
    .replace('^worst_EMA^', '%.2f' % worst_EMA)
    .replace('^report_name^', report_name)
  )

  # write output file ('w' already truncates an existing file)
  with open(filename_to_write, 'w') as file_write:
    file_write.write(result_string)

In [9]:
def main():
    '''Ask for a csv file, compute the monthly EMA report and write it next to the csv.

    For an input such as path/google.csv the report is written to
    path/google_output.txt (./google_output.txt when no directory is given)
    and "google" is used as the report name.
    '''

    print('start')

    # gather csv file with path from user
    input_filename = input("Please input a csv filename: ")

    csv_filename = os.path.basename(input_filename)
    csv_path = os.path.dirname(input_filename)

    # Derive the output file name from the name without its extension.
    # Replacing the text '.csv' would leave a name such as 'google.CSV' or
    # 'google' unchanged, and the report would then overwrite the input file.
    name_without_extension, _extension = os.path.splitext(csv_filename)
    txt_filename = os.path.join(csv_path or '.', name_without_extension + '_output.txt')

    # grep the corp_name from the filename google.csv => google
    corp_name = csv_filename.split('.')[0]

    # process the data_list by csv file as stated in assignment
    print(f'processing {csv_filename}')
    csv_list=get_data_list(input_filename)
    monthly_averages_list = get_monthly_averages(csv_list)
    monthly_averages_list_w_EMA = get_moving_averages(monthly_averages_list)

    # write output file
    write_output_file(txt_filename, monthly_averages_list_w_EMA, corp_name)
    print('wrote to {file} done'.format(file = txt_filename))


In [None]:
# Run the report only when this code is executed as the main program,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

start


In [None]:
%ls

In [None]:
%cat google_output.txt