Pure Soul

  1. 首页
  2. Linux
  3. 正文

I/O多路复用之poll

2021年10月30日 941点热度 0人点赞 0条评论

poll

select, poll, epoll是IO多路复用当中的重要的三种实现方式,poll和epoll相对于select而言,只能在Linux下使用,但是select是跨平台的。同时poll相对于select而言,没有最大监听数量的限制。但是也是监管一系列的文件描述符,阻塞的去轮询看这些文件描述符是否可读/可写/异常,再去调用io函数读写。

但是select和poll都存在一个很大的“缺点”,当存在大量的连接且其中绝大部分的连接都是活跃的时候,那么poll/select的效率实际上是很低下的,这也是由于二者的轮询机制实现,如果存在大量的活跃连接,轮询的效率会十分的低下,那么此时和传统的多线程/多进程实现模型本质上也没有区别,甚至于效率低于多进程/多线程模型。

和select监听采用fd_set位数组不同,poll监听采用的是pollfd事件结构体数组,也就是先定义一个事件结构体数组,然后在事件结构体数组中,设定好要监听事件的一个文件描述符,及要监听的事件等等信息。也就是每一个监听的事件(文件描述符)都是对应的一个初始化的结构体

// pollfd结构体的原型为:
struct pollfd {
   int   fd;         /* 需要关注/监听的文件描述符 */
   short events;     /* 注册的事件,由用户提供,例如POLLIN表示监听的是读就绪事件 */
   short revents;    /* 实际发生的事件,由内核填充,在之后的操作可以直接获取这个文件描述符的就绪状态 */
};

常见的事件注册如下:

使用的最多的就是POLLIN,POLLOUT,POLLERR三种状态,可以类比于select当中传入的readfds,writefds,exceptfds三个监听事件。

poll的函数句柄为int poll(struct pollfd *fds, nfds_t nfds, int timeout);,其中fds参数为数组的0元素指针,nfds为数组的大小,实际使用的时候可以随便设置(因为poll没有监听大小的限制),timeout为设置超时时间。同时相比于select的三个返回列表(可读,可写,错误),poll返回值为int类型的数值,表示的含义为:

(1)返回值小于0,表示出错
(2)返回值等于0,表示poll函数等待超时
(3)返回值大于0,表示poll由于监听的文件描述符就绪返回,
          并且返回结果就是就绪的文件描述符的个数。
          poll函数使用前面提到的pollfd结构体中的revents参数,
          revents变量在每一次poll函数调用完成后
         内核设置会设置revents的值, 这个值其实也就是上面列出来的那些events的宏(例如POLLIN等),
         以说明对该描述符发生了什么事件
         比如 调用完poll函数后要查看某一个文件描述符是否处于激活状态(比如可读)
         是通过调用pollfd参数的revents参数与POLLIN做比较如果相等,
         则说明该文件描述符处现在是可读的
         使用if语句:if(poll_fd.revents==POLLIN)

同时就绪情况可以细分为读就绪,写就绪,异常就绪,具体而言:
- 读就绪:
- 对于socket而言,如果socket缓冲区的字节数大于等于标记位的时候,此时可以无阻塞的读取文件描述符,且poll返回值大于0
- TCP通信中如果对方关闭连接,那么对这个socket读的时候,会返回0
- 监听的socket有新的连接,也属于读就绪
- socket上有未处理的错误

  • 写就绪
    • socket缓冲区累计的字节数大于指定的标记就可以无阻塞的写,并且返回值大于0
    • socket的写操作被关闭(close或shutdown),会触发SIGPIPE信号,
    • socket使用非阻塞connect连接成功或者失败之后
    • socket上有未处理的错误
  • 异常就绪
    • socket上收到额外的带外数据

poll的实现

服务端的实现

import select
import socket

server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setblocking(False)
server.bind(('0.0.0.0', 18888))
server.listen()
poll = select.poll()  # 初始化poll
poll.register(server.fileno(), select.POLLIN)  # 将serverz注册到poll当中,追踪的是文件描述符
# 关注的事件为是否具有可写事件的发生,采用register的方式修改链表,而不是以select的数组方式进行传递
connections = {}
while True:
    for fd, event in poll.poll(3600):  # timeout设置为3600s,对注册的时间进行轮询
        # 根据event的状态进行读写错误处理分离
        if event & select.POLLIN:
            # 此时是可读事件
            if fd == server.fileno():  # 有新的连接到来
                conn, addr = server.accept()
                poll.register(conn.fileno(), select.POLLIN)
                connections[conn.fileno()] = conn
            else:
                # 旧的连接具有可读事件
                conn = connections[fd]  # 通过文件描述符找到对应的socket连接进行数据传输
                try:
                    data = conn.recv(2048)  # 获取数据,接收请求
                except Exception as ex:
                    poll.unregister(fd)
                    conn=connections[fd]
                    connections.pop(fd)
                    conn.close()
                else:
                    if data:
                        poll.register(fd, select.POLLOUT)  # 重复注册
                        # 使用modify修改注册的事件类型
                        # poll.modify(fd, select.POLLOUT)
        elif event & select.POLLOUT: # 具有可写事件
            conn=connections[fd]
            conn.sendall(bytes('i have received', encoding='utf-8'))
            # 变换事件的监听状态,发送完之后变为可读监听
            poll.modify(fd, select.POLLIN)
        elif event & select.POLLERR or event & select.POLLHUP : # 连接发生错误了
            poll.unregister(fd) # 停止监听
            conn=connections[fd]
            del connections[fd] # 删除记录
            conn.close() # 关闭此次连接

客户端基于多线程的实现:

import socket
import random
import time
import threading


class SocketThread(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)
        self.socket_name = name

    def run(self) -> None:
        obj = socket.socket()
        obj.connect(('*.*.*.*', 18888))
        while True:
            inp = str(random.randint(0, 10000))
            print('thread number ', self.name, inp)
            try:
                obj.sendall(bytes(inp, encoding='utf-8'))
                ret = str(obj.recv(2048), encoding='utf-8')
            except:
                break
            else:
                print(ret)
            # time.sleep(0.1)
        obj.close()

ths=[SocketThread('{0}'.format(i)) for i in range(50)]
for t in ths:
    t.start()
for t in ths:
    t.join()
标签: 暂无
最后更新:2021年10月31日

ycq

这个人很懒,什么都没留下

点赞
< 上一篇
下一篇 >

COPYRIGHT © 2021 oo2ee.com. ALL RIGHTS RESERVED.

THEME KRATOS MADE BY VTROIS