本文共 41085 字,大约阅读时间需要 136 分钟。
如果觉得这排版不好,建议去看
Tornado是一个python语言的web服务框架,它是基于社交聚合网站FriendFeed的实时信息服务开发而来的。
2007年由4名Google前软件工程师一起创办了FriendFeed,旨在使用户能够方便地跟踪好友在Facebook和Twitter等多个社交网站上的活动。结果两年后,Facebook宣布收购FriendFeed
Tornado使FriendFeed使用的可扩展的非阻塞Web服务器及其相关工具的开源版本,这个Web框架看起来有些像web.py或 Google的webapp,不过为了更加有效地利用非阻塞服务器环境,Tornado这个Web框架还包含了一些相关的有用工具和优化。
与其他web框架一些区别
速度相当快,使用了epoll非阻塞的方式,每秒可以处理数以千计的连接。
特点
单进程单线程异步IO的网络模式, epoll的异步网络IO
WSGI把应用(Application)和服务器(Server)结合起来,Tornado既可以是WSGI应用也可以是WSGI服务。
既是WebServer也是WebFramework
常用tornado参考doc
可以先跳过架构这块介绍,到安装使用。(使用过了tornado,再来看整体架构就清晰容易多了)
Tornado不仅仅是一个Web框架,它完整地实现了HTTP服务器和客户端,再此基础上提供了Web服务,它可分为四层:
注意颜色分层
Tornado的HTTPConnection类用来处理HTTP请求,包括读取HTTP请求头、读取POST传递的数据,调用用户自定义的处理方法,以及把响应数据写给客户端的socket。
为了在处理请求时实现对socket的异步读写,Tornado实现了IOStream类用来处理socket的异步读写。
Tornado为了实现高并发和高性能,使用了一个IOLoop事件循环来处理socket的读写事件,IOLoop事件循环是基于Linux的epoll模型,可以高效地响应网络事件,这是Tornado高效的基础保证。
Tornado是一个轻量级框架,它的模块不多最重要的模块是web,web模块包含了Tornado大部分主要功能的Web框架,其他模块都是工具性质的,以便让Web模块更加有用。
Core Web Framework 核心Web框架
Asynchronous Networking 异步网络底层模块
Integration With Other Services 系统集成服务
Utilities 应用模块
注意请求和返回的流程(颜色区别,以IO Loop为起点)
注意版本Tornado和python的版本:
python环境可以分为本地环境、虚拟环境、远程环境
本机安装个python,然后配置下环境变量。
借用一些软件(比如:Anaconda、Virtualenv/Virtualenvwrapper),安装虚拟环境,可以达到隔离效果,多个python在一台机器上也不会冲突
非本机,一台远程机器上python环境
我使用的是Anaconda,如何使用和安装可以参照这篇blog。
至于virtualenv和本机安装环境,自行用搜索引擎解决下
主流的python开发工具
我使用的是pycharm,如何安装也不多介绍了。里面有下载地址:
pip如何使用,如何切换国内网,也都看这篇blog吧。
# 使用anaconda创建一个tornado虚拟环境conda create --name iworkh-tornado python=3.7# 查看有哪些环境conda info -e# 切换到tornado虚拟环境activate iworkh-tornado# pip安装tornadopip install tornado# 或使用condo安装tornadoconda install tornado
pycharm创建项目
因为前异步使用anaconda创建一个tornado虚拟环境,所以就使用已存在的环境了。
当然如果前面没有自己创建虚拟环境,也可以使用pycharm里的
new environment using XXX
(选中虚拟化工具),来创建新的。
至此,tornado的开发环境基本ok了,如果有问题,可在留言区留言,或者QQ联系我。
提到python web框架,那么就想到了Django和Flask,那么他们有啥区别呢?
可以读下这几篇blog
总结以几点下:
django比较重量级;flask和tornado都比较轻量级
django大而全的框架,效率高;
django和flask同步,能力查;tornado异步框架,性能相对好
tornado适合用于开发长连接多的web应用。比如股票信息推送、网络聊天等。
Tornado与Flask旗鼓相当;django慢
tornado即使web框架,也是web服务;flask和tornado知识web框架,需要借助web服务器来部署
coroutine是一个gen方式历史过滤方案,在python3.x后,已经使用了asnyc
我们都知道cpu的处理能力要远远大于IO的处理能力,而IO是阻塞操作,即io阻塞了,cpu即在休息,在虚度时间,在cpu资源,(浪费是可耻的,我们要利用最大化)。
当然不只本地磁盘IO,我们访问网络,常使用的requests
和urllib
的库都是同步的。
io模型主要分为同步/异步,阻塞/非阻塞
同步和异步:是相对于调用方而言的
阻塞和非阻塞: 是相对于被调用方而言的
譬如:用户程序(用户空间,A)调用了文件系统内容(内核空间,B),假设B操作比较耗时要10秒钟。
对于A而言,是立刻得到B的返回结果呢,还是一直等着。等B处理完能够通知A,还是A要一直在等着?(同步/异步)
对于B而言,接收到A的请求了,处理完后,如何能够通知到A,不用限制着A,要一直等着?(阻塞/非阻塞)
阻塞
安装下requests工具表
pip install requests
代码
import requestsblogHtml = requests.get("https://iworkh.gitee.io/blog/").content.decode("utf8")print(blogHtml)
requests.get
是阻塞,如果网不好或服务器响应很慢的话,那阻塞这回很耗时
非阻塞
import socket# AF_INET:服务器之间网络通信,SOCK_STREAM 流式socketclient = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 设置非阻塞client.setblocking(False)# 传个tupleaddress = "www.baidu.com"try: client.connect(("www.baidu.com", 80))except BlockingIOError as e: print("连接中...干些其他事") passwhile True: try: client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format("/", address).encode("utf8")) print("连接成功") break except OSError as e: pass# 定义bytesdata = b""while 1: try: rec = client.recv(512) if rec: data += rec else: print(data.decode("utf8")) break except BlockingIOError as e: pass
setblocking
设置为非阻塞的,然后后面一直使用while循环来处理,直到正常返回结果。(比太关注死循环的写法,而要关注于非阻塞了)
IO多路复用:一个进程监控多个描述符fds,当描述符状态为read、write等状态时,通知程序进行相应的操作。
select、poll和epoll都是多路复用机制。它们本质上也是同步IO,它们在读写过程是阻塞的。
而异步IO是非阻塞,由内核将数据从内核空间拷贝到用户空间
像了解其原理德胡啊,强力建议去看篇blog
select函数监控3类文件描述符: readfds,writefds,exceptfds。
window和linux都支持,但是单进监控的文件描述符有限,linux一般为1024。poll使用一个pollfd指针实现。pollfd监控even事件,虽们没有最大限制,但是太多也影响性能。需要遍历所有的描述符。
某时刻,文件描述符,处于就绪状态很少。但是却要遍历所有描述符,因此当文件描述符很多时,性能会下降。
epoll对select和poll增强。是在2.6内核中提出的。没有最大文件描述符的限制。
epoll使用一个文件描述符区管理多个描述符,将用户的文件描述符event存在内核时间表中,这样用户空间和内核只要copy一次。
由于唤醒的进程,不知道哪些是就绪状态,所以可以将就绪状态的socket存下以便查找,不用都所有都遍历一遍。
上面异步代码,都是while和try来处理,这我们修改下使用select方式进行改造
import socketfrom selectors import DefaultSelector, EVENT_WRITE, EVENT_READselector = DefaultSelector()class CallBackRequestSite: def get_url(self, url): self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client.setblocking(False) self.host = url self.data = b"" try: self.client.connect((self.host, 80)) except BlockingIOError as e: print("连接中...干些其他事") pass selector.register(self.client.fileno(), EVENT_WRITE, self.connected) def connected(self, key): selector.unregister(key.fd) self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format("/", self.host).encode("utf8")) selector.register(self.client.fileno(), EVENT_READ, self.read_data) def read_data(self, key): recv_data = self.client.recv(1024) if recv_data: self.data += recv_data else: selector.unregister(key.fd) print(self.data.decode("utf8"))def loop(): while 1: # 根据系统是否支持,使用epoll还是select,优先epoll。默认阻塞,有活动连接就返回活动的连接列表 ready = selector.select() for key, mask in ready: callback = key.data callback(key)if __name__ == '__main__': demo = CallBackRequestSite() demo.get_url("www.baidu.com") loop()
上面方式是通过回调函数的方式实现的异步操作,虽然异步了,但是回调方式的缺点也很明显
回调方式的缺点
协程(Coroutines)是一种比线程更加轻量级的存在,正如一个进程可以拥有多个线程一样,一个线程可以拥有多个协程。
进程和线程是由系统控制,而协程是由程序自己控制的。好处是性能大幅度的提升,因为不会像线程切换那样消耗资源。
一个进程可以包含多个线程,一个线程也可以包含多个协程。一个线程的多个协程的运行是串行的。当一个协程运行时,其它协程必须挂起。
多核CPU,多个进程或一个进程内的多个线程是可以并行运行的,但一个线程内协程却绝对是串行的,无论CPU有多少个核。
如对进程、线程、协程不太清除的阅读这篇文件
协程的一个重要基础就是:程序运行一个协程能够暂停,去运行另外的协程。(如何暂停?才是关键)
yield就是可以满足程序(函数/方法)执行时,暂停。
def yield_method(): print("do start...") yield 1 res2 = yield 2 print("do c...res2:{}".format(res2)) yield 3
这是最简单的函数中使用yield的例子,resp = yield_method()
是一个generator
生成器。
next(resp)
: 可以使得程序从一个yield到下一个yield的执行。(当到最后一个yield时,再执行next会报错)send
: 可以发送参数给之前yield,并执行到下一个yield不懂的可以阅读下这篇文章
def yield_method(): print("do start...") yield 1 print("do a...") res1 = yield 2 print("do b...res1: {}".format(res1)) res2 = yield 3 print("do c...res2:{}".format(res2)) yield 4def call_hand(): resp = yield_method() print(next(resp)) # 运行到 yield 1处,返回 1 print(resp.__next__()) # 从yield 1开始运行,到 yield 2处,返回 2 print(resp.send("got it")) # 从yield 2开始运行,并把send值给yield 2的变量,到 yield 3处,返回 3 print(next(resp)) # 从yield 3开始运行,到 yield 4处,返回 4def call_for(): for item in yield_method(): print(item)if __name__ == '__main__': print(yield_method()) print("*" * 30) call_hand() print("*" * 30) call_for()
在tornado中,定义协程代码有多种方式
直接上代码,里面有注释,具体自己悟
import asynciofrom asyncio import coroutine as pycoroutinefrom time import sleep, timefrom tornado.gen import coroutine as tcoroutineasync def async_method(): await asyncio.sleep(2) print("do a...") await await_sleep_one_min() print("do b...python的async和await") await t_yield_sleep_one_min() print("do c...tornado装饰器") await py_yield_from_sleep_one_min() print("do d...python装饰器") await asyc_sleep_one_min() print("do e...")# 不推荐使用, tornado装饰器方式@tcoroutinedef t_yield_sleep_one_min(): print("等了又等--1") yield sleep(1) print("等了又等--2") yield sleep(1) print("等了又等--3") yield sleep(1)# 不推荐使用,python装饰器方式,python的旧语法@pycoroutinedef py_yield_from_sleep_one_min(): yield from asyncio.sleep(1)# 推荐使用(async与await组合,因为asyncio.sleep也是个异步代码,需要用await)async def await_sleep_one_min(): await asyncio.sleep(1)# 推荐使用(async单独使用,因为sleep不是个异步方法,不需要用await)async def asyc_sleep_one_min(): print('asyc_sleep_one_min----waiting') sleep(1)if __name__ == '__main__': start = time() asyncio.run(async_method()) end = time() print('cost time = ' + str(end - start))
简单总结下:
- 协程定义可以用装饰器或者async修饰(推荐async),后面只说async形式了。
- 协程A调用其他协程B(A和B函数都用async),协程A里调用协程B的方法加使用
await
关键字,当然普通函数(非协程)不需要加await的
执行协程方法,也有多种:tornado的IOLoop和python的asyncio
python方式
方式一:一直run
import asyncioif __name__ == '__main__': start = time() # 一直run asyncio.ensure_future(async_method()) asyncio.get_event_loop().run_forever() end = time() print('cost time = ' + str(end - start))
方式二:run完就结束
asyncio.run(async_method())
方式三:run完就结束
asyncio.get_event_loop().run_until_complete(async_method())
tornado方式
# 通过tornado的IOLoop来run (执行某个协程后停止事件循环)IOLoop.current().run_sync(async_method)
同步方式HTTPClient
from tornado import httpclientdef get_url(url): http_client = httpclient.HTTPClient() resp = http_client.fetch(url) print(resp.body.decode("utf8"))if __name__ == '__main__': web_site = "https://iworkh.gitee.io/blog/" get_url(web_site)
异步方式AsnycHttpClient
import asynciofrom tornado import httpclientasync def async_get_url(url): http_client = httpclient.AsyncHTTPClient() resp = await http_client.fetch(url) print(resp.body.decode("utf8"))if __name__ == '__main__': web_site = "https://iworkh.gitee.io/blog/" asyncio.run(async_get_url(web_site))
tornado.web
提供了一种带有异步功能并允许它扩展到大量开放连接的简单的web框架, 使其成为处理长连接(long polling) 的一种理想选择.
import timefrom tornado import webfrom tornado.ioloop import IOLoopclass HelloWorldHandlerOne(web.RequestHandler): async def get(self, *args, **kwargs): # 别在handler中写同步的代码,会被阻塞的 time.sleep(5) self.write("hello world---one")class HelloWorldHandlerTwo(web.RequestHandler): def get(self, *args, **kwargs): self.write("hello world---two")url_map = [ ("/test1", HelloWorldHandlerOne), ("/test2", HelloWorldHandlerTwo)]if __name__ == '__main__': app = web.Application(url_map, debug=True) app.listen(8888) IOLoop.current().start()
几点说明
web.RequestHandler
,然后重写rest一些接口(get,post,put,delete等)web.Application
,传了参数url和handler对应关系。debug参数即便请了不同的接口(test1和test2),由于test1的handler中
time.sleep(5)
,先请求test1,后请求test2,test2也会等test1响应完,才会响应test2(单线程的)。
debug为true
Run
启动:后台启动。如果关闭需要控制面板后台关闭Debug
启动:修改内容,会自动停了。得再次debug启动ctr+c
可以退出测试url
url匹配主要是对于python的表达式的使用,如不了解可以先去了解下
from tornado import web, ioloopclass IndexHandler(web.RequestHandler): async def get(self, *args, **kwargs): self.write("首页")class UserHandler(web.RequestHandler): async def get(self, name, *args, **kwargs): self.write("hello world, {}".format(name))class ProductHandler(web.RequestHandler): def get(self, name, count, *args, **kwargs): self.write("购买{}个{}".format(count, name))class DivHandler(web.RequestHandler): # one,two这个要跟url里的(?P\d+)/(?P \d+)里的名称匹配 def get(self, one, two, *args, **kwargs): try: result = int(one) / int(two) self.write("{}/{}={}".format(one, two, result)) except ZeroDivisionError: # 发生错误,会跳转 self.redirect(self.reverse_url('error_page', 500)) passclass ErrorHandler(web.RequestHandler): def get(self, code, *args, **kwargs): self.write("发生错误:error_code: {}".format(code))url_map = [ ("/user/(\w+)/?", UserHandler), ("/product/(\w+)/(\d+)/?", ProductHandler), ("/calc/div/(?P \d+)/(?P \d+)/?", DivHandler), web.URLSpec("/index/?", IndexHandler, name="index"), web.URLSpec("/error/(?P \d+)/?", ErrorHandler, name="error_page")]if __name__ == '__main__': app = web.Application(url_map, debug=True) app.listen(8888) print('started...') ioloop.IOLoop.current().start()
几点说明
/?
表示0或1个/
,即url最后可以有/
也可以没有/
web.URLSpec
来创建对象,可以指定一些参数(url,handler,kwargs,name)(?P<xxx>\d+)
,表示取一个组名,这个组名必须是唯一的,不重复的,没有特殊符号,然后跟参数里名称要一样redirect
重定向,reverse_url
根据名称类获取url测试url
首页
hello world, iworkh
购买3个apple
4/2=2.0
发生错误:error_code: 500
发生错误:error_code: 404
动态传参:将一些配置参数通过命令行、配置文件方式动态传给程序,而不是代码写死。增加灵活性。
官网关于options的介绍
动态传参主要使用的tornado.options
某块,使用步骤:
options.define()
options.parse_command_line()
,options.parse_config_file("/server.conf")
options.xxx
直接上代码
from tornado import options, web, ioloopoptions.define("port", default="8888", type=int, help="端口号")options.define("static_path", default="static", type=str, help="静态资源路径")class IndexHandler(web.RequestHandler): async def get(self): await self.finish('welcome to iworkh !!!')url_map = [ ("/?", IndexHandler)]setting = { 'debug': True}if __name__ == '__main__': # 命令行方式 options.parse_command_line() app = web.Application(url_map, **setting) app.listen(options.options.port) ioloop.IOLoop.current().start()
这注意: url_map是个list,而setting是个dic,去
web.Application
的初始化方法中就可以看到需要的参数类型
启动命令
python option_demo.py --port=8080
通过
--port
指定启动端口
测试url
将上面启动main代码修改为以下:
if __name__ == '__main__': # 配置文件方式 options.parse_config_file('config.ini') app = web.Application(url_map, **setting) app.listen(options.options.port) ioloop.IOLoop.current().start()
配置文件内容
port = 8889
就加了端口号
测试url
因为后面内容,请求的方式不仅仅是get,还有post等,所以使用Postman
工具来请求
Postman很简单,自行下载使用下,这就不多介绍了
handler里内容和细节相当得多,具体可以参照官网
我们主要从以下几个方面入手
常用的函数
`get_argument`,`get_arguments`: 从post的form中解析参数,如果form中没有,那会从url中解析`get_query_argument`,`get_query_arguments`: 从url中解析参数`get_body_argument`,`get_body_arguments`: json格式数据解析`request``path_args``path_kwargs`
注意函数名有s和没有s的区别:
- 有s表示接受的是一个数组,即使一个数据,也会转为数组形式.
- 没有s的,但是传了多个值(比如:http://localshot:8888/user?name=lilei&&name=wangwu),最后一个会生效(name=wangwu)
结论:
方法 | 传参 |
---|---|
get_query_argument(s) | 参数在url中 |
get_argument(s) | 参数在form中,没有头信息,form中没有还可以从url中获取 |
get_body_argument(s) | 参数在form中,带有头信息 |
request.body | 参数是json |
演示代码
import jsonfrom tornado import web, ioloopclass UrlParamHandler(web.RequestHandler): async def get(self): name = self.get_query_argument("name") age = self.get_query_argument("age") self.write("name: {}, age: {}".format(name, age)) self.write('') names = self.get_query_arguments("name") ages = self.get_query_arguments("age") self.write("names: {}, ages: {}".format(names, ages))class FormParamHandler(web.RequestHandler): async def post(self): name = self.get_argument("name") age = self.get_argument("age") self.write("name: {}, age: {}".format(name, age)) self.write('') names = self.get_arguments("name") ages = self.get_arguments("age") self.write("names: {}, ages: {}".format(names, ages))class JsonWithFormHeadersParamHandler(web.RequestHandler): async def post(self): name = self.get_body_argument("name") age = self.get_body_argument("age") self.write("name: {}, age: {}".format(name, age))class JsonParamHandler(web.RequestHandler): async def post(self): parm = json.loads(self.request.body.decode("utf8")) name = parm["name"] age = parm["age"] self.write("name: {}, age: {}".format(name, age))url_handers = [ ("/url", UrlParamHandler), ("/form", FormParamHandler), ("/jsonwithformheaders", JsonWithFormHeadersParamHandler), ("/json", JsonParamHandler)]if __name__ == '__main__': app = web.Application(url_handers, debug=True) app.listen(8888) print("started...") ioloop.IOLoop.current().start()
测试url
测试工具Postman,当然也可以使用python的requests
来请求 1. 参数在url中,get请求
代码请求接口方式测试
import requestsurl="http://localhost:8888/url?name=zhangsan&&age=20"resp = requests.request('GET', url).content.decode("utf8")print(resp)
postman或浏览器方式测试
name: zhangsan, age: 20
names: [‘zhangsan’], ages: [‘20’]
name: lisi, age: 28
names: [‘zhangsan’, ‘lisi’], ages: [‘20’, ‘28’]
由上面结果,可以看出函数名有s和没s的区别了,下面就其他方法就演示了,因为有s的,就不演示了跟这类似.
个人觉得,不用s,传多个值,为啥不直接传数组了.特别后面是json格式交互的时候,多个值的场景完全可以数组作为值来传递
2. 参数在form中,没有请求头,post请求
代码请求接口方式测试
import requestsurl = "http://localhost:8888/form"data = { "name": "zhangsan", "age": 20}resp = requests.request('POST', url, data=data).content.decode("utf8")
postman方式测试
name: lisi, age: 28
names: [‘zhangsan’, ‘lisi’], ages: [‘20’, ‘28’]
3. 参数在from中,并含有请求头Content-Type:application/x-www-form-urlencoded
,post请求
import requestsurl = "http://localhost:8888/jsonwithformheaders"headers = { "Content-Type": "application/x-www-form-urlencoded;"}data = { "name": "zhangsan", "age": 20}resp = requests.request('POST', url, headers=headers, data=data).content.decode("utf8")print(resp)
postman方式测试
代码请求接口方式测试
import requestsurl = "http://localhost:8888/json"data = { "name": "zhangsan", "age": 20}resp = requests.request('POST', url, json=data).content.decode("utf8")print(resp)
注意这这参数传的是
json=data
,而不是data=data
哦
postman方式测试
没有头,只能通过
request.body
来拿到参数,而且是bytes类型,需要decode后,json解析后得到dict
提示: 注意json值传递时候,有没header,取值方式不同.结论如下:
方法 | 传参 |
---|---|
get_query_argument(s) | 参数在url中 |
get_argument(s) | 参数在form中,没有头信息 |
get_body_argument(s) | 参数在form中,带有头信息 |
request.body | 参数是json |
注意: process里函数的执行顺序
initialize: 初始化,通过`URLSpec`可将参数传递到方法prepare: 在get/post等操作之前,会调用get/head/post/delete/patch/put/options: 一般业务逻辑处理on_finish: 在out操作之后,会调用
直接上代码
from tornado import web, ioloopclass ProcessDemoHandler(web.RequestHandler): # initialize方法同步 def initialize(self, dbinfo): print("initialize...") self.dbinfo = dbinfo print("数据库host:{}".format(self.dbinfo['db_host'])) pass async def prepare(self): print("prepare...") pass async def get(self): print("get...") self.write("success!!!") pass def on_finish(self): print("finish...") passinit_param = { 'dbinfo': { 'db_host': 'localhost', 'db_port': 3306, 'db_user': 'root', 'db_password': '123', }}url_handers = [ (r"/?", ProcessDemoHandler, init_param),]if __name__ == '__main__': app = web.Application(url_handers, debug=True) app.listen(8888) print("started...") ioloop.IOLoop.current().start()
需要关注以下几点:
1.顺序: initialize 👉 prepare 👉 get/post/put/delete/… 👉 on_finish
2.initialize和on_finish方法同步 3.如何将参数通过url_handers
传给initialize方法
结果:
initialize...数据库host:localhostprepare...get...
set_status: 设置状态码set_headeradd_headerclear_headerwrite: 写数据,可以write多次,放缓存中,而不会中断当flush或者finish或者没消息断开时发送flush: 刷新数据到客户端finish: 写数据,写完断开了render/render_string: 模板方式渲染输出redirect: 重定向send_error/write_error: 发送错误render_embed_js/render_embed_css: 嵌入的js和cssrender_linked_js/render_linked_css: 链接的js和css
import asynciofrom tornado import web, ioloopclass OutputDemoHandler(web.RequestHandler): async def get(self): self.set_status(200) self.write("error!!!") self.write("warning!!!") self.write("") await self.flush() await asyncio.sleep(5) self.write("success!!!") await self.finish("done")url_handers = [ (r"/?", OutputDemoHandler),]if __name__ == '__main__': app = web.Application(url_handers, debug=True) app.listen(8888) print("started...") ioloop.IOLoop.current().start()
这里主要使用了
没用过的write
,flush
以及finish
操作,其他前面用过就不多介绍了.render_xxx
形式的函数,在模板中使用的。
结果:
error!!!warning!!!success!!!done
第一行先出来,应为flush了,过了5秒后,第二行数据出来
setting里每项值类型都是dic
类型
配置主要分以下几种类型
这不会全部介绍,只介绍下常用的,在实际开发中,去官网查阅.
官网setting配置项
打开后,直接搜索关键字
settings
,就能定位到
常用配置
debug
是否开启debug模式
default_handler_class
当url没有配置到handler时,默认处理器.
static_path
静态资源路径
static_url_prefix
静态资源前缀
static_handler_class
静态资源handler(默认是
tornado.web.StaticFileHandler
)
静态资源代码演示
from tornado import web, ioloopclass SettingDemoHandler(web.RequestHandler): async def get(self): await self.finish("success")url_handers = [ (r"/?", SettingDemoHandler),]settings={ 'debug': True, 'static_path': 'public/static', 'static_url_prefix': '/static/'}if __name__ == '__main__': app = web.Application(url_handers, **settings) app.listen(8888) print("started...") ioloop.IOLoop.current().start()
在相对路径下,创建
public/static/images
文件夹,并放一张图片进去
测试url
端口号后面的前缀url紧跟的路径,要跟
static_url_prefix
配置一致
模板就不多介绍了,工作中我们一般开发都前后端分离,因此这就不过多纠缠了。(单为了文章的完整性,我还是写了个简单例子)
前端:react、vue等前端框架
后端:java、go、python等语言编写的web框架跟前端进行数据交互
有兴趣的可以看下官网:
主要包含以下几点
1.普通字符串模板,加载使用2.文件模板,加载使用3.常用的模板语法 { { data }},{ % set ...%},{ % raw %}4.导入python,调用函数,循环输出5.模板继承 { % extends "xxx.html" %}6.UIModule使用 { % module Template("foo.html", arg=42) %}
基本结构
结果
代码主要分模板、UIMoudle、orderHandler、启动类
启动类和handler
from typing import Optionalfrom tornado import web, ioloopclass OrderHandler(web.RequestHandler): def get(self): data = { 'title': '书本列表', 'orderList': [ { 'id': 1, 'name': 'java', 'price': 80, 'count': 1, 'link_to': '查看'}, { 'id': 2, 'name': 'springboot', 'price': 100, 'count': 2, 'link_to': '查看'}, { 'id': 3, 'name': 'springcloud', 'price': 120, 'count': 1, 'link_to': '查看'}, ] } self.render("tp_order.html", **data)class FooterUIMoudle(web.UIModule): def render(self): return self.render_string("uimodules/tp_footer.html") def embedded_css(self) -> Optional[str]: css = ''' .content { background: gray} a {text-decoration-line: none} ''' return cssurl_handlers = [ (r"/order", OrderHandler)]settings = { "debug": True, "template_path": 'public/templates', "static_path": 'public/static', "static_url_prefix": "/static/", "ui_modules": { 'FooterUIMoudle': FooterUIMoudle }}if __name__ == '__main__': app = web.Application(url_handlers, **settings) app.listen(8888) print("started successfully") ioloop.IOLoop.current().start()
settings里定义
注意给模板传参数的类型,在目录中也需要跟其对于方式取出值ui_modules
,template_path
,static_path
tp_base.html模板
相对启动类路径:public/templates/tp_base.html
商城 {% block custom_css %} {% end %}{% module Template("uimodules/tp_header.html") %}{% block order_html_block%}{% module FooterUIMoudle() %} {% block custom_js %} {% end %}订单
{% end %}
里面使用UIMoudle、继承等手段
tp_base.html模板
相对启动类路径:public/templates/tp_base.html
{% extends 'tp_base.html' %}{% block order_html_block %}{% end %}总价格: { { total_price }}
{ { title }} {% set total_price = 0 %} {% for item in orderList %} id 名称 价格 数量 详情 {% end %} { { item["id"] }} { { item["name"] }} { { item["price"] }}元 { { item["count"] }} {% raw item["link_to"] %}
里面使用继承、循环、raw、赋值等手段
uimoudles下的tp_footer.html模板
相对启动类路径:public/templates/uimodules/tp_header.html
welcome to iworkh !!!
uimoudles下的tp_footer.html模板
相对启动类路径:public/templates/uimodules/tp_footer.html
python里PyMySQL是同步,所以在tornado我们得用异步的,这介绍下aiomysql,它是基于PyMySQL的。用法差不错,只不过实现了异步操作。
安装依赖
pip install aiomysql
import asyncioimport aiomysqlasync def test_example(loop): pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='', db='mysql', loop=loop) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 42;") print(cur.description) (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed()loop = asyncio.get_event_loop()loop.run_until_complete(test_example(loop))
这是官网Github上给的示例,主要注意以下几点
- async异步编写
- 调用异步操作时,要使用await
- async with的用法,异步上下文管理器,接受会将对应的资源关闭的(异步上下文管理器指的是在enter和exit方法处能够暂停执行的上下文管理器,
__aenter__
和__aexit__
方法。)
有兴趣可以去了解下async with
和async for
,百度下随便挑一篇阅读下就明白了(基本都雷同的😊)
创建表
CREATE TABLE `tb_order` ( `id` int(10) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL, `price` decimal(10,2) DEFAULT NULL, `count` int(5) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
插入一条数据
INSERT INTO `iworkh_tornado`.`tb_order`(`id`, `name`, `price`, `count`)VALUES (1, 'java', 80.00, 1);
使用get请求来请求rest接口,参数方法可以放在url中,也可以方法在form中 (通过get_argument获取参数)
查询逻辑
import aiomysqlfrom tornado import web, ioloopclass OrderHandler(web.RequestHandler): def initialize(self, db_info) -> None: self.db_info = db_info async def get(self): id = self.get_argument("id", None) if not id: raise Exception("id can not None") db_id = db_name = db_price = db_count = None async with aiomysql.create_pool(host=self.db_info["db_host"], port=self.db_info["db_port"], user=self.db_info["db_user"], password=self.db_info["db_password"], db=self.db_info["db_name"], charset=self.db_info["db_charset"]) as pool: async with pool.acquire() as conn: async with conn.cursor() as cur: sql = "SELECT `id`, `name`, `price`, `count` FROM TB_ORDER WHERE id = {};".format(id) print(sql) await cur.execute(sql) result = await cur.fetchone() try: if (result): db_id, db_name, db_price, db_count = result except Exception: pass self.write("success, {}, {}, {}, {}".format(db_id, db_name, db_price, db_count))initParam = { "db_info": { 'db_host': 'localhost', 'db_name': 'iworkh_tornado', 'db_port': 3306, 'db_user': 'iworkh', 'db_password': 'iworkh123', 'db_charset': 'utf8' }}url_handlers = [ (r"/order/?", OrderHandler, initParam)]if __name__ == '__main__': app = web.Application(url_handlers, debug=True) app.listen(8888) print("started successfully") ioloop.IOLoop.current().start()
测试代码
import requestsurl = "http://localhost:8888/order"data={ 'id':'1'}resp = requests.get(url, data=data).content.decode("utf8")print(resp)
查询逻辑
import aiomysqlfrom tornado import web, ioloopclass OrderHandler(web.RequestHandler): def initialize(self, db_info) -> None: self.db_info = db_info async def post(self): db_name= self.get_argument("name") db_price= self.get_argument("price") db_count= self.get_argument("count") async with aiomysql.create_pool(host=self.db_info["db_host"], port=self.db_info["db_port"], user=self.db_info["db_user"], password=self.db_info["db_password"], db=self.db_info["db_name"], charset=self.db_info["db_charset"]) as pool: async with pool.acquire() as conn: async with conn.cursor() as cur: sql = "INSERT INTO TB_ORDER (`name`, `price`, `count`) VALUES ('{}','{}','{}');"\ .format(db_name, db_price, db_count) print(sql) await cur.execute(sql) await conn.commit() self.write("save successfully")initParam = { "db_info": { 'db_host': 'localhost', 'db_name': 'iworkh_tornado', 'db_port': 3306, 'db_user': 'iworkh', 'db_password': 'iworkh123', 'db_charset': 'utf8' }}url_handlers = [ (r"/order/?", OrderHandler, initParam)]if __name__ == '__main__': app = web.Application(url_handlers, debug=True) app.listen(8888) print("started successfully") ioloop.IOLoop.current().start()
测试代码
import requestsurl = "http://localhost:8888/order"data = { 'name':'springboot', 'price': 100.0, 'count':2}resp = requests.post(url, data=data).content.decode("utf8")print(resp)
查询逻辑
import aiomysqlfrom tornado import web, ioloopclass OrderHandler(web.RequestHandler): def initialize(self, db_info) -> None: self.db_info = db_info async def put(self): db_id= self.get_argument("id") db_name= self.get_argument("name") db_price= self.get_argument("price") db_count= self.get_argument("count") async with aiomysql.create_pool(host=self.db_info["db_host"], port=self.db_info["db_port"], user=self.db_info["db_user"], password=self.db_info["db_password"], db=self.db_info["db_name"], charset=self.db_info["db_charset"]) as pool: async with pool.acquire() as conn: async with conn.cursor() as cur: sql = "UPDATE TB_ORDER SET `name` = '{}', `price` = '{}', `count` = '{}' WHERE `id` = {};"\ .format(db_name, db_price, db_count, db_id) print(sql) await cur.execute(sql) await conn.commit() self.write("save successfully")initParam = { "db_info": { 'db_host': 'localhost', 'db_name': 'iworkh_tornado', 'db_port': 3306, 'db_user': 'iworkh', 'db_password': 'iworkh123', 'db_charset': 'utf8' }}url_handlers = [ (r"/order/?", OrderHandler, initParam)]if __name__ == '__main__': app = web.Application(url_handlers, debug=True) app.listen(8888) print("started successfully") ioloop.IOLoop.current().start()
测试代码
import requestsurl = "http://localhost:8888/order"data = { 'id': 1, 'name':'java', 'price': 60.0, 'count':1}resp = requests.put(url, data=data).content.decode("utf8")print(resp)
通过上面的aiomysl操作,虽然实现了异步操作,但是我们可以发现一些问题
我们通知之前对aiomysql对数据库的操作,我们发现了一些问题,操作很麻烦。
那使用orm有啥好处呢?
,
,'
等一些低价错误,很难发现,浪费开发时间)python领域常用的一些ORM框架
Django’s ORM
优点: 易用,学习曲线短 和Django紧密集合,用Django时使用约定俗成的方法去操作数据库缺点: QuerySet速度不给力,会逼我用Mysqldb来操作原生sql语句。 不好处理复杂的查询,强制开发者回到原生SQL
peewee
优点: Django式的API,使其易用 轻量实现,很容易和任意web框架集成 aysnc-peewee异步操作缺点: 不支持自动化schema迁移 不能像Django那样,使线上的mysql表结构生成结构化的模型。 多对多查询写起来不直观
SQLAlchemy
优点: 巨牛逼的企业级API,使得代码有健壮性和适应性 灵活的设计,使得能轻松写复杂查询缺点: 工作单元概念不常见 重量级 API,导致长学习曲线
数据操作,主要就增删改查功能,下面分别演示如何使用。
工作中使用前,最好把官网浏览一遍,掌握一些常用的api和细节
先pip安装
pip install peewee
base_model.py
import datetimefrom peewee import MySQLDatabase, Model, DateTimeFielddb_info = { 'host': '127.0.0.1', 'user': 'iworkh', 'password': 'iworkh123', 'charset': 'utf8', 'port': 3306}db = MySQLDatabase('iworkh_tornado', **db_info)class BaseModel(Model): created_date = DateTimeField(default=datetime.datetime.now) class Meta: database = db
user_model.py
from peewee import CharField, DateField, IntegerField, BooleanFieldfrom lesson05.models.base_model import *class User(BaseModel): username = CharField(column_name='username', unique=True, null=False, max_length=50, verbose_name="姓名") birthday = DateField(column_name='birthday', null=True, default=None, verbose_name='生日') phone = CharField(column_name='phone', max_length=11) mail = CharField(column_name='mail', max_length=50, verbose_name='邮件') gender = IntegerField(column_name='gender', null=False, default=1, verbose_name='姓别') is_admin = BooleanField(column_name='is_admin', default=False, verbose_name='是否是管理员') class Meta: table_name = "T_USER"
from peewee import ForeignKeyField, CharField, IntegerFieldfrom lesson05.models.base_model import BaseModelfrom lesson05.models.user_model import Userclass Pet(BaseModel): user = ForeignKeyField(User, backref='pets') name = CharField(index=True, column_name='name', max_length=50, verbose_name='名字') age = IntegerField(column_name='age', verbose_name='年龄') type = CharField(column_name='type', max_length=50, verbose_name='类型') color = CharField(column_name='color', max_length=50, verbose_name='颜色') description = CharField(column_name='description', max_length=500, verbose_name='描述') class Meta: table_name = "T_PET"
import 省略if __name__ == '__main__': db.connect() table_list = [User, Pet] db.create_tables(table_list) print("end...")
插入数据,可以使用save
和create
方法来操作
- save: 对象操作
- create: 是类操作,看create函数上有
@classmethod
修饰符
import datetimeimport 省略userList = [ { 'username': 'zhangsan', 'birthday': '1988-08-08', 'gender': 1, 'mail': 'zhangsan@qq.com', 'phone': '111', 'is_admin': False}, { 'username': 'lisi', 'birthday': datetime.date(1999, 8, 8), 'gender': 1, 'mail': 'lisi@qq.com', 'phone': '222', 'is_admin': False}, { 'username': 'wangwu', 'birthday': datetime.date(2025, 8, 8), 'gender': 0, 'mail': 'wangwu@qq.com', 'phone': '333', 'is_admin': False}, { 'username': 'admin', 'birthday': datetime.date(2000, 8, 8), 'gender': 1, 'mail': 'admin@qq.com', 'phone': '444', 'is_admin': True}]def save(): user = User() user.username = 'iworkh_save' user.birthday = datetime.date(1988, 8, 8) user.gender = 0 user.mail = "157162006@qq.com" user.phone = "888888888" user.is_admin = True # 使用对象调用方法 row = user.save() print("save的返回值是值:{}".format(row))def save_list(): for item in userList: user = User(**item) user.save()def create(): user_dic = { 'username': 'iworkh_create2', 'birthday': datetime.date(1989, 8, 8), 'gender': 1, 'mail': '157162006@qq.com', 'phone': '888888888', 'is_admin': False } # 使用类调用方法 row = User.create(**user_dic) print("create的返回值值:{}".format(row))if __name__ == '__main__': save() create() save_list() print("end...")
查询比较重要,但是peewee的api使用,跟原生sql语法优点类似
sql: select * from user where username like %iworkh%
peewee写法:User.select().where(User.username.contains(‘iworkh’))
import 省略def select_all(): # modelSelect = User.select() # sql: select * from user fields = [User.username, User.phone] modelSelect = User.select(*fields) # sql: select username, phone from user # 这还没有立即执行,返回的是modelSelect对象,当for或者execute时才会执行 for user in modelSelect: print("{} --- {} --- {}".format(user.username, user.phone, user.is_admin))def select_conditon(): # 根据id取记录,如果id不存在,那么转化时候会报错 user_res_get_by_id = User.get_by_id(1) print("get_by_id 结果:{}".format(user_res_get_by_id.username)) user_res_get = User.get(User.id == 1) print("user_res_get 结果:{}".format(user_res_get.username)) user_res_dic = User[1] print("user_res_dic 结果:{}".format(user_res_dic.username)) print("*" * 50) # select * from user where username like %iworkh% modelSelect = User.select().where(User.username.contains('iworkh')) for user in modelSelect: print("{} --- {}".format(user.username, user.is_admin)) print("*" * 50) # select * from user ordery by birthday desc modelSelect_order = User.select().order_by(User.birthday.desc()) for user in modelSelect_order: print("{} --- {}".format(user.username, user.birthday)) print("*" * 50) # 按每页3的大小分页,取第2页数据(从1开始计数) modelSelect_pagable = User.select().paginate(page=2, paginate_by=3) for user in modelSelect_pagable: print("{} --- {}".format(user.username, user.id))if __name__ == '__main__': select_all() select_conditon()
更新很简单,使用的是save
函数,当save里传id值时,则认为是更新;id没有值None时,则是插入操作
import 省略def upate(): # 使用save方法即可更新,只要传的值中还有id user_res_get_by_id = User.get_by_id(1) user_res_get_by_id.username = user_res_get_by_id.username + '_update' # 从数据中取处记录,并修改值后更新数据库 user_res_get_by_id.save()if __name__ == '__main__': upate()
删除记录有两种操作:
import 省略def delete_by_id(): # 类操作 User.delete_by_id(3)def delete_instance(): user = User() user.id = 4 # 对象操作,recursive参数控制关联数据是否删除 user.delete_instance()if __name__ == '__main__': delete_by_id() delete_instance() print("end...")
技巧
peewee里还有很其他知识点,除了阅读官网提供的doc文档外,还有一些地方值得我们开发者注意的,就是源码下面的test和example模块。
留个问题:
peewee里transaction如何使用?(如何去官网doc和源码下快速找到你需要的答案呢?)
tornado调用接口,那得用异步的,所以peewee的同步操作,在tornado中肯定玩不转的。
peewee-async的接口主要分两部分
主要通过manager来操作
主要通过peewee_async来操作
安装
pip install --pre peewee-async
定义model还是跟peewee一样,不过db的方式使用了peewee_async.MySQLDatabase
import datetimeimport peewee_asyncfrom peewee import Model, DateTimeField, TextField, CharFielddb_info = { 'host': '127.0.0.1', 'user': 'iworkh', 'password': 'iworkh123', 'charset': 'utf8', 'port': 3306}db = peewee_async.MySQLDatabase('iworkh_tornado', **db_info)class BaseModel(Model): created_date = DateTimeField(default=datetime.datetime.now) class Meta: database = dbclass BlogModel(BaseModel): title = CharField(column_name='title', max_length=50) content = TextField(column_name='content') class Meta: table_name = 't_blog'
通过高级API的方式,进行增删改查操作
import asyncioimport peewee_asyncimport 省略了models的引入objects = peewee_async.Manager(db)async def save(): blog = { 'title': 'tornado入门', 'content': '详细内容请看文章'} await objects.create(BlogModel, **blog) print("保存成功")async def get_all(): all_blogs = await objects.execute(BlogModel.select()) for blog in all_blogs: print("id: {}, 文章标题:{}".format(blog.id, blog.title))async def delete(): blog = BlogModel() blog.id = 1 await objects.delete(blog) print("删除成功")def create_table(): BlogModel.create_table(True)def drop_table(): BlogModel.drop_table(True)async def test(): create_table() await save() print("*"*50) await get_all() print("*"*50) await delete() print("*"*50) await get_all() drop_table()if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(test()) print("end")
整理不打字易,觉得有帮助就点个赞吧。
转载地址:http://vzhws.baihongyu.com/