"""Brute-force search for short bash scripts that satisfy I/O examples.

Every string over ``chars`` up to a maximum length is run as a bash script
(``bash -c -- SCRIPT``) against a set of ``(stdin, expected_stdout)`` pairs.
Scripts that exit with status 0 and produce exactly the expected output for
every pair are reported as candidates.

WARNING: this executes arbitrary generated shell commands on the local
machine. Run it only inside a disposable sandbox.
"""
from itertools import product
from multiprocessing import current_process, Pool
import signal
from subprocess import DEVNULL, PIPE, Popen, TimeoutExpired

# Alphabet the candidate scripts are built from.
chars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ '

# NOTE: SIGCHLD is deliberately left at its default disposition. Setting it
# to SIG_IGN makes the kernel reap children automatically, so waitpid() fails
# with ECHILD and subprocess then reports an exit status of 0 for every
# child, which would turn the return-code check in check_pair() into a no-op.
# Zombies are avoided by always reaping children through communicate().

# Current test case pairs used for testing (local to worker process)
current_pairs = None


def set_pairs(pairs):
    """Store *pairs* as the test cases used by check_script() in this process.

    Intended as the ``initializer`` of a multiprocessing ``Pool`` so that the
    pairs are sent to each worker once instead of with every task.
    """
    global current_pairs
    current_pairs = pairs


def check_pair(script, instr, outstr, timeout=5):
    """Run *script* in bash and check it against one input/output example.

    Args:
        script: Text passed to ``bash -c --``.
        instr: String fed to the script on stdin (encoded with UTF-8).
        outstr: Exact stdout the script must produce.
        timeout: Seconds to wait before the script is killed.

    Returns:
        True if the script exited with status 0 and its stdout equals
        *outstr*; False otherwise, including when it timed out.
    """
    process = Popen(
        ["/usr/bin/env", "bash", "-c", "--", script],
        stdin=PIPE,
        stdout=PIPE,
        stderr=DEVNULL,
    )
    try:
        stdout, _ = process.communicate(instr.encode(), timeout=timeout)
    except TimeoutExpired:
        try:
            process.kill()
        except ProcessLookupError:
            # The process finished between the timeout and the kill.
            pass
        # Always reap the child and close its pipes, even if kill() found
        # nothing to signal, so no zombie is left behind.
        process.communicate()
        return False
    return process.returncode == 0 and stdout == outstr.encode()


def check_script(script):
    """Check *script* against every pair installed by set_pairs().

    Evaluation stops at the first failing pair.

    Returns:
        A ``(script, passed)`` tuple so that results arriving out of order
        can still be matched to their script.
    """
    passed = all(check_pair(script, *pair) for pair in current_pairs)
    return script, passed


def generate_scripts(max_length):
    """Yield every string over ``chars`` of length 0 through *max_length*.

    Strings are produced shortest first; the empty string comes first.
    """
    for length in range(max_length + 1):
        for letters in product(chars, repeat=length):
            yield "".join(letters)


def find_script(pairs, max_length, processes=8):
    """Search all scripts up to *max_length* characters that satisfy *pairs*.

    Progress and candidates are printed as the search runs.

    Args:
        pairs: Iterable of ``(stdin, expected_stdout)`` string tuples.
        max_length: Maximum script length, in characters, to try.
        processes: Number of worker processes in the pool.

    Returns:
        The list of scripts that passed every pair, in the order found.
    """
    candidates = []
    chars_count = len(chars)
    # Exact integer count of all strings of length 0..max_length. Summing the
    # powers avoids float rounding for large searches and a division by zero
    # for a one-character alphabet.
    num_tasks = sum(chars_count ** length for length in range(max_length + 1))
    done_tasks = 0

    with Pool(processes=processes, initializer=set_pairs, initargs=(pairs,)) as pool:
        results = pool.imap_unordered(
            check_script,
            generate_scripts(max_length),
            chunksize=10,
        )
        for script, result in results:
            done_tasks += 1
            if done_tasks % 10000 == 0:
                print(f"Progress: {done_tasks}/{num_tasks}")
            if result:
                print("> Found candidate:", script)
                candidates.append(script)

    print("Candidates:", candidates)
    return candidates


def main():
    """Run the two built-in searches: identity and successor."""
    print("\nSearching for identity")
    find_script((
        ("1", "1"),
        ("42", "42"),
        ("1984", "1984"),
    ), max_length=3)

    print("\nSearching for successor")
    find_script((
        ("1", "2"),
        ("42", "43"),
        ("1984", "1985"),
    ), max_length=5)


if __name__ == '__main__':
    main()