ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Apache Camel] Netty4 요청마다 RequestTimeout 변경하기
    개발/Apache Camel 2020. 9. 16. 16:39

    Socket 통신을 편하게 사용할 수 있는 Component 이다.

     

    현재 Apache Camel로 구성된 채널 시스템 core 부분을 맡고있다.

    이 때 클라이언트가 요청전문마다 타임아웃 설정이 가능하도록 해달라는 요구를 하여 찾아보던 중 알게된 정보를 적는다. ( Connection Timeout 인지, Request Tiemout 인지 초반에 답변을 듣질 못해서 둘다 찾음... )

    결론

    - Connection Timeout 은 Netty4를 이용한 TCP 통신이라면 불가능, Netty4-HTTP는 http parameter 변경을 통해 우회적으로 가능

       - ProducerCache 기능으로 생기는 문제인데 Map의 key값이 URI여서 connection 맺을 때 적용되는 connectionTimeout 파라미터 값이 계속 고정된다. ( 이후에 시간이 된다면 Netty4 Producer 동작 프로세스를 올리겠다 )

    - Request Timeout 은 가능 (밑에 내용)


    CamelContext에 add 되고, start되는 시점에 ClientInitializerFactory를 등록하는 로직이 있다.

    ClientInitializerFactory를 상속받아서 직접 구현해도 되고 아니면 DefaultClientInitializerFactory가 생성된다.

    NettyProducer.doStart 메소드
    DefaultClientInitializerFactory 생성자, producer를 등록

    protected void initChannel(Channel ch) throws Exception {
            // create a new pipeline
            ChannelPipeline channelPipeline = ch.pipeline();
    
            SslHandler sslHandler = configureClientSSLOnDemand();
            if (sslHandler != null) {
                //TODO  must close on SSL exception
                //sslHandler.setCloseOnSSLException(true);
                LOG.debug("Client SSL handler configured and added to the ChannelPipeline: {}", sslHandler);
                addToPipeline("ssl", channelPipeline, sslHandler);
            }
    
            List<ChannelHandler> decoders = producer.getConfiguration().getDecoders();
            for (int x = 0; x < decoders.size(); x++) {
                ChannelHandler decoder = decoders.get(x);
                if (decoder instanceof ChannelHandlerFactory) {
                    // use the factory to create a new instance of the channel as it may not be shareable
                    decoder = ((ChannelHandlerFactory) decoder).newChannelHandler();
                }
                addToPipeline("decoder-" + x, channelPipeline, decoder);
            }
    
            List<ChannelHandler> encoders = producer.getConfiguration().getEncoders();
            for (int x = 0; x < encoders.size(); x++) {
                ChannelHandler encoder = encoders.get(x);
                if (encoder instanceof ChannelHandlerFactory) {
                    // use the factory to create a new instance of the channel as it may not be shareable
                    encoder = ((ChannelHandlerFactory) encoder).newChannelHandler();
                }
                addToPipeline("encoder-" + x, channelPipeline, encoder);
            }
    
            // do we use request timeout?
            if (producer.getConfiguration().getRequestTimeout() > 0) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Using request timeout {} millis", producer.getConfiguration().getRequestTimeout());
                }
                ChannelHandler timeout = new ReadTimeoutHandler(producer.getConfiguration().getRequestTimeout(), TimeUnit.MILLISECONDS);
                addToPipeline("timeout", channelPipeline, timeout);
            }
    
            // our handler must be added last
            addToPipeline("handler", channelPipeline, new ClientChannelHandler(producer));
    
            LOG.trace("Created ChannelPipeline: {}", channelPipeline);
        }

    위의 코드는 ssl, decoder, encoder, timeout handler를 등록하는 과정이다.

     

            // do we use request timeout?
            if (producer.getConfiguration().getRequestTimeout() > 0) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Using request timeout {} millis", producer.getConfiguration().getRequestTimeout());
                }
                ChannelHandler timeout = new ReadTimeoutHandler(producer.getConfiguration().getRequestTimeout(), TimeUnit.MILLISECONDS);
                addToPipeline("timeout", channelPipeline, timeout);
            }

    위에 ToEndpoint를 String으로 설정한 부분에 '?' 뒤에가 producer.configuration에 설정된다.

    설정하지 않으면 Default 값은 "무한대기" 이므로 주의하자.

     

    Custom하게 구현할 시 맨 아래 부분은 필수로 넣어 주어야함 (Camel측에서 생성한 handler가 등록되어 있어야 진행됨)

    // our handler must be added last
    addToPipeline("handler", channelPipeline, new ClientChannelHandler(producer));

     

    이 로직은 connection 이후 channel이 생성되어 body data를 보내기 전, 순서대로 실행할 channelHandler들을 등록하는 로직

    간단하게 이해한대로 그려보자면

     

    위와같이 하면 모든 요청(Request)의 Timeout은 1초(1000 milli seconds)로 적용된다.

     

    이를 매 요청마다 변경하고 싶으면

    public static final String NETTY_REQUEST_TIMEOUT = "CamelNettyRequestTimeout";

    exchange 안에 message의 header 값( NettyConstants.NETTY_REQUEST_TIMEOUT )을 설정해주면 된다.  

    public void processWithConnectedChannel(final Exchange exchange, final BodyReleaseCallback callback, final ChannelFuture channelFuture, final Object body) {
            // remember channel so we can reuse it
            final Channel channel = channelFuture.channel();
            
            .
            .
            .
    
            if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) != null) {
                long timeoutInMs = exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class);
                ChannelHandler oldHandler = channel.pipeline().get("timeout");
                ReadTimeoutHandler newHandler = new ReadTimeoutHandler(timeoutInMs, TimeUnit.MILLISECONDS);
                if (oldHandler == null) {
                    channel.pipeline().addBefore("handler", "timeout", newHandler);
                } else {
                    channel.pipeline().replace(oldHandler, "timeout", newHandler);
                }
                
    		.
            .
            .
    
            // write body
            NettyHelper.writeBodyAsync(log, channel, remoteAddress, body, exchange, new ChannelFutureListener() {
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    log.trace("Operation complete {}", channelFuture);
                    if (!channelFuture.isSuccess()) {
                        // no success then exit, (any exception has been handled by ClientChannelHandler#exceptionCaught)
                        return;
                    }
    
                    // if we do not expect any reply then signal callback to continue routing
                    if (!configuration.isSync()) {
                        try {
                            // should channel be closed after complete?
                            Boolean close;
                            if (ExchangeHelper.isOutCapable(exchange)) {
                                close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
                            } else {
                                close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
                            }
    
                            // should we disconnect, the header can override the configuration
                            boolean disconnect = getConfiguration().isDisconnect();
                            if (close != null) {
                                disconnect = close;
                            }
    
                            // we should not close if we are reusing the channel
                            if (!configuration.isReuseChannel() && disconnect) {
                                if (log.isTraceEnabled()) {
                                    log.trace("Closing channel when complete at address: {}", getEndpoint().getConfiguration().getAddress());
                                }
                                NettyHelper.close(channel);
                            }
                        } finally {
                            // signal callback to continue routing
                            producerCallback.done(false);
                        }
                    }
                }
            });
    
        }

    위의 소스코드는 ProducerPool에서 ToEndpoint에 등록된 uri를 key값으로 Producer를 얻은 후 Channel 생성에 성공 한 후 Connection 성공 시 에호출되는 CallBack 함수이다.

    실제 writeBodyAsync메소드로 data를 보내기 직전에 RequestTimeout을 한번더 설정하는 로직이 들어있다.

    Producer.configuration에 설정된 값과는 다르게 Header값으로 유동적으로 변경가능함으로 클라이언트가 원하는 상황을 구현할 수 있었다.

     

    그럼 ToEndpoint를 생성할 때 ToDynamicEndpoint로 생성 후 configuration값을 매번 변경하면 되지 않나? 싶어서 테스트를 해보았지만 ProducerPool이 필수적으로 요구되는 상황 Pool에 등록될 때(정확히는 내부에 ProducerCache 안에 ServicePool 객체가 존재) Map형식의 Key 값이 uri로 지정되어서 암만 아래와같이 configuration을 바꿔봤자

    앞에 "localhost:60010" 으로 ProducerCache에 저장되어 최초로 등록된 Producer의 Configuration값만 적용된다.

    (2.21 버전에서는 String으로 Key를 잡았었는데 지금(3.0.0-M4)은 ServicePoolKey로 잡고있다. 내부 로직도 많이 바뀌었다... 하지만 동작원리는 비슷하다 )

    2.21버전에선 String이 Key값 이었다.

     

     

     


    결론!!!

    Set NettyConstants.NETTY_REQUEST_TIMEOUT(CamelNettyRequestTimeout) in Message Header!!!

    '개발 > Apache Camel' 카테고리의 다른 글

    [Apache Camel] Netty4 maximumPoolSize 설정  (0) 2020.10.02
    [Apache Camel] Netty4 Consumer 파헤치기  (0) 2020.09.22

    댓글

Designed by Tistory.