Logstash를 활용한 Cloud Database(MySQL) 연동

Prev Next

VPC 환경에서 이용 가능합니다.

이 가이드는 LogStash를 사용하여 Search Engine Service와 Cloud DB for MySQL간 Data Pipeline 구축 가이드 입니다.

이 가이드를 수행하기 전에 이용 신청을 완료해야 하는 작업은 다음과 같습니다.

  • VPC 및 서버 생성
  • Cloud DB for MySQL 서버 생성
  • Search Engine Service 클러스터 생성

예제로는 Server에 Logstash를 구동한 후, Cloud DB for MySQL의 데이터를 일정 주기 마다 Search Engine Service에 전송하는 방법을 다루고 있습니다.

[DB 관리 > DB User 관리]에서 server 대역으로부터 DB User의 접속이 가능하도록 설정해야 합니다.
ses-mysql-user-screen

Cloud Data Streaming Service 브로커 노드의 9092번 포트로 접근할 수 있도록 ACG를 설정하는 방법은 다음과 같습니다.

  1. 네이버 클라우드 플랫폼 콘솔에서 Services > Compute > Server > ACG 메뉴를 차례대로 클릭해 주십시오.
  2. ACG 목록에서 'cloud-mysql-xxxx'을 선택한 후 [ACG 설정] 버튼을 클릭해 주십시오.
  3. ACG 규칙을 입력한 후 [추가] 버튼을 클릭해 주십시오.
    ses-5-4-1-1_ko
    • 프로토콜: TCP
    • 접근 소스: Logstash가 실행될 서버의 IP
    • 허용 포트: DB에서 설정한 포트(Default: 3306)
  4. [적용] 버튼을 클릭해 주십시오.

Search Engine Service 매니저 노드의 9200번 포트로 접근할 수 있도록 ACG를 설정하는 방법은 다음과 같습니다.

  1. 네이버 클라우드 플랫폼 콘솔에서 Services > Compute > Server > ACG 메뉴를 차례대로 클릭해 주십시오.
  2. ACG 목록에서 'searchengine-m-xxxxx'을 선택한 후 [ACG 설정] 버튼을 클릭해 주십시오.
  3. ACG 규칙을 입력한 후 [추가] 버튼을 클릭해 주십시오.
    cdss-5-6_ko
    • 프로토콜: TCP
    • 접근 소스: Logstash가 실행될 서버의 IP
    • 허용 포트: 9200
  • 설치 과정에서 ElasticSearch 와 OpenSearch의 과정이 함께 표현되어 있습니다.
  • 사용하는 버전에 맞춰 설치하셔야 정상적인 테스트가 가능합니다.
  1. 다음 명령어를 입력하여 java를 설치해 주십시오.
yum install java-devel -y
Command

Logstash를 설치하는 방법은 다음과 같습니다.
반드시 OSS 라이센스로 설치해야 정상적으로 작동합니다.

  1. 다음 명령어를 입력하여 /root 경로에 Logstash를 다운로드해 주십시오.
# Elasticsearch 버전인 경우(OSS 버전 설치)
wget https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.7.0.rpm

# OpenSearch 버전인 경우
wget https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
Command
  1. 다음 명령어를 입력하여 다운로드한 파일을 설치합니다.
# Elasticsearch 버전인 경우
rpm -ivh logstash-oss-7.7.0.rpm

# OpenSearch 버전인 경우
tar -zxvf logstash-oss-with-opensearch-output-plugin-7.16.3-linux-x64.tar.gz
Command
  1. JDBC Driver 설치하기
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.30.zip
unzip mysql-connector-java-8.0.30.zip
mkdir /etc/logstash/plugin
mv /root/mysql-connector-java-8.0.30/mysql-connector-java-8.0.30.jar /etc/logstash/plugin/ 
Plain text
  1. 다음 명령어를 입력하여 Logstash 구동 전 logstash.conf 파일을 수정해 주십시오.
  • Elasticsearch 버전인 경우
mv /etc/logstash/logstash-sample.conf /etc/logstash/conf.d/logstash.conf
vi /etc/logstash/conf.d/logstash.conf
Command
  • ElasticSearch 버전 logstash.conf
input {
   jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_driver_library => "/etc/logstash/plugin/mysql-connector-java-8.0.30.jar"
    jdbc_connection_string => "jdbc:mysql://${cdb mysql endpoint}:3306/${cdb mysql database name}?useSSL=false"
    jdbc_user => "${cdb user name}"
    jdbc_password => "${cdb mysql password}"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => 50
    statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM logstash_test WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
    record_last_run => true
    clean_run => true
    tracking_column_type => "numeric"
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    last_run_metadata_path => "/etc/logstash/data/student"
    schedule => "*/5 * * * * *"
  }
}

output {
  elasticsearch {
    hosts => ["http://${ses manager node1 ip}:9200", "http://${ses manager node2 ip}:9200"]
    index => "cdss-%{+YYYY.MM.dd}"
  }
}
Script
  • OpenSearch 버전인 경우
# /root/ 에 설치하는 경우 {설치경로}는 /root/logstash-7.16.3 입니다. 
mv {설치경로}/config/logstash-sample.conf {설치경로}/config/logstash.conf
vi {설치경로}/config/logstash.conf
Command
  • OpenSearch 버전 logstash.conf
input {
   jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_driver_library => "/etc/logstash/plugin/mysql-connector-java-8.0.30.jar"
    jdbc_connection_string => "jdbc:mysql://${cdb mysql endpoint}:3306/${cdb mysql database name}?useSSL=false"
    jdbc_user => "${cdb user name}"
    jdbc_password => "${cdb mysql password}"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => 50
    statement => "SELECT *, UNIX_TIMESTAMP(update_time) AS unix_ts_in_secs FROM logstash_test WHERE (UNIX_TIMESTAMP(update_time) > :sql_last_value AND update_time < NOW()) ORDER BY update_time ASC"
    record_last_run => true
    clean_run => true
    tracking_column_type => "numeric"
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    last_run_metadata_path => "/etc/logstash/data/student"
    schedule => "*/5 * * * * *"
  }
}

output {
  opensearch {
    hosts => ["https://${ses manager node1 ip}:9200", "https://${ses manager node2 ip}:9200"]
    index => "cdss-%{+YYYY.MM.dd}"
    user => ${userID}
    password => ${password}
    ssl_certificate_verification => false
  }
}
Script
  • Logstash Conf Comment
위의 Sample Logstash Conf 파일은 5초마다 "statement" 구문이 실행되며
"statement" 구문은 가장 마지막에 조회한 row의 update_time보다 최근 업데이트된 row들을 선택하는 쿼리입니다.

${cdb mysql endpoint} - CDB MySQL의 Private 도메인으로 변경합니다.
${cdb mysql database name} - CDB MySQL 내에 본인이 생성한 Database 이름을 입력합니다.
${cdb user name} - CDB MySQL 내에 본인이 생성한 계정 이름을 입력합니다.
${cdb mysql password} - CDB MySQL 내에 본인이 생성한 계정의 패스워드를 입력합니다.
${ses manager node1 ip} - Search Engine Service 매니저 노드의 IP를 입력합니다.
${ses manager node2 ip} - Search Engine Service 매니저 노드의 IP를 입력합니다(매니저 노드가 이중화되어 있지 않을 경우 입력하지 않습니다)
${userID} - OpenSearch의 경우 클러스터 생성시 입력한 ID 입니다.
${password} - OpenSearch의 경우 클러스터 생성시 입력한 password 입니다.
Plain text
  • logstash metadata 경로 생성하기
mkdir /etc/logstash/data
chown -R logstash:logstash /etc/logstash/data
Plain text
  • Table 생성하기
create table logstash_test(
id BIGINT(20) UNSIGNED NOT NULL,
PRIMARY KEY (id),
contents nvarchar(255),
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
Plain text
  • Data 입력하기
insert into logstash_test(id, contents) values(1, "this is first contents");
insert into logstash_test(id, contents) values(2, "this is second contents");
Plain text
# Elasticsearch 버전인 경우
systemctl start logstash

# OpenSearch 버전인 경우
# 백그라운드 실행을 위해 nohup 을 사용합니다.
# -f 옵션을 사용하여 logstash.conf 의 경로를 지정해 주셔야 합니다.
nohup {설치경로}/bin/logstash -f ~{설치경로}/config/logstash.conf &
Command
GET mysql-2022.08.08/_search
Plain text
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "mysql-2022.08.08",
        "_type" : "_doc",
        "_id" : "VNkwe4IBicE7MyrTCKIL",
        "_score" : 1.0,
        "_source" : {
          "contents" : "this is first contents",
          "update_time" : "2022-08-08T01:55:47.000Z",
          "unix_ts_in_secs" : 1659923747,
          "id" : 1,
          "create_time" : "2022-08-08T01:55:47.000Z",
          "@version" : "1",
          "@timestamp" : "2022-08-08T02:02:01.082Z"
        }
      },
      {
        "_index" : "mysql-2022.08.08",
        "_type" : "_doc",
        "_id" : "3yEwe4IBBeKbW6yXB2l_",
        "_score" : 1.0,
        "_source" : {
          "contents" : "this is second contents",
          "update_time" : "2022-08-08T01:59:05.000Z",
          "unix_ts_in_secs" : 1659923945,
          "id" : 2,
          "create_time" : "2022-08-08T01:59:05.000Z",
          "@version" : "1",
          "@timestamp" : "2022-08-08T02:02:01.093Z"
        }
      }
    ]
  }
}
Plain text