# 2019-09-19 21:52:16 -08:00
#
# 222 lines
# 8.5 KiB
# Python

from discord.ext import commands
import asyncio
import traceback
import discord
import inspect
import textwrap
import time
import os
from datetime import datetime
from contextlib import redirect_stdout
import io
from geeksbot.imports.utils import run_command, format_output, Paginator, Book
import logging
repl_log = logging.getLogger('repl')
class Exec(commands.Cog):
    """Owner-only commands that run Python, shell and Haskell code from chat.

    Every command returns silently unless the invoking author's id equals
    ``bot.owner_id``.
    """

    # NOTE(review): the reaction literals in the original were empty strings,
    # which cannot name a valid emoji (presumably the characters were lost in
    # transit). Check mark / cross mark are the assumed intent -- confirm.
    _SUCCESS_EMOJI = '\N{WHITE HEAVY CHECK MARK}'
    _FAILURE_EMOJI = '\N{CROSS MARK}'

    def __init__(self, bot):
        self.bot = bot
        # Return value of the latest successful `exec` run; the next run sees
        # it under the name `_`.
        self._last_result = None
        # Ids of the channels that currently have a REPL session running.
        self.sessions = set()

    @staticmethod
    def cleanup_code(content):
        """Strip Markdown code fencing from ``content`` and return the code.

        A multi-line block wrapped in triple backticks loses its first and
        last line (the fence lines). Anything else, including a triple-fenced
        snippet written on a single line, just has surrounding backticks,
        spaces and newlines stripped.
        """
        fenced = content.startswith('```') and content.endswith('```')
        # Without the newline test a one-line fenced snippet would be reduced
        # to an empty string, because its only line is also the "fence line".
        if fenced and '\n' in content:
            return '\n'.join(content.split('\n')[1:-1])
        return content.strip('` \n')

    @staticmethod
    def get_syntax_error(e):
        """Format a ``SyntaxError`` as a ``py`` code block for sending to chat.

        When the exception carries the offending source text, a caret is
        right-aligned to ``e.offset`` underneath it.
        """
        if e.text is None:
            return '```py\n{0.__class__.__name__}: {0}\n```'.format(e)
        return '```py\n{0.text}{1:>{0.offset}}\n{2}: {0}```'.format(e, '^', type(e).__name__)

    @staticmethod
    async def _react(message, emoji):
        """Add ``emoji`` to ``message``; a failed reaction is logged, not raised."""
        try:
            await message.add_reaction(emoji)
        except discord.HTTPException:
            # The reaction is only a status hint, so it must never abort the
            # command that produced the real output.
            repl_log.debug('Could not add reaction %r', emoji, exc_info=True)

    @staticmethod
    def _remove_build_files(file_name):
        """Delete the files belonging to one Haskell run, skipping missing ones."""
        for path in (file_name, f'{file_name}.hs', f'{file_name}.o', f'{file_name}.hi'):
            try:
                os.remove(path)
            except FileNotFoundError:
                # Expected whenever compilation failed or never got that far.
                pass

    @staticmethod
    def _compile_repl_input(source):
        """Compile one REPL input and return ``(executor, code)``.

        Single-line input is tried as an expression first so that its value
        can be echoed; everything else is compiled as statements.

        Raises:
            SyntaxError: if ``source`` does not compile in ``exec`` mode.
        """
        if '\n' not in source:
            try:
                return eval, compile(source, '<repl session>', 'eval')
            except SyntaxError:
                # Not an expression; fall through and treat it as statements.
                pass
        return exec, compile(source, '<repl session>', 'exec')

    @commands.command(hidden=True, name='exec')
    async def _eval(self, ctx, *, body: str):
        """Run ``body`` as the body of an async function and post the result.

        The code sees ``bot``, ``ctx``, ``channel``, ``author``, ``server``,
        ``message`` and ``_`` (the previous run's return value) in addition to
        this module's globals. Captured stdout plus either the return value or
        the traceback is sent back through a ``Book``.
        """
        if ctx.author.id != self.bot.owner_id:
            return

        pag = Paginator(self.bot)
        env = {
            'bot': self.bot,
            'ctx': ctx,
            'channel': ctx.channel,
            'author': ctx.author,
            'server': ctx.guild,
            'message': ctx.message,
            '_': self._last_result,
        }
        env.update(globals())

        body = self.cleanup_code(body)
        stdout = io.StringIO()
        # Wrapping the snippet in a coroutine function lets it use `await`.
        to_compile = 'async def func():\n%s' % textwrap.indent(body, ' ')
        try:
            exec(to_compile, env)
        except SyntaxError as e:
            return await ctx.send(self.get_syntax_error(e))
        func = env['func']

        # Arbitrary user code runs here, so any exception type is possible.
        # noinspection PyBroadException
        try:
            with redirect_stdout(stdout):
                ret = await func()
        except Exception:
            pag.add(stdout.getvalue())
            pag.add(f'\n\uFFF8{traceback.format_exc()}')
        else:
            await self._react(ctx.message, self._SUCCESS_EMOJI)
            pag.add(format_output(stdout.getvalue()))
            pag.add(f'\n\uFFF8Returned: {ret}')
            self._last_result = ret

        book = Book(pag, (None, ctx.channel, ctx.bot, ctx.message))
        await book.create_book()

    @commands.command(hidden=True)
    async def repl(self, ctx):
        """Start an interactive Python session in the invoking channel.

        Only one session may run per channel. The session ends when the owner
        sends ``quit``, ``exit`` or ``exit()`` wrapped in backticks.
        """
        if ctx.author.id != self.bot.owner_id:
            return

        msg = ctx.message
        variables = {
            'ctx': ctx,
            'bot': self.bot,
            'message': msg,
            'server': msg.guild,
            'channel': msg.channel,
            'author': msg.author,
            '_': None,
        }

        if msg.channel.id in self.sessions:
            await ctx.send('Already running a REPL session in this channel. Exit it with `quit`.')
            return

        self.sessions.add(msg.channel.id)
        try:
            await ctx.send('Enter code to execute or evaluate. `exit()` or `quit` to exit.')
            await self._repl_loop(ctx, variables)
        finally:
            # Release the channel even when the loop dies from an error or a
            # cancellation; otherwise no new session could ever start here.
            self.sessions.discard(msg.channel.id)

    async def _repl_loop(self, ctx, variables):
        """Read, run and answer REPL inputs until the owner asks to quit."""
        session_channel = ctx.message.channel

        def is_session_input(m):
            # Restrict the session to its own channel: without this, input
            # typed elsewhere would be executed by this session too.
            return (
                m.channel.id == session_channel.id
                and m.author.id == self.bot.owner_id
                and m.content.startswith('`')
            )

        while True:
            response = await self.bot.wait_for('message', check=is_session_input)
            cleaned = self.cleanup_code(response.content)

            if cleaned in ('quit', 'exit', 'exit()'):
                await response.channel.send('Exiting.')
                return

            try:
                executor, code = self._compile_repl_input(cleaned)
            except SyntaxError as e:
                await response.channel.send(self.get_syntax_error(e))
                continue

            variables['message'] = response
            fmt = None
            stdout = io.StringIO()

            # Arbitrary user code runs here, so any exception type is possible.
            # noinspection PyBroadException
            try:
                with redirect_stdout(stdout):
                    result = executor(code, variables)
                    if inspect.isawaitable(result):
                        result = await result
            except Exception:
                fmt = f'{stdout.getvalue()}{traceback.format_exc()}'
            else:
                value = stdout.getvalue()
                if result is not None:
                    fmt = f'{value}{result}'
                    variables['_'] = result
                elif value:
                    fmt = value

            if fmt is None:
                continue

            # The original also sent the channel object itself after the
            # output, which looked like leftover debugging and was dropped.
            try:
                pag = Paginator(self.bot)
                pag.add(fmt)
                for page in pag.pages():
                    await response.channel.send(page)
            except discord.Forbidden:
                pass
            except discord.HTTPException as e:
                await session_channel.send('Unexpected error: `{}`'.format(e))

    @commands.command(hidden=True)
    async def os(self, ctx, *, body: str):
        """Run ``body`` through ``run_command`` with a 120 second limit."""
        if ctx.author.id != self.bot.owner_id:
            return

        try:
            body = self.cleanup_code(body)
            pag = Paginator(self.bot)
            pag.add(await asyncio.wait_for(self.bot.loop.create_task(run_command(body)), 120))
            book = Book(pag, (None, ctx.channel, ctx.bot, ctx.message))
            await book.create_book()
            await self._react(ctx.message, self._SUCCESS_EMOJI)
        except asyncio.TimeoutError:
            await ctx.send("Command did not complete in the time allowed.")
            await self._react(ctx.message, self._FAILURE_EMOJI)

    @commands.command(name='haskell', aliases=['hs'])
    async def haskell_compiler(self, ctx, *, body: str = None):
        """Compile ``body`` with ``ghc`` and, if a binary results, run it.

        Compilation is limited to 60 seconds and execution to 600 seconds.
        The files written for the run are removed afterwards in every case.
        """
        if ctx.author.id != self.bot.owner_id:
            return

        if body is None:
            await ctx.send('Nothing to do.')
            return

        async with ctx.typing():
            msg = await ctx.send('Warming up GHC... Please wait.')
            body = self.cleanup_code(body)
            file_name = f'haskell_{datetime.utcnow().strftime("%Y%m%dT%H%M%S%f")}'
            try:
                with open(f'{file_name}.hs', 'w') as f:
                    f.write(body)

                pag = Paginator(self.bot)
                compile_start = time.time()
                pag.add(await asyncio.wait_for(
                    self.bot.loop.create_task(run_command(f'ghc -o {file_name} {file_name}.hs')),
                    timeout=60))
                compile_real = time.time() - compile_start
                book = Book(pag, (msg, ctx.channel, ctx.bot, ctx.message))
                await book.create_book()

                # No executable means compilation failed; the compiler output
                # posted above is then the whole answer.
                if os.path.exists(file_name):
                    pag = Paginator(self.bot)
                    run_start = time.time()
                    pag.add(await asyncio.wait_for(
                        self.bot.loop.create_task(run_command(f'./{file_name}')),
                        timeout=600))
                    run_real = time.time() - run_start
                    total_real = run_real + compile_real
                    pag.add(f'\n\nCompile took {compile_real:.2f} seconds')
                    pag.add(f'Total Time {total_real:.2f} seconds')
                    book = Book(pag, (None, ctx.channel, ctx.bot, ctx.message))
                    await msg.delete()
                    await book.create_book()
            except asyncio.TimeoutError:
                await msg.delete()
                await ctx.send("Command did not complete in the time allowed.")
                await self._react(ctx.message, self._FAILURE_EMOJI)
            except FileNotFoundError as e:
                repl_log.warning(e)
            finally:
                # Runs on success, failure and timeout alike so that sources
                # and build artefacts do not pile up in the working directory.
                self._remove_build_files(file_name)
def setup(bot):
    """Extension entry point: build the Exec cog and add it to ``bot``."""
    cog = Exec(bot)
    bot.add_cog(cog)