-
[ELK Logstash] config 파일 작성 예시 - input, filter, output[IT] 공부하는 개발자/Open Source 2022. 2. 20. 18:29
Logstash
ELK의 한 축 을 담당하는 Logstash는 오픈소스 데이터 수집 엔진입니다.
다양한 경로로 원천 데이터를 가져와서 가공해서 내보내는 역할을 합니다.
ELK의 한 스택으로 쓰일 때에는 주로 데이터를 읽어 들이고 Elastic Search로 내보내는 파이프라인의 역할을 합니다.
Config 파일 작성
로그스태시를 기동 할 때에 실행할 데이터 파이프라인에 대한 설정을 미리 작성해둔 것이 conf 파일입니다.
Conf 파일은 입력, 필터, 출력 3 단계로 구성합니다. 이 중 필터 단계는 생략해도 무방합니다.
Input plugin 예시
reference: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
1. JDBC 드라이버로 MySql Database table에서 데이터를 불러 들일 때
ELK 공식 매뉴얼(위 reference 참고) 에서 소개한 예제를 살짝 변형해보았습니다.
input { jdbc { jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb" jdbc_user => "mysql" jdbc_password => ${JDBC_PASSWORD} use_column_value => true tracking_column => "id" tracking_column_type => "numeric" last_run_metadata_path => "offset/.last_run" use_prepared_statements => true prepared_statement_name => "prepared_statement1.saved_sql" schedule => "* * * * *" statement => "SELECT *, DATE_FORMAT(saveDate, '%Y-%m-%dT%T+0900') as stashDate from songs where id > ? AND id < ? + ?" prepared_statement_bind_values => [":sql_last_value", ":sql_last_value", 1000] } }
Option Description jdbc_driver_library 타사 드라이버 라이브러리에 대한 JDBC 드라이버 라이브러리 경로입니다. 여러 라이브러리가 필요한 경우 쉼표로 구분하여 전달할 수 있습니다. jdbc_driver_class 필수 입력값입니다. (default 없음)
드라이버명은 jdbc 버전에 따라 다릅니다.
예를 들어 ojdbc6 의 경우에는 "Java::oracle.jdbc.driver.OracleDriver" 가 들어가야 하고,
8.0.16 의 경우에는 "com.mysql.cj.jdbc.Driver" 가 들어가야 합니다.jdbc_connection_string JDBC connection 문자열을 넣습니다. jdbc_user Database 접속 user 명입니다. jdbc_password Database 접속 password 입니다. 비밀번호는 미리 주입된 환경변수로 대체하도록 했습니다. use_column_value 쿼리할 때 기준이 되는 칼럼을 직접 설정할 거라서 true 로 해두었습니다. tracking_column "id" 컬럼을 사용하겠다는 뜻입니다. query condition 대상으로 사용할 컬럼명을 넣습니다 tracking_column_type 숫자형, 날짜만 사용이 가능합니다. "numeric" 혹은 "timestamp" 를 넣어주세요. last_run_metadata_path 쿼리할 때 기준으로 사용한 identifeir 를 저장할 path 를 지정해줍니다.
예를 들어 logstash 가 바로 직전에 input 한 데이터 행의 "id" column value 가 734 번이었다면, offset/.last_run 이라는 파일을 생성하여 그 안에 734이라는 내용을 써서 temp 값을 저장을 하게됩니다.use_prepared_statements 데이터 input 을 위해 table 검색시에 사용할 쿼리를 직접 작성할거라서 true 를 넣어 줬습니다.
default 는 false 입니다.prepared_statement_name 위에서 true 를 넣어줬다면, 이번엔 실행할 쿼리명을 지정해줍니다. (쿼리명은 각각의 pipeline 에 대해 unique 해야 합니다.) schedule input 쿼리가 실행될 주기를 Cron 식으로 입력합니다. 저는 매 분 실행되도록 해봤습니다. statement 직접 작성한 SELECT 쿼리를 넣습니다. 저는 한 번에 table full scan 이 일어나지 않도록 id 로 개수 지정을 직접했습니다. ? 를 입력한 곳에 아래의 prepared_statement_bind_values 에서 지정한 파라미터값이 들어갑니다. prepared_statement_bind_values 쿼리 파라미터에 들어갈 값을 지정합니다.
제 예시에서는 ? ? ? 에 순서대로 "직전 기준 identifeir", "직전 기준 identifier", 1000 이 들어가기 때문에, 매 분마다 실행되는 쿼리의 실행범위는 SELECT 어쩌구 WHERE ${직전 id} < id < ${직전 id} + 1000 이 되겠죠2. Kafka 로 데이터를 불러 들일 때
ELK 공식 매뉴얼(위 reference 참고)에서 소개한 예제를 살짝 변형해보았습니다.
input { kafka { bootstrap_servers => "localhost:9092" topics => ["kafka_topic"] group_id => "logstash.group" client_id => "client_id" consumer_thread => 4 code => json { charset => "UTF=8" } }
consumer_thread를 설정할 때에는 Kafka의 파티션 수와 동일하게 해주는 것이 가장 이상적입니다. 스레드 수가 파티션 수보다 적으면 읽어 들이는 속도보다 데이터가 더 빨리 쌓이게 되고, 스레드 수가 파티션 수보다 많으면 놀고 있는 스레드가 생기게 됩니다. 그래서 동일하게 해주는 것이 가장 좋습니다.
Filter plugin 예시
제가 골라 사용한 3 가지 필터 플러그인을 소개합니다.
공식 메뉴얼에 더 다양하고 많은 필터 플러그인들이 있습니다.
https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
1. Date 플러그인
날짜 형식의 데이터를 날짜로 인식할 수 있도록 해주는 플러그인입니다.
filter { date { match => [ "logdate", "MMM dd yyyy HH:mm:ss" ] } }
위와 같이 filter 설정을 작성한다면, input에서 logDate라는 필드로 "Feb 20 2022 17:16:30"이라는 데이터를 받았고, 이것이 단순 String 데이터가 아니라 timeStamp라는 것을 logstash에 알려주는 것입니다.
맨 처음 input 예제에서 제가 SELECT 쿼리로 DATE_FORMAT(saveDate, '%Y-%m-%dT%T+0900') as stashDate라는 필드를 하나 뽑았었는데요. 이 timestamp는 국제화 ISO 8601 표준입니다. "ISO8601"으로만 작성해도 인식됩니다.
filter { date { match => [ "logdate", "ISO8601" ] } }
2. Mutate 플러그인
Mutate 플러그인은 input 받은 값에 대해 rename, replace, modify 등의 일반적인 변형을 수행합니다. output으로 내보내기 전에 한 번 정제를 거치고 싶다면 mutate로 수행할 수 있습니다.
공식 매뉴얼에 기재된 Mutate 예제를 조금 변형해서 만들어 볼게요.
filter { mutate { add_field => { "shortHostname" => "%{[hostname][0]}" "[@metadata][_temporary_today]" => "%{+YYYYMMdd}" } } mutate { rename => {"shortHostname" => "hostname"} remove_field => ["user_phone_number"] } }
위와 같이 필터 설정파일을 작성한 의도는 다음과 같습니다.
1. input 에 들어있지 않았던 shortHostname이라는 신규 필드를 만들고 그 안에 hostname [0]이라는 logstash 환경변수 값을 끌어와서 넣습니다. 이 필드는 output 에도 추가되게 됩니다.
2. input 에 들어있지 않았던 _temporary_today라는 신규 필드를 만들고 그 안에 현재 timestamp를 +YYYY-MM-dd 포맷으로 끌어와 넣었습니다. 그런데 신규 필드 앞에 [@metadata]라는 prefix를 붙여줌으로써 _temporary_today 라는 필드 값은 output 에는 추가되지 않게 했습니다. 나중에 작성할 output 설정에서 활용하거나, 다른 mutate 식에 활용하는 등의 임시 용도로만 사용하고 날려버리는 필드입니다.
3. shortHostname 의 input 필드명을 hostname으로 변경했습니다. output에서는 필드명이 hostname으로 들어가게 될 거에요.
4. user_phone_number라는 필드는 output에 들어가지 않도록 삭제했습니다.
3. UUID 플러그인
데이터가 Unique 함을 보장해야 할 때 사용할 수 있는 UUID 플러그인입니다. 매 이벤트마다 UUID 를 생성합니다. 동일한 데이터가 이미 존재할 때에 덮어쓸지, 새로 만들지를 정의할 수 있습니다.default 는 덮어쓰지 않고 새로 만드는 것입니다. (아래에서 overwrite 를 빼버리거나 false 로 작성하면 됩니다.)filter { uuid { target => "uuid" overwrite => true } }
Output plugin 예시
공식 매뉴얼에 다양하고 많은 필터 플러그인들이 있습니다.
테스트 용으로 가장 간편한 stdout과 ELK 스택의 한 축인 Elastic search 플러그인을 예제로 소개합니다.
https://www.elastic.co/guide/en/logstash/current/output-plugins.html
1. stdout 플러그인
logstash를 실행하는 shell의 stdout에 바로 프린트해주는 간단한 플러그인으로, input & filter 설정을 디버깅할 때 유용하게 사용할 수 있습니다.
output { stdout {} }
json으로 출력할 수도 있습니다.
output { stdout { codec => json } }
2. Elastic search 플러그인
공식 매뉴얼의 예제를 조금 변형했습니다.
output { elasticsearch { hosts => "localhost:9200" index => "songs_%{[@metadata][_temporary_today}" codec => json template => "song_es_scheme.json" template_name => "song_es_scheme" } }
insert 하고자 하는 ES의 정보를 설정에 넣어주었습니다.
hosts 에는 ES endpoint를 기입하는데, 여러 개의 엔드포인트가 존재하는 경우에는 위의 예시처럼 String 이 아니라 hosts => ["A", "B" ]와 같이 array로 넣으면, Logstash 가 Load balancing 하여 보내줍니다.
index 에는 들어갈 document의 인덱스명을 넣어주는데, 저는 아까 filter 단계에서 만들어준 오늘 날짜 [@metadata][_temporary_today] 넣어서 인덱스를 만들었습니다.
ES는 날짜로 인덱싱하여 주기적으로 rotate 해주는 것이 흔하기 때문입니다.
filter 단계를 거치지 않는다면, @metadata를 활용하지 않고 output 단계에서 아래처럼 바로 처리해도 됩니다.
output { elasticsearch { index => "%{[some_field][sub_field]}-%{+YYYY.MM.dd}" } }
ELK 다른 포스팅 보기
2019.09.08 - [[IT] 공부하는 개발자/Open Source] - [엘라스틱서치] ElasticSearch & 키바나 시큐리티 기능 구현
2019.08.26 - [[IT] 공부하는 개발자/Open Source] - [도커 + 엘라스틱서치] Docker로 ElasticSearch ELK 스택 디플로이
'[IT] 공부하는 개발자 > Open Source' 카테고리의 다른 글
Dockerfile 작성방법, 운영에서 알아야 할 주의 사항! (0) 2023.04.21 [Kubernetes] CrashLoopBackOff 이슈 트러블슈팅 (+Nginx) (0) 2023.04.15 맥 Numbers 앱에서 환율 / 주식 함수 사용하기 (STOCK, CURRENCY) (2) 2021.05.08 Groovy Spock 을 쓰는 이유, 코드 작성 예시 소개 (0) 2021.02.15 logstash 'unknown setting *** for jdbc' 에러 메시지 (0) 2020.12.07