1. 简介
我们之前研究过进程间通信(IPC),并了解了不同方法之间的一些性能比较。在本文中,我们将研究如何在Java应用程序中实现其中一些方法。
2. 什么是进程间通信?
进程间通信(简称IPC)是不同进程之间进行通信的一种机制,这包括组成同一应用程序的各种进程、在同一台计算机上运行的不同进程以及分布在互联网上的其他进程。
例如,一些Web浏览器将每个选项卡作为不同的OS进程运行,这样做是为了使它们彼此隔离,但确实需要选项卡进程和主浏览器进程之间有一定程度的IPC来确保一切正常运行。
我们在这里看到的一切都将以消息传递的形式出现,Java缺乏对共享内存机制的标准支持,尽管一些第三方库可以实现这一点。因此,我们将考虑一个向消费进程发送消息的生产进程。
3. 基于文件的IPC
在标准Java中,我们可以实现的最简单的IPC形式就是使用本地文件系统上的文件。一个进程可以写入文件,而另一个进程可以读取同一个文件。任何进程在进程边界之外使用文件系统所做的任何事情都可以被同一台计算机上的所有其他进程看到。
3.1 共享文件
我们可以从让两个进程读写同一个文件开始,生产进程将写入文件系统上的文件,然后消费进程将从同一个文件中读取。
我们确实需要注意,写入文件和读取文件不会重叠。在许多计算机上,文件系统操作不是原子的,因此如果写入和读取同时发生,消费进程可能会收到损坏的消息。但是,如果我们可以保证这一点(例如,使用文件系统锁定),那么共享文件就是促进IPC的直接方法。
3.2 共享目录
共享单个知名文件的一个进步是共享整个目录,我们的生产应用程序可以在每次需要时将新文件写入目录,而我们的消费应用程序可以检测到新文件的存在并对其做出反应。
Java在NIO2中有WatchService API,我们可以使用它来实现这一点。我们的消费进程可以使用它来监视我们的目标目录,每当它通知我们已创建新文件时,我们就可以对其做出反应:
WatchService watchService = FileSystems.getDefault().newWatchService();
Path path = Paths.get("pathToDir");
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
WatchKey key;
while ((key = watchService.take()) != null) {
for (WatchEvent<?> event : key.pollEvents()) {
// React to new file.
}
key.reset();
}
完成此操作后,我们的生产进程需要在此目录中创建适当的文件,而消费进程将检测并处理它们。
但请记住,大多数文件系统操作都不是原子的,我们必须确保仅在文件完全写入时才触发文件创建事件。这通常是通过将文件写入临时目录,然后在完成后将其移动到目标目录来完成的。
在大多数文件系统上,“移动文件”或“重命名文件”操作只要发生在同一文件系统内就被视为原子操作。
3.3 命名管道
到目前为止,我们已经使用完整的文件在进程之间传递消息,这要求生产进程在消费进程读取之前写入整个文件。
命名管道是我们可以在此处使用的一种特殊文件类型。命名管道是文件系统上的条目,但其背后没有任何存储。相反,它们充当写入和读取进程之间的管道。
我们首先让消费进程打开命名管道进行读取,由于此命名管道在文件系统上显示为文件,因此我们使用标准文件IO机制执行此操作:
BufferedReader reader = new BufferedReader(new FileReader(file));
String line;
while ((line = reader.readLine()) != null) {
// Process read line
}
写入此命名管道的所有内容都会被此消费进程立即读取,这意味着我们的生产进程需要打开此文件并正常写入。
不幸的是,我们没有在Java中创建这些命名管道的机制。相反,我们需要使用标准OS命令来创建文件系统条目,然后程序才能使用它。具体如何执行此操作因操作系统而异。例如,在Linux上,我们将使用mkfifo命令:
$ mkfifo /tmp/ipc-namedpipe
然后,我们可以在我们的消费和生产进程中使用/tmp/ipc-namedpipe。
4. 基于网络的IPC
我们所看到的一切都围绕着两个进程共享同一个文件系统,这意味着它们需要在同一台计算机上运行。然而,在某些情况下,我们希望我们的进程能够相互通信,无论它们在哪台计算机上运行。
我们可以改用基于网络的IPC来实现这一点。本质上,这只是在一个进程中运行Web服务器,在另一个进程中运行Web客户端。
4.1 简单套接字
实现基于网络的IPC最明显的例子是使用简单的网络套接字,我们可以使用JDK中的套接字支持,也可以依赖Netty或Grizzly等库。
我们的消费进程将运行一个监听已知地址的Web服务器,然后它可以像任何Web服务器一样处理传入的连接和处理消息:
try (ServerSocket serverSocket = new ServerSocket(1234)) {
Socket clientSocket = serverSocket.accept();
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
String line;
while ((line = in.readLine()) != null) {
// Process read line
}
}
然后,生产进程可以向其发送网络消息,以促进我们的IPC:
try (Socket clientSocket = new Socket(host, port)) {
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
out.println(msg);
}
值得注意的是,与基于文件的IPC相比,我们可以更轻松地双向发送消息。
4.2 JMX
使用网络套接字效果很好,但我们需要自己管理很多复杂性。作为替代方案,我们也可以使用JMX。从技术上讲,这仍然使用基于网络的IPC,但它将网络抽象化,因此我们只在MBean方面工作。
和以前一样,我们需要一个在消费进程上运行的服务器。但是,这个服务器现在是来自JVM的标准MBeanServer,而不是我们自己做的任何事情。
我们首先需要定义我们的MBean本身:
public interface IPCTestMBean {
void sendMessage(String message);
}
class IPCTest implements IPCTestMBean {
@Override
public void sendMessage(String message) {
// Process message
}
}
然后,我们可以将其提供给JVM中的MBeanServer:
ObjectName objectName = new ObjectName("cn.tuyucheng.taketoday.ipc:type=basic,name=test");
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
server.registerMBean(new IPCTest(), objectName);
至此,我们已经为消费者做好了准备。
然后我们可以使用JMXConnectorFactory实例从我们的生产系统向该服务器发送消息:
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1234/jmxrmi");
try (JMXConnector jmxc = JMXConnectorFactory.connect(url, null)) {
ObjectName objectName = new ObjectName("cn.tuyucheng.taketoday.ipc:type=basic,name=test");
IPCTestMBean mbeanProxy = JMX.newMBeanProxy(jmxc.getMBeanServerConnection(), objectName, IPCTestMBean.class, true);
mbeanProxy.sendMessage("Hello");
}
请注意,为了使其正常工作,我们需要使用一些额外的JVM参数运行我们的消费者,以便在知名端口上公开JMX:
-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.port=1234
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
然后我们需要在客户端的URL中使用它来连接到正确的服务器。
5. 消息传递基础设施
到目前为止,我们看到的都是相对简单的IPC方式。在某个时候,这种方式也会失效。例如,它假设只有一个进程消费消息—或者生产者确切知道要与哪个消费者对话。
如果我们需要超越这一点,我们可以使用JMS、AMPQ或Kafka之类的东西与专用消息传递基础设施集成。
显然,这比我们在这里讨论的规模要大得多-这将允许一整套生产和消费系统相互传递消息。但是,如果我们需要这种规模,那么这些选项确实存在。
6. 总结
我们已经了解了进程间IPC的几种不同方式,以及我们自己如何实现它们,这涵盖了从共享单个文件到企业级的各种规模。
Post Directory
