feling.net/_posts/2015-05-17-project-http-pro...

17 KiB
Raw Permalink Blame History

layout title description categories tags keywords
pages python 实现 http 代理 基于 python 的 socket 实现的 http 代理, 用到了多进程+协程来提高性能.
项目
python
epoll
http_proxy
开源项目
http代理
gevent
协程
惊群

前言

这篇文章将会介绍一个 http 代理的实现过程...

http 协议的文档

关于网络协议的文档, 可以在这里查 https://www.ietf.org/ . 搜索文档编号就行. 目前关于http的文档有这几篇:

项目 git 地址

这个项目目前托管在 github 上: https://github.com/playay/http_proxy

最原始的实现原理

实现一个 http 代理的功能, 最容易想到的程序的运行过程, 可能是这样的:

  1. 接收 http 请求
  2. 解析请求头, 得到 host 和 uri
  3. 转发请求, 接收响应
  4. 把响应传回给请求方

实现基本的功能

按照前言中提到的最原始的实现原理, 我们来一步步实现.
这里不考虑性能, 对于性能的改善, 会在改善性能这一章中介绍.

创建工程

一般来说, 一个目录就是一个工程, 这个目录下通常会有以下几个文件夹:

  • bin/ 可执行文件
  • lib/ 所有的第三方库
  • src/ 程序的源码
  • doc/ 项目文档

我们写的是 python 代码, 就不需要 src 文件夹了, 可执行文件就是源码, 所以建个 http_proxy/bin/ 文件夹, 新建个 http_proxy.py 文件,这个工程就算创建好了.

接收、解析、处理请求

我们用 socket 接收 http 请求. 首先, 写出一个 socket 程序的模板:
绑定、监听、启动新线程处理请求

import logging
logging.basicConfig(level=logging.INFO,
                format='%(asctime)s [line:%(lineno)d] %(levelname)s %(message)s',
                datefmt='%M:%S',
                )
import sys
import socket
import thread

def proxyer(soc):
    request = soc.recv(4096)
    logging.info(request)
    
def start():
    address = ('',int(sys.argv[1]))
    s = socket.socket()
    s.bind(address)
    s.listen(1024)
    while True:
        soc, add = s.accept()
        thread.start_new_thread(proxyer, (soc,))
       
if __name__ == '__main__':
   start()

如上所示, 我们做到了: 来一个请求, 就启动一个线程. 在新的线程里, 用 proxyer() 接收请求并打印了出来.


这里有一个很重要的问题: 如何完整地接收请求.
数据在用 socket 传输的过程中, 可能分多次传送, 具体原理可以参考 ip 或 tcp 报文的分段传输. 因为单个数据包的大小有限制, 如果数据大小超过这个限制就要分多次发送, 调用一次 soc.recv(4096), 就不能保证完整接收请求了, 另外如果单个数据包大小超过4096, soc.recv(4096) 也收不到完整数据. 所以, 我们需要循环调用 soc.recv(4096) .

什么时候结束循环呢? 用 socket 接收 http 请求的数据, 最开始收到的肯定是请求头, 请求头以 \r\n\r\n 结束.
如果是 GET、HEAD、CONNECT 方法的请求,接收完请求头就表示已经完整接收请求.
所以对于 GET、HEAD、CONNECT 方法可以这样做:

def proxyer(soc):
    request = ''
    got_header = False
    headers = {}
    while True:
        buf = soc.recv(4096)
        request = request + buf
        if not got_header and '\r\n\r\n' in request:
            got_header = True
            break
        if not buf:
            break
    if not '\r\n\r\n' in request:
        logging.warning('request err, close this task')
        soc.close()
        return

但如果是 POST 方法, 在请求头接收完之后可能还有数据. http 协议在中, 有两种方式判断 POST 请求是否接收完整:

  1. 如果请求头中包含 Content-Length ,就用它来判断
  2. 如果请求头中包含 Transfer-Encoding , 就用它来判断

如果两种都不包含, 默认用第二种方式, 如果两种都包含, 用第二种方式.
Transfer-Encoding 在完整协议里有很多个可选值, 这里只当它是 chunked (实际上网时, 我只见过这一种). 它表示: 最后会发一个空的 socket 包来标记数据发送完毕.
实际中我也只见过第一种方式的 POST 请求. 不管怎样, 解析请求头是必须的, 解析的代码如下:

def parse_request_header(header):
    '''
    解析http请求头,
    成功,返回(host, port, method, uri, headers)
    失败,返回(None,None,None,None,None)
    '''
    lines = header.strip().split('\r\n')
    try:
        '''解析method和uri'''
        line0 = lines[0].split(' ')
        method = line0[0].upper()
        uri = line0[1]
    
        '''解析其他header'''
        headers = {}
        for i in range(1,len(lines)):
            line= lines[i].split(':')
            key = line.pop(0)
            value = ''.join(line)
            headers[key] = value.strip()

        '''处理目标主机和端口'''
        if method in ['CONNECT']:
            target_host_and_port = uri.split(':')
        else:
            target_host_and_port = headers['Host'].split(':')
        if len(target_host_and_port)==1:
            target_host = target_host_and_port[0]
            if method in ['CONNECT']: target_port = 443
            else: target_port = 80
        else:
            target_host = target_host_and_port[0]
            target_port = int(target_host_and_port[1].strip())
    except Exception, e: 
        logging.warning(str(type(e))+' '+str(e)+' err')
        return None,None,None,None,None
    return target_host, target_port, method, uri, headers

有了解析请求头的方法, 就能在接收请求的时候, 加上对 POST 方法的支持:

def proxyer(soc):
    '''接收http请求'''
    request = ''
    got_header = False
    headers = {}
    while True:
        buf = soc.recv(4096)
        request = request + buf
        if not got_header and '\r\n\r\n' in request:
            got_header = True
            request_header = request.split('\r\n\r\n')[0] + '\r\n\r\n'
            header_length = len(request_header)
            host, port, method, uri, headers = parse_request_header(request_header)
            if not host or not port or not method in ['HEAD','GET','POST','CONNECT']:
                logging.warning('parser request err or method not support ,close this task')
                soc.close()
                return
            if method in ['GET','HEAD','CONNECT']:
                break
        if got_header and method in ['POST']:
            if 'Content-Length' in headers:
                if int(headers['Content-Length']) <= len(request)-header_length:
                    break
            else:
                logging.warning('no Content-Length in POST request,close this task')
                soc.close()
                return
        if not buf:
            break
    if not '\r\n\r\n' in request:
        logging.warning('request err, close this task')
        soc.close()
        return

对于 http 请求, 还有最后一步, 按照协议规定: 浏览器发给代理的请求头, 与正常的请求头是不一样的, 所以我们还要对请求头做一些修改, 才能转给目的主机.
修改请求头的代码如下:

def proxyer(soc):
    '''接收http请求'''
    ...
    
    '''按协议要求,修改请求头'''
    if method in ['GET','POST','HEAD']:
        request_header = re.sub('Proxy-Connection: .+\r\n','',request_header)
        request_header = re.sub('Connection: .+','',request_header)
        request_header = re.sub('\r\n\r\n','\r\nConnection: close\r\n\r\n',request_header)
        request_header = re.sub(uri,uri[uri.index('/',8):],request_header)
        request = request_header+request[header_length:]

修改完了请求头, 就该用它去获取响应了, 但是 CONNECT 方法的请求比较特殊, 如果是 CONNECT 方法, 接下来要做的不是获取响应, 而是建立一条到目标主机的隧道. 所以我们准备好两个方法 do_proxy()do_tunnel() . 下一节中我们会分别实现这两个方法.

def proxyer(soc):
    '''接收http请求'''
    ...
    
    '''按协议要求,修改请求头'''
    ...
    
    '''获取目标主机的http响应, 并转发响应包'''
    if method in ['CONNECT']:
        do_tunnel(host, port, soc)
    else:
        do_proxy(host, port, method, uri, headers, request, soc)

得到响应并回传给请求方

上一节中, 我们接收完了 http 请求, 并解析、处理了它. 算是完成了最原始的实现原理中的1、2两步. 最后留下了两个方法 do_proxy()do_tunnel() . 实现这两个方法, 就算是完成了剩下的3、4两步.

先说下相对简短的 do_tunnel() . 要实现的是: 建立一条请求方到目标主机的隧道. 其实只要新建一个 socket 连到目标主机上, 然后把请求方的 socket 拿过来、对接上就OK了. 对接, 就是把一个 socket 收到的数据, 发给另一个 socket . 看代码:

def dock_socket(recv, send, recv_from_response=False):
    try:
        while True:
            buf = recv.recv(4096)
            send.send(buf)
            if not buf:
                break
    except Exception, e:
        recv.close()
        send.close()
        return
    if recv_from_response:
        recv.close()
        send.close()
    return

因为我们只做短连接, 所以如果数据方向是响应方发给请求方的, 就可以 close 掉 socket 了. 这就是 recv_from_response 的含义
基于 dock_socket() 方法, do_tunnel() 的实现如下:

TUNNEL_OK = '''HTTP/1.1 200 Connection Established\r\nProxy-Connection: close\r\n\r\n'''

def do_tunnel(host, port, soc):
    cos = socket.socket()
    try:
        cos.connect((host,port))
    except Exception, e:
        logging.warning('connect err'+host+':'+str(port))
        #soc.send(TUNNEL_FAIL)
        soc.close()
        return
    soc.send(TUNNEL_OK)
    thread.start_new_thread(dock_socket, (soc, cos, False))
    thread.start_new_thread(dock_socket, (cos, soc, True))

隧道建立成功, 按照 http 协议要求, 要给请求方一个响应, 即第12行(倒数第三行)的 soc.send(TUNNEL_OK) .
至此, 对 CONNECT 方法的 http 请求, 已经可以完整处理. 通过这个程序代理, 已经可以正常访问 https://www.baidu.com .


接下来我们说 do_proxy() 方法, 要实现的功能就是:

  • 连接目标主机
  • 发送请求
  • 接收响应
  • 转发请求

除了接收响应需要像接收请求时一样, 要注意如何完整接收报文之外. 并没有什么麻烦的地方.

既然是像接收请求一样就收响应, 还是要先解析响应报文头:

def parse_response_header(response_header):
    '''解析http响应报文头'''
    Transfer_Encoding = False
    Content_Length = 0
    status_code = 0
    lines = response_header.strip().split('\r\n')
    status_code = int(lines[0].split(' ')[1])
    
    headers = {}
    for i in range(1,len(lines)):
        line= lines[i].split(':')
        key = line.pop(0)
        value = ''.join(line)
        headers[key] = value.strip()
    
    return status_code, headers

然后完成 do_proxy() 方法:

def do_proxy(host, port, method, uri, request_headers, request, soc):
    '''获取目标主机的http应答, 并转发应答包'''      
    c = socket.socket()
    try:
        c.connect((host, port))
    except Exception, e:
        logging.warning(str(type(e))+' '+str(e)+' err')
        c.close()
        soc.send(str(type(e))+' '+str(e)+' err')
        soc.close()
        return
    try:   
        c.send(request)
        response = ''
        got_header = False
        headers = {}
        while True:
            buf = c.recv(4096)
            response = response + buf
            soc.send(buf)
            if not got_header and '\r\n\r\n' in response:
                got_header = True
                response_header = response.split('\r\n\r\n')[0] + '\r\n\r\n'
                #logging.debug(response)
                header_length = len(response_header)
                status_code, headers = parser_response_header(response_header)

            if got_header:
                '''没有内容,直接返回报文头就行'''
                if method in ['HEAD']:
                    break
                if method in ['GET', 'POST']:
                    if status_code in [204,301,302,303,304,307]:
                        break
                    '''正常的判断是否接收完响应的方式'''
                    if 'Transfer-Encoding' in headers:
                        if not buf:
                            logging.debug('not buf in tranfer-encoding')
                            break 
                    if 'Content-Length' in headers:
                        if int(headers['Content-Length']) <= len(response)-header_length:
                            break
                    if not 'Content-Length' in headers and not 'Transfer-Encoding' in headers and not buf:
                        logging.debug('not buf')
                        break 
            if not buf:
                logging.error('response not buf')
                break
    except Exception, e:
        logging.warning(str(type(e))+' '+str(e)+' err')
        c.close()
        soc.close()
        return
    c.close()
    soc.close()

其实, 去掉各种 try, 还有对响应完整性的判断, 上面这段代码也就剩下:

c = socket.socket()
c.connect((host, port))
buf = c.recv(4096)
soc.send(buf)

改善性能

协程

现在的程序还是多线程的模型,虽然功能已经实现。但是至少我自己在用的时候,明显感觉网速很慢。尤其是像腾讯、新浪这种门户网站的首页。一个页面的请求太他妈多了!!一下子就开了巨多的线程。线程相互之间切换的代价也是很大的,每个线程时间没做多少事,多数时间都在等待 IO。带宽没利用上多少cpu 就快耗尽了,还都是耗在切换线程上,然后网速还很慢。

有一种叫协程的东西。内部好象是用 select poll epoll 那三个实现的。具体实现还没花时间研究过(什么是epoll)。它的效果大概是这样的:在一个线程里,有多个 socket当一个 socket 在等待 IO 的时候,切换到其他的程序语句去执行。这个时候的切换,就相当于是函数调用的时候的切换,代价非常非常的小。用一个线程就能处理巨多的 socket ,充分利用 cpu 。

实际用起来的效果在代理运行在本地的时候浏览器设置了代理几乎感觉不到代理的存在。内存、cpu 等资源都占用得很少。部署到服务器上,带宽都耗尽了 cpu 资源都还剩余很多。

我用的 gevent 这个第三方模块来实现协程,这个模块好像一直还没支持 python3 。反正我是一直没找到它的 python3 版本。

时隔好久才来续写这篇文章,有点不想介绍 gevent 的使用了。这些可以直接看官方的文档。下面直接贴上代码,用 gevent 改写了两个地方。第一个是程序入口处:

import gevent
from gevent import socket
from gevent.server import StreamServer

if __name__ == '__main__':
    server = StreamServer(('', int(sys.argv[1])), proxyer)   
    server.start() 
    server.serve_forever() 

还有一个是 do_tunnel() 方法:

def do_tunnel(host, port, soc):
    c = socket.socket()
    try:
        c.connect((host,port))
    except Exception, e:
        logging.warning('connect err'+host+':'+str(port))
        #soc.send(TUNNEL_FAIL)
        soc.close()
        return
    soc.send(TUNNEL_OK)
    gevent.joinall([
        gevent.spawn(dock_socket, soc, c, False),
        gevent.spawn(dock_socket, c, soc, True),
    ])

在不考虑底层实现的情况下,可以把协程当成线程来用,他们提供的使用方式都是相近的。

多进程

在一台多核 CPU 的电脑上, 与 CPU 核心数相同的进程数多数情况下能带来较大的性能.
我们稍微修改一下程序的入口. 在为这个程序配置了监听的端口号之后, fork 出与 CPU 核心数相同个数的进程, 然后开始监听.

from multiprocessing import cpu_count

if __name__ == '__main__':
    server = StreamServer(('', int(sys.argv[1])), proxyer)   
    if cpu_count()>1: server.max_accept  = 1
    server.start() 
    pid_list = []
    for i in range(cpu_count()):
        pid = os.fork()
        if pid == 0: 
            del pid_list
            server.serve_forever() 
        else:
            pid_list.append(str(pid))
    with open(sys.path[0]+'/pid','w') as f:
        f.write('|'.join(pid_list))
    print('start processes: '+'|'.join(pid_list)+' ok')

这样很容易想到一个问题, 当一个请求来的时候, 这几个进程都会接收这个请求并处理? 其实这叫做惊群现象, 网上有资料说: 在系统的内核层面会处理这个问题, 内核会负责分配这些请求到各个进程. 程序实际运行过程中, 来的每个请求确实也只被分配到一个进程中处理.

  • 文章目录 {:toc}