-
[Spring Webflux] #3 스프링 웹플럭스에서 AOP 제대로 사용하기개발 2024. 4. 11. 22:07
스프링의 핵심 요소 중 하나인 AOP(관점 지향 프로그래밍)는 횡단 관심사(cross-cutting concerns)를 모듈화 한다.
간단히 말하면 흔히 스프링에서의 선언적 트랜잭션(@Transactional)처럼 여러 서비스/비즈니스 로직에 대해 공통적으로 사용하는 부분을 따로 분리해서 관리한다고 생각하면 될 것 같다.
일반적인 AOP 구현 방법
스프링에서 AOP는 @AspectJ를 사용해 어노테이션 기반으로 작성할 수 있다.
간단한 예시로 함수 실행 시간을 측정하는 로직을 AOP를 이용해 구현해 보겠다.
일단 우리가 흔히 아는 Spring MVC의 경우 아래와 같이 구현 할 수 있다.
@Aspect @Component public class ExecutionTimerAspect { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Around("execution(* day.mercury.aop.mvc.TestService.*(..))") public Object measureExecutionTime(ProceedingJoinPoint joinPoint) throws Throwable { long start = System.currentTimeMillis(); Object result = joinPoint.proceed(); long end = System.currentTimeMillis(); logger.info("Executed a method(" + joinPoint.getSignature().toShortString() + ") in " + (end - start) + "ms"); return result; } }
함수 실행 앞 뒤로 타임스탬프를 기록해서 로그를 남길 수 있도록 했다.
@Service public class TestService { public String doSomething() { try { Thread.sleep(3000); return "foobar"; } catch (InterruptedException e) { throw new RuntimeException(); } } }
단순하게 3초간 sleep후 "foobar"를 리턴하는 doSomething() 메서드를 작성하였다.
2024-04-11T10:19:07.220+09:00 INFO 7544 --- [mvc-aop] [nio-8080-exec-1] d.m.a.m.ExecutionTimeMeasuringAspect : Executed a method(TestService.doSomething()) in 3005ms
해당 로그를 보면 의도한 대로 doSomething() 메서드의 실행 시간을 제대로 측정한 것을 확인할 수 있다.
만약 Spring MVC에서의 방식 그대로 Spring Webflux에도 적용하면 어떻게 될까?
@Aspect @Component public class ExecutionTimerAspect { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Around("execution(* day.mercury.aop.reactive.TestService.*(..))") public Object measureExecutionTime(ProceedingJoinPoint joinPoint) throws Throwable { long start = System.currentTimeMillis(); Object result = joinPoint.proceed(); long end = System.currentTimeMillis(); logger.info("Executed a method(" + joinPoint.getSignature().toShortString() + ") in " + (end - start) + "ms"); return result; } }
관점에 대한 클래스는 MVC의 경우와 다르지 않게 작성했다.
@Service public class TestService { public Mono<String> doSomething() { return Mono.just("foobar").delayElement(Duration.ofMillis(3000)); } }
이번에는 웹플럭스이므로 doSomething 함수가 Mono<String>을 반환하게 했다.
"foobar"는 3초의 딜레이를 가진 뒤 방출 된다.
2024-04-11T10:27:21.503+09:00 INFO 7602 --- [reactive-aop] [ctor-http-nio-2] d.m.a.r.ExecutionTimeMeasuringAspect : Executed a method(TestService.doSomething()) in 0ms
그리고 로그를 보면,, 당연히 안된다.
Mono/Flux의 Sequence를 어느정도 이해하고 있다면, 변수 start와 end가 기록하는 시간이 각각 doSomething()의 onSubscribe()의 직전, onComplete()의 직후가 아니기 때문에 제대로 측정이 되지 않는 것을 생각할 수 있다.
@Transactional는 어떻게 Reactive를 지원할까?
Webflux에서 AOP를 구현하기 위한 방법을 참고하기 위해, 우선 Spring AOP의 대명사 격인 선언적 트랜잭션(@Transactional)이 어떻게 Reactive를 지원하는지 확인해 보았다.
TransactionContextManager.java
package org.springframework.transaction.reactive; import ... public abstract class TransactionContextManager { ... public static Mono<TransactionContext> currentContext() { return Mono.deferContextual(ctx -> { if (ctx.hasKey(TransactionContext.class)) { return Mono.just(ctx.get(TransactionContext.class)); } if (ctx.hasKey(TransactionContextHolder.class)) { TransactionContextHolder holder = ctx.get(TransactionContextHolder.class); if (holder.hasContext()) { return Mono.just(holder.currentContext()); } } return Mono.error(new NoTransactionInContextException()); }); } public static Function<Context, Context> createTransactionContext() { return context -> context.put(TransactionContext.class, new TransactionContext()); } public static Function<Context, Context> getOrCreateContext() { return ... } public static Function<Context, Context> getOrCreateContextHolder() { return ... } }
package org.springframework.transaction.reactive; import ... public class TransactionContext { @Nullable private final TransactionContext parent; private final Map<Object, Object> resources = new LinkedHashMap<>(); @Nullable private Set<TransactionSynchronization> synchronizations; @Nullable private volatile String currentTransactionName; ... }
TracnsactionContext 클래스로 트랜잭션에 필요한 resource를 context 형태로 관리하고 있다.
Spring MVC에서 ThreadLocal을 사용하는 것과는 달리, 여러 스레드 위에서 동작하는 Reactive에서는 Flux/Mono의 Context를 활용하는 것을 알 수 있다.
package org.springframework.transaction.interceptor; import ... public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean { ... private class ReactiveTransactionSupport { private final ReactiveAdapter adapter; public ReactiveTransactionSupport(ReactiveAdapter adapter) { this.adapter = adapter; } public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, InvocationCallback invocation, @Nullable TransactionAttribute txAttr, ReactiveTransactionManager rtm) { String joinpointIdentification = methodIdentification(method, targetClass, txAttr); // For Mono and suspending functions not returning kotlinx.coroutines.flow.Flow if (Mono.class.isAssignableFrom(method.getReturnType()) || (KotlinDetector.isSuspendingFunction(method) && !COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()))) { return TransactionContextManager.currentContext().flatMap(context -> Mono.<Object, ReactiveTransactionInfo>usingWhen( createTransactionIfNecessary(rtm, txAttr, joinpointIdentification), tx -> { try { return (Mono<?>) invocation.proceedWithInvocation(); } catch (Throwable ex) { return Mono.error(ex); } }, this::commitTransactionAfterReturning, this::completeTransactionAfterThrowing, this::rollbackTransactionOnCancel) .onErrorMap(this::unwrapIfResourceCleanupFailure)) .contextWrite(TransactionContextManager.getOrCreateContext()) .contextWrite(TransactionContextManager.getOrCreateContextHolder()); } // Any other reactive type, typically a Flux return this.adapter.fromPublisher(TransactionContextManager.currentContext().flatMapMany(context -> Flux.usingWhen( createTransactionIfNecessary(rtm, txAttr, joinpointIdentification), tx -> { try { return this.adapter.toPublisher(invocation.proceedWithInvocation()); } catch (Throwable ex) { return Mono.error(ex); } }, this::commitTransactionAfterReturning, this::completeTransactionAfterThrowing, this::rollbackTransactionOnCancel) .onErrorMap(this::unwrapIfResourceCleanupFailure)) .contextWrite(TransactionContextManager.getOrCreateContext()) .contextWrite(TransactionContextManager.getOrCreateContextHolder())); } ... } ... }
TransactionContext.ReactiveTransactionSupport 클래스의 invokeWithinTransaction 메서드가 Aspect 진입점에서 트랜잭션을 생성하고, Mono/Flux의 contextWrite 연산자를 통해 reactor context에 TransactionContext를 등록하는 것을 확인 할 수 있다.
Reactor Context를 활용한 AOP 구현
앞서 살펴봤던 선언적 트랜잭션과, 메서드 실행시간을 측정하는 경우처럼, 대상이 되는 Reactive Sequence내 에서 값이 전달 될 필요가 있을 경우 Reactor Context를 활용할 수 있다.
(물론 트랜잭션 정보나 측정한 시간같은 contextual data C와 비즈니스 로직의 business data T를 Tuple2<T, C>로 묶어 데이터를 전달 할 수 있지만, 비즈니스 로직의 메서드 시그니쳐에 contextual data가 노출 되므로 바람직 하지 않다.)
맨 앞의 메서드 실행 시간을 측정하는 AOP를 Reactor Context를 활용해 구현해 보았다.
@Around("execution(* day.mercury.aop.reactive.TestService.*(..))") public Object measureMonoExecutionTime(ProceedingJoinPoint joinPoint) { return Mono.deferContextual(ctx -> { try { return ((Mono<Object>) joinPoint.proceed()).doOnSuccess(_ -> { long start = ctx.get("start"); long end = System.currentTimeMillis(); logger.info("Executed a method(" + joinPoint.getSignature().toShortString() + ") in " + (end - start) + "ms"); }); } catch (Throwable e) { throw new RuntimeException(e); } }).contextWrite(ctx -> ctx.put("start", System.currentTimeMillis())); }
대상 메서드 진입점에서 context에 시작 시간을 주입하고 (contextWrite), 대상(Mono)이 onComplete(doOnSuccess) 되었을 때 context에서 시작 시간을 가져와 실행시간을 로깅하도록 하였다.
2024-04-11T20:02:13.045+09:00 INFO 9728 --- [reactive-aop] [ parallel-1] d.m.a.r.ExecutionTimeMeasuringAspect : Executed a method(TestService.doSomething()) in 3006ms
잘 된다.
마치며
이번 글에서 다뤘던 케이스들 처럼 반드시 Reactor Context를 사용해서 AOP를 구현해야하는 것은 아니다.
너무 당연한 이야기지만 무엇보다도 Mono/Flux의 시퀀스 내의 시그널들을 잘 이용하는 것이 중요하다고 생각한다.
끝.
참고
https://curiousjinan.tistory.com/entry/spring-aop-understand
'개발' 카테고리의 다른 글
[Spring] JobRegistryBeanPostProcessor (not eligible for auto-proxying) 관련 경고 트러블 슈팅 (1) 2024.09.11 [Spring Webflux] #4 스프링 웹플럭스의 코틀린 코루틴 지원 내부 구현 뜯어보기 (0) 2024.07.29 [Spring Webflux] #2 내가 스프링 웹플럭스를 공부한 방법 (0) 2024.04.10 Docker로 Elasticsearch + Kibana 개발환경 구성하기 (2) 2023.11.24 [Spring Webflux] #1 Reactive Streams 이해하기 (0) 2023.02.25