bugfix> apache-kafka > 投稿

バックグラウンド:
以前は、インデックス作成にhibernate search、Lucene、jboss hornetqキューを使用していました。

このアプリケーションはプロデューサーであり、メタデータ(データベース内のレコードを識別するための一意のデータ情報)をhornetqに送信します。 コンシューマはこのメタデータを受信し、データベースに対してクエリを実行して、完全なレコードの詳細(子オブジェクトを含む)を取得します。 これは、はるかにデータベース中心のアプローチです。

ここで、インデックス作成のためのデータベース中心のアプローチを排除したいと思います。 hornetqではなくkafkaを使用することにしました。

ユーザーがデータを作成するときに問題はありません。

ユーザーがデータを編集するときに、潜在的な問題があることがわかります(2つの子オブジェクトを持つ親エンティティを言う)。ユーザーが表示するためにデータベースからデータが取得されると、
同じデータをkafka topic1にプッシュします。ユーザーがデータ(parenetレベルのデータなど)を変更して送信するとき。親レベルのデータのみを取得し(子オブジェクトのデータは取得しない)、変更したデータをtopic2にプッシュします。次に、topic1(子オブジェクト)に存在するメッセージを、topic2(親レベルデータ)の対応するメッセージとマージする必要があります

注:インデックス作成には更新がなく、削除してから挿入することがわかっているため、このルートを使用する必要があります。

質問:

  1. 上記のアプローチを使用すると、特定の topic1にあるメッセージとtopic2にある特定のメッセージ。は topic1とtopic2で同じメッセージIDを提供する方法はありますか?

  2. 単一のトピックを使用する場合、この問題を解決する方法はありますか?

  3. 上記の問題を解決するためのより良い設計/アプローチはありますか?

前もって感謝します。

回答 1 件
    1. If i go with the above approach, how can I map the specific message present in topic1 with the specific message in topic2. Is there a way to provide the same message ids in topic1 and topic2 ?

    同じKafkaクラスター内のトピック間で特定のメッセージをマッピングまたは結合するには、Kafka StreamとKSQLを使用することをお勧めします。ここで参照を見つけることができます。

    オブジェクトを一意にするには多くの方法があり、topic1およびtopic2にメッセージを送信するときに親エンティティIDを使用することをお勧めします。次のサンプルJavaコード:

    ProducerRecord<String, ParentEntity> record = new ProducerRecord<>(topic1, 
    ParentEntity.getId(), ParentEntity);
    ListenableFuture<SendResult<String, ParentEntity>> future = 
    kafkaTemplate.send(record);
    future.addCallback(new ListenableFutureCallback<SendResult<String, 
    ParentEntity>>() {
        @Override
        public void onSuccess(SendResult<String, ParentEntity> result) {}
        @Override
        public void onFailure(Throwable ex) {
            //print out error log 
        }
    });
    ProducerRecord<String, ChildEntity> record = new ProducerRecord<>(topic2, 
    ChildEntity.getParentEntityId(), ChildEntity);
    ListenableFuture<SendResult<String, ChildEntity>> future = 
    kafkaTemplate.send(record);
    future.addCallback(new ListenableFutureCallback<SendResult<String, 
    ChildEntity>>() {
        @Override
        public void onSuccess(SendResult<String, ChildEntity> result) {}
        @Override
        public void onFailure(Throwable ex) {
            //print out error log 
        }
    });
    
    

      Is there any way to resolve this issue if i use the single topic ?

    データベースに新しいテーブル(A)を作成して、インデックス作成のために送信されるメッセージ全体を保存できます。ユーザーがデータを作成または更新するたびに、メッセージもテーブルAに挿入/更新されます。最後に、KafkaクライアントはメッセージオブジェクトをテーブルAから取得し、Kafkaクラスターの一意のトピックを生成します。

      Is there any better design/approach to resolve the above issue ?

    上で述べたように、Kafka StreamとKSQLを試してみてください。

あなたの答え