Spring Bean后处理器

2023/05/13

1. 概述

在许多其他文章中,我们提到了BeanPostProcessor。在本文中,我们将在使用Guava的EventBus的真实示例中使用它们。

Spring的BeanPostProcessor为我们提供了连接到Spring bean生命周期的hook,以修改其配置

BeanPostProcessor允许直接修改bean本身。

在本文中,我们将看看这些类集成Guava的EventBus的具体示例。

2. 项目构建

首先,我们需要构建我们的环境。让我们将spring-context、spring-expression和guava依赖项添加到我们的build.gradle中:

ext {
    guava = '31.0.1-jre'
    spring = '5.3.13'
}

dependencies {
    implementation "org.springframework:spring-context:${spring}"
    implementation "org.springframework:spring-expression:${spring}"
    implementation "com.google.guava:guava:${guava}"
}

3. 目标和实现

对于我们的第一个目标,我们想利用Guava的EventBus在系统的各个方面异步传递消息

接下来,我们希望在bean创建/销毁时自动注册和注销事件对象,而不是使用EventBus提供的手动方法。

所以,我们现在准备开始编码!

我们的实现将包括Guava的EventBus的包装类、自定义标记注解、BeanPostProcessor、模型对象和从EventBus接收股票交易事件的bean。 此外,我们将创建一个测试用例来验证所需的功能。

3.1 EventBus包装器

首先,我们将定义一个EventBus包装器来提供一些静态方法,以便为BeanPostProcessor使用的事件轻松注册和注销bean:

public final class GlobalEventBus {
    public static final String GLOBAL_EVENT_BUS_EXPRESSION = "T(cn.tuyucheng.taketoday.beanpostprocessor.GlobalEventBus).getEventBus()";
    private static final String IDENTIFIER = "global-event-bus";
    private static final GlobalEventBus GLOBAL_EVENT_BUS = new GlobalEventBus();
    private final EventBus eventBus = new AsyncEventBus(IDENTIFIER, Executors.newCachedThreadPool());

    private GlobalEventBus() {
    }

    public static GlobalEventBus getInstance() {
        return GlobalEventBus.GLOBAL_EVENT_BUS;
    }

    public static EventBus getEventBus() {
        return GlobalEventBus.GLOBAL_EVENT_BUS.eventBus;
    }

    public static void subscribe(Object obj) {
        getEventBus().register(obj);
    }

    public static void unsubscribe(Object obj) {
        getEventBus().unregister(obj);
    }

    public static void post(Object event) {
        getEventBus().post(event);
    }
}

上述代码提供了用于访问GlobalEventBus和底层EventBus以及注册和注销事件以及发布事件的静态方法。 它还有一个SpEL表达式,用作我们自定义注解中的默认表达式,以定义我们要使用的EventBus。

3.2 自定义标记注解

接下来,让我们定义一个自定义标记注解,BeanPostProcessor将使用该注解来识别要自动注册/注销事件的bean:

/**
 * An annotation which indicates which Guava {@link com.google.common.eventbus.EventBus} a Spring bean wishes to subscribe to.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Inherited
public @interface Subscriber {

    /**
     * A SpEL expression which selects the {@link com.google.common.eventbus.EventBus}.
     */
    String value() default GlobalEventBus.GLOBAL_EVENT_BUS_EXPRESSION;
}

3.3 BeanPostProcessor

现在,我们将定义BeanPostProcessor,它将检查每个bean的Subscriber注解。 这个类也是一个DestructionAwareBeanPostProcessor,它是一个Spring接口,向BeanPostProcessor添加了一个销毁前的回调。 如果存在注解,我们将在bean初始化时将其注册到由注解的SpEL表达式标识的EventBus中,并在bean销毁时取消注册:

/**
 * A {@link DestructionAwareBeanPostProcessor} which registers/un-registers subscribers to a Guava {@link EventBus}. The class must
 * be annotated with {@link Subscriber} and each subscribing method must be annotated with
 * {@link com.google.common.eventbus.Subscribe}.
 */
@SuppressWarnings("ALL")
public class GuavaEventBusBeanPostProcessor implements DestructionAwareBeanPostProcessor {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final SpelExpressionParser expressionParser = new SpelExpressionParser();

    @Override
    public void postProcessBeforeDestruction(final Object bean, final String beanName) throws BeansException {
        this.process(bean, EventBus::unregister, "destruction");
    }

    @Override
    public boolean requiresDestruction(Object bean) {
        return true;
    }

    @Override
    public Object postProcessBeforeInitialization(final Object bean, final String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        this.process(bean, EventBus::register, "initialization");
        return bean;
    }

    private void process(final Object bean, final BiConsumer<EventBus, Object> consumer, final String action) {
        // See implementation below
    }
}

上面的代码获取每个bean并通过,下面定义的process方法运行它。它在bean初始化之后和销毁之前对其进行处理。 requiresDestruction方法默认返回true,当我们在postProcessBeforeDestruction回调中检查@Subscriber注解是否存在时,我们会保留该行为。

现在让我们看一下process方法:


@SuppressWarnings("ALL")
public class GuavaEventBusBeanPostProcessor implements DestructionAwareBeanPostProcessor {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final SpelExpressionParser expressionParser = new SpelExpressionParser();

    private void process(final Object bean, final BiConsumer<EventBus, Object> consumer, final String action) {
        Object proxy = this.getTargetObject(bean);
        final Subscriber annotation = AnnotationUtils.getAnnotation(proxy.getClass(), Subscriber.class);
        if (annotation == null)
            return;
        this.logger.info("{}: processing bean of type {} during {}",
                this.getClass().getSimpleName(), proxy.getClass().getName(), action);
        final String annotationValue = annotation.value();
        try {
            final Expression expression = this.expressionParser.parseExpression(annotationValue);
            final Object value = expression.getValue();
            if (!(value instanceof EventBus)) {
                this.logger.error("{}: expression {} did not evaluate to an instance of EventBus for bean of type {}",
                        this.getClass().getSimpleName(), annotationValue, proxy.getClass().getSimpleName());
                return;
            }
            final EventBus eventBus = (EventBus) value;
            consumer.accept(eventBus, proxy);
        } catch (ExpressionException ex) {
            this.logger.error("{}: unable to parse/evaluate expression {} for bean of type {}",
                    this.getClass().getSimpleName(), annotationValue, proxy.getClass().getName());
        }
    }
}

以上代码检查是否存在名为Subscriber的自定义标记注解,如果存在,则从其value属性中读取SpEL表达式。 然后,SpEL表达式被解析为一个Expression对象。如果它是EventBus的实例,我们将BiConsumer函数参数应用于bean。 BiConsumer用于从EventBus注册和注销bean。

getTargetObject方法的实现如下:


@SuppressWarnings("ALL")
public class GuavaEventBusBeanPostProcessor implements DestructionAwareBeanPostProcessor {

    private Object getTargetObject(Object proxy) throws BeansException {
        if (AopUtils.isJdkDynamicProxy(proxy)) {
            try {
                return ((Advised) proxy).getTargetSource().getTarget();
            } catch (Exception e) {
                throw new FatalBeanException("Error getting target of JDK proxy", e);
            }
        }
        return proxy;
    }
}

3.4 StockTrade模型对象

接下来,让我们定义StockTrade模型对象:

public record StockTrade(String symbol, int quantity, double price, Date tradeDate) {

}

3.5 StockTradePublisher事件接收器

然后,让我们定义一个监听器类来通知我们收到了交易,这样我们就可以编写我们的测试了:


@FunctionalInterface
public interface StockTradeListener {
    void stockTradePublished(StockTrade trade);
}

最后,我们将为新的StockTrade事件定义一个接收者:


@Subscriber
public class StockTradePublisher {
    private final Set<StockTradeListener> stockTradeListeners = new HashSet<>();

    public void addStockTradeListener(StockTradeListener listener) {
        synchronized (this.stockTradeListeners) {
            this.stockTradeListeners.add(listener);
        }
    }

    public void removeStockTradeListener(StockTradeListener listener) {
        synchronized (this.stockTradeListeners) {
            this.stockTradeListeners.remove(listener);
        }
    }

    @Subscribe
    @AllowConcurrentEvents
    private void handleNewStockTradeEvent(StockTrade trade) {
        // publish to DB, send to PubNub, whatever you want here
        final Set<StockTradeListener> listeners;
        synchronized (this.stockTradeListeners) {
            listeners = new HashSet<>(this.stockTradeListeners);
        }
        listeners.forEach(li -> li.stockTradePublished(trade));
    }
}

上面的代码将这个类标记为Guava EventBus事件的订阅者,而Guava的@Subscribe注解将方法handleNewStockTradeEvent标记为事件的接收者。 它将接收的事件类型基于方法的参数;在这种情况下,我们将接收StockTrade类型的事件。

@AllowConcurrentEvents注解允许并发调用此方法。一旦我们收到一笔交易,我们就会进行任何我们希望的处理,然后通知任何监听器。

3.6 测试

现在我们编写一个集成测试,以验证BeanPostProcessor是否正常工作。首先,我们需要一个Spring上下文:


@Configuration
public class PostProcessorConfiguration {

    @Bean
    public GlobalEventBus eventBus() {
        return GlobalEventBus.getInstance();
    }

    @Bean
    public GuavaEventBusBeanPostProcessor eventBusBeanPostProcessor() {
        return new GuavaEventBusBeanPostProcessor();
    }

    @Bean
    public StockTradePublisher stockTradePublisher() {
        return new StockTradePublisher();
    }
}

现在我们可以实现我们的测试:


@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = {PostProcessorConfiguration.class})
class StockTradeIntegrationTest {

    @Autowired
    private StockTradePublisher stockTradePublisher;

    @Test
    void givenValidConfig_whenTradePublished_thenTradeReceived() {
        Date tradeDate = new Date();
        StockTrade stockTrade = new StockTrade("AMZN", 100, 2483.52d, tradeDate);
        AtomicBoolean assertionsPassed = new AtomicBoolean(false);
        StockTradeListener listener = trade -> assertionsPassed.set(this.verifyExact(stockTrade, trade));
        this.stockTradePublisher.addStockTradeListener(listener);
        try {
            GlobalEventBus.post(stockTrade);
            await().atMost(Duration.ofSeconds(2L)).untilAsserted(() -> assertThat(assertionsPassed.get()).isTrue());
        } finally {
            this.stockTradePublisher.removeStockTradeListener(listener);
        }
    }

    private boolean verifyExact(StockTrade stockTrade, StockTrade trade) {
        return Objects.equals(stockTrade.symbol(), trade.symbol())
                && Objects.equals(stockTrade.tradeDate(), trade.tradeDate())
                && stockTrade.quantity() == trade.quantity()
                && stockTrade.price() == trade.price();
    }
}

上面的测试代码生成StockTrade并将其发布到GlobalEventBus。我们最多等待两秒钟,等待操作完成,并通知stockTradePublisher收到交易。 此外,我们验证收到的交易在运输过程中没有被修改。

4. 总结

总而言之,Spring的BeanPostProcessor允许我们自定义bean本身,为我们提供了一种自动化bean操作的方法,否则我们必须手动执行。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章