Files
004_comission/hyhl_1022/jupyter/jupyter-helloworld/1st_copy.ipynb
louiscklaw 3ce4f45857 update,
2025-01-31 21:34:27 +08:00

373 lines
13 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "83041d33",
"metadata": {},
"outputs": [],
"source": [
"%rm -rf google_output.txt"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "9b24b4df",
"metadata": {},
"outputs": [],
"source": [
"import os,sys, csv"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "c2b90953",
"metadata": {},
"outputs": [],
"source": [
"# column from csv file\n",
"# COL_DATE: the day of trading\n",
"# COL_OPEN: the stock price at the beginning of the trading day\n",
"# COL_HIGH: the highest price the stock achieved on the trading day\n",
"# COL_LOW: the lowest price the stock achieved on the trading day\n",
"# COL_CLOSE: the stock price at the end of the trading day\n",
"# COL_ADJ_CLOSE: the adjusted closing price of the trading day (reflecting the stock's value after accounting for any corporate actions like dividends, stock splits and new stock offerings)\n",
"# COL_VOLUME: the total number of shares traded on the trading day\n",
"COL_DATE=0\n",
"COL_OPEN=1\n",
"COL_HIGH=2\n",
"COL_LOW=3\n",
"COL_CLOSE=4\n",
"COL_ADJ_CLOSE=5\n",
"COL_VOLUME=6\n",
"\n",
"# append at middle stage\n",
"COL_TOTAL_SALE_OF_DAY=7\n",
"COL_MONTH_ONLY=8\n",
"COL_EMA=9\n",
"\n",
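"# column indices for the rows of monthly_averages_list\n",
"# NOTE: COL_EMA is rebound below, replacing the value 9 assigned above\n",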
"COL_MONTHLY_AVERAGE_PRICE=1\n",
"COL_EMA=2"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "09a9417f",
"metadata": {},
"outputs": [],
"source": [
"# get_data_list(csv_file_name)\n",
"# This function has one parameter, namely csv_file_name. \n",
"# When the function is called, you need to pass along a CSV file name which is used inside the function to open and read the CSV\n",
"# file. \n",
"# After reading each row, it will be split into a list. The list will then be appended into a main\n",
"# list (a list of lists), namely data_list. The data_list will be returned at the end of the\n",
"# function.\n",
"def get_data_list(csv_file_name):\n",
" '''read data list from csv file'''\n",
" data_list = []\n",
" try:\n",
" with open(csv_file_name, newline='') as csvfile:\n",
" temp = []\n",
" temp = csv.reader(csvfile, delimiter=',', quotechar='\"')\n",
" data_list = list(temp)\n",
" \n",
" return data_list\n",
" except Exception as e:\n",
" print('error during reading csv file ')\n",
" print('exitting...')\n",
" sys.exit()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "cd616e6e",
"metadata": {},
"outputs": [],
"source": [
"# get_monthly_averages(data_list)\n",
"# This function has one parameter, namely data_list. You need to pass the data_list\n",
"# generated by the get_data_list() function as the argument to this function and then\n",
"# calculate the monthly average prices of the stock. The average monthly prices are calculated in\n",
"# the following way. \n",
"# \n",
"# 1. Suppose the volume and adjusted closing price of a trading day are V1 and C1, respectively. \n",
"# 2. The total sale of that day equals V1 x C1. \n",
"# 3. Now, suppose the volume and adjusted closing price of another trading day are V2 and C2, respectively. \n",
"# 4. The average of these two trading days is the sum of the total sales divided by the total volume:\n",
"# \n",
"# Average price = (V1 x C1 + V2 x C2) / (V1 + V2)\n",
"# \n",
"# To average a whole month, you need to \n",
"# - add up the total sales (V1 x C1 + V2 x C2 + ... + Vn x Cn) for each day and \n",
"# - divide it by the sum of all volumes (V1 + V2 + ... + Vn) where n is the number of trading days in the month.\n",
"# A tuple with 2 items, including the date (year and month only) and the average for that month,\n",
"# will be generated for each month. The tuple for each month will be appended to a main list,\n",
"# namely monthly_averages_list. The monthly_averages_list will be returned at the end of the function.\n",
"\n",
"def get_monthly_averages(data_list):\n",
" '''calculate the monthly average prices of the stock'''\n",
"\n",
" monthly_averages_list=[]\n",
" data_list_data_only = data_list[1:]\n",
" month_available = []\n",
" \n",
" # data cleaning\n",
" for i in range(len(data_list_data_only)):\n",
" # V1 x C1, calculate the total sale, append into column\n",
" data_list_data_only[i].append(float(data_list_data_only[i][COL_VOLUME]) * float(data_list_data_only[i][COL_ADJ_CLOSE]))\n",
"\n",
" # mark the row by YYYY-MM for easy monthly sum calculation, COL_MONTH_ONLY\n",
" data_list_data_only[i].append(data_list_data_only[i][COL_DATE][0:7])\n",
"\n",
" # collect the distinct months (YYYY-MM) present in the data\n",
" month_available = set(list(map(lambda x: x[COL_MONTH_ONLY], data_list_data_only)))\n",
"\n",
" # iterate over each month, calculating its total_sale and total_volume\n",
" # get the average sale by total_sale / total_volume\n",
" for month in sorted(month_available):\n",
" filtered_month = list(filter(lambda x: x[COL_MONTH_ONLY] == month, data_list_data_only))\n",
" total_sale = sum(list( map(lambda x: x[COL_TOTAL_SALE_OF_DAY], filtered_month)))\n",
" total_volume = sum(list( map(lambda x: float(x[COL_VOLUME]), filtered_month)))\n",
" monthly_averages_list.append([month, total_sale/total_volume])\n",
"\n",
" return list(monthly_averages_list)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "dfe29847",
"metadata": {},
"outputs": [],
"source": [
"# get_moving_averages(monthly_averages_list)\n",
"# This function has one parameter, namely monthly_averages_list. You need to pass the\n",
"# monthly_averages_list generated by get_monthly_averages() as the argument\n",
"# to this function and then calculate the 5-month exponential moving average (EMA) stock prices.\n",
"# In general, the EMA for a particular month can be calculated by the following formula:\n",
"# \n",
"# EMA = (Monthly average price - previous month's EMA) x smoothing constant + previous month's EMA\n",
"# \n",
"# where\n",
"# \n",
"# smoothing constant = 2 / (number of time periods in months + 1)\n",
"# \n",
"# Reference example for a 20-period EMA:\n",
"# Initial SMA = 20-period sum / 20\n",
"# Multiplier = (2 / (Time periods + 1) ) = (2 / (20 + 1) ) = 0.0952 (9.52%)\n",
"# EMA = {Close - EMA(previous day)} x multiplier + EMA(previous day).\n",
"def get_moving_averages(monthly_averages_list):\n",
" '''\n",
" get moving averages from monthly_averages_list\n",
" input:\n",
" [ [YYYY-MM, monthly average price],\n",
" [YYYY-MM, monthly average price],\n",
" ...]\n",
"\n",
" output: \n",
" [ [YYYY-MM, monthly average price, EMA],\n",
" [YYYY-MM, monthly average price, EMA],\n",
" ...]\n",
" '''\n",
"\n",
" # per the reference, the EMA of each of the first 5 months is seeded with the SMA of those 5 months\n",
" monthly_averages_list[0].append(sum(map(lambda x: x[1], monthly_averages_list[0:5]))/5)\n",
" monthly_averages_list[1].append(sum(map(lambda x: x[1], monthly_averages_list[0:5]))/5)\n",
" monthly_averages_list[2].append(sum(map(lambda x: x[1], monthly_averages_list[0:5]))/5)\n",
" monthly_averages_list[3].append(sum(map(lambda x: x[1], monthly_averages_list[0:5]))/5)\n",
" monthly_averages_list[4].append(sum(map(lambda x: x[1], monthly_averages_list[0:5]))/5)\n",
"\n",
" # smoothing constant = 2 / (number of time periods in months + 1)\n",
" smoothing_constant = 2 / (5 + 1)\n",
"\n",
" # main loop to calculate EMA, start from the 6th month available till the end of the list\n",
" for i in range(5, len(monthly_averages_list)):\n",
" previous_month_EMA = monthly_averages_list[i-1][2]\n",
" Monthly_average_price = monthly_averages_list[i][1]\n",
"\n",
" EMA = (Monthly_average_price - previous_month_EMA) * smoothing_constant + previous_month_EMA\n",
" monthly_averages_list[i].append(EMA)\n",
"\n",
" return monthly_averages_list\n"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "c89cbae8",
"metadata": {},
"outputs": [],
"source": [
"def format_date_string(yyyy_mm):\n",
" '''rearrange date string from csv file YYYY-MM => MM-YYYY'''\n",
" [yyyy, mm] = yyyy_mm.split('-')\n",
" return '-'.join([mm, yyyy])\n"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "8d646beb",
"metadata": {},
"outputs": [],
"source": [
"def write_output_file(filename_to_write, monthly_averages_list_w_ema, report_name):\n",
" '''get output string from template and write to output file\n",
" input:\n",
" filename_to_write: txt file name with path to be written to\n",
" monthly_averages_list_w_ema: list provided with EMA\n",
" report_name: report name to be written to report\n",
" '''\n",
"\n",
" RESULT_TEMPLATE='''\n",
"# The best month for ^report_name^:\n",
"# ^best_month^, ^best_EMA^\n",
"\n",
"# The worst month for ^report_name^:\n",
"# ^worst_month^, ^worst_EMA^\n",
" '''.strip()\n",
"\n",
" # get the max EMA of the list\n",
" best_EMA = max(map(lambda x: x[2], monthly_averages_list_w_ema[5:]))\n",
" # get the month(s) by the EMA wanted\n",
" best_months = list(map(lambda x: format_date_string(x[0]), filter(lambda x: x[2] == best_EMA, monthly_averages_list_w_ema[5:])))\n",
"\n",
" # get the min(worst) EMA of the list\n",
" worst_EMA = min(map(lambda x: x[2], monthly_averages_list_w_ema[5:]))\n",
" # get the month(s) by the EMA wanted\n",
" worst_months = list(map(lambda x: format_date_string(x[0]), filter(lambda x: x[2] == worst_EMA, monthly_averages_list_w_ema[5:])))\n",
"\n",
" # assemble the output string\n",
" result_string = RESULT_TEMPLATE\n",
" result_string = result_string\\\n",
" .replace('^best_month^', ','.join(best_months))\\\n",
" .replace('^best_EMA^', str('%.2f' % best_EMA))\\\n",
" .replace('^worst_month^', ','.join(worst_months))\\\n",
" .replace('^worst_EMA^', str('%.2f' % worst_EMA)) \\\n",
" .replace('^report_name^', report_name) \n",
"\n",
" # write output file\n",
" with open(filename_to_write, 'w+') as file_write:\n",
" file_write.truncate(0)\n",
" file_write.writelines(result_string)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "1917aaef",
"metadata": {},
"outputs": [],
"source": [
"def main():\n",
" # Main function starts here\n",
"\n",
" print('start')\n",
"\n",
" # gather csv file with path from user\n",
" input_filename = input(\"Please input a csv filename: \")\n",
" \n",
" csv_filename = os.path.basename(input_filename)\n",
" csv_path = os.path.dirname(input_filename)\n",
"\n",
" # derive the output file path from the csv file name\n",
" txt_filename = csv_filename.replace('.csv','_output.txt')\n",
" if (csv_path !=''):\n",
" txt_filename = '/'.join([csv_path, txt_filename])\n",
" else:\n",
" txt_filename = '/'.join(['.', txt_filename])\n",
" \n",
" # extract the corp_name from the filename, e.g. google.csv => google\n",
" corp_name = os.path.basename(input_filename).split('.')[0]\n",
"\n",
" # process the data_list from the csv file as stated in the assignment\n",
" print(f'processing {csv_filename}')\n",
" csv_list=get_data_list(input_filename)\n",
" monthly_averages_list = get_monthly_averages(csv_list)\n",
" monthly_averages_list_w_EMA = get_moving_averages(monthly_averages_list)\n",
"\n",
" # write output file\n",
" write_output_file(txt_filename, monthly_averages_list_w_EMA, corp_name)\n",
" print('wrote to {file} done'.format(file = txt_filename))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b7d3e814",
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"start\n"
]
}
],
"source": [
"if __name__ == \"__main__\":\n",
" main()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "325de646",
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"%ls"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "de467460",
"metadata": {},
"outputs": [],
"source": [
"%cat google_output.txt"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "41f834e6",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}