Home Logstash에서 Ruby필터로 반복형 JSON객체배열 파싱하기
Post
Cancel

Logstash에서 Ruby필터로 반복형 JSON객체배열 파싱하기

Logstash JSON 객체 파싱

Logstash는 다양한 경로에서 데이터를 수집하고 처리합니다. 수집하는 데이터의 형태는 정말 다양한 형태로 들어오며, 때로는 복잡한 전처리를 요구하기도 합니다. 이번 포스팅에서는 JSON 객체 안에 배열형태로 들어오는 JSON 객체를 파싱하는 방법을 기록으로 남기고자 합니다.

JSON 데이터 형태

이번 포스팅에서 가정할 Input 데이터는 아래와 같습니다. JSON객체 내부에 배열 형태의 JSON 객체들을 가지는 형태로, 배열 내 각 객체 내부의 필드값을 파싱하여 처리할 일들이 가끔 생기곤 합니다. 이런 경우에는 ruby filter를 사용하면 처리를 간편하게 할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{

    ...

  "wanted_to_be_parsed" : {
    "field1" : "value1",
    "field2" : "value2",
    "field3" : [
      {
        "InnerField1" : "value3",
        "InnerField2" : "value4",
      },
      {
        "InnerField1" : "value3",
        "InnerField2" : "value4"
      },

      ...

    ]

  }
}

In Ruby Code!

위의 예시 JSON Object를 파싱해보도록 하겠습니다. (지금 작성하려는 부분은 로그스태쉬 전체 .conf 파일의 filter 내부입니다.) 우선, 기본적으로 json filter를 통해 파싱하려는 대상 필드를 target필드로 이동시킵니다. 이때 target field는 json_parsed 로 가정하겠습니다. 해당 작업은 아래와 같이 이루어집니다.

1
2
3
4
json {
    source => "wanted_to_be_parsed"
    target => "json_parsed"
}

이젠 진짜 ruby filter를 통해 한번 파싱된 json 객체의 특정 필드배열을 집계하도록 하겠습니다. 아래와 같이, python과 c++ 을 섞은듯한(?) 문법으로 간단하게 반복문 처리 및 집계처리가 가능합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  ruby {
      code => '
          field1ValueList = []
          field2ValueList = []

          event.get("[wanted_to_parse][field3]").each do |each_json_obj|
              field1ValueList << each_json_obj["InnerField1"]
              field2ValueList << each_json_obj["InnerField2"]
          end

          event.set("elasticsearch_index_field1", field1ValueList.join(","))
          event.set("elasticsearch_index_field2", field2ValueList.join(","))
      '
  }

전체 logstash config 파일 형태

로그스태쉬는 기본적으로 input, filter, output 의 구조로 되어있으므로, 위의 예제를 kafka에서 elasticsearch로 보내는 pipeline 이라고 가정한다면 아마 아래와 같은 구조가 전체 .conf파일의 모습이 될 것입니다. 사실 이는 조금 특수한 상황에서의 json 객체 처리를 다룬 형태이고, ruby와 함께 여러 필터들을 조합하면 복잡한 전처리도 어렵지않게 처리할 수 있을 것입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
input{
    kafka{
        bootstrap_servers => ""
        topics => []
        auto_offset_reset =>
        group_id =>
        client_id =>
        decorate_events =>
    }
} 
-> 
filter{
    mutate{
        add_field => {
            "filed1" => "%{[@metadata][kafka][topic]}"
            ...
            "fieldN" => ""
        }
    }
    ...
    dissect {
        mapping => {
            "message" => "%{field1}{delimiter}%{field2}{delimiter}%{field3} ... %{fieldN}"
        }
    }
    ...
    json {
        source => "wanted_to_be_parsed"
        target => "json_parsed"
    }
    ...
    if [특정필드] == '특정값' {
        ruby {
            code => '
                field1ValueList = []
                field2ValueList = []

                event.get("[wanted_to_parse][field3]").each do |each_json_obj|
                    field1ValueList << each_json_obj["InnerField1"]
                    field2ValueList << each_json_obj["InnerField2"]
                end

                event.set("elasticsearch_index_field1", field1ValueList.join(","))
                event.set("elasticsearch_index_field2", field2ValueList.join(","))
            '
        }
    }
    ...

    mutate {
        remove_field => [
            "json_parsed"
        ]
    }
} 
-> 
output{
    elasticsearch {
        hosts => ["http://hostip:postport", ...]
        index => 
        document_id =>
        user => "${elasticsearch_username}"
        password => "${elasticsearch_passowrd}"
    }
}
This post is licensed under CC BY 4.0 by the author.