This commit is contained in:
louiscklaw
2025-02-01 02:02:14 +08:00
parent a767348238
commit c403fa8e72
48 changed files with 5987 additions and 0 deletions

11
2nd_copy/src/Pipfile Normal file
View File

@@ -0,0 +1,11 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
[dev-packages]
[requires]
python_version = "3.11"

262
2nd_copy/src/main.py Normal file
View File

@@ -0,0 +1,262 @@
# Objective:
# This script aims to analyze the historical prices of a stock
import os
import sys
import csv
# Error identifier for a CSV file that cannot be found.
# NOTE(review): not referenced by the code visible in this file -- confirm
# whether anything else still needs it.
CSV_FILE_NOT_FOUND = 'csv_file_not_found'

# Positions of the columns inside one data row.
# C_DATE .. C_VOLUME follow the column order of the CSV file;
# C_MONTH_AVG_PRICE is the slot get_monthly_averages() appends to each row.
# NOTE(review): C_EMA is defined but never read in the visible code.
(
    C_DATE,
    C_OPEN,
    C_HIGH,
    C_LOW,
    C_CLOSE,
    C_ADJ_CLOSE,
    C_VOLUME,
    C_MONTH_AVG_PRICE,
    C_EMA,
) = range(9)
# NOTE: get_data_list(csv_file_name)
# NOTE: This function has one parameter, namely csv_file_name.
# NOTE: When the function is called, you need to pass along a CSV file name which is used inside the function to open and read the CSV
# NOTE: file.
# NOTE: After reading each row, it will be split into a list. The list will then be appended into a main
# NOTE: list (a list of lists), namely data_list. The data_list will be returned at the end of the
# NOTE: function.
# NOTE: a missing file is not handled inside the function; the resulting exception is caught by the try/except structure around the main script.
def clean_data(data_list):
    """Sort the raw rows and convert their numeric columns to float.

    input:
    data_list: rows of strings as read from the CSV file (header removed)

    output:
    a new list of rows [date, open, high, low, close, adj_close, volume];
    the date stays a string, the other six values are floats. Any column
    after C_VOLUME is dropped.

    Rows are ordered with sorted(), which compares them element by element,
    so the first column (the date string) decides the order first.
    float() raises ValueError for a cell that is not a number and a row that
    is too short raises IndexError.
    """
    numeric_columns = (C_OPEN, C_HIGH, C_LOW, C_CLOSE, C_ADJ_CLOSE, C_VOLUME)
    return [
        [row[C_DATE], *(float(row[column]) for column in numeric_columns)]
        for row in sorted(data_list)
    ]
def get_data_list(csv_file_name):
    """Read a CSV file and return its data rows in cleaned form.

    input:
    csv_file_name: path of the CSV file to open

    output:
    the result of clean_data() applied to every row except the first one

    Errors raised by open() (for example when the file does not exist) are
    not caught here.
    """
    with open(csv_file_name, newline='') as csv_handle:
        row_reader = csv.reader(csv_handle, delimiter=',', quotechar='"')
        # NOTE: the very first row holds the column names, not data.
        # The default keeps an empty file from raising StopIteration.
        next(row_reader, None)
        body_rows = list(row_reader)
    return clean_data(body_rows)
# NOTE: get_monthly_averages(data_list)
# NOTE: This function has one parameter, namely data_list. You need to pass the data_list
# NOTE: generated by the get_data_list() function as the argument to this function and then
# NOTE: calculate the monthly average prices of the stock. The average monthly prices are calculated in
# NOTE: the following way.
# NOTE:
# NOTE: 1. Suppose the volume and adjusted closing price of a trading day are V1 and C1, respectively.
# NOTE: 2. The total sale of that day equals V1 x C1.
# NOTE: 3. Now, suppose the volume and adjusted closing price of another trading day are V2 and C2, respectively.
# NOTE: 4. The average of these two trading days is the sum of the total sales divided by the total volume:
# NOTE:
# NOTE: Average price = (V1 x C1 + V2 x C2) / (V1 + V2)
# NOTE:
# NOTE: To average a whole month, you need to
# NOTE: - add up the total sales (V1 x C1 + V2 x C2 + ... + Vn x Cn) for each day and
# NOTE: - divide it by the sum of all volumes (V1 + V2 + ... + Vn) where n is the number of trading days in the month.
# NOTE: A tuple with 2 items, including the date (year and month only) and the average for that month,
# NOTE: will be generated for each month. The tuple for each month will be appended to a main list,
# NOTE: namely monthly_averages_list. The monthly_averages_list will be returned at the end of the function.
def get_available_month(data_list):
    """Return the distinct months present in the list, in ascending order.

    input:
    data_list: rows whose first element is a date string

    output:
    sorted list of the unique 7-character prefixes of that date string
    (presumably 'YYYY-MM' -- this relies on the dates being written as
    YYYY-MM-DD)
    """
    unique_months = {row[0][:7] for row in data_list}
    return sorted(unique_months)
def get_monthly_averages(data_list):
    """Attach the volume-weighted monthly average price to every row.

    For each month (first 7 characters of the date column) the average is

        (V1 x C1 + V2 x C2 + ... + Vn x Cn) / (V1 + V2 + ... + Vn)

    where V is the volume and C the adjusted close of one row.

    input:
    data_list: rows as produced by clean_data()

    output:
    a new list with one new row per input row: the original values followed
    by the average of that row's month (index C_MONTH_AVG_PRICE for rows
    coming from clean_data()). Neither data_list nor its rows are modified,
    so calling the function twice on the same data gives the same result.

    Raises ZeroDivisionError when the total volume of a month is zero.
    """
    # Group the rows per month in a single pass instead of re-filtering the
    # whole list once for every month.
    rows_by_month = {}
    for row in data_list:
        rows_by_month.setdefault(row[C_DATE][0:7], []).append(row)

    month_average_price = {}
    for month, month_rows in rows_by_month.items():
        # NOTE: (V1 x C1 + V2 x C2 ...)
        total_sale = sum(r[C_VOLUME] * r[C_ADJ_CLOSE] for r in month_rows)
        # NOTE: (V1 + V2 ...)
        total_volume = sum(r[C_VOLUME] for r in month_rows)
        month_average_price[month] = total_sale / total_volume

    # Build fresh rows rather than appending to the caller's rows in place.
    return [[*row, month_average_price[row[C_DATE][0:7]]] for row in data_list]
# NOTE: get_moving_averages(monthly_averages_list)
# NOTE: This function has one parameter, namely monthly_averages_list. You need to pass the
# NOTE: monthly_averages_list generated by get_monthly_averages() as the argument
# NOTE: to this function and then calculate the 5-month exponential moving average (EMA) stock prices.
# NOTE: In general, the EMA for a particular month can be calculated by the following formula:
# NOTE:
# NOTE: EMA = (Monthly average price - previous month's EMA) x smoothing constant + previous month's EMA
# NOTE:
# NOTE: where
# NOTE:
# NOTE: smoothing constant = 2 / (number of time periods in months + 1)
# NOTE:
# NOTE: Initial SMA = 20-period sum / 20
# NOTE: Multiplier = (2 / (Time periods + 1) ) = (2 / (20 + 1) ) = 0.0952(9.52%)
# NOTE: EMA = {Close - EMA(previous day)} x multiplier + EMA(previous day).
def get_monthly_average(data_list, month_wanted):
    """Look up the monthly average price stored for one month.

    input:
    data_list: rows that carry a value at index C_MONTH_AVG_PRICE
    month_wanted: YYYY-MM

    output:
    the C_MONTH_AVG_PRICE value of the first row whose date starts with
    month_wanted

    Raises IndexError when no row belongs to that month.
    """
    rows_in_month = [row for row in data_list if row[C_DATE][0:7] == month_wanted]
    return rows_in_month[0][C_MONTH_AVG_PRICE]
def get_SMA(data_list, month_to_get_SMA):
    """Return the simple average of the monthly averages of the given months.

    input:
    data_list: rows that carry a monthly average (see get_monthly_average)
    month_to_get_SMA: sized collection of months (YYYY-MM) to average over;
        the code iterates over it and divides by its length

    Raises ZeroDivisionError when month_to_get_SMA is empty.
    """
    running_total = 0
    for wanted_month in month_to_get_SMA:
        running_total += get_monthly_average(data_list, wanted_month)
    month_count = len(month_to_get_SMA)
    return running_total / month_count
def get_extreme_EMA(ema_list, max_min='min', skip_month=0):
    """Return the largest or smallest EMA found in the list.

    input:
    ema_list: rows whose third element (index 2) is the EMA
    max_min: 'max' selects the maximum; any other value selects the minimum
    skip_month: number of leading rows to ignore (i.e. the first 5 months,
        whose EMA slot holds the SMA seed)

    Raises ValueError when no row is left after skipping.
    """
    pick_extreme = max if max_min == 'max' else min
    return pick_extreme(entry[2] for entry in ema_list[skip_month:])
def get_month_by_EMA(ema_list, ema_value):
    """Return the month of every row whose EMA equals the given value.

    input:
    ema_list: rows of the form [month, ..., EMA at index 2]
    ema_value: EMA value to look for (i.e. the max EMA); compared with ==

    output:
    list of the first element of each matching row, in list order
    """
    return [entry[0] for entry in ema_list if entry[2] == ema_value]
def get_output_content(max_ema, min_ema, max_ema_months, min_ema_months, report_name=""):
    """Build the text of the best/worst month report.

    input:
    max_ema: max ema to report (rounded to 2 decimals in the text)
    min_ema: min ema to report (rounded to 2 decimals in the text)
    max_ema_months: month(s), as YYYY-MM, to report with max ema
    min_ema_months: month(s), as YYYY-MM, to report with min ema
    report_name: name shown in the two heading lines

    output:
    a four-line string; several months are joined with ','
    """
    def month_first(year_month):
        # 'YYYY-MM' -> 'MM-YYYY' for the report
        pieces = year_month.split('-')
        return pieces[1] + '-' + pieces[0]

    best_months = ','.join(month_first(month) for month in max_ema_months)
    worst_months = ','.join(month_first(month) for month in min_ema_months)

    report_lines = [
        f'# The best month for {report_name}:',
        f'# {best_months}, {round(max_ema, 2)}',
        f'# The worst month for {report_name}:',
        f'# {worst_months}, {round(min_ema, 2)}',
    ]
    return '\n'.join(report_lines)
def get_moving_averages(monthly_averages_list, periods=5):
    """Compute the exponential moving average (EMA) of the monthly averages.

    input:
    monthly_averages_list: rows as returned by get_monthly_averages()
        (date at C_DATE, monthly average at C_MONTH_AVG_PRICE)
    periods: number of months of the EMA (default 5, the value that used
        to be hard-coded)

    output:
    list of [month, monthly_average, ema], one entry per month in ascending
    month order. The first `periods` entries all carry the simple average
    (SMA) of those first months as their ema; every later entry uses

        ema = (monthly_average - previous ema) x smoothing + previous ema
        smoothing = 2 / (periods + 1)

    When fewer than `periods` months exist, the SMA is taken over the months
    that are available (it was previously always divided by 5). An empty
    input gives an empty list.

    Raises ValueError when periods is smaller than 1.
    """
    if periods < 1:
        raise ValueError('periods must be at least 1')

    # One pass over the rows: remember the monthly average of the first row
    # seen for each month (every row of a month carries the same value).
    average_by_month = {}
    for row in monthly_averages_list:
        average_by_month.setdefault(row[C_DATE][0:7], row[C_MONTH_AVG_PRICE])

    monthly_averages_list_w_EMA = [
        [month, average_by_month[month]] for month in sorted(average_by_month)
    ]
    if not monthly_averages_list_w_EMA:
        return monthly_averages_list_w_EMA

    # NOTE: the first `periods` months are seeded with their SMA
    seed_window = monthly_averages_list_w_EMA[:periods]
    initial_SMA = sum(entry[1] for entry in seed_window) / len(seed_window)
    smoothing_constant = 2 / (periods + 1)

    for index, entry in enumerate(monthly_averages_list_w_EMA):
        if index < periods:
            entry.append(initial_SMA)
        else:
            EMA_last_month = monthly_averages_list_w_EMA[index - 1][2]
            entry.append((entry[1] - EMA_last_month) * smoothing_constant + EMA_last_month)
    return monthly_averages_list_w_EMA
# get input from user
csv_filepath = input("Please input a csv filename: ")
try:
    # NOTE: derive the output file name and the report name from the input
    csv_filename = csv_filepath
    # Only a trailing '.csv' is removed; splitting on '.csv' would cut the
    # path at the first occurrence (e.g. inside a directory name).
    if csv_filename.endswith('.csv'):
        path_without_ext = csv_filename[:-len('.csv')]
    else:
        path_without_ext = csv_filename
    txt_filename = path_without_ext + '_output.txt'
    report_name = os.path.basename(path_without_ext)

    # NOTE: process file
    data_list = get_data_list(csv_filename)
    monthly_average_list = get_monthly_averages(data_list)
    ema_list = get_moving_averages(monthly_average_list)

    # The first 5 months only hold the SMA seed, so they are skipped when
    # looking for the best / worst month.
    seed_months = 5
    if len(ema_list) <= seed_months:
        # Nothing is left to compare after skipping the seed months;
        # max()/min() would fail on an empty sequence.
        print('sorry the file does not contain enough months of data')
    else:
        # NOTE: output txt file
        max_ema = get_extreme_EMA(ema_list, 'max', seed_months)
        min_ema = get_extreme_EMA(ema_list, 'min', seed_months)
        best_ema_months = get_month_by_EMA(ema_list, max_ema)
        worst_ema_months = get_month_by_EMA(ema_list, min_ema)
        output_string = get_output_content(max_ema, min_ema, best_ema_months, worst_ema_months, report_name)
        # 'w' already truncates the file; write() emits the string in one call.
        with open(txt_filename, 'w') as f_output:
            f_output.write(output_string)
        print('output wrote '+txt_filename)
        print('done !')
except IsADirectoryError:
    # NOTE: if input is a directory, drop here
    print('sorry the path is a directory')
except FileNotFoundError:
    # NOTE: if csv file not found, drop here
    print('sorry cannot find the file wanted')
# Any other exception is deliberately left uncaught so it surfaces with its
# traceback.

6
2nd_copy/src/test.sh Normal file
View File

@@ -0,0 +1,6 @@
#!/usr/bin/env bash
# Runs main.py from the current directory; the script prompts for a CSV filename.
# -e: stop at the first failing command, -x: echo each command before running it.
set -ex
# `clear` exits non-zero when no terminal is configured (e.g. TERM unset);
# that must not abort the run under `set -e`.
clear || true
python3 ./main.py