- 인쇄
- PDF
Cloud Data Streaming Service 사용 가이드
- 인쇄
- PDF
VPC 환경에서 이용 가능합니다.
Cloud Data Streaming Service 사용하기
Cloud Data Streaming Service는 설치형 서비스로 별도의 이용 신청/해지가 없으며 클러스터 생성 시 계약이 이루어집니다.
클러스터가 삭제되면 계약이 종료됩니다.
Cloud Data Streaming Service 클러스터 생성하기
① Cloud Data Streaming Service 메뉴에서 [Cluster 생성] 버튼을 클릭합니다.
Cloud Data Streaming Service 클러스터 설정 정보 입력하기
- 클러스터 이름을 입력합니다.
- Application 버전을 선택합니다.
- ACG 정보입니다.
- Cloud Data Streaming Service에서 사용되는 ACG는
cdss-클러스터 코드
로 자동으로 생성됩니다.
- Cloud Data Streaming Service에서 사용되는 ACG는
- ConfigGroup 을 설정 합니다.
- ConfigGroup 이 없는 경우 생성이 필요 합니다.
- ConfigGroup 을 생성할 수 있는 페이지를 새창으로 엽니다.
- ConfigGroup은 생성 후 별도의 설정 없이 기본값으로 지정되어 클러스터 생성이 가능합니다.
- ConfigGroup에 대한 상세 설정은 ConfigGroup 사용 가이드를 참고하여 주세요
- Kafka 로그가 수집되며 수집된 로그는 Cloud Log Analytics에서 확인할 수 있습니다.
- 11월 25일 이전에 생성된 클러스터는 로그 수집이 되지 않고 있습니다. 로그 수집을 원한다면 고객센터에 문의해주세요.
- Kafka Broker Port입니다.
9092
으로 설정되며 변경할 수 없습니다.
- Kafka Broker TLS Port입니다.
9093
으로 설정되며 변경할 수 없습니다.
- Zookeeper Port입니다.
2181
으로 설정되며 변경할 수 없습니다.
- CMAK Port입니다.
9000
으로 설정되며 변경할 수 없습니다.
- CMAK에 접속하기 위한 ID를 입력합니다.
- CMAK 접속용 Password를 입력합니다.
- CMAK 접속용 Password를 한 번 더 입력합니다.
- 노드 설정 정보를 입력하기 위해 다음을 클릭합니다.
Cloud Data Streaming Service 노드 설정하기
① OS를 선택합니다.
② VPC를 선택합니다.
③ 매니저 노드 서버의 Subnet을 선택합니다.
- Public subnet을 선택할 수 있습니다.
④ 매니저 노드 서버 개수의 정보입니다.
- 1로, 값을 변경할 수 없습니다.
⑤ 매니저 노드 서버 타입을 선택합니다.
- 선택 가능한 서버 타입이 나타납니다.
⑥ Broker 노드 서버의 Subnet을 선택합니다.
- Private subnet을 선택할 수 있습니다.
⑦ Broker 노드 개수를 입력합니다.
- 최소 3대부터 최대 10대까지 생성이 가능합니다.
- 기본 설정은 3대입니다.
⑧ Broker 노드 서버 타입을 선택합니다.
- 선택 가능한 서버 타입이 나타납니다.
⑨ Broker 노드 스토리지 용량입니다.
- OS 스토리지가 아닌 별도의 BlockStorage를 사용합니다.
- BlockStorage는 브로커 노드 당 최대 2000GB까지 사용 가능합니다.
⑩ 최종 정보를 확인하기 위해 [다음] 을 클릭합니다.
입력 정보 최종 확인하기
① 입력된 정보가 맞는지 확인합니다.
② 클러스터 생성을 위해 [클러스터 생성] 버튼을 클릭합니다.
Cloud Data Streaming Service 클러스터 생성 확인
① Cloud Data Streaming Service 클러스터 생성이 시작되었습니다.
- 생성 시간은 수 분에서 수십 분 정도 소요될 수 있습니다.
Cloud Data Streaming Service 클러스터 관리하기
생성된 Cloud Data Streaming Service의 [삭제], [노드 추가], [CMAK 접속], [CMAK 접속 패스워드 초기화], [CMAK 접속 도메인 설정 변경], [서비스 재시작] 기능입니다.
생성된 클러스터 확인
① 클러스터 생성이 완료되면 클러스터 요약 정보가 나타나며 해당 클러스터를 선택하면 하단의 상세 정보가 나타납니다.
② 클러스터의 상세 정보 정보를 확인할 수 있습니다.
- ACG 정보와 CMAK 바로 가기, 암호화 통신에 사용 될 인증서를 다운받을 수 있습니다.
- 클러스터가 선택되면 상단의 메뉴 [삭제]/[클러스터 관리]/[클러스터 재시작] 메뉴가 활성화됩니다.
③ 선택 클러스터 삭제 메뉴입니다.
④ 클러스터 관리 메뉴입니다
⑤ 선택 클러스터 재시작 메뉴입니다.
클러스터 삭제 메뉴
① 클러스터를 선택 후 삭제 메뉴를 선택 하면 선택된 클러스터 리스트를 확인하고, 최종 삭제를 진행 할지 여부를 선택합니다.
② 실제 클러스터를 삭제 할지 여부를 선택합니다.
클러스터 관리 메뉴
① 클러스터를 선택 후 클러스터 관리 메뉴를 선택 시 [브로커 노드 추가], [CMAK 접속], [CMAK 접속 패스워드 초기화], [CMAK 접속 도메인 설정 변경] 메뉴가 나타납니다.
브로커 노드 추가
① [브로커 노드 추가] 메뉴를 선택하면 노드를 추가할 수 있는 화면이 나타납니다.
② 최초 현재 클러스터의 브로커 노드 수가 나타나며 최대 10대까지 증설을 선택할 수 있습니다.
- 예: 추가할 노드 수 1 선택 시 기존 3대에 1대의 브로커 노드가 추가됩니다.
③ 브로커 노드 증설을 실행합니다.
① 노드 증설이 시작되면 서버 생성이 진행되며 서버 상태가 변경중으로 표시됩니다.
Broker 노드 Public Endpoint 설정 변경
① [Broker 노드 Public Endpoint 설정 변경] 메뉴를 선택하면 Broker 노드의 Public Endpoint로 설정할 수 있는 로드밸런서의 목록이 나타납니다.
- Network 타입, Public IP를 사용중인 로드밸런서의 목록만 선택할 수 있습니다.
- 클러스터에서 이미 사용중일 경우 해당 로드밸런서를 선택할 수 없습니다.
- Sub Account 계정으로 접속한 경우, View/getLoadBalancerInstanceDetail 권한이 없을 경우 해당 로드밸런서를 선택할 수 없습니다.
② 로드밸런서 목록이 보이지 않는다면 로드밸런서 신규 생성 버튼을 클릭하여 로드밸런서를 생성합니다.
③ 확인 버튼을 눌러 Broker 노드의 설정 파일을 변경합니다.
Target Group 생성
① Target Group을 생성하기 위해 먼저 Broker 노드 정보 버튼을 클릭하여 Broker 노드의 정보를 확인합니다.
① Public Endpoint PlainText Listener Port의 정보를 확인합니다.
- 각각의 Broker 노드와 대응되는 Port를 확인합니다.
- 암호화 통신을 이용할 경우, Public Endpoint TLS Listener Port의 정보를 확인합니다.
① Target Group 생성 버튼을 클릭합니다.
① Target Group 이름을 입력합니다.
② VPC Server Target 유형을 선택합니다.
③ Cloud Data Streaming Service Cluster를 생성한 VPC와 동일한 VPC를 선택합니다.
④ TCP 프로토콜을 선택합니다.
⑤ 앞에서 확인한 Broker 노드와 대응되는 Port 중 하나를 입력합니다.
- 각각의 Broker 노드와 대응되는 Port로 생성한 Target Group이 필요합니다.
- Broker 노드개 3개라면, 총 3개의 Target Group이 필요합니다.
⑥ 다음 버튼을 클릭합니다.
① TCP 프로토콜을 선택합니다.
② 앞 단계에서 입력한 Port를 입력합니다.
③ 다음 버튼을 클릭합니다.
① 앞 단계에서 입력한 Port와 대응되는 Broker 노드를 선택합니다.
- Broker 노드와 대응되는 Port는 Cloud Data Streaming Service의 Broker 노드 정보을 통해 확인할 수 있습니다.
② 다음 버튼을 클릭합니다.
① 설정한 정보를 확인합니다.
② [Target Group 생성] 버튼을 클릭합니다
① 위의 과정을 Broker 노드의 개수만큼 반복하여 Target Group을 생성합니다.
Load Balancer 생성
① [로드밸런서 생성] 버튼을 클릭합니다.
② [네트워크 로드밸런서 생성] 버튼을 클릭합니다.
- 다른 타입의 로드밸런서를 생성할 경우, Cloud Data Streaming Service에 노출되지 않습니다.
① 로드밸런서 이름을 입력합니다.
② Public IP를 선택합니다.
③ Cloud Data Streaming Service Cluster를 생성한 VPC와 동일한 VPC를 선택합니다.
④ Load Balancer 전용 Subnet을 선택합니다.
⑤ [다음] 버튼을 클릭합니다.
① Broker 노드에 대응되는 Port 중 하나를 입력합니다.
- 입력되지 않은 다른 Port에 대해서는 로드밸런서 생성 후 Listener 추가를 통해 설정합니다.
② [다음] 버튼을 클릭합니다.
① 앞 단계 리스너 설정 단계에서 등록한 로드 밸런서 포트와 대응되는 Target Group을 선택합니다.
② [다음] 버튼을 클릭합니다.
① 생성된 정보가 맞는지 확인합니다.
② [로드 밸런서 생성] 버튼을 클릭합니다.
① 남은 Target Group에 대하여 추가적으로 리스너 설정을 하기 위해 [리스너 설정 변경] 버튼을 클릭합니다.
① [리스너 추가] 버튼을 클릭합니다.
① TCP 프로토콜을 선택합니다.
② Broker 노드에 대응되는 Port 중 설정하지 않은 Port를 입력합니다.
③ ②단계에서 입력한 Port에 대응하는 Target Group을 선택합니다.
④ [확인] 버튼을 눌러 리스너를 추가합니다.
① 생성한 Target Group에 대하여 모두 리스너를 등록합니다.
ACG 설정하기
① 브로커 노드의 ACG를 확인합니다. ACG 설정은 ACG 바로가기 메뉴를 통해 변경할 수 있습니다.
① 로드밸런서와 외부에서 Broker 노드에 접근하기 위한 IP에 대한 ACG 규칙 설정을 합니다.
② [ACG 설정] 을 클릭하여 규칙 설정 화면으로 이동합니다.
로드밸런서와 외부에서 Broker 노드에 접근하기 위한 IP, 2개의 ACG Rule을 생성합니다.
① TCP 프로토콜을 선택합니다.
② 접근 소스 정보를 입력합니다.
- 로드밸런서에서 접근을 허용하기 위해 로드밸런서 전용 Subnet의 IP 주소 범위를 입력합니다.
- Broker 노드에 접근이 필요한 대역의 IP를 접근 소스에 입력합니다.
모든 IP 주소 대역을 포함한 인터넷 전체를 대상으로(IP 주소: 0.0.0.0/0) 접근 소스를 허용하는 경우, 보안상 매우 취약하여 외부로부터 공격이 들어와 클러스터에서 수행 중인 작업에 영향을 미칠 수 있으니 주의가 필요합니다.
③ [myip] 버튼을 클릭해서 자신의 공인 IP 주소를 확인하고 해당 IP 주소를 접근 소스에 등록할 수 있습니다.
④ 허용할 포트를 등록합니다.
- 특정 포트나 범위로 등록이 가능합니다.
⑤ ACG에 대한 메모를 등록합니다.
[추가] 를 클릭하면 아래 리스트에 입력한 정보가 보입니다.
입력한 규칙을 한번 더 확인한 후 [적용] 을 클릭하면 해당 규칙이 ACG에 적용됩니다.
⑥ 로드밸런서에서 Broker 노드에 접근하기 위한 ACG 설정 예시입니다.
기존에 등록되어 있는 규칙은 Cloud Data Streaming Service 관리를 위해 필요한 ACG 규칙입니다.
사용자가 임의로 삭제하거나 수정할 경우 클러스터 동작에 영향을 미칠 수 있으니 주의해 주세요.
CMAK 접속
① 바로가기 버튼을 통해 CMAK에 접속할 수 있습니다.
- CMAK 접속 버튼을 클릭하기 위해서는 CMAK 접속 도메인 설정 변경을 통해 Public 도메인을 활성화해야 합니다.
CMAK 접속 패스워드 초기화
① CMAK 접속용 Password를 잃어 버렸을 경우에 Password를 초기화합니다.
② 새로운 Password를 입력 후 확인을 선택하면 새로운 Password로 CMAK에 접속할 수 있습니다. 적용 시간은 수 분가량 소요될 수 있습니다.
CMAK 접속 도메인 설정 변경
① CMAK 접속 도메인 설정을 변경합니다.
② 현재, Public 도메인이 비활성화 상태일 경우, Public 도메인을 활성화 할 수 있습니다.
③ 현재, Public 도메인이 활성화 상태일 경우, Public 도메인을 비활성화 할 수 있습니다.
- 접속 도메인을 비활성화 할 경우, 외부에서의 CMAK 접속이 차단됩니다.
클러스터 재시작
① 모든 서비스(CMAK, Kafka & Zookeeper), Kafka & Zookeeper, 노드별 Kafka 재시작, CMAK 재시작을 할 수 있는 메뉴입니다.
- 서비스 재시작 시, 데이터 유실, 서비스 지연이 발생할 수 있니다.
모든 서비스 재시작
① 모든 서비스(Kafka & Zookeeper & CMAK)를 재시작합니다.
Kafka and Zookeeper 재시작
① Kafka & Zookeeper를 재시작합니다.
노드별 Kafka 재시작
① Kafka를 재시작 할 서버를 선택합니다.
② 선택한 서버의 Kafka를 재시작합니다.
CMAK 재시작
① CMAK를 재시작합니다.
Cloud Data Streaming Service 활용하기
Cloud Data Streaming Service를 활용하는 방법을 알아봅니다.
CMAK에 접속하여, Topic을 만들고, Broker 노드들의 정보를 확인하고, Cloud Data Streaming Service Cluster에 데이터를 저장하고 조회합니다.
CMAK 접속하기
CMAK에 접속하기 위해선 Public 도메인을 활성화 해야 합니다.
① 클러스터 관리 -> CMAK 접속 도메인 설정 변경 버튼을 클릭합니다.
② [확인] 버튼을 눌러 Public 도메인을 활성화합니다.
① CMAK 바로가기 버튼을 눌러 CMAK에 접속합니다.
① 클러스터 생성 과정에서 입력했던 ID와 Password를 입력합니다.
① 생성한 클러스터에 대한 정보를 확인할 수 있습니다.
① Topic에 대한 정보를 확인할 수 있습니다.
② 브로커 노드에 대한 정보를 확인할 수 있습니다.
- Topic에 대한 정보
- 브로커 노드에 대한 정보
Topic 만들기
① Create 버튼을 눌러 Topic을 만듭니다.
① Topic의 이름을 입력합니다.
② Topic의 Partitions를 입력합니다.
③ Topic의 Replication Factor를 입력합니다.
④ 그 외 Topic의 설정들을 입력합니다.
⑤ Create 버튼을 눌러 Topic을 생성합니다.
생성된 Topic 확인하기
① Topic의 정보를 확인할 수 있습니다.
② Topic 삭제, Partition 추가, 재분배, 설정 변경 등의 조작을 수행할 수 있습니다.
③ 각 브로커 노드에 대한 생성된 Partition 정보를 확인할 수 있습니다.
④ 각 Partition의 Leader와 Replication이 어떤 브로커 노드에 위치하는지 확인할 수 있습니다.
Cloud Data Streaming Service 사용
위와 같은 구조로 Producer VM, Consumer VM을 생성하여 앞에서 생성한 Topic에 대해 데이터를 저장, 조회를 해보겠습니다.
Producer VM, Consumer VM 생성 및 세팅하기
- CentOS 7.3을 기준으로 가이드를 작성하였습니다.
① Cloud Data Streaming Service 클러스터를 생성한 VPC와 동일한 VPC를 선택합니다.
② VM에 접속, 설치를 위해 Public Subnet을 선택합니다.
- 해당 VM에 공인 IP를 할당해야 접속할 수 있습니다.
Apache Kafka, Java, Python을 활용해 데이터를 전송, 저장해보겠습니다.
Java 설치
yum install java -y
Apache Kafka 설치
// 2.4.0 버전
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
# 압축을 풀어줍니다.
tar -zxvf kafka_2.12-2.4.0.tgz
// 2.8.1 버전
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
# 압축을 풀어줍니다.
tar -zxvf kafka_2.13-2.8.1.tgz
- 선택한 Cloud Data Streaming Service의 버전과 동일한 버전의 Kafka를 이용하시길 권장 드립니다.
브로커 노드 정보 확인하기
① Broker 노드 정보 - 상세 보기 버튼을 클릭합니다.
① 브로커 노드와 암호화 없이 통신을 하는데 사용됩니다.
② 브로커 노드와 암호화 통신에 사용됩니다.
③ 암호화 통신에 이용되는 hosts 파일을 수정하는데 사용됩니다.
브로커 노드의 ACG를 수정합니다.
① 프로토콜을 선택합니다.
② 접근 소스 정보에 Producer VM, Consumer VM의 비공인 IP를 입력합니다.
③ 허용할 포트를 등록합니다.
- 9092-9093을 입력합니다.
④ ACG에 대한 메모를 등록합니다.
추가를 클릭하면 아래 리스트에 입력한 정보가 보입니다.
입력한 규칙을 한번 더 확인한 후 적용을 클릭하면 해당 규칙이 ACG에 적용됩니다.
Cloud Data Streaming Service에 데이터 전송 및 조회하기
Apache Kafka 활용하여 데이터 전송 및 조회하기
앞에서 생성, 세팅한 Producer VM에 접속하여 아래 명령어를 실행합니다.
cd kafka_2.12-2.4.0 # 2.8.1 버전의 경우 'cd kafka_2.13-2.8.1'
./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic]
# [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
# [topic]에 CMAK에서 생성한 Topic을 입력합니다.
# 예시) ./bin/kafka-console-producer.sh --broker-list 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test
전송하고 싶은 메시지 입력하면 브로커 노드들에 해당 메시지들이 저장됩니다.
- 종료를 원할 경우, ctrl + c를 입력합니다.
앞에서 생성, 세팅한 Consumer VM에 접속하여 아래 명령어를 실행합니다.
cd kafka_2.12-2.4.0 # 2.8.1 버전의 경우 'cd kafka_2.13-2.8.1'
./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --from-beginning
# [bootstrap.server]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
# [topic]에 앞 단계 Producer VM에서 입력한 Topic을 입력합니다.
# 예시) ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test --from-beginning
- --from-beginning 명령어 사용 시, 해당 Topic에 대한 데이터를 처음부터 모두 조회합니다.
- --from-beginning 명령어를 사용하지 않을 시, 데이터를 조회한 순간부터 입력되는 데이터에 대해서만 조회합니다.
Java 활용하여 데이터 전송 및 조회하기
Java Application을 활용하여 데이터를 전송합니다.
- IntelliJ IDEA를 사용하여 진행하였습니다.
File -> New -> Project 후, Maven을 선택해 Project를 만듭니다.
- 프로젝트 종속성, Java 버전 및 패키징 메서드를 정의하는 pom.xml 파일을 수정합니다.
- 사용자 환경에 따라 pom.xml 파일이 상이할 수 있습니다.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<groupId>org.example</groupId>
<artifactId>maventest</artifactId>
<version>1.0-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<!-- Apache Kafka version in Cloud Data Streaming Service -->
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>KafkaMain</mainClass>
</transformer>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
KafkaMain.java 작성하기
- Java Application 실행 시, argument로 produce/consume 여부, Topic, Broker lists를 전달합니다.
public class KafkaMain {
public static void main(String[] args) throws IOException {
String topicName = args[1];
String brokers = args[2];
switch(args[0]){
case "produce":
Producer.produce(brokers, topicName);
break;
case "consume":
Consumer.consume(brokers, topicName);
break;
default:
System.out.println("Wrong arguments");
break;
}
System.exit(0);
}
}
Producer.java 작성하기
- 예시로 0~99의 숫자를 전송하였습니다.
public class Producer {
public static void produce(String brokers, String topicName) throws IOException {
// Create Producer
KafkaProducer<String, String> producer;
// Configure
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", brokers);
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(properties);
for(int i=0;i<100;i++){
ProducerRecord record = new ProducerRecord<String, String>(topicName, Integer.toString(i));
producer.send(record);
}
producer.close();
}
}
Consumer.java 작성하기
public class Consumer {
public static int consume(String brokers, String topicName) {
// Create Consumer
KafkaConsumer<String, String> consumer;
// Configure
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", brokers);
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("group.id", "consumer_group");
properties.setProperty("auto.offset.reset", "earliest");
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500); // wait for 500ms
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
System.out.println(record.value());
}
}
}
}
작성한 Java code를 git에 저장 후, VM에서 git clone 명령어를 통해 code를 다운받습니다.
git clone 자신의 git repository
해당 Java Application을 빌드하기 위해선 Maven 설치가 필요합니다.
yum install maven -y
다운 받은 Java code 폴더로 이동 후, jar 파일을 빌드합니다.
cd kafkatest
mvn clean package
jar 파일을 빌드하고 나면, target 폴더와 target 폴더 내에 jar 파일이 생성됩니다. target 폴더로 이동 후, jar 파일을 실행합니다
cd target
# 데이터 전송하기
java -jar kafkatest-1.0-SNAPSHOT.jar produce [topic] [broker.list]
# [topic]에 CMAK에서 만든 Topic을 입력합니다.
# [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
예시) java -jar kafkatest-1.0-SNAPSHOT.jar produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
# 데이터 조회하기
java -jar kafkatest-1.0-SNAPSHOT.jar consume [topic] [broker.list]
# [topic]에 CMAK에서 만든 Topic을 입력합니다.
# [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
예시) java -jar kafkatest-1.0-SNAPSHOT.jar consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
Python 활용하여 데이터 전송 및 조회하기
- Python 2.7.5 버전에서 진행하였습니다.
- Python에서 Kafka를 활용하기 위해선 kafka-python package 설치가 필요합니다.
# pip 설치
curl -LO https://bootstrap.pypa.io/pip/2.7/get-pip.py
python get-pip.py
# kafka-python package 설치
pip install kafka-python
KafkaMain.py 작성하기
import sys
from kafka import KafkaProducer, KafkaConsumer
from json import dumps
import time
def produce(topicName, brokerLists):
producer = KafkaProducer(bootstrap_servers=brokerLists,
value_serializer=lambda x:
dumps(x).encode('utf-8'))
for i in range(100):
producer.send(topicName, i)
def consume(topicName, brokerLists):
consumer = KafkaConsumer(topicName, bootstrap_servers=brokerLists,
group_id="test")
for msg in consumer:
print(msg)
action=sys.argv[1]
topicName=sys.argv[2]
brokerLists=sys.argv[3].split(',')
if action == 'produce':
produce(topicName, brokerLists)
elif action == 'consume':
consume(topicName, brokerLists)
else:
print('wrong arguments')
KafkaMain.py 파일을 실행하여 produce 하기
- 예시로 0~99의 숫자를 전송하였습니다.
python KafkaMain.py produce [topic] [broker.list]
# [topic]에 CMAK에서 만든 Topic을 입력합니다.
# [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
# 예시) python KafkaMain.py produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
KafkaMain.py 파일을 실행하여 consume 하기
python KafkaMain.py consume [topic] [broker.list]
# [topic]에 CMAK에서 만든 Topic을 입력합니다.
# [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
# 예시) python KafkaMain.py consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
통신 구간 암호화
Apache Kafka 클라이언트와 Apache Kafka 브로커 노드 간에 통신 구간을 암호화하는 방법입니다.
전체적인 프로세스 요약은 다음과 같습니다(① ~ ③의 과정은, 클러스터 생성 시 자동으로 실행됩니다).
① 매니저 노드에서 자체 인증서를 만듭니다.
② 각 브로커 노드에서, 인증서를 생성하고, 인증서 서명 요청을 만듭니다.
③ ②에서 만든 인증서 서명 요청을 매니저 노드가 서명합니다.
④ 클라이언트에서는 ①에서 만든 인증서를 다운로드하고, 인증서에 대한 정보를 갖고 있는 truststore를 생성합니다.
⑤ 암호화 통신에 대한 설정 파일을 작성합니다.
⑥ hosts 파일 수정하기
인증서 다운로드
① 다운로드 버튼을 클릭하여 인증서를 다운로드합니다.
① 확인 버튼을 눌러 인증서를 다운로드합니다.
다운받은 인증서 파일을 Producer, Consumer VM으로 복사합니다.
- Producer, Consumer VM의 /root 경로에 ca-cert라는 이름으로 저장합니다.
Truststore 생성하기
keytool -keystore kafka.client.truststore.jks -alias mytruststore -import -file ca-cert
① Enter keystore password: password를 입력합니다.
② Re-enter new password: password를 재입력합니다.
③ Trust this certificate? [no]: yes를 입력합니다.
④ ls 명령어를 통해 kafka.client.truststore.jks 파일이 생성된 것을 확인할 수 있습니다.
암호화 설정 파일 작성하기
# Kafka 폴더에 암호화 설정 파일을 생성합니다.
cd kafka_2.12-2.4.0 # 2.8.1 버전의 경우 'cd kafka_2.13-2.8.1'
vi client-auth.properties
security.protocol=SSL
ssl.truststore.location=/root/kafka.client.truststore.jks
ssl.truststore.password=[password]
위의 내용을 입력한 client-auth.properties 파일을 작성합니다.
- [password]에는 kafka.client.truststore.jks 생성 시 입력했던 password를 입력합니다.
hosts 파일 수정하기
암호화 통신을 하기 위해선 hosts 파일(/etc/hosts)을 수정해야 합니다. /etc/hosts 파일은, 리눅스에서 DNS 서버보다 먼저 호스트명을 IP로 변환하여 주는 파일입니다.
vi /etc/hosts
/etc/hosts 파일에 앞에서 확인했던 브로커 노드 정보의 hosts 파일 정보 복사본을 추가합니다.
예시)
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.2.24 yeakafka2-d-2am
192.168.2.25 yeakafka2-d-2an
192.168.2.26 yeakafka2-d-2ao
Cloud Data Streaming Service에 통신 구간 암호화하여 데이터 전송하기
Producer VM에서 아래의 명령어를 실행합니다.
./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] --producer.config client-auth.properties
# [broker.list]에 앞에서 확인했던 브로커 노드 정보의 TLS 복사본을 입력합니다.
# [topic]에 CMAK에서 생성한 Topic을 입력합니다.
# 예시) ./bin/kafka-console-producer.sh --broker-list yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --producer.config client-auth.properties
전송하고 싶은 메시지 입력하면 브로커 노드들에 해당 메시지들이 저장됩니다.
- 종료를 원할 경우, ctrl + c를 입력합니다.
Cloud Data Streaming Service에 통신 구간 암호화하여 저장된 데이터 조회하기
Consumer VM에서 아래의 명령어를 실행합니다.
cd kafka_2.12-2.4.0 # 2.8.1 버전의 경우 'cd kafka_2.13-2.8.1'
./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --consumer.config client-auth.properties
# [bootstrap.server]에 앞에서 확인했던 브로커 노드 정보의 TLS 복사본을 입력합니다.
# [topic]에 앞 단계 Producer VM에서 입력한 Topic을 입력합니다.
# 예시) ./bin/kafka-console-consumer.sh --bootstrap-server yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --consumer.config client-auth.properties
모니터링
모니터링 서비스는 Cloud Data Streaming Service 서버에 대해 다양한 모니터링 정보 기능을 제공합니다.
- 모니터링 서비스는 Cloud Data Streaming Service에 포함되어 있는 서비스이므로 추가 비용 없이 사용할 수 있습니다.
모니터링 서비스가 제공하는 지표는 다음과 같습니다.
- CDSS Dashboard
그래프명 | 설명 |
---|---|
Topics Total | Topic 총 개수 |
Partitions Total | Partition 총 개수 |
Messages Total | Message 총 개수 |
Messages In(/sec) | 초당 수신된 메시지 수 |
Bytes In(/sec) | 클라이언트로부터 받은 초당 바이트 수 |
Bytes Out(/sec) | 클라이언트에 전송된 초당 바이트 수 |
- OS Dashboard
그래프명 | 설명 |
---|---|
CPU Usage | CPU 사용량 |
Load Average | 서버 부하량 |
Memory Usage | 메모리 사용량 |
Swap | Swap 메모리 발생량 |
Disk Used | 디스크 사용량 |
Disk I/O | Disk의 Input/Output 발생량 |
Network | NIC의 초당 발생량 |
Free Memory | 메모리 여유 공간 (Free + Buffer/Cache) |
Free Storage Space | 스토리지 여유 공간 |
클러스터 모니터링
① 현재 생성되어 있는 클러스터 리스트 중에서 모니터링 정보를 확인할 클러스터를 선택합니다.
② 확인할 모니터링 정보를 선택합니다.
- 기본값은 전체 선택입니다.
③ 지표를 조회할 기간을 설정합니다.
- 기본값은 최근 1시간입니다.
노드별 모니터링
Cloud Data Streaming Service 정보
① Broker Dashboard 정보에 대해서 확인할 노드를 선택합니다.
② Broker Dashboard를 클릭합니다.
③ 확인할 모니터링 정보를 선택합니다.
- 기본값은 전체 선택입니다.
④ 지표를 조회할 기간을 설정합니다.
- 기본값은 최근 1시간입니다.
OS 정보
① OS Dashboard 정보에 대해서 확인할 노드를 선택합니다.
② OS Dashboard를 클릭합니다.
③ 확인할 모니터링 정보를 선택합니다.
- 기본값은 전체 선택입니다.
④ 지표를 조회할 기간을 설정합니다.
- 기본값은 최근 1시간입니다.
Config Group
Config Group 이용가이드
ConfigGroup 서비스는 Cloud Data Streaming Service에서 생성된 Cluster의 Kafka 설정파일의 변경기능을 제공합니다.
- Config Group는 Cloud Data Streaming Service에 포함되어 있는 서비스이므로 추가 비용 없이 사용할 수 있습니다.
- ConfigGroup 을 생성할 수 있습니다.
- 생성된 ConfigGroup 의 상세정보를 수정 할 수 있습니다.
- 생성된 ConfigGroup 을 삭제할 수 있습니다.
- ConfigGroup 을 사용중인 Cluster가 있는 경우에는 삭제가 불가능 합니다.
- ConfigGroup 의 목록을 확인할 수 있습니다.
- 생성된 ConfigGroup 의 이름 입니다.
- ConfigGroup 을 적용할 수 있는 Kafka의 Version입니다. Cluster의 Kafka Version이 동일한 경우에만 ConfigGroup 을 설정할 수 있습니다.
- ConfigGroup 에 대한 Description 입니다. 사용자가 원하는 정보를 기록할 수 있습니다. (최대 255자)
- Description 은 "EDIT" 버튼을 클릭하여 자유롭게 수정이 가능합니다.
- ConfigGroup 을 사용중인 Cluster 의 목록을 확인 할 수 있습니다.
- ConfigGroup 을 사용중인 Cluster가 있는 경우에는 삭제가 불가능합니다.
- Cluster의 ConfigGroup 을 변경하거나, 삭제후 ConfigGroup 의 삭제가 가능합니다.
Config Group 생성방법
- Config Group 생성 버튼을 클릭하면 Config Group 생성 팝업이 열립니다.
- ConfigGroup 생성 팝업에서는 ConfigGroup을 생성 할 수 있습니다.
- 생성하려는 ConfigGroup 의 Kafka 버전을 선택합니다.
- ConfigGroup 의 Name 을 입력할 수 있습니다.
- 이름은 영문자(소문자), 숫자, 하이픈(-)으로만 구성가능하며, 첫글자는 영문자 또는 숫자여야 하며, 마지막에 하이픈(-)은 사용할 수 없습니다.
- 영문 최소 3자 이상, 30자 이하로 입력이 가능합니다.
- Description 입력이 가능합니다.
- 사용자가 ConfigGroup 에 남겨두고 싶은 메시지를 자유롭게 남길 수 있습니다.
Config Group 편집방법(상세설정)
- 상세설정을 하려는 ConfigGroup 을 체크 합니다.
- "Config편집" 버튼을 클릭 합니다.
- Config 편집을 위한 팝업이 오픈되며, Kafka의 설정파일 Serverproperties 변경가능한 항목이 표시됩니다.
CloudDataStreamingService 는 Cluster운영을 위한 Config설정을 수정하지 않아도 기본값을 적용하고 있습니다.
Config의 수정은 Kafka대한 지식이 있는 사용자가 수정하는 것을 추천합니다.
Config를 수정하여 Cluster가 정상동작 하지 않는 경우 NAVER Cloud는 운영에 대한 책임을 보장하지 않습니다.
Config 수정 후 Cluster가 정상동작 하지 않는 경우 4. Config 기본값으로 재설정 을 통하여 기본값으로 복구가 가능합니다.
- Config를 기본값으로 재설정 할 수 있습니다. Default 에 지정된 값이 자동으로 입력됩니다.
- Config수정을 완료한 경우 확인을 클릭하여 설정을 저장할 수 있습니다.
변경된 Config를 저장하여도 Cluster에 바로 반영되지 않습니다.
하단의 ConfigGroup 적용을 참고하여 Cluster 재시작 후에 변경된 Config가 Cluster에 반영 됩니다.
Config Group Cluster 적용
- ConfigGroup을 변경한 Cluster를 선택하여 상세정보를 확입니다.
- ConfigGroup 의 팝업열기 아이콘을 클릭합니다.
- ConfigGroup 변경 팝업에서는 설정할 수 있는 ConfigGroup의 목록을 확인 할 수 있습니다.
- 변경할 ConfigGroup 을 선택합니다.
- 확인 버턴을 클릭하면 Cluster에 ConfigGroup 의 변경된 내용이 반영되어 Cluster가 재시작 됩니다.
- Cluster 재시작은 서비스의 일시적인 서비스 중단을 가져오기 때문에 재확인 팝업을 통하여 최종적으로 재시작 됩니다.