使用WebClient限制每秒请求数

2023/05/13

1. 简介

在本教程中,我们将看到使用Spring 5 WebClient限制每秒请求数的不同方法。

虽然我们通常希望利用它的非阻塞特性,但某些情况可能会迫使我们添加延迟。我们将了解其中一些场景,同时使用一些Project Reactor功能来控制对服务器的请求流。

2. 初始设置

我们需要限制每秒请求的典型情况是避免服务器不堪重负。此外,某些Web服务具有每小时允许的最大请求数。同样,有些控制每个客户端的并发请求数。

2.1 编写简单的Web服务

为了探索这种情况,我们将从一个简单的@RestController开始,它提供固定范围内的随机数:

@RestController
@RequestMapping("/random")
public class RandomController {

    @GetMapping
    Integer getRandom() {
        return new Random().nextInt(50);
    }
}

接下来,我们将模拟一个昂贵的操作并限制并发请求数

2.2 限制我们服务器的速率

在查看解决方案之前,让我们更改我们的服务以模拟更真实的场景。

首先,我们将限制我们的服务器可以接受的并发请求数,当达到限制时抛出异常

其次,我们将添加一个延迟来处理我们的响应,模拟昂贵的操作。虽然有更强大的解决方案可用,但我们这样做只是为了说明目的:

public class Concurrency {

    public static final int MAX_CONCURRENT = 5;
    static final AtomicInteger CONCURRENT_REQUESTS = new HashMap<>();

    public static int protect(IntSupplier supplier) {
        try {
            if (CONCURRENT_REQUESTS.incrementAndGet() > MAX_CONCURRENT) {
                throw new UnsupportedOperationException("max concurrent requests reached");
            }

            TimeUnit.SECONDS.sleep(2);
            return supplier.getAsInt();
        } finally {
            CONCURRENT_REQUESTS.decrementAndGet();
        }
    }
}

最后,让我们更改端点以使用它:

@GetMapping
Integer getRandom() {
    return Concurrency.protect(() -> new Random().nextInt(50));
}

现在,当我们超过MAX_CONCURRENT请求时,我们的端点拒绝处理请求,并向客户端返回错误

2.3 编写一个简单的客户端

所有示例都将遵循以下模式来生成n个请求的Flux并向我们的服务发出GET请求:

Flux.range(1, n)
  	.flatMap(i -> {
    	// GET request
  	});

为了减少样板代码,让我们在可以在所有示例中重用的方法中实现请求部分。我们接收一个WebClient,调用get(),并使用ParameterizedTypeReference和retrieve()带有泛型的响应主体

public interface RandomConsumer {

    static <T> Mono<T> get(WebClient client) {
        return client.get()
              .retrieve()
              .bodyToMono(new ParameterizedTypeReference<T>() {});
    }
}

3. 使用zipWith(Flux.Interval())延迟

我们的第一个示例使用zipWith()将我们的请求与固定延迟相结合:

public class ZipWithInterval {

    public static Flux<Integer> fetch(
          WebClient client, int requests, int delay) {
        return Flux.range(1, requests)
              .zipWith(Flux.interval(Duration.ofMillis(delay)))
              .flatMap(i -> RandomConsumer.get(client));
    }
}

因此,这会将每个请求延迟delay毫秒。我们应该注意,这种延迟适用于发送请求之前

4. 使用Flux.delayElements()延迟

Flux有一种更直接的方式来延迟它的元素:

public class DelayElements {

    public static Flux<Integer> fetch(
          WebClient client, int requests, int delay) {
        return Flux.range(1, requests)
              .delayElements(Duration.ofMillis(delay))
              .flatMap(i -> RandomConsumer.get(client));
    }
}

使用delayElements(),延迟直接应用于Subscriber.onNext()信号。换句话说,它延迟了Flux.range()中的每个元素。因此,传递给flatMap()的函数将受到影响,需要更长的时间才能启动。例如,如果delay值为1000,则在我们的请求开始之前会有一秒的延迟。

4.1 调整我们的解决方案

因此,如果我们没有提供足够长的延迟,我们将得到一个错误:

@Test
void givenSmallDelay_whenDelayElements_thenExceptionThrown() {
    int delay = 100;

    int requests = 10;
    assertThrows(InternalServerError.class, () -> {
      	DelayElements.fetch(client, requests, delay)
            .blockLast();
    });
}

这是因为我们每个请求等待100毫秒,但每个请求在服务器端需要两秒才能完成。因此,我们的并发请求很快就达到了限制,我们得到了500错误。

如果我们添加足够的延迟,我们可以摆脱请求限制。但是,我们会遇到另一个问题-我们会等待不必要的时间

根据我们的用例,等待太多可能会显著影响性能。因此,接下来,让我们检查一个更合适的方法来处理这个问题,因为我们知道我们服务器的局限性

5. 使用flatMap()进行并发控制

鉴于我们服务的局限性,我们最好的选择是并行发送最多Concurrency.MAX_U并发请求。为此,我们可以向flatMap()添加一个参数以获取最大并行处理数

public class LimitConcurrency {

    public static Flux<Integer> fetch(WebClient client, int requests, int concurrency) {
        return Flux.range(1, requests)
              .flatMap(i -> RandomConsumer.get(client), concurrency);
    }
}

此参数保证最大并发请求数不超过并发数,并且我们的处理不会延迟超过必要的时间

@Test
void givenLimitInsideServerRange_whenLimitedConcurrency_thenNoExceptionThrown() {
    int limit = Concurrency.MAX_CONCURRENT;

    int requests = 10;
    assertDoesNotThrow(() -> {
      	LimitConcurrency.fetch(client, TOTAL_REQUESTS, limit)
        	.blockLast();
    });
}

不过,还有一些其他选项值得讨论,具体取决于我们的场景和偏好。让我们来看看其中的一些。

6. 使用Resilience4j RateLimiter

Resilience4j是一个多功能库,专为处理应用程序中的容错而设计。我们将使用它来限制一个时间间隔内的并发请求数并包括超时

让我们首先添加resilience4j-reactorresilience4j-ratelimiter依赖项:

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-reactor</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>1.7.1</version>
</dependency>

然后我们使用RateLimiter.of()构建速率限制器,提供名称、发送新请求的间隔、并发限制和超时:

public class Resilience4jRateLimit {

    public static Flux<Integer> fetch(WebClient client, int requests, int concurrency, int interval) {
        RateLimiter limiter = RateLimiter.of("my-rate-limiter", RateLimiterConfig.custom()
              .limitRefreshPeriod(Duration.ofMillis(interval))
              .limitForPeriod(concurrency)
              .timeoutDuration(Duration.ofMillis(interval * concurrency))
              .build());

        // ...
    }
}

现在我们使用transformDeferred()将它包含在我们的Flux中,因此它控制我们的GET请求速率

return Flux.range(1, requests)
  	.flatMap(i -> RandomConsumer.get(client)
    	.transformDeferred(RateLimiterOperator.of(limiter))
  	);

我们应该注意到,如果我们将间隔定义得太低,我们仍然会遇到问题。但是,如果我们需要与其他操作共享速率限制器规范,则此方法很有用。

7. Guava精确节流

Guava有一个通用的速率限制器,非常适合我们的场景。此外,由于它使用令牌桶算法,因此它只会在必要时阻塞而不是每次都阻塞,这与Flux.delayElements()不同

首先,我们需要将Guava添加到我们的pom.xml中:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.1-jre</version>
</dependency>

要使用它,我们调用RateLimiter.create()并将我们要发送的每秒最大请求数传递给它。然后,我们在发送我们的请求之前在限制器上调用acquire()以在必要时限制执行:

public class GuavaRateLimit {

    public static Flux<Integer> fetch(
          WebClient client, int requests, int requestsPerSecond) {
        RateLimiter limiter = RateLimiter.create(requestsPerSecond);

        return Flux.range(1, requests)
              .flatMap(i -> {
                  limiter.acquire();

                  return RandomConsumer.get(client);
              });
    }
}

由于其简单性,该解决方案效果极佳-它不会使我们的代码块过长。例如,如果出于某种原因,一个请求花费的时间比预期的要长,则下一个请求不会等待执行。但是,只有当我们在为requestsPerSecond设置的范围内时才会出现这种情况。

8. 总结

在本文中,我们看到了一些可用的方法来限制WebClient的速率。之后,我们模拟了一个受控的Web服务,看看它如何影响我们的代码和测试。此外,我们使用Project Reactor和一些库来帮助我们以不同的方式实现相同的目标。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章