Logstash와 Mariadb를 연동하여 Elasticsearch로 보내는 것을 해보겠다.
학습 서버 :운영체재 : Ubuntu
root 접속 후 Logstash 서비스 중단
su
systemctl stop logstash
다운 받은 파일을 /usr/share/java/mariadb-java-client-2.7.1.jar 경로에 복사
cp ~/다운로드/mariadb-java-client-2.7.1.jar /usr/share/java/mariadb-java-client-2.7.1.jar
/etc/logstash/conf.d 파일에 원하는명의 conf 파일을 생성
nano /etc/logstash/conf.d/원하는파일명.conf
생성후 코드 넣기 (한국어 부분 수정 필수 부분)
input {
jdbc {
jdbc_validate_connection => true
jdbc_driver_library => "/usr/share/java/mariadb-java-client-2.7.1.jar"
jdbc_driver_class => "Java::org.mariadb.jdbc.Driver"
jdbc_connection_string => "jdbc:mariadb://127.0.0.1:3306/[DB명]"
jdbc_user => "[DB명]"
jdbc_password => "[DB비밀번호]"
use_column_value => true
# 1분마다
schedule => "* * * * *"
# schedule => "* * * * * *" 1초마다
statement => "SELECT * FROM [테이블명]"
tracking_column => "id"
}
}
#filter {
# float 타입 경도, 위도값을 geo_point로 변환할때는 index를 미리 생성하여 맵핑해야됨. 맵핑 후 실행
# mutate {
# add_field => {
# "[location][lon]" => "%{longitude}"
# "[location][lat]" => "%{latitude}"
# }
# }
#}
output {
# stdout { codec => "rubydebug"}
elasticsearch {
# index 맵핑
hosts => "localhost:9200"
index => "인덱스명"
# user => "[아이디]"
# password => "[비밀번호]"
document_id => "index_%{+YYYY.MM.DD}"
}
}
시작하기전에 미리 인덱스 ggeo_point 타입 매핑하기
PUT 원하는인택스명
{
"mappings": {
"properties": {
"location":{
"type": "geo_point"
}
}
}
}
Logstash 서비스 시작
Systemctl start logstash
잘 들어가고 있는지 확인
tail -f /var/log/logstash/logstash-plain.log