Commit 28a10bc3 authored by Давид Ли's avatar Давид Ли

init

parents
File added
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.11" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
This diff is collapsed.
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="MarkdownSettingsMigration">
<option name="stateVersion" value="1" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/http_framework.iml" filepath="$PROJECT_DIR$/.idea/http_framework.iml" />
</modules>
</component>
</project>
\ No newline at end of file
from typing import BinaryIO
class Request:
def __init__(self, file: BinaryIO) -> None:
self.file = file
self.body = self.method = self.uri = self.protocol = ''
self.headers = {}
self.parse_request_line()
self.parse_headers()
self.parse_body()
def readline(self):
return self.file.readline().decode().strip()
def parse_request_line(self) -> None:
request_line = self.readline()
self.method, self.uri, self.protocol = request_line.split()
def parse_headers(self) -> None:
while True:
header = self.readline()
if not header:
break
header_name, header_value = header.split(': ')
self.headers[header_name] = header_value
def parse_body(self) -> None:
if 'Content-Length' in self.headers:
content_length = int(self.headers['Content-Length'])
self.body = self.file.read(content_length)
from typing import BinaryIO
from os import fstat
class Response:
HTTP_OK = 200
HTTP_BAD_REQUEST = 400
HTTP_NOT_FOUND = 404
HTTP_INTERNAL_SERVER_ERROR = 500
MESSAGES = {
HTTP_OK: 'OK',
HTTP_BAD_REQUEST: 'Bad Request',
HTTP_NOT_FOUND: 'Not Found',
HTTP_INTERNAL_SERVER_ERROR: 'Internal Server Error'
}
PROTOCOL = 'HTTP/1.1'
def __init__(self, file: BinaryIO) -> None:
self.file = file
self.__status = self.HTTP_OK
self.__headers: list[dict] = []
self.body = None
self.file_body = None
def set_status(self, new_status: int) -> None:
self.__status = new_status
def set_body(self, body: str) -> None:
self.body = body.encode()
self.add_header('Content-Type', str(len(self.body)))
def set_file_body(self, file: BinaryIO) -> None:
self.file_body = file
size = fstat(file.fileno()).st_size
self.add_header('Content-Length', str(size))
def add_header(self, name: str, value: str) -> None:
self.__headers.append({name: value})
def __get_status_line(self) -> str:
message = self.MESSAGES[self.__status]
return f'{self.PROTOCOL} {self.__status} {message}'
def __get_response_head(self) -> bytes:
status_line = self.__get_status_line()
# for example
# headers = [': '.join(list(*header.items())) for header in self.__headers]
# print(headers)
# /for example
headers: list[str] = []
for header in self.__headers:
k, v = list(*header.items())
headers.append(f'{k}: {v}')
head_str = '\r\n'.join([status_line] + headers)
head_str += '\r\n\r\n'
return head_str.encode()
def __write_file_body(self) -> None:
# Chunk load
while True:
data = self.file_body.read(1024)
if not data:
break
self.file.write(data)
def send(self):
head = self.__get_response_head()
self.file.write(head)
if self.body:
self.file.write(self.body)
elif self.file_body:
self.__write_file_body()
import socketserver
from request import Request
from response import Response
from static_responder import StaticResponder
HOST, PORT = '127.0.0.1', 1025
class MyTCPHandler(socketserver.StreamRequestHandler):
def handle(self) -> None:
request = Request(file=self.rfile)
response = Response(file=self.wfile)
static_responder = StaticResponder(request, response, 'static')
if static_responder.file:
static_responder.prepare_response()
else:
response.add_header('Content-Type', 'text/html')
response.add_header('Connection', 'close')
response.set_body('<h1>Hello world!</h1>')
print(
f'Method: {request.method}\n'
f'URI: {request.uri}\n'
f'Protocol: {request.protocol}\n'
)
response.send()
socketserver.TCPServer.allow_reuse_address = True
with socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) as server:
server.serve_forever()
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>Hello world!</h1>
</body>
</html>
\ No newline at end of file
* {
text-align: center;
}
.container {
margin: 0 auto;
width: 80%;
}
from glob import glob
import os
from request import Request
from response import Response
class StaticResponder:
def __init__(self, request: Request, response: Response, static_dir: str) -> None:
self.request = request
self.response = response
self.static_dir = static_dir
self.file = None
self._check_file()
def _check_file(self):
path = './' + self.static_dir + self.request.uri
# path = ./ static /styles.css
files: list[str] = glob(path)
if len(files) > 0 and os.path.isfile(files[0]):
self.file = files[0]
def prepare_response(self):
if self.file:
file = open(self.file, 'rb')
self.response.set_file_body(file)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment