ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [ELK Logstash] config 파일 작성 예시 - input, filter, output
    [IT] 공부하는 개발자/Open Source 2022. 2. 20. 18:29

    Logstash

    ELK의 한 축 을 담당하는 Logstash는 오픈소스 데이터 수집 엔진입니다.

    다양한 경로로 원천 데이터를 가져와서 가공해서 내보내는 역할을 합니다.

    ELK의 한 스택으로 쓰일 때에는 주로 데이터를 읽어 들이고 Elastic Search로 내보내는 파이프라인의 역할을 합니다.


     

    Config 파일 작성

    로그스태시를 기동 할 때에 실행할 데이터 파이프라인에 대한 설정을 미리 작성해둔 것이 conf 파일입니다.

    Conf 파일은 입력, 필터, 출력 3 단계로 구성합니다. 이 중 필터 단계는 생략해도 무방합니다.


     

     

    Input plugin  예시

    reference: https://www.elastic.co/guide/en/logstash/current/input-plugins.html

     

    Input plugins | Logstash Reference [8.0] | Elastic

     

    www.elastic.co

     

     


     

     

    1. JDBC 드라이버로 MySql Database table에서 데이터를 불러 들일 때

     

    ELK 공식 매뉴얼(위 reference 참고) 에서 소개한 예제를 살짝 변형해보았습니다.

     

    input {
      jdbc {
        jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
        jdbc_user => "mysql"
        jdbc_password => ${JDBC_PASSWORD}
        use_column_value => true
        tracking_column => "id"
        tracking_column_type => "numeric"
        last_run_metadata_path => "offset/.last_run"
        use_prepared_statements => true
        prepared_statement_name => "prepared_statement1.saved_sql"
        schedule => "* * * * *"
        statement => "SELECT *, DATE_FORMAT(saveDate, '%Y-%m-%dT%T+0900') as stashDate from songs where id > ? AND id < ? + ?"
        prepared_statement_bind_values => [":sql_last_value", ":sql_last_value", 1000]
      }
    }

     

    Option Description
    jdbc_driver_library 타사 드라이버 라이브러리에 대한 JDBC 드라이버 라이브러리 경로입니다. 여러 라이브러리가 필요한 경우 쉼표로 구분하여 전달할 수 있습니다.
    jdbc_driver_class 필수 입력값입니다. (default 없음)
    드라이버명은 jdbc 버전에 따라 다릅니다.
    예를 들어 ojdbc6 의 경우에는 "Java::oracle.jdbc.driver.OracleDriver" 가 들어가야 하고,
    8.0.16 의 경우에는 "com.mysql.cj.jdbc.Driver" 가 들어가야 합니다.
    jdbc_connection_string JDBC connection 문자열을 넣습니다.
    jdbc_user Database 접속 user 명입니다.
    jdbc_password Database 접속 password 입니다. 비밀번호는 미리 주입된 환경변수로 대체하도록 했습니다.
    use_column_value 쿼리할 때 기준이 되는 칼럼을 직접 설정할 거라서 true 로 해두었습니다.
    tracking_column "id" 컬럼을 사용하겠다는 뜻입니다. query condition 대상으로 사용할 컬럼명을 넣습니다
    tracking_column_type 숫자형, 날짜만 사용이 가능합니다. "numeric" 혹은 "timestamp" 를 넣어주세요. 
    last_run_metadata_path 쿼리할 때 기준으로 사용한 identifeir 를 저장할 path 를 지정해줍니다.
    예를 들어 logstash 가 바로 직전에 input 한 데이터 행의 "id" column value 가 734 번이었다면, offset/.last_run 이라는 파일을 생성하여 그 안에 734이라는 내용을 써서 temp 값을 저장을 하게됩니다.
    use_prepared_statements 데이터 input 을 위해 table 검색시에 사용할 쿼리를 직접 작성할거라서 true 를 넣어 줬습니다.
    default 는 false 입니다.
    prepared_statement_name 위에서 true 를 넣어줬다면, 이번엔 실행할 쿼리명을 지정해줍니다. (쿼리명은 각각의 pipeline 에 대해 unique 해야 합니다.)
    schedule input 쿼리가 실행될 주기를 Cron 식으로 입력합니다. 저는 매 분 실행되도록 해봤습니다.
    statement 직접 작성한 SELECT 쿼리를 넣습니다. 저는 한 번에 table full scan 이 일어나지 않도록 id 로 개수 지정을 직접했습니다. ? 를 입력한 곳에 아래의 prepared_statement_bind_values 에서 지정한 파라미터값이 들어갑니다. 
    prepared_statement_bind_values 쿼리 파라미터에 들어갈 값을 지정합니다.
    제 예시에서는 ? ? ? 에 순서대로 "직전 기준 identifeir", "직전 기준 identifier", 1000 이 들어가기 때문에, 매 분마다 실행되는 쿼리의 실행범위는 SELECT 어쩌구 WHERE ${직전 id} < id < ${직전 id} + 1000 이 되겠죠

     

    2. Kafka 로 데이터를 불러 들일 때

     

    ELK 공식 매뉴얼(위 reference 참고)에서 소개한 예제를 살짝 변형해보았습니다.

     

    input {
      kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["kafka_topic"]
        group_id => "logstash.group"
        client_id => "client_id"
        consumer_thread => 4
        code => json {
          charset => "UTF=8"
      }
    }

    consumer_thread를 설정할 때에는 Kafka의 파티션 수와 동일하게 해주는 것이 가장 이상적입니다. 스레드 수가 파티션 수보다 적으면 읽어 들이는 속도보다 데이터가 더 빨리 쌓이게 되고, 스레드 수가 파티션 수보다 많으면 놀고 있는 스레드가 생기게 됩니다. 그래서 동일하게 해주는 것이 가장 좋습니다.

     


     

    Filter plugin 예시

    제가 골라 사용한 3 가지 필터 플러그인을 소개합니다.

    공식 메뉴얼에 더 다양하고 많은 필터 플러그인들이 있습니다.

    https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

     

    Filter plugins | Logstash Reference [8.0] | Elastic

     

    www.elastic.co

     


     

    1. Date 플러그인

     

    날짜 형식의 데이터를 날짜로 인식할 수 있도록 해주는 플러그인입니다.

     

     filter {
          date {
            match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
          }
     }

     

    위와 같이 filter 설정을 작성한다면, input에서 logDate라는 필드로 "Feb 20 2022 17:16:30"이라는 데이터를 받았고, 이것이 단순 String 데이터가 아니라 timeStamp라는 것을 logstash에 알려주는 것입니다.

     

    맨 처음 input 예제에서 제가 SELECT 쿼리로 DATE_FORMAT(saveDate, '%Y-%m-%dT%T+0900') as stashDate라는 필드를 하나 뽑았었는데요. 이 timestamp는 국제화 ISO 8601 표준입니다. "ISO8601"으로만 작성해도 인식됩니다.

     

    filter {
          date {
            match => [ "logdate", "ISO8601" ]
          }
    }

     


     

    2. Mutate 플러그인

     

    Mutate 플러그인은 input 받은 값에 대해 rename, replace, modify 등의 일반적인 변형을 수행합니다. output으로 내보내기 전에 한 번 정제를 거치고 싶다면 mutate로 수행할 수 있습니다. 

     

    공식 매뉴얼에 기재된 Mutate 예제를 조금 변형해서 만들어 볼게요.

     

     filter {
        mutate {
            add_field => {
              "shortHostname" => "%{[hostname][0]}"
              "[@metadata][_temporary_today]" => "%{+YYYYMMdd}"
            }
        }
    
        mutate {
            rename => {"shortHostname" => "hostname"}
            remove_field => ["user_phone_number"]
        }
    }

     

    위와 같이 필터 설정파일을 작성한 의도는 다음과 같습니다.

     

    1. input 에 들어있지 않았던 shortHostname이라는 신규 필드를 만들고 그 안에 hostname [0]이라는 logstash 환경변수 값을 끌어와서 넣습니다. 이 필드는 output 에도 추가되게 됩니다.

    2. input 에 들어있지 않았던 _temporary_today라는 신규 필드를 만들고 그 안에 현재 timestamp를 +YYYY-MM-dd 포맷으로 끌어와 넣었습니다. 그런데 신규 필드 앞에 [@metadata]라는 prefix를 붙여줌으로써 _temporary_today 라는 필드 값은 output 에는 추가되지 않게 했습니다. 나중에 작성할 output 설정에서 활용하거나, 다른 mutate 식에 활용하는 등의 임시 용도로만 사용하고 날려버리는 필드입니다.

    3. shortHostname 의 input 필드명을 hostname으로 변경했습니다. output에서는 필드명이 hostname으로 들어가게 될 거에요.

    4. user_phone_number라는 필드는 output에 들어가지 않도록 삭제했습니다.

     


    3. UUID 플러그인

    데이터가 Unique 함을 보장해야 할 때 사용할 수 있는 UUID 플러그인입니다. 매 이벤트마다 UUID 를 생성합니다. 동일한 데이터가 이미 존재할 때에 덮어쓸지, 새로 만들지를 정의할 수 있습니다.
     
    default 는 덮어쓰지 않고 새로 만드는 것입니다. (아래에서 overwrite 를 빼버리거나 false 로 작성하면 됩니다.)
     
    filter {
          uuid {
            target => "uuid"
            overwrite => true
          }
    }

     


     

    Output plugin 예시

    공식 매뉴얼에 다양하고 많은 필터 플러그인들이 있습니다.

    테스트 용으로 가장 간편한 stdout과 ELK 스택의 한 축인 Elastic search 플러그인을 예제로 소개합니다.

     

    https://www.elastic.co/guide/en/logstash/current/output-plugins.html

     

    Output plugins | Logstash Reference [8.0] | Elastic

     

    www.elastic.co


     

    1. stdout 플러그인

     

    logstash를 실행하는 shell의 stdout에 바로 프린트해주는 간단한 플러그인으로, input & filter 설정을 디버깅할 때 유용하게 사용할 수 있습니다.

     

    output {
          stdout {}
    }

     

    json으로 출력할 수도 있습니다.

     

    output {
          stdout { codec => json }
    }

     

     

    2. Elastic search 플러그인

     

    공식 매뉴얼의 예제를 조금 변형했습니다.

     

    output {
        elasticsearch {
            hosts => "localhost:9200"
            index => "songs_%{[@metadata][_temporary_today}"
            codec => json
            template => "song_es_scheme.json"
            template_name => "song_es_scheme"
        }
    }

     

    insert 하고자 하는 ES의 정보를 설정에 넣어주었습니다.

    hosts 에는 ES endpoint를 기입하는데, 여러 개의 엔드포인트가 존재하는 경우에는 위의 예시처럼 String 이 아니라 hosts => ["A", "B" ]와 같이 array로 넣으면, Logstash 가 Load balancing 하여 보내줍니다.

    index 에는 들어갈 document의 인덱스명을 넣어주는데, 저는 아까 filter 단계에서 만들어준 오늘 날짜 [@metadata][_temporary_today] 넣어서 인덱스를 만들었습니다. 

    ES는 날짜로 인덱싱하여 주기적으로 rotate 해주는 것이 흔하기 때문입니다. 

     

    filter 단계를 거치지 않는다면, @metadata를 활용하지 않고 output 단계에서 아래처럼 바로 처리해도 됩니다.

     output {
          elasticsearch {
            index => "%{[some_field][sub_field]}-%{+YYYY.MM.dd}"
          }
     }

     

     


    ELK 다른 포스팅 보기

    2019.09.08 - [[IT] 공부하는 개발자/Open Source] - [엘라스틱서치] ElasticSearch & 키바나 시큐리티 기능 구현

     

    [엘라스틱서치] ElasticSearch & 키바나 시큐리티 기능 구현

    개요 엘라스틱서치는 6.8 버전부터 시큐리티 기능을 지원한다. 그 전 버전까지는 별도의 인증과정 없이, 리퀘스트를 보내면 바로 리스폰스가 돌아왔는데, 시큐리티 기능을 활성화시키면 리퀘스

    gem1n1.tistory.com

    2019.08.26 - [[IT] 공부하는 개발자/Open Source] - [도커 + 엘라스틱서치] Docker로 ElasticSearch ELK 스택 디플로이

     

    [도커 + 엘라스틱서치] Docker로 ElasticSearch ELK 스택 디플로이

    도커를 사용하는 이유  다른 OS 플랫폼을 사용하는 유저들도 동일한 로컬 개발환경을 구성할 수 있다. 쉽고 간단한 버전 매니지먼트가 가능하다. 도커로 ELK 스택 디플로이 도커(Docker)를 이용하

    gem1n1.tistory.com

    댓글

Copyright in 2020 (And Beyond)