编写功能模块 http.py
import threading
import time
from socket import *
from re import match
def like(s,rex):
    """Return True when the regular expression ``rex`` matches all of ``s``.

    ``re.match`` anchors the pattern at the start of ``s``; the text it
    matched must then be equal to ``s`` itself, so a match that covers only
    a prefix of ``s`` is rejected.

    Args:
        s: The string to test.
        rex: The regular expression pattern.

    Returns:
        True if the text matched by ``rex`` equals ``s``. False when there is
        no match, when ``rex`` is not a valid pattern, or when the arguments
        are of a type ``re.match`` rejects.
    """
    # Imported locally because the module only imports ``match`` from ``re``.
    from re import error as pattern_error

    try:
        found = match(rex, s)
    except (pattern_error, TypeError):
        # Best-effort contract: unusable input counts as "does not match".
        return False
    # ``match`` returns None when the pattern does not match at position 0.
    return found is not None and found.group() == s
class Request:
    """The head of an HTTP request, read from a connected socket.

    The constructor reads from the socket one byte at a time until the blank
    line that ends the head (CRLF CRLF) or until the peer closes the
    connection, so no byte after the head is consumed.

    Attributes:
        sock: The socket the request was read from.
        head: The bytes that were read, decoded with the default codec.
        method: First field of the request line.
        uri: Second field of the request line.
        proto: Third field of the request line.
        map: Header fields as a dict of name -> value.
    """

    def __init__(self,sock:socket):
        """Read and parse the request head from ``sock``.

        Raises:
            ValueError: If the request line does not consist of exactly
                three space-separated fields (this includes a connection
                that was closed before anything was sent).
        """
        self.sock = sock
        raw = bytearray()
        # Checking the tail of the buffer also handles repeated '\r' bytes
        # directly in front of the terminator.
        while not raw.endswith(b"\r\n\r\n"):
            chunk = self.readOne()
            if chunk is None:
                break
            raw += chunk
        self.head = raw.decode()
        lines = self.head.split("\r\n")

        fields = lines[0].split(" ")
        if len(fields) != 3:
            raise ValueError(
                "malformed HTTP request line: {!r}".format(lines[0])
            )
        self.method, self.uri, self.proto = fields

        # Wrap the header lines in a dictionary.
        self.map = {}
        for line in lines[1:]:
            # Split on the first colon only: values may contain ':' too.
            name, colon, value = line.partition(":")
            if not colon:
                # Blank lines after the head, or lines that are not headers.
                continue
            self.map[name] = value.strip(" \t")

    def readOne(self):
        """Receive one byte; return None once the peer closed the connection."""
        data = self.sock.recv(1)
        if data == b'':
            return None
        return data
class Response:
    """Writes an HTTP response to a connected socket.

    Attributes:
        sock: The socket the response is written to.
        head: Header lines accumulated by ``addHead``, each ending in CRLF.
        status: Status code placed in the status line (200 initially).
        code: Text encoding used by ``print``.
    """

    def __init__(self,sock:socket,code="utf-8"):
        self.sock = sock
        self.head = ""
        self.status = 200
        self.code = code

    def addHead(self,k,v):
        """Append the header line ``k: v``; ``v`` may be any printable value."""
        self.head += "{}: {}\r\n".format(k, v)

    def sendHead(self):
        """Send the status line, the accumulated headers and the blank line."""
        head = "HTTP/1.1 {}\r\n{}\r\n".format(self.status, self.head)
        # sendall keeps writing until every byte is out; send may stop early.
        self.sock.sendall(head.encode())

    def write(self,bys):
        """Send the bytes ``bys`` as-is."""
        self.sock.sendall(bys)

    def print(self,msg:str):
        """Send ``msg`` encoded with ``self.code``."""
        self.sock.sendall(msg.encode(self.code))
class Server(socket):
    """A TCP/IPv4 socket that is bound and listening once constructed.

    Args:
        ip: Address to bind to.
        port: Port to bind to.
        lis: Backlog value handed to ``listen``.
    """

    def __init__(self,ip="127.0.0.1",port=7000,lis=12) -> None:
        super().__init__(family=AF_INET, type=SOCK_STREAM)
        address = (ip, port)
        self.bind(address)
        self.listen(lis)
class Servlet(Server):
    """HTTP server that routes requests to handler functions or static files.

    Attributes:
        map: Regular expression -> handler, called as ``handler(req, res)``.
        dir: URI prefix -> local directory used for static files.
        code: Text encoding handed to every ``Response``.
    """

    def __init__(self, ip="127.0.0.1", port=7000, lis=12) -> None:
        super().__init__(ip, port, lis)
        self.map = {}
        self.dir = {}
        self.code = "utf-8"

    def addFunction(self,urip,func):
        """Register ``func`` for URIs that fully match the pattern ``urip``."""
        self.map[urip] = func

    def addPageDir(self,uri,path):
        """Serve URIs starting with ``uri`` from the local directory ``path``."""
        self.dir[uri] = path

    def ResPage(self,res:Response,path):
        """Send the response head followed by the content of file ``path``.

        ``path`` is a file path on the local machine. The file is read and
        closed before anything is sent, so a read error cannot leave a
        half-written response behind.
        """
        with open(path,"rb") as file:
            body = file.read()
        res.sendHead()
        res.write(body)

    def _safe_path(self, root, rest):
        """Return the real path of ``root + rest`` if it is a file inside
        ``root``; return None otherwise (for example after ``..`` segments)."""
        # Imported locally: the module itself does not import ``os``.
        import os

        base = os.path.normcase(os.path.realpath(root))
        target = os.path.realpath(root + rest)
        # join(base, "") appends exactly one separator, also for a root dir.
        inside = os.path.normcase(target).startswith(os.path.join(base, ""))
        if inside and os.path.isfile(target):
            return target
        return None

    def _send_empty(self, res, status):
        """Send a body-less response carrying the given status code."""
        res.status = status
        res.addHead("Content-Length", "0")
        res.sendHead()

    def Handle(self,sock:socket):
        """Serve one request on ``sock`` and always close the socket.

        Handlers registered with ``addFunction`` are tried first, then the
        prefixes registered with ``addPageDir``. A request that matches
        neither, or that names a file outside the registered directory,
        is answered with status 404.
        """
        try:
            try:
                req = Request(sock)
            except ValueError:
                # Empty or unparseable request head: nothing to route.
                return
            res = Response(sock, self.code)
            uri = req.uri

            for pattern, handler in self.map.items():
                if like(uri, pattern):
                    handler(req, res)
                    return

            for prefix, root in self.dir.items():
                if uri.startswith(prefix):
                    # The URI is untrusted input; never leave ``root``.
                    path = self._safe_path(root, uri[len(prefix):])
                    if path is not None:
                        self.ResPage(res, path)
                        return
                    break

            self._send_empty(res, 404)
        finally:
            sock.close()

    def run(self):
        """Accept connections forever, handling each one in its own thread."""
        while True:
            sock, _ = self.accept()
            worker = threading.Thread(target=self.Handle, args=(sock,))
            worker.start()
?
编写测试模块 test.py
import time
from http import *
def func(req:Request,res:Response):
    """Answer with the current local time and the requested URI as HTML.

    The body is encoded as GBK, and the Content-Type header announces that
    charset so the client does not have to guess it.
    """
    # The URI comes from the client; escape it before embedding it in HTML.
    from html import escape

    res.code = "gbk"
    res.addHead("Content-Type", "text/html; charset=" + res.code)
    res.sendHead()
    now = time.localtime(time.time())
    msg = (
        "<h1>现在是</h1>"
        + str(now.tm_hour) + "点 "
        + str(now.tm_min) + "分 "
        + str(now.tm_sec) + "秒 "
        + "<h1>你的uri=" + escape(req.uri) + "</h1>"
    )
    res.print(msg)
# Demo server on the default address 127.0.0.1:7000.
servlet = Servlet(ip="127.0.0.1", port=7000, lis=12)
# Dynamic route: every URI below /f/ is answered by func.
servlet.addFunction(urip="/f/.*", func=func)
# Static route: URIs starting with /page are read from the local directory.
servlet.addPageDir(uri="/page", path="f:/pycode/http/page")
# Blocks in the accept loop.
servlet.run()
在目录 f:/pycode/http/page 下编辑 index.html
<!DOCTYPE html>
<!-- Minimal static page used to try out the server's static file route. -->
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>Hello Python</h1>
</body>
</html>
测试效果
?
?
|