bugfix> scala > 投稿

次のように、Apache Sparkにデータセットがあるとします。

+---+--------------------+
| id|                 vec|
+---+--------------------+
|  0|[1, 2, 3, 4]        |
|  0|[2, 3, 4, 5]        |
|  0|[6, 7, 8, 9]        |
|  1|[1, 2, 3, 4]        |
|  1|[5, 6, 7, 8]        |
+---+--------------------+

そして、vecは List です Doubles の 。

次のように、IDとそのIDに関連付けられたベクトルの平均を含むデータセットを作成するにはどうすればよいですか。

+---+--------------------+
| id|                 vec|
+---+--------------------+
|  0|[3, 4, 5, 6]        |
|  1|[3, 4, 5, 6]        |
+---+--------------------+

前もって感謝します!

回答 2 件
  • DataSetの入力スキーマに一致するケースクラスを作成しました。 IDでデータセットをグループ化し、foldLeftを使用して、グループ化されたデータセットのベクトル内の各idxの平均を累積しました。

    scala> case class Test(id: Int, vec: List[Double])
    defined class Test
    scala> val inputList = List(
         |   Test(0, List(1, 2, 3, 4)),
         |   Test(0, List(2, 3, 4, 5)),
         |   Test(0, List(6, 7, 8, 9)),
         |   Test(1, List(1, 2, 3, 4)),
         |   Test(1, List(5, 6, 7, 8)))
    inputList: List[Test] = List(Test(0,List(1.0, 2.0, 3.0, 4.0)), Test(0,List(2.0, 3.0, 4.0, 5.0)), Test(0,List(6.0, 7.0, 8.0, 9.0)), Test(1,
    List(1.0, 2.0, 3.0, 4.0)), Test(1,List(5.0, 6.0, 7.0, 8.0)))
    scala>
    scala> import spark.implicits._
    import spark.implicits._
    scala> val ds = inputList.toDF.as[Test]
    ds: org.apache.spark.sql.Dataset[Test] = [id: int, vec: array<double>]
    scala> ds.show(false)
    +---+--------------------+
    |id |vec                 |
    +---+--------------------+
    |0  |[1.0, 2.0, 3.0, 4.0]|
    |0  |[2.0, 3.0, 4.0, 5.0]|
    |0  |[6.0, 7.0, 8.0, 9.0]|
    |1  |[1.0, 2.0, 3.0, 4.0]|
    |1  |[5.0, 6.0, 7.0, 8.0]|
    +---+--------------------+
    
    scala>
    scala> val outputDS = ds.groupByKey(_.id).mapGroups {
         |   case (key, valuePairs) =>
         |     val vectors = valuePairs.map(_.vec).toArray
         |     // compute the length of the vectors for each key
         |     val len = vectors.length
         |     // get average for each index in vectors
         |     val avg = vectors.head.indices.foldLeft(List[Double]()) {
         |       case (acc, idx) =>
         |         val sumOfIdx = vectors.map(_ (idx)).sum
         |         acc :+ (sumOfIdx / len)
         |     }
         |     Test(key, avg)
         | }
    outputDS: org.apache.spark.sql.Dataset[Test] = [id: int, vec: array<double>]
    scala> outputDS.show(false)
    +---+--------------------+
    |id |vec                 |
    +---+--------------------+
    |1  |[3.0, 4.0, 5.0, 6.0]|
    |0  |[3.0, 4.0, 5.0, 6.0]|
    +---+--------------------+
    
    

    お役に立てれば!

あなたの答え