NGMsoftware

NGMsoftware
로그인 회원가입
  • 매뉴얼
  • 학습
  • 매뉴얼

    학습


    기타 Kafka, ActiveMQ 대용량 메세지 처리.

    페이지 정보

    본문

    안녕하세요. 엔지엠소프트웨어입니다. 기존에 액티브엠큐로 만들어진 메세지 버스 미들웨어를 카프카로 변경했었습니다. 그런데, 기존 ActiveMQ에서 정상 동작하던 시스템이 Kafka로 전환하고, 문제가 발생했습니다. 테스트에서 사용된 메세지는 약 200k였는데 실제 장비로부터 올라오는 메세지는 10M 이상이었습니다. 카프카 메세지 사이즈 스팩을 확인해보니 최대 1M만 설정할 수 있더라구요-_-; ActiveMQ는 서버가 감당할 수 있는 한 무제한이었습니다. 카프카도 메세지를 무제한으로 보낼 수 있으니 아래 설정을 참고 해보세요.

     

    ActiveMQ - ServerLocator.setMinLargeMessageSize

    Apache ActiveMQ Artemis Core API를 사용하는 경우 최소 대형 메세지 크기를 설정할 수 있습니다.

    ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName()))
    
    locator.setMinLargeMessageSize(25 * 1024);
    
    ClientSessionFactory factory = ActiveMQClient.createClientSessionFactory();

     

    이외에도 메세지 용량이 네트워크에 부하를 줄 정도라면 압축해서 보낼수도 있습니다. compressLargeMessages 속성을 사용하면 메세지가 서버측으로 전송될 때 시스템에서 ZIP 알고리즘을 사용하여 압축하게 됩니다. 서버측에서는 특별한 처리가 필요하지 않습니다. 압축 해제는 클라이언트에서 수행되기 때문입니다. 메세지는 텍스트라서 압축률이 상당히 좋습니다. 다만, 압축하고 해제하는데 시간이 소요되므로 적절한 크기의 메세지라면 압축하고 해제하는 시간이 더 오래 걸릴 수 있으므로 네트워크 부하와 분산 처리 및 압축 관련해서 충분한 조사와 테스트가 필요합니다.

     

    Kafka 3.2.0 - server.properties

    아래와 같이 소켓의 버퍼 크기와 메세지 최대 바이트랑 레플리카 패치 최대 바이트를 설정하세요. 마지막 2줄은 직접 추가 한 코드입니다.

    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400000
    
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400000
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    message.max.bytes=502728640
    replica.fetch.max.bytes=502728640

     

    카프카 프로듀서 C# 코드

    #region Kafka
    var config = new ProducerConfig
    {
        BootstrapServers = "localhost:9092",
        MessageMaxBytes = 500000000,
        ApiVersionRequestTimeoutMs = 30000
    };
    
    try
    {
        foreach (var msg in messageTest)
        {
            using (var producer = new ProducerBuilder<Null, string>(config).Build())
            {
                KafkaSenario(producer, msg);
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }
    
    Console.ReadLine();
    #endregion

     

    KafkaSenario 메소드

    static void KafkaSenario(IProducer<Null, string> producer, string msg)
    {
    
        var message = new Message<Null, string> { Value = File.ReadAllText(msg) };
        var headers = new Headers();
        message.Headers = headers;
        producer.Produce("mspc", message);
        System.Threading.Thread.Sleep(1000);
    }

     

    이 글이 도움이 되셨다면~ 커피 한잔이라도 후원 부탁드립니다^^

    개발자에게 후원하기

    MGtdv7r.png

     

    추천, 구독, 홍보 꼭~ 부탁드립니다.

    여러분의 후원이 빠른 귀농을 가능하게 해줍니다~ 답답한 도시를 벗어나 귀농하고 싶은 개발자~

    감사합니다~

    • 네이버 공유하기
    • 페이스북 공유하기
    • 트위터 공유하기
    • 카카오스토리 공유하기
    추천0 비추천0

    댓글목록

    등록된 댓글이 없습니다.