kafkaダイレクトストリームから連続的に着信データを読み取るSPARKストリーミングアプリケーションがあります。
これは私のセットアップです
spark-core_2.10 - 2.1.1
spark-streaming_2.10 - 2.1.1
spark-streaming-kafka-0-10_2.10 - 2.0.0
kafka_2.10 - 0.10.1.1
ほとんどの場合、かなりうまく動作しますが、時々、kafkaに書き込んでいる他のアプリケーションが再起動すると、以下のエラーが表示されます。
WARN NetworkClient: Error while fetching metadata with correlation id 139022 : {topic4908100105=LEADER_NOT_AVAILABLE}
18/05/24 11:59:33 WARN NetworkClient: Error while fetching metadata with correlation id 139022 : {topic000001=LEADER_NOT_AVAILABLE}
.
.
.
ERROR JobScheduler: Error generating jobs for time 1527163130000 ms
java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
私はSOで他の投稿を見ましたが、人々はこのエラーから回復するためにkafkaまたはsparkアプリケーションを再起動することを勧めました。しかし、私のアプリケーションは、手動で介入することなく、その上で継続的に実行する必要があります!
Sparkコンシューマクラス(java)でこの例外を処理し、ストリーミングまたはアプリケーションを再起動する方法はありますか?
回答 1 件
関連記事
- Spark 3構造化ストリーミングは、TriggerOnceを使用してKafkaソースのmaxOffsetsPerTriggerを使用します
- Kafka 010を使用したSpark Streaming 21:orgapachekafkacommonerrorsGroupAuthorizationException
- ExceptionInInitializerError Spark Streaming Kafka
- Spark構造化ストリーミングkafka avroプロデューサー
- DSE Spark Streaming:長いアクティブバッチキュー
- AuthenticationException Neo4j Sparkストリーミング
- SparkでKafkaトピックに関するバイナリデータを読み取る方法
- SparkコンシューマーがKafkaプロデューサーメッセージをScalaで読み取らない
- Kafkaからデータを読み取り、PythonでSpark StructuredSreamingを使用してコンソールに出力します
関連した質問
- SparkStreamingでDirectStreamを停止した後、後で処理するデータを保存する方法は?
- Kafkaからデータを読み取り、PythonでSpark StructuredSreamingを使用してコンソールに出力します
- データセットをKafkaトピックに書き込む方法
- ExceptionInInitializerError Spark Streaming Kafka
- DSE Spark Streaming:長いアクティブバッチキュー
- SparkでRDDをソートしてからKafkaに公開しますか?
- Spark:kafkaから非常に大きなメッセージを受け取る
- kafkaデータをHDFS Spark Batchで使用する
- Kafka 010を使用したSpark Streaming 21:orgapachekafkacommonerrorsGroupAuthorizationException
これはKafkaの問題です。各パーティションリーダーとフォロワーがそこにいます。リーダーからのリクエスト。ズーキーパーの問題によりリーダーが利用できない場合、Kafkaはこのエラーをスローします。
データを受信するには、Kafkaの問題を修正する必要があります。ただし、次の構成プロパティを使用して例外シナリオを処理できます。
--conf spark.yarn.maxAppAttempts = 10
--conf spark.yarn.am.attemptFailuresValidityInterval = 1h
--conf spark.yarn.max.executor.failures = 10
--conf spark.yarn.executor.failuresValidityInterval = 1h