1. 概述
大多数分布式应用程序都需要一些有状态的组件来保持一致性和容错性。Atomix是一个可嵌入的库,可帮助实现分布式资源的容错性和一致性。
它提供了一组丰富的API来管理其资源,如集合、组和并发工具。
首先,我们需要在pom中添加以下Maven依赖:
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix-all</artifactId>
<version>1.0.8</version>
</dependency>
此依赖提供了其节点相互通信所需的基于Netty的传输。
2. 引导集群
要开始使用Atomix,我们需要先引导一个集群。
Atomix由一组副本组成,用于创建有状态的分布式资源,每个副本都维护集群中现有每个资源的状态副本。
集群中的副本有两种类型:主动和被动。
分布式资源的状态变化通过主动副本传播,而被动副本保持同步以维持容错能力。
2.1 引导嵌入式集群
要启动单节点集群,我们首先需要创建一个AtomixReplica实例:
AtomixReplica replica = AtomixReplica.builder(
new Address("localhost", 8700))
.withStorage(storage)
.withTransport(new NettyTransport())
.build();
这里副本配置了Storage和Transport,声明存储的代码片段:
Storage storage = Storage.builder()
.withDirectory(new File("logs"))
.withStorageLevel(StorageLevel.DISK)
.build();
一旦声明了副本并配置了存储和传输,我们就可以通过简单地调用bootstrap()来引导它-它返回一个CompletableFuture,可用于阻塞,直到服务器通过调用相关的阻塞join()方法进行引导:
CompletableFuture<AtomixReplica> future = replica.bootstrap();
future.join();
到目前为止,我们已经构建了一个单节点集群,现在我们可以向其中添加更多节点。
为此,我们需要创建其他副本并将它们与现有集群连接起来;我们需要生成一个新线程来调用join(Address)方法:
AtomixReplica replica2 = AtomixReplica.builder(new Address("localhost", 8701))
.withStorage(storage)
.withTransport(new NettyTransport())
.build();
replica2
.join(new Address("localhost", 8700))
.join();
AtomixReplica replica3 = AtomixReplica.builder(new Address("localhost", 8702))
.withStorage(storage)
.withTransport(new NettyTransport())
.build();
replica3.join(
new Address("localhost", 8700),
new Address("localhost", 8701))
.join();
现在我们已经启动了一个3节点集群。或者,我们可以通过在bootstrap(List<Address>)方法中传递一个地址列表来启动集群:
List<Address> cluster = Arrays.asList(
new Address("localhost", 8700),
new Address("localhost", 8701),
new Address("localhsot", 8702));
AtomixReplica replica1 = AtomixReplica
.builder(cluster.get(0))
.build();
replica1.bootstrap(cluster).join();
AtomixReplica replica2 = AtomixReplica
.builder(cluster.get(1))
.build();
replica2.bootstrap(cluster).join();
AtomixReplica replica3 = AtomixReplica
.builder(cluster.get(2))
.build();
replica3.bootstrap(cluster).join();
我们需要为每个副本生成一个新线程。
2.2 引导独立集群
Atomix服务器可以作为独立服务器运行,可以从Maven Central下载。简而言之,它是一个Java存档,可以通过在地址标志中提供host:port参数并使用-bootstrap标志通过终端运行。
以下是引导集群的命令:
java -jar atomix-standalone-server.jar
-address 127.0.0.1:8700 -bootstrap -config atomix.properties
这里atomix.properties是配置存储和传输的配置文件。要创建多节点集群,我们可以使用-join标志将节点添加到现有集群。
其格式为:
java -jar atomix-standalone-server.jar
-address 127.0.0.1:8701 -join 127.0.0.1:8700
3. 使用客户端
Atomix支持通过AtomixClient API创建客户端来远程访问其集群。
由于客户端不需要有状态,AtomixClient没有任何存储。我们只需在创建客户端时配置传输,因为传输将用于与集群进行通信。
让我们创建一个具有传输的客户端:
AtomixClient client = AtomixClient.builder()
.withTransport(new NettyTransport())
.build();
我们现在需要将客户端连接到集群。
我们可以声明一个地址列表,并将该列表作为参数传递给客户端的connect()方法:
client.connect(cluster)
.thenRun(() -> {
System.out.println("Client is connected to the cluster!");
});
4. 处理资源
Atomix的真正强大之处在于其用于创建和管理分布式资源的强大API集。资源在集群中复制和持久化,并由状态机提供支持-由其Raft共识协议的底层实现进行管理。
分布式资源可以通过其get()方法之一创建和管理,我们可以从AtomixReplica创建一个分布式资源实例。
考虑到replica是AtomixReplica的一个实例,创建分布式Map资源并为其设置值的代码片段:
replica.getMap("map")
.thenCompose(m -> m.put("bar", "Hello world!"))
.thenRun(() -> System.out.println("Value is set in Distributed Map"))
.join();
这里join()方法将阻塞程序,直到创建资源并为其设置值。我们可以使用AtomixClient获取相同的对象,并使用get(“bar”)方法检索值。
我们可以在最后使用get()方法来等待结果:
String value = client.getMap("map"))
.thenCompose(m -> m.get("bar"))
.thenApply(a -> (String) a)
.get();
5. 一致性和容错性
Atomix用于任务关键型小规模数据集,对于该数据集而言,一致性比可用性更为重要。
它通过读写的线性化提供强大的可配置一致性。在线性化中,一旦提交了写入,就保证所有客户端都知道结果状态。
Atomix集群的一致性由底层Raft共识算法保证,其中选出的领导者将拥有所有之前成功的写入。
所有新的写入操作都将通过集群领导者并在完成之前同步复制到大多数服务器。
为了保持容错能力,集群中的大多数服务器需要处于活跃状态。如果少数节点发生故障,则节点将被标记为非活跃状态,并将由被动节点或备用节点替换。
一旦Leader发生故障,集群中剩余的服务器将开始新的Leader选举。同时,集群将不可用。
在发生分区的情况下,如果领导者位于分区的非法定人数侧,则其将下台,并在具有法定人数的一侧选举新的领导者。
而且,如果领导者在多数派一方,它将继续进行,不会有任何变化。当分区解决后,非法定人数一方的节点将加入法定人数,并相应地更新其日志。
6. 总结
与ZooKeeper一样,Atomix提供了一套强大的库来处理分布式计算问题。
Post Directory
