SupySandbox: Import last changes made to fschfsch (by @haypo).

master
Valentin Lorentz 2012-08-13 01:15:19 +02:00
parent f6f16d6d83
commit 913e461ff7
1 changed files with 57 additions and 88 deletions

View File

@ -51,6 +51,7 @@ import time
import random
import select
import signal
import contextlib
import resource as R
import supybot.utils as utils
from supybot.commands import *
@ -69,7 +70,7 @@ def createSandboxConfig():
'regex',
'unicodedata', # flow wants u'\{ATOM SYMBOL}' :-)
'future',
'code',
#'code',
'time',
'datetime',
'math',
@ -77,109 +78,77 @@ def createSandboxConfig():
'random',
'encodings',
)
cfg.max_memory = EVAL_MAXMEMORYBYTES
cfg.timeout = EVAL_MAXTIMESECONDS
cfg.allowModule('sys',
'version', 'hexversion', 'version_info')
return cfg
def _evalPython(line, locals):
locals = dict(locals)
try:
if "\n" in line:
raise SyntaxError()
code = compile(line, "<irc>", "single")
except SyntaxError:
code = compile(line, "<irc>", "exec")
exec code in locals
evalPythonInSandbox = r"""
try:
if "\n" in line:
raise SyntaxError()
code = compile(line, "<irc>", "single")
except SyntaxError:
code = compile(line, "<irc>", "exec")
exec code in namespace, namespace
del code
del namespace
"""
def evalPython(line, locals=None):
sandbox = S.Sandbox(config=createSandboxConfig())
if locals is not None:
locals = dict(locals)
else:
locals = dict()
try:
sandbox.call(_evalPython, line, locals)
config = createSandboxConfig()
sandbox = S.Sandbox(config=config)
if locals is None:
locals = {}
sandbox.execute(
evalPythonInSandbox,
locals={'namespace': locals, 'line': line})
except BaseException, e:
print 'Error: [%s] %s' % (e.__class__.__name__, str(e))
except:
print 'Error: <unknown exception>'
sys.stdout.flush()
def check_output(expr, expected, locals=None):
from cStringIO import StringIO
original_stdout = sys.stdout
try:
output = StringIO()
sys.stdout = output
evalPython(expr, locals)
stdout = output.getvalue()
assert stdout == expected, "%r != %r" % (stdout, expected)
finally:
sys.stdout = original_stdout
@contextlib.contextmanager
def capture_stdout():
import sys
import tempfile
stdout_fd = sys.stdout.fileno()
with tempfile.TemporaryFile(mode='w+b') as tmp:
stdout_copy = os.dup(stdout_fd)
try:
os.dup2(tmp.fileno(), stdout_fd)
yield tmp
finally:
os.dup2(stdout_copy, stdout_fd)
os.close(stdout_copy)
def runTests():
# single
check_output('1+1', '2\n')
check_output('1; 2', '1\n2\n')
check_output(
# written in a single line
"prime=lambda n,i=2:"
"False if n%i==0 else prime(n,i+1) if i*i<n else True; "
"prime(17)",
"True\n")
def evalLine(line, locals):
with capture_stdout() as stdout:
evalPython(line, locals)
stdout.seek(0)
txt = stdout.read()
# exec
check_output('def f(): print("hello")\nf()', 'hello\n')
check_output('print 1\nprint 2', '1\n2\n')
check_output('text', "'abc'\n", {'text': 'abc'})
return True
print("Output: %r" % txt)
def childProcess(line, w, locals):
# reseed after a fork to avoid generating the same sequence for each child
random.seed()
sys.stdout = sys.stderr = os.fdopen(w, 'w')
R.setrlimit(R.RLIMIT_CPU, (EVAL_MAXTIMESECONDS, EVAL_MAXTIMESECONDS))
R.setrlimit(R.RLIMIT_AS, (EVAL_MAXMEMORYBYTES, EVAL_MAXMEMORYBYTES))
R.setrlimit(R.RLIMIT_NPROC, (0, 0)) # 0 forks
evalPython(line, locals)
def handleChild(childpid, r):
txt = ''
if __import__("__builtin__").any(select.select([r], [], [], TIMEOUT)):
txt = os.read(r, OUT_MAXLEN + 1)
os.close(r)
n = 0
while n < 6:
pid, status = os.waitpid(childpid, os.WNOHANG)
if pid:
break
time.sleep(.5)
n += 1
if not pid:
import signal
os.kill(childpid, signal.SIGKILL)
raise SandboxError('Timeout')
elif os.WIFEXITED(status):
return txt.rstrip()
elif os.WIFSIGNALED(status):
raise SandboxError('Killed')
txts = txt.rstrip().split('\n')
if len(txts) > 1:
txt = txts[0].rstrip() + ' [+ %d line(s)]' % (len(txts) - 1)
else:
txt = txts[0].rstrip()
return 'Output: ' + txt
def handle_line(line):
r, w = os.pipe()
childpid = os.fork()
if not childpid:
os.close(r)
childProcess(line, w, {})
os._exit(0)
else:
os.close(w)
result = handleChild(childpid, r)
return result
if IN_MAXLEN < len(line):
return '(command is too long: %s bytes, the maximum is %s)' % (len(line), IN_MAXLEN)
print("Process %s" % repr(line))
result = evalLine(line, {})
print("=> %s" % repr(result))
return result
class SupySandbox(callbacks.Plugin):
"""Add the help for "@plugin help SupySandbox" here
@ -187,15 +156,15 @@ class SupySandbox(callbacks.Plugin):
_parser = re.compile(r'(.*sandbox)? (?P<code>.*)')
_parser = re.compile(r'(.?sandbox)? (?P<code>.*)')
def sandbox(self, irc, msg, args):
def sandbox(self, irc, msg, args, code):
"""<code>
Runs Python code safely thanks to pysandbox"""
code = self._parser.match(msg.args[1]).group('code')
try:
irc.reply(handle_line(code.replace(' $$ ', '\n')))
except SandboxError, e:
irc.error('; '.join(e.args))
sandbox = wrap(sandbox, ['text'])
def runtests(self, irc, msg, args):
irc.reply(runTests())