bugfix> python > 投稿

私は count にしようとしていますSpark APIを使用したmllibのFP成長によって生成される頻繁なアイテムセット。私のSparkはバージョン1.5.1です。以下は私のコードです:

#!/usr/bin/python 
from pyspark.mllib.fpm import FPGrowth
from pyspark import SparkContext,SparkConf
from pyspark import HiveContext
import os
os.environ['PYSPARK_PYTHON']='/usr/bin/python'
appName = "FP_growth"
sc = SparkContext()
sql_context = HiveContext(sc)
def read_spu(prod):#prod_code):
    sql = """
        select 
        t.orderno_nosplit, 
        t.prod_code, 
        t.item_code, 
        sum(t.item_qty) as item_qty
        from ioc_fdm.fdm_dwr_ioc_fcs_pk_spu_item_f_chain t
        where t.prod_code='%s'
        group by t.prod_code, t.orderno_nosplit, t.item_code  """%prod
    spu_result = sql_context.sql(sql)
    return spu_result.cache()
if __name__ == '__main__':
    spu=read_spu('6727780')  
    conf=0.7             
    trans=spu.rdd.repartition(100).map(lambda x: (x[0],x[2])).groupByKey().mapValues(list).values().cache()
    model = FPGrowth.train(trans, 0.01, 100) 
    freq_count = model.freqItemsets().count()
    print 'freq_count:',freq_count  
    sc.stop()

入力データはHadoopから読み取られ、データはそれほど大きくなく、約20000行のみです。ただし、 .count の段階ではスクリプトの動作が非常に遅くなります 。理由はわかりません。パフォーマンスから、データスキューが原因のようです。ただし、出力データは大きくありません(タスクごとに約100KBのみ)。

クラスターには、320コアの8つのノードと1.56 Tの合計メモリがあります(1人のユーザーだけでなく)。私のスパーク送信スクリプトは spark-submit --master yarn-cluster --executor-memory 30g --num-executors 20 --executor-cores 5 FP_growth.py です

添付ファイルは、実行時のパフォーマンスのスクリーンプリントです。

使用リソース

アクティブステージ 

タスク

回答 1 件
  • repartition(100)  良いアイデアとは思えませんが、どのステージが最も時間がかかっているかを確認できます。 20000レコードしかないため。本国送還では、各パーティションで200レコードに分割する必要があります。

    データサイズが大きくない場合、本国送還する必要はまったくありません。または、40〜60個のパーティション(2または3)* executorなしで試してください。

あなたの答え