반응형

본 내용은 인프런의 이도원 님의 강의 "Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)" 내용을 바탕으로 정리한 내용입니다.

 

Kafka Connect란

  • 데이터 파이프라인 구축을 간소화하고 효율화하는 데 중점을 둔 도구로 이를 통해 Kafka와 외부 시스템 간의 데이터를 쉽게 이동할 수 있다.
  • Standalone mode 지원, Distribution mode 지원
  • Restful API 통해 지원
  • Stream 또는 Batch 형태로 데이터 전송 가능

Kafka Connect 설치

# 압축 풀기
tar xvf .\confluent-community-7.3.1.tar.gz

cd .\confluent-7.3.1\
# Kafka Connect 실행
.\bin\windows\connect-distributed.bat .\etc\kafka\connect-distributed.properties

※ Classpath is empty. Please build the project first e.g. by running 'gtrdlew jarAll' 에러 발생

.\bin\windows\kafka-run-class.bat

rem Classpath addition for LSB style path
if exist %BASE_DIR%\share\java\kafka\* (
    call :concat "%BASE_DIR%\share\java\kafka\*"
)
  • rem Classpath addition for core를 찾아서 상단에 위 코드를 추가

※ log4j:ERROR Could not read configuration file from URL [file:.../config/tools-log4j.properties]. java.io.FileNotFoundException 에러 발생

.\bin\windows\connect-distributed.properties

rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
    set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/etc/kafka/connect-log4j.properties
)
  • log4j 경로를 %BASE_DIR%/config/tools-log4j.properties 를 %BASE_DIR%/etc/kafka/connect-log4j.properties 로 수정

JDBC Connector 설정

plugin.path=\C:\\Kafka\\confluentinc-kafka-connect-jdbc-10.7.6\\lib
  • 최하단 plugin.path 값을 위의 값으로 변경한다.

JDBCConnector 에서 MariaDB 사용하기 위해 드라이버 복사

  • C:\사용자\.m2\repository\org\mariadb\jdbc\mariadb-java-client\3.1.4\mariadb-java-client-3.1.4.jar 파일을 C:\Kafka\confluent-7.3.1\share\java\kafka 폴더에 복사한다.

Kafka Source Contect

Kafka Sink 순서

jdbc > Kafaka Connect Source > Kafka Cluster > Kafka Connect Sink > Target System

Kafka Connect 실습

{
    "name" : "my-shop-connect",
    "config" : {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url":"jdbc:mariadb://localhost:3306/shop_db",
        "connection.user":"root",
        "connection.password":"1234",
        "mode": "incrementing",
        "incrementing.column.name" : "id",
        "table.whitelist":"shop_db.users",
        "topic.prefix" : "my_topic_",
        "tasks.max" : "1"
    }
}

  • get 방식으로 다시 한번 호출하여 커넥터가 연결됐는지 확인할 수 있다.
  • 127.0.0.1:8083/connectors/my-source-connect URL을 get 방식으로 호출하여 커넥터의 상세정보를 확인할 수 있다.
> .\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
my_topic_users
quickstart-events
  • 해당 sql 문으로 shop_db.users 테이블에 변경사항을 주면 위에서 설정한 커텍터가 토픽으로 등록된다.
  • 등록된 토픽은 .\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list 명령어로 확인 가능하다.
  • .\bin\kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning shell 화면에서 DB 변경 사항을 확인할 수 있다.
insert into shop_db.users (id, email, name, user_id, encrypted_pwd)
values (11, 'test@test.com', '관리자', 'admin123', 'admin123');
  • insert 문으로 테이블 정보 추가 참고로 task.max를 id로 해서 id가 증가해야 메시지를 보낸다.
{
    "schema": {
        "type":"struct","fields":
        [
            {"type":"int64","optional":false,"field":"id"},
            {"type":"string","optional":false,"field":"email"},
            {"type":"string","optional":false,"field":"name"},
            {"type":"string","optional":false,"field":"user_id"},
            {"type":"string","optional":false,"field":"encrypted_pwd"}
        ],
        "optional":false,
        "name":"users"
    },
    "payload": {
        "id":11,
        "email":"test@test.com",
        "name":"관리자",
        "user_id":"admin123",
        "encrypted_pwd":"admin123"
    }
}
  • insert 시 전송된 정보가 shell 에서 실시간으로 추가된다.
  • payload 가 실제 전달된 데이터이다.

Kafka Sink Connect 테스트

  • 데이터를 변경하고 그 변경한 데이터가 토픽에 전송되어 그 토픽이 데이터화 되어 변경된 정보를 실시간으로 동기화 시킬 수 있다.
  • Kafka Producer를 이용하여 Kafka Topic에 데이터를 직접 전송할 것이다.

{
    "name": "my-sink-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mariadb://localhost:3306/shop_db",
        "connection.user": "root",
        "connection.password": "1234",
        "topics": "my_topic_users",
        "auto.create": "true",
        "delete.enabled": "false",
        "pk.fields": "id",
        "tasks.max": "1"
    }
}
insert into shop_db.users (id, email, name, user_id, encrypted_pwd)
values (11, 'test1@test.com', '관리자1', 'admin1', 'admin1');
  • my-sink-connect 설정정보를 json 형태로 전송하고 shop_db에 데이터를 insert하면 my_topic_users 라는 테이블이 생기고 users 테이블의 데이터가 동기화 된다.
> cd C:\Kafka\kafka_2.13-3.7.1
> .\bin\kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning 
  • producer shell 창 호출
{
    "schema": {
        "type":"struct",
        "fields":[
            {"type":"int64","optional":false,"field":"id"},
            {"type":"string","optional":false,"field":"email"},
            {"type":"string","optional":false,"field":"name"},
            {"type":"string","optional":false,"field":"user_id"},
            {"type":"string","optional":false,"field":"encrypted_pwd"}
        ],
        "optional":false,
        "name":"users"
    },
    "payload":{
        "id":12,
        "email":"test12@test.com",
        "name":"admin12",
        "user_id":"admin12",
        "encrypted_pwd":"admin12"
    }
}
  • json 형식으로 데이터 전송시 my_topic_users 테이블에 데이터 insert 됨
반응형

'Cloud > MSA' 카테고리의 다른 글

[MSA] MSA 분산 추적  (0) 2024.12.02
[MSA] MSA 통신 간 오류 처리  (0) 2024.12.02
[MSA] Apache Kafka  (0) 2024.12.02
[MSA] Feign Client  (0) 2024.12.02
[MSA] Java KeyStore(JKS)  (0) 2024.12.02

+ Recent posts