개발로드

☆KDT 2024-05-20★Kafka/EC2에 Kafka 설치 및 SpringBoot로 실습 본문

JAVA

☆KDT 2024-05-20★Kafka/EC2에 Kafka 설치 및 SpringBoot로 실습

위대한개발자 2024. 5. 20. 10:50

Kafka

1) 개요

  • LinkedIn에서 내부 데이터 흐름을 개선하기 위해서 개발한 소프트웨어
  • kafka를 이용하면 각각의 애플리케이션끼리 연결해서 데이터를 처리하지 않고 한 곳에 모아서 처리할 수 있는 중앙 집중화 방식을 사용하는 것이 가능
  • 여러 데이터 소스에서 취합한 데이터 스트림을 한 곳에서 실시간으로 관리할 수 있도록 합니다.

CQRS

1) 읽기와 쓰기를 분리

  • 읽기와 쓰기는 부하가 다름
  • 읽기와 쓰기는 작업 방식이 다름

 

 

EC2 인스턴스 생성

CPU 이름과 OS선

 

키 페어 생성 및 다운로드

 

CPU 생성완료

 

CMD PC접속 명령어


ubuntu linux에 kafka를 설치하고 외부에서 사용가능하도록 설정

  • 1) jdk 설치 : kafka가 java로 만들어져서 jvm이 있어야만 실행 가능
    • sudo apt-get update
    • sudo apt-get install openjdk-17-jdk
    • 확인: java -version

 

  • 2) kafka 설치
    • 다운로드: wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
    • 압축해제 : tar xvf kafka_2.13-3.6.0.tgz
    • 확인: ll
    • 디렉토리 변경: sudo mv kafka_2.13-3.6.0 /opt/kafka
    • 자주 사용하는 명령을 path에 추가: nano ~/.bashrc 로 파일을 열어서
      • export KAFKA_HOME=/opt/kafka
      • export PATH=$PATH:$KAFKA_HOME/bin 
    • 변경한 설정 적용: source ~/.bashrc
  • 3) 외부에서 kafka를 사용할 수 있도록 설정
    • 힙 메모리 설정: kafka는 기본적으로 1GB의 메모리를 사용하는데 EC2를 프리티어로 생성하게 되면 1GB의 메모리를 가지게 되고 운영체제가 일부분을 사용하기 때문에 메모리 부족 에러가 발생하게 됨.
    • nano ~/.bashrc를 이용해서 아래 코드를 작성하고 저장한 후 사용
      • export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"
      • 변경한 설정 적용: source ~/.bashrc
      • 확인: echo $KAFKA_HEAP_OPTS
      • kafka 디렉토리의 config 디렉토리에 있는 server.properties 파일에 IP를 등록
        • nano /opt/kafka/config/server.properties
        • listeners=PLAINTEXT://:9092
          advertised.listeners=PLAINTEXT://54.180.99.172:9092
          delete.topic.enable=true
          auto.create.topics.enable=true
    • 4) 실행
      • 주키퍼 실행
      • zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
      • jps -vm
      • 카프카 실행
        • kafka-server-start.sh -daemon /opt/kafka/config/server.properties
        • jps -m
    • 5) 리눅스 콘솔에서 topic를 생성하고 메시지를 전송
      • 토픽 생성: kafka-topics.sh --create --bootstrap-server localhost:9092 --topic ec2topic
      • 토픽 리스트 조회: kafka-topics.sh --create --bootstrap-server localhost:9092 --list
      • ec2에 설치한 kafka를 외부에서 사용하도록 할 때는 보안 그룹에 인바운드 규칙에 2181번 과 9092번을 추가


SpringBoot Project 생성

의존성에 Spring for Apache Kafka 추가

 

 

bootstrap-servers에 localhost 대신 내 EC2의 public IP 등록

spring:
  kafka:
    bootstrap-servers: 54.180.99.172:9092
  consumer:
    group-id: adamsoft
    auto-offset-reset: earliest
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer

 

  • 4) Spring 프로젝트에서 사용
    • kafka 라이브러리 의존성 설정
    • bootstrap-servers 부분을 자신의 IP로 수정
      • spring:
          kafka:
            bootstrap-servers: 54.180.99.172:9092
          consumer:
            group-id: adamsoft
            auto-offset-reset: earliest
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          producer:
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
    • kafka 환경 설정 클래스 생성: kafkaConfiguration
      • package com.springkafka;

        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.common.serialization.StringSerializer;
        import org.springframework.beans.factory.annotation.Value;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.kafka.core.DefaultKafkaProducerFactory;
        import org.springframework.kafka.core.KafkaTemplate;
        import org.springframework.kafka.core.ProducerFactory;
        import java.util.HashMap;
        import java.util.Map;



            @Configuration
            public class KafkaConfiguration {

                @Value("${spring.kafka.bootstrap-servers}")
                private String bootstrapServers;
                @Bean
                public ProducerFactory<String, String> producerFactory() {
                    Map<String,Object> configs = new HashMap<>();
                    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                            StringSerializer.class);
                    return new DefaultKafkaProducerFactory(configs);
                }
                @Bean
                public KafkaTemplate<String, String> kafkaTemplate() {
                    return new KafkaTemplate<>(producerFactory());
                }
            }
    • 메세지를 보내는 클래스: kafkaProducer
      • package com.springkafka;

        import lombok.RequiredArgsConstructor;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.kafka.core.KafkaTemplate;
        import org.springframework.stereotype.Service;
        @Service
        @RequiredArgsConstructor
        public class KafkaProducer {
            private static final String TOPIC = "exam-topic";
            @Autowired
            private KafkaTemplate<String, String> kafkaTemplate;
            private final Logger log = LoggerFactory.getLogger(getClass());
            public void sendMessage(String name, int age) {
                log.info("Produce message : {}{}", name, age);
                String message = "{\"name\":" + "\"" + name + "\"" + ", \"age\":" + age +
                        "}";
                this.kafkaTemplate.send(TOPIC, message);
            }
        }
    • 메세지가 JSON 형태로 오므로 JSON 파싱을 위한 라이브러리의 의존성 설정
        • build.gradle 의존성에 추가: implementation 'org.json:json:20190722'
    • 메세지를 받는 클래스:  KafkaConsumer
      • package com.springkafka;

        import org.json.JSONException;
        import org.json.JSONObject;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import org.springframework.kafka.annotation.KafkaListener;
        import org.springframework.stereotype.Service;
        import java.io.IOException;

        @Service
        public class KafkaConsumer {
            private final Logger log = LoggerFactory.getLogger(getClass());
            @KafkaListener(topics = "exam-topic", groupId = "adamsoft")
            public void consume(String message) throws IOException, JSONException {
                log.info("Consumed message : {}", message);
                JSONObject messageObj = new JSONObject(message);
                log.info(messageObj.getString("name"));
                log.info(messageObj.getInt("age") + "");
            }
        }
    • 요청을 처리하기 위한 Controller 클래스 생성: KafkaController
      • package com.springkafka;

        import lombok.RequiredArgsConstructor;
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.web.bind.annotation.*;

        @RestController
        @RequestMapping(value = "/kafka")
        @Slf4j
        @RequiredArgsConstructor
        public class KafkaController {
            @Autowired
            private KafkaProducer producer;
            @PostMapping
            @ResponseBody
            public String sendMessage(@RequestParam("name") String name,
                                      @RequestParam("age") int age) {
                this.producer.sendMessage(name, age);
                return "success";
            }
        }
    • SpringBoot 프로젝트를 실행하고 요청을 전송
      • http://localhost:8080/kafka?name=김원중&age=28