使用aiop实现事件等待模式

前端之家收集整理的这篇文章主要介绍了使用aiop实现事件等待模式前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

事件等待模式,也就所谓的reactor模式,简单的讲就是通过监听等待io的事件,然后将等待就绪可以处理的事件,分发给对应的io处理程序进行处理,这种模式本身并不算是异步io,但是TBox的asio库,在类unix系统上,也是基于此模型实现的,所以在讲解真正的异步io之前,首先稍微介绍介绍,对asio的底层机制有个大体了解。

在类unix系统,例如linux的epoll,mac的kqueue,以及select、poll,dev/poll等等,都是可以用来实现reactor

虽然功能大体相同,但是效率上来讲, epoll和kqueue 更加的高效,因为他们没有像select,poll那样通过轮询来实现

单从接口设计上来说,kqueue的设计效率更高些,因为它可以一次批量处理多个事件,所以跟系统内核间的交互比较少。但是具体epoll和kqueue那个效率更高,就不好说了。

TBox下的asio底层实现,大体分为两种:

@H_404_10@
  • 基于reactor模型,通过对epoll,kqueue、poll、select等api的分装实现的aiop接口(也就是本章所要讲的) 在一个独立的线程内,进行事件监听,来分发处理各种异步io事件,也就实现了proactor的模式。
  • 直接系统原生支持的proactor模型,例如windows的iocp,并在其基础上做一些封装来实现。
  • 这样,对于上层应用来说,用的都是同一套asio的异步回调接口,不需要做什么的改变,但是底层根据不同平台其实现机制,已大不相同,windows上用了iocp,linux和android用了epoll,mac上用了kqueue,ios上用了poll。。

    言归正传,本章所要将的aiop接口,就是对reactor的各种poll接口的上层封装,对于一些不需要将就高性能的应用直接使用aiop更加的简洁,易维护。 而基于回调的proactor模式,等下一章再详细讲解。

    首先我先描述下几个对象类型:

    @H_404_10@
  • aiop: 事件等待对象池
  • aioo: 等待对象,关联和维护:socket句柄、用户私有数据
  • aioe: 事件对象,一个aioo对象一次可以等待多个不同类型的aioe事件对象,例如send,recv,acpt,conn,...
  • code: 事件代码
  • 接着我们看下直接使用aiop的简单服务器代码,(这里为了看起来简洁点,资源管理和释放上我就先省略了):

    tb_int_t main(tb_int_t argc,tb_char_t** argv)
    {
        // 初始化一个tcp的套接字,用于监听
        tb_socket_ref_t listen_sock = tb_socket_init(TB_SOCKET_TYPE_TCP);
        tb_assert_and_check_return_val(sock,0);
        
        // 初始化aiop轮询池,对象规模16个socket,可混用,如果传0,则使用默认值
        tb_aiop_ref_t aiop = tb_aiop_init(16);
        tb_assert_and_check_return_val(aiop,0);
        
        // 绑定ip和端口,这里ip没做绑定,传null,监听端口为9090
        if (!tb_socket_bind(listen_sock,tb_null,9090)) return 0;
        
        // 监听socket
        if (!tb_socket_listen(listen_sock,5)) return 0;
        
        // 将这个监听套接添加到aiop中,并附上accept等待事件
        if (!tb_aiop_addo(aiop,listen_sock,TB_AIOE_CODE_ACPT,tb_null)) return 0;
        
        // 初始化一个aioe的事件对象列表,用于获取等待返回的事件
        tb_aioe_t list[16];
        
        // 开启循环
        while (1)
        {
            /* 等待事件到来,用到跟epoll,select类似
             *
             * 16: 最大需要等待的事件数量
             * -1: 等待超时值,这里为永久等待
             *
             * objn为返回的有效事件数,如果失败返回:-1, 超时返回:0
             */
            tb_long_t objn = tb_aiop_wait(aiop,list,16,-1);
            tb_assert_and_check_break(objn >= 0);
            
            // 枚举等到的事件列表
            tb_size_t i = 0;
            for (i = 0; i < objn; i++)
            {
                // 获取事件对应aioo对象句柄,aioo对socket句柄,事件类型、关联的私有数据进行了同一维护
                tb_aioo_ref_t aioo = list[i].aioo;
                
                // 获取aioe事件关联的私有数据指针
                // 也可以通过tb_aioo_priv(aioo)获取
                tb_cpointer_t priv = list[i].priv;
                
                // 获取aioo对应的socket句柄
                tb_socket_ref_t sock = tb_aioo_sock(aioo);
                
                // 有accept事件?
                if (list[i].code & TB_AIOE_CODE_ACPT)
                {
                    // 接收对方的连接,返回对应的客户端socket
                    tb_socket_ref_t client_sock = tb_socket_accept(sock,tb_null);
                    tb_assert_and_check_break(client_sock);
                    
                    /* 将客户端的socket也添加到aiop池中,并等待它的recv事件
                     *
                     * 这里的最后一个参数,可以传一个私有的数据指针,并和sock进行关联
                     * 用于方便维护每个连接对应的会话数据,这里随便传了个字符串
                     *
                     * 返回的aioo对象可以保存下来,并在之后可以灵活修改需要等待的事件
                     *
                     * 注:这里的client_sock需要在自己的应用内,自己做释放,aiop不会去自动释放
                     * 因为这个实在外围代码自己创建的,这里的例子仅仅为了省事,就直接忽略了。
                     */
                    tb_aioo_ref_t aioo = tb_aiop_addo(aiop,client_sock,TB_AIOE_CODE_RECV,"private data");
                    tb_assert_and_check_break(aioo);
                }
                // 有recv事件?
                else if (list[i].code & TB_AIOE_CODE_RECV)
                {
                    // 非阻塞接收一段数据
                    tb_byte_t data[8192];
                    tb_long_t real = tb_socket_recv(sock,data,sizeof(data));
                    // 接收完指定数据后,省略部分代码
                    // ...
                    // 将socket改为等待发送
                    if (!tb_aiop_sete(aiop,aioo,TB_AIOE_CODE_SEND,tb_null)) break;
                    
                    // 尝试发送数据,也许没发完
                    tb_socket_send(sock,"hello",sizeof("hello"));
                }
                // 有send事件?
                else if (list[i].code & TB_AIOE_CODE_SEND)
                {
                    // 继续发送上回没发完的数据
                    // tb_socket_send(..);
                    
                    // 删除对应aioo事件对象,取消监听这个sock的事件
                    tb_aiop_delo(aiop,aioo);
                }
                // 有连接事件?
                else if (list[i].code & TB_AIOE_CODE_CONN)
                {
                    // 此处不会进入,仅仅在tb_socket_connect之后,并且注册了aioo,才会有此事件
                    // ...
                }
                // 错误代码处理 
                else 
                {
                    tb_trace_e("unknown code: %lu",list[i].code);
                    break;
                }
            }
        }
        return 0;
    }

    这里的代码,仅仅描述了下aiop的接口调用流程,没有实际的服务器业务逻辑,仅作参考,请不要照搬复制的使用。

    代码里提到的aioo对象,还有个单独的等待接口,用于直接等待单个socket对象,一般在basic_stream的wait里面使用:

    /*! 等待单个socket句柄的事件
     *
     * @param socket    socket句柄 
     * @param code      aioe等待事件代码
     * @param timeout   等待超时时间,永久等待传:-1
     *
     * @return          > 0: 等到的事件,0: 超时,-1: 失败
     */
    tb_long_t           tb_aioo_wait(tb_socket_ref_t socket,tb_size_t code,tb_long_t timeout);

    aiop的接口并不多,我这里不多做描述了,就简单的列举下吧:

    /*! 初始化aiop事件等待池
     *
     * @param maxn      等待对象的规模数量
     *
     * @return          aiop池
     */
    tb_aiop_ref_t       tb_aiop_init(tb_size_t maxn);
    
    /*! 退出aiop
     *
     * @param aiop      aiop池
     */
    tb_void_t           tb_aiop_exit(tb_aiop_ref_t aiop);
    
    /*! 清除所有aiop中的aioo等待对象
     *
     * @param aiop      aiop池
     */
    tb_void_t           tb_aiop_cler(tb_aiop_ref_t aiop);
    
    /*! 强制退出aiop的等待,tb_aiop_wait会返回:-1,并且退出循环,无法再次等待
     *
     * @param aiop      aiop池
     */
    tb_void_t           tb_aiop_kill(tb_aiop_ref_t aiop);
    
    /*! 退出aiop的等待,tb_aiop_wait会返回:0,但不退出等待循环,还可继续等待
     *
     * @param aiop      aiop池
     */
    tb_void_t           tb_aiop_spak(tb_aiop_ref_t aiop);
    
    /*! 添加一个socket等待对象aioo,并且关联等待的事件代码code,以及私有数据priv
     *
     * @param aiop      aiop池
     * @param socket    socket句柄
     * @param code      等待的事件代码
     * @param priv      关联的用户私有数据
     *
     * @return          aioo对象
     */
    tb_aioo_ref_t       tb_aiop_addo(tb_aiop_ref_t aiop,tb_socket_ref_t socket,tb_cpointer_t priv);
    
    /*! 删除一个aioo等待对象,永远不再等待它
     *
     * @param aiop      aiop池
     * @param aioo      aioo对象句柄
     *
     */
    tb_void_t           tb_aiop_delo(tb_aiop_ref_t aiop,tb_aioo_ref_t aioo);
    
    /*! 投递一个aioe等待事件对象
     *
     * @param aiop      aiop池
     * @param aioe      aioe事件对象
     *
     * @return          投递成功返回:tb_true,失败返回:tb_false
     */
    tb_bool_t           tb_aiop_post(tb_aiop_ref_t aiop,tb_aioe_t const* aioe);
    
    /*! 设置和修改aioo对象的等待事件
     *
     * @param aiop      aiop池
     * @param aioo      aioo对象
     * @param code      等待事件代码
     * @param priv      用户私有数据
     *
     * @return          成功返回:tb_true,失败返回:tb_false
     */
    tb_bool_t           tb_aiop_sete(tb_aiop_ref_t aiop,tb_aioo_ref_t aioo,tb_cpointer_t priv);
    
    /*! 等待一定量的aioe事件
     *
     * @param aiop      aiop池
     * @param list      aioe事件列表,用于保存成功返回的事件对象
     * @param maxn      指定等待多少的事件
     * @param timeout   等待超时值,永久等待:-1
     *
     * @return          > 0: 实际等到的aioe事件对象数,-1: 失败
     */
    tb_long_t           tb_aiop_wait(tb_aiop_ref_t aiop,tb_aioe_t* list,tb_size_t maxn,tb_long_t timeout);

    虽然aiop的reactor模式处理并发io已经相当方便了,但是由于系统底层对其的实现的程度不一,像windows上只能通过select来实现,因此对于一些高性能并发io的处理上,还是有些力不从心的,为了更好的利用不同系统提供的io处理特性,达到更好的并发性,又要保证上层调用接口的简单、统一,实现跨平台、高度可移植性。

    这个时候用proactor模式进行更上层的接口封装,完全采用异步回调通知模式,是一种较好地@R_301_463@案。


    猜你在找的React相关文章