Notice
Recent Posts
Recent Comments
Link
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | |||||
| 3 | 4 | 5 | 6 | 7 | 8 | 9 |
| 10 | 11 | 12 | 13 | 14 | 15 | 16 |
| 17 | 18 | 19 | 20 | 21 | 22 | 23 |
| 24 | 25 | 26 | 27 | 28 | 29 | 30 |
| 31 |
Tags
- javascript
- 메소드
- Integer.MAX_VALUE
- jsp
- 오버로딩
- Break
- extends
- If
- Else If
- 이중 배열
- rs.next()
- super()
- 배열
- 인터페이스
- 다형성
- arraylist
- 상속
- 2차원 배열
- for문
- docker
- 이클립스
- 삼항 연산식
- Integer.MIN_VALUE
- interface
- 상수
- 삼항 연산
- 문자열
- 중첩for문
- scanner
- 자바
Archives
- Today
- Total
개발로드
☆KDT 2024-05-20★Kafka/EC2에 Kafka 설치 및 SpringBoot로 실습 본문
Kafka
1) 개요
- LinkedIn에서 내부 데이터 흐름을 개선하기 위해서 개발한 소프트웨어
- kafka를 이용하면 각각의 애플리케이션끼리 연결해서 데이터를 처리하지 않고 한 곳에 모아서 처리할 수 있는 중앙 집중화 방식을 사용하는 것이 가능
- 여러 데이터 소스에서 취합한 데이터 스트림을 한 곳에서 실시간으로 관리할 수 있도록 합니다.


CQRS
1) 읽기와 쓰기를 분리
- 읽기와 쓰기는 부하가 다름
- 읽기와 쓰기는 작업 방식이 다름

EC2 인스턴스 생성
CPU 이름과 OS선

키 페어 생성 및 다운로드

CPU 생성완료

CMD PC접속 명령어

ubuntu linux에 kafka를 설치하고 외부에서 사용가능하도록 설정
- 1) jdk 설치 : kafka가 java로 만들어져서 jvm이 있어야만 실행 가능
- sudo apt-get update
- sudo apt-get install openjdk-17-jdk
- 확인: java -version

- 2) kafka 설치
- 다운로드: wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
- 압축해제 : tar xvf kafka_2.13-3.6.0.tgz
- 확인: ll
- 디렉토리 변경: sudo mv kafka_2.13-3.6.0 /opt/kafka
- 자주 사용하는 명령을 path에 추가: nano ~/.bashrc 로 파일을 열어서
- export KAFKA_HOME=/opt/kafka
- export PATH=$PATH:$KAFKA_HOME/bin
- 변경한 설정 적용: source ~/.bashrc
- 3) 외부에서 kafka를 사용할 수 있도록 설정
- 힙 메모리 설정: kafka는 기본적으로 1GB의 메모리를 사용하는데 EC2를 프리티어로 생성하게 되면 1GB의 메모리를 가지게 되고 운영체제가 일부분을 사용하기 때문에 메모리 부족 에러가 발생하게 됨.
- nano ~/.bashrc를 이용해서 아래 코드를 작성하고 저장한 후 사용
- export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"
- 변경한 설정 적용: source ~/.bashrc
- 확인: echo $KAFKA_HEAP_OPTS
- kafka 디렉토리의 config 디렉토리에 있는 server.properties 파일에 IP를 등록
- nano /opt/kafka/config/server.properties
- listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://54.180.99.172:9092
delete.topic.enable=true
auto.create.topics.enable=true
- 4) 실행
- 주키퍼 실행
- zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
- jps -vm
- 카프카 실행
- kafka-server-start.sh -daemon /opt/kafka/config/server.properties
- jps -m
- 5) 리눅스 콘솔에서 topic를 생성하고 메시지를 전송
- 토픽 생성: kafka-topics.sh --create --bootstrap-server localhost:9092 --topic ec2topic
- 토픽 리스트 조회: kafka-topics.sh --create --bootstrap-server localhost:9092 --list
- ec2에 설치한 kafka를 외부에서 사용하도록 할 때는 보안 그룹에 인바운드 규칙에 2181번 과 9092번을 추가

SpringBoot Project 생성
의존성에 Spring for Apache Kafka 추가

bootstrap-servers에 localhost 대신 내 EC2의 public IP 등록
spring:
kafka:
bootstrap-servers: 54.180.99.172:9092
consumer:
group-id: adamsoft
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
- 4) Spring 프로젝트에서 사용
- kafka 라이브러리 의존성 설정
- bootstrap-servers 부분을 자신의 IP로 수정
- spring:
kafka:
bootstrap-servers: 54.180.99.172:9092
consumer:
group-id: adamsoft
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
- spring:
- kafka 환경 설정 클래스 생성: kafkaConfiguration
- package com.springkafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String,Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory(configs);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- package com.springkafka;
- 메세지를 보내는 클래스: kafkaProducer
- package com.springkafka;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class KafkaProducer {
private static final String TOPIC = "exam-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private final Logger log = LoggerFactory.getLogger(getClass());
public void sendMessage(String name, int age) {
log.info("Produce message : {}{}", name, age);
String message = "{\"name\":" + "\"" + name + "\"" + ", \"age\":" + age +
"}";
this.kafkaTemplate.send(TOPIC, message);
}
}
- package com.springkafka;
- 메세지가 JSON 형태로 오므로 JSON 파싱을 위한 라이브러리의 의존성 설정
-
- build.gradle 의존성에 추가: implementation 'org.json:json:20190722'
-
- 메세지를 받는 클래스: KafkaConsumer
- package com.springkafka;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class KafkaConsumer {
private final Logger log = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = "exam-topic", groupId = "adamsoft")
public void consume(String message) throws IOException, JSONException {
log.info("Consumed message : {}", message);
JSONObject messageObj = new JSONObject(message);
log.info(messageObj.getString("name"));
log.info(messageObj.getInt("age") + "");
}
}
- package com.springkafka;
- 요청을 처리하기 위한 Controller 클래스 생성: KafkaController
- package com.springkafka;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping(value = "/kafka")
@Slf4j
@RequiredArgsConstructor
public class KafkaController {
@Autowired
private KafkaProducer producer;
@PostMapping
@ResponseBody
public String sendMessage(@RequestParam("name") String name,
@RequestParam("age") int age) {
this.producer.sendMessage(name, age);
return "success";
}
}
- package com.springkafka;
- SpringBoot 프로젝트를 실행하고 요청을 전송
- http://localhost:8080/kafka?name=김원중&age=28

'JAVA' 카테고리의 다른 글
| ☆KDT 2024-05-20★sqld 기출문제(이기적 영진닷컴)11~20번 (0) | 2024.05.20 |
|---|---|
| ☆KDT 2024-05-20★sqld 기출문제(이기적 영진닷컴)1~10번 (0) | 2024.05.20 |
| ☆KDT 2024-05-18★Amazon AWS S3 버킷생성/CICD/배포 (0) | 2024.05.18 |
| ☆KDT 2024-05-18★Amazon AWS RDS MySQL 데이터베이스 생성 (0) | 2024.05.18 |
| ☆KDT 2024-05-14★SpringBoot 스케쥴링/AWS EC2 (0) | 2024.05.14 |