Save invalid scripts

This commit is contained in:
Mattéo Delabre 2021-01-27 19:20:56 +01:00
parent 8bc925d01d
commit d969624ff9
Signed by: matteo
GPG Key ID: AE3FBD02DC583ABB
6 changed files with 183 additions and 95 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
output

View File

@ -2,5 +2,7 @@ FROM alpine
RUN apk add --no-cache bash python3
RUN adduser -D user
USER user
ADD runall.py /
CMD ["python3", "/runall.py"]
WORKDIR /home/user
ADD autogolf /home/user/autogolf
ADD run.py /home/user
CMD ["python3", "run.py"]

View File

@ -4,5 +4,6 @@ How to run:
```
docker image build --quiet --tag runall-image .
docker container run --tty --rm --read-only --memory=128m runall-image
mkdir output
docker container run --tty --rm --mount type=bind,src="$(realpath output)",dst=/home/user/output --memory=128m runall-image
```

139
autogolf/__init__.py Normal file
View File

@ -0,0 +1,139 @@
from enum import Enum, auto
from functools import partial
from itertools import product
from multiprocessing import Pool
from subprocess import DEVNULL, PIPE, Popen, TimeoutExpired
from typing import List
chars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ '
class Status(Enum):
"""Possible correctness statuses for a program."""
Correct = auto()
WrongAnswer = auto()
Timeout = auto()
Invalid = auto()
def check_pair(script, instr, outstr, timeout) -> Status:
"""
Check that a Bash script outputs a given string when given a input string.
:param script: script to execute
:param instr: string given to the scripts stdin
:param outstr: expected stdout value
:param timeout: maximum allowed time in seconds
:returns: status indicating how the script behaved
"""
process = Popen(
[
"/bin/bash", "--restricted", "-c", "--",
"trap 'kill -9 $(jobs -p) && wait' SIGINT SIGTERM EXIT;\n"
+ script,
],
stdin=PIPE,
stdout=PIPE,
stderr=DEVNULL,
)
try:
stdout, stderr = process.communicate(instr.encode(), timeout)
if process.returncode in (2, 126, 127, 128):
return Status.Invalid
if process.returncode != 0 or stdout != outstr.encode():
return Status.WrongAnswer
return Status.Correct
except TimeoutExpired:
try:
process.terminate()
stdout, stderr = process.communicate()
except ProcessLookupError:
pass
return Status.Timeout
def check_script(pairs, timeout, script) -> Status:
"""
Check that a Bash script satisfies a set of test cases.
:param pairs: input/expected output pairs
:param timeout: maximum allowed time in seconds
:param script: script to test
:returns: status indicating how the script behaved
"""
for pair in pairs:
status = check_pair(script, *pair, timeout)
if status != Status.Correct:
return script, status
return script, status
def generate_scripts(max_length):
"""
Generate all scripts up to a given length.
:param max_length: maximum length to generate
:yields: generated scripts
"""
for length in range(max_length + 1):
for letters in product(chars, repeat=length):
yield "".join(letters)
def find_script(
pairs,
max_length,
processes,
timeout,
out_log,
out_invalid,
) -> List[str]:
"""
Find scripts that satisfy the given set of test cases.
:param pairs: input/expected output pairs
:param max_length: maximum script length to test
:param processes: number of parallel processes to spawn
:param timeout: maximum allowed time in seconds for each script run
:param out_log: stream to which progress logs are written
:param out_invalid: stream to which invalid scripts are saved
:returns: list of matching scripts
"""
candidates = []
bound_check_script = partial(check_script, pairs, timeout)
chars_count = len(chars)
num_tasks = int((chars_count ** (max_length + 1) - 1) / (chars_count - 1))
done_tasks = 0
with Pool(processes) as pool:
for script, status in pool.imap_unordered(
bound_check_script,
generate_scripts(max_length),
chunksize=10,
):
done_tasks += 1
if done_tasks % 10000 == 0:
print(
f"Progress: {done_tasks}/{num_tasks} \
{done_tasks / num_tasks * 100:.1f}%",
file=out_log, flush=True
)
if status == Status.Correct:
print(
f"> Found candidate: '{script}'",
file=out_log, flush=True
)
candidates.append(script)
if status == Status.Invalid:
print(script, file=out_invalid)
return candidates

37
run.py Normal file
View File

@ -0,0 +1,37 @@
import autogolf
import sys
with open("output/invalid_scripts", "w") as out_invalid:
processes = 8
timeout = 5 # seconds
out_log = sys.stdout
print("\nSearching for identity")
identity = autogolf.find_script(
(
("1", "1"),
("42", "42"),
("1984", "1984"),
),
max_length=3,
processes=processes,
timeout=timeout,
out_log=out_log,
out_invalid=out_invalid
)
print("Candidates:", identity)
print("\nSearching for successor")
successor = autogolf.find_script(
(
("1", "2"),
("42", "43"),
("1984", "1985"),
),
max_length=5,
processes=processes,
timeout=timeout,
out_log=out_log,
out_invalid=out_invalid
)
print("Candidates:", successor)

View File

@ -1,92 +0,0 @@
from itertools import product
from multiprocessing import current_process, Pool
import signal
from subprocess import DEVNULL, PIPE, Popen, TimeoutExpired
chars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ '
# Prevent zombie processes
signal.signal(signal.SIGCHLD, signal.SIG_IGN)
# Current test case pairs used for testing (local to worker process)
current_pairs = None
def set_pairs(pairs):
global current_pairs
current_pairs = pairs
def check_pair(script, instr, outstr):
process = Popen(
["/usr/bin/env", "bash", "-c", "--", script],
stdin=PIPE,
stdout=PIPE,
stderr=DEVNULL,
)
try:
stdout, stderr = process.communicate(instr.encode(), timeout=5)
return process.returncode == 0 and stdout == outstr.encode()
except TimeoutExpired:
try:
process.kill()
stdout, stderr = process.communicate()
except ProcessLookupError:
pass
return False
def check_script(script):
for pair in current_pairs:
if not check_pair(script, *pair):
return script, False
return script, True
def generate_scripts(max_length):
for length in range(max_length + 1):
for letters in product(chars, repeat=length):
yield "".join(letters)
def find_script(pairs, max_length):
candidates = []
chars_count = len(chars)
num_tasks = int((chars_count ** (max_length + 1) - 1) / (chars_count - 1))
done_tasks = 0
with Pool(processes=8, initializer=set_pairs, initargs=(pairs,)) as pool:
for script, result in pool.imap_unordered(
check_script,
generate_scripts(max_length),
chunksize=10,
):
done_tasks += 1
if done_tasks % 10000 == 0:
print(f"Progress: {done_tasks}/{num_tasks}")
if result:
print(f"> Found candidate: '{script}'")
candidates.append(script)
print("Candidates:", candidates)
if __name__ == '__main__':
print("\nSearching for identity")
find_script((
("1", "1"),
("42", "42"),
("1984", "1984"),
), max_length=3)
print("\nSearching for successor")
find_script((
("1", "2"),
("42", "43"),
("1984", "1985"),
), max_length=5)