Kafka Connect로 데이터 파이프라인 구축
    • PDF

    Kafka Connect로 데이터 파이프라인 구축

    • PDF

    기사 요약

    VPC 환경에서 이용 가능합니다.

    Kafka Connect를 설치하여 Cloud Data Streaming Service를 응용하는 방법을 안내합니다.

    참고

    이 가이드는 사용자의 편의를 위해 작성된 예제입니다. 이 가이드에서 사용된 connector 및 confluent-hub에 대한 라이선스는 confluent 사에 있으며 confluent 사의 라이선스 정책을 따릅니다. 네이버클라우드는 해당 connector 및 confluent-hub를 직접 제공하는 사업자가 아니며 사용자는 해당 connector 및 confluent-hub 사용 여부를 직접 결정할 수 있습니다.

    Kafka Connect란?

    Kafka Connect는 데이터베이스, 키-밸류 스토어, 파일 시스템과 같은 외부 시스템을 Kafka에 연결해 주는 구성 요소로, Apache Kafka에서 제공하는 오픈 소스입니다. Kafka Connect를 이용하면 다양한 제품을 시스템에 쉽게 연동할 수 있어 간편하게 데이터 파이프라인을 구축할 수 있습니다.

    시작하기 전에

    이 가이드는 서버에 Kafka Connect를 설치한 후 MySQL의 변경사항을 Elasticsearch에 적용하는 방법을 설명합니다.
    시작하기 전에 필요한 서비스들이 활성화 되어있어야합니다.

    사전 필요 서비스

    Server: 물리적인 서버 자원을 별도로 구매하지 않고, 클라우드 환경에서 빠르게 생성해 사용한 만큼만 비용을 지불하는 서비스
    Cloud DB for MySQL: 장애가 발생하면 자동으로 복구하는 완전 관리형 클라우드 데이터베이스 서비스
    Cloud Data Streaming Service: 오픈 소스 Apache Kafka 클러스터를 손쉽게 배포, 운영 및 확장 가능한 서비스
    Search Engine Service: Elasticsearch 클러스터를 손쉽게 배포, 운영 및 확장 가능한 서비스

    네트워크 설정

    네트워크 설정 방법은 안내합니다.

    STEP 1. Cloud DB for MySQL 설정

    1. Services > Database > Cloud DB for MySQL에서 [DB 관리] 버튼을 클릭해 주십시오.

    2. DB User 관리를 클릭해 주십시오.
      cdss-5-1_ko

    3. 필요한 정보를 입력 후 [DB User 추가] 버튼을 클릭해 주십시오.

      cdss-5-2_ko

    STEP 2. ACG 설정

    1. Services > Compute > Server > ACG 메뉴에서 보유하고 있는 'cdss-b-xxxxx'을 선택해 주십시오.

      • Cloud Data Streaming Service 브로커 노드의 9092번 포트로 접근할 수 있도록 ACG를 설정합니다.

      cdss-5-3_ko

    2. [ACG 설정] 버튼을 클릭한 후 아래 ACG 규칙을 추가해 주십시오.

      • 프로토콜: TCP
      • 접근 소스: Kafka Connect가 실행될 서버의 IP
      • 허용 포트: 9092

      cdss-5-4_ko

    3. 보유하고 있는 'searchengine-m-xxxxx'을 선택해 주십시오.

      • Search Engine Service 매니저 노드의 9200번 포트로 접근할 수 있도록 ACG를 설정합니다.

      cdss-5-5_ko

    4. [ACG 설정] 버튼을 클릭한 후 아래 ACG 규칙을 추가해 주십시오.

      • 프로토콜: TCP
      • 접근 소스: Kafka Connect가 실행될 서버의 IP
      • 허용 포트: 9200

      cdss-5-6_ko

    서버에 Kafka Connect 설치

    STEP 1. Java 설치

    1. 아래 명령어를 입력하여 yum 업데이트를 진행해 주십시오.

      yum update
      
    2. 아래 명령어를 입력하여 java-1.8.0-openjdk-devel.x86_64를 설치해 주십시오.

      yum install java-1.8.0-openjdk-devel.x86_64
      
    3. 아래 명령어를 입력하여 정상적으로 설치가 되었는지 확인해 주십시오.

      java -version
      javac -version
      

    STEP 2. Kafka Connect 설치

    1. 아래 명령어를 입력하여 서버의 /root 경로에 Kafka Connect를 다운로드해 주십시오.

      curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
      
    2. 아래 명령어를 입력하여 다운로드한 파일의 압축을 해제해 주십시오.

      tar -zxvf confluent-community-7.0.1.tar.gz
      
    3. 아래 명령어를 입력하여 구동 전 properties 파일을 수정해 주십시오.

      • Cloud Data Streaming Service의 브로커 노드 정보를 참고하여 properties 파일의 ‘bootstrap.servers’에 ip 목록을 추가합니다.
      vi /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
      
      bootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092
      

    STEP 3. Confluent Hub 설치

    Confluent Hub는 Kafka Connect에서 사용되는 다양한 플러그인을 간편하게 다운로드할 수 있는 저장소입니다. 지원되는 플러그인의 전체 목록은 Confluent에서 제공하는 Confluent Connector Portfolio를 확인해 주십시오.

    1. 아래 명령어를 입력하여 /root 경로에 새로운 폴더를 만든 후 해당 폴더로 이동해 주십시오.

      • 예제에서는 'confluent-hub'라는 이름의 폴더를 생성합니다.
      mkdir confluent-hub
      cd confluent-hub
      
    2. 아래 명령어를 입력하여 현재 경로(/root/confluent-hub)에 Confluent Hub를 다운로드해 주십시오.

      curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
      
    3. 아래 명령어를 입력하여 다운로드한 파일의 압축을 해제해 주십시오.

      tar -zxvf confluent-hub-client-latest.tar.gz
      
    4. 아래 명령어를 입력하여 현재 경로(/root/confluent-hub)에 추후 플러그인이 저장될 폴더를 생성해 주십시오.

      • 예제에서는 'plugins'라는 이름의 폴더를 생성합니다.
      mkdir plugins
      
    5. 아래 명령어를 차례대로 입력하여 PATH 환경변수에 압축 해제한 bin 폴더의 경로를 추가해 주십시오.

      vi ~/.bashrc
      
      export CONFLUENT_HOME='~/confluent-hub'
      export PATH=$PATH:$CONFLUENT_HOME/bin
      
      source ~/.bashrc
      

    STEP 4. MySQL Connector 설치

    1. 아래 명령어를 입력하여 debezium-connector-mysql을 설치해 주십시오.
      • --component-dir은 실제 플러그인이 설치되는 폴더 경로입니다. STEP 3에서 생성한 /root/confluent-hub/plugins로 설정해 주십시오.
      • --worker-configs는 플러그인 설치 후 적용된 properties 파일의 경로입니다. STEP 2에서 수정했던 /root/confluent-7.0.1/etc/kafka/connect-distributed.properties로 설정해 주십시오.
      confluent-hub install debezium/debezium-connector-mysql:1.7.0 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
      

    STEP 5. Elasticsearch Connector 설치

    1. 아래 명령어를 입력하여 kafka-connect-elasticsearch을 설치해 주십시오.
      • --component-dir--worker-configs는 STEP 4와 동일하게 적용해 주십시오.
      confluent-hub install confluentinc/kafka-connect-elasticsearch:11.1.3 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
      

    STEP 6. Kafka Connect 프로세스 실행

    1. 아래 명령어를 입력하여 Kafka Connect 프로세스를 백그라운드로 실행해 주십시오.

      /root/confluent-7.0.1/bin/connect-distributed -daemon /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
      
    2. 아래 명령어를 입력하여 프로세스가 정상적으로 작동하는지 확인해 주십시오.

      curl localhost:8083
      
      {"version":"7.0.1-ccs","commit":"b7e52413e7cb3e8b","kafka_cluster_id":"m1hLK0L6Qra5TLVy7b_A4A"}
      
    3. 아래 명령어를 입력하여 앞에서 설치한 Connector가 모두 정상적으로 노출되는지 확인해 주십시오.

      curl localhost:8083/connector-plugins
      
      [
         {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"}, 
         {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"},
         {"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
         {"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
         {"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}
      ]
      

    Kafka Connect 사용

    STEP 1. MySQL 테이블 생성 및 데이터 추가

    1. 사용 중인 Cloud DB for MySQL 서버에 접속해 주십시오.

    2. 아래 명령어를 입력하여 테이블을 생성한 후 데이터를 추가해 주십시오.

      • 예제에서는 member라는 이름의 테이블을 생성합니다.
      CREATE TABLE IF NOT EXISTS member (
       id int NOT NULL PRIMARY KEY,
       name varchar(100),
       email varchar(200),
       department varchar(200)
      );
      
      INSERT INTO member(id, name, email, department) values (1, 'messi', 'messi@gmail.com', 'A');
      INSERT INTO member(id, name, email, department) values (2, 'ronaldo', 'ronaldo@naver.com', 'B');
      INSERT INTO member(id, name, email, department) values (3, 'son', 'son@ncloud.com', 'B');
      INSERT INTO member(id, name, email, department) values (4, 'park', 'park@yahoo.com', 'B');
      

    STEP 2. MySQL Connector 등록

    1. Kafka Connect가 설치된 서버로 접속해 주십시오.

    2. 요청 시 보낼 JSON body를 개인 환경에 맞게 아래 형식대로 입력해 주십시오.

      {
         "name": "mysql-connector", // connector의 이름
         "config": {
           "connector.class": "io.debezium.connector.mysql.MySqlConnector", // 사용할 플러그인의 종류
           "database.hostname": "db-9c242.vpc-cdb.ntruss.com", // MySQL 서버의 엔드포인트
           "database.port": "3306", // MySQL 서버의 포트
           "database.user": "kimdong", // MySQL 서버의 user
           "database.password": "1234", // MySQL 서버 user의 password
           "database.server.id": "184054", // kafka connect에서 사용되는 MySQL 서버의 uuid
           "database.server.name": "NCP_MYSQL", // kafka connect에서 사용할 MySQL 서버에 부여하는 이름, 추후 kafka topic의 접두사로 사용됨 
           "database.whitelist": "test", // kafka connect에서 접근할 MySQL 서버의 데이터베이스 지정
           "database.history.kafka.bootstrap.servers": "10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092", // Broker 노드 정보
           "database.history.kafka.topic": "this_is_topic", // MySQL 히스토리 변경 내역을 저장할 topic 이름 
           "snapshot.locking.mode": "none",
           "value.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter": "org.apache.kafka.connect.json.JsonConverter"
         }
      }
      
    3. 아래 명령어를 입력하여 MySQL Connector를 등록해 주십시오.

      curl -X POST localhost:8083/connectors \
         -H "Content-Type: application/json" \
         -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"db-9c242.vpc-cdb.ntruss.com","database.port":"3306","database.user":"kimdong","database.password":"1234","database.server.id":"184054","database.server.name":"NCP_MYSQL","database.whitelist":"test","database.history.kafka.bootstrap.servers":"10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092","database.history.kafka.topic":"this_is_topic","snapshot.locking.mode":"none","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}}' 
      
      

    STEP 3. Topic 확인

    1. Cloud Data Streaming Service에서 제공하는 CMAK의 엔드포인트로 접속해 주십시오.

    2. STEP 2에서 설정한 body의 정보대로 Topic이 생성된 것을 확인해 주십시오.

      • STEP 1에서 생성한 MySQL의 member 테이블의 변경 정보가 담길 Topic은 'NCP_MYSQL.test.member'입니다.

      cdss-5-7_ko

    STEP 4. Elasticsearch Connector 등록

    1. Kafka Connect가 설치된 서버로 접속해 주십시오.

    2. 요청 시 보낼 JSON body를 개인 환경에 맞게 아래 형식대로 입력해 주십시오.

      {
         "name": "es-connector", // connector의 이름
         "config": {
           "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", // 사용할 플러그인의 종류
           "connection.url": "http://10.0.100.9:9200", // topic을 가져올 kafka 브로커 노드
           "tasks.max": "1",
           "topics": "NCP_MYSQL.test.member", // 컨슈밍할 topic 이름
           "type.name": "_doc",
           "value.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter.schemas.enable": "true",
           "value.converter.schemas.enable": "true",
           "transforms": "extractKey",
           "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
           "transforms.extractKey.field": "id", // MySQL 테이블에서 사용하는 pk명
           "behavior.on.null.values": "IGNORE"
         }
      }
      
    3. 아래 명령어를 입력하여 Elasticsearch Connector를 등록해 주십시오.

      curl -X POST localhost:8083/connectors \
         -H "Content-Type: application/json" \
         -d '{"name":"es-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://10.0.100.9:9200","tasks.max":"1","topics":"NCP_MYSQL.test.member","type.name":"_doc","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","transforms":"extractKey","transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractKey.field":"id","behavior.on.null.values":"IGNORE"}}'
      

    STEP 5. Elasticsearch 데이터 확인

    1. Search Engine Service에서 제공하는 Kibana의 엔드포인트로 접속해 주십시오.

    2. 인덱스 목록을 조회하여 ncp_mysql.test.member라는 이름의 인덱스가 생성된 것을 확인해 주십시오.
      cdss-5-8_ko

    3. ncp_mysql.test.member 인덱스의 내용을 조회해 주십시오.
      cdss-5-9_ko

    4. ncp_mysql.test.member 인덱스의 특정 도큐먼트를 조회해 주십시오.

      • MySQL에서 추가한 데이터는 _source.after에서 확인할 수 있습니다.

      cdss-5-10_ko


    이 문서가 도움이 되었습니까?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.