들어가며
이번 장은 ⌜빅데이터를 지탱하는 기술⌟ 리뷰의 마지막 장으로, 지금까지 배운 내용들을 기반으로 실제 실습을 진행해보도록 하겠습니다.
1. Spark를 통한 대화식 애드 혹 분석
스키마리스 데이터 수집
먼저, 실제 데이터를 수집하는 과정이 필요합니다. 책에서 사용한 twitter streaming api는 무료계정으로 사용하는 것이 불가능하여, reddit
의 hot api
로 테스트 하였습니다. 코드 자체는 동일합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# reddit api docs : https://www.reddit.com/dev/api
import datetime
import json
import pymongo
import requests
import tqdm
import configparser
# config = configparser.ConfigParser() -> token 내 %문제로 ConfigParser 사용할 수 없음
config = configparser.RawConfigParser()
config.read('config.ini')
# api keys
CLIENT_ID = config['REDDIT_DEFAULT']['CLIENT_ID']
CLIENT_SECRET = config['REDDIT_DEFAULT']['CLIENT_SECRET']
USERNAME = config['REDDIT_DEFAULT']['USERNAME']
PASSWORD = config['REDDIT_DEFAULT']['PASSWORD']
AUTH_TOKEN = config['REDDIT_DEFAULT']['AUTH_TOKEN']
def bearer_oauth(r):
"""
Method required by bearer token authentication.
"""
r.headers["Authorization"] = f"Bearer {AUTH_TOKEN}"
return r
base_url = 'https://oauth.reddit.com/hot'
response = requests.get(base_url, auth=bearer_oauth, stream=True)
# save in mongo
mongo = pymongo.MongoClient()
for line in tqdm.tqdm(response.iter_lines(), unit='reddit', mininterval=10):
if line:s
reddit = json.loads(line)
reddit['_timestamp'] = datetime.datetime.utcnow().isoformat()
mongo.reddit.sample.insert_one(reddit)
prerequisites 1)MongoDB version 4.0 or later 2)Spark version 3.1 through 3.2.4 3)Java 8 or later
1
2
3
4
5
6
7
8
9
10
11
12
# pips
pip3 install pyspark==3.1.2
pip3 install install ipywidgets ipykernel metakernel py4j pandas
pip3 install "IPython<8.0.0"
pip3 install pyspark_kernel
# jars
146K Apr 2 22:02 mongodb-driver-sync-4.8.2.jar
1.5M Apr 2 22:03 mongodb-driver-core-4.8.2.jar
496K Apr 2 22:03 bson-4.8.2.jar
13K Apr 2 22:03 bson-record-codec-4.8.2.jar
173K Apr 3 08:12 mongo-spark-connector_2.12-10.2.2.jar
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from pyspark.sql import SparkSession
spark = SparkSession \
.builder.master("myApp") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1:27017/reddit.sample") \
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1:27017/reddit.sample") \
.config("spark.driver.bindAddress", "127.0.0.1") \
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:10.2.0') \
.getOrCreate()
df = spark.read.format('mongodb')\
.option("spark.mongodb.read.database", "reddit") \
.option("spark.mongodb.read.collection", "sample") \
.load()
df.createOrReplaceTempView('reddit')
# query = '''
# select subreddit, count(*)
# from reddit
# where score >= 0
# group by 1
# order by 2 desc
# '''
# spark.sql(query).show(5)
데이터를 수집한 결과는 위와 같습니다. 수집한 데이터를 pyspark에서 조회하면 다음과 같습니다.
쿼리 수행속도 비교에 적절한 데이터 용량은 물론 아니지만, MongoDB의 경우 열 지향 스토리지와 같이 칼럼 단위 읽기에 최적화 된 DB는 아닙니다. 따라서 고속 집계에는 적합하지 않으므로 최적화를 위해서 한 차례 데이터를 추가로 추출해야 합니다.
spark 를 통한 데이터의 가공
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from pyspark.sql import SparkSession
spark = SparkSession \
.builder.master("myApp") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1:27017/reddit.sample") \
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1:27017/reddit.sample") \
.config("spark.driver.bindAddress", "127.0.0.1") \
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:10.2.0') \
.getOrCreate()
df = spark.read.format('mongodb')\
.option("spark.mongodb.read.database", "reddit") \
.option("spark.mongodb.read.collection", "sample") \
.load()
df.createOrReplaceTempView('reddit')
from pyspark.sql import Row
def text_split(row):
for word in row.title.split():
yield Row(time=row.timeline, word=word)
query = '''
select _timestamp timeline, title
from reddit
'''
titles_from_reddit = spark.sql(query)
# titles_from_reddit.show(5)
# titles_from_reddit.rdd.take(1)
titles_from_reddit.rdd.flatMap(text_split).take(2)
titles_from_reddit.rdd.flatMap(text_split).toDF().show(2)
그 다음은 flatMap() 내에서 텍스트 분해를 위한 함수를 실행하고, 다시 데이터 프레임을 만듭니다.
1
2
3
4
5
6
7
8
9
10
words = titles_from_reddit.rdd.flatMap(text_split).toDF()
words.createOrReplaceTempView('words')
query = '''
select word, count(*)
from words group by 1 order by 2 desc;
'''
spark.sql(query).show(3)
분해된 데이터는 어느정도 정리된 상태에서 물리적인 테이블로 보관하도록 합니다.
1
2
3
4
5
6
7
8
9
words.write.saveAsTable('reddit_sample_words')
import subprocess
result = subprocess.getoutput('ls -R spark-warehouse')
result
# 결과
'reddit_sample_words\n\nspark-warehouse/reddit_sample_words:\n_SUCCESS\npart-00000-8765ec86-3309-4854-8645-8f93a880f744-c000.snappy.parquet'
데이터 집계에서 더 나아가, 시각화에 적합한 데이터 마트를 만듭니다. 여기서는 선택지가 몇가지 있습니다.
- Spark에 ODBC/JDBC로 접속하기
- MPP 데이터베이스에 비정규화 테이블 만들기
- 데이터를 작게 집약하여 CSV 파일에 출력하기
1
2
3
4
5
6
7
8
spark.table('reddit_sample_words').count()
query = '''
select substr(time, 1, 13)time, word, count(*) count
from reddit_sample_words group by 1, 2
'''
spark.sql(query).count()
카디널리티 삭감
카디널리티가 높은 칼럼은 데이터 제약의 방해가 됩니다. 따라서 사용하는 빈도가 낮은 데이터는 따로 분리하여 보관할 필요가 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# query = '''
# select t.count, count(*) words
# from (
# select word, count(*) count from reddit_sample_words group by 1
# ) t
# group by 1 order by 1
# '''
query = '''
select word, count,
if(count > 1000, word, concat('count=',count)) category
from (
select word, count(*) count from reddit_sample_words group by 1
)t
'''
spark.sql(query).show(10)
spark.sql(query).createOrReplaceTempView('word_category')
여기까지 왔다면 좀 더 집약이 가능하다. 1시간마다 카테고리별로 그룹화하여 집계하여보자.
1
2
3
4
5
6
7
8
query='''
select substr(a.time, 1, 13) time, b.category, count(*) count
from reddit_sample_words a
left join word_category b on a.word = b.word
group by 1,2
'''
spark.sql(query).count()
결과적으로 집계된 결과를 CSV 파일로 작성하도록 합니다. Spark에서 집계는 두가지 방법이 있습니다.
- 표준 spark-csv 라이브러리 사용 ```python
spark.sql(query) .coalesce(1) # 출력 파일은 하나로 한다. .write.format(‘com.databricks.spark.csv’) # csv 형식 사용한다. .option(‘header’, ‘true’) # 헤더 사용 .save(‘csv_output’) # 출력 디렉터리
spark.sql(query).coalesce(1).write.format(‘com.databricks.spark.csv’).option(‘header’, ‘true’).save(‘csv_output’)
import subprocess
result = subprocess.getoutput(‘ls csv_output’)
‘_SUCCESS\npart-00000-a1db7b63-6cdex-4aee-a611-34b2bcf37440-c000.csv’
1
2
3
4
5
6
7
8
9
10
2. pandas df 사용
```python
import pandas
result = spark.sql(query).toPandas()
2. Hive와 Presto를 통한 재구축
3. Airflow를 통한 구성
4. 클라우드 서비스를 이용한 빅데이터 파이프라인
정리하며
참고문헌
- [출처] 니시다 케이스케(Keisuke Nishida), ⌜빅데이터를 지탱하는 기술(BIG DATA WO SASAERU GIJUTSU)⌟, 장성두 옮김, 주식회사 제이펍
- reddit api oauth
- reddit api docs
- jupyter lab & spark
- mongodb connector
- spark jar files