spark rddでneo4jノード情報を取得していますneo4j-sparkコネクタ。
RDD<Row>
を入手できます呼び出すことにより
loadNodeRdds()
方法。しかし、データフレーム呼び出しを取得しようとすると
loadDataframe()
方法、例外をスローします(主な質問が最終的に異なることが判明する可能性があるため、スタックトレースが長すぎる場合は、スタックトレースをスキップします)。
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.Collections$UnmodifiableMap is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true) AS Condition#4
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true)
:- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
: :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
: : +- input[0, org.apache.spark.sql.Row, true]
: +- 0
:- null
+- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType), true)
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition), StringType)
+- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, Condition)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
+- input[0, org.apache.spark.sql.Row, true]
(skipped a lot of rows as it made question reach its character limit)
上記の大きなスタックレースから多くを得ることができませんでした。
だから私は
JavaRDD<Row>
を取りましたそしてそれを
DataFrame<Row>
に変換してみましたプログラムで指定することにより
StructType
スキーマ。
StructType schema = loadSchema();
Dataset<Row> df = ss.createDataFrame(neo4jJavaRdd , schema);
これはやや似た例外を投げました。
私がやったことは、単一のneo4jノードの個々のプロパティを取得し、
Row
を準備したことですそして、
JavaRDD<Row>
それから、次のようにプログラムでスキーマを指定して、そこからデータフレームを作成しようとしました:
Row row1 = RowFactory.create("val1", " val2", "val3", "val4", "val5", "val6", 152214d, "val7", 152206d, 11160d, "val8");
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("attr1", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr2", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr3", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr4", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr5", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attr6", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attrd1", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attr7", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("attrd2", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attrd3", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("attr8", DataTypes.StringType, true));
これはうまくいきました。
だから私はすべてのノードをチェックし、すべてのノードではないことに気付きました(つまり、すべて
Row
s in
JavaRDD<Row>
)同じ数の属性があります。これにより、データフレームの準備が失敗する必要があります。必要なくプログラムで何らかの方法で処理できますかpojoを作成して指定する。
neo4j-spark-connectorを使用して作業しているときに気付いたことがありますが、ここで共有したいことがあります。
一般に、データフレームを準備する場合、neo4jのオブジェクトタイプ、特にノードとリレーションシップを返すことは好ましくありません。それは以下のようなノードを返すことは好ましくありません:
MATCH(n {id:'xyz'}) RETURN n
代わりにプロパティを返します:
MATCH(n {id:'xyz'}) RETURN properties(n)
すべてのノードに同じ数のプロパティがないことを確信できない場合、プロパティを返してJavaRDDを取得する代わりに、明示的に返す方が適切です。そのため、
JavaRDD
を処理する必要があります もう一度NULL
を追加します 存在しないプロパティの場合。これは、これを行う代わりに:MATCH(n {id:'xyz'}) RETURN properties(n)
この方法で戻ります。
MATCH(n {id:'xyz'}) RETURN n.prop1 AS prop1, n.prop2 AS prop2, ..., n.propN AS propN
Neo4j自体が
NULL
を追加します 次の図に示すように、存在しないプロパティの場合、再度繰り返す必要はありません。これを返すことで、loadDataframe()
を使用してneo4jノード情報を直接取得できました 方法。
前述のようにRDDを使用してこれを実行するには、次のようにします。
(RDD +スキーマ)をデータフレームに変換する前に、 (マップ関数を使用して)RDDを調べ、各行にすべての関連属性があることを確認します。
行に属性が存在しない場合は、属性を追加してnullにします。
その後、RDD行のスキーマは同じになり、データフレームへの変換が機能します。