[Logstash] Mariadb, Mysql와 Elasticsearch 연동 하기

Logstash와 Mariadb를 연동하여 Elasticsearch로 보내는 것을 해보겠다.

학습 서버 :운영체재 : Ubuntu

마리아DB Java8 connectors 다운로드

root 접속 후 Logstash 서비스 중단

su
systemctl stop logstash

다운 받은 파일을 /usr/share/java/mariadb-java-client-2.7.1.jar 경로에 복사

cp ~/다운로드/mariadb-java-client-2.7.1.jar /usr/share/java/mariadb-java-client-2.7.1.jar

/etc/logstash/conf.d 파일에 원하는명의 conf 파일을 생성

nano /etc/logstash/conf.d/원하는파일명.conf

생성후 코드 넣기 (한국어 부분 수정 필수 부분)

input {
  jdbc {
    jdbc_validate_connection => true
    jdbc_driver_library => "/usr/share/java/mariadb-java-client-2.7.1.jar"
    jdbc_driver_class => "Java::org.mariadb.jdbc.Driver"
    jdbc_connection_string => "jdbc:mariadb://127.0.0.1:3306/[DB명]"
    jdbc_user => "[DB명]"
    jdbc_password => "[DB비밀번호]"
    use_column_value => true
    # 1분마다
    schedule => "* * * * *"  
#   schedule => "* * * * * *" 1초마다
    statement => "SELECT * FROM [테이블명]"
    tracking_column => "id"
  }
}

#filter {
#  float 타입 경도, 위도값을 geo_point로 변환할때는 index를 미리 생성하여 맵핑해야됨. 맵핑 후 실행
#        mutate {
#           add_field => {
#                "[location][lon]" => "%{longitude}"
#                "[location][lat]" => "%{latitude}"
#           }
#        }
#}


output {
  # stdout { codec =>  "rubydebug"}
  elasticsearch {
#     index 맵핑
      hosts => "localhost:9200"
      index => "인덱스명"
#      user => "[아이디]"
#      password => "[비밀번호]"
      document_id => "index_%{+YYYY.MM.DD}"
  }
}

시작하기전에 미리 인덱스 ggeo_point 타입 매핑하기

PUT 원하는인택스명
{
  "mappings": {
    "properties": {
      "location":{
        "type": "geo_point"
      }
    }
  }
}

Logstash 서비스 시작

Systemctl start logstash

잘 들어가고 있는지 확인

tail -f /var/log/logstash/logstash-plain.log

답글 남기기

이메일 주소는 공개되지 않습니다.