"""
===

MIT License

Modified from repl.py at sebi's Bot Tutorial on Discord GWdhBSp

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

import asyncio
import inspect
import io
import textwrap
import traceback
from contextlib import redirect_stdout

import discord
from discord.ext import commands

from .imports.utils import paginate, run_command

# NOTE(review): ``ownerids`` is not referenced anywhere in this module; only
# ``ownerid`` gates the commands below. Both are kept because other modules
# may import them.
ownerids = [351794468870946827, 275280442884751360]
ownerid = 351794468870946827


async def _send_pages(destination, text):
    """Send every page that ``paginate(text)`` yields to *destination*.

    ``paginate`` comes from ``.imports.utils``; presumably it splits the text
    into message-sized chunks -- verify against that helper.
    """
    for page in paginate(text):
        await destination.send(page)


async def _run_repl_code(executor, code, variables):
    """Run one compiled REPL input and return the text to display.

    *executor* is the builtin ``eval`` or ``exec`` and *code* the matching
    compiled object; *variables* is the session namespace. Anything written
    to stdout while the code runs is captured and prefixed to the output.

    Returns the captured output followed by the result (or by the traceback
    if the code raised), or ``None`` when there is nothing to show. A
    non-``None`` result is also stored in ``variables['_']``.
    """
    stdout = io.StringIO()
    try:
        with redirect_stdout(stdout):
            result = executor(code, variables)
            if inspect.isawaitable(result):
                result = await result
    except Exception:
        # Boundary for arbitrary user code: report the failure to the user
        # instead of letting it end the REPL session.
        return '{}{}'.format(stdout.getvalue(), traceback.format_exc())

    value = stdout.getvalue()
    if result is not None:
        variables['_'] = result
        return '{}{}'.format(value, result)
    return value or None


class Repl:
    """Owner-only commands for running Python and shell commands from chat.

    SECURITY: ``exec`` and ``repl`` run arbitrary Python inside the bot
    process and ``os`` hands its argument to ``run_command``. Each command
    returns immediately unless the invoker's id equals ``ownerid``.
    """

    def __init__(self, bot):
        self.bot = bot
        self._last_result = None
        # Ids of the channels that currently have a running ``repl`` session.
        self.sessions = set()

    @staticmethod
    def cleanup_code(content):
        """Automatically removes code blocks from the code."""
        if content.startswith('```') and content.endswith('```'):
            # Drop the opening fence line (which may carry a language tag)
            # and the closing fence line.
            return '\n'.join(content.split('\n')[1:-1])
        return content.strip('` \n')

    @staticmethod
    def get_syntax_error(e):
        """Format a ``SyntaxError`` as a ``py`` code block.

        When the error carries its source text, a caret is placed under the
        offending column.
        """
        if e.text is None:
            return '```py\n{0.__class__.__name__}: {0}\n```'.format(e)
        return '```py\n{0.text}{1:>{0.offset}}\n{2}: {0}```'.format(e, '^', type(e).__name__)

    @commands.command(hidden=True, name='exec')
    async def _eval(self, ctx, *, body: str):
        """Execute a block of Python code (owner only).

        The code runs as the body of an async function, so ``await`` and
        ``return`` are allowed. Captured stdout and any non-``None`` return
        value are sent back to the channel.
        """
        if ctx.author.id != ownerid:
            return

        env = {
            'bot': self.bot,
            'ctx': ctx,
            'channel': ctx.channel,
            'author': ctx.author,
            'server': ctx.guild,
            'message': ctx.message,
            '_': self._last_result,
        }
        # Module globals are applied last, so a module-level name wins over
        # a same-named convenience variable above.
        env.update(globals())

        body = self.cleanup_code(body)
        stdout = io.StringIO()
        to_compile = 'async def func():\n%s' % textwrap.indent(body, ' ')

        try:
            exec(to_compile, env)
        except SyntaxError as e:
            return await ctx.send(self.get_syntax_error(e))

        func = env['func']
        try:
            with redirect_stdout(stdout):
                ret = await func()
        except Exception:
            # Boundary for arbitrary user code: show the traceback in chat.
            value = stdout.getvalue()
            await ctx.send('```py\n{}{}\n```'.format(value, traceback.format_exc()))
            return

        value = stdout.getvalue()
        try:
            await ctx.message.add_reaction('✅')
        except Exception:
            # Deliberately best-effort: a failed reaction must not hide the
            # output of code that ran successfully.
            pass

        if value:
            await _send_pages(ctx, value)
        if ret is not None:
            self._last_result = ret
            await _send_pages(ctx, ret)

    @commands.command(hidden=True)
    async def repl(self, ctx):
        """Start an interactive Python session in this channel (owner only).

        Every message from the owner in this channel that starts with a
        backtick is evaluated (single-line expressions) or executed.
        Send `quit`, `exit` or `exit()` to end the session.
        """
        if ctx.author.id != ownerid:
            return

        msg = ctx.message
        channel_id = msg.channel.id

        variables = {
            'ctx': ctx,
            'bot': self.bot,
            'message': msg,
            'server': msg.guild,
            'channel': msg.channel,
            'author': msg.author,
            '_': None,
        }

        if channel_id in self.sessions:
            await ctx.send('Already running a REPL session in this channel. Exit it with `quit`.')
            return

        def is_repl_input(m):
            # Sessions are tracked per channel, so only react to the owner's
            # input in the channel this session was started in; otherwise
            # concurrent sessions would each execute the same message.
            return (
                m.author.id == ownerid
                and m.channel.id == channel_id
                and m.content.startswith('`')
            )

        self.sessions.add(channel_id)
        try:
            await ctx.send('Enter code to execute or evaluate. `exit()` or `quit` to exit.')

            while True:
                response = await self.bot.wait_for('message', check=is_repl_input)
                cleaned = self.cleanup_code(response.content)

                if cleaned in ('quit', 'exit', 'exit()'):
                    await response.channel.send('Exiting.')
                    return

                # Try single-line input as an expression first so that its
                # value can be echoed; fall back to exec for statements.
                executor = exec
                code = None
                if '\n' not in cleaned:
                    try:
                        code = compile(cleaned, '', 'eval')
                    except SyntaxError:
                        pass
                    else:
                        executor = eval

                if executor is exec:
                    try:
                        code = compile(cleaned, '', 'exec')
                    except SyntaxError as e:
                        await response.channel.send(self.get_syntax_error(e))
                        continue

                variables['message'] = response
                fmt = await _run_repl_code(executor, code, variables)
                if fmt is None:
                    continue

                try:
                    # 1990 leaves room for the 10 characters of code fence
                    # added around short output below.
                    if len(fmt) > 1990:
                        await _send_pages(response.channel, fmt)
                    else:
                        await response.channel.send(f'```py\n{fmt}\n```')
                except discord.Forbidden:
                    pass
                except discord.HTTPException as e:
                    await msg.channel.send('Unexpected error: `{}`'.format(e))
        finally:
            # Always release the channel, including when the loop is left
            # through an exception or task cancellation, so a new session
            # can be started here later.
            self.sessions.discard(channel_id)

    @commands.command(hidden=True)
    async def os(self, ctx, *, body: str):
        """Run a command through ``run_command`` with a 10 second limit (owner only)."""
        if ctx.author.id != ownerid:
            return

        body = self.cleanup_code(body)
        try:
            result = await asyncio.wait_for(self.bot.loop.create_task(run_command(body)), 10)
        except asyncio.TimeoutError:
            await _send_pages(ctx, 'Command did not complete in the time allowed.')
            await ctx.message.add_reaction('❌')
        else:
            await _send_pages(ctx, result)
            await ctx.message.add_reaction('✅')


def setup(bot):
    """Extension entry point: register the ``Repl`` cog on *bot*."""
    bot.add_cog(Repl(bot))