Files
004_comission/hk1234566/python_networking/new/NetworkApplications-full.py
louiscklaw 866bfd3b42 update,
2025-01-31 19:51:47 +08:00

588 lines
20 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import argparse
import socket
import os
import sys
import struct
import time
import random
import traceback # useful for exception handling
import threading
from pprint import pprint
# config
# Size in bytes passed to recv()/recvfrom() calls elsewhere in this file.
INCOMING_BUFFER = 1024
# Ten times INCOMING_BUFFER.
# NOTE(review): check the call sites before relying on the name; confirm this
# is really used as a buffer size.
OUTGOING_BUFFER = INCOMING_BUFFER * 10
# Value written into the ICMP "type" field of outgoing probes (8 = Echo Request).
ICMP_TYPE = 8
def setupArgumentParser() -> argparse.Namespace:
    """Build the command-line interface and parse the process arguments.

    Five sub-commands are registered, each with a short alias: ``ping``/``p``,
    ``traceroute``/``t``, ``paris-traceroute``/``pt``, ``web``/``w`` and
    ``proxy``/``x``. Every sub-command stores the class that implements it in
    ``func``; when no sub-command is given, the top-level defaults select
    ``ICMPPing`` against ``lancaster.ac.uk``.

    Returns:
        The namespace produced by ``parse_args()``.
    """
    timeout_help = 'maximum timeout before considering request lost'
    protocol_help = 'protocol to send request with (UDP/ICMP)'
    port_help = 'port number to start web server listening on'

    root = argparse.ArgumentParser(
        description='A collection of Network Applications developed for SCC.203.')
    root.set_defaults(func=ICMPPing, hostname='lancaster.ac.uk')
    commands = root.add_subparsers(help='sub-command help')

    # ping
    ping = commands.add_parser('ping', aliases=['p'], help='run ping')
    ping.set_defaults(timeout=4)
    ping.add_argument('hostname', type=str, help='host to ping towards')
    ping.add_argument(
        '--count', '-c', nargs='?', type=int,
        help='number of times to ping the host before stopping')
    ping.add_argument(
        '--timeout', '-t', nargs='?', type=int, help=timeout_help)
    ping.set_defaults(func=ICMPPing)

    # traceroute and paris-traceroute take exactly the same options.
    tracers = (
        ('traceroute', 't', 'run traceroute', Traceroute),
        ('paris-traceroute', 'pt', 'run paris-traceroute', ParisTraceroute),
    )
    for command, alias, summary, handler in tracers:
        tracer = commands.add_parser(command, aliases=[alias], help=summary)
        tracer.set_defaults(timeout=4, protocol='icmp')
        tracer.add_argument(
            'hostname', type=str, help='host to traceroute towards')
        tracer.add_argument(
            '--timeout', '-t', nargs='?', type=int, help=timeout_help)
        tracer.add_argument(
            '--protocol', '-p', nargs='?', type=str, help=protocol_help)
        tracer.set_defaults(func=handler)

    # web server and proxy differ only in their default port and handler.
    servers = (
        ('web', 'w', 'run web server', 8080, WebServer),
        ('proxy', 'x', 'run proxy', 8000, Proxy),
    )
    for command, alias, summary, default_port, handler in servers:
        server = commands.add_parser(command, aliases=[alias], help=summary)
        server.set_defaults(port=default_port)
        server.add_argument(
            '--port', '-p', type=int, nargs='?', help=port_help)
        server.set_defaults(func=handler)

    return root.parse_args()
class NetworkApplication:
    """Helpers shared by the network tools: a checksum and result printers."""

    def checksum(self, dataToChecksum: bytes) -> int:
        """Return the 16-bit ones'-complement checksum of *dataToChecksum*.

        The input must be a bytes-like object: the loop indexes it and does
        integer arithmetic on each element, which does not work on ``str``.

        The bytes are summed as 16-bit words (low byte first); a trailing odd
        byte is added on its own. The folded sum is complemented, byte-swapped
        and passed through ``socket.htons``.
        """
        total = 0
        paired_length = (len(dataToChecksum) // 2) * 2

        for offset in range(0, paired_length, 2):
            word = dataToChecksum[offset + 1] * 256 + dataToChecksum[offset]
            total = (total + word) & 0xffffffff

        # Odd-length input: the last byte has no partner and is added alone.
        if paired_length < len(dataToChecksum):
            total = (total + dataToChecksum[len(dataToChecksum) - 1]) & 0xffffffff

        # Fold the carries back into the low 16 bits.
        total = (total >> 16) + (total & 0xffff)
        total = total + (total >> 16)

        answer = ~total & 0xffff
        answer = answer >> 8 | (answer << 8 & 0xff00)
        return socket.htons(answer)

    def printOneResult(
            self,
            destinationAddress: str,
            packetLength: int,
            time: float,
            ttl: int,
            destinationHostname=''):
        """Print one reply line: size, sender, TTL and delay in milliseconds.

        ``time`` is the delay value to print; inside this method the parameter
        hides the ``time`` module. The hostname is printed in front of the
        address only when it is non-empty.
        """
        if destinationHostname:
            print(
                "%d bytes from %s (%s): ttl=%d time=%.2f ms" %
                (packetLength, destinationHostname, destinationAddress, ttl, time))
        else:
            print("%d bytes from %s: ttl=%d time=%.2f ms" %
                  (packetLength, destinationAddress, ttl, time))

    def printAdditionalDetails(
            self,
            packetLoss=0.0,
            minimumDelay=0.0,
            averageDelay=0.0,
            maximumDelay=0.0):
        """Print the packet-loss percentage and, if all are positive, the RTTs."""
        print("%.2f%% packet loss" % (packetLoss))
        if minimumDelay > 0 and averageDelay > 0 and maximumDelay > 0:
            print("rtt min/avg/max = %.2f/%.2f/%.2f ms" %
                  (minimumDelay, averageDelay, maximumDelay))

    def printMultipleResults(
            self,
            ttl: int,
            destinationAddress: str,
            measurements: list,
            destinationHostname=''):
        """Print one line for a hop that was probed several times.

        Each entry of *measurements* is a round-trip time or ``None`` for a
        probe that got no answer; ``None`` is shown as ``*``. When every probe
        went unanswered (or the list is empty) only the TTL and the markers
        are printed, without hostname and address.
        """
        latencies = ''.join(
            '* ' if rtt is None else str(round(rtt, 3)) + ' ms '
            for rtt in measurements)
        no_response = all(rtt is None for rtt in measurements)

        if no_response:
            print("%d %s" % (ttl, latencies))
        else:
            print(
                "%d %s (%s) %s" %
                (ttl,
                 destinationHostname,
                 destinationAddress,
                 latencies))
class ICMPPing(NetworkApplication):
    """Task 1.1: ICMP ping.

    Constructing the class runs the tool: it resolves ``args.hostname`` and
    sends one echo request roughly every second, printing each reply, until
    ``args.count`` probes have been sent or the user interrupts it.
    """

    # ICMP header layout: type, code, checksum, identifier, sequence number.
    # Native byte order is kept because the value returned by checksum() is
    # meant to be packed this way; replies echo the identifier bytes back, so
    # unpacking them with the same format recovers the ID that was sent.
    ICMP_HEADER_FORMAT = 'BBHHH'
    ICMP_HEADER_LENGTH = 8
    ICMP_ECHO_REPLY = 0
    # Used when the namespace carries no usable timeout (e.g. the script was
    # started without a sub-command, so no ``--timeout`` option exists).
    FALLBACK_TIMEOUT = 1

    def receiveOnePing(self, icmpSocket, destinationAddress, ID, timeout):
        """Wait up to *timeout* seconds for the echo reply carrying *ID*.

        Packets that are too short, are not echo replies, or carry a different
        identifier are skipped; the wait continues until the deadline.

        Returns:
            ``(recv_time, reply_size, reply_ttl)`` for the matching reply, or
            ``None`` if none arrived in time. ``reply_size`` is the total
            length field of the reply's IPv4 header.
        """
        deadline = time.time() + timeout
        while True:
            remaining = deadline - time.time()
            if remaining <= 0:
                return None
            icmpSocket.settimeout(remaining)
            try:
                reply, _sender = icmpSocket.recvfrom(2048)
            except socket.timeout:
                return None
            recv_time = time.time()

            # The datagram starts with the IPv4 header; its length in 32-bit
            # words is the low nibble of the first byte (more than 20 bytes
            # when IP options are present).
            if len(reply) < 20:
                continue
            ip_header_length = (reply[0] & 0x0f) * 4
            icmp_header = reply[
                ip_header_length:ip_header_length + self.ICMP_HEADER_LENGTH]
            if len(icmp_header) < self.ICMP_HEADER_LENGTH:
                continue

            icmp_type, _code, _checksum, reply_id, _sequence = struct.unpack(
                self.ICMP_HEADER_FORMAT, icmp_header)
            if icmp_type != self.ICMP_ECHO_REPLY or reply_id != ID:
                continue

            reply_size = struct.unpack(">H", reply[2:4])[0]  # IPv4 total length
            reply_ttl = reply[8]                              # IPv4 TTL
            return (recv_time, reply_size, reply_ttl)

    def sendOnePing(self, icmpSocket, destinationAddress, ID):
        """Send one echo request with identifier *ID*; return the send time."""
        payload = bytes("Task 1.1: ICMP Ping", 'utf-8')
        # The checksum is computed over the packet with a zeroed checksum
        # field, then the header is rebuilt with the real value.
        unchecked = struct.pack(self.ICMP_HEADER_FORMAT, ICMP_TYPE, 0, 0, ID, 1)
        header = struct.pack(
            self.ICMP_HEADER_FORMAT, ICMP_TYPE, 0,
            self.checksum(unchecked + payload), ID, 1)
        # ICMP has no ports, so the port in the address tuple is a placeholder.
        # One sendto() call transmits the whole datagram; retrying with the
        # unsent tail would emit a malformed second packet.
        icmpSocket.sendto(header + payload, (destinationAddress, 0))
        return time.time()

    def doOnePing(self, destinationAddress, timeout):
        """Send one echo request and wait up to *timeout* seconds for its reply.

        Returns:
            ``(delay_ms, reply_size, reply_ttl)``, or ``None`` when no matching
            reply arrived within the timeout.
        """
        # Derive the identifier from the process id so that concurrently
        # running pings do not accept each other's replies.
        ID = os.getpid() & 0xffff
        with socket.socket(socket.AF_INET, socket.SOCK_RAW,
                           socket.getprotobyname('icmp')) as icmp_socket:
            send_time = self.sendOnePing(icmp_socket, destinationAddress, ID)
            received = self.receiveOnePing(
                icmp_socket, destinationAddress, ID, timeout)
        if received is None:
            return None
        recv_time, reply_size, reply_ttl = received
        return ((recv_time - send_time) * 1000, reply_size, reply_ttl)

    def __init__(self, args):
        """Run the ping loop described by *args* (``hostname``, optional
        ``count`` and ``timeout``)."""
        print('Ping to: %s...' % (args.hostname))
        try:
            destinationAddress = socket.gethostbyname(args.hostname)
        except socket.gaierror:
            print("Host name not recognised")
            return

        count = getattr(args, 'count', None)  # None means "until interrupted"
        timeout = getattr(args, 'timeout', None) or self.FALLBACK_TIMEOUT

        sent = 0
        delays = []
        try:
            while count is None or sent < count:
                if sent:
                    time.sleep(1)
                result = self.doOnePing(destinationAddress, timeout)
                sent += 1
                if result is None:
                    print("No data received from socket within timeout period.")
                    continue
                total_delay, reply_size, reply_ttl = result
                delays.append(total_delay)
                self.printOneResult(
                    destinationAddress, reply_size, total_delay, reply_ttl,
                    args.hostname)
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop an unbounded ping.
            pass
        except OSError as err:
            # e.g. no permission to open a raw socket, or network unreachable.
            print("Ping failed: %s" % err)

        if sent:
            packet_loss = 100.0 * (sent - len(delays)) / sent
            if delays:
                self.printAdditionalDetails(
                    packet_loss, min(delays), sum(delays) / len(delays),
                    max(delays))
            else:
                self.printAdditionalDetails(packet_loss)
class Traceroute(NetworkApplication):
    """Task 1.2: traceroute using ICMP echo requests with increasing TTL.

    Constructing the class runs the trace: one probe is sent per TTL value and
    the sender of whatever ICMP packet comes back is printed, until a packet
    arrives from the destination itself or ``MAX_HOPS`` is reached.

    NOTE(review): ``args.protocol`` is not consulted; probes are always ICMP.
    NOTE(review): the first ICMP packet received is taken as the answer; it is
    not matched against the probe that was sent.
    """

    MAX_HOPS = 30        # stop here if the destination never answers
    DEFAULT_TIMEOUT = 4  # seconds; used when args carries no usable timeout

    def __init__(self, args):
        """Run the trace towards ``args.hostname``.

        Raises:
            socket.gaierror: if the hostname cannot be resolved.
            OSError: if a socket cannot be created or used.
        """
        print('Traceroute to: %s...' % (args.hostname))
        dest_address = socket.gethostbyname(args.hostname)
        timeout = getattr(args, 'timeout', None) or self.DEFAULT_TIMEOUT

        # The probe is identical for every hop, so build it once. Native byte
        # order is kept because the value returned by checksum() is meant to
        # be packed this way.
        data = "Task 1.2: Traceroute".encode()
        unchecked = struct.pack('BBHHH', ICMP_TYPE, 0, 0, 5, 1)
        header = struct.pack(
            'BBHHH', ICMP_TYPE, 0, self.checksum(unchecked + data), 5, 1)
        packet = header + data

        for ttl in range(1, self.MAX_HOPS + 1):
            # The receiving socket is opened before the probe is sent so the
            # answer cannot be missed.
            with socket.socket(socket.AF_INET, socket.SOCK_RAW,
                               socket.IPPROTO_ICMP) as recv_socket:
                with socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                                   socket.IPPROTO_ICMP) as send_socket:
                    recv_socket.settimeout(timeout)
                    send_socket.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl)
                    # ICMP has no ports; the port value is a placeholder.
                    send_socket.sendto(packet, (dest_address, 0))
                    send_time = time.time()
                    try:
                        recv_packet, sender = recv_socket.recvfrom(1024 * 4)
                    except socket.timeout:
                        # Silent hop: show "*" and move on to the next TTL.
                        self.printMultipleResults(ttl, '', [None])
                        continue
                    # Stamp the arrival before any cleanup or DNS work.
                    recv_time = time.time()

            address = sender[0]
            # Best-effort reverse lookup; fall back to the bare address.
            try:
                hostname = socket.gethostbyaddr(address)[0]
            except OSError:
                hostname = address

            self.printOneResult(
                address, len(recv_packet), (recv_time - send_time) * 1000,
                ttl, hostname)

            if address == dest_address:
                break
class ParisTraceroute(NetworkApplication):
    """Task 1.3: Paris traceroute using ICMP echo requests.

    Classic traceroute can report paths that do not exist when per-flow load
    balancers sit on the route. For ICMP, the type, code and checksum fields
    are the part of the header such devices can use to tell flows apart, so
    every probe here carries the same checksum; the sequence number identifies
    the hop and the identifier is adjusted to keep the checksum valid.

    NOTE(review): ``args.protocol`` is not consulted; probes are always ICMP.
    NOTE(review): the first ICMP packet received is taken as the answer; it is
    not matched against the probe that was sent.
    """

    MAX_HOPS = 30        # stop here if the destination never answers
    DEFAULT_TIMEOUT = 4  # seconds; used when args carries no usable timeout
    FLOW_CHECKSUM = 1    # checksum value shared by every probe of the trace

    def getIdentifier(self, checkSumWanted):
        """Return the identifier that gives a header-only echo request with
        sequence number 0 the checksum *checkSumWanted*.

        The 16-bit words of the header (0x0800 for type 8 / code 0, checksum,
        identifier, sequence) must add up to 0xffff, and
        0xffff - 0x0800 = 0xf7ff.
        """
        return 0xf7ff - checkSumWanted

    def __init__(self, args):
        """Run the trace towards ``args.hostname``.

        Any socket or name-resolution error is printed and ends the process
        with exit status 1.
        """
        try:
            print('Paris-Traceroute to: %s...' % (args.hostname))
            dest_ip = socket.gethostbyname(args.hostname)
            timeout = getattr(args, 'timeout', None) or self.DEFAULT_TIMEOUT

            for ttl in range(1, self.MAX_HOPS + 1):
                # Header: type (8), code (8), checksum (16), id (16), seq (16),
                # big-endian. The checksum stays fixed; whatever is added to
                # the sequence number is taken off the identifier so the word
                # sum, and with it the checksum, does not change.
                sequence = ttl
                identifier = self.getIdentifier(self.FLOW_CHECKSUM) - sequence
                packet = struct.pack(
                    '!BBHHH', ICMP_TYPE, 0, self.FLOW_CHECKSUM, identifier,
                    sequence)

                with socket.socket(socket.AF_INET, socket.SOCK_RAW,
                                   socket.IPPROTO_ICMP) as recv_socket:
                    with socket.socket(socket.AF_INET, socket.SOCK_RAW,
                                       socket.IPPROTO_ICMP) as send_socket:
                        recv_socket.settimeout(timeout)
                        send_socket.setsockopt(
                            socket.SOL_IP, socket.IP_TTL, ttl)
                        # ICMP has no ports; the port value is a placeholder.
                        send_socket.sendto(packet, (dest_ip, 0))
                        start_time = time.time()
                        try:
                            recv_packet, sender = recv_socket.recvfrom(
                                INCOMING_BUFFER)
                        except socket.timeout:
                            # Silent hop: show "*" and try the next TTL.
                            self.printMultipleResults(ttl, '', [None])
                            continue
                        # Stamp the arrival before any cleanup or DNS work.
                        recv_time = time.time()

                address = sender[0]
                # Best-effort reverse lookup; fall back to the bare address.
                try:
                    try_res_hostname = socket.gethostbyaddr(address)[0]
                except OSError:
                    try_res_hostname = address

                self.printOneResult(
                    address, len(recv_packet),
                    (recv_time - start_time) * 1000, ttl, try_res_hostname)

                if address == dest_ip:
                    break
        except OSError as err:
            print('error occured', err)
            sys.exit(1)
class WebServer(NetworkApplication):
    """Task 2.1: a minimal HTTP server for files in the working directory.

    Constructing the class binds to 127.0.0.1 on ``args.port`` and serves one
    connection at a time until interrupted.
    """

    def handleRequest(self, tcpSocket):
        """Answer one HTTP request on *tcpSocket*, then close the socket.

        The path is the second token of the request line. Every ``/`` is
        removed from it, so only files directly inside the working directory
        can be named. The file is sent as-is after a ``200`` status line; a
        file that cannot be opened yields ``404`` and a request line without a
        path yields ``400``.
        """
        try:
            # Undecodable bytes are replaced rather than aborting the request.
            request = tcpSocket.recv(INCOMING_BUFFER).decode(errors='replace')
            print(request)

            request_line = request.split('\n')[0].split()
            if len(request_line) < 2:
                # Empty or truncated request: there is no path to serve.
                response = b'HTTP/1.0 400 BAD REQUEST\r\n\r\nBad Request'
            else:
                filename = request_line[1].replace('/', '')
                try:
                    # Binary mode so that images and other non-text files can
                    # be served unchanged.
                    with open(filename, 'rb') as requested_file:
                        content = requested_file.read()
                    response = b'HTTP/1.0 200 OK\r\n\r\n' + content
                except (OSError, ValueError):
                    # Missing file, directory, permission problem, or a name
                    # that open() rejects (e.g. an embedded NUL byte).
                    response = b'HTTP/1.0 404 NOT FOUND\r\n\r\nFile Not Found'

            tcpSocket.sendall(response)
        finally:
            tcpSocket.close()

    def __init__(self, args):
        """Start listening on ``args.port`` and serve until interrupted."""
        print('Web Server starting on port: %i...' % (args.port))
        with socket.socket() as server_socket:
            # Allow an immediate restart on the same port.
            server_socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind(("127.0.0.1", args.port))
            server_socket.listen()
            try:
                while True:
                    client_socket, _client_address = server_socket.accept()
                    try:
                        self.handleRequest(client_socket)
                    except OSError as err:
                        # A failure on one connection must not stop the server.
                        print("Socket error:", err)
            except KeyboardInterrupt:
                # Ctrl-C stops the server; the listening socket is closed on
                # leaving the with-block.
                pass
class Proxy(NetworkApplication):
    """Task 2.2: a minimal forwarding HTTP proxy.

    Constructing the class binds to 127.0.0.1 on ``args.port`` and relays one
    client request at a time. It can be exercised with, for example:
        curl neverssl.com --proxy 127.0.0.1:8000
    """

    UPSTREAM_TIMEOUT = 2  # seconds to wait on the destination server

    def _parseTarget(self, raw_request):
        """Return ``(host, port)`` from the request line of *raw_request*.

        The request line must be ``METHOD scheme://host[:port][/...] VERSION``.
        The port defaults to 80. ``None`` is returned when the line does not
        have that shape or the port is not a number between 1 and 65535.
        """
        # latin-1 maps every byte to a character, so decoding cannot fail.
        first_line = raw_request.split(b'\r\n', 1)[0].decode('latin-1')
        parts = first_line.split(' ')
        if len(parts) != 3 or '://' not in parts[1]:
            return None

        # Keep only "host[:port]": drop the scheme, then anything from the
        # first "/", "?" or "#" onwards.
        authority = parts[1].split('://', 1)[1]
        for separator in '/?#':
            authority = authority.split(separator, 1)[0]

        host, _, port_text = authority.partition(':')
        if not host:
            return None
        if not port_text:
            return (host, 80)
        try:
            port = int(port_text)
        except ValueError:
            return None
        if not 0 < port < 65536:
            return None
        return (host, port)

    def handleRequest(self, tcp_socket):
        """Relay one request from *tcp_socket* to its destination and back.

        The request bytes are forwarded unchanged, and every chunk the server
        returns is written back to the client until the server closes the
        connection or stays silent for ``UPSTREAM_TIMEOUT`` seconds. A request
        that cannot be parsed is answered with ``400`` and an unresolvable
        host with ``502``. Socket errors are printed and not raised. The
        client socket is always closed before returning.
        """
        try:
            raw_request = tcp_socket.recv(INCOMING_BUFFER)
            target = self._parseTarget(raw_request)
            if target is None:
                tcp_socket.sendall(b'HTTP/1.0 400 Bad Request\r\n\r\n')
                return
            dst_host, dst_port = target

            try:
                dst_ip = socket.gethostbyname(dst_host)
            except socket.gaierror as msg:
                print("Couldn't convert domain to ip", dst_host, msg)
                tcp_socket.sendall(b'HTTP/1.0 502 Bad Gateway\r\n\r\n')
                return

            with socket.socket(
                    socket.AF_INET, socket.SOCK_STREAM) as outgoing_req_socket:
                outgoing_req_socket.settimeout(self.UPSTREAM_TIMEOUT)
                try:
                    outgoing_req_socket.connect((dst_ip, dst_port))
                    outgoing_req_socket.sendall(raw_request)
                    while True:
                        reply = outgoing_req_socket.recv(INCOMING_BUFFER)
                        if not reply:
                            # Server closed the connection: reply complete.
                            break
                        tcp_socket.sendall(reply)
                except socket.timeout:
                    # A server that keeps the connection open ends up here
                    # once it has nothing more to send.
                    print("Connection timeout")
        except socket.error as msg:
            # One failed exchange must not take the whole proxy down.
            print("Socket error:", msg)
        finally:
            tcp_socket.close()

    def __init__(self, args):
        """Listen on ``args.port`` and relay requests until interrupted.

        Always ends the process through ``sys.exit``: status 1 after a socket
        error on the listening socket, status 0 otherwise.
        """
        server_ip = '127.0.0.1'
        server_port = args.port
        print('Task 2.2: Web Proxy, starting on port: %i...' % (server_port))

        exit_status = 0
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Allow an immediate restart on the same port.
            server_socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind((server_ip, server_port))
            server_socket.listen(1)
            while True:
                connection, _address = server_socket.accept()
                self.handleRequest(connection)
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the proxy.
            pass
        except socket.error as msg:
            print("Socket error:", msg)
            exit_status = 1
        finally:
            server_socket.close()
        sys.exit(exit_status)
if __name__ == "__main__":
    # Parse the command line, then hand the namespace to the class chosen by
    # the sub-command; constructing that class runs the tool.
    args = setupArgumentParser()
    selected_application = args.func
    selected_application(args)