(点击
上方蓝字
,快速关注我们)
编译:王坚
www.codeceo.com/article/django-channels.html
如有好文章投稿,请点击 → 这里了解详情
通道是 Django 即将支持的令人兴奋的特性,它将使得 Django 不止支持普通请求外部工具和库(即便不是 Python 的),还可能是整个框架。
根据官方文档,Django 通道是:
…一个让 Django 不仅可以处理纯 HTTP 请求,包括 HTTP2 和WebSockets,也有能力在请求发送到缩略图或后台运算时也能运行运行代码。
如果你以前用过 Django,你就知道 project 是多重要。目前 Django 的诸多特性依赖于库,比如 Celery(在请求之外处理复杂任务),或者 Node.js,dijango-websocket-redis,或者 gevent-stocketio 来支持 WebSocket。
由于 Celery 的原因(它是一个事实标准),其他所有的实现方法以非标准的形式在 Django 的局限内有各自的问题。我们在以往的博文里提到了成功实现的不同方式。
一个合乎标准的方式更容易维护,更安全,多数开发者熟悉其内容也更容易交接。
在这篇博客中我会快速的介绍开发应用 Django 通道网站所涉及的概念,同时介绍一个用 WebSocket 给客户端推送通知的例子。
应用
我们举的例子是对用 gevent-socketio 实现博客实时通知应用的修改。目的是让你看到用 Django 通道在同样的条件下实施会有多简单,代码可以在GitHub上找到。
面向事件的Diango
默认的 Django 网站请求-响应模型:一个请求进来,被传递给视图,视图产生一个回应,然后回应被发送到客户端,所有一切都是单线程完成的。
在大多数应用中都完全胜任,但它有自己的局限。如果是多个请求就会让工作进程持续好长时间,后续的请求要排队等待。这就是用 Celery 来做缩略图之类事情的原因:在图片上传时,我们建立缩略图任务并及时响应到客户端,在此同时 Celery 在自己的进程中处理图片。
同样的情况会发生在和客户端的实时双向对话中。设想一个请求-响应,我们需要一个进程对一个客户端来收发消息直到连接终止。
Django 通道提供了另一个模型:面向事件的模型。在这个模型中,事件取代了请求和响应。一个请求事件被接收到会被传递给合适的处理者来产生一个新的响应事件被传回到客户端。
事件模型可以应用到其他情况而不只是对请求-响应模型的模仿。比如由外界条件触发的传感器,它产生一个事件给事件处理者,然后会依次产生另一个事件通知所有对原始事件感兴趣的人。
但是这个进程怎么工作的?我们需要在开始实例前认识下channel。
什么是通道
根据 Django 通道的官方文档,通道是:
…一个有序的,先进先出的消息队列,通常一次只有一个消息接收者。
多个生产者将消息写入 channel(用一个名字来识别),然后一个用户订阅了那么 channel 就可用,它会取出队列中的第一条消息。就是这么简单。
通道改变了Django的工作方式,让它像worker一样工作。每个worker听从通道上所有用户的吩咐当有消息是用户会被通知。要想这事发生,我们需要三个层:
-
接口服务器:连接网站与客户端,通过一个 WSGIn接头和一个独立的WebSocket服务器。
-
通道后端:它在接口和worker间传递消息。(为单一服务器提供存储,一个数据库或者Redis),Python 代码都在这里。
-
worker:它们收听所有的通道,当消息来时唤醒用户(函数)。
接口服务器把连接(HTTP,WebSocket等)转换成通道中的消息,worker负责处理这些消息。这里的门道在于消息不需要从接口服务器产生。消息可以在任何地方产生,view,form,signal,随你心意。
是时候干活了。
我们的第一个用户
我们从安装Django1.8(1.9也行)开始。首先,需要安装安装 channel 包,它是依赖 PyPi 的。如果你想安装最新版本的通道,看看官方介绍文档。
接下来把 channel 加入到 INSTALLED_APPS 设置中:
INSTALLED_APPS
=
(
...
'channels_test'
,
# Our test app
'channels'
,
)
就是这样。通道默认配置使用在内存中的后端,它能很好的在单个服务器的网站进行工作。
我们将要写一个简单的用户来接收“http.message”通道上的消息,然后回应通道一个新消息。让我们在测试 Django 应用这中建立一个模组叫“consumer.py”:
# consumers.py
from json import dumps
from
django
.
http import HttpResponse
from
django
.
utils
.
timezone import now
def http_consumer
(
message
)
:
response
=
HttpResponse
(
"It is now {} and you've requested {} with {} as request parameters."
.
format
(
now
(),
message
.
content
[
'path'
],
dumps
(
message
.
content
[
'get'
])
)
)
message
.
reply_channel
.
send
(
response
.
channel_encode
())
消息从标准“reques.http”通道中出来,用户写一个新消息在回应通道中回应。值得注意的是,他们是两种不同的通道:普通通道传递消息给用户,和回应通道。只用接口服务器侦听回应通道,它知道那个通道连接那个客户端,所以他知道回应该发给谁。
在开始进程前,我们需要一个方法来告诉 Django 将“request.http”通道消息发送给我们的新用户。在设置中继续创建一个模组叫“routing.py”:
channel_routing
=
{
"http.request"
:
"channels_test.consumers.http_consumer"
}
现在我们运行服务器(是开发服务器或者 WSGI 服务器无关紧要),给我们的网站一个请求:
$
curl
http
:
//localhost:8000/some/path?foo=bar
It
is
now
2016
-
02
-
01
11
:
49
:
25.166799
+
00
:
00
and
you
'
ve
requested
/
some
/
path
with
{
"foo"
:
[
"bar"
]}
as
request
parameters
.
我们得到了想要的请求,通道解决了我们大部分问题。现在让我们玩点更有趣的。
实时通知
我们已经多次提及了实时通知的要点,这给了我们一个很好的机会来应用一个通道,并比较了两个解决方案。
我们将改进现有的项目,追踪用户在特定地理位置发出有趣的实时通知。在这个项目中我们用到了 gevent-socketio], SocketIO, 和 RabbitMQ(还有 Node.js)。我们将用通道、普通WebSockets和Redis来做同样的事情。
如前所述,我们用 WebStocket 推送通知到客户端。通道对 WebStocket 有完整的支持。所有我们要做的不过是在我们的“tracker”应用中加上几个通道:
# routing.py
channel_routing
=
{
"websocket.connect"
:
"tracker.consumers.websocket_connect"
,
"websocket.keepalive"
:
"tracker.consumers.websocket_keepalive"
,
"websocket.disconnect"
:
"tracker.consumers.websocket_disconnect"
}
我们不在意“websocket.message”通道也不打算接收用户消息。我们的目标是向所有连接的客户端发出推送消息。用一个群来做这件事非常容易。让我们看看用户:
# tracker/consumers.py
import logging
from channels import Group
from
channels
.
sessions import channel_session
from
channels
.
auth import channel_session_user_from_http
logger
=
logging
.
getLogger
(
__name__
)
# Connected to websocket.connect and websocket.keepalive
@
channel_session_user_from_http
def websocket_connect
(
message
)
:
logger
.
info
(
'websocket_connect. message = %s'
,
message
)
# transfer_user(message.http_session, message.channel_session)
Group
(
"notifications"
).
add
(
message
.
reply_channel
)
# Connected to websocket.keepalive
@
channel_session
def websocket_keepalive
(
message
)
:
logger
.
info
(
'websocket_keepalive. message = %s'
,
message
)
Group
(
"notifications"
).
add
(
message
.
reply_channel
)
# Connected to websocket.disconnect
@
channel_session
def websocket_disconnect
(
message
)
:
logger
.
info
(
'websocket_disconnect. message = %s'
,
message
)
Group
(
"notifications"
).
discard
(
message
.
reply_channel
)
无论何时一个客户端连接,就会有一条消息通过“websocket.connect”发送,然后我们需要加上一个回应通道,还要把它放进“notifications”群。群允许我们同时发送相同的消息到所有的通道。我们要保持通道群的更新,即当客户端连接,我们将回应通道就啊如群;断开连接,将它移除。群也会在一定时间后清理通道,我们用“websocket.keepalive”通道把接收到keepalive消息的通道加入到“notifications”群。如果通道已存在不会被再次加入。
注意我们没有向群里发送任何东西。我们要在 AreaOfInterest 中的 Incident 被报告或更新时通知用户。我们简单的加入 post_save 信号:
# signals.py
import logging
from json import dumps
from
django
.
db
.
models
.
signals import
post_save
,
post_delete
from
django
.
dispatch import receiver
from channels import Group
from
.
models import
Incident
,
AreaOfInterest
logger
=
logging
.
getLogger
(
__name__
)
def send_notification
(
notification
)
:
logger
.
info
(
'send_notification. notification = %s'
,
notification
)
Group
(
"notifications"
).
send
({
'text'
:
dumps
(
notification
)})
@
receiver
(
post_save
,
sender
=
Incident
)
def incident_post_save
(
sender
,
**
kwargs
)
:
send_notification
({
'type'
:
'post_save'
,
'created'
:
kwargs
[
'created'
],
'feature'
:
kwargs
[
'instance'
].
geojson
_
feature
})
if
not
kwargs
[
'instance'
].
closed
:
areas_of_interest
=
[
area_of_interest
.
geojson_feature
for
area_of_interest
in
AreaOfInterest
.
objects
.
filter
(
polygon__contains
=
kwargs
[
'instance'
].
location
,
severity__in
=
kwargs
[
'instance'
].
alert_severities
,
)
]
if
areas_of_interest
:
send_notification
(
dict
(
type
=
'alert'
,
feature
=
kwargs
[
'instance'
].
geojson_feature
,
areas_of_interest
=
[
{
'id'
:
area_of_interest
[
'id'
],
'name'
:
area_of_interest
[
'properties'
][
'name'
],
'severity'
:
area_of_interest
[
'properties'
][
'severity'
],
'url'
:
area_of_interest
[
'properties'
][
'url'
],
}
for
area_of_interest
in
areas_of
_
interest
]
))
@
receiver
(
post_save
,
sender
=
AreaOfInterest
)
def area_of_interest_post_save
(
sender
,
**
kwargs
)
:
send_notification
({
'type'
:
'post_save'
,
'created'
:
kwargs
[
'created'