Pure Soul

  1. 首页
  2. Linux
  3. 正文

I/O多路复用之select

2021年10月28日 824点热度 0人点赞 0条评论

目录

  • 题外话:服务器单机理论最大能连接多少个客户端?
  • 多进程模型
  • 多线程模型
  • select

题外话:服务器单机理论最大能连接多少个客户端?

答案是:对于IPV4而言,粗略估计有2^48个连接。计算机标识一个唯一的socket连接依赖的是唯一四元组-(源ip,源端口,本机ip,本机端口),其中本机的ip和socket初始化的端口是不能变的,因此源ip和源端口都是可变的。那么本机的一个进程,理论上可以连接2^32*2^16=2^48个连接,32表示IPV4的地址数,2^16表示本机端口号的数目(65535)。

当然,理论值只是理论值。限制连接数的不只是ip和端口两个因素,还和内核的各种限制也有关系,每一个socket在内核中都是一个文件句柄的存在,并且Linux下单个进程的可用句柄数量为1024个,可以通过修改系统配置来增大这个数目。

多进程模型

原始的IO多路复用就是多进程模型,为每一个socket连接分配一个进程来处理。主进程负责监听accept(),每次有新的连接就开辟一个子进程来处理。由此可见,主进程和子进程承担了不同的角色。

这种朴素的复用方式在并发量小的时候还可以应对,当并发量大了之后,这种方法不足以支撑起高并发的场景。进程的上下文切换不仅包含了虚拟内存、栈、全局变量等用户空间的资源,还包括了内核堆栈、寄存器等内核空间的资源。频繁的切换会极大地浪费资源。

多线程模型

多线程模型相较于多进程而言,节省了进程切换所需要的资源。
如果每来一个连接就创建一个线程,线程运行完后,还得操作系统还得销毁线程,虽说线程切换的上写文开销不大,但是如果频繁创建和销毁线程,系统开销也是不小的。

那么,我们可以使用线程池的方式来避免线程的频繁创建和销毁,所谓的线程池,就是提前创建若干个线程,这样当由新连接建立时,将这个已连接的 Socket 放入到一个队列里,然后线程池里的线程负责从队列中取出已连接 Socket 进程处理。

需要注意的是,保存已连接的socket队列是全局的,每个线程都可以操作,为了避免多线程竞争,线程在操作这个队列前要加锁。

select

select 实现多路复用的方式是,将已连接的 Socket 都放到一个集合文件描述符集(fd_set),然后调用 select 函数将文件描述符集合拷贝(第一次拷贝)到内核里,让内核来检查是否有网络事件产生,内核检查的方式很简单粗暴,就是通过遍历文件(第一次遍历)描述符集合的方式,当检查到有事件产生后,将此 Socket 标记为可读或可写,接着再把整个文件描述符集合拷贝(第二次拷贝)回用户态里,然后用户态还需要再通过遍历(第二次遍历)的方法找到可读或可写的Socket,然后再对其处理。

通过上面的分析可以看出,对于select而言,需要两次拷贝以及对文件描述符集的遍历。同时,select有最大监听事件数量的限制,最大为1024。
以下为select实现读写分离的服务器代码和客户端代码。

# -*- coding: utf-8 -*-
# @Time    : 2021/10/24 20:04
# @Author  : Chuqiao Yi
# @File    : select_server.py
# @Software: PyCharm
"""
select application on server
"""
import select
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # TCP
# server.setblocking(flag=False)  # None block

server_addr = ('0.0.0.0', 18888)  # ip-port
print('TCP listening on ', server_addr)

server.bind(server_addr)
server.listen()

inputs = [server, ]  # ready to read

outputs = []  # ready to write

message_queues = {}  # socket: Queue

while inputs:
    print('Waiting for next event')
    readable, writable, exceptional = select.select(inputs, outputs, inputs, 10)
    # select.select(rlist, wlist, xlist[, timeout]),
    # select句柄中,rlist,wlist,xlist都需要是可携带对象,其中的每一个可以是整数(文件描述符),
    # 或是python的文件对象,或者是socket对象。亦或是自己定义的类,只要有合适的fileno()方法返回一个真正的文件描述符
    print('正在监听的socket对象为', len(inputs))
    print(readable)
    for serv_or_conn in readable:
        if serv_or_conn == server:
            # 新的连接到来
            conn, addr = serv_or_conn.accept()
            print('new user ', addr)
            inputs.append(conn)
            message_queues[conn] = []
        else:
            # 老用户进行连接
            try:
                data_bytes = serv_or_conn.recv(2048)
            except Exception as ex:
                inputs.remove(serv_or_conn)
            else:
                data_str = str(data_bytes, encoding='utf-8')
                message_queues[serv_or_conn].append(data_str)
                outputs.append(serv_or_conn)

    for conn in writable:
        recv_str = message_queues[conn][0]
        del message_queues[conn][0]
        conn.sendall(bytes(recv_str + ' I have received', encoding='utf-8'))
        outputs.remove(conn)

    for ec in exceptional:
        inputs.remove(ec) # 移除所有的出错连接
# client code
import socket
import random
import time
import threading


class SocketThread(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)
        self.socket_name = name

    def run(self) -> None:
        obj = socket.socket()
        obj.connect(('ip address', 18888))
        while True:
            inp = str(random.randint(0, 10000))
            print('thread number ', self.name, inp)
            try:
                obj.sendall(bytes(inp, encoding='utf-8'))
                ret = str(obj.recv(2048), encoding='utf-8')
            except:
                break
            else:
                print(ret)
            time.sleep(1)
        obj.close()

ths=[SocketThread(i) for i in range(5)]
for t in ths:
    t.start()
for t in ths:
    t.join()

标签: 暂无
最后更新:2021年10月30日

ycq

这个人很懒,什么都没留下

点赞
下一篇 >

COPYRIGHT © 2021 oo2ee.com. ALL RIGHTS RESERVED.

THEME KRATOS MADE BY VTROIS