bugfix> postgresql > 投稿

spark(pysparkを使用)をクラスターモードで実行し、JDBCを介してRDBMSからデータを読み取ります。 クエリによって情報を読み取ります(直接テーブルではありません)

私はnumPartitions、upperBoundなどのようなパーティションにオプションを使用します...

sql = (select ... )

そして

df=spark
.read
.jdbc(url=jdbcUrl, table=sql, 
properties=connectionProperties, column="brand_id", lowerBound=1, 
upperBound=12000,numPartitions=10000 )

残念ながら、Spark変換は、生成されたクエリの最後にWHERE句のオプションを分割するため、PostGreSQLはインデックスを使用せずにテーブル全体を読み取ります!

私はそのようなクエリを1つ持っています

SELECT "brand_id","brand_name","eq_ref_raw","oe","btimestamp" FROM 
  ( select  ... ) 
tab WHERE brand_id >= 5 AND brand_id < 6 

回答 1 件
  • あなたがしようとしていることは、現在のSparkのバージョンでは不可能のようです。実行されたクエリは次のように構築されます。

      val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
        stmt = conn.prepareStatement(sqlText,
            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    
    

    (org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD#computeを参照)

    options.table   table の(SELECT ...)ステートメントに対応  属性。

    サブクエリから列をロードする必要がある理由を説明できますか?このサブクエリ内で結合または他のSQL操作を行う場合、いつでも「回避策」を実行し、Spark SQLを使用してそれを行うことができます(結合、SQL操作など)。


    編集:

    説明したように、サブクエリを使用する理由はJSONB抽出です。明らかに、SQLネイティブ操作としてのパフォーマンスが向上しますが、Sparkを使用して処理を並列化する場合、IMOは次のようにSparkレベルでJSON処理を宣言する必要があります。

    CREATE TABLE jsonb_test (
      content jsonb
    );
    INSERT INTO jsonb_test (content) VALUES 
    ('{"first_name": "X", "last_name": "Y"}');
    
    

    そして、コードは次のとおりです。

    val opts = Map("url" -> "jdbc:postgresql://127.0.0.1:5432/spark_jsonb_test",
      "dbtable" -> "jsonb_test", "user" -> "root", "password" -> "root",
      "driver" -> "org.postgresql.Driver")
    val schema = StructType(Seq(
      StructField("first_name", StringType, true), StructField("last_name", StringType, true)
    ))
    import sparkSession.implicits._
    val personDataFrame = sparkSession.read
      .format("jdbc")
      .options(opts)
      .load()
      .withColumn("person", functions.from_json($"content", schema))
    val extractedJsonNames = personDataFrame.collect.map(row => row.get(1).toString)
    extractedJsonNames should have size 1
    extractedJsonNames(0) shouldEqual "[X,Y]"
    
    

    Sparkは PostgresDialect 全体でJSONBフィールドをサポートします  DBタイプをCatalystタイプに変換するメソッドでJSONBを StringType と見なすこと :

     private def toCatalystType(
      typeName: String,
      precision: Int,
      scale: Int): Option[DataType] = typeName match {
          case "bool" => Some(BooleanType)
          case "bit" => Some(BinaryType)
          case "int2" => Some(ShortType)
          case "int4" => Some(IntegerType)
          case "int8" | "oid" => Some(LongType)
          case "float4" => Some(FloatType)
          case "money" | "float8" => Some(DoubleType)
          case "text" | "varchar" | "char" | "cidr" | "inet" | "json" | "jsonb" | "uuid" =>
            Some(StringType)
    
    

あなたの答え