bugfix> logstash > 投稿

次のlogstashパイプラインを提供します。

input
{
    generator
    {
        lines => [
        '{"name" : "search", "product" : { "module" : "search" , "name" : "api"}, "data" : { "query" : "toto"}}',
        '{"name" : "user_interaction", "product" : { "module" : "search" , "name" : "front"}, "data" : { "query" : "toto"}}',
        '{"name" : "search", "product" : { "module" : "search" , "name" : "api"}, "data" : { "query" : "toto"}}',
        '{"hello": "world"}',
        '{"name" :"wrong data", "data" : "I am wrong !"}',
        '{"name" :"wrong data", "data" : { "hello" : "world" }}'
        ]
        codec => json
        count => 1
    }
}
filter
{
  mutate
  {
    remove_field => ["sequence", "host", "@version"]
  }
}
output
{
   elasticsearch
   {
     hosts => ["elasticsearch:9200"]
     index => "events-dev6-test"
     document_type => "_doc"
     manage_template => false
   }
   stdout
   {
       codec => rubydebug
   }
}

elasticsearchにはこのインデックスの厳密なマッピングがあるため、一部のイベントでは400エラー "mapping set to strict, dynamic introduction of [hello] within [data] is not allowed" が発生します(これは正常です)。

失敗したイベントを他の場所(テキストログまたは別のelasticsearchインデックス)に送信する方法(したがって、イベントを失ってはいけません)

回答 1 件
  • Logstash 6.2では、必要な処理に使用できるデッドレターキューが導入されました。 dead_letter_queue.enable: true を有効にする必要があります  あなたの logstash.yml で 。

    そして、単に入力として処理します。

    input {
      dead_letter_queue {
        path => "/path/to/data/dead_letter_queue" 
        commit_offsets => true 
        pipeline_id => "main" 
      }
    }
    output {
      file {
        path => ...
           codec => line { format => "%{message}"}
       }    
    }
    
    

    6.2より前は、あなたが望むことをする方法がなかったと思います。

あなたの答え