- 인쇄
- PDF
Kafka Connect로 데이터 파이프라인 구축
- 인쇄
- PDF
VPC 환경에서 이용 가능합니다.
Kafka Connect를 설치하여 Cloud Data Streaming Service를 응용하는 방법을 안내합니다.
이 가이드는 사용자의 편의를 위해 작성된 예제입니다. 이 가이드에서 사용된 connector 및 confluent-hub에 대한 라이선스는 confluent 사에 있으며 confluent 사의 라이선스 정책을 따릅니다. 네이버클라우드는 해당 connector 및 confluent-hub를 직접 제공하는 사업자가 아니며 사용자는 해당 connector 및 confluent-hub 사용 여부를 직접 결정할 수 있습니다.
Kafka Connect란?
Kafka Connect는 데이터베이스, 키-밸류 스토어, 파일 시스템과 같은 외부 시스템을 Kafka에 연결해 주는 구성 요소로, Apache Kafka에서 제공하는 오픈 소스입니다. Kafka Connect를 이용하면 다양한 제품을 시스템에 쉽게 연동할 수 있어 간편하게 데이터 파이프라인을 구축할 수 있습니다.
시작하기 전에
이 가이드는 서버에 Kafka Connect를 설치한 후 MySQL의 변경사항을 Elasticsearch에 적용하는 방법을 설명합니다.
시작하기 전에 필요한 서비스들이 활성화 되어있어야합니다.
사전 필요 서비스
Server: 물리적인 서버 자원을 별도로 구매하지 않고, 클라우드 환경에서 빠르게 생성해 사용한 만큼만 비용을 지불하는 서비스
Cloud DB for MySQL: 장애가 발생하면 자동으로 복구하는 완전 관리형 클라우드 데이터베이스 서비스
Cloud Data Streaming Service: 오픈 소스 Apache Kafka 클러스터를 손쉽게 배포, 운영 및 확장 가능한 서비스
Search Engine Service: Elasticsearch 클러스터를 손쉽게 배포, 운영 및 확장 가능한 서비스
네트워크 설정
네트워크 설정 방법은 안내합니다.
STEP 1. Cloud DB for MySQL 설정
Services > Database > Cloud DB for MySQL에서 [DB 관리] 버튼을 클릭해 주십시오.
DB User 관리를 클릭해 주십시오.
필요한 정보를 입력 후 [DB User 추가] 버튼을 클릭해 주십시오.
- Cloud DB for MySQL에 대한 자세한 설명은 Cloud DB for MySQL 사용 가이드를 참고해 주십시오.
STEP 2. ACG 설정
Services > Compute > Server > ACG 메뉴에서 보유하고 있는 'cdss-b-xxxxx'을 선택해 주십시오.
- Cloud Data Streaming Service 브로커 노드의 9092번 포트로 접근할 수 있도록 ACG를 설정합니다.
[ACG 설정] 버튼을 클릭한 후 아래 ACG 규칙을 추가해 주십시오.
- 프로토콜: TCP
- 접근 소스: Kafka Connect가 실행될 서버의 IP
- 허용 포트: 9092
보유하고 있는 'searchengine-m-xxxxx'을 선택해 주십시오.
- Search Engine Service 매니저 노드의 9200번 포트로 접근할 수 있도록 ACG를 설정합니다.
[ACG 설정] 버튼을 클릭한 후 아래 ACG 규칙을 추가해 주십시오.
- 프로토콜: TCP
- 접근 소스: Kafka Connect가 실행될 서버의 IP
- 허용 포트: 9200
서버에 Kafka Connect 설치
STEP 1. Java 설치
아래 명령어를 입력하여 yum 업데이트를 진행해 주십시오.
yum update
아래 명령어를 입력하여 java-1.8.0-openjdk-devel.x86_64를 설치해 주십시오.
yum install java-1.8.0-openjdk-devel.x86_64
아래 명령어를 입력하여 정상적으로 설치가 되었는지 확인해 주십시오.
java -version javac -version
STEP 2. Kafka Connect 설치
아래 명령어를 입력하여 서버의
/root
경로에 Kafka Connect를 다운로드해 주십시오.curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
아래 명령어를 입력하여 다운로드한 파일의 압축을 해제해 주십시오.
tar -zxvf confluent-community-7.0.1.tar.gz
아래 명령어를 입력하여 구동 전 properties 파일을 수정해 주십시오.
- Cloud Data Streaming Service의 브로커 노드 정보를 참고하여 properties 파일의 ‘bootstrap.servers’에 ip 목록을 추가합니다.
vi /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
bootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092
STEP 3. Confluent Hub 설치
Confluent Hub는 Kafka Connect에서 사용되는 다양한 플러그인을 간편하게 다운로드할 수 있는 저장소입니다. 지원되는 플러그인의 전체 목록은 Confluent에서 제공하는 Confluent Connector Portfolio를 확인해 주십시오.
아래 명령어를 입력하여
/root
경로에 새로운 폴더를 만든 후 해당 폴더로 이동해 주십시오.- 예제에서는 'confluent-hub'라는 이름의 폴더를 생성합니다.
mkdir confluent-hub cd confluent-hub
아래 명령어를 입력하여 현재 경로(
/root/confluent-hub
)에 Confluent Hub를 다운로드해 주십시오.curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
아래 명령어를 입력하여 다운로드한 파일의 압축을 해제해 주십시오.
tar -zxvf confluent-hub-client-latest.tar.gz
아래 명령어를 입력하여 현재 경로(
/root/confluent-hub
)에 추후 플러그인이 저장될 폴더를 생성해 주십시오.- 예제에서는 'plugins'라는 이름의 폴더를 생성합니다.
mkdir plugins
아래 명령어를 차례대로 입력하여 PATH 환경변수에 압축 해제한 bin 폴더의 경로를 추가해 주십시오.
vi ~/.bashrc
export CONFLUENT_HOME='~/confluent-hub' export PATH=$PATH:$CONFLUENT_HOME/bin
source ~/.bashrc
STEP 4. MySQL Connector 설치
- 아래 명령어를 입력하여 debezium-connector-mysql을 설치해 주십시오.
--component-dir
은 실제 플러그인이 설치되는 폴더 경로입니다. STEP 3에서 생성한/root/confluent-hub/plugins
로 설정해 주십시오.--worker-configs
는 플러그인 설치 후 적용된 properties 파일의 경로입니다. STEP 2에서 수정했던/root/confluent-7.0.1/etc/kafka/connect-distributed.properties
로 설정해 주십시오.
confluent-hub install debezium/debezium-connector-mysql:1.7.0 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
STEP 5. Elasticsearch Connector 설치
- 아래 명령어를 입력하여 kafka-connect-elasticsearch을 설치해 주십시오.
--component-dir
과--worker-configs
는 STEP 4와 동일하게 적용해 주십시오.
confluent-hub install confluentinc/kafka-connect-elasticsearch:11.1.3 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
STEP 6. Kafka Connect 프로세스 실행
아래 명령어를 입력하여 Kafka Connect 프로세스를 백그라운드로 실행해 주십시오.
/root/confluent-7.0.1/bin/connect-distributed -daemon /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
아래 명령어를 입력하여 프로세스가 정상적으로 작동하는지 확인해 주십시오.
curl localhost:8083
{"version":"7.0.1-ccs","commit":"b7e52413e7cb3e8b","kafka_cluster_id":"m1hLK0L6Qra5TLVy7b_A4A"}
아래 명령어를 입력하여 앞에서 설치한 Connector가 모두 정상적으로 노출되는지 확인해 주십시오.
curl localhost:8083/connector-plugins
[ {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"}, {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"}, {"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"}, {"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"}, {"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"} ]
Kafka Connect 사용
STEP 1. MySQL 테이블 생성 및 데이터 추가
사용 중인 Cloud DB for MySQL 서버에 접속해 주십시오.
아래 명령어를 입력하여 테이블을 생성한 후 데이터를 추가해 주십시오.
- 예제에서는 member라는 이름의 테이블을 생성합니다.
CREATE TABLE IF NOT EXISTS member ( id int NOT NULL PRIMARY KEY, name varchar(100), email varchar(200), department varchar(200) ); INSERT INTO member(id, name, email, department) values (1, 'messi', 'messi@gmail.com', 'A'); INSERT INTO member(id, name, email, department) values (2, 'ronaldo', 'ronaldo@naver.com', 'B'); INSERT INTO member(id, name, email, department) values (3, 'son', 'son@ncloud.com', 'B'); INSERT INTO member(id, name, email, department) values (4, 'park', 'park@yahoo.com', 'B');
STEP 2. MySQL Connector 등록
Kafka Connect가 설치된 서버로 접속해 주십시오.
요청 시 보낼 JSON body를 개인 환경에 맞게 아래 형식대로 입력해 주십시오.
{ "name": "mysql-connector", // connector의 이름 "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", // 사용할 플러그인의 종류 "database.hostname": "db-9c242.vpc-cdb.ntruss.com", // MySQL 서버의 엔드포인트 "database.port": "3306", // MySQL 서버의 포트 "database.user": "kimdong", // MySQL 서버의 user "database.password": "1234", // MySQL 서버 user의 password "database.server.id": "184054", // kafka connect에서 사용되는 MySQL 서버의 uuid "database.server.name": "NCP_MYSQL", // kafka connect에서 사용할 MySQL 서버에 부여하는 이름, 추후 kafka topic의 접두사로 사용됨 "database.whitelist": "test", // kafka connect에서 접근할 MySQL 서버의 데이터베이스 지정 "database.history.kafka.bootstrap.servers": "10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092", // Broker 노드 정보 "database.history.kafka.topic": "this_is_topic", // MySQL 히스토리 변경 내역을 저장할 topic 이름 "snapshot.locking.mode": "none", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter": "org.apache.kafka.connect.json.JsonConverter" } }
아래 명령어를 입력하여 MySQL Connector를 등록해 주십시오.
curl -X POST localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"db-9c242.vpc-cdb.ntruss.com","database.port":"3306","database.user":"kimdong","database.password":"1234","database.server.id":"184054","database.server.name":"NCP_MYSQL","database.whitelist":"test","database.history.kafka.bootstrap.servers":"10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092","database.history.kafka.topic":"this_is_topic","snapshot.locking.mode":"none","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}}'
STEP 3. Topic 확인
Cloud Data Streaming Service에서 제공하는 CMAK의 엔드포인트로 접속해 주십시오.
- CMAK에 대한 자세한 내용은 Cloud Data Streaming Service 사용 가이드를 확인해 주십시오.
STEP 2에서 설정한 body의 정보대로 Topic이 생성된 것을 확인해 주십시오.
- STEP 1에서 생성한 MySQL의 member 테이블의 변경 정보가 담길 Topic은 'NCP_MYSQL.test.member'입니다.
STEP 4. Elasticsearch Connector 등록
Kafka Connect가 설치된 서버로 접속해 주십시오.
요청 시 보낼 JSON body를 개인 환경에 맞게 아래 형식대로 입력해 주십시오.
{ "name": "es-connector", // connector의 이름 "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", // 사용할 플러그인의 종류 "connection.url": "http://10.0.100.9:9200", // topic을 가져올 kafka 브로커 노드 "tasks.max": "1", "topics": "NCP_MYSQL.test.member", // 컨슈밍할 topic 이름 "type.name": "_doc", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "true", "value.converter.schemas.enable": "true", "transforms": "extractKey", "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.extractKey.field": "id", // MySQL 테이블에서 사용하는 pk명 "behavior.on.null.values": "IGNORE" } }
아래 명령어를 입력하여 Elasticsearch Connector를 등록해 주십시오.
curl -X POST localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{"name":"es-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://10.0.100.9:9200","tasks.max":"1","topics":"NCP_MYSQL.test.member","type.name":"_doc","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","transforms":"extractKey","transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractKey.field":"id","behavior.on.null.values":"IGNORE"}}'
STEP 5. Elasticsearch 데이터 확인
Search Engine Service에서 제공하는 Kibana의 엔드포인트로 접속해 주십시오.
인덱스 목록을 조회하여 ncp_mysql.test.member라는 이름의 인덱스가 생성된 것을 확인해 주십시오.
ncp_mysql.test.member 인덱스의 내용을 조회해 주십시오.
ncp_mysql.test.member 인덱스의 특정 도큐먼트를 조회해 주십시오.
- MySQL에서 추가한 데이터는
_source.after
에서 확인할 수 있습니다.
- MySQL에서 추가한 데이터는