ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Apache Camel] Netty4 Consumer 파헤치기
    개발/Apache Camel 2020. 9. 22. 12:10

    Netty4 라이브러리를 이용해 from을 구성하게 될 시 내부적으로 어떻게 connection을 관리하고 비동기적으로 실행되는지 궁금해서 Deep 하게는 못하고 간단하게 알아보았다.

    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-netty4-starter</artifactId>
        <version>3.0.0-M4</version>
    </dependency>

    version은 3.0.0-M4 최신버전으로 확인해보았다.

    실제 현업에서 사용했던 version은 2.21.1 이었는데 확실히 최신버전이랑 차이점이 있었다.


    일단 아래와 같이 Router를 작성

    @Configuration
    public class TcpRouter extends RouteBuilder{
    
    	@Bean
    	public StringDecoder stringDecoder() {
    		return new StringDecoder();
    	}
    
    	@Bean
    	public StringEncoder stringEncoder() {
    		return new StringEncoder();
    	}
    	
    	@Override
    	public void configure() throws Exception {		
    		from("netty4:tcp://localhost:60010?decoders=stringDecoder&encoders=stringEncoder&workerCount=2")
    		.process(exchange->{
    			System.out.println("start tcp:"+exchange.getMessage().getBody(String.class));
    			Thread.sleep(3000);
    			System.out.println("end tcp:"+exchange.getMessage().getBody(String.class));
    		})
    		.end();
    	}
    
    }
    

    Netty에서 기본으로 지원하는 StringDecoder, StringEncoder를 미리 Bean으로 생성한다. { Consumer (from)에 설정하기 위해 }

     

    여기서 decoder, encoder 설정은 바로 전 게시글에 Producer의 ClientInitializerFactory 를 초기화하는것과 같이 Consumer는 ServerInitializerFactory 를 초기화할 때 pipeline에 등록된다.

    그럼 나머지 parameter인 workerCount 무엇인가? ( Netty4 Consumer가 동작되는 과정을 살펴보기 위해서 2로 설정해 놓았다. )

    Default값이 cpu_core_threads x 2 라는것만 알고 넘어가자.

     

    clientMode가 아닌 Server로 작성한 Netty4 Consumer는 결과적으로 위와같이 Thread들이 생성된다.

     

    1. NettyServerTCPBoss

    - 최초 Connection을 받아주고 Channel을 생성한다. Worker를 생성하고 Channel을 넘겨주는 Thread

      Parameter 설정 이름 : bossCount (default=1)

     

    2. NettyServerTCPWorker

    - 실제 Connection이 된 여러개 Channel들을 담당하는 Thread

      NettyServerTCPWorker 갯수 < Channel 갯수 면 NettyServerTCPWorker들에 분산되어서 Channel이 등록됨 하나의 worker가 여러개 channel을 관리한다고 생각하면 됨

      Parameter 설정 이름 : workerCount (default=cpu_core_threads x 2)

     

    3. NettyEventExecutorGroup

    - NettyServerTCPWorker Channel로 들어온 송,수신 데이터를 실제로 작업하는 Thread

      from에 연결된 processor들을 여기서 실행한다.

      version 2.21.1 에는 NettyServerTCPWorker 내부에 존재하였음. (Thread로 분리되지 않았다는 뜻)

      option 설정 이름 : maximumPoolSize (default=16) (Endpoint생성 시 설정 불가, Component 생성 시 설정해야함)

     

    마지막 NettyEventExecutorGroup에 대해서는 3.0.0 과 2.21.1 이 동작하는 로직이 조금 달랐다.

    예를들어 위와같이 workerCount=2 로 NettyEventExecutorGroup Thread가 2개만 생성되어있고 connection이 3개가 맺어져있다고 가정하자.

    Version 2.21.1

    Version 2.21.1 에서는 하나의 Worker == 하나의 Executor 였다. 

    그래서 #1Worker 같이 여러개의 channel에서 동시에 데이터("a", "b")가 전송되면 먼저들어온 "a"에 대한 처리가 끝나기 전까지 "b"는 대기하고 있었다. ( Blocking 이었다. )

    Version 3.0.0

    그러나 Version 3.0.0 은 NettyEventExecutorGroup을 Pool로 생성해놓음으로서 maximumPoolSize 만큼 동시처리 가능하게 바뀌었다.

     

    결론적으로

    Netty4 Consumer의 설정을 변경하여 성능개선을 위해서는 workerCount, maximumPoolSize를 설정하여야 한다는 것

    bossCount는 connection만 받아주고 Worker로 Channel 생성후 넘기기 때문에 default값인 1개여도 충분할 듯 싶다.

     

    여기서 workerCount는 Endpoint 생성 시 from(String uri) uri를 통해 NettyConfiguration이 가능한데 문제는 maximumPoolSize 이녀석이다.

     

    Version 2.16.x 까지는 uri NettyConfiguration을 통해 설정이 가능하였던 것 같은데

    Version 2.17.x 이후부터는 NettyConfiguration을 통해 설정이 불가능해졌다.

     

    실제로 Apache Camel Issues에도 Major로 등록되어 Fix된 사항이다. (issues.apache.org/jira/browse/CAMEL-8031)

    더보기
    maximumPoolSize endpoint option of the Netty component is effectively ignored. We keep OrderedMemoryAwareThreadPoolExecutor in the NettyComponent#executorService field - it means that we keep, configure and start the executor on the component, not the endpoint level.

    Netty component is started before the endpoint is created, so NettyComponent#executorService will be always created with the default size (before endpoint will set the pool size on the configuration object).

    There is a workaround for this issue (changing configuration on the component level), but if we provide maximumPoolSize on the endpoint, then we should respect it.

    IMHO it is impossible to configure maximumPoolSize at the endpoint level and cache executor instance in the component at the same time. Maybe we should just remove that option from the documentation of the endpoint options as maximumPoolSize should be configured only on the component level?

    나도 실제로 maximumPoolSize가 변경되지 않아서 삽질을 하다가 찾아낸 사항이다.

    결론적으로 NettyComponent가 Endpoint생성 전에 미리 생성되므로 Endpoint생성 시 NettyConfiguration에 등록한 maximumPoolSize가 무시된다는점이다.

    그래서 NettyConfiguration -> NettyComponent로 maximumPoolSize의 설정을 옮겼다.

     

    하지만 여기서 또 하나의 문제점이 생기는데 ...

    너무 길어져서 다음 포스팅에 이어서 적도록 하겠다.

    댓글

Designed by Tistory.