Skip to content

werkzeug源码分析 By 泽东

小傅哥 edited this page May 28, 2020 · 1 revision

转载请注明出处即可
源码地址github werkzeug
主要参考文档为werkzeug
环境为MacOS, Python 3.7+, IDE Pycharm

注意:文章中的源码存在删减,主要是为了减少篇幅和去除非核心逻辑,但不会影响对执行流程的理解。

一、WSGI简介

WSGI是类似于Servlet规范的一个通用的接口规范。和Servlet类似,只要编写的程序符合WSGI规范,就可以在支持WSGI规范的Web服务器中运行,就像符合Servlet规范的应用可以在Tomcat和Jetty中运行一样。 一个最小的Hello World的WSGI程序如下。

from wsgiref import simple_server


def application(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'Hello World!']


http_server = simple_server.make_server('0.0.0.0', 5000, application)
http_server.serve_forever()

注意如果访问后报了500, 错误为write() argument must be a bytes instance,需要注意return时, 不要直接返回字符串,需要返回bytes。

可以看到wsgi程序的定义只需要实现一个application即可。很简单的3行代码就实现了对http请求的处理。其中enviorn参数是一个dict,包含了系统的环境变量和HTTP请求的相关参数。 enviorn中的系统环境变量

enviorn中的Http请求参数

关于start_response,我们现在这里复习下Http协议的内容 Http Request需要包含以下部分

  • 请求方法 --- 统一资源标识符(Uniform Resource Identifier, URI) --- 协议/版本
  • 请求头(Header)
  • 实体(Body)

具体示例为:

POST /examples/default HTTP/1.1
Accept: text/plain; text/hteml
Accept-Language: en-gb
Connection: Keep-Alive
Host: locahost
User-Agent: Mozilla/4.0 (compatible; MSIE 4.0.1; Windoes 98)
Content-Length: 33
Content-Type application/x-www-form-urlencoded
Accept-Encoding: gzip, deflate

lastName=Franks&firstName=Michael

其中body上面的空行为CRLF(\r\n), 对协议很重要,决定着request body从哪里开始解析。

Http Response需要包含以下部分

  • 协议 --- 状态码 --- 描述
  • 响应头(header)
  • 响应实体(body)

具体示例为:

HTTP/1.1 200 OK
Server: Microsoft-IIS/4.0
Content-Type: text/plain
Content-Length: 12

Hello world!

那么现在再来看start_response函数, 第一个参数在写着状态码描述。第二个参数是一个列表,写着response header。而application的返回值则代表着response body。

二、Werkzeug的Demo

了解了WSGI,我们再看下如何使用Werkzeug来写Hello World。

from wsgiref import simple_server

from werkzeug.wrappers import Response


def application(environ, start_response):
    response = Response('Hello World!', mimetype='text/plain')
    return response(environ, start_response)


http_server = simple_server.make_server('0.0.0.0', 5000, application)
http_server.serve_forever()
from wsgiref import simple_server

from werkzeug.wrappers import Request, Response


def application(environ, start_response):
    request = Request(environ)
    text = 'Hello %s!' % request.args.get('name', 'World')
    response = Response(text, mimetype='text/plain')
    return response(environ, start_response)


http_server = simple_server.make_server('0.0.0.0', 5000, application)
http_server.serve_forever()

在这里可以看到Werkzeug的作用,如果自己手写WSGI的程序的话,需要自己解析environ,以及自己处理返回值。而使用了Werkzeug就可以通过该库所提供的Request和Response来简化开发。正如官网的介绍Werkzeug is a utility library for WSGI

在这篇文章中主要分析Werkzeug是如何实现相关的工具,进而简化WSGI程序的开发的。了解 Werkzeug也为后续理解Flask打下了坚实的基础。

三、Werkzeug提供的工具

(1) Request和Response对象,方便处理请求和响应 (2) Map、Rule以及MapAdapter,方便处理请求路由 (3) WSGI Helper, 比如一些编解码的处理,以及一些方便对stream的处理等。 (4) Context Locals提供了Local,类似于Java的ThreadLocal (5) Http Exception用于处理相关的异常,比如404等。 (6) http.py中还提供了很多的http code和header的定义 除了这些工具还有很多,具体可以查看下官网。

在这篇文章中重点来解析Request和Response以及路由相关的源码。

四、wrappers分析

在Werkzeug并没有多少的包, wrappers是其中之一。 wrappers包

我们先从request = Request(environ)这行代码入手。分析Request。 注意,下面的复制粘贴的源码会删除掉与主流程不太相关的代码。方便理解核心流程。

(1) class Request分析

首先,其实不用多说也知道Request无非是解析了environdict而已。 Request继承了很多类,可以看到存在着Accept、ETAG、CORS等相关Header的解析

class Request(
    BaseRequest, 
    AcceptMixin, 
    ETagRequestMixin, 
    UserAgentMixin,
    AuthorizationMixin,
    CORSRequestMixin,
    CommonRequestDescriptorsMixin,
):

BaseRequest的构造方法为

    def __init__(self, environ, populate_request=True, shallow=False):
        self.environ = environ
        if populate_request and not shallow:
            self.environ["werkzeug.request"] = self
        self.shallow = shallow

因为Request的方法和属性众多,这里找几个比较常见的来分析下实现。

1. request.query_string和request.method

query_string = environ_property(
        "QUERY_STRING",
        "",
        read_only=True,
        load_func=lambda x: x.encode("latin1"),
        doc="The URL parameters as raw bytes.",

environ_property是一个类,实现了一个lookup方法,这个obj其实传的就是Request,其实lookup的调用其实就是获取了environ dict。

class environ_property(_DictAccessorProperty):
   read_only = True

   def lookup(self, obj):
       return obj.environ

environ_property继承了_DictAccessorProperty其中的__get__方法实现为

def __get__(self, obj, type=None):
    if obj is None:
        return self
    storage = self.lookup(obj)
    if self.name not in storage:
        return self.default
    rv = storage[self.name]
    if self.load_func is not None:
        try:
            rv = self.load_func(rv)
        except (ValueError, TypeError):
            rv = self.default
    return rv

可以看到先通过lookup方法获取了environ dict,也就是stroage变量,然后在获取了rv。也就是environdict里面的key='QUERY_STRING'的value。 其实获取method(GET, POST)也是一样的实现

method = environ_property(
        "REQUEST_METHOD",
        "GET",
        read_only=True,
        load_func=lambda x: x.upper(),
        doc="The request method. (For example ``'GET'`` or ``'POST'``).",
    )

2. request.data

这个是获取Request Body, 在environ dict中,通过wsgi.input来获取的BufferedReader类来读取body中的数据。 wsgi.input

在Werkzeug中的实现也是类似的,具体源码如下。

@cached_property
def data(self):
    return self.get_data(parse_form_data=True)

def get_data(self, cache=True, as_text=False, parse_form_data=False):
    rv = getattr(self, "_cached_data", None)
    if rv is None:
        if parse_form_data:
            self._load_form_data()
        rv = self.stream.read()
        if cache:
            self._cached_data = rv
    if as_text:
        rv = rv.decode(self.charset, self.encoding_errors)
    return rv

主要分析下self.stream.read()这行

@cached_property
def stream(self):
    return get_input_stream(self.environ)

def get_input_stream(environ, safe_fallback=True):
    stream = environ["wsgi.input"]
    content_length = get_content_length(environ)

    if environ.get("wsgi.input_terminated"):
        return stream

    if content_length is None:
        return io.BytesIO() if safe_fallback else stream

    return LimitedStream(stream, content_length)

简单来说就是获取wsgi.inputBufferedReader对象,然后判断下是否存在content_length(http request header里面正常情况下都会有),创建LimitedStream类,最多只读取content_length长度的内容。 如果content_length不存在的话,则判断了是否设置了safe_fallback=True,会返回空的BytesIO对象,默认是True。

3. request.args

这里的实现就不详细解释了,无非就是获取QUERY_STRING,然后通过&进行分割,然后在用=切个,前面的作为key, 后面的作为value而已。需要注意的是这里用了MultiDict,目的是为了同一个键的存储多个值。

def url_decode(
    s,
    charset="utf-8",
    decode_keys=None,
    include_empty=True,
    errors="replace",
    separator="&",
    cls=None,
):
    if cls is None:
        from .datastructures import MultiDict

        cls = MultiDict
    if isinstance(s, str) and not isinstance(separator, str):
        separator = separator.decode(charset or "ascii")
    elif isinstance(s, bytes) and not isinstance(separator, bytes):
        separator = separator.encode(charset or "ascii")
    return cls(_url_decode_impl(s.split(separator), charset, include_empty, errors))

4. request.path

path获取的是environ中的PATH_INFO,然后最后一行处理了这种情况,比如http://localhost:5000//default,如果多写了`/`,在这里会比换成单个`/`>

def path(self):
    raw_path = _wsgi_decoding_dance(
        self.environ.get("PATH_INFO") or "", self.charset, self.encoding_errors
    )
    return "/" + raw_path.lstrip("/")

(2) class Response分析

Response类的核心功能有两个,一个是通过一定的封装构造返回值,另一个是返回一个符合WSGI规范的函数。具体的实现比较简单不在详述。

# Response的init函数
def __init__(
    self,
    response=None,
    status=None,
    headers=None,
    mimetype=None,
    content_type=None,
    direct_passthrough=False,
)

# Response的call函数
def __call__(self, environ, start_response):
    app_iter, status, headers = self.get_wsgi_response(environ)
    start_response(status, headers)
    return app_iter

五、Map、Rule和MapAdapter

以一个Demo为例, 看下这三个类的使用。

from wsgiref import simple_server

from werkzeug.routing import Map, Rule, HTTPException
from werkzeug.wrappers import Response, Request

url_map = Map([
    Rule('/test1', endpoint='test1'),
    Rule('/test2', endpoint='test2'),
])


def test1(request, **args):
    return Response('test1')


def test2(request, **args):
    return Response('test2')


views = {'test1': test1, 'test2': test2}


def application(environ, start_response):
    request = Request(environ)
    try:
        return url_map.bind_to_environ(environ) \
            .dispatch(
            lambda endpoint, args: views[endpoint](request, **args)
        )(environ, start_response)
    except HTTPException as e:
        return e(environ, start_response)


http_server = simple_server.make_server('0.0.0.0', 5000, application)
http_server.serve_forever()

其中每个Rule都代表着一个URL匹配模式,并且第一个参数string是可以放<converter(arguments):name>,比如/all/page/<int:page>。endpoint可以放字符串,函数等等,代表着如果匹配到相应的路径,则返回endpoint的值。因为大部分应用至少会有1个接口,所以Rule的存在意义是可以定义一个path到具体的处理函数(或者用字符串表示函数)的一个映射,简化了多接口的开发。 Map可以存放多个Rule,并且在调用bind_to_environ 函数后,返回一个MapAdapter对象,然后通过MapAdapterdispatch方法来获取匹配的Rule,但这里并没有把Rule对象返回,而是返回了endpointargs,那么通过endpoint就可以获取具体的执行函数(或者endpoint本身就是一个执行函数),最后执行到具体的执行函数中, 在返回Response

如果这么这段问题看得比较蒙圈,没关系,现在就来解释下具体的请求流程是怎么处理的,看下bind_to_environ dispatch 两个函数具体的执行逻辑。 根据上面的Demo代码,接到请求后,首先通过bind_to_environ函数获取了MapAdapter

def bind_to_environ(self, environ, server_name=None, subdomain=None):
    environ = _get_environ(environ)
    wsgi_server_name = get_host(environ).lower()
    scheme = environ["wsgi.url_scheme"]

    # 存在删减

    def _get_wsgi_string(name):
        val = environ.get(name)
        if val is not None:
            return _wsgi_decoding_dance(val, self.charset)

    script_name = _get_wsgi_string("SCRIPT_NAME")
    path_info = _get_wsgi_string("PATH_INFO")
    query_args = _get_wsgi_string("QUERY_STRING")
    return Map.bind(
        self,
        server_name,
        script_name,
        subdomain,
        scheme,
        environ["REQUEST_METHOD"],
        path_info,
        query_args=query_args,
    )

主体逻辑可以理解为是通过environ获取了部分参数,然后在调用bind方法。bind方法,最后其实就是通过这些参数创建了MapAdapter对象

def bind(
    self,
    server_name,
    script_name=None,
    subdomain=None,
    url_scheme="http",
    default_method="GET",
    path_info=None,
    query_args=None,
):
    # 存在删减
    return MapAdapter(
        self,
        server_name,
        script_name,
        subdomain,
        url_scheme,
        path_info,
        default_method,
        query_args,
    )

然后在来看下dispatch函数

def dispatch(
    self, view_func, path_info=None, method=None, catch_http_exceptions=False
):
    try:
        try:
            endpoint, args = self.match(path_info, method)
        except RequestRedirect as e:
            return e
        return view_func(endpoint, args)
    except HTTPException as e:
        if catch_http_exceptions:
            return e
        raise

dispatch函数很简单,上面的逻辑

return url_map.bind_to_environ(environ) \
            .dispatch(
            lambda endpoint, args: views[endpoint](request, **args)
        )(environ, start_response)

可以改写为

endpoint, args = url_map.bind_to_environ(environ).match()
return views[endpoint](request, **args)(environ, start_response)

dispatch只是用view_func接了下寻找具体的执行函数的过程而已。然后重点看下match方法。 去掉了websocket和redirect的逻辑后,代码如下。

def match(
    self,
    path_info=None,
    method=None,
    return_rule=False,
    query_args=None,
    websocket=None,
):

    for rule in self.map._rules:
        try:
            rv = rule.match(path, method)
        except:
            pass
        if rv is None:
            continue
        if rule.methods is not None and method not in rule.methods:
            have_match_for.update(rule.methods)
            continue

        if return_rule:
            return rule, rv
        else:
            return rule.endpoint, rv


    raise NotFound()

其实可以看到,对于path到Rule的匹配是通过for循环来进行的。rule.match用来判断path和Rule是否匹配,然后在判断对应的methods是否匹配,如果是匹配的则终止循环,返回了endpoint。 笔者曾经在flask上扩展了一个根据版本号的路由@app.route('/main.json', version=['<=1.3'])类似于这样。支持了以下几种版本号的定义。

R: 1.6
R0: 1.6-1.9
R1: =1.6
R2: > 1.6
R3: < 1.6
R4: >=1.6
R5: <=1.6

所做的更改就是在match方法这里进行的处理,具体的逻辑写在了判断methods之后。

if rule.methods is not None and method not in rule.methods:
    have_match_for.update(rule.methods)
    continue

# determine version
version = get_version(self.request)
if self.request and version:
    if not isinstance(rule.version, list) or not rule.version:
        rule.version = list()

    version_list = self.version_dict.get(rule.rule)

    if len(rule.version) == 0 \
            and version_list is not None \
            and determine_version(version, version_list):
        continue
    elif len(rule.version) != 0 and not determine_version(version, rule.version):
        continue

最后在说下rule.match(path)方法,是通过正则判断是否匹配来判断path是否和Rule匹配的。 routing.py源码第855行。

self._regex = re.compile(regex)

routing.py源码第871行的Rule.match方法

m = self._regex.search(path)

六、结束语

本文主要分析了Werkzeug部分核心源码,下篇文章打算分析下Flask是如何用Werkzeug提供的工具来构造了一个优秀的框架。

参考

https://werkzeug.palletsprojects.com/en/1.0.x/# 《深入剖析Tomcat》

📝 首页

🌏 知识星球码农会锁

实战项目:「DDD+RPC分布式抽奖系统」、专属小册、问题解答、简历指导、架构图稿、视频课程

🐲 头条

⛳ 目录

  1. 源码 - :octocat: 公众号:bugstack虫洞栈 文章所涉及到的全部开源代码
  2. Java
  3. Spring
  4. 面向对象
  5. 中间件
  6. Netty 4.x
  7. 字节码编程
  8. 💯实战项目
  9. 部署 Dev-Ops
  10. 📚PDF 下载
  11. 关于

💋 精选

🐾 友链

建立本开源项目的初衷是基于个人学习与工作中对 Java 相关技术栈的总结记录,在这里也希望能帮助一些在学习 Java 过程中遇到问题的小伙伴,如果您需要转载本仓库的一些文章到自己的博客,请按照以下格式注明出处,谢谢合作。

作者小傅哥
链接https://bugstack.cn
来源bugstack虫洞栈

2021年10月24日,小傅哥 的文章全部开源到代码库 CodeGuide 中,与同好同行,一起进步,共同维护。

这里我提供 3 种方式:

  1. 提出 Issue :在 Issue 中指出你觉得需要改进/完善的地方(能够独立解决的话,可以在提出 Issue 后再提交 PR )。
  2. 处理 Issue : 帮忙处理一些待处理的 Issue
  3. 提交 PR: 对于错别字/笔误这类问题可以直接提交PR,无需提交Issue 确认。

详细参考:CodeGuide 贡献指南 - 非常感谢你的支持,这里会留下你的足迹

  • 加群交流 本群的宗旨是给大家提供一个良好的技术学习交流平台,所以杜绝一切广告!由于微信群人满 100 之后无法加入,请扫描下方二维码先添加作者 “小傅哥” 微信(fustack),备注:加群。
微信:fustack

  • 公众号(bugstack虫洞栈) - 沉淀、分享、成长,专注于原创专题案例,以最易学习编程的方式分享知识,让自己和他人都能有所收获。
公众号:bugstack虫洞栈

感谢以下人员对本仓库做出的贡献或者对小傅哥的赞赏,当然不仅仅只有这些贡献者,这里就不一一列举了。如果你希望被添加到这个名单中,并且提交过 Issue 或者 PR,请与我联系。

Clone this wiki locally