Sven

Reactor 模式


一种并发模型

阻塞的io

最早编写socket服务的时候我们编写的代码如下

import java.net.*;
import java.io.*;

class Client
{
    public static void main(String [] args)
    {
        String serverName = "127.0.0.1";
        int port = 9999;
        try
        {
            System.out.println("连接到主机:" + serverName + " ,端口号:" + port);
            Socket client = new Socket(serverName, port);
            System.out.println("远程主机地址:" + client.getRemoteSocketAddress());
            OutputStream outToServer = client.getOutputStream();
            DataOutputStream out = new DataOutputStream(outToServer);

            out.writeUTF("Hello from " + client.getLocalSocketAddress());
            InputStream inFromServer = client.getInputStream();
            DataInputStream in = new DataInputStream(inFromServer);
            System.out.println("服务器响应: " + in.readUTF());
            client.close();
        }catch(IOException e)
        {
            e.printStackTrace();
        }
    }
}


class Server extends Thread
{
    private ServerSocket serverSocket;

    public Server(int port) throws IOException
    {
        serverSocket = new ServerSocket(port);
        serverSocket.setSoTimeout(0); // 永不超时
    }

    public void run()
    {
        while(true)
        {
            try
            {
                System.out.println("等待远程连接,端口号为:" + serverSocket.getLocalPort() + "...");
                Socket server = serverSocket.accept();
                System.out.println("远程主机地址:" + server.getRemoteSocketAddress());
                DataInputStream in = new DataInputStream(server.getInputStream());
                System.out.println(in.readUTF());
                DataOutputStream out = new DataOutputStream(server.getOutputStream());
                out.writeUTF("谢谢连接我:" + server.getLocalSocketAddress() + "\nGoodbye!");
                server.close();
            }catch(SocketTimeoutException s)
            {
                System.out.println("Socket timed out!");
                break;
            }catch(IOException e)
            {
                e.printStackTrace();
                break;
            }
        }
    }
    public static void main(String [] args)
    {
        int port = 9999;
        try
        {
            Thread t = new Server(port);
            t.run();
        }catch(IOException e)
        {
            e.printStackTrace();
        }
    }
}

ServerSocket的accept/read都是阻塞的, accept会等待连接建立, read会等待数据可读, 如果要处理多个客户端的连接, 需要给每个连接分配一个线程; 线程和连接是一对一的关系;

但是线程的创建是有开销的, 操作系统调度线程上下文也是有开销的, 所以我们使用线程池, 对线程做复用; 一个线程可以处理多个连接;

当前连接的数据不可读的时候, 其他连接还是被阻塞的, 我们可以使用非阻塞的socket, 线程不断轮训read查看是否可读; 但是, 轮询会消耗cpu, 当一个线程处理的连接越多, 轮训的效率越低;

当使用io多路复用(操作系统提供的一组监控文件描述符事件的api)监控socket的事件的时候我们就不需要轮训了;

用过 I/O 多路复用接口写网络程序的同学,肯定知道是面向过程的方式写代码的,这样的开发的效率不高。 于是,大佬们基于面向对象的思想,对 I/O 多路复用作了一层封装,让使用者不用考虑底层网络 API 的细节,只需要关注应用代码的编写, 就是Reactor,Reactor 模式也叫 Dispatcher 模式,我觉得这个名字更贴合该模式的含义,即 I/O 多路复用监听事件,收到事件后,根据事件类型分配(Dispatch)给某个进程 / 线程;

Reactor 模式主要由 Reactor 和处理资源池这两个核心部分组成,它俩负责的事情如下:

Reactor 负责监听和分发事件,事件类型包含连接事件、读写事件; 处理资源池负责处理事件,如 read -> 业务逻辑 -> send; Reactor 模式是灵活多变的,可以应对不同的业务场景,灵活在于:

Reactor 的数量可以只有一个,也可以有多个; 处理资源池可以是单个进程 / 线程,也可以是多个进程 /线程; 将上面的两个因素排列组设一下,理论上就可以有 4 种方案选择:

单 Reactor 单进程 / 线程; 单 Reactor 多进程 / 线程; 多 Reactor 单进程 / 线程; 多 Reactor 多进程 / 线程; 其中,「多 Reactor 单进程 / 线程」实现方案相比「单 Reactor 单进程 / 线程」方案,不仅复杂而且也没有性能优势,因此实际中并没有应用。

剩下的 3 个方案都是比较经典的,且都有应用在实际的项目中:

单 Reactor 单进程 / 线程; 单 Reactor 多线程 / 进程; 多 Reactor 多进程 / 线程; 方案具体使用进程还是线程,要看使用的编程语言以及平台有关:

Java 语言一般使用线程,比如 Netty; C 语言使用进程和线程都可以,例如 Nginx 使用的是进程,Memcache 使用的是线程。

On this page