- 인쇄
- PDF
Apache Livy로 Spark Job 제출
- 인쇄
- PDF
Classic 환경에서 이용 가능합니다.
Apache Livy는 REST 인터페이스를 이용하여 Spark 클러스터와 쉽게 상호작용할 수 있는 서비스입니다. 간단한 REST 인터페이스 또는 RPC(Remote Procedure Call) 클라이언트 라이브러리를 통하여 Spark Job 또는 Spark 코드 스니펫, 동기/비동기 결과 검색, SparkContext 관리를 쉽게 제출할 수 있습니다.
또한, Apache Livy는 Spark와 애플리케이션 서버 간 상호작용을 단순화하여, 대화형 웹/모바일 애플리케이션에서 Spark를 사용할 수 있도록 도와줍니다.
- 멀티 클라이언트에서 여러 개의 Spark Job을 사용할 수 있도록 SparkContext를 가지고 있습니다.
- 멀티 Job 및 클라이언트에서 캐시된 RDD(Resilient Distributed Dataset) 또는 데이터 프레임을 공유합니다.
- 멀티 SparkContext를 동시에 관리할 수 있습니다. 우수한 내결함성과 동시성을 위해 Livy 서버 대신 SparkContext가 클러스터(YARN/Mesos)에서 실행됩니다.
- 미리 컴파일된 jar, 코드 스니펫 또는 Java/Scala 클라이언트 API를 통해 Job을 제출할 수 있습니다.
- 보안 인증 통신을 이용하여 보안을 확보합니다.
- Apache Livy에 대한 자세한 내용은 Apache Livy 홈페이지를 참고해 주십시오.
- 이미지 출처: https://livy.incubator.apache.org/assets/images/livy-architecture.png
이 가이드에서는 Cloud Hadoop에서 제공하는 Apache Livy를 사용하여 Spark Job을 제출하는 방법을 설명합니다. Cloud Hadoop 클러스터 생성 시 클러스터 Type을 Spark로 선택해 주십시오.
Python 모듈 설치
Spark 예제 코드 수행을 위해서는 먼저 requests
라는 Python 모듈을 설치해 주십시오.
$ sudo yum install -y epel-release
$ sudo yum install -y python-pip
$ sudo pip install requests
Apache Livy 서버 정보 확인
Apache Livy 서버의 포트 정보는 Ambari UI에서 확인할 수 있습니다.
Ambari UI에 접속한 후 Spark2 > CONFIGS > 를 차례대로 클릭해 주십시오.
Advanced livy2-conf 항목을 클릭한 후 livy.server.port 정보를 확인해 주십시오.
Spark 예제 코드
예제 코드는 Apache Livy Examples를 참고하여 작성하였습니다.
- 소스 코드 내용을 livy-test.py로 저장
#-*- coding:utf-8 -*-
import json, pprint, requests, textwrap, time, sys
# Livy2 접속 정보 입력
if len(sys.argv) < 2:
print('ERROR : Livy 서버 접속 정보를 입력해 주세요')
print(' - Usage: python {0} http://호스트명:포트'.format(sys.argv[0]))
sys.exit(1)
host = sys.argv[1]
# 헤더 정보
headers = {'Content-Type': 'application/json'}
# Spark 세션 생성
data = {'kind': 'spark'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
print("Created " + r.headers['location'])
# Spark 세션 상태 확인
state = "notIdle"
session_url = host + r.headers['location']
sys.stdout.write('Waiting for session state to idle')
while state != 'idle':
r = requests.get(session_url, headers=headers)
state = r.json()['state']
sys.stdout.write('.')
sys.stdout.flush()
time.sleep(1)
sys.stdout.write('\rSessioin State is Ready!!!!!!!!!!!!!!\n')
sys.stdout.flush()
# 테스트 코드 1
statements_url = session_url + '/statements'
data = {'code': '1 + 1'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
print('=' * 80)
print(statement_url)
print('Request: {0}'.format(data['code']))
output = None
while output == None:
r = requests.get(statement_url, headers=headers)
ret = r.json()
if ret['output'] == None:
time.sleep(1)
continue
if 'output' in ret and 'data' in ret['output']:
output = ret['output']['data']['text/plain']
print('-' * 80)
print(output)
# 테스트 코드 2
data = {
'code': textwrap.dedent("""
val NUM_SAMPLES = 100000;
val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
val x = Math.random();
val y = Math.random();
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _);
println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
""")
}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
print('=' * 80)
print(statement_url)
print('Request: {0}'.format(data['code']))
output = None
while output == None:
r = requests.get(statement_url, headers=headers)
ret = r.json()
if ret['output'] == None:
time.sleep(1)
continue
if 'output' in ret and 'data' in ret['output']:
output = ret['output']['data']['text/plain']
print('-' * 80)
print(output)
# Spark 세션 종료
print('=' * 80)
r = requests.delete(session_url, headers=headers)
print('{0} {1}'.format(r.json()['msg'], session_url))
예제 코드인 livy-test.py
수행 시에는 아래와 같이 Livy 서버 접속 정보(http://ip:port)를 인자 값으로 입력해 주어야 합니다.
$ python livy-test.py http://ip:port
사용 방법은 아래와 같습니다.
$ python livy-test.py http://172.16.3.22:8999
Created /sessions/47
Sessioin State is Ready!!!!!!!!!!!!!!...........................
================================================================================
http://172.16.3.22:8999/sessions/47/statements/0
Request: 1 + 1
--------------------------------------------------------------------------------
res0: Int = 2================================================================================
http://172.16.3.22:8999/sessions/47/statements/1
Request:
val NUM_SAMPLES = 100000;
val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
val x = Math.random();
val y = Math.random();
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _);
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)--------------------------------------------------------------------------------
NUM_SAMPLES: Int = 100000
count: Int = 78503
Pi is roughly 3.14012================================================================================
deleted http://172.16.3.22:8999/sessions/47