VPC 환경에서 이용 가능합니다.
Kafka Connect를 설치하여 Cloud Data Streaming Service를 응용하는 방법을 안내합니다.
이 가이드는 사용자의 편의를 위해 작성된 예제입니다. 이 가이드에서 사용된 connector 및 confluent-hub에 대한 라이선스는 confluent 사에 있으며 confluent 사의 라이선스 정책을 따릅니다. 네이버클라우드는 해당 connector 및 confluent-hub를 직접 제공하는 사업자가 아니며 사용자는 해당 connector 및 confluent-hub 사용 여부를 직접 결정할 수 있습니다.
Kafka Connect란?
Kafka Connect는 데이터베이스, 키-밸류 스토어, 파일 시스템과 같은 외부 시스템을 Kafka에 연결해 주는 구성 요소로, Apache Kafka에서 제공하는 오픈 소스입니다. Kafka Connect를 이용하면 다양한 제품을 시스템에 쉽게 연동할 수 있어 간편하게 데이터 파이프라인을 구축할 수 있습니다.
시작하기 전에
이 가이드는 서버에 Kafka Connect를 설치한 후 MySQL의 변경사항을 Elasticsearch에 적용하는 방법을 설명합니다.
시작하기 전에 필요한 서비스들이 활성화 되어있어야합니다.
사전 필요 서비스
Server: 물리적인 서버 자원을 별도로 구매하지 않고, 클라우드 환경에서 빠르게 생성해 사용한 만큼만 비용을 지불하는 서비스
Cloud DB for MySQL: 장애가 발생하면 자동으로 복구하는 완전 관리형 클라우드 데이터베이스 서비스
Cloud Data Streaming Service: 오픈 소스 Apache Kafka 클러스터를 손쉽게 배포, 운영 및 확장 가능한 서비스
Search Engine Service: Elasticsearch 클러스터를 손쉽게 배포, 운영 및 확장 가능한 서비스
네트워크 설정
네트워크 설정 방법은 안내합니다.
STEP 1. Cloud DB for MySQL 설정
-
Services > Database > Cloud DB for MySQL에서 [DB 관리] 버튼을 클릭해 주십시오.
-
DB User 관리를 클릭해 주십시오.
-
필요한 정보를 입력 후 [DB User 추가] 버튼을 클릭해 주십시오.
- Cloud DB for MySQL에 대한 자세한 설명은 Cloud DB for MySQL 사용 가이드를 참고해 주십시오.
STEP 2. ACG 설정
-
Services > Compute > Server > ACG 메뉴에서 보유하고 있는 'cdss-b-xxxxx'을 선택해 주십시오.
- Cloud Data Streaming Service 브로커 노드의 9092번 포트로 접근할 수 있도록 ACG를 설정합니다.
-
[ACG 설정] 버튼을 클릭한 후 아래 ACG 규칙을 추가해 주십시오.
- 프로토콜: TCP
- 접근 소스: Kafka Connect가 실행될 서버의 IP
- 허용 포트: 9092
-
보유하고 있는 'searchengine-m-xxxxx'을 선택해 주십시오.
- Search Engine Service 매니저 노드의 9200번 포트로 접근할 수 있도록 ACG를 설정합니다.
-
[ACG 설정] 버튼을 클릭한 후 아래 ACG 규칙을 추가해 주십시오.
- 프로토콜: TCP
- 접근 소스: Kafka Connect가 실행될 서버의 IP
- 허용 포트: 9200
서버에 Kafka Connect 설치
STEP 1. Java 설치
-
아래 명령어를 입력하여 yum 업데이트를 진행해 주십시오.
-
아래 명령어를 입력하여 java-1.8.0-openjdk-devel.x86_64를 설치해 주십시오.
-
아래 명령어를 입력하여 정상적으로 설치가 되었는지 확인해 주십시오.
STEP 2. Kafka Connect 설치
-
아래 명령어를 입력하여 서버의
/root
경로에 Kafka Connect를 다운로드해 주십시오. -
아래 명령어를 입력하여 다운로드한 파일의 압축을 해제해 주십시오.
-
아래 명령어를 입력하여 구동 전 properties 파일을 수정해 주십시오.
- Cloud Data Streaming Service의 브로커 노드 정보를 참고하여 properties 파일의 ‘bootstrap.servers’에 ip 목록을 추가합니다.
STEP 3. Confluent Hub 설치
Confluent Hub는 Kafka Connect에서 사용되는 다양한 플러그인을 간편하게 다운로드할 수 있는 저장소입니다. 지원되는 플러그인의 전체 목록은 Confluent에서 제공하는 Confluent Connector Portfolio를 확인해 주십시오.
-
아래 명령어를 입력하여
/root
경로에 새로운 폴더를 만든 후 해당 폴더로 이동해 주십시오.- 예제에서는 'confluent-hub'라는 이름의 폴더를 생성합니다.
-
아래 명령어를 입력하여 현재 경로(
/root/confluent-hub
)에 Confluent Hub를 다운로드해 주십시오. -
아래 명령어를 입력하여 다운로드한 파일의 압축을 해제해 주십시오.
-
아래 명령어를 입력하여 현재 경로(
/root/confluent-hub
)에 추후 플러그인이 저장될 폴더를 생성해 주십시오.- 예제에서는 'plugins'라는 이름의 폴더를 생성합니다.
-
아래 명령어를 차례대로 입력하여 PATH 환경변수에 압축 해제한 bin 폴더의 경로를 추가해 주십시오.
STEP 4. MySQL Connector 설치
- 아래 명령어를 입력하여 debezium-connector-mysql을 설치해 주십시오.
--component-dir
은 실제 플러그인이 설치되는 폴더 경로입니다. STEP 3에서 생성한/root/confluent-hub/plugins
로 설정해 주십시오.--worker-configs
는 플러그인 설치 후 적용된 properties 파일의 경로입니다. STEP 2에서 수정했던/root/confluent-7.0.1/etc/kafka/connect-distributed.properties
로 설정해 주십시오.
STEP 5. Elasticsearch Connector 설치
- 아래 명령어를 입력하여 kafka-connect-elasticsearch을 설치해 주십시오.
--component-dir
과--worker-configs
는 STEP 4와 동일하게 적용해 주십시오.
STEP 6. Kafka Connect 프로세스 실행
-
아래 명령어를 입력하여 Kafka Connect 프로세스를 백그라운드로 실행해 주십시오.
-
아래 명령어를 입력하여 프로세스가 정상적으로 작동하는지 확인해 주십시오.
-
아래 명령어를 입력하여 앞에서 설치한 Connector가 모두 정상적으로 노출되는지 확인해 주십시오.
Kafka Connect 사용
STEP 1. MySQL 테이블 생성 및 데이터 추가
-
사용 중인 Cloud DB for MySQL 서버에 접속해 주십시오.
-
아래 명령어를 입력하여 테이블을 생성한 후 데이터를 추가해 주십시오.
- 예제에서는 member라는 이름의 테이블을 생성합니다.
STEP 2. MySQL Connector 등록
-
Kafka Connect가 설치된 서버로 접속해 주십시오.
-
요청 시 보낼 JSON body를 개인 환경에 맞게 아래 형식대로 입력해 주십시오.
-
아래 명령어를 입력하여 MySQL Connector를 등록해 주십시오.
STEP 3. Topic 확인
-
Cloud Data Streaming Service에서 제공하는 CMAK의 엔드포인트로 접속해 주십시오.
- CMAK에 대한 자세한 내용은 Cloud Data Streaming Service 사용 가이드를 확인해 주십시오.
-
STEP 2에서 설정한 body의 정보대로 Topic이 생성된 것을 확인해 주십시오.
- STEP 1에서 생성한 MySQL의 member 테이블의 변경 정보가 담길 Topic은 'NCP_MYSQL.test.member'입니다.
STEP 4. Elasticsearch Connector 등록
-
Kafka Connect가 설치된 서버로 접속해 주십시오.
-
요청 시 보낼 JSON body를 개인 환경에 맞게 아래 형식대로 입력해 주십시오.
-
아래 명령어를 입력하여 Elasticsearch Connector를 등록해 주십시오.
STEP 5. Elasticsearch 데이터 확인
-
Search Engine Service에서 제공하는 Kibana의 엔드포인트로 접속해 주십시오.
-
인덱스 목록을 조회하여 ncp_mysql.test.member라는 이름의 인덱스가 생성된 것을 확인해 주십시오.
-
ncp_mysql.test.member 인덱스의 내용을 조회해 주십시오.
-
ncp_mysql.test.member 인덱스의 특정 도큐먼트를 조회해 주십시오.
- MySQL에서 추가한 데이터는
_source.after
에서 확인할 수 있습니다.
- MySQL에서 추가한 데이터는