Logstash JSON 객체 파싱
Logstash
는 다양한 경로에서 데이터를 수집하고 처리합니다. 수집하는 데이터의 형태는 정말 다양한 형태로 들어오며, 때로는 복잡한 전처리를 요구하기도 합니다. 이번 포스팅에서는 JSON 객체 안에 배열형태로 들어오는 JSON 객체
를 파싱하는 방법을 기록으로 남기고자 합니다.
JSON 데이터 형태
이번 포스팅에서 가정할 Input 데이터는 아래와 같습니다. JSON객체 내부에 배열 형태의 JSON 객체들
을 가지는 형태로, 배열 내 각 객체 내부의 필드값을 파싱하여 처리할 일들이 가끔 생기곤 합니다. 이런 경우에는 ruby filter
를 사용하면 처리를 간편하게 할 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
...
"wanted_to_be_parsed" : {
"field1" : "value1",
"field2" : "value2",
"field3" : [
{
"InnerField1" : "value3",
"InnerField2" : "value4",
},
{
"InnerField1" : "value3",
"InnerField2" : "value4"
},
...
]
}
}
In Ruby Code!
위의 예시 JSON Object
를 파싱해보도록 하겠습니다. (지금 작성하려는 부분은 로그스태쉬 전체 .conf 파일의 filter 내부입니다.) 우선, 기본적으로 json filter를 통해 파싱하려는 대상 필드를 target필드로 이동시킵니다. 이때 target field는 json_parsed
로 가정하겠습니다. 해당 작업은 아래와 같이 이루어집니다.
1
2
3
4
json {
source => "wanted_to_be_parsed"
target => "json_parsed"
}
이젠 진짜 ruby filter를 통해 한번 파싱된 json 객체의 특정 필드배열을 집계하도록 하겠습니다. 아래와 같이, python과 c++ 을 섞은듯한(?) 문법으로 간단하게 반복문 처리 및 집계처리가 가능합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
ruby {
code => '
field1ValueList = []
field2ValueList = []
event.get("[wanted_to_parse][field3]").each do |each_json_obj|
field1ValueList << each_json_obj["InnerField1"]
field2ValueList << each_json_obj["InnerField2"]
end
event.set("elasticsearch_index_field1", field1ValueList.join(","))
event.set("elasticsearch_index_field2", field2ValueList.join(","))
'
}
전체 logstash config 파일 형태
로그스태쉬는 기본적으로 input, filter, output 의 구조로 되어있으므로, 위의 예제를 kafka에서 elasticsearch로 보내는 pipeline 이라고 가정한다면 아마 아래와 같은 구조가 전체 .conf
파일의 모습이 될 것입니다. 사실 이는 조금 특수한 상황에서의 json 객체 처리를 다룬 형태이고, ruby와 함께 여러 필터들을 조합하면 복잡한 전처리도 어렵지않게 처리할 수 있을 것입니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
input{
kafka{
bootstrap_servers => ""
topics => []
auto_offset_reset =>
group_id =>
client_id =>
decorate_events =>
}
}
->
filter{
mutate{
add_field => {
"filed1" => "%{[@metadata][kafka][topic]}"
...
"fieldN" => ""
}
}
...
dissect {
mapping => {
"message" => "%{field1}{delimiter}%{field2}{delimiter}%{field3} ... %{fieldN}"
}
}
...
json {
source => "wanted_to_be_parsed"
target => "json_parsed"
}
...
if [특정필드] == '특정값' {
ruby {
code => '
field1ValueList = []
field2ValueList = []
event.get("[wanted_to_parse][field3]").each do |each_json_obj|
field1ValueList << each_json_obj["InnerField1"]
field2ValueList << each_json_obj["InnerField2"]
end
event.set("elasticsearch_index_field1", field1ValueList.join(","))
event.set("elasticsearch_index_field2", field2ValueList.join(","))
'
}
}
...
mutate {
remove_field => [
"json_parsed"
]
}
}
->
output{
elasticsearch {
hosts => ["http://hostip:postport", ...]
index =>
document_id =>
user => "${elasticsearch_username}"
password => "${elasticsearch_passowrd}"
}
}