spark(pysparkを使用)をクラスターモードで実行し、JDBCを介してRDBMSからデータを読み取ります。 クエリによって情報を読み取ります(直接テーブルではありません)
私はnumPartitions、upperBoundなどのようなパーティションにオプションを使用します...
sql = (select ... )
そして
df=spark
.read
.jdbc(url=jdbcUrl, table=sql,
properties=connectionProperties, column="brand_id", lowerBound=1,
upperBound=12000,numPartitions=10000 )
残念ながら、Spark変換は、生成されたクエリの最後にWHERE句のオプションを分割するため、PostGreSQLはインデックスを使用せずにテーブル全体を読み取ります!
私はそのようなクエリを1つ持っています
SELECT "brand_id","brand_name","eq_ref_raw","oe","btimestamp" FROM
( select ... )
tab WHERE brand_id >= 5 AND brand_id < 6
回答 1 件
関連記事
- ExchangeパーティショニングのSpark物理計画false/true
- PostgresQL:SELECTクエリのSEQUENCEを定義する
- PostgreSQLでsha256を使用してクエリ結果をハッシュする方法は?
- PostgreSQLインデックスはクエリで使用されません
- Postgresql selectmaxクエリには長い時間がかかります
- SELECTクエリPostgreSQLのキリル文字
- jsonbの子プロパティに基づいて除外するPostgresqlクエリ
- OracleからSpark SQLへのクエリ変換
- Spark SQLクエリが巨大なデータシャッフルの読み取り/書き込みを引き起こす
- Djangoとpostgresql db、ONLYキーワードでクエリを実行して親テーブルのみを検索しますか?
あなたがしようとしていることは、現在のSparkのバージョンでは不可能のようです。実行されたクエリは次のように構築されます。
(org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD#computeを参照)
options.table
table
の(SELECT ...)ステートメントに対応 属性。サブクエリから列をロードする必要がある理由を説明できますか?このサブクエリ内で結合または他のSQL操作を行う場合、いつでも「回避策」を実行し、Spark SQLを使用してそれを行うことができます(結合、SQL操作など)。
編集:
説明したように、サブクエリを使用する理由はJSONB抽出です。明らかに、SQLネイティブ操作としてのパフォーマンスが向上しますが、Sparkを使用して処理を並列化する場合、IMOは次のようにSparkレベルでJSON処理を宣言する必要があります。
そして、コードは次のとおりです。
Sparkは
PostgresDialect
全体でJSONBフィールドをサポートします DBタイプをCatalystタイプに変換するメソッドでJSONBをStringType
と見なすこと :