bugfix> c++ > 投稿

次のようないくつかの処理レベルを持つアプリケーションがあります。

InputStream->Pre-Processing->Computation->OutputStream

これらの各エンティティは、個別のスレッドで実行されます。 だから私のコードには、一般的なスレッドがあります。

std::vector<ImageRead> m_readImages;

次に、このメンバー変数を各スレッドに渡します。

InputStream input{&m_readImages};
std::thread threadStream{&InputStream::start, &InputStream};
PreProcess pre{&m_readImages};
std::thread preStream{&PreProcess::start, &PreProcess};
...

そして、これらの各クラスは、このデータへのポインターメンバーを所有しています。

std::vector<ImageRead>* m_ptrReadImages;

また、グローバルミューテックスが定義されており、その共有コンテナーに対する読み取り/書き込み操作ごとにロックおよびロック解除されます。 気になるのは、このメカニズムがかなりあいまいであり、データが別のスレッドで使用されているかどうかで混乱することです。

これらのスレッド間でこのコンテナを共有するより簡単な方法は何ですか?

回答 5 件
  • 「入力->」と説明したプロセス 前処理->計算->出力」は設計上シーケンシャルです。各ステップは前のステップに依存するため、この特定の方法での並列化は、各スレッドが別のスレッドの完了を待つだけなので有益ではありません。独立した個々のデータセットで順次動作する複数の並列処理パイプラインをセットアップするための通常のアプローチでは、スレッドセット間でタスクを分散する処理キューを使用します。

  • あなたの読み取りと前処理は、コンテナとは独立して行うことができるように思えます。

    単純に、これをタスクのファンアウトおよびファンインネットワークとして構成します。

    まず、ディスパッチを行います仕事 (タスクは、実際に操作するためにスレッドに与えられる作業単位です)、入力および前処理タスクを作成します。

    サブタスクが完全にロードされたイメージへのポインターを通信するための手段としてフューチャーを使用します。

    2番目のタスク、std :: vector builderタスク  join を呼び出すだけです  完了時に結果を取得し、それらを std::vector に追加する先物  アレイ。

    このように構造化することをお勧めします。なぜなら、実行しているIOおよび前処理は、ベクトルに値を設定するよりも時間がかかると思われるからです。を使用してタスク スレッドの代わりに、作業の並行部分を直接調整できます。

    具体的な要素からあまりにも抽象化されていないことを願っています。これは、利用可能なハードウェアを飽和させ、スラッシュ/ロックの競合を減らすことでバランスが取れているパターンであり、将来デバッグするときに理解できます。

  • 私は3つの別々のキュー、 ready_for_preprocessing を使用します  InputStreamによって供給され、前処理 ready_for_computation によって消費されます  前処理により供給され、計算により消費され、 ready_for_output  これは、計算によって供給され、OutputStreamによって消費されます。

    各キューは、アクセスミューテックス(キューへのアイテムの実際の追加と削除を制御する)と「イメージ利用可能」セマフォ(アイテムが利用可能であることを通知する)および実際のキューを持つクラスに配置する必要があります。 。これにより、各スレッドの複数のインスタンスが許可されます。このようなもの:

    class imageQueue
    {
        std::deque<ImageRead> m_readImages;
        std::mutex            m_changeQueue;
        Semaphore             m_imagesAvailable;
        public:
        bool addImage( ImageRead );
        ImageRead getNextImage();
    }
    
    

    addImage()  m_changeQueueミューテックスを取得し、イメージをm_readImagesに追加してから、m_imagesAvailableに信号を送ります。

    getNextImage()  m_imagesAvailableで待機します。シグナルが送信されると、m_changeQueueを取得し、リストから次の画像を削除して返します。

    cf. http://en.cppreference.com/w/cpp/thread

  • 「各操作を個別のスレッドで実行する必要がある」という質問を無視すると、処理するオブジェクトがスレッドからスレッドに移動するように見えます。実際には、それらは一度に1つのスレッドによってのみ一意に所有されます(他のスレッドからのデータにアクセスする必要のあるスレッドはありません)。 C ++でそれを表現する方法があります: std::unique_ptr

    各ステップは、所有するイメージでのみ機能します。必要なのは、スレッドセーフな方法を見つけて、プロセスのステップごとに画像の所有権を1つずつ移動することです。つまり、クリティカルセクションはタスク間の境界にのみ存在します。これらは複数あるため、抽象化することは合理的です。

    class ProcessBoundary
    {
    public:
      void setImage(std::unique_ptr<ImageRead> newImage)
      {
        while (running)
        {
          {
            std::lock_guard<m_mutex> guard;
            if (m_imageToTransfer == nullptr)
            {
              // Image has been transferred to next step, so we can place this one here.
              m_imageToTransfer = std::move(m_newImage);
              return;
            }
          }
          std::this_thread::yield();
        }
      }
      std::unique_ptr<ImageRead> getImage()
      {
        while (running)
        {
          {
            std::lock_guard<m_mutex> guard;
            if (m_imageToTransfer != nullptr)
            {
              // Image has been transferred to next step, so we can place this one here.
              return std::move(m_imageToTransfer);
            }
          }
          std::this_thread::yield();
        }
      }
      void stop()
      {
        running = false;
      }
    private:
      std::mutex m_mutex;
      std::unique_ptr<ImageRead> m_imageToTransfer;
      std::atomic<bool> running; // Set to true in constructor
    };
    
    

    その後、プロセスのステップは getImage() で画像を要求します 、その関数が戻ると一意に所有します。彼らはそれを処理して setImage に渡します  次の ProcessBoundary の 。

    おそらく、条件変数を使用してこれを改善するか、スレッドが次のイメージの処理に戻ることができるようにこのクラスにキューを追加することができます。ただし、一部のステップが他のステップよりも高速である場合、必然的に遅いステップによって停止されます。

  • これは設計パターンの問題です。並行性の設計パターンについて読んで、あなたを助ける何かがあるかどうかを確認することをお勧めします。

    次の順次プロセスに並行性を追加したい場合。

    InputStream->Pre-Processing->Computation->OutputStream
    
    

    次に、アクティブオブジェクトのデザインパターンを使用することをお勧めします。この方法では、各プロセスは前のステップでブロックされず、同時に実行できます。実装も非常に簡単です(実装は次のとおりです。 http://www.drdobbs.com/parallel/prefer-using-active-objects-instead-of-n/225700095)

    DTOを共有する各スレッドに関する質問について。これは、DTOのラッパーで簡単に解決できます。ラッパーには、書き込み関数と読み取り関数が含まれます。書き込み関数はmutextでブロックし、読み取りはconstデータを返します。

    しかし、私はあなたの問題はデザインにあると思います。説明したようにプロセスがシーケンシャルである場合、各プロセスがデータを共有しているのはなぜですか?現在のプロセスが完了したら、データを次のプロセスに渡す必要があります。つまり、各プロセスを分離する必要があります。

あなたの答え