Python实现简易版的Web服务器(推荐)
更新:HHH   时间:2023-1-7


下面给大家介绍python实现简易版的web服务器,具体内容详情大家通过本文学习吧!

1、请自行了解HTTP协议

https://www.jb51.net/article/133883.htm(点击跳转)

2、创建Socket服务,监听指定IP和端口

3、以阻塞方式等待客户端连接

4、读取客户端请求数据并进行解析

5、准备服务器运行上下文

6、处理客户端请求数据

7、根据用户请求路径读取文件

8、返回响应结果给客户端

9、程序入口

10、目录结构

11、运行

python wsgiserver.py app:run

12、源码

a.wsgiserver.py文件

#encoding:utf-8
import socket
import StringIO
import sys
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class WSGIServer(object):
 address_family = socket.AF_INET
 socket_type = socket.SOCK_STREAM
 request_queue_size = 30
 recv_size = 1024
 def __init__(self, server_address):
  self._listen_socket = _listen_socket = socket.socket(self.address_family,
               self.socket_type)  _listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)  _listen_socket.bind(server_address) 
  _listen_socket.listen(self.request_queue_size)  
  _host, _port = _listen_socket.getsockname()
  self._server_name = socket.getfqdn(_host)
  self._server_port = _port
  self._headers_set = []
  self._application = None
  self._client = None
  self._request_data = None
  self._request_method = None
  self._path = None
  self._request_version = None
  self._start_response = None
 def set_application(self, application):
  self._application = application
 def server_forever(self): 
  _listen_socket = self._listen_socket
  logger.info('listen on %s:%s', self._server_name, self._server_port)  while 1:  
   try:
    self._client, _addr = _listen_socket.accept()
    self._handle_request(_addr)
   except KeyboardInterrupt as e:
    logger.info('interrupt')
    break
   except BaseException as e:
    logger.error(e)
 def _handle_request(self, client_addr):
  self._request_data = _request_data = self._client.recv(self.recv_size)
  self._parse_request_data(_request_data)  
  _env = self._get_environment(client_addr) 
  _result = self._application(_env, self.start_response)
  self._finish_response(_result)
 def _parse_request_data(self, request_data): 
  _request_line = str(request_data.splitlines()[0]).rstrip('\r\n')
  (self._request_method, self._path, self._request_version) = _request_line.split()
 def _get_environment(self, client_addr):  
  _env = {}  
  _env['wsgi.version'] = (1, 0)  
  _env['wsgi.url_scheme'] = 'http'
  _env['wsgi.input'] = StringIO.StringIO(self._request_data)    _env['wsgi.errors'] = sys.stderr  
  _env['wsgi.multithread'] = False
  _env['wsgi.multiprocess'] = False
  _env['wsgi.run_once'] = False
  _env['REQUEST_METHOD'] = self._request_method.upper()  
  _env['PATH_INFO'] = self._path
  _env['SERVER_NAME'] = self._server_name
  _env['SERVER_PORT'] = self._server_port
  _env['HTTP_CLIENT_IP'] = client_addr[0]
  logger.info('%s %s %s %s', _env['HTTP_CLIENT_IP'], datetime.now().strftime('%Y-%m-%d %H:%M:%S'), _env['REQUEST_METHOD'], _env['PATH_INFO'])
  return _env
 def start_response(self, status, response_headers, exc_info=None):   _server_headers = [
   ('Date', 'Sun, 7 Jun 2015 23:07:04 GMT'),
   ('Server', 'WSGIServer 0.1')
   ]
  self._headers_set = [status, response_headers + _server_headers]
 def _finish_response(self, result):  
  _status, _response_headers = self._headers_set
  _response = 'HTTP/1.1 {status}\r\n'.format(status=_status)    for _header in _response_headers:  
  _response += '{0}:{1}\r\n'.format(*_header)  
  _response += '\r\n'
  for _data in result:   
   _response += _data
  self._client.sendall(_response)
  self._client.close()
def make_server(server_address, application):
 server = WSGIServer(server_address)
 server.set_application(application)
 return server
if __name__ == '__main__':
 logging.basicConfig(level=logging.DEBUG)
 server_addr= ('0.0.0.0', 43002)
 app_path = sys.argv[1]
 module, application = app_path.split(':')
 module = __import__(module)
 application = getattr(module, application)
 httpd = make_server(server_addr, application)
 httpd.server_forever()

b.app.py文件

#encoding:utf-8
import os
class PageNotFoundException(BaseException):
 pass
def render(filename, dirname='html'):
 _path = os.path.join(dirname, filename)
 if os.path.exists(_path):  
  with open(_path, 'rb') as handler:  
   return handler.read()
 raise PageNotFoundException('file not found:%s' % _path)
def run(env, start_response):
 _path = env.get('PATH_INFO')
 response = ''
 try:
  _path = 'index.html' if _path == '/' else _path[1:]
  if _path.endswith('.css'):
   start_response('200 OK', [('Content-Type', 'text/css')])
  elif _path.endswith('.js'):
   start_response('200 OK', [('Content-Type', 'text/javascript')]
  elif _path.endswith('.html'):
   start_response('200 OK', [('Content-Type', 'text/html')])
  else:
   start_response('200 OK', [('Content-Type', 'text/plain'), ('Content-Disposition', 'attachment; filename=%s' % os.path.basename(_path))])
  response = render(_path) 
 except PageNotFoundException as e:
  response = render('404.html') 
 return [response, '\r\n']

总结

以上所述是小编给大家介绍的Python实现简易版的Web服务器,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对天达云网站的支持!

返回开发技术教程...