Worktile 中百万级实时消息推送服务的实现 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Worktile
V2EX    分享发现

Worktile 中百万级实时消息推送服务的实现

  •  1
     
  •   Worktile 2015-01-15 17:12:11 +08:00 5193 次点击
    这是一个创建于 3975 天前的主题,其中的信息可能已经有所发展或是发生改变。

    摘要:相较于手机端的消息推送(一般都是以Socket方式实现),WEB端是基于HTTP协议,很难像TCP一样保持长连接。但随着技术的发展,出现了WebSocket,Comet等新的技术可以达到类似长连接的效果。

    在团队协同工具 Worktile的使用过程中,你会发现无论是右上角的消息通知,还是在任务面板中拖动任务,还有用户的在线状态,都是实时刷新。Worktile中的推送服务是采用的是基于XMPP协议、Erlang语言实现的Ejabberd,并在其源码基础上,结合我们的业务,对源码作了修改以适配我们自身的需求。另外,基于AMQP协议也可以作为实时消息推送的一种选择,踢踢网就是采用 RabbitMQ+STOMP协议实现的消息推送服务。本文将结合我在Worktile和踢踢网的项目实践,介绍下消息推送服务的具体实现。

    实时推送的几种实现方式

    相较于手机端的消息推送(一般都是以Socket方式实现),WEB端是基于HTTP协议,很难像TCP一样保持长连接。但随着技术的发展,出现了WebSocket,Comet等新的技术可以达到类似长连接的效果,这些技术大体可分为以下几类:

    1)短轮询。页面端通过JS定时异步刷新,这种方式实时效果较差。

    2)长轮询。页面端通过JS异步请求服务端,服务端在接收到请求后,如果该次请求没有数据,则挂起这次请求,直到有数据到达或时间片(服务端设定)到,则返回本次请求,客户端接着下一次请求。示例如下:
    替代文字

    3)WebSocket。浏览器通过WebSocket协议连接服务端,实现了浏览器和服务器端的全双工通信。需要服务端和浏览器都支持WebSocket协议。

    以上几种方式中,方式1实现较简单,但效率和实时效果较差。方式2对服务端实现的要求比较高,尤其是并发量大的情况下,对服务端的压力很大。方式3效率较高,但对较低版本的浏览器不支持,另外服务端也需要有支持WebSocket的实现。Worktile的WEB端实时消息推送,采用的是XMPP扩展协议XEP-0124 BOSH( http://xmpp.org/extensions/xep-0124.html),本质是采用方式2长轮询的方式。踢踢网则采用了WebSocket连接RabbitMQ的方式实现,下面我会具体介绍如何用这两种方式实现Server Push。

    运行时环境准备

    服务端的实现中,无论采用Ejabberd还是RabbitMQ,都是基于Erlang语言开发的,所以必须安装Erlang运行时环境。Erlang是一种函数式语言,具有容错、高并发的特点,借助OTP的函数库,很容易构建一个健壮的分布式系统。目前,基于Erlang开发的产品有,数据库方面:Riak(Dynamo实现)、CouchDB, Webserver方面:Cowboy、Mochiweb, 消息中间件有RabbitMQ等。对于服务端程序员来说,Erlang提供的高并发、容错、热部署等特性是其他语言无法达到的。无论在实时通信还是在游戏程序中,用Erlang可以很容易为每一个上线用户创建一个对应的Process,对一台4核8个G的服务器来说,承载上百万个这样的Process是非常轻松的事。下图是Erlang程序发起Process的一般性示意图:
    替代文字

    如图所示,Session Manager(or Gateway)负责为每个用户(UID)创建相对应的Process, 并把这个对应关系(MAP)存放到数据表中。每个Process则对应用户数据,并且他们之间可以相互发送消息。Erlang的优势就是在内存足够的情况下创建上百万个这样的Process,而且它的创建和销毁比JAVA的Thread要轻量的多,两者不是一个数量级的。

    好了,我们现在开始着手Erlang环境的搭建(实验的系统为Ubuntu 12.04, 4核8个G内存):

    1、依赖库安装

    [js] view plaincopy在CODE上查看代码片派生到我的代码片
    sudo apt-get install build-essential

    sudo apt-get install libncurses5-dev

    sudo apt-get install libssl-dev libyaml-dev

    sudo apt-get install m4

    sudo apt-get install unixodbc unixodbc-dev

    sudo apt-get install freeglut3-dev libwxgtk2.8-dev

    sudo apt-get install xsltproc

    sudo apt-get install fop tk8.5 libxml2-utils

    2、官网下载OTP源码包( http://www.erlang.org/download.html), 解压并安装

    [js] view plaincopy在CODE上查看代码片派生到我的代码片
    tar zxvf otpsrcR16B01.tar.gz

    cd otpsrcR16B01

    configure

    make & make install

    至此,erlang运行环境就完成了。下面将分别介绍rabbitmq和ejabberd构建实时消息服务。

    基于RabbitMQ的实时消息服务

    RabbitMQ是在业界广泛应用的消息中间件,也是对AMQP协议实现最好的一种中间件。AMQP协议中定义了Producer、 Consumer、MessageQueue、Exchange、Binding、Virtual Host等实体,他们的关系如下图所示:
    替代文字

    消息发布者(Producer)连接交换器(Exchange), 交换器和消息队列(Message Queue)通过KEY进行Binding,Binding是根据Exchange的类型(分为Fanout、Direct、Topic、Header)分别对消息作不同形式的派发。Message Queue又分为Durable、Temporary、Auto-Delete三种类型,Durable Queue是持久化队列,不会因为服务ShutDown而消失,Temporary Queue则服务重启后会消失,Auto-Delete则是在没有Consumer连接时自动删除。另外RabbitMQ有很多第三方插件,可以基于AMQP协议基础之上做出很多扩展的应用。下面我们将介绍WEB STOMP插件构建基于AMQP之上的STOMP文本协议,通过浏览器WebSocket达到实时的消息传输。系统的结构如图:
    替代文字

    如图所示,WEB端我们使用STOMP.JS和SockJS.JS与RabbitMQ的WEB STOMP Plugin通信,手机端可以用STOMPj, Gozirra(Android)或者Objc-STOMP(IOS)通过STOMP协议与RabbitMQ收发消息。因为我们是实时消息系统通常都是要与已有的用户系统结合,RabbitMQ可以通过第三方插件RabbitMQ-AYTH-Backend-HTTP来适配已有的用户系统,这个插件可以通过HTTP接口完成用户连接时的认证过程。当然,认证方式还有LDAP等其他方式。下面介绍具体步骤:

    从官网( http://rabbitmq.com/download.html)下载最新版本的源码包,解压并安装

    [js] view plaincopy在CODE上查看代码片派生到我的代码片
    tar zxf rabbitmq-server-x.x.x.tar.gz

    cd rabbitmq-server-x.x.x

    make & make install

    为RabbitMQ安装WEB-STOMP插件

    [js] view plaincopy在CODE上查看代码片派生到我的代码片
    cd /path/to/your/rabbitmq

    ./sbin/rabbitmq-plugins enable rabbitmq_web_stomp

    ./sbin/rabbitmq-plugins enable rabbitmq_web_stomp_examples

    ./sbin/rabbitmqctl stop

    ./sbin/rabbitmqctl start

    ./sbin/rabbitmqctl status

    将会显示下图所示的运行的插件列表
    替代文字

    安装用户授权插件

    [js] view plaincopy在CODE上查看代码片派生到我的代码片
    cd /path/to/your/rabbitmq/plugins

    wget <a href="http://www.rabbitmq.com/community-plugins/v3.3.x/rabbitmq_auth_backend_http-3.3.x-e7ac6289.ez">http://www.rabbitmq.com/community-plugins/v3.3.x/rabbitmq_auth_backend_http-3.3.x-e7ac6289.ez</a>

    cd ..

    ./sbin/rabbitmq-plugins enable rabbitmq_auth_backend_http

    编辑RabbitMQ.Config文件(默认存放于/ECT/RabbitMQ/下),添加
    [js] view plaincopy在CODE上查看代码片派生到我的代码片
    [

    ...

    {rabbit, [{auth_backends, [rabbit_auth_backend_http]}]},

    ...

    {rabbitmq_auth_backend_http,

    [{user_path, “http://your-server/auth/user”},

    {vhost_path, “http://your-server/auth/vhost”},

    {resource_path, “http://your-server/auth/resource”}

    ]}

    ...

    ].

    其中,User_Path是根据用户名密码进行校验,VHOST_Path是校验是否有权限访问VHOST, Resource_Path是校验用户对传入的Exchange、Queue是否有权限。我下面的代码是用Node.js实现的这三个接口的示例:

    [js] view plaincopy在CODE上查看代码片派生到我的代码片
    var express = require('express');

    var app = express();

    app.get('/auth/user', function(req, res){

    var name = req.query.username;

    var pass = req.query.password;

    console.log("name : " + name + ", pass : " + pass);

    if(name === 'guest' && pass === "guest"){

    console.log("allow");

    res.send("allow");

    }else{

    res.send('deny');

    }

    });

    app.get('/auth/vhost', function(req, res){

    console.log("/auth/vhost");

    res.send("allow");

    });

    app.get('/auth/resource', function(req, res){

    console.log("/auth/resource");

    res.send("allow");

    });

    app.listen(3000);

    浏览器端JS实现,示例代码如下:
    [js] view plaincopy在CODE上查看代码片派生到我的代码片
    ......

    var ws = new SockJS('http://' + window.location.hostname + ':15674/stomp');

    var client = Stomp.over(ws);

    // SockJS does not support heart-beat: disable heart-beats

    client.heartbeat.outgoing = 0;

    client.heartbeat.incoming = 0;

    client.debug = pipe('#second');

    var print_first = pipe('#first', function(data) {

    client.send('/exchange/feed/user_x', {"content-type":"text/plain"}, data);

    });

    var on_cOnnect= function(x) {

    id = client.subscribe("/exchange/feed/user_x", function(d) {

    print_first(d.body);

    });

    };

    var on_error = function() {

    console.log('error');

    };

    client.connect('guest1', 'guest1', on_connect, on_error, '/');

    ......

    需要说明的时,在这里我们首先要在RabbitMQ实例中创建Feed这个Exchange,我们用STOMP.JS连接成功后,根据当前登陆用户的ID(user_x)绑定到这个Exchange,即Subscribe(“/exchange/feed/user_x”, …) 这个操作的行为,这样在向RabbitMQ中Feed Exchange发送消息并指定用户ID(user_x)为KEY,页面端就会通过WEB Socket实时接收到这条消息。

    到目前为止,基于RabbitMQ+STOMP实现WEB端消息推送就已经完成,其中很多的细节需要小伙伴们亲自去实践了,这里就不多说了。实践过程中可以参照官方文档:

    http://rabbitmq.com/stomp.html
    http://rabbitmq.com/web-stomp.html
    https://github.com/simonmacmullen/rabbitmq-auth-backend-http

    以上的实现是我本人在踢踢网时采用的方式,下面接着介绍一下现在在Worktile中如何通过Ejabberd实现消息推送。

    基于Ejabberd的实时消息推送

    与RabbitMQ不同,Ejabberd是XMPP协议的一种实现,与AMQP相比,XMPP广泛应用于即时通信领域。XMPP协议的实现有很多种,比如JAVA的OpenFire,但相较其他实现,Ejabberd的并发性能无疑使最优秀的。XMPP协议的前身是Jabber协议,早期的Jabber协议主要包括在线状态(Presence)、好友花名册(Roster)、IQ(Info/Query)几个部分。现在Jabber已经成为RFC的官方标准,如RFC2799,RFC4622,RFC6121,以及XMPP的扩展协议(XEP)。Worktile Web端的消息提醒功能就是基于XEP-0124、XEP-0206定义的BOSH扩展协议。

    由于自身业务的需要,我们对Ejabberd的用户认证和好友列表模块的源码进行修改,通过Redis保存用户的在线状态,而不是Mnesia和MySQL。另外好友这块我们是从已有的数据库中(MongoDB)中获取项目或团队的成员。Web端通过Strophe.JS来连接(HTTP-BIND),Strophe.JS可以以长轮询和WebSocket两种方式来连接,由于Ejabberd还没有好的WebSocket的实现,就采用了BOSH的方式模拟长连接。整个系统的结构如下:
    替代文字

    Web端用Strophe.JS通过HTTP-BIND进行连接Nginx代理,Nginx反向代理EjabberdCluster。iOS用XMPP-FramWork连接, Android可以用Smack直接连Ejabberd服务器集群。这些都是现有的库,无需对Client进行开发。在线状态根据用户UID作为KEY定义了在线、离线、忙等状态存放于Redis中。好友列表从MongoDB的Project表中获取。用户认证直接修改了Ejabberd_Auth_Internal.erl文件,通过MongoDB驱动连接用户库,在线状态等功能是新加了模块,其部分代码如下:

    [js] view plaincopy在CODE上查看代码片派生到我的代码片
    -module(wt_mod_proj).

    -behaviour(gen_mod).

    -behaviour(gen_server).

    -include("ejabberd.hrl").

    -include("logger.hrl").

    -include("jlib.hrl").

    -define(SUPERVISOR, ejabberd_sup).

    ...

    -define(ONLINE, 1).

    -define(OFFLINE, 0).

    -define(BUSY, 2).

    -define(LEAVE, 3).

    ...

    %% API

    -export([start_link/2, get_proj_online_users/2]).

    %% gen_mod callbacks

    -export([start/2, stop/1]).

    %% gen_server callbacks

    -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).

    %% Hook callbacks

    -export([user_available/1, unset_presence/3, set_presence/4]).

    -export([get_redis/1, remove_online_user/3, append_online_user/3]).

    ...

    -record(state,{host = <<"">>, server_host, rconn, mconn}).

    start_link(Host, Opts) ->

    Proc = gen_mod:get_module_proc(Host, ?MODULE),

    gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []).

    user_available(New) ->

    LUser = New#jid.luser, LServer = New#jid.lserver,

    Proc = gen_mod:get_module_proc(LServer, ?MODULE),

    gen_server:cast(Proc, {user_available, LUser, LServer}).

    append_online_user(Uid, Proj, Host) ->

    Proc = gen_mod:get_module_proc(Host, ?MODULE),

    gen_server:call(Proc, {append_online_user, Uid, Proj}).

    remove_online_user(Uid, Proj, Host) ->

    Proc = gen_mod:get_module_proc(Host, ?MODULE),

    gen_server:call(Proc, {remove_online_user, Uid, Proj}).

    ...

    set_presence(User, Server, Resource, Packet) ->

    Proc = gen_mod:get_module_proc(Server, ?MODULE),

    gen_server:cast(Proc, {set_presence, User, Server, Resource, Packet}).

    ...

    start(Host, Opts) ->

    Proc = gen_mod:get_module_proc(Host, ?MODULE),

    ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]},

    transient, 2000, worker, [?MODULE]},

    supervisor:start_child(?SUPERVISOR, ChildSpec).

    stop(Host) ->

    Proc = gen_mod:get_module_proc(Host, ?MODULE),

    gen_server:call(Proc, stop),

    supervisor:delete_child(?SUPERVISOR, Proc).

    init([Host, Opts]) ->

    MyHost = gen_mod:get_opt_host(Host, Opts, <<"wtmuc.@HOST@">>),

    RedisHost = gen_mod:get_opt(redis_host, Opts, fun(B) -> B end,?REDIS_HOST),

    RedisPort = gen_mod:get_opt(redis_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?REDIS_PORT),

    ejabberd_hooks:add(set_presence_hook, Host, ?MODULE, set_presence, 100),

    ejabberd_hooks:add(user_available_hook, Host, ?MODULE, user_available, 50),

    ejabberd_hooks:add(sm_remove_connection_hook, Host, ?MODULE, unset_presence, 50),

    MOngoHost= gen_mod:get_opt(mongo_host, Opts, fun(B) -> binary_to_list(B) end, ?MONGO_HOST),

    MOngoPort= gen_mod:get_opt(mongo_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?MONGO_PORT),

    {ok, Mongo} = mongo_connection:start_link({MongoHost, MongoPort}),

    C = c(RedisHost, RedisPort),

    ejabberd_router:register_route(MyHost), {ok, #state{host = Host, server_host = MyHost, rcOnn= C, mcOnn= Mongo}}.

    terminate(_Reason, #state{host = Host, rcOnn= C, mcOnn= Mongo}) ->

    ejabberd_hooks:delete(set_presence_hook, Host, ?MODULE, set_presence, 100),

    ejabberd_hooks:delete(user_available_hook, Host, ?MODULE, user_available, 50),

    ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE, unset_presence, 50),

    eredis:stop(C),

    ok.

    ...

    handle_call({append_online_user, Uid, ProjId}, _From, State) ->

    C = State#state.rconn,

    Key = <<!--?PRE_RPOJ_ONLINE_USERS /binary, ProjId/binary-->>,

    Resp = eredis:q(C, ["SADD", Key, Uid]),

    {reply, Resp, State};

    handle_call({remove_online_user, Uid, ProjId}, _From, State) ->

    ...

    handle_call({get_proj_online_users, ProjId}, _From, State) ->

    ...

    handle_cast({set_presence, User, Server, Resource, Packet}, #state{mcOnn= Mongo} = State) ->

    C = State#state.rconn,

    Key = <<!--?USER_PRESENCE /binary, User/binary-->>,

    Pids = get_user_projs(User, Mongo),

    Cmd = get_proj_key(Pids, ["SUNION"]),

    case xml:get_subtag_cdata(Packet, <<"show">>) of

    <<"away">> ->

    eredis:q(C, ["SET", Key, ?LEAVE]);

    <<"offline">> ->

    ...

    handle_cast(_Msg, State) -> {noreply, State}.

    handle_info({route, From, To, Packet}, #state{host = Host, server_host = MyHost, rcOnn= RedisConn, mcOnn= Mongo} = State) ->

    case catch do_route(Host, MyHost, From, To, Packet, RedisConn, Mongo) of

    {'EXIT', Reason} ->

    ?ERROR_MSG("~p", [Reason]);

    _ ->

    ok

    end,

    {noreply, State};

    handle_info(_Info, State) -> {noreply, State}.

    code_change(_OldVsn, State, _Extra) -> {ok, State}.

    ...

    其中,User_Available_HOOK和SM_Remove_Connection_HOOK 就是用户上线和用户断开连接触发的事件,Ejabberd 中正是由于这些HOOK,才能很容易扩展功能。

    在用Tsung对Ejabberd进行压力测试,测试机器为4核心8G内存的普通PC,以3台客户机模拟用户登录、设置在线状态、发送一条文本消息、关闭连接操作,在同时在线达到30w时,CPU占用不到3%,内存大概到3个G左右,随着用户数增多,主要内存的损耗较大。由于压力测试比较耗时,再等到有时间的时候,会在做一些更深入的测试。

    11 条回复    2015-01-20 10:50:43 +08:00
    teddy1004
        1
    teddy1004  
       2015-01-15 17:26:28 +08:00
    好文!感谢~~~
    keepcleargas
        2
    keepcleargas  
       2015-01-15 17:33:55 +08:00   1
    不错...
    juneszh
        3
    juneszh  
       2015-01-15 17:44:49 +08:00
    但是怎么解决低版本浏览器?低版本改用JS异步?
    airyland
        4
    airyland  
       2015-01-15 20:33:45 +08:00
    如何定义百万级?
    kmvan
        5
    kmvan  
       2015-01-15 20:41:20 +08:00
    之前在ff上用了一下web版,卡的我不行。特别是拖动任务的时候,FF都快被卡哭(崩溃)了
    quix
        6
    quix  
       2015-01-15 21:07:12 +08:00
    @juneszh 低版本退到用comet

    另外还有一个stack推荐大家用, 实测效果和性能都不差:
    http://tavendo.com/
    skybr
        7
    skybr  
       2015-01-15 21:08:20 +08:00
    erlang官方提供apt仓库的啊
    Worktile
        8
    Worktile  
    OP
       2015-01-16 16:14:44 +08:00
    @teddy1004 如果有其他需求可以随时跟我们提~我们一定尽快回复。
    Worktile
        9
    Worktile  
    OP
       2015-01-16 16:15:30 +08:00
    @keepcleargas 感谢~多谢收藏。我们也会定期发布更多。
    Worktile
        10
    Worktile  
    OP
       2015-01-19 16:48:23 +08:00
    @juneszh 如Q@quix所说,comet是简单的http请求,不存在浏览器兼容问题,就是ejabberd的http-bind。
    Worktile
        11
    Worktile  
    OP
       2015-01-20 10:50:43 +08:00
    @kmvan 我也崩溃了,哪里的问题哇,你可以详细说说,每个人都卡到这样,我们8w团队岂不是都得每天哭会,那得都是真爱
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5476 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 35ms UTC 03:30 PVG 11:30 LAX 19:30 JFK 22:30
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86