ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spring Webflux] #3 스프링 웹플럭스에서 AOP 제대로 사용하기
    개발 2024. 4. 11. 22:07

    스프링의 핵심 요소 중 하나인 AOP(관점 지향 프로그래밍)는 횡단 관심사(cross-cutting concerns)를 모듈화 한다.

    간단히 말하면 흔히 스프링에서의 선언적 트랜잭션(@Transactional)처럼 여러 서비스/비즈니스 로직에 대해 공통적으로 사용하는 부분을 따로 분리해서 관리한다고 생각하면 될 것 같다.

    일반적인 AOP 구현 방법

    스프링에서 AOP는 @AspectJ를 사용해 어노테이션 기반으로 작성할 수 있다.

    간단한 예시로 함수 실행 시간을 측정하는 로직을 AOP를 이용해 구현해 보겠다.

     

    일단 우리가 흔히 아는 Spring MVC의 경우 아래와 같이 구현 할 수 있다.

    @Aspect
    @Component
    public class ExecutionTimerAspect {
    	private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
      	@Around("execution(* day.mercury.aop.mvc.TestService.*(..))")
      	public Object measureExecutionTime(ProceedingJoinPoint joinPoint) throws Throwable {
        
        	long start = System.currentTimeMillis();
        	Object result = joinPoint.proceed();
        	long end = System.currentTimeMillis();
        	logger.info("Executed a method(" + joinPoint.getSignature().toShortString() + ") in " + (end - start) + "ms");
    
        	return result;
      	}
    }

    함수 실행 앞 뒤로 타임스탬프를 기록해서 로그를 남길 수 있도록 했다.

    @Service
    public class TestService {
    
    	public String doSomething() {
    		try {
    			Thread.sleep(3000);
                return "foobar";
    		} catch (InterruptedException e) {
    			throw new RuntimeException();
    		}
    	}
    }

    단순하게 3초간 sleep후 "foobar"를 리턴하는 doSomething() 메서드를 작성하였다.

    2024-04-11T10:19:07.220+09:00  INFO 7544 --- [mvc-aop] [nio-8080-exec-1] d.m.a.m.ExecutionTimeMeasuringAspect
    : Executed a method(TestService.doSomething()) in 3005ms

     

    해당 로그를 보면 의도한 대로 doSomething() 메서드의 실행 시간을 제대로 측정한 것을 확인할 수 있다.

     

    만약 Spring MVC에서의 방식 그대로 Spring Webflux에도 적용하면 어떻게 될까?

    @Aspect
    @Component
    public class ExecutionTimerAspect {
    
    	private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
    	@Around("execution(* day.mercury.aop.reactive.TestService.*(..))")
    	public Object measureExecutionTime(ProceedingJoinPoint joinPoint) throws Throwable {
    		long start = System.currentTimeMillis();
    		Object result = joinPoint.proceed();
    		long end = System.currentTimeMillis();
    		logger.info("Executed a method(" + joinPoint.getSignature().toShortString() + ") in " + (end - start) + "ms");
    
    		return result;
    	}   
    }

    관점에 대한 클래스는 MVC의 경우와 다르지 않게 작성했다.

    @Service
    public class TestService {
    	
    	public Mono<String> doSomething() {
    		return Mono.just("foobar").delayElement(Duration.ofMillis(3000));
    	}
    
    }

    이번에는 웹플럭스이므로 doSomething 함수가 Mono<String>을 반환하게 했다.

    "foobar"는 3초의 딜레이를 가진 뒤 방출 된다.

    2024-04-11T10:27:21.503+09:00  INFO 7602 --- [reactive-aop] [ctor-http-nio-2] d.m.a.r.ExecutionTimeMeasuringAspect
    : Executed a method(TestService.doSomething()) in 0ms

    그리고 로그를 보면,, 당연히 안된다.

    Mono/Flux의 Sequence를 어느정도 이해하고 있다면, 변수 start와 end가 기록하는 시간이 각각  doSomething()의 onSubscribe()의 직전, onComplete()의 직후가 아니기 때문에 제대로 측정이 되지 않는 것을 생각할 수 있다.

    @Transactional는 어떻게 Reactive를 지원할까?

    Webflux에서 AOP를 구현하기 위한 방법을 참고하기 위해, 우선 Spring AOP의 대명사 격인 선언적 트랜잭션(@Transactional)이 어떻게 Reactive를 지원하는지 확인해 보았다.

     

    TransactionContextManager.java

    package org.springframework.transaction.reactive;
    
    import ...
    
    public abstract class TransactionContextManager {
    
    	...
        
    	public static Mono<TransactionContext> currentContext() {
        	return Mono.deferContextual(ctx -> {
    			if (ctx.hasKey(TransactionContext.class)) {
    				return Mono.just(ctx.get(TransactionContext.class));
    			}
    			if (ctx.hasKey(TransactionContextHolder.class)) {
    				TransactionContextHolder holder = ctx.get(TransactionContextHolder.class);
    				if (holder.hasContext()) {
    					return Mono.just(holder.currentContext());
    				}
    			}
    			return Mono.error(new NoTransactionInContextException());
    		});
    	}
    
    	public static Function<Context, Context> createTransactionContext() {
    		return context -> context.put(TransactionContext.class, new TransactionContext());
    	}
    	
    	public static Function<Context, Context> getOrCreateContext() {
    		return ...
    	}
    
    	public static Function<Context, Context> getOrCreateContextHolder() {
    		return ...
    	}
    
    }

    TransactionContext.java

    package org.springframework.transaction.reactive;
    
    import ...
    
    public class TransactionContext {
    
    	@Nullable
    	private final TransactionContext parent;
    
    	private final Map<Object, Object> resources = new LinkedHashMap<>();
    
    	@Nullable
    	private Set<TransactionSynchronization> synchronizations;
    
    	@Nullable
    	private volatile String currentTransactionName;
        
        ...
        
    }

    TracnsactionContext 클래스로 트랜잭션에 필요한 resource를 context 형태로 관리하고 있다.

    Spring MVC에서 ThreadLocal을 사용하는 것과는 달리, 여러 스레드 위에서 동작하는 Reactive에서는 Flux/Mono의 Context를 활용하는 것을 알 수 있다.

    TransactionAspectSupport.java

    package org.springframework.transaction.interceptor;
    
    import ...
    
    public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    
    	...
        
    	private class ReactiveTransactionSupport {
    
    		private final ReactiveAdapter adapter;
    
    		public ReactiveTransactionSupport(ReactiveAdapter adapter) {
    			this.adapter = adapter;
    		}
    
    		public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
    				InvocationCallback invocation, @Nullable TransactionAttribute txAttr, ReactiveTransactionManager rtm) {
    
    			String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    
    			// For Mono and suspending functions not returning kotlinx.coroutines.flow.Flow
    			if (Mono.class.isAssignableFrom(method.getReturnType()) || (KotlinDetector.isSuspendingFunction(method) &&
    					!COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()))) {
    
    				return TransactionContextManager.currentContext().flatMap(context ->
    							Mono.<Object, ReactiveTransactionInfo>usingWhen(
    								createTransactionIfNecessary(rtm, txAttr, joinpointIdentification),
    								tx -> {
    									try {
    										return (Mono<?>) invocation.proceedWithInvocation();
    									}
    									catch (Throwable ex) {
    										return Mono.error(ex);
    									}
    								},
    								this::commitTransactionAfterReturning,
    								this::completeTransactionAfterThrowing,
    								this::rollbackTransactionOnCancel)
    							.onErrorMap(this::unwrapIfResourceCleanupFailure))
    						.contextWrite(TransactionContextManager.getOrCreateContext())
    						.contextWrite(TransactionContextManager.getOrCreateContextHolder());
    			}
    
    			// Any other reactive type, typically a Flux
    			return this.adapter.fromPublisher(TransactionContextManager.currentContext().flatMapMany(context ->
    						Flux.usingWhen(
    							createTransactionIfNecessary(rtm, txAttr, joinpointIdentification),
    							tx -> {
    								try {
    									return this.adapter.toPublisher(invocation.proceedWithInvocation());
    								}
    								catch (Throwable ex) {
    									return Mono.error(ex);
    								}
    							},
    							this::commitTransactionAfterReturning,
    							this::completeTransactionAfterThrowing,
    							this::rollbackTransactionOnCancel)
    						.onErrorMap(this::unwrapIfResourceCleanupFailure))
    					.contextWrite(TransactionContextManager.getOrCreateContext())
    					.contextWrite(TransactionContextManager.getOrCreateContextHolder()));
    		}
            
            ...
    	}     
        ...
    }

    TransactionContext.ReactiveTransactionSupport 클래스의 invokeWithinTransaction 메서드가 Aspect 진입점에서 트랜잭션을 생성하고, Mono/Flux의 contextWrite 연산자를 통해 reactor context에 TransactionContext를 등록하는 것을 확인 할 수 있다.

     

     

    Reactor Context를 활용한 AOP 구현

    앞서 살펴봤던 선언적 트랜잭션과, 메서드 실행시간을 측정하는 경우처럼, 대상이 되는 Reactive Sequence내 에서 값이 전달 될 필요가 있을 경우 Reactor Context를 활용할 수 있다.

    (물론 트랜잭션 정보나 측정한 시간같은 contextual data C와 비즈니스 로직의 business data T를 Tuple2<T, C>로 묶어 데이터를 전달 할 수 있지만, 비즈니스 로직의 메서드 시그니쳐에 contextual data가 노출 되므로 바람직 하지 않다.)

     

    맨 앞의 메서드 실행 시간을 측정하는 AOP를 Reactor Context를 활용해 구현해 보았다.

    @Around("execution(* day.mercury.aop.reactive.TestService.*(..))")
    public Object measureMonoExecutionTime(ProceedingJoinPoint joinPoint) {
        return Mono.deferContextual(ctx -> {
            try {
                return ((Mono<Object>) joinPoint.proceed()).doOnSuccess(_ -> {
                    long start = ctx.get("start");
                    long end = System.currentTimeMillis();
                    logger.info("Executed a method(" + joinPoint.getSignature().toShortString() + ") in " + (end - start) + "ms");
                });
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        }).contextWrite(ctx -> ctx.put("start", System.currentTimeMillis()));
    }

     

    대상 메서드 진입점에서 context에 시작 시간을 주입하고 (contextWrite), 대상(Mono)이 onComplete(doOnSuccess) 되었을 때 context에서 시작 시간을 가져와 실행시간을 로깅하도록 하였다.

    2024-04-11T20:02:13.045+09:00  INFO 9728 --- [reactive-aop] [     parallel-1] d.m.a.r.ExecutionTimeMeasuringAspect
    : Executed a method(TestService.doSomething()) in 3006ms

    잘 된다.

    마치며

    이번 글에서 다뤘던 케이스들 처럼 반드시 Reactor Context를 사용해서 AOP를 구현해야하는 것은 아니다.

    너무 당연한 이야기지만 무엇보다도 Mono/Flux의 시퀀스 내의 시그널들을 잘 이용하는 것이 중요하다고 생각한다.

    끝.

    참고

    https://curiousjinan.tistory.com/entry/spring-aop-understand

    https://medium.com/@develxb/spring-data-r2dbc-%EC%BB%A4%EB%84%A5%EC%85%98-%EC%9C%A0%EC%A7%80-%EB%B0%A9%EB%B2%95-fb1bc8d83a4f

    https://itvillage.tistory.com/65

Designed by Tistory.