Home 6. 빅데이터 분석 기반의 구축
Post
Cancel

6. 빅데이터 분석 기반의 구축

들어가며

이번 장은 ⌜빅데이터를 지탱하는 기술⌟ 리뷰의 마지막 장으로, 지금까지 배운 내용들을 기반으로 실제 실습을 진행해보도록 하겠습니다.

1. Spark를 통한 대화식 애드 혹 분석

스키마리스 데이터 수집

먼저, 실제 데이터를 수집하는 과정이 필요합니다. 책에서 사용한 twitter streaming api는 무료계정으로 사용하는 것이 불가능하여, reddithot api 로 테스트 하였습니다. 코드 자체는 동일합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# reddit api docs : https://www.reddit.com/dev/api

import datetime
import json
import pymongo
import requests
import tqdm
import configparser

# config = configparser.ConfigParser() -> token 내 %문제로 ConfigParser 사용할 수 없음
config = configparser.RawConfigParser()
config.read('config.ini')

# api keys
CLIENT_ID = config['REDDIT_DEFAULT']['CLIENT_ID']
CLIENT_SECRET = config['REDDIT_DEFAULT']['CLIENT_SECRET']
USERNAME = config['REDDIT_DEFAULT']['USERNAME']
PASSWORD = config['REDDIT_DEFAULT']['PASSWORD']
AUTH_TOKEN = config['REDDIT_DEFAULT']['AUTH_TOKEN']


def bearer_oauth(r):
    """
    Method required by bearer token authentication.
    """

    r.headers["Authorization"] = f"Bearer {AUTH_TOKEN}"
    return r


base_url = 'https://oauth.reddit.com/hot'

response = requests.get(base_url, auth=bearer_oauth, stream=True)

# save in mongo
mongo = pymongo.MongoClient()
for line in tqdm.tqdm(response.iter_lines(), unit='reddit', mininterval=10):
    if line:s
        reddit = json.loads(line)
        reddit['_timestamp'] = datetime.datetime.utcnow().isoformat()
        mongo.reddit.sample.insert_one(reddit)

prerequisites 1)MongoDB version 4.0 or later 2)Spark version 3.1 through 3.2.4 3)Java 8 or later

1
2
3
4
5
6
7
8
9
10
11
12
# pips
pip3 install pyspark==3.1.2
pip3 install install ipywidgets ipykernel metakernel py4j pandas
pip3 install "IPython<8.0.0"
pip3 install pyspark_kernel

# jars
146K Apr  2 22:02 mongodb-driver-sync-4.8.2.jar
1.5M Apr  2 22:03 mongodb-driver-core-4.8.2.jar
496K Apr  2 22:03 bson-4.8.2.jar
 13K Apr  2 22:03 bson-record-codec-4.8.2.jar
173K Apr  3 08:12 mongo-spark-connector_2.12-10.2.2.jar
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder.master("myApp") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1:27017/reddit.sample") \
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1:27017/reddit.sample") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:10.2.0') \
    .getOrCreate()

df = spark.read.format('mongodb')\
    .option("spark.mongodb.read.database", "reddit") \
    .option("spark.mongodb.read.collection", "sample") \
    .load()

df.createOrReplaceTempView('reddit')

# query = '''
#     select subreddit, count(*)
#     from reddit
#     where score >= 0
#     group by 1
#     order by 2 desc
# '''

# spark.sql(query).show(5)

reddit sample data reddit sample data

reddit sample data reddit sample data2

데이터를 수집한 결과는 위와 같습니다. 수집한 데이터를 pyspark에서 조회하면 다음과 같습니다.

쿼리 수행속도 비교에 적절한 데이터 용량은 물론 아니지만, MongoDB의 경우 열 지향 스토리지와 같이 칼럼 단위 읽기에 최적화 된 DB는 아닙니다. 따라서 고속 집계에는 적합하지 않으므로 최적화를 위해서 한 차례 데이터를 추가로 추출해야 합니다.

spark 를 통한 데이터의 가공

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder.master("myApp") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1:27017/reddit.sample") \
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1:27017/reddit.sample") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:10.2.0') \
    .getOrCreate()

df = spark.read.format('mongodb')\
    .option("spark.mongodb.read.database", "reddit") \
    .option("spark.mongodb.read.collection", "sample") \
    .load()

df.createOrReplaceTempView('reddit')

from pyspark.sql import Row

def text_split(row):
    for word in row.title.split():
        yield Row(time=row.timeline, word=word)

query = '''
    select _timestamp timeline, title
    from reddit
'''

titles_from_reddit = spark.sql(query)
# titles_from_reddit.show(5)
# titles_from_reddit.rdd.take(1)

titles_from_reddit.rdd.flatMap(text_split).take(2)
titles_from_reddit.rdd.flatMap(text_split).toDF().show(2)

그 다음은 flatMap() 내에서 텍스트 분해를 위한 함수를 실행하고, 다시 데이터 프레임을 만듭니다.

1
2
3
4
5
6
7
8
9
10
words = titles_from_reddit.rdd.flatMap(text_split).toDF()
words.createOrReplaceTempView('words')

query = '''
    select word, count(*)
    from words group by 1 order by 2 desc;
'''

spark.sql(query).show(3)

분해된 데이터는 어느정도 정리된 상태에서 물리적인 테이블로 보관하도록 합니다.

1
2
3
4
5
6
7
8
9
words.write.saveAsTable('reddit_sample_words')

import subprocess

result = subprocess.getoutput('ls -R spark-warehouse')
result

# 결과
'reddit_sample_words\n\nspark-warehouse/reddit_sample_words:\n_SUCCESS\npart-00000-8765ec86-3309-4854-8645-8f93a880f744-c000.snappy.parquet'

데이터 집계에서 더 나아가, 시각화에 적합한 데이터 마트를 만듭니다. 여기서는 선택지가 몇가지 있습니다.

  • Spark에 ODBC/JDBC로 접속하기
  • MPP 데이터베이스에 비정규화 테이블 만들기
  • 데이터를 작게 집약하여 CSV 파일에 출력하기
1
2
3
4
5
6
7
8
spark.table('reddit_sample_words').count()

query = '''
select substr(time, 1, 13)time, word, count(*) count
from reddit_sample_words group by 1, 2
'''

spark.sql(query).count()

카디널리티 삭감

카디널리티가 높은 칼럼은 데이터 제약의 방해가 됩니다. 따라서 사용하는 빈도가 낮은 데이터는 따로 분리하여 보관할 필요가 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# query = '''
# select t.count, count(*) words
# from (
#     select word, count(*) count from reddit_sample_words group by 1
# ) t
# group by 1 order by 1
# '''

query = '''
select word, count,
    if(count > 1000, word, concat('count=',count)) category
from (
    select word, count(*) count from reddit_sample_words group by 1
)t
'''

spark.sql(query).show(10)

spark.sql(query).createOrReplaceTempView('word_category')

여기까지 왔다면 좀 더 집약이 가능하다. 1시간마다 카테고리별로 그룹화하여 집계하여보자.

1
2
3
4
5
6
7
8
query='''
select substr(a.time, 1, 13) time, b.category, count(*) count
from reddit_sample_words a
left join word_category b on a.word = b.word
group by 1,2
'''

spark.sql(query).count()

결과적으로 집계된 결과를 CSV 파일로 작성하도록 합니다. Spark에서 집계는 두가지 방법이 있습니다.

  1. 표준 spark-csv 라이브러리 사용 ```python

spark.sql(query) .coalesce(1) # 출력 파일은 하나로 한다. .write.format(‘com.databricks.spark.csv’) # csv 형식 사용한다. .option(‘header’, ‘true’) # 헤더 사용 .save(‘csv_output’) # 출력 디렉터리

spark.sql(query).coalesce(1).write.format(‘com.databricks.spark.csv’).option(‘header’, ‘true’).save(‘csv_output’)

import subprocess

result = subprocess.getoutput(‘ls csv_output’)

‘_SUCCESS\npart-00000-a1db7b63-6cdex-4aee-a611-34b2bcf37440-c000.csv’

1
2
3
4
5
6
7
8
9
10
2. pandas df 사용

```python

import pandas

result = spark.sql(query).toPandas()


2. Hive와 Presto를 통한 재구축

3. Airflow를 통한 구성

4. 클라우드 서비스를 이용한 빅데이터 파이프라인

정리하며

참고문헌

This post is licensed under CC BY 4.0 by the author.

5. 빅데이터의 파이프라인

Java 멀티스레딩, 병행성, 성능 최적화 강의 후기