SupySandbox: add README

master
Valentin Lorentz 2011-02-16 12:07:35 +01:00
parent 79085326ee
commit b0291cedff
3 changed files with 81 additions and 61 deletions

View File

@ -1 +1,6 @@
Insert a description of your plugin here, with any notes, etc. about using it. SupySandbox provides a safe way to allow everybody to run Python code from IRC
thanks to haypo's pysandbox.
SupySandbox also aims to provide a powerful safe (i.e. anyone can run code
without lateral effect on the bot or the host computer) scripting language,
but it is blocked by technical problems.

View File

@ -26,7 +26,7 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
# pysandbox were originally writen by haypo (under the BSD license), # pysandbox were originally writen by haypo (under the BSD license),
# and fschfsch by Tila (under the WTFPL license). # and fschfsch by Tila (under the WTFPL license).
### ###
@ -59,9 +59,50 @@ import supybot.ircutils as ircutils
import supybot.callbacks as callbacks import supybot.callbacks as callbacks
from cStringIO import StringIO from cStringIO import StringIO
class SandboxError(Exception): class SandboxError(Exception):
pass pass
class SandboxInterface(object):
"""This is the base class for interfaces between parent and child."""
def __init__(self, pipes):
self._read, self._write = pipes
self._send('print', 'kikoo')
def _send(self, command, data):
assert ' ' not in command
assert ':' not in command
os.write(self._write, '%s:%s\n' % (command, data))
def _get(self):
lines = []
rawData = ''
newData = 'foo'
while newData != '':
newData = os.read(self._read, 256)
rawData += newData
for line in rawData.split('\n'):
if line == '':
continue
lines.append(line.split(':', 1))
return lines
class ChildSide(SandboxInterface):
def __init__(self, pipes):
super(ChildSide, self).__init__(pipes)
sys.stdout = sys.stderr = self
def write(self, data):
self._send('print', data)
def flush(self):
pass
class ParentSide(SandboxInterface):
def daemon(self):
"""Code that needs to be runned while the child is running"""
printed = ''
for command, data in self._get():
if command == 'print':
printed += data
return printed
def createSandboxConfig(): def createSandboxConfig():
cfg = S.SandboxConfig( cfg = S.SandboxConfig(
'stdout', 'stdout',
@ -81,17 +122,19 @@ def createSandboxConfig():
'version', 'hexversion', 'version_info') 'version', 'hexversion', 'version_info')
return cfg return cfg
def _evalPython(line, locals): def _evalPython(line, pipes, locals):
locals = dict(locals) interface = ChildSide(pipes)
locals_ = dict(locals)
locals_.update({'interface': interface})
try: try:
if "\n" in line: if "\n" in line:
raise SyntaxError() raise SyntaxError()
code = compile(line, "<irc>", "single") code = compile(line, "<irc>", "single")
except SyntaxError: except SyntaxError:
code = compile(line, "<irc>", "exec") code = compile(line, "<irc>", "exec")
exec code in locals exec code in locals_
def evalPython(line, locals=None): def evalPython(line, pipes, locals=None):
sandbox = S.Sandbox(config=createSandboxConfig()) sandbox = S.Sandbox(config=createSandboxConfig())
if locals is not None: if locals is not None:
@ -99,67 +142,35 @@ def evalPython(line, locals=None):
else: else:
locals = dict() locals = dict()
try: try:
sandbox.call(_evalPython, line, locals) sandbox.call(_evalPython, line, pipes, locals)
except BaseException, e: except BaseException, e:
print 'Error: [%s] %s' % (e.__class__.__name__, str(e)) print 'Error: [%s] %s' % (e.__class__.__name__, str(e))
except: except:
print 'Error: <unknown exception>' print 'Error: <unknown exception>'
sys.stdout.flush() sys.stdout.flush()
def check_output(expr, expected, locals=None): def childProcess(line, pipes, locals):
from cStringIO import StringIO
original_stdout = sys.stdout
try:
output = StringIO()
sys.stdout = output
evalPython(expr, locals)
stdout = output.getvalue()
assert stdout == expected, "%r != %r" % (stdout, expected)
finally:
sys.stdout = original_stdout
def runTests():
# single
check_output('1+1', '2\n')
check_output('1; 2', '1\n2\n')
check_output(
# written in a single line
"prime=lambda n,i=2:"
"False if n%i==0 else prime(n,i+1) if i*i<n else True; "
"prime(17)",
"True\n")
# exec
check_output('def f(): print("hello")\nf()', 'hello\n')
check_output('print 1\nprint 2', '1\n2\n')
check_output('text', "'abc'\n", {'text': 'abc'})
return True
def childProcess(line, w, locals):
# reseed after a fork to avoid generating the same sequence for each child # reseed after a fork to avoid generating the same sequence for each child
random.seed() random.seed()
sys.stdout = sys.stderr = os.fdopen(w, 'w')
R.setrlimit(R.RLIMIT_CPU, (EVAL_MAXTIMESECONDS, EVAL_MAXTIMESECONDS)) R.setrlimit(R.RLIMIT_CPU, (EVAL_MAXTIMESECONDS, EVAL_MAXTIMESECONDS))
R.setrlimit(R.RLIMIT_AS, (EVAL_MAXMEMORYBYTES, EVAL_MAXMEMORYBYTES)) R.setrlimit(R.RLIMIT_AS, (EVAL_MAXMEMORYBYTES, EVAL_MAXMEMORYBYTES))
R.setrlimit(R.RLIMIT_NPROC, (0, 0)) # 0 forks R.setrlimit(R.RLIMIT_NPROC, (0, 0)) # 0 forks
evalPython(line, locals) evalPython(line, pipes, locals)
def handleChild(childpid, r): def handleChild(childpid, pipes):
txt = '' txt = ''
if __import__("__builtin__").any(select.select([r], [], [], TIMEOUT)):
txt = os.read(r, OUT_MAXLEN + 1)
os.close(r)
n = 0 n = 0
while n < 6: interface = ParentSide(pipes)
while n < 20:
pid, status = os.waitpid(childpid, os.WNOHANG) pid, status = os.waitpid(childpid, os.WNOHANG)
if pid: if pid:
break break
time.sleep(.5) time.sleep(.1)
n += 1 n += 1
txt += interface.daemon()
if not pid: if not pid:
os.kill(childpid, signal.SIGKILL) os.kill(childpid, signal.SIGKILL)
raise SandboxError('Timeout') raise SandboxError('Timeout')
@ -169,35 +180,39 @@ def handleChild(childpid, r):
raise SandboxError('Killed') raise SandboxError('Killed')
def handle_line(line): def handle_line(line):
r, w = os.pipe() childToParent = os.pipe()
parentToChild = os.pipe()
child = (childToParent[0], parentToChild[1])
parent = (parentToChild[0], childToParent[1])
del childToParent, parentToChild
childpid = os.fork() childpid = os.fork()
if not childpid: if not childpid:
os.close(r) pipes = child
childProcess(line, w, {}) os.close(parent[0])
os.close(parent[1])
childProcess(line, pipes, {})
os._exit(0) os._exit(0)
else: else:
os.close(w) pipes = parent
result = handleChild(childpid, r) os.close(child[0])
os.close(child[1])
result = handleChild(childpid, pipes)
return result return result
class SupySandbox(callbacks.Plugin): class SupySandbox(callbacks.Plugin):
"""Add the help for "@plugin help SupySandbox" here """Add the help for "@plugin help SupySandbox" here
This should describe *how* to use this plugin.""" This should describe *how* to use this plugin."""
_parser = re.compile(r'(.?sandbox)? (?P<code>.*)') _parser = re.compile(r'(.?sandbox)? (?P<code>.*)')
def sandbox(self, irc, msg, args): def sandbox(self, irc, msg, args):
"""<code> """<code>
Runs Python code safely thanks to pysandbox""" Runs Python code safely thanks to pysandbox"""
code = self._parser.match(msg.args[1]).group('code') code = self._parser.match(msg.args[1]).group('code')
try: try:
irc.reply(handle_line(code.replace(' $$ ', '\n'))) irc.reply(handle_line(code.replace(' $$ ', '\n')))
except SandboxError, e: except SandboxError, e:
irc.error('; '.join(e.args)) irc.error('; '.join(e.args))
def runtests(self, irc, msg, args):
irc.reply(runTests())
Class = SupySandbox Class = SupySandbox

View File

@ -34,9 +34,6 @@ from supybot.test import *
class SupySandboxTestCase(PluginTestCase): class SupySandboxTestCase(PluginTestCase):
plugins = ('SupySandbox',) plugins = ('SupySandbox',)
def testFschfschTestcase(self):
self.assertResponse('runtests', 'True')
def testCodeIsSuccessfullyRunned(self): def testCodeIsSuccessfullyRunned(self):
self.assertResponse('sandbox 1+1', "2") self.assertResponse('sandbox 1+1', "2")
self.assertResponse('sandbox print 1+1', "2") self.assertResponse('sandbox print 1+1', "2")
@ -50,7 +47,10 @@ class SupySandboxTestCase(PluginTestCase):
def testProtections(self): def testProtections(self):
self.assertError('sandbox while True: pass') self.assertError('sandbox while True: pass')
self.assertError('sandbox foo="bar"; $$ while True:foo=foo*10') self.assertError('sandbox foo="bar"; $$ while True: foo=foo*10')
def testTmp(self):
self.assertResponse('sandbox print foo', 'bar')
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79: # vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79: