from io import StringIO import sys, asyncio from discord.ext.commands.formatter import Paginator from . import checks class Capturing(list): def __enter__(self): self._stdout = sys.stdout sys.stdout = self._stringio = StringIO() return self def __exit__(self, *args): self.extend(self._stringio.getvalue().splitlines()) del self._stringio # free up some memory sys.stdout = self._stdout async def mute(bot, ctx, admin=0, member_id=None): mute_role = self.bot.con.one(f'select muted_role from guild_config where guild_id = {ctx.guild.id}') if muted_role: if admin or checks.is_admin(bot, ctx): if ctx.guild.me.guild_permissions.manage_roles: if member: ctx.guild.get_member(member_id).edit(roles=[discord.utils.get(ctx.guild.roles, id=mute_role)]) def to_list_of_str(items, out:list=[], level=1, recurse=0): def rec_loop(item, key, out, level): quote = '"' if type(item) == list: out.append(f'{" "*level}{quote+key+quote+": " if key else ""}[') new_level = level + 1 out = to_list_of_str(item, out, new_level, 1) out.append(f'{" "*level}]') elif type(item) == dict: out.append(f'{" "*level}{quote+key+quote+": " if key else ""}{{') new_level = level + 1 out = to_list_of_str(item, out, new_level, 1) out.append(f'{" "*level}}}') else: out.append(f'{" "*level}{quote+key+quote+": " if key else ""}{repr(item)},') if type(items) == list: if not recurse: out = [] out.append('[') for item in items: rec_loop(item, None, out, level) if not recurse: out.append(']') elif type(items) == dict: if not recurse: out = [] out.append('{') for key in items: rec_loop(items[key], key, out, level) if not recurse: out.append('}') return out def paginate(text): data = [] paginator = Paginator(prefix='```py') if type(text) == list: data = to_list_of_str(text) elif type(text) == dict: data = to_list_of_str(text) else: data = str(text).split('\n') for line in data: if len(line) > 1900: n = 1900 for l in [line[i:i+n] for i in range(0, len(line), n)]: paginator.add_line(l) else: paginator.add_line(line) return paginator.pages async def run_command(*args): # Create subprocess process = await asyncio.create_subprocess_exec( *args, # stdout must a pipe to be accessible as process.stdout stdout=asyncio.subprocess.PIPE) # Wait for the subprocess to finish stdout, stderr = await process.communicate() # Return stdout return stdout.decode().strip()