bugfix> python > 投稿

に似ているこの 他の多くの質問、同じ構造の多くのネストされたループ(最大16)があります。

問題:4文字のアルファベットがあり、長さ16の可能なすべての単語を取得したい。これらの単語をフィルタリングする必要があります。これらはDNAシーケンス(したがって4文字:ATGC)です。フィルタリングルールは非常に単純です。

  • XXXX部分文字列なし(つまり、同じ文字を連続して3回以上使用できない、ATGCATGGGGCTAは「悪い」)
  • 特定のGC含有量、つまりGの数+ Cの数は特定の範囲(40〜50%)である必要があります。 ATATATATATATAとGCGCGCGCGCGCは悪い言葉です

itertools.product そのために動作しますが、ここのデータ構造は巨大になります(4 ^ 16 = 4 * 10 ^ 9ワード)

さらに重要なのは、 product を使用する場合 、それから私はまだそれを除外するために各要素を通過する必要があります。したがって、40億ステップ倍2

私の現在のソリューションはネストされた for ですループ

alphabet = ['a','t','g','c']
for p1 in alphabet:
    for p2 in alphabet:
       for p3 in alphabet:
       ...skip...
          for p16 in alphabet:
             word = p1+p2+p3+...+p16
             if word_is_good(word):
                 good_words.append(word)
                 counter+=1

16個のネストされたループなしでそれをプログラムする良いパターンはありますか?効率的に並列化する方法はありますか(マルチコアまたは複数のEC2ノード上) また、そのパターンで word_is_good? をプラグインできますループの途中で確認してください:ひどく始まる単語は悪いです

...skip...
for p3 in alphabet:
   word_3 = p1+p2+p3
   if not word_is_good(word_3):
     break
   for p4 in alphabet:
     ...skip...

回答 2 件
  • from itertools import product, islice
    from time import time
    length = 16
    def generate(start, alphabet):
        """
        A recursive generator function which works like itertools.product
        but restricts the alphabet as it goes based on the letters accumulated so far.
        """
        if len(start) == length:
            yield start
            return
        gcs = start.count('g') + start.count('c')
        if gcs >= length * 0.5:
            alphabet = 'at'
        # consider the maximum number of Gs and Cs we can have in the end
        # if we add one more A/T now
        elif length - len(start) - 1 + gcs < length * 0.4:
            alphabet = 'gc'
        for c in alphabet:
            if start.endswith(c * 3):
                continue
            for string in generate(start + c, alphabet):
                yield string
    def brute_force():
        """ Straightforward method for comparison """
        lower = length * 0.4
        upper = length * 0.5
        for s in product('atgc', repeat=length):
            if lower <= s.count('g') + s.count('c') <= upper:
                s = ''.join(s)
                if not ('aaaa' in s or
                        'tttt' in s or
                        'cccc' in s or
                        'gggg' in s):
                    yield s
    def main():
        funcs = [
            lambda: generate('', 'atgc'),
            brute_force
        ]
        # Testing performance
        for func in funcs:
            # This needs to be big to get an accurate measure,
            # otherwise `brute_force` seems slower than it really is.
            # This is probably because of how `itertools.product`
            # is implemented.
            count = 100000000
            start = time()
            for _ in islice(func(), count):
                pass
            print(time() - start)
        # Testing correctness
        global length
        length = 12
        for x, y in zip(*[func() for func in funcs]):
            assert x == y, (x, y)
    main()
    
    

    私のマシンでは、 generate   brute_force より少し速かった 、約390秒に対して425です。これは、私が作成できる速度とほぼ同じでした。完全に処理するには約2時間かかると思います。もちろん、実際に処理するのにもっと時間がかかります。問題は、制約が完全なセットをあまり削減しないことです。

    16個のプロセスでこれを並行して使用する方法の例を次に示します。

    from multiprocessing.pool import Pool
    alpha = 'atgc'
    def generate_worker(start):
        start = ''.join(start)
        for s in generate(start, alpha):
            print(s)
    Pool(16).map(generate_worker, product(alpha, repeat=2))
    
    

  • あなたはたまたま長さ4のアルファベットを持っているので(または「2のべき乗の整数」)、文字列内の連続した文字をチェックする代わりに、整数IDとビット単位の演算を使用するという考え方が浮かんできます。 alphabet の各文字に整数値を割り当てることができます 、簡単にするために、各文字に対応するインデックスを使用します。

    例:

    65463543 <サブ>10 = 3321232103313 <サブ>4 = 'aaaddcbcdcbaddbd'

    次の関数は、 alphabet を使用して、基数10の整数から単語に変換します 。

    def id_to_word(word_id, word_len):
        word = ''
        while word_id:
            rem = word_id & 0x3  # 2 bits pet letter
            word = ALPHABET[rem] + word
            word_id >>= 2  # Bit shift to the next letter
        return '{2:{0}>{1}}'.format(ALPHABET[0], word_len, word)
    
    

    次に、単語が"良い" 整数IDに基づきます。次の方法は id_to_word と同様の形式です 、連続する文字を追跡するためにカウンターが使用されることを除きます。関数は False を返します  同一の連続文字の最大数を超える場合、そうでない場合は True を返します 。

    def check_word(word_id, max_consecutive):
        consecutive = 0
        previous = None
        while word_id:
            rem = word_id & 0x3
            if rem != previous:
                    consecutive = 0
            consecutive += 1
            if consecutive == max_consecutive + 1:
                return False
            word_id >>= 2
            previous = rem
        return True
    
    

    事実上、各単語を基数4の整数と考えています。アルファベットの長さが「2のべき乗」 値、次にモジュロ % alpha_len  および整数除算 // alpha_len   & log2(alpha_len) の代わりに使用できます  および >> log2(alpha_len)  それぞれ、はるかに長い時間がかかりますが。

    最後に、指定された word_len のすべての良い単語を見つける 。整数値の範囲を使用する利点は、 for-loop の数を減らすことができることです。 s word_len のコード内   2 へ 、ただし、外側のループは非常に大きくなります。これにより、適切な単語検索タスクのより友好的なマルチプロセッシングが可能になります。また、適切な単語に対応する最小および最大のIDを決定するための簡単な計算を追加しました。

    ALPHABET = ('a', 'b', 'c', 'd')
    def find_good_words(word_len):
        max_consecutive = 3
        alpha_len = len(ALPHABET)
        # Determine the words corresponding to the smallest and largest ids
        smallest_word = ''  # aaabaaabaaabaaab
        largest_word = ''   # dddcdddcdddcdddc
        for i in range(word_len):
            if (i + 1) % (max_consecutive + 1):
                smallest_word = ALPHABET[0] + smallest_word
                largest_word = ALPHABET[-1] + largest_word
            else:
                smallest_word = ALPHABET[1] + smallest_word
                largest_word = ALPHABET[-2] + largest_word
        # Determine the integer ids of said words
        trans_table = str.maketrans({c: str(i) for i, c in enumerate(ALPHABET)})
        smallest_id = int(smallest_word.translate(trans_table), alpha_len)  # 1077952576
        largest_id = int(largest_word.translate(trans_table), alpha_len)    # 3217014720
        # Find and store the id's of "good" words
        counter = 0
        goodies = []
        for i in range(smallest_id, largest_id + 1):
            if check_word(i, max_consecutive):
                goodies.append(i)
                counter += 1
    
    

    このループでは、さらに処理するために単語を使用する場合に備えて、実際の単語自体ではなく、単語のIDを具体的に保存しました。ただし、単語の直後にいる場合は、2行目から最後の行を変更して goodies.append(id_to_word(i, word_len)) と読みます 。

    注意:  MemoryError を受け取ります   word_len >= 14 のすべての適切なIDを保存しようとするとき 。これらのID /単語を何らかのファイルに書き込むことをお勧めします!

あなたの答え