下面介绍如何用 Python 实现一个简易版的 Web 服务器(WSGI 服务器),具体内容请看下文。
1、请自行了解HTTP协议
https://www.jb51.net/article/133883.htm(点击跳转)
2、创建Socket服务,监听指定IP和端口
3、以阻塞方式等待客户端连接
4、读取客户端请求数据并进行解析
5、准备服务器运行上下文
6、处理客户端请求数据
7、根据用户请求路径读取文件
8、返回响应结果给客户端
9、程序入口
10、目录结构
11、运行
python wsgiserver.py app:run
12、源码
a. wsgiserver.py 文件
#encoding:utf-8
import io
import logging
import socket
import StringIO
import sys
from datetime import datetime
from email.utils import formatdate
logger = logging.getLogger(__name__)
class WSGIServer(object):
    """Minimal single-threaded, blocking WSGI server.

    Connections are accepted one at a time. Each request is read with a
    single ``recv`` call of at most ``recv_size`` bytes, handed to the
    configured WSGI application, and the complete response is written
    before the connection is closed.
    """

    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM
    # Backlog handed to ``listen()``.
    request_queue_size = 30
    # Upper bound, in bytes, of the single ``recv()`` done per request.
    recv_size = 1024

    def __init__(self, server_address):
        """Create the listening socket, bind it and start listening.

        :param server_address: address passed to ``socket.bind``; for the
            default ``AF_INET`` family this is a ``(host, port)`` tuple.
        """
        self._listen_socket = _listen_socket = socket.socket(
            self.address_family, self.socket_type)
        try:
            _listen_socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            _listen_socket.bind(server_address)
            _listen_socket.listen(self.request_queue_size)
            # Only the first two items are needed; other address families
            # may return longer tuples.
            _host, _port = _listen_socket.getsockname()[:2]
        except Exception:
            # Do not leak the descriptor when setup fails half-way.
            _listen_socket.close()
            raise
        self._server_name = socket.getfqdn(_host)
        self._server_port = _port
        self._headers_set = []
        self._application = None
        self._client = None
        self._request_data = None
        self._request_method = None
        self._path = None
        self._query_string = ''
        self._request_version = None
        self._start_response = None

    @staticmethod
    def _to_native(data):
        """Return *data* as the interpreter's native ``str`` type.

        Byte strings are decoded as ISO-8859-1 when ``str`` is a text type;
        values that already are ``str`` are returned unchanged.
        """
        if isinstance(data, str):
            return data
        return data.decode('iso-8859-1')

    @staticmethod
    def _to_bytes(data, encoding='iso-8859-1'):
        """Return *data* as a byte string, encoding text with *encoding*."""
        if isinstance(data, bytes):
            return data
        return data.encode(encoding)

    def set_application(self, application):
        """Register the WSGI callable that handles every request."""
        self._application = application

    def server_forever(self):
        """Accept and serve connections until interrupted from the keyboard.

        Errors raised while serving one connection are logged and do not
        stop the loop; the client socket is always closed afterwards.
        """
        _listen_socket = self._listen_socket
        logger.info('listen on %s:%s', self._server_name, self._server_port)
        while True:
            try:
                self._client, _addr = _listen_socket.accept()
                self._handle_request(_addr)
            except KeyboardInterrupt:
                logger.info('interrupt')
                break
            except SystemExit:
                # An explicit exit request must not be swallowed.
                raise
            except BaseException:
                # Deliberately broad: applications may raise errors that
                # derive directly from BaseException, and one failed
                # request must not take the whole server down.
                logger.exception('error while handling request')
            finally:
                self._close_client()

    def _close_client(self):
        """Close the current client socket, if any, and forget it."""
        _client, self._client = self._client, None
        if _client is not None:
            _client.close()

    def _handle_request(self, client_addr):
        """Read one request from the current client and answer it.

        :param client_addr: peer address as returned by ``accept()``.
        :raises ValueError: if the request line cannot be parsed.
        :raises RuntimeError: if the application never calls
            ``start_response``.
        """
        # Never reuse the status/headers of a previous request.
        self._headers_set = []
        self._request_data = _request_data = self._client.recv(self.recv_size)
        if not _request_data:
            # The peer connected and closed without sending anything.
            logger.debug('empty request from %s', client_addr[0])
            return
        self._parse_request_data(_request_data)
        _env = self._get_environment(client_addr)
        _result = self._application(_env, self.start_response)
        try:
            self._finish_response(_result)
        finally:
            # PEP 3333: the server must call close() on the returned
            # iterable when it provides one.
            _close = getattr(_result, 'close', None)
            if _close is not None:
                _close()

    def _parse_request_data(self, request_data):
        """Extract method, path, query string and version from the request.

        :param request_data: raw bytes received from the client.
        :raises ValueError: if the request is empty or its first line does
            not consist of exactly three whitespace-separated fields.
        """
        _lines = request_data.splitlines()
        if not _lines:
            raise ValueError('empty request')
        _request_line = self._to_native(_lines[0])
        _fields = _request_line.split()
        if len(_fields) != 3:
            raise ValueError(
                'malformed request line: %r' % (_request_line,))
        _method, _target, _version = _fields
        # PATH_INFO and QUERY_STRING are separate WSGI variables.
        _path, _, _query = _target.partition('?')
        self._request_method = _method
        self._path = _path
        self._query_string = _query
        self._request_version = _version

    def _get_environment(self, client_addr):
        """Build the WSGI ``environ`` dictionary for the current request.

        :param client_addr: peer address as returned by ``accept()``; its
            first item is exposed as ``REMOTE_ADDR`` / ``HTTP_CLIENT_IP``.
        :returns: a new ``dict`` with the ``wsgi.*`` and CGI-style keys.
        """
        _client_ip = client_addr[0]
        _env = {
            'wsgi.version': (1, 0),
            'wsgi.url_scheme': 'http',
            # NOTE(review): this stream holds the whole raw request
            # (request line and headers included), as it always did here;
            # WSGI expects only the request body - confirm before changing.
            'wsgi.input': io.BytesIO(self._request_data),
            'wsgi.errors': sys.stderr,
            'wsgi.multithread': False,
            'wsgi.multiprocess': False,
            'wsgi.run_once': False,
            'REQUEST_METHOD': self._request_method.upper(),
            'SCRIPT_NAME': '',
            'PATH_INFO': self._path,
            'QUERY_STRING': self._query_string,
            'SERVER_NAME': self._server_name,
            # CGI variables are strings, the port included.
            'SERVER_PORT': str(self._server_port),
            'SERVER_PROTOCOL': self._request_version,
            'REMOTE_ADDR': _client_ip,
            'HTTP_CLIENT_IP': _client_ip,
        }
        logger.info('%s %s %s %s', _env['HTTP_CLIENT_IP'],
                    datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    _env['REQUEST_METHOD'], _env['PATH_INFO'])
        return _env

    def start_response(self, status, response_headers, exc_info=None):
        """Record the status line and headers chosen by the application.

        :param status: status string such as ``'200 OK'``.
        :param response_headers: sequence of ``(name, value)`` tuples.
        :param exc_info: accepted for WSGI compatibility and otherwise
            unused; nothing is sent before ``_finish_response`` runs, so
            the recorded headers can always be replaced.
        """
        _server_headers = [
            # Current time in the HTTP date format instead of a fixed stamp.
            ('Date', formatdate(usegmt=True)),
            ('Server', 'WSGIServer 0.1'),
        ]
        self._headers_set = [status, list(response_headers) + _server_headers]

    def _finish_response(self, result):
        """Send status line, headers and body, then close the client.

        :param result: iterable of body chunks returned by the application;
            byte strings are sent as-is, text chunks are encoded as UTF-8.
        :raises RuntimeError: if ``start_response`` was not called for this
            request.
        """
        try:
            if not self._headers_set:
                raise RuntimeError(
                    'application returned without calling start_response')
            _status, _response_headers = self._headers_set
            _head = ['HTTP/1.1 {status}\r\n'.format(status=_status)]
            _head.extend('{0}:{1}\r\n'.format(*_header)
                         for _header in _response_headers)
            _head.append('\r\n')
            _parts = [self._to_bytes(_line) for _line in _head]
            _parts.extend(self._to_bytes(_data, 'utf-8') for _data in result)
            self._client.sendall(b''.join(_parts))
        finally:
            self._close_client()
def make_server(server_address, application):
    """Create a ``WSGIServer`` on *server_address* that serves *application*.

    :param server_address: address handed to the ``WSGIServer`` constructor.
    :param application: WSGI callable registered through
        ``set_application``.
    :returns: the configured server instance.
    """
    httpd = WSGIServer(server_address)
    httpd.set_application(application)
    return httpd
# Script entry point: ``python wsgiserver.py <module>:<application>``.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    server_addr = ('0.0.0.0', 43002)
    # Fail with a usage hint instead of an IndexError/ValueError traceback
    # when the single "module:application" argument is missing or malformed.
    if len(sys.argv) != 2 or sys.argv[1].count(':') != 1:
        sys.exit('usage: python wsgiserver.py <module>:<application>')
    app_path = sys.argv[1]
    module_name, application_name = app_path.split(':')
    # __import__ returns the top-level package for dotted names, so fetch
    # the module that was actually requested from sys.modules.
    __import__(module_name)
    module = sys.modules[module_name]
    application = getattr(module, application_name)
    httpd = make_server(server_addr, application)
    httpd.server_forever()
b. app.py 文件
#encoding:utf-8
import os
class PageNotFoundException(Exception):
    """Raised when a requested page cannot be read from disk.

    Derives from ``Exception`` (not ``BaseException``) so that ordinary
    ``except Exception`` handlers also catch it.
    """
def render(filename, dirname='html'):
    """Return the raw bytes of *filename* stored under *dirname*.

    :param filename: path of the file, relative to *dirname*.
    :param dirname: directory the file must live in.
    :returns: the file content, read in binary mode.
    :raises PageNotFoundException: if the path resolves outside *dirname*
        or does not name a regular file.
    """
    _path = os.path.join(dirname, filename)
    # Resolve symlinks and ".." so that names such as "../secret" or an
    # absolute path cannot escape the directory being served.
    _root = os.path.join(os.path.realpath(dirname), '')
    _resolved = os.path.realpath(_path)
    if _resolved.startswith(_root) and os.path.isfile(_resolved):
        with open(_resolved, 'rb') as handler:
            return handler.read()
    raise PageNotFoundException('file not found:%s' % _path)
def run(env, start_response):
    """WSGI application that serves files through ``render``.

    ``/`` maps to ``index.html``; any other path is used without its
    leading slash. The content type is chosen from the file extension, and
    unknown extensions are offered as a download.

    :param env: WSGI environment; only ``PATH_INFO`` is read.
    :param start_response: WSGI ``start_response`` callable.
    :returns: a one-item list holding the response body.
    """
    _path = env.get('PATH_INFO') or '/'
    _path = 'index.html' if _path == '/' else _path[1:]

    # Load the body first so that status and headers describe what is
    # actually sent; a missing page must not be answered with "200 OK".
    try:
        response = render(_path)
    except PageNotFoundException:
        try:
            response = render('404.html')
            _content_type = 'text/html'
        except PageNotFoundException:
            # The error page itself is missing: fall back to plain text.
            response = b'404 Not Found'
            _content_type = 'text/plain'
        start_response('404 Not Found', [('Content-Type', _content_type)])
        return [response]

    if _path.endswith('.css'):
        _headers = [('Content-Type', 'text/css')]
    elif _path.endswith('.js'):
        _headers = [('Content-Type', 'text/javascript')]
    elif _path.endswith('.html'):
        _headers = [('Content-Type', 'text/html')]
    else:
        _headers = [
            ('Content-Type', 'text/plain'),
            ('Content-Disposition',
             'attachment; filename=%s' % os.path.basename(_path)),
        ]
    start_response('200 OK', _headers)
    # The body is returned unmodified; appending extra bytes would corrupt
    # downloaded files.
    return [response]
总结
以上就是小编给大家介绍的用 Python 实现简易版 Web 服务器的全部内容,希望对大家有所帮助。如果大家有任何疑问,请给我留言,小编会及时回复。在此也非常感谢大家对天达云网站的支持!