• 服务端ZMQ(一)——源码解析


    服务端ZMQ(一)——源码解析

    1、什么是ZeroMQ

    1. 基于消息队列的多线程网络库:对套接字类型、连接处理、帧、路由底层细节进行抽象、提供跨越多种传输协议的套接字(socket library)
    2. 一个嵌入式库:封装了网络通信、消息队列、线程调度等功能,向上层应用提供API,应用程序通过加载库文件,调用API函数来实现高性能网络通信(简单好用的传输层)

    总结:ZeroMQ在 Socket API 之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的 API 接口,提供一个底层的网络通讯库

    注意:普通的 socket 是端到端的(1:1的关系)ZeroMQ 却是可以N:M 的关系

    /**
    socket
    	1.建立连接		销毁连接
    	2.制定协议		选择协议	protobuffer messagepack shrift
    	3.处理错误		error close
    	4.断线重连的问题心跳 检测
    	5. IO epoll
    	
    有哪些模型?
    1.请求回应模型,	req/rep			redis 协议请求的次序跟回应的次序
    
    2、管道模型 		 push/ull模型   	 ginx
    
    3、监听发布模型 	sub/pub 		观察者模式
    
    **/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在这里插入图片描述

    1.1、为什么用消息队列和多线程?

    ​ 看到消息队列,首先想到的是异步、消峰、解耦。例子:某个功能中很多步骤需要在一个流程里面完成,例如下图支付流程被拉长了,更加耗时。

    在这里插入图片描述

    引入消息队列,就变成了下图。对于一些可以同时处理的流程用线程,线程池去处理,减少响应时间。

    在这里插入图片描述

    1.2、ZeroMQ长是什么样子 ?

    在这里插入图片描述

    ZeroMQ几乎所有IO操作都是异步的,每个ZeroMQ IO线程都有与之绑定的Poller,Poller采用经典的Reactor模式实现,Poller根据不同的操作系统平台使用不同的网络IO模型(select、poll、epoll、devpoll、kequeue等)。主线程I/O线程通过Mail Box传递消息来进行通信

    Server,在主线程创建zmq_listener,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程把zmq_listener添加到Poller中用以侦听读事件。

    Client,在主线程中创建zmq_connecter,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程把zmq_connecter添加到Poller中用以侦听写事件。

    Client与Server第一次通信时,会创建zmq_init来发送identity,用以进行认证。认证结束后,双方会为此次连接创建Session,以后双方就通过Session进行通信。

    每个Session都会关联到相应的读/写管道, 主线程收发消息只是分别从管道中读/写数据。Session并不实际跟kernel交换I/O数据,而是通过plugin到Session中的Engine来与kernel交换I/O数据。

    ZMQ性能优化的过程中发现有3个因素会对性能产生严重的影响:

    • 内存分配的次数
    • 系统调用的次数
    • 并发模型

    **内存分配的次数:**一条ØMQ消息由一个不透明的句柄来表示。对于非常短小的消息,其内容被直接编码到句柄中。因此,对句柄的拷贝实际上就是对消息数据的拷贝。当遇到较大的消息时,它被分配到一个单独的缓冲区内,而句柄只包含一个指向缓冲区的指针。对句柄的拷贝并不会造成对消息数据的拷贝,当消息有数兆字节长时,这么处理是很有道理的如下图。需要提醒的是,后一种情况里缓冲区是按引用计数的,因此可以做到被多个句柄引用而不必拷贝数据。

    在这里插入图片描述

    **批量处理:**在消息通信系统中,系统调用的数量太多的话会导致出现性能瓶颈。当创建高性能的应用时应该尽可能多的去避免遍历调用栈。

    在这里插入图片描述

    **并发模型:**采用一种不同的模型,目标是完全避免锁机制,并让每个线程能够全速运行。线程间的通信是通过在线程间传递异步消息(事件)来实现的。(actor模式)

    无锁算法:ØMQ在pipe对象中采用无锁队列来在用户线程和ØMQ的工作者线程之间传递消息。

    2、简单示例

    2.1、主要的四个操作

    1. 创建和销毁套接字:zmq_socket(), zmq_close()
    2. 配置和读取套接字选项:zmq_setsockopt(), zmq_getsockopt()
    3. 为套接字建立连接:zmq_bind(), zmq_connect()
    4. 发送和接收消息:zmq_send(), zmq_recv()

    2.2、HelloWorld

    //client
    #include                                                                                        
    #include                                                                                     
    #include                                                                                      
    #include                                                                                     
                                                                                                           
    int main (void)                                                                                        
    {                                                                                                      
        printf ("Connecting to hello world server…\n");                                                   
        void *context = zmq_ctx_new ();                         //创建套接字   上下文环境                                                       
        void *requester = zmq_socket (context, ZMQ_REQ);        //设置套接字   ZMQ_REQ     请求回应模型                                                                                                   
        zmq_connect (requester, "tcp://localhost:5555");        //建立连接                                        
                                                                                                           
        int request_nbr;                                                                                   
        for (request_nbr = 0; request_nbr != 5; request_nbr++) { //发送消息                                    
            char buffer [5];                                                                              
            printf ("Sending Hello %d…\n", request_nbr);                                                  
            zmq_send (requester, "Hello", 5, 0);                                                           
            zmq_recv (requester, buffer, 5, 0);                                                           
            printf ("Received World %d\n", request_nbr);                                                   
        }                                                                                                  
                                                                                                           
        zmq_close (requester);                                                                             
        zmq_ctx_destroy (context);                                                                         
                                                                                                           
        return 0;                                                                                          
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    //server
    #include                                                                                                                        #include                                                                                     
    #include                                                                                     
    #include                                                                                     
    #include                                                                                      
                                                                                                           
    int main (void)                                                                                        
    {                                                                                                                                                                           
        void *context = zmq_ctx_new ();                     //创建套接字   上下文环境                                                       
        void *responder = zmq_socket (context, ZMQ_REP);    //设置套接字   ZMQ_REP     请求回应模型                                             
        int rc = zmq_bind (responder, "tcp://*:5555");      //建立连接                                                    
        assert (rc == 0);                                                                                  
                                                                                                           
        while (1) {                                 //接收消息                                                       
            char buffer [5];                                                                              
            zmq_recv (responder, buffer, 5, 0);                                                           
            printf ("Received Hello\n");                                                                   
            sleep (1);          //  Do some 'work'                                                         
            zmq_send (responder, "World", 5, 0);                                                           
        }                                                                                                  
        return 0;                                                                                          
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    实现效果:

    在这里插入图片描述

    2.3、与传统TCP的区别

    1. 使用多种协议:inproc(进程内)、ipc(进程间)、tcp、pgm(广播)、epgm;
    2. 连接是异步的,并由一组消息队列做缓冲;
    3. 连接会表现出某种消息模式,这是由创建连接的套接字类型决定的;
    4. 一个套接字可以有多个输入和输出连接;N:M;
    5. ZMQ没有提供类似accept()的函数,因为当套接字绑定至端点时它就自动开始接受连接了;
    6. 应用程序无法直接和这些连接打交道,因为它们是被封装在ZMQ底层的。、
    7. 当客户端使用zmq_connect()时连接就已经建立了,并不要求该端点已有某个服务使用zmq_bind()进行了绑定;

    3、工作流程

    3.1、zmq_ctx_new

    /*
    	zmq.cpp  原文件
    */
    void *zmq_ctx_new (void)
    {
    	//首先是初始化网络环境  ZMQ_HAVE_OPENPGM 或者 ZMQ_HAVE_WINDOWS
        if (!zmq::initialize_network ()) {  
            return NULL;
        }
    
        //  Create 0MQ context.
        /*
            ctx_t	ctx.hpp源文件
            
            zmq::ctx_t::ctx_t () :
            //套接字是否使用的标志  后续close 最后一个释放
            _tag (ZMQ_CTX_TAG_VALUE_GOOD),	 
            //初始化信箱、zmq_ctx_term thread、reaper thread、I/O thread...
            _starting (true),
            //zmq_ctx_term 是否启用
            _terminating (false),
            //回收线程
            _reaper (NULL),
            //同时socket最大打开数
            _max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
            //同时消息的最大数
            _max_msgsz (INT_MAX),
            //IO 线程数量
            _io_thread_count (ZMQ_IO_THREADS_DFLT),
            // 该上下文是否永远不会终止
            _blocky (true),
            //是否支持ipv6
            _ipv6 (false),
            //是否使用零拷贝消息解析功能
            _zero_copy (true)
        {
        #ifdef HAVE_FORK
            _pid = getpid ();
        #endif
        #ifdef ZMQ_HAVE_VMCI
            _vmci_fd = -1;
            _vmci_family = -1;
        #endif
    
            //  Initialise crypto library, if needed.
            zmq::random_open ();
    
        #ifdef ZMQ_USE_NSS
            NSS_NoDB_Init (NULL);
        #endif
    
        #ifdef ZMQ_USE_GNUTLS
            gnutls_global_init ();
        #endif
        }
            
            
            1、创建ctx指针
            2、check_tag()  --> 当前 ctx_t 的状态   _tag == ZMQ_CTX_TAG_VALUE_GOOD 
            3、terminate()  --> _terminatin 	--> start () --> create socket(并注册信箱)
            4、shutdown ()  --> _reaper(NULL)   回收线程
            5、set() --> thread_ctx_t () -->    启用新线程(参数)
            ...
        
        */
        zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
        if (ctx) {
            if (!ctx->valid ()) {
                delete ctx;
                return NULL;
            }
        }
        return ctx;
    }
    /*
    	所以zmq_ctx_new 只是做了初始化工作		ctx_t提供一个start_thread后续的函数调用中进行的启动(poller_base -> start -> _ctx.start_thread | poller_set -> start -> _ctx.start_thread)
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    3.2、zmq_socket

    //1、zmq_socket
    void *zmq_socket (void *ctx_, int type_)
    {
        //ctx_t检查
        if (!ctx_ || !(static_cast (ctx_))->check_tag ()) {
            errno = EFAULT;
            return NULL;
        }
        zmq::ctx_t *ctx = static_cast (ctx_);
        // 返回对象指针 是基类socket_base_t(看一下基类构造形式)
        zmq::socket_base_t *s = ctx->create_socket (type_);
        return static_cast (s);
    }
    
    //2、ctx_t::create_socket
    zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
    {
        scoped_lock_t locker (_slot_sync);
    
        //  一旦调用了zmq_ctx_term,将不能创建新套接字
        if (_terminating) {
            errno = ETERM;
            return NULL;
        }
    
        if (unlikely (_starting)) {
            // start_thread 启动
            if (!start ())
                return NULL;
        }
    
        //  如果当前已达到套接字上限,返回错误
        if (_empty_slots.empty ()) {
            errno = EMFILE;
            return NULL;
        }
    
        //  选择索引
        const uint32_t slot = _empty_slots.back ();
        _empty_slots.pop_back ();
    
        //  生成唯一id
        const int sid = (static_cast (max_socket_id.add (1))) + 1;
    
        //  创建套接字,并注册在其身上的mailbox
        socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
        if (!s) {
            _empty_slots.push_back (slot);
            return NULL;
        }
        _sockets.push_back (s);
        //该 ctx_t 上的 i_mailbox 数组
        _slots[slot] = s->get_mailbox ();
    
        return s;
    }
    
    //3、ctx_t::start
    bool zmq::ctx_t::start ()
    {
       //数组中的 mailboxes 进行初始化,增加回收线程
        _opt_sync.lock ();
        const int term_and_reaper_threads_count = 2;
        const int mazmq = _max_sockets;
        const int ios = _io_thread_count;
        _opt_sync.unlock ();
        const int slot_count = mazmq + ios + term_and_reaper_threads_count;
        try {
            //vector 重设 capacity 上限
            _slots.reserve (slot_count);
            _empty_slots.reserve (slot_count - term_and_reaper_threads_count);
        }
        catch (const std::bad_alloc &) {
            errno = ENOMEM;
            return false;
        }
        _slots.resize (term_and_reaper_threads_count);
    
        // 将关闭线程的 mailbox 绑定到 ctx 上
        _slots[term_tid] = &_term_mailbox;
    
        //  创建回收线程并启动
        _reaper = new (std::nothrow) reaper_t (this, reaper_tid);
        if (!_reaper) {
            errno = ENOMEM;
            goto fail_cleanup_slots;
        }
        if (!_reaper->get_mailbox ()->valid ())
            goto fail_cleanup_reaper;
        _slots[reaper_tid] = _reaper->get_mailbox ();
        _reaper->start ();
    
        //创建指定数量的io线程启动且注册,当然包括其mailbox
        _slots.resize (slot_count, NULL);
    
        for (int i = term_and_reaper_threads_count;
             i != ios + term_and_reaper_threads_count; i++) {
            io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
            if (!io_thread) {
                errno = ENOMEM;
                goto fail_cleanup_reaper;
            }
            if (!io_thread->get_mailbox ()->valid ()) {
                delete io_thread;
                goto fail_cleanup_reaper;
            }
            _io_threads.push_back (io_thread);
            _slots[i] = io_thread->get_mailbox ();
            //io_thread 会使用 ctx_t 上的start_thread来启动成员函数 worker_routine ,进而启动当前平台下的io接口的
            //loop(), 再接下来就是经典的 reactor 模式, 从响应的fd中,找到对应的 poll_entry_t 
            //通过判断响应的事件来调用挂接在io_thread上的对象的 in_event 或者 out_event 函数
            io_thread->start ();
        }
    
        //  In the unused part of the slot array, create a list of empty slots.
        for (int32_t i = static_cast (_slots.size ()) - 1;
             i >= static_cast (ios) + term_and_reaper_threads_count; i--) {
            _empty_slots.push_back (i);
        }
    	//启动完成
        _starting = false;
        return true;
    
    fail_cleanup_reaper:
        _reaper->stop ();
        delete _reaper;
        _reaper = NULL;
    
    fail_cleanup_slots:
        _slots.clear ();
        return false;
    }
    //4、zmq::socket_base_t
    zmq::socket_base_t::socket_base_t (ctx_t *parent_,
                                       uint32_t tid_,
                                       int sid_,
                                       bool thread_safe_) :
    	//调用 own_t 的构造函数,用于维护对象的生命周期
        own_t (parent_, tid_),
        _sync (),
        _tag (0xbaddecaf),
        _ctx_terminated (false),
        _destroyed (false),
        _poller (NULL),
        _handle (static_cast (NULL)),
        _last_tsc (0),
        _ticks (0),
        _rcvmore (false),
        _monitor_socket (NULL),
        _monitor_events (0),
        _thread_safe (thread_safe_),
        _reaper_signaler (NULL),
        _monitor_sync ()
    {
        options.socket_id = sid_;
        options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
        options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0);
        options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV) != 0;
    	//根据线程安全选项来决定是否生成线程安全的 mailbox 对象
        if (_thread_safe) {
            _mailbox = new (std::nothrow) mailbox_safe_t (&_sync);
            zmq_assert (_mailbox);
        } else {
            mailbox_t *m = new (std::nothrow) mailbox_t ();
            zmq_assert (m);
    
            if (m->get_fd () != retired_fd)
                _mailbox = m;
            else {
                LIBZMQ_DELETE (m);
                _mailbox = NULL;
            }
        }
    }
    
    /*
    	zmq_socket 在ctx上插入一个socket_base_t对象并将其指针抛出 由own_t来维护生命周期
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178

    3.3、zmq_bind

    int zmq_bind (void *s_, const char *addr_)
    {
        //转化成socket_base_t 指针 (socket_base_t定义了很多操作)
        zmq::socket_base_t *s = as_socket_base_t (s_);
        if (!s)
            return -1;
        //进行地址的解析和绑定地址
        return s->bind (addr_);
    }
    
    //1、zmq::socket_base_t::bind
    int zmq::socket_base_t::bind (const char *endpoint_uri_)
    {
        //根据线程安全拍段进行上锁准备
        scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
    
        if (unlikely (_ctx_terminated)) {
            errno = ETERM;
            return -1;
        }
    
        // 执行可能存在的被挂起的命令
        int rc = process_commands (0, false);
        if (unlikely (rc != 0)) {
            return -1;
        }
    
       //为分割对传入的协议和地址端口进行分片
       //并对传入协议进行检查
        std::string protocol;
        std::string address;
        if (parse_uri (endpoint_uri_, protocol, address)
            || check_protocol (protocol)) {
            return -1;
        }
    
        // ......
    
    	//以下传输方式需要在io线程中进行,所以我们选择一个io线程
        io_thread_t *io_thread = choose_io_thread (options.affinity);
        if (!io_thread) {
            errno = EMTHREAD;
            return -1;
        }
    
        if (protocol == protocol_name::tcp) {
            //创建tcp 监听对象
            tcp_listener_t *listener =
              new (std::nothrow) tcp_listener_t (io_thread, this, options);
            alloc_assert (listener);
            //设置地址
            rc = listener->set_local_address (address.c_str ());
            if (rc != 0) {
                LIBZMQ_DELETE (listener);
                event_bind_failed (make_unconnected_bind_endpoint_pair (address),
                                   zmq_errno ());
                return -1;
            }
    
            // Save last endpoint URI
            listener->get_local_address (_last_endpoint);
    		//将节点插入子树中
            add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
                          static_cast (listener), NULL);
            options.connected = true;
            return 0;
        }
     	
        // ......
    	
        zmq_assert (false);
        return -1;
    }
    
    //2、zmq::ctx_t::choose_io_thread   传入绑定的CPU下标
    zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
    {
        if (_io_threads.empty ())
            return NULL;
    
         //根据cpu偏好以及当前的io压力来选择压力最小的io线程并返回
        int min_load = -1;
        io_thread_t *selected_io_thread = NULL;
        for (io_threads_t::size_type i = 0, size = _io_threads.size (); i != size;
             i++) {
            if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
                const int load = _io_threads[i]->get_load ();
                if (selected_io_thread == NULL || load < min_load) {
                    min_load = load;
                    selected_io_thread = _io_threads[i];
                }
            }
        }
        return selected_io_thread;
    }
    
    //3、zmq::socket_base_t::add_endpoint
    void zmq::socket_base_t::add_endpoint (
      const endpoint_uri_pair_t &endpoint_pair_, own_t *endpoint_, pipe_t *pipe_)
    {
        //  将新节点插入endpoint_
        launch_child (endpoint_);
        //插入ctx
        _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (endpoint_pair_.identifier (),
                                              endpoint_pipe_t (endpoint_, pipe_));
    
        if (pipe_ != NULL)
            pipe_->set_endpoint_pair (endpoint_pair_);
    }
    
    //4、zmq::own_t::launch_child
    void zmq::own_t::launch_child (own_t *object_)
    {
        //  插入
        object_->set_owner (this);
    
        //  向object_所属的io线程发送plug消息,在执行process_plug
        send_plug (object_);
    
        //  设置object_归属权
        send_own (this, object_);
    }
    
    
    /*
    	zmq 在进行bind操作后,并不是马上绑定上的,虽然时间很短但其实是一个异步的流程
    	
    	tcp_listener_t 在plug初始化过程中,首先加入到当前的socket_base_t, 然后向tcp_listener_t发起一个plug消息,在io_thread的mailbox中进行缓存,并再下次loop循环中进行 tcp_listener_t process_plug
    	
    	为什么process_plug用消息启动:zmq开启多线程模式时,直接执行 process_plug 操作可能会不在 tcp_listener_t 所属线程中执行,也就是破坏了 actor 设计初衷(猜测)
    */
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132

    4、zmq_connect

    int zmq_connect (void *s_, const char *addr_)
    {
        zmq::socket_base_t *s = as_socket_base_t (s_);
        if (!s)
            return -1;
        return s->connect (addr_);
    }
    
    int zmq::socket_base_t::connect (const char *endpoint_uri_)
    {
        scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
        return connect_internal (endpoint_uri_);
    }
    
    
    int zmq::socket_base_t::connect_internal (const char *endpoint_uri_)
    {
        if (unlikely (_ctx_terminated)) {
            errno = ETERM;
            return -1;
        }
    
        //  执行任何可能被挂起的命令
        int rc = process_commands (0, false);
        if (unlikely (rc != 0)) {
            return -1;
        }
    
        //  解析 endpoint_uri_字符串 以 :
        //  为边界进行分别
        std::string protocol;
        std::string address;
        if (parse_uri (endpoint_uri_, protocol, address)
            || check_protocol (protocol)) {
            return -1;
        }
       
        //......
        //DEALER SUB PUB REQ 不支持对一个端点同时开启多个会话,进行判断当前会话是否存在
        const bool is_single_connect =
          (options.type == ZMQ_DEALER || options.type == ZMQ_SUB
           || options.type == ZMQ_PUB || options.type == ZMQ_REQ);
        if (unlikely (is_single_connect)) {
            if (0 != _endpoints.count (endpoint_uri_)) {
                // There is no valid use for multiple connects for SUB-PUB nor
                // DEALER-ROUTER nor REQ-REP. Multiple connects produces
                // nonsensical results.
                return 0;
            }
        }
    
        //选择io线程去运行我们的会话对象
        io_thread_t *io_thread = choose_io_thread (options.affinity);
        if (!io_thread) {
            errno = EMTHREAD;
            return -1;
        }
    
        address_t *paddr =
          new (std::nothrow) address_t (protocol, address, this->get_ctx ());
        alloc_assert (paddr);
    
        //  Resolve address (if needed by the protocol)
        if (protocol == protocol_name::tcp) {
            //  Do some basic sanity checks on tcp:// address syntax
            //  - hostname starts with digit or letter, with embedded '-' or '.'
            //  - IPv6 address may contain hex chars and colons.
            //  - IPv6 link local address may contain % followed by interface name / zone_id
            //    (Reference: https://tools.ietf.org/html/rfc4007)
            //  - IPv4 address may contain decimal digits and dots.
            //  - Address must end in ":port" where port is *, or numeric
            //  - Address may contain two parts separated by ':'
            //  Following code is quick and dirty check to catch obvious errors,
            //  without trying to be fully accurate.
            //对提供的字符串进行解析
            const char *check = address.c_str ();
            if (isalnum (*check) || isxdigit (*check) || *check == '['
                || *check == ':') {
                check++;
                while (isalnum (*check) || isxdigit (*check) || *check == '.'
                       || *check == '-' || *check == ':' || *check == '%'
                       || *check == ';' || *check == '[' || *check == ']'
                       || *check == '_' || *check == '*') {
                    check++;
                }
            }
            //  Assume the worst, now look for success
            rc = -1;
             //检查地址是否是安全有效的
            if (*check == 0) {
                //  Do we have a valid port string? (cannot be '*' in connect
                check = strrchr (address.c_str (), ':');
                if (check) {
                    check++;
                    if (*check && (isdigit (*check)))
                        rc = 0; //  Valid
                }
            }
            if (rc == -1) {
                errno = EINVAL;
                LIBZMQ_DELETE (paddr);
                return -1;
            }
            /推迟解决方案配置
            paddr->resolved.tcp_addr = NULL;
        }
    #ifdef ZMQ_HAVE_WS
    #ifdef ZMQ_HAVE_WSS
        else if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
            if (protocol == protocol_name::wss) {
                paddr->resolved.wss_addr = new (std::nothrow) wss_address_t ();
                alloc_assert (paddr->resolved.wss_addr);
                rc = paddr->resolved.wss_addr->resolve (address.c_str (), false,
                                                        options.ipv6);
            } else
    #else
        else if (protocol == protocol_name::ws) {
    #endif
            {
                paddr->resolved.ws_addr = new (std::nothrow) ws_address_t ();
                alloc_assert (paddr->resolved.ws_addr);
                rc = paddr->resolved.ws_addr->resolve (address.c_str (), false,
                                                       options.ipv6);
            }
    
            if (rc != 0) {
                LIBZMQ_DELETE (paddr);
                return -1;
            }
        }
    #endif
    
    #if defined ZMQ_HAVE_IPC
        else if (protocol == protocol_name::ipc) {
            paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
            alloc_assert (paddr->resolved.ipc_addr);
            int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
            if (rc != 0) {
                LIBZMQ_DELETE (paddr);
                return -1;
            }
        }
    #endif
    
        if (protocol == protocol_name::udp) {
            if (options.type != ZMQ_RADIO) {
                errno = ENOCOMPATPROTO;
                LIBZMQ_DELETE (paddr);
                return -1;
            }
    
            paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
            alloc_assert (paddr->resolved.udp_addr);
            rc = paddr->resolved.udp_addr->resolve (address.c_str (), false,
                                                    options.ipv6);
            if (rc != 0) {
                LIBZMQ_DELETE (paddr);
                return -1;
            }
        }
    
        // TBD - Should we check address for ZMQ_HAVE_NORM???
    
    #ifdef ZMQ_HAVE_OPENPGM
        if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
            struct pgm_addrinfo_t *res = NULL;
            uint16_t port_number = 0;
            int rc =
              pgm_socket_t::init_address (address.c_str (), &res, &port_number);
            if (res != NULL)
                pgm_freeaddrinfo (res);
            if (rc != 0 || port_number == 0) {
                return -1;
            }
        }
    #endif
    #if defined ZMQ_HAVE_TIPC
        else if (protocol == protocol_name::tipc) {
            paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
            alloc_assert (paddr->resolved.tipc_addr);
            int rc = paddr->resolved.tipc_addr->resolve (address.c_str ());
            if (rc != 0) {
                LIBZMQ_DELETE (paddr);
                return -1;
            }
            const sockaddr_tipc *const saddr =
              reinterpret_cast (
                paddr->resolved.tipc_addr->addr ());
            // Cannot connect to random Port Identity
            if (saddr->addrtype == TIPC_ADDR_ID
                && paddr->resolved.tipc_addr->is_random ()) {
                LIBZMQ_DELETE (paddr);
                errno = EINVAL;
                return -1;
            }
        }
    #endif
    #if defined ZMQ_HAVE_VMCI
        else if (protocol == protocol_name::vmci) {
            paddr->resolved.vmci_addr =
              new (std::nothrow) vmci_address_t (this->get_ctx ());
            alloc_assert (paddr->resolved.vmci_addr);
            int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
            if (rc != 0) {
                LIBZMQ_DELETE (paddr);
                return -1;
            }
        }
    #endif
    
        //  Create session.
        session_base_t *session =
          session_base_t::create (io_thread, true, this, options, paddr);
        errno_assert (session);
    
        //  PGM does not support subscription forwarding; ask for all data to be
        //  sent to this pipe. (same for NORM, currently?)
    #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
        const bool subscribe_to_all =
          protocol == protocol_name::pgm || protocol == protocol_name::epgm
          || protocol == protocol_name::norm || protocol == protocol_name::udp;
    #elif defined ZMQ_HAVE_OPENPGM
        const bool subscribe_to_all = protocol == protocol_name::pgm
                                      || protocol == protocol_name::epgm
                                      || protocol == protocol_name::udp;
    #elif defined ZMQ_HAVE_NORM
        const bool subscribe_to_all =
          protocol == protocol_name::norm || protocol == protocol_name::udp;
    #else
        const bool subscribe_to_all = protocol == protocol_name::udp;
    #endif
        pipe_t *newpipe = NULL;
    
        if (options.immediate != 1 || subscribe_to_all) {
            //  Create a bi-directional pipe.
            object_t *parents[2] = {this, session};
            pipe_t *new_pipes[2] = {NULL, NULL};
    
            const bool conflate = get_effective_conflate_option (options);
    
            int hwms[2] = {conflate ? -1 : options.sndhwm,
                           conflate ? -1 : options.rcvhwm};
            bool conflates[2] = {conflate, conflate};
            rc = pipepair (parents, new_pipes, hwms, conflates);
            errno_assert (rc == 0);
    
            //  Attach local end of the pipe to the socket object.
            attach_pipe (new_pipes[0], subscribe_to_all, true);
            newpipe = new_pipes[0];
    
            //  Attach remote end of the pipe to the session object later on.
            session->attach_pipe (new_pipes[1]);
        }
    	
        // .........
            
        //  Save last endpoint URI
        paddr->to_string (_last_endpoint);
    
        add_endpoint (make_unconnected_connect_endpoint_pair (endpoint_uri_),
                      static_cast (session), newpipe);
        return 0;
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265

    5、zmq_recv

    int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
    {
        zmq::socket_base_t *s = as_socket_base_t (s_);
        if (!s)
            return -1;
        //初始化 zmq_msg_t 
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        errno_assert (rc == 0);
    
        const int nbytes = s_recvmsg (s, &msg, flags_);
        if (unlikely (nbytes < 0)) {
            const int err = errno;
            rc = zmq_msg_close (&msg);
            errno_assert (rc == 0);
            errno = err;
            return -1;
        }
    
        // 判断是否超过给定的大小
        const size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
    
        //  We exp如果比给定的大小 大,则进行拷贝
        if (to_copy) {
            assert (buf_);
            memcpy (buf_, zmq_msg_data (&msg), to_copy);
        }
        //释放消息 (当一个消息不再被使用的时候应该立刻调用zmq_msg_close()进行资源释放,否则可能引起内存泄露)
        rc = zmq_msg_close (&msg);
        errno_assert (rc == 0);
    
        return nbytes;
    }
    
    
    static int s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
    {
        //调用 socket_base_t 的recv函数
    	//以我们目前来说实际上就是调用的 rep_t 的 xrecv,在 socket_base_t 中,recv 调用的虚函数 xrecv 而 xrecv 将会被子类重写
        const int rc = s_->recv (reinterpret_cast (msg_), flags_);
        if (unlikely (rc < 0))
            return -1;
    
        //  Truncate returned size to INT_MAX to avoid overflow to negative values
        const size_t sz = zmq_msg_size (msg_);
        return static_cast (sz < INT_MAX ? sz : INT_MAX);
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    4、工作流程时序图

    4.1、接收端(server)

    在这里插入图片描述

    4.2、发送端(client)

    在这里插入图片描述

    4.3、zmq类层次

    在这里插入图片描述

    ①、object_t,主要用于发送命令和处理命令,所有继承object_t的子类都具备该类的功能

    ②、io_thread_t,内含一个poller,可监听句柄的读、写、异常状态,继承自object_t,具有接收命令、处理命令、发送命令的功能

    ③、io_object_t,可以获取一个io_thread_t的poller,从而具备poller功能,所有继承自该类的子类都具有pollere功能,可监听句柄的读、写、异常状态

    ④、reaper_t,zmq的回收线程

    ⑤、own_t,zmq的对象树结点,或者说多叉树的结点,其主要用于对象的销毁,可以想到,对象的销毁就是这棵树的销毁过程,必须要使用深度优先的算法来销毁。关于zmq对象树在Internal Architecture of libzmq有详细讲解

    ⑥、tcp_connector_t,zmq_socket的连接器,使用她来建立tcp连接

    ⑦、tcp_listener_t,zmq_socket的监听器

    ⑧、stream_engine,负责处理io事件中的一种----网络事件,把网络字节流转换成zeromq的msg_t消息传递给session_base_t。另外一些和版本兼容相关的杂务也stream_engine处理的。stream_engine_t处理完杂务,到session_base_t就只看见msg_t了。

    ⑨、session_base_t,管理zmq_socket的连接和通信,主要与engine进行交换

    ⑩、socket_base_t,zeromq的socket,在zmq中,被当成一种特殊的”线程“,具有收发命令的功能

    //参考资料
    https://blog.csdn.net/tbyzs/article/category/1710475   		zeromq源码分析
    https://www.cnblogs.com/zengzy/category/777608.html			zeromq源码学习笔记		
    
    • 1
    • 2
    • 3
  • 相关阅读:
    测试工程师提升方向,提升产品思维提高测试效率......
    客服支持Chatbot提供即时回答,改善用户体验
    ELK入门(二)- springboot整合ES
    PHP志愿者协会报名系统的设计与实现 毕业设计-附源码201524
    JavaScript 工具函数大全
    C# 自定义事件
    vue2.x版本中computed和watch的使用入门详解-watch篇
    vue设置全局变量:让你的数据无处不在!
    关于sql中联接的问题
    数据库 | 数据库概述、关系型数据库、非关系型数据库
  • 原文地址:https://blog.csdn.net/weixin_43730892/article/details/127693541