Logstash를 활용한 Cloud Data Streaming Service 연동

Prev Next

VPC 환경에서 이용 가능합니다.

이 가이드는 LogStash를 사용하여 Cloud Data Streaming Service의 데이터를 Search Engine Service로 전송하고 확인하는 방법을 설명합니다.

이 가이드를 수행하기 전에 이용 신청을 완료해야 하는 작업은 다음과 같습니다.

  • VPC 및 서버 생성
  • Cloud Data Streaming Service 클러스터 생성
  • Search Engine Service 클러스터 생성

예제로는 Server에 Logstash를 구동한 후, Kafka의 데이터를 Search Engine Service에 전송하는 방법을 다루고 있습니다.

Cloud Data Streaming Service 브로커 노드의 9092번 포트로 접근할 수 있도록 ACG를 설정하는 방법은 다음과 같습니다.

  1. 네이버 클라우드 플랫폼 콘솔에서 Services > Compute > Server > ACG 메뉴를 차례대로 클릭해 주십시오.
  2. ACG 목록에서 'cdss-b-xxxxx'을 선택한 후 [ACG 설정] 버튼을 클릭해 주십시오.
  3. ACG 규칙을 입력한 후 [추가] 버튼을 클릭해 주십시오.
    cdss-5-4_ko
    • 프로토콜: TCP
    • 접근 소스: Logstash가 실행될 서버의 IP
    • 허용 포트: 9092
  4. [적용] 버튼을 클릭해 주십시오.

Search Engine Service 매니저 노드의 9200번 포트로 접근할 수 있도록 ACG를 설정하는 방법은 다음과 같습니다.

  1. 네이버 클라우드 플랫폼 콘솔에서 Services > Compute > Server > ACG 메뉴를 차례대로 클릭해 주십시오.
  2. ACG 목록에서 'searchengine-m-xxxxx'을 선택한 후 [ACG 설정] 버튼을 클릭해 주십시오.
  3. ACG 규칙을 입력한 후 [추가] 버튼을 클릭해 주십시오.
    cdss-5-6_ko
    • 프로토콜: TCP
    • 접근 소스: Logstash가 실행될 서버의 IP
    • 허용 포트: 9200
  • 설치 과정에서 ElasticSearch 와 OpenSearch의 과정이 함께 표현되어 있습니다.
  • 사용하는 버전에 맞춰 설치하셔야 정상적인 테스트가 가능합니다.
  1. 다음 명령어를 입력하여 java를 설치해 주십시오.
yum install java-devel -y
Command

Logstash를 설치하는 방법은 다음과 같습니다.

  1. 다음 명령어를 입력하여 /root 경로에 Logstash를 다운로드해 주십시오.
# Elasticsearch 버전인 경우(OSS 버전 설치)
wget https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.7.0.rpm

# OpenSearch 버전인 경우
wget https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
Command
  1. 다음 명령어를 입력하여 다운로드한 파일을 설치합니다.
# Elasticsearch 버전인 경우
rpm -ivh logstash-oss-7.7.0.rpm

# OpenSearch 버전인 경우
tar -zxvf logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
Command
  1. 다음 명령어를 입력하여 Logstash 구동 전 logstash.conf 파일을 수정해 주십시오.
  • Elasticsearch 버전인 경우
mv /etc/logstash/logstash-sample.conf /etc/logstash/conf.d/logstash.conf
vi /etc/logstash/conf.d/logstash.conf
Command
  • ElasticSearch 버전 logstash.conf
input {
 kafka {
  bootstrap_servers => "${bootstrap_servers}"
  topics => "cdss_topic"
 }
}

output {
  elasticsearch {
    hosts => ["http://${ses manager node1 ip}:9200", "http://${ses manager node2 ip}:9200"]
    index => "cdss-%{+YYYY.MM.dd}"
  }
}
Script
  • OpenSearch 버전인 경우
# /root/ 에 설치하는 경우 {설치경로}는 /root/logstash-7.16.3 입니다. 
mv {설치경로}/config/logstash-sample.conf {설치경로}/config/logstash.conf
vi {설치경로}/config/logstash.conf
Command
  • OpenSearch 버전 logstash.conf
input {
 kafka {
  bootstrap_servers => "${bootstrap_servers}"
  topics => "cdss_topic"
 }
}

output {
  opensearch {
    hosts => ["https://${ses manager node1 ip}:9200", "https://${ses manager node2 ip}:9200"]
    index => "cdss-%{+YYYY.MM.dd}"
    user => ${userID}
    password => ${password}
    ssl_certificate_verification => false
  }
}
Script
  • Logstash Conf Comment
${bootstrap_servers} - Cloud Data Streaming Service 브로커 노드의 IP:Kafka port를 입력합니다.  ex) 172.16.19.6:9092,172.16.19.7:9092,172.16.19.8:9092
${ses manager node1 ip} - Search Engine Service 매니저 노드의 IP를 입력합니다.
${ses manager node2 ip} - Search Engine Service 매니저 노드의 IP를 입력합니다(매니저 노드가 이중화되어 있지 않을 경우 입력하지 않습니다)
${userID} - OpenSearch의 경우 클러스터 생성시 입력한 ID 입니다.
${password} - OpenSearch의 경우 클러스터 생성시 입력한 password 입니다.

Plain text
# Elasticsearch 버전인 경우
systemctl start logstash

# OpenSearch 버전인 경우
# 백그라운드 실행을 위해 nohup 을 사용합니다.
# -f 옵션을 사용하여 logstash.conf 의 경로를 지정해 주셔야 합니다.
nohup {설치경로}/bin/logstash -f ~{설치경로}/config/logstash.conf &
Command
  • Java 설치하기
yum install java-devel -y
Command
  • Kafka Binary 코드 설치
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
tar -zxvf kafka_2.12-2.4.0.tgz 
Command
  • Produce
./kafka_2.12-2.4.0/bin/kafka-console-producer.sh --broker-list ${broker list} --topic cdss_topic
>this is my first message
>this is my second message

# ${broker list} 는 브로커 노드의 IP:Kafka 포트를 입력합니다. ex) 172.16.19.6:9092,172.16.19.7:9092,172.16.19.8:9092
Command
GET cdss-2022.08.08/_search
Command
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "cdss-2022.08.08",
        "_type" : "_doc",
        "_id" : "VtmKe4IBicE7MyrTaKJ5",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "@timestamp" : "2022-08-08T03:40:44.335Z",
          "message" : "this is my first message"
        }
      },
      {
        "_index" : "cdss-2022.08.08",
        "_type" : "_doc",
        "_id" : "V9mKe4IBicE7MyrTg6IW",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "@timestamp" : "2022-08-08T03:40:51.248Z",
          "message" : "this is my second message"
        }
      }
    ]
  }
}
Plain text
  • Cloud Data Streaming Service 에서 SSL을 사용하는 경우 인증서를 추가 하여 설정이 가능합니다.
  • INPUT 기준의 예제 입니다.
input {
  kafka {
    bootstrap_servers => “{BrokerNode-HostName}:9093” 
    topics => "test“
    ssl_truststore_location => "/etc/logstash/conf.d/kafka.client.truststore.jks"
    ssl_truststore_password => “${password}"
    security_protocol => "SSL"
  }  
}
Plain text
  • Example Comment
${BrokerNode-HostName} - 클러스터 목록 -> Broker 노드정보 -> 상세보기 -> TLS 에서 확인 가능합니다.
ex) "networktest12-d-251:9093,networktest12-d-252:9093,networktest12-d-253:9093"
${password} - 별도의 인증서 비밀번호 
Plain text