반응형
본 내용은 인프런의 이도원 님의 강의 "Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)" 내용을 바탕으로 정리한 내용입니다.
Kafka Connect란
- 데이터 파이프라인 구축을 간소화하고 효율화하는 데 중점을 둔 도구로 이를 통해 Kafka와 외부 시스템 간의 데이터를 쉽게 이동할 수 있다.
- Standalone mode 지원, Distribution mode 지원
- Restful API 통해 지원
- Stream 또는 Batch 형태로 데이터 전송 가능
Kafka Connect 설치
- https://packages.confluent.io/archive/7.6/confluent-community-7.6.1.tar.gz URL 호출하여 파일 다운로드 후 C:\Kafka 아래로 이동
# 압축 풀기
tar xvf .\confluent-community-7.3.1.tar.gz
cd .\confluent-7.3.1\
# Kafka Connect 실행
.\bin\windows\connect-distributed.bat .\etc\kafka\connect-distributed.properties
※ Classpath is empty. Please build the project first e.g. by running 'gtrdlew jarAll' 에러 발생
.\bin\windows\kafka-run-class.bat
rem Classpath addition for LSB style path
if exist %BASE_DIR%\share\java\kafka\* (
call :concat "%BASE_DIR%\share\java\kafka\*"
)
- rem Classpath addition for core를 찾아서 상단에 위 코드를 추가
※ log4j:ERROR Could not read configuration file from URL [file:.../config/tools-log4j.properties]. java.io.FileNotFoundException 에러 발생
.\bin\windows\connect-distributed.properties
rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/etc/kafka/connect-log4j.properties
)
- log4j 경로를 %BASE_DIR%/config/tools-log4j.properties 를 %BASE_DIR%/etc/kafka/connect-log4j.properties 로 수정
JDBC Connector 설정
- https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc 페이지 에서 파일 다운로드
- 해당 파일을 C:\Kafka 하위에 압축풀기 한다.C:\Kafka\confluent-7.3.1\etc\kafka\connect-distributed.properties
plugin.path=\C:\\Kafka\\confluentinc-kafka-connect-jdbc-10.7.6\\lib
- 최하단 plugin.path 값을 위의 값으로 변경한다.
JDBCConnector 에서 MariaDB 사용하기 위해 드라이버 복사
- C:\사용자\.m2\repository\org\mariadb\jdbc\mariadb-java-client\3.1.4\mariadb-java-client-3.1.4.jar 파일을 C:\Kafka\confluent-7.3.1\share\java\kafka 폴더에 복사한다.
Kafka Source Contect
Kafka Sink 순서
jdbc > Kafaka Connect Source > Kafka Cluster > Kafka Connect Sink > Target System
Kafka Connect 실습
{
"name" : "my-shop-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mariadb://localhost:3306/shop_db",
"connection.user":"root",
"connection.password":"1234",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"shop_db.users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
- Kafka Connect REST API에 POST 요청을 보내 커넥터를 설정
- http://localhost:8083/connectors/${connect_name} 를 DELETE 방식으로 요청하면 커넥터 설정이 삭제된다.
- get 방식으로 다시 한번 호출하여 커넥터가 연결됐는지 확인할 수 있다.
- 127.0.0.1:8083/connectors/my-source-connect URL을 get 방식으로 호출하여 커넥터의 상세정보를 확인할 수 있다.
> .\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
my_topic_users
quickstart-events
- 해당 sql 문으로 shop_db.users 테이블에 변경사항을 주면 위에서 설정한 커텍터가 토픽으로 등록된다.
- 등록된 토픽은
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
명령어로 확인 가능하다. .\bin\kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning
shell 화면에서 DB 변경 사항을 확인할 수 있다.
insert into shop_db.users (id, email, name, user_id, encrypted_pwd)
values (11, 'test@test.com', '관리자', 'admin123', 'admin123');
- insert 문으로 테이블 정보 추가 참고로 task.max를 id로 해서 id가 증가해야 메시지를 보낸다.
{
"schema": {
"type":"struct","fields":
[
{"type":"int64","optional":false,"field":"id"},
{"type":"string","optional":false,"field":"email"},
{"type":"string","optional":false,"field":"name"},
{"type":"string","optional":false,"field":"user_id"},
{"type":"string","optional":false,"field":"encrypted_pwd"}
],
"optional":false,
"name":"users"
},
"payload": {
"id":11,
"email":"test@test.com",
"name":"관리자",
"user_id":"admin123",
"encrypted_pwd":"admin123"
}
}
- insert 시 전송된 정보가 shell 에서 실시간으로 추가된다.
- payload 가 실제 전달된 데이터이다.
Kafka Sink Connect 테스트
- 데이터를 변경하고 그 변경한 데이터가 토픽에 전송되어 그 토픽이 데이터화 되어 변경된 정보를 실시간으로 동기화 시킬 수 있다.
- Kafka Producer를 이용하여 Kafka Topic에 데이터를 직접 전송할 것이다.
{
"name": "my-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mariadb://localhost:3306/shop_db",
"connection.user": "root",
"connection.password": "1234",
"topics": "my_topic_users",
"auto.create": "true",
"delete.enabled": "false",
"pk.fields": "id",
"tasks.max": "1"
}
}
insert into shop_db.users (id, email, name, user_id, encrypted_pwd)
values (11, 'test1@test.com', '관리자1', 'admin1', 'admin1');
- my-sink-connect 설정정보를 json 형태로 전송하고 shop_db에 데이터를 insert하면 my_topic_users 라는 테이블이 생기고 users 테이블의 데이터가 동기화 된다.
> cd C:\Kafka\kafka_2.13-3.7.1
> .\bin\kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning
- producer shell 창 호출
{
"schema": {
"type":"struct",
"fields":[
{"type":"int64","optional":false,"field":"id"},
{"type":"string","optional":false,"field":"email"},
{"type":"string","optional":false,"field":"name"},
{"type":"string","optional":false,"field":"user_id"},
{"type":"string","optional":false,"field":"encrypted_pwd"}
],
"optional":false,
"name":"users"
},
"payload":{
"id":12,
"email":"test12@test.com",
"name":"admin12",
"user_id":"admin12",
"encrypted_pwd":"admin12"
}
}
- json 형식으로 데이터 전송시 my_topic_users 테이블에 데이터 insert 됨
반응형
'Cloud > MSA' 카테고리의 다른 글
[MSA] MSA 분산 추적 (0) | 2024.12.02 |
---|---|
[MSA] MSA 통신 간 오류 처리 (0) | 2024.12.02 |
[MSA] Apache Kafka (0) | 2024.12.02 |
[MSA] Feign Client (0) | 2024.12.02 |
[MSA] Java KeyStore(JKS) (0) | 2024.12.02 |