bugfix> java > 投稿

spark rddでneo4jノード情報を取得していますneo4j-sparkコネクタ。 RDD<Row> を入手できます呼び出すことにより loadNodeRdds() 方法。しかし、データフレーム呼び出しを取得しようとすると loadDataframe() 方法、例外をスローします(主な質問が最終的に異なることが判明する可能性があるため、スタックトレースが長すぎる場合は、スタックトレースをスキップします)。

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.Collections$UnmodifiableMap is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true) AS Condition#4
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]
(skipped a lot of rows as it made question reach its character limit)

上記の大きなスタックレースから多くを得ることができませんでした。

だから私は JavaRDD<Row> を取りましたそしてそれを DataFrame<Row> に変換してみましたプログラムで指定することにより StructType スキーマ。

StructType schema = loadSchema();
Dataset<Row> df = ss.createDataFrame(neo4jJavaRdd , schema);

これはやや似た例外を投げました。

私がやったことは、単一のneo4jノードの個々のプロパティを取得し、 Row を準備したことですそして、 JavaRDD<Row> それから、次のようにプログラムでスキーマを指定して、そこからデータフレームを作成しようとしました:

Row row1 = RowFactory.create("val1", " val2", "val3", "val4", "val5", "val6", 152214d, "val7", 152206d, 11160d, "val8");
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("attr1", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr2", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr3", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr4", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr5", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr6", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attrd1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attr7", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attrd2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attrd3", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attr8", DataTypes.StringType, true));

これはうまくいきました。

だから私はすべてのノードをチェックし、すべてのノードではないことに気付きました(つまり、すべて Row s in JavaRDD<Row> )同じ数の属性があります。これにより、データフレームの準備が失敗する必要があります。必要なくプログラムで何らかの方法で処理できますかpojoを作成して指定する。

回答 2 件
  • 前述のようにRDDを使用してこれを実行するには、次のようにします。

    (RDD +スキーマ)をデータフレームに変換する前に、 (マップ関数を使用して)RDDを調べ、各行にすべての関連属性があることを確認します。

    行に属性が存在しない場合は、属性を追加してnullにします。

    その後、RDD行のスキーマは同じになり、データフレームへの変換が機能します。

  • neo4j-spark-connectorを使用して作業しているときに気付いたことがありますが、ここで共有したいことがあります。

    一般に、データフレームを準備する場合、neo4jのオブジェクトタイプ、特にノードとリレーションシップを返すことは好ましくありません。それは以下のようなノードを返すことは好ましくありません:

    MATCH(n {id:'xyz'}) RETURN n
    
    

    代わりにプロパティを返します:

    MATCH(n {id:'xyz'}) RETURN properties(n)
    
    

    すべてのノードに同じ数のプロパティがないことを確信できない場合、プロパティを返してJavaRDDを取得する代わりに、明示的に返す方が適切です。そのため、 JavaRDD を処理する必要があります  もう一度 NULL を追加します  存在しないプロパティの場合。これは、これを行う代わりに:

    MATCH(n {id:'xyz'}) RETURN properties(n)
    
    

    この方法で戻ります。

    MATCH(n {id:'xyz'}) RETURN n.prop1 AS prop1, n.prop2 AS prop2, ..., n.propN AS propN
    
    

    Neo4j自体が NULL を追加します 次の図に示すように、存在しないプロパティの場合、再度繰り返す必要はありません。これを返すことで、 loadDataframe() を使用してneo4jノード情報を直接取得できました  方法。

あなたの答え