Changed CRLF to LF, passed over code with black to normalise formatting.

This commit is contained in:
neko404notfound 2018-06-21 09:21:54 +01:00
parent f6c4a5a570
commit 8d579f5bb3
19 changed files with 1014 additions and 732 deletions

View File

@ -4,12 +4,12 @@
Sebi-Machine.
"""
__author__ = 'Annihilator708'
__author__ = "Annihilator708"
# TODO: add yourselves here. I can't remember everyones handles.
__contributors__ = (__author__, 'Neko404NotFound', 'Dusty.P', 'davfsa', 'YashKandalkar')
__license__ = 'MIT'
__title__ = 'Sebi-Machine'
__version__ = 'tbd'
__contributors__ = (__author__, "Neko404NotFound", "Dusty.P", "davfsa", "YashKandalkar")
__license__ = "MIT"
__title__ = "Sebi-Machine"
__version__ = "tbd"
__repository__ = f'https://github.com/{__author__}/{__title__}'
__repository__ = f"https://github.com/{__author__}/{__title__}"
__url__ = __repository__

View File

@ -24,69 +24,73 @@ from typing import Dict
# Init logging to output on INFO level to stderr.
logging.basicConfig(level='INFO')
logging.basicConfig(level="INFO")
# If uvloop is installed, change to that eventloop policy as it
# is more efficient
try:
# https://stackoverflow.com/a/45700730
if sys.platform == 'win32':
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
logging.warning('Detected Windows. Changing event loop to ProactorEventLoop.')
logging.warning("Detected Windows. Changing event loop to ProactorEventLoop.")
else:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
del uvloop
except BaseException as ex:
logging.warning(f'Could not load uvloop. {type(ex).__qualname__}: {ex};',
'reverting to default impl.')
logging.warning(
f"Could not load uvloop. {type(ex).__qualname__}: {ex};",
"reverting to default impl.",
)
else:
logging.info(f'Using uvloop for asyncio event loop policy.')
logging.info(f"Using uvloop for asyncio event loop policy.")
# Bot Class
# Might be worth moving this to it's own file?
class SebiMachine(commands.Bot, LoadConfig, Loggable):
"""This discord is dedicated to http://www.discord.gg/GWdhBSp"""
def __init__(self):
# Initialize and attach config / settings
LoadConfig.__init__(self)
commands.Bot.__init__(self, command_prefix=self.defaultprefix)
with open(in_here('config', 'PrivateConfig.json')) as fp:
with open(in_here("config", "PrivateConfig.json")) as fp:
self.bot_secrets = json.load(fp)
self.db_con = database.DatabaseConnection(**self.bot_secrets['db-con'])
self.db_con = database.DatabaseConnection(**self.bot_secrets["db-con"])
self.book_emojis: Dict[str, str] = {
'unlock': '🔓',
'start': '',
'back': '',
'hash': '#\N{COMBINING ENCLOSING KEYCAP}',
'forward': '',
'end': '',
'close': '🇽',
"unlock": "🔓",
"start": "",
"back": "",
"hash": "#\N{COMBINING ENCLOSING KEYCAP}",
"forward": "",
"end": "",
"close": "🇽",
}
# Load plugins
# Add your cog file name in this list
with open(in_here('extensions.txt')) as cog_file:
with open(in_here("extensions.txt")) as cog_file:
cogs = cog_file.readlines()
for cog in cogs:
# Could this just be replaced with `strip()`?
cog = cog.replace('\n', '')
self.load_extension(f'src.cogs.{cog}')
self.logger.info(f'Loaded: {cog}')
cog = cog.replace("\n", "")
self.load_extension(f"src.cogs.{cog}")
self.logger.info(f"Loaded: {cog}")
async def on_ready(self):
"""On ready function"""
self.maintenance and self.logger.warning('MAINTENANCE ACTIVE')
with open(f'src/config/reboot', 'r') as f:
self.maintenance and self.logger.warning("MAINTENANCE ACTIVE")
with open(f"src/config/reboot", "r") as f:
reboot = f.readlines()
if int(reboot[0]) == 1:
await self.get_channel(int(reboot[1])).send('Restart Finished.')
with open(f'src/config/reboot', 'w') as f:
f.write(f'0')
await self.get_channel(int(reboot[1])).send("Restart Finished.")
with open(f"src/config/reboot", "w") as f:
f.write(f"0")
async def on_command_error(self, ctx, error):
"""
@ -94,7 +98,8 @@ class SebiMachine(commands.Bot, LoadConfig, Loggable):
ctx : Context
error : Exception
"""
jokes = ["I\'m a bit tipsy, I took to many screenshots...",
jokes = [
"I'm a bit tipsy, I took to many screenshots...",
"I am rushing to the 24/7 store to get myself anti-bug spray...",
"Organizing turtle race...",
"There is no better place then 127.0.0.1...",
@ -102,7 +107,8 @@ class SebiMachine(commands.Bot, LoadConfig, Loggable):
"No worry, I get fixed :^)...",
"R.I.P, press F for respect...",
"The bug repellent dit not work...",
"You found a bug in the program. Unfortunately the joke did not fit here, better luck next time..."]
"You found a bug in the program. Unfortunately the joke did not fit here, better luck next time...",
]
# CommandErrors triggered by other propagating errors tend to get wrapped. This means
# if we have a cause, we should probably consider unwrapping that so we get a useful
@ -111,15 +117,21 @@ class SebiMachine(commands.Bot, LoadConfig, Loggable):
# If command is not found, return
em = discord.Embed(colour=self.error_color)
if isinstance(error, discord.ext.commands.errors.CommandNotFound):
em.title = 'Command Not Found'
em.description = f'{ctx.prefix}{ctx.invoked_with} is not a valid command.'
em.title = "Command Not Found"
em.description = f"{ctx.prefix}{ctx.invoked_with} is not a valid command."
else:
error = error.__cause__ or error
tb = traceback.format_exception(type(error), error, error.__traceback__, limit=2, chain=False)
tb = ''.join(tb)
tb = traceback.format_exception(
type(error), error, error.__traceback__, limit=2, chain=False
)
tb = "".join(tb)
joke = random.choice(jokes)
fmt = f'**`{self.defaultprefix}{ctx.command}`**\n{joke}\n\n**{type(error).__name__}:**:\n```py\n{tb}\n```'
em.title = f'**{type(error).__name__}** in command {ctx.prefix}{ctx.command}'
fmt = (
f"**`{self.defaultprefix}{ctx.command}`**\n{joke}\n\n**{type(error).__name__}:**:\n```py\n{tb}\n```"
)
em.title = (
f"**{type(error).__name__}** in command {ctx.prefix}{ctx.command}"
)
em.description = str(error)
await ctx.send(embed=em)
@ -133,21 +145,27 @@ class SebiMachine(commands.Bot, LoadConfig, Loggable):
except:
pass
else:
if ('exec' in message.content or 'repl' in message.content or 'token' in message.content) \
and message.author != self.user:
await self.get_user(351794468870946827).send(f'{message.author.name} ({message.author.id}) is using me '
f'in DMs\n{message.content}')
if (
"exec" in message.content
or "repl" in message.content
or "token" in message.content
) and message.author != self.user:
await self.get_user(351794468870946827).send(
f"{message.author.name} ({message.author.id}) is using me "
f"in DMs\n{message.content}"
)
# If author is a bot, ignore the message
if message.author.bot: return
if message.author.bot:
return
# Make sure the command get processed as if it was typed with lowercase
# Split message.content one first space
command = message.content.split(None, 1)
if command:
command[0] = command[0].lower()
message.content = ' '.join(command)
message.content = ' '.join(command)
message.content = " ".join(command)
message.content = " ".join(command)
# process command
await self.process_commands(message)

View File

@ -5,62 +5,90 @@ from discord.ext import commands
import discord
import asyncio
class BasicCommands:
def __init__(self, bot):
self.bot = bot
@commands.command()
async def tutorial(self, ctx):
await ctx.send(f"Hello, {ctx.author.display_name}. Welcome to Sebi's Bot Tutorials. \nFirst off, would you like a quick walkthrough on the server channels?")
await ctx.send(
f"Hello, {ctx.author.display_name}. Welcome to Sebi's Bot Tutorials. \nFirst off, would you like a quick walkthrough on the server channels?"
)
channel_list = {'channel-1' : self.bot.get_channel(333149949883842561).mention,
'd.py-rewrite-start' : self.bot.get_channel(386419285439938560).mention,
'js-klasa-start' : self.bot.get_channel(341816240186064897).mention,
'd.js' : self.bot.get_channel(436771798303113217).mention}
channel_list = {
"channel-1": self.bot.get_channel(333149949883842561).mention,
"d.py-rewrite-start": self.bot.get_channel(386419285439938560).mention,
"js-klasa-start": self.bot.get_channel(341816240186064897).mention,
"d.js": self.bot.get_channel(436771798303113217).mention,
}
bots_channels = (self.bot.get_channel(339112602867204097).mention,
self.bot.get_channel(411586546551095296).mention)
bots_channels = (
self.bot.get_channel(339112602867204097).mention,
self.bot.get_channel(411586546551095296).mention,
)
help_channels = (self.bot.get_channel(425315253153300488).mention,
help_channels = (
self.bot.get_channel(425315253153300488).mention,
self.bot.get_channel(392215236612194305).mention,
self.bot.get_channel(351034776985141250).mention)
self.bot.get_channel(351034776985141250).mention,
)
def check(m):
return True if m.author.id == ctx.author.id and m.channel.id == ctx.channel.id else False
return (
True
if m.author.id == ctx.author.id and m.channel.id == ctx.channel.id
else False
)
msg = await self.bot.wait_for('message', check = check, timeout = 15)
msg = await self.bot.wait_for("message", check=check, timeout=15)
agree = ("yes", "yep", "yesn't", "ya", "ye")
if msg is None:
await ctx.send("Sorry, {ctx.author.mention}, you didn't reply on time. You can run the command again when you're free :)")
await ctx.send(
"Sorry, {ctx.author.mention}, you didn't reply on time. You can run the command again when you're free :)"
)
else:
if msg.content.lower() in agree:
async with ctx.typing():
await ctx.send("Alrighty-Roo... Check your DMs!")
await ctx.author.send("Alrighty-Roo...")
await ctx.author.send(f"To start making your bot from scratch, you first need to head over to {channel_list['channel-1']}"
" (Regardless of the language you're gonna use).")
await ctx.author.send(
f"To start making your bot from scratch, you first need to head over to {channel_list['channel-1']}"
" (Regardless of the language you're gonna use)."
)
await asyncio.sleep(0.5)
await ctx.author.send(f"After you have a bot account, you can either continue with {channel_list['d.py-rewrite-start']}"
await ctx.author.send(
f"After you have a bot account, you can either continue with {channel_list['d.py-rewrite-start']}"
f"if you want to make a bot in discord.py rewrite __or__ go to {channel_list['js-klasa-start']} or "
f"{channel_list['d.js']} for making a bot in JavaScript.")
f"{channel_list['d.js']} for making a bot in JavaScript."
)
await ctx.author.send("...Read all the tutorials and still need help? You have two ways to get help.")
await ctx.author.send(
"...Read all the tutorials and still need help? You have two ways to get help."
)
await asyncio.sleep(1.5)
await ctx.author.send("**Method-1**\nThis is the best method of getting help. You help yourself.\n"
await ctx.author.send(
"**Method-1**\nThis is the best method of getting help. You help yourself.\n"
f"To do so, head over to a bots dedicated channel (either {bots_channels[0]} or {bots_channels[1]})"
" and type `?rtfm rewrite thing_you_want_help_with`.\nThis will trigger the bot R.Danny Bot and will"
"give you links on your query on the official discord.py rewrite docs. *PS: Let the page completely load*")
"give you links on your query on the official discord.py rewrite docs. *PS: Let the page completely load*"
)
await asyncio.sleep(5)
await ctx.author.send("**Method-2**\nIf you haven't found anything useful with Method-1, feel free to ask your question "
f"in any of the related help channels. ({', '.join(help_channels)})\nMay the force be with you!!")
await ctx.author.send(
"**Method-2**\nIf you haven't found anything useful with Method-1, feel free to ask your question "
f"in any of the related help channels. ({', '.join(help_channels)})\nMay the force be with you!!"
)
else:
return await ctx.send("Session terminated. You can run this command again whenever you want.")
return await ctx.send(
"Session terminated. You can run this command again whenever you want."
)
def setup(bot):
bot.add_cog(BasicCommands(bot))

View File

@ -12,10 +12,16 @@ class BotManager:
return
else:
# The member is a bot
await member.add_roles(discord.utils.get(member.guild.roles, name='Bots'))
await member.add_roles(discord.utils.get(member.guild.roles, name="Bots"))
try:
await member.edit(nick='[' + await self.bot.db_con.fetch('select prefix from bots where id = $1', member.id)
+ '] ' + member.name)
await member.edit(
nick="["
+ await self.bot.db_con.fetch(
"select prefix from bots where id = $1", member.id
)
+ "] "
+ member.name
)
except:
pass
@ -25,30 +31,39 @@ class BotManager:
return
else:
# The member is a bot
await self.bot.db_con.execute('DELETE FROM bots WHERE id = $1', member.id)
await self.bot.db_con.execute("DELETE FROM bots WHERE id = $1", member.id)
@commands.command()
async def invite(self, ctx, bot=None, prefix=None):
bot = await ctx.bot.get_user_info(bot)
if not bot:
raise Warning('You must include the id of the bot you are trying to invite... Be exact.')
raise Warning(
"You must include the id of the bot you are trying to invite... Be exact."
)
if not bot.bot:
raise Warning('You can only invite bots.')
raise Warning("You can only invite bots.")
if not prefix:
raise Warning('Please provide a prefix')
raise Warning("Please provide a prefix")
# Make sure that the bot has not been invited already and it is not being tested
if await self.bot.db_con.fetch('select count(*) from bots where id = $1', bot.id) == 1:
raise Warning('The bot has already been invited or is being tested')
if (
await self.bot.db_con.fetch(
"select count(*) from bots where id = $1", bot.id
)
== 1
):
raise Warning("The bot has already been invited or is being tested")
await self.bot.db_con.execute('insert into bots (id, owner, prefix) values ($1, $2, $3)',
bot.id, ctx.author.id, prefix)
await self.bot.db_con.execute(
"insert into bots (id, owner, prefix) values ($1, $2, $3)",
bot.id,
ctx.author.id,
prefix,
)
em = discord.Embed(colour=self.bot.embed_color)
em.title = "Hello {},".format(ctx.author.name)
em.description = "Thanks for inviting your bot! It will be tested and invited shortly. " \
"Please open your DMs if they are not already so the bot can contact " \
"you to inform you about the progress of the bot!"
em.description = "Thanks for inviting your bot! It will be tested and invited shortly. " "Please open your DMs if they are not already so the bot can contact " "you to inform you about the progress of the bot!"
await ctx.send(embed=em)
em = discord.Embed(title="Bot invite", colour=discord.Color(0x363941))
@ -59,18 +74,26 @@ class BotManager:
em.add_field(name="Bot prefix", value="`" + prefix + "`")
await ctx.bot.get_channel(448803675574370304).send(embed=em)
@commands.command(name='claim', aliases=['makemine', 'gimme'])
@commands.command(name="claim", aliases=["makemine", "gimme"])
@commands.cooldown(1, 5, commands.BucketType.user)
async def _claim_bot(self, ctx, bot: discord.Member = None, prefix: str = None, owner: discord.Member = None):
async def _claim_bot(
self,
ctx,
bot: discord.Member = None,
prefix: str = None,
owner: discord.Member = None,
):
if not bot:
raise Warning('You must include the name of the bot you are trying to claim... Be exact.')
raise Warning(
"You must include the name of the bot you are trying to claim... Be exact."
)
if not bot.bot:
raise Warning('You can only claim bots.')
raise Warning("You can only claim bots.")
if not prefix:
if bot.display_name.startswith('['):
prefix = bot.display_name.split(']')[0].strip('[')
if bot.display_name.startswith("["):
prefix = bot.display_name.split("]")[0].strip("[")
else:
raise Warning('Prefix not provided and can\'t be found in bot nick.')
raise Warning("Prefix not provided and can't be found in bot nick.")
if owner is not None and ctx.author.guild_permissions.manage_roles:
author_id = owner.id
@ -79,100 +102,126 @@ class BotManager:
em = discord.Embed()
if await self.bot.db_con.fetchval('select count(*) from bots where owner = $1', author_id) >= 10:
if (
await self.bot.db_con.fetchval(
"select count(*) from bots where owner = $1", author_id
)
>= 10
):
em.colour = self.bot.error_color
em.title = 'Too Many Bots Claimed'
em.description = 'Each person is limited to claiming 10 bots as that is how ' \
'many bots are allowed by the Discord API per user.'
em.title = "Too Many Bots Claimed"
em.description = "Each person is limited to claiming 10 bots as that is how " "many bots are allowed by the Discord API per user."
return await ctx.send(embed=em)
existing = await self.bot.db_con.fetchrow('select * from bots where id = $1', bot.id)
existing = await self.bot.db_con.fetchrow(
"select * from bots where id = $1", bot.id
)
if not existing:
await self.bot.db_con.execute('insert into bots (id, owner, prefix) values ($1, $2, $3)',
bot.id, author_id, prefix)
await self.bot.db_con.execute(
"insert into bots (id, owner, prefix) values ($1, $2, $3)",
bot.id,
author_id,
prefix,
)
em.colour = self.bot.embed_color
em.title = 'Bot Claimed'
em.description = f'You have claimed {bot.display_name} with a prefix of {prefix}\n' \
f'If there is an error please run command again to correct the prefix,\n' \
f'or {ctx.prefix}unclaim {bot.mention} to unclaim the bot.'
elif existing['owner'] and existing['owner'] != author_id:
em.title = "Bot Claimed"
em.description = f"You have claimed {bot.display_name} with a prefix of {prefix}\n" f"If there is an error please run command again to correct the prefix,\n" f"or {ctx.prefix}unclaim {bot.mention} to unclaim the bot."
elif existing["owner"] and existing["owner"] != author_id:
em.colour = self.bot.error_color
em.title = 'Bot Already Claimed'
em.description = 'This bot has already been claimed by someone else.\n' \
'If this is actually your bot please let the guild Administrators know.'
elif existing['owner'] and existing['owner'] == author_id:
em.title = "Bot Already Claimed"
em.description = "This bot has already been claimed by someone else.\n" "If this is actually your bot please let the guild Administrators know."
elif existing["owner"] and existing["owner"] == author_id:
em.colour = self.bot.embed_color
em.title = 'Bot Already Claimed'
em.description = 'You have already claimed this bot.\n' \
'If the prefix you provided is different from what is already in the database' \
' it will be updated for you.'
if existing['prefix'] != prefix:
await self.bot.db_con.execute("update bots set prefix = $1 where id = $2", prefix, bot.id)
elif not existing['owner']:
await self.bot.db_con.execute('update bots set owner = $1, prefix = $2 where id = $3',
author_id, prefix, bot.id)
em.title = "Bot Already Claimed"
em.description = "You have already claimed this bot.\n" "If the prefix you provided is different from what is already in the database" " it will be updated for you."
if existing["prefix"] != prefix:
await self.bot.db_con.execute(
"update bots set prefix = $1 where id = $2", prefix, bot.id
)
elif not existing["owner"]:
await self.bot.db_con.execute(
"update bots set owner = $1, prefix = $2 where id = $3",
author_id,
prefix,
bot.id,
)
em.colour = self.bot.embed_color
em.title = 'Bot Claimed'
em.description = f'You have claimed {bot.display_name} with a prefix of {prefix}\n' \
f'If there is an error please run command again to correct the prefix,\n' \
f'or {ctx.prefix}unclaim {bot.mention} to unclaim the bot.'
em.title = "Bot Claimed"
em.description = f"You have claimed {bot.display_name} with a prefix of {prefix}\n" f"If there is an error please run command again to correct the prefix,\n" f"or {ctx.prefix}unclaim {bot.mention} to unclaim the bot."
else:
em.colour = self.bot.error_color
em.title = 'Something Went Wrong...'
em.title = "Something Went Wrong..."
await ctx.send(embed=em)
@commands.command(name='unclaim')
@commands.command(name="unclaim")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _unclaim_bot(self, ctx, bot: discord.Member = None):
if not bot:
raise Warning('You must include the name of the bot you are trying to claim... Be exact.')
raise Warning(
"You must include the name of the bot you are trying to claim... Be exact."
)
if not bot.bot:
raise Warning('You can only unclaim bots.')
raise Warning("You can only unclaim bots.")
em = discord.Embed()
existing = await self.bot.db_con.fetchrow('select * from bots where id = $1', bot.id)
if not existing or not existing['owner']:
existing = await self.bot.db_con.fetchrow(
"select * from bots where id = $1", bot.id
)
if not existing or not existing["owner"]:
em.colour = self.bot.error_color
em.title = 'Bot Not Found'
em.description = 'That bot is not claimed'
elif existing['owner'] != ctx.author.id and not ctx.author.guild_permissions.manage_roles:
em.title = "Bot Not Found"
em.description = "That bot is not claimed"
elif (
existing["owner"] != ctx.author.id
and not ctx.author.guild_permissions.manage_roles
):
em.colour = self.bot.error_color
em.title = 'Not Claimed By You'
em.description = 'That bot is claimed by someone else.\n' \
'You can\'t unclaim someone else\'s bot'
em.title = "Not Claimed By You"
em.description = "That bot is claimed by someone else.\n" "You can't unclaim someone else's bot"
else:
await self.bot.db_con.execute('update bots set owner = null where id = $1', bot.id)
await self.bot.db_con.execute(
"update bots set owner = null where id = $1", bot.id
)
em.colour = self.bot.embed_color
em.title = 'Bot Unclaimed'
em.description = f'You have unclaimed {bot.display_name}\n' \
f'If this is an error please reclaim using\n' \
f'{ctx.prefix}claim {bot.mention} {existing["prefix"]}'
em.title = "Bot Unclaimed"
em.description = f"You have unclaimed {bot.display_name}\n" f"If this is an error please reclaim using\n" f'{ctx.prefix}claim {bot.mention} {existing["prefix"]}'
await ctx.send(embed=em)
@commands.command(name='listclaims', aliases=['claimed', 'mybots'])
@commands.command(name="listclaims", aliases=["claimed", "mybots"])
@commands.cooldown(1, 5, commands.BucketType.user)
async def _claimed_bots(self, ctx, usr: discord.Member = None):
if usr is None:
usr = ctx.author
bots = await self.bot.db_con.fetch('select * from bots where owner = $1', usr.id)
bots = await self.bot.db_con.fetch(
"select * from bots where owner = $1", usr.id
)
if bots:
em = discord.Embed(title=f'{usr.display_name} has claimed the following bots:',
colour=self.bot.embed_color)
em = discord.Embed(
title=f"{usr.display_name} has claimed the following bots:",
colour=self.bot.embed_color,
)
for bot in bots:
member = ctx.guild.get_member(int(bot['id']))
em.add_field(name=member.display_name, value=f'Stored Prefix: {bot["prefix"]}', inline=False)
member = ctx.guild.get_member(int(bot["id"]))
em.add_field(
name=member.display_name,
value=f'Stored Prefix: {bot["prefix"]}',
inline=False,
)
else:
em = discord.Embed(title='You have not claimed any bots.',
colour=self.bot.embed_color)
em = discord.Embed(
title="You have not claimed any bots.", colour=self.bot.embed_color
)
await ctx.send(embed=em)
@commands.command(name='whowns')
@commands.command(name="whowns")
async def _whowns(self, ctx, bot: discord.Member):
if not bot.bot:
await ctx.send('this commands only for bots')
await ctx.send("this commands only for bots")
else:
owner = await self.bot.db_con.fetchrow('select * from bots where id = $1', bot.id)
await ctx.send(ctx.guild.get_member(owner['owner']).display_name)
owner = await self.bot.db_con.fetchrow(
"select * from bots where id = $1", bot.id
)
await ctx.send(ctx.guild.get_member(owner["owner"]).display_name)
def setup(bot):

View File

@ -9,6 +9,7 @@ import io
class REPL:
"""Python in Discords"""
def __init__(self, bot):
self.bot = bot
self._last_result = None
@ -19,19 +20,18 @@ class REPL:
Automatically removes code blocks from the code.
"""
# remove ```py\n```
if content.startswith('```') and content.endswith('```'):
return '\n'.join(content.split('\n')[1:-1])
if content.startswith("```") and content.endswith("```"):
return "\n".join(content.split("\n")[1:-1])
# remove `foo`
return content.strip('` \n')
return content.strip("` \n")
def get_syntax_error(self, e):
if e.text is None:
return '{0.__class__.__name__}: {0}'.format(e)
return '{0.text}{1:>{0.offset}}\n{2}: {0}'.format(e, '^', type(e).__name__)
return "{0.__class__.__name__}: {0}".format(e)
return "{0.text}{1:>{0.offset}}\n{2}: {0}".format(e, "^", type(e).__name__)
@commands.command(name='exec')
@commands.command(name="exec")
async def _eval(self, ctx, *, body: str = None):
"""
Execute python code in discord chat.
@ -45,24 +45,26 @@ class REPL:
- exec print(546132)
"""
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10)
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
if body is None:
return await ctx.send(
'Please, use\n'
"Please, use\n"
f'`{self.bot.config["prefix"]}exec`\n\n'
'\n`\\`\\`\\`py\n[python code]\n\\`\\`\\`\n'
'to get the most out of the command')
"\n`\\`\\`\\`py\n[python code]\n\\`\\`\\`\n"
"to get the most out of the command"
)
env = {
'bot': self.bot,
'ctx': ctx,
'channel': ctx.message.channel,
'author': ctx.message.author,
'server': ctx.message.guild,
'message': ctx.message,
'_': self._last_result
"bot": self.bot,
"ctx": ctx,
"channel": ctx.message.channel,
"author": ctx.message.author,
"server": ctx.message.guild,
"message": ctx.message,
"_": self._last_result,
}
env.update(globals())
@ -70,61 +72,65 @@ class REPL:
body = self.cleanup_code(body)
stdout = io.StringIO()
to_compile = 'async def func():\n%s' % textwrap.indent(body, ' ')
to_compile = "async def func():\n%s" % textwrap.indent(body, " ")
try:
exec(to_compile, env)
except SyntaxError as e:
try:
await ctx.send(f'```py\n{self.get_syntax_error(e)}\n```')
await ctx.send(f"```py\n{self.get_syntax_error(e)}\n```")
except Exception as e:
error = [self.get_syntax_error(e)[i:i + 2000] for i in
range(0, len(self.get_syntax_error(e)), 2000)]
error = [
self.get_syntax_error(e)[i : i + 2000]
for i in range(0, len(self.get_syntax_error(e)), 2000)
]
for i in error:
await ctx.send(f'```py\n{i}\n```')
await ctx.send(f"```py\n{i}\n```")
func = env['func']
func = env["func"]
try:
with redirect_stdout(stdout):
ret = await func()
except Exception as e:
value = stdout.getvalue()
try:
await ctx.send(f'```py\n{value}{traceback.format_exc()}\n```')
await ctx.send(f"```py\n{value}{traceback.format_exc()}\n```")
except Exception as e:
error = [value[i:i + 2000] for i in range(0, len(value), 2000)]
error = [value[i : i + 2000] for i in range(0, len(value), 2000)]
for i in error:
await ctx.send(f'```py\n{i}\n```')
await ctx.send(f"```py\n{i}\n```")
tracebackerror = [traceback.format_exc()[i:i + 2000] for i in
range(0, len(traceback.format_exc()), 2000)]
tracebackerror = [
traceback.format_exc()[i : i + 2000]
for i in range(0, len(traceback.format_exc()), 2000)
]
for i in tracebackerror:
await ctx.send(f'```py\n{i}\n```')
await ctx.send(f"```py\n{i}\n```")
else:
value = stdout.getvalue()
if ret is None:
if value:
try:
await ctx.send(f'```py\n{value}\n```')
await ctx.send(f"```py\n{value}\n```")
except Exception as e:
code = [value[i:i + 1980] for i in range(0, len(value), 1980)]
code = [value[i : i + 1980] for i in range(0, len(value), 1980)]
for i in code:
await ctx.send(f'```py\n{i}\n```')
await ctx.send(f"```py\n{i}\n```")
else:
self._last_result = ret
try:
code = [value[i:i + 1980] for i in range(0, len(value), 1980)]
code = [value[i : i + 1980] for i in range(0, len(value), 1980)]
for i in code:
await ctx.send(f'```py\n{i}\n```')
await ctx.send(f"```py\n{i}\n```")
except Exception as e:
code = [value[i:i + 1980] for i in range(0, len(value), 1980)]
code = [value[i : i + 1980] for i in range(0, len(value), 1980)]
for i in code:
await ctx.send(f'```py\n{i}\n```')
modifyd_ret = [ret[i:i + 1980] for i in range(0, len(ret), 1980)]
await ctx.send(f"```py\n{i}\n```")
modifyd_ret = [ret[i : i + 1980] for i in range(0, len(ret), 1980)]
for i in modifyd_ret:
await ctx.send(f'```py\n{i}\n```')
await ctx.send(f"```py\n{i}\n```")
@commands.command(hidden=True)
async def repl(self, ctx):
@ -138,43 +144,51 @@ class REPL:
- repl print(205554)
"""
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10)
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
msg = ctx.message
variables = {
'ctx': ctx,
'bot': self.bot,
'message': msg,
'server': msg.guild,
'channel': msg.channel,
'author': msg.author,
'_': None,
"ctx": ctx,
"bot": self.bot,
"message": msg,
"server": msg.guild,
"channel": msg.channel,
"author": msg.author,
"_": None,
}
if msg.channel.id in self.sessions:
msg = await ctx.send('Already running a REPL session in this channel. Exit it with `quit`.')
msg = await ctx.send(
"Already running a REPL session in this channel. Exit it with `quit`."
)
self.sessions.add(msg.channel.id)
await ctx.send('Enter code to execute or evaluate. `exit()` or `quit` to exit.')
await ctx.send("Enter code to execute or evaluate. `exit()` or `quit` to exit.")
while True:
response = await self.bot.wait_for('message', check=lambda m: m.content.startswith(
'`') and m.author == ctx.author and m.channel == ctx.channel)
response = await self.bot.wait_for(
"message",
check=lambda m: m.content.startswith("`")
and m.author == ctx.author
and m.channel == ctx.channel,
)
cleaned = self.cleanup_code(response.content)
if cleaned in ('quit', 'exit', 'exit()'):
msg = await ctx.send('Exiting.')
if cleaned in ("quit", "exit", "exit()"):
msg = await ctx.send("Exiting.")
self.sessions.remove(msg.channel.id)
return
executor = exec
if cleaned.count('\n') == 0:
if cleaned.count("\n") == 0:
# single statement, potentially 'eval'
try:
code = compile(cleaned, '<repl session>', 'eval')
code = compile(cleaned, "<repl session>", "eval")
except SyntaxError:
pass
else:
@ -182,17 +196,19 @@ class REPL:
if executor is exec:
try:
code = compile(cleaned, '<repl session>', 'exec')
code = compile(cleaned, "<repl session>", "exec")
except SyntaxError as e:
try:
await ctx.send(f'```Python\n{self.get_syntax_error(e)}\n```')
await ctx.send(f"```Python\n{self.get_syntax_error(e)}\n```")
except Exception as e:
error = [self.get_syntax_error(e)[i:i + 2000] for i in
range(0, len(self.get_syntax_error(e)), 2000)]
error = [
self.get_syntax_error(e)[i : i + 2000]
for i in range(0, len(self.get_syntax_error(e)), 2000)
]
for i in error:
await ctx.send(f'```Python\n{i}\n```')
await ctx.send(f"```Python\n{i}\n```")
variables['message'] = response
variables["message"] = response
fmt = None
stdout = io.StringIO()
try:
@ -203,29 +219,30 @@ class REPL:
except Exception as e:
value = stdout.getvalue()
await ctx.send(f'```Python\n{value}{traceback.format_exc()}\n```')
await ctx.send(f"```Python\n{value}{traceback.format_exc()}\n```")
continue
else:
value = stdout.getvalue()
if result is not None:
fmt = '{}{}'.format(value, result)
variables['_'] = result
fmt = "{}{}".format(value, result)
variables["_"] = result
elif value:
fmt = value
try:
if fmt is not None:
if len(fmt) > 1980:
code = [fmt[i:i + 1980] for i in range(0, len(fmt), 1980)]
code = [fmt[i : i + 1980] for i in range(0, len(fmt), 1980)]
for i in code:
await ctx.send(f'```py\n{i}\n```')
await ctx.send(f"```py\n{i}\n```")
else:
await ctx.send(fmt)
except discord.Forbidden:
pass
except discord.HTTPException as e:
await ctx.send(f'Unexpected error: `{e}`')
await ctx.send(f"Unexpected error: `{e}`")
def setup(bot):
bot.add_cog(REPL(bot))

View File

@ -7,20 +7,24 @@ import traceback
import aiofiles
import os
class Upload:
"""
CogName should be the name of the cog
"""
def __init__(self, bot):
self.bot = bot
print('upload loaded')
print("upload loaded")
@commands.command()
async def reload(self, ctx, *, extension: str):
"""Reload an extension."""
await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10)
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
extension = extension.lower()
try:
@ -28,16 +32,18 @@ class Upload:
self.bot.load_extension("src.cogs.{}".format(extension))
except Exception as e:
traceback.print_exc()
await ctx.send(f'Could not reload `{extension}` -> `{e}`')
await ctx.send(f"Could not reload `{extension}` -> `{e}`")
else:
await ctx.send(f'Reloaded `{extension}`.')
await ctx.send(f"Reloaded `{extension}`.")
@commands.command()
async def reloadall(self, ctx):
"""Reload all extensions."""
await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10)
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
try:
for extension in self.bot.extensions:
@ -52,7 +58,9 @@ class Upload:
"""Unload an extension."""
await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10)
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
extension = extension.lower()
try:
@ -61,41 +69,47 @@ class Upload:
except Exception as e:
traceback.print_exc()
if ctx.message.author.id not in self.bot.owner_list:
await ctx.send(f'Could not unload `{extension}` -> `{e}`')
await ctx.send(f"Could not unload `{extension}` -> `{e}`")
else:
await ctx.send(f'Unloaded `{extension}`.')
await ctx.send(f"Unloaded `{extension}`.")
@commands.command()
async def load(self, ctx, *, extension: str):
"""Load an extension."""
await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10)
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
extension = extension.lower()
try:
self.bot.load_extension("src.cogs.{}".format(extension))
except Exception as e:
traceback.print_exc()
await ctx.send(f'Could not load `{extension}` -> `{e}`')
await ctx.send(f"Could not load `{extension}` -> `{e}`")
else:
await ctx.send(f'Loaded `{extension}`.')
await ctx.send(f"Loaded `{extension}`.")
@commands.command()
async def permunload(self, ctx, extension=None):
"""Disables permanently a cog."""
await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10)
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
if cog is None:
return await ctx.send("Please provide a extension. Do `help permunload` for more info")
return await ctx.send(
"Please provide a extension. Do `help permunload` for more info"
)
extension = extension.lower()
async with aiofiles.open("extension.txt") as fp:
lines=fp.readlines()
lines = fp.readlines()
removed = False
async with aiofiles.open("extension.txt", "w") as fp:
@ -118,10 +132,12 @@ class Upload:
@commands.command(hidden=True)
async def reboot(self, ctx):
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10)
await ctx.send('Sebi-Machine is restarting.')
with open(f'src/config/reboot', 'w') as f:
f.write(f'1\n{ctx.channel.id}')
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
await ctx.send("Sebi-Machine is restarting.")
with open(f"src/config/reboot", "w") as f:
f.write(f"1\n{ctx.channel.id}")
# noinspection PyProtectedMember
os._exit(1)

View File

@ -4,10 +4,12 @@
from discord.ext import commands
import discord
class CogName:
"""
CogName should be the name of the cog
"""
def __init__(self, bot):
self.bot = bot
@ -15,9 +17,9 @@ class CogName:
async def ping(self, ctx):
"""Say pong"""
now = ctx.message.created_at
msg = await ctx.send('Pong')
msg = await ctx.send("Pong")
sub = msg.created_at - now
await msg.edit(content=f'🏓Pong, **{sub.total_seconds() * 1000}ms**')
await msg.edit(content=f"🏓Pong, **{sub.total_seconds() * 1000}ms**")
def setup(bot):

View File

@ -6,10 +6,12 @@ import discord
import random
import aiohttp
class Fun:
"""
CogName should be the name of the cog
"""
def __init__(self, bot):
self.bot = bot
@ -23,10 +25,10 @@ class Fun:
- sebisauce
"""
await ctx.trigger_typing()
url = 'http://ikbengeslaagd.com/API/sebisauce.json'
url = "http://ikbengeslaagd.com/API/sebisauce.json"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
source = await response.json(encoding='utf8')
source = await response.json(encoding="utf8")
total_sebi = 0
for key in dict.keys(source):
@ -34,11 +36,12 @@ class Fun:
im = random.randint(0, int(total_sebi) - 1)
await ctx.send(embed=discord.Embed(
title='\t',
description='\t',
color=self.bot.embed_color).set_image(
url=source[str(im)]))
await ctx.send(
embed=discord.Embed(
title="\t", description="\t", color=self.bot.embed_color
).set_image(url=source[str(im)])
)
def setup(bot):
bot.add_cog(Fun(bot))

View File

@ -41,53 +41,72 @@ class Git(Loggable):
@commands.group(case_insensitive=True, invoke_without_command=True)
async def git(self, ctx):
"""Run help git for more info"""
await ctx.send('https://github.com/dustinpianalto/Sebi-Machine/')
await ctx.send("https://github.com/dustinpianalto/Sebi-Machine/")
@commands.command(case_insensitive=True, brief='Gets the Trello link.')
@commands.command(case_insensitive=True, brief="Gets the Trello link.")
async def trello(self, ctx):
await ctx.send('<https://trello.com/b/x02goBbW/sebis-bot-tutorial-roadmap>')
await ctx.send("<https://trello.com/b/x02goBbW/sebis-bot-tutorial-roadmap>")
@git.command()
async def pull(self, ctx):
self.logger.warning('Invoking git-pull')
self.logger.warning("Invoking git-pull")
await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10)
em = discord.Embed(style='rich',
title=f'Git Pull',
color=self.bot.embed_color)
em.set_thumbnail(url=f'{ctx.guild.me.avatar_url}')
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
em = discord.Embed(style="rich", title=f"Git Pull", color=self.bot.embed_color)
em.set_thumbnail(url=f"{ctx.guild.me.avatar_url}")
# Pretty sure you can just do await run_command() if that is async,
# or run in a TPE otherwise.
result = await asyncio.wait_for(self.bot.loop.create_task(
run_command('git fetch --all')), 120) + '\n'
result += await asyncio.wait_for(self.bot.loop.create_task(
run_command('git reset --hard origin/$(git rev-parse '
'--symbolic-full-name --abbrev-ref HEAD)')),
120) + '\n\n'
result += await asyncio.wait_for(self.bot.loop.create_task(
run_command('git show --stat | sed "s/.*@.*[.].*/ /g"')), 10)
result = (
await asyncio.wait_for(
self.bot.loop.create_task(run_command("git fetch --all")), 120
)
+ "\n"
)
result += (
await asyncio.wait_for(
self.bot.loop.create_task(
run_command(
"git reset --hard origin/$(git rev-parse "
"--symbolic-full-name --abbrev-ref HEAD)"
)
),
120,
)
+ "\n\n"
)
result += await asyncio.wait_for(
self.bot.loop.create_task(
run_command('git show --stat | sed "s/.*@.*[.].*/ /g"')
),
10,
)
results = paginate(result, maxlen=1014)
for page in results[:5]:
em.add_field(name='\uFFF0', value=f'{page}')
em.add_field(name="\uFFF0", value=f"{page}")
await ctx.send(embed=em)
@git.command()
async def status(self, ctx):
await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10)
em = discord.Embed(style='rich',
title=f'Git Status',
color=self.bot.embed_color)
em.set_thumbnail(url=f'{ctx.guild.me.avatar_url}')
result = await asyncio.wait_for(self.bot.loop.create_task(
run_command('git status')), 10)
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
em = discord.Embed(
style="rich", title=f"Git Status", color=self.bot.embed_color
)
em.set_thumbnail(url=f"{ctx.guild.me.avatar_url}")
result = await asyncio.wait_for(
self.bot.loop.create_task(run_command("git status")), 10
)
results = paginate(result, maxlen=1014)
for page in results[:5]:
em.add_field(name='\uFFF0', value=f'{page}')
em.add_field(name="\uFFF0", value=f"{page}")
await ctx.send(embed=em)

View File

@ -4,10 +4,12 @@
from discord.ext import commands
import discord
class Moderation:
"""
Moderation Commands
"""
def __init__(self, bot):
self.bot = bot
@ -23,17 +25,24 @@ class Moderation:
"""
await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10)
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
if member is None:
await ctx.send('Are you sure you are capable of this command?')
await ctx.send("Are you sure you are capable of this command?")
try:
await member.kick()
await ctx.send(f'You kicked **`{member.name}`** from **`{ctx.guild.name}`**')
await ctx.send(
f"You kicked **`{member.name}`** from **`{ctx.guild.name}`**"
)
except Exception as e:
await ctx.send('You may not use this command, as you do not have permission to do so:\n\n**`{ctx.guild.name}`**'
f'\n\n```py\n{e}\n```')
await ctx.send(
"You may not use this command, as you do not have permission to do so:\n\n**`{ctx.guild.name}`**"
f"\n\n```py\n{e}\n```"
)
@commands.command()
async def ban(self, ctx, member: discord.Member = None):
"""
@ -46,17 +55,24 @@ class Moderation:
"""
await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10)
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
if member is None:
await ctx.send('Are you sure you are capable of this command?')
await ctx.send("Are you sure you are capable of this command?")
try:
await member.ban()
await ctx.send(f'You banned **`{member.name}`** from **`{ctx.guild.name}`**')
await ctx.send(
f"You banned **`{member.name}`** from **`{ctx.guild.name}`**"
)
except Exception as e:
await ctx.send('You may not use this command, as you do not have permission to do so:\n\n**`{ctx.guild.name}`**'
f'\n\n```py\n{e}\n```')
await ctx.send(
"You may not use this command, as you do not have permission to do so:\n\n**`{ctx.guild.name}`**"
f"\n\n```py\n{e}\n```"
)
def setup(bot):
bot.add_cog(Moderation(bot))

View File

@ -11,15 +11,15 @@ from discord.ext import commands
import youtube_dl
# noinspection PyUnresolvedReferences,PyUnresolvedReferences,PyPackageRequirements
from . utils import noblock
from .utils import noblock
YT_DL_OPTS = {
"format": 'mp3[abr>0]/bestaudio/best',
"format": "mp3[abr>0]/bestaudio/best",
"ignoreerrors": True,
"default_search": "auto",
"source_address": "0.0.0.0",
'quiet': True
"quiet": True,
}
@ -31,6 +31,7 @@ IDLE_FOR = 60 * 30
@dataclasses.dataclass(repr=True)
class Request:
"""Track request."""
who: discord.Member
what: str # Referral
title: str # Video title
@ -60,6 +61,7 @@ class Session:
queue: asyncio.Queue
Track queue.
"""
@classmethod
async def new_session(cls, ctx: commands.Context):
"""
@ -71,7 +73,9 @@ class Session:
await s.connect()
except Exception as ex:
traceback.print_exc()
await ctx.send(f"I couldn't connect! Reason: {str(ex) or type(ex).__qualname__}")
await ctx.send(
f"I couldn't connect! Reason: {str(ex) or type(ex).__qualname__}"
)
return None
else:
return s
@ -79,7 +83,7 @@ class Session:
def __init__(self, ctx: commands.Context) -> None:
"""Create a new session."""
if ctx.author.voice is None:
raise RuntimeError('Please enter a voice channel I have access to first.')
raise RuntimeError("Please enter a voice channel I have access to first.")
# Holds the tasks currently running associated with this.
self.voice_channel = ctx.author.voice.channel
@ -106,7 +110,7 @@ class Session:
self._start_next_track_event.clear()
self._player = self.__spawn_player()
else:
raise RuntimeError('I already have a voice client/player running.')
raise RuntimeError("I already have a voice client/player running.")
async def disconnect(self) -> None:
"""Disconnects from the VC."""
@ -115,6 +119,7 @@ class Session:
def __spawn_player(self) -> asyncio.Task:
"""Starts a new player."""
async def player():
try:
while True:
@ -122,14 +127,18 @@ class Session:
with async_timeout.timeout(IDLE_FOR):
request = await self.queue.get()
await self.ctx.send(f'Playing `{request}` requested by {request.who}')
await self.ctx.send(
f"Playing `{request}` requested by {request.who}"
)
# Clear the skip event if it is set.
self._start_next_track_event.clear()
# Start the player if it was a valid request, else continue to the next track.
if not self.__play(request.actual_url):
await self.ctx.send(f'{request.referral} was a bad request and was skipped.')
await self.ctx.send(
f"{request.referral} was a bad request and was skipped."
)
continue
await self._start_next_track_event.wait()
@ -139,10 +148,10 @@ class Session:
except asyncio.CancelledError:
# Hit when someone kills the player using stop().
print('Requested to stop player', repr(self))
print("Requested to stop player", repr(self))
except asyncio.TimeoutError:
await self.ctx.send('Was idle for too long...')
print('Player queue was empty for too long and was stopped', repr(self))
await self.ctx.send("Was idle for too long...")
print("Player queue was empty for too long and was stopped", repr(self))
except Exception:
traceback.print_exc()
finally:
@ -150,6 +159,7 @@ class Session:
await self.voice_client.stop()
if self.is_connected:
await self.disconnect()
return self.loop.create_task(player())
def __play(self, url):
@ -159,7 +169,9 @@ class Session:
# Play the stream. After we finish, either from being cancelled or otherwise, fire the
# skip track event to start the next track.
self.voice_client.play(ffmpeg_player, after=lambda error: self._start_next_track_event.set())
self.voice_client.play(
ffmpeg_player, after=lambda error: self._start_next_track_event.set()
)
except Exception:
traceback.print_exc()
return False
@ -179,9 +191,11 @@ class Session:
def on_exit(self, func):
"""Decorates a function to invoke it on exit."""
async def callback():
await self._on_stop_event.wait()
inspect.iscoroutinefunction(func) and await func() or func()
self.loop.create_task(callback())
return func
@ -192,6 +206,7 @@ class PlayerCog:
self.sessions: Dict[discord.Guild, Session] = {}
# noinspection PyMethodMayBeStatic
async def __local_check(self, ctx):
return ctx.guild
@ -211,9 +226,12 @@ class PlayerCog:
await ctx.send("*hacker voice*\n**I'm in.**", delete_after=15)
else:
await ctx.send(f'I am already playing in {self.sessions[ctx.guild].voice_channel.mention}')
await ctx.send(
f"I am already playing in {self.sessions[ctx.guild].voice_channel.mention}"
)
# noinspection PyNestedDecorators
@staticmethod
@noblock.no_block
def _get_video_meta(referral):
@ -224,34 +242,35 @@ class PlayerCog:
@commands.command()
async def queue(self, ctx):
if ctx.guild not in self.sessions:
return await ctx.send('Please join me into a voice channel first.')
return await ctx.send("Please join me into a voice channel first.")
sesh = self.sessions[ctx.guild]
if sesh.queue.empty():
return await ctx.send(
'There is nothing in the queue at the moment!\n\n'
'Add something by running `<>play https://url` or `<>play search term`!')
"There is nothing in the queue at the moment!\n\n"
"Add something by running `<>play https://url` or `<>play search term`!"
)
# We cannot faff around with the actual queue so make a shallow copy of the internal
# non-async dequeue.
# noinspection PyProtectedMember
agenda = sesh.queue._queue.copy()
message = ['**Queue**']
message = ["**Queue**"]
for i, item in enumerate(list(agenda)[:15]):
message.append(f'`{i+1: >2}: {item.title} ({item.who})`')
message.append(f"`{i+1: >2}: {item.title} ({item.who})`")
if len(agenda) >= 15:
message.append('')
message.append(f'There are {len(agenda)} items in the queue currently.')
message.append("")
message.append(f"There are {len(agenda)} items in the queue currently.")
await ctx.send('\n'.join(message)[:2000])
await ctx.send("\n".join(message)[:2000])
@commands.command()
async def play(self, ctx, *, referral):
if ctx.guild not in self.sessions:
return await ctx.send('Please join me into a voice channel first.')
return await ctx.send("Please join me into a voice channel first.")
try:
try:
@ -259,29 +278,33 @@ class PlayerCog:
# If it was interpreted as a search, it appears this happens?
# The documentation is so nice.
if info.get('_type') == 'playlist':
info = info['entries'][0]
if info.get("_type") == "playlist":
info = info["entries"][0]
# ...wait... did I say nice? I meant "non existent."
url = info['url']
title = info.get('title') or referral
url = info["url"]
title = info.get("title") or referral
except IndexError:
return await ctx.send('No results...', delete_after=15)
return await ctx.send("No results...", delete_after=15)
except Exception as ex:
return await ctx.send(f"Couldn't add this to the queue... reason: {ex!s}")
return await ctx.send(
f"Couldn't add this to the queue... reason: {ex!s}"
)
await self.sessions[ctx.guild].queue.put(Request(ctx.author, referral, title, url))
await ctx.send(f'Okay. Queued `{title or referral}`.')
await self.sessions[ctx.guild].queue.put(
Request(ctx.author, referral, title, url)
)
await ctx.send(f"Okay. Queued `{title or referral}`.")
except KeyError:
await ctx.send('I am not playing in this server.')
await ctx.send("I am not playing in this server.")
@commands.command()
async def stop(self, ctx):
try:
await self.sessions[ctx.guild].stop()
except KeyError:
await ctx.send('I am not playing in this server.')
await ctx.send("I am not playing in this server.")
except TypeError:
await ctx.send("I wasn't playing anything, but okay.", delete_after=15)
@ -290,11 +313,11 @@ class PlayerCog:
try:
self.sessions[ctx.guild].skip()
try:
await ctx.message.add_reaction('\N{OK HAND SIGN}')
await ctx.message.add_reaction("\N{OK HAND SIGN}")
except discord.Forbidden:
await ctx.send('\N{OK HAND SIGN}')
await ctx.send("\N{OK HAND SIGN}")
except KeyError:
await ctx.send('I am not playing in this server.')
await ctx.send("I am not playing in this server.")
@commands.command()
async def disconnect(self, ctx):

View File

@ -4,6 +4,7 @@ import json
import aiofiles
import asyncio
class Tag:
def __init__(self, bot):
self.bot = bot
@ -17,12 +18,14 @@ class Tag:
"""Gets a tag"""
await ctx.trigger_typing()
if tag is None:
return await ctx.send('Please provide a argument. Do `help tag` for more info')
return await ctx.send(
"Please provide a argument. Do `help tag` for more info"
)
found = tags.get(tag, None)
if found is None:
return await ctx.send('Tag not found')
return await ctx.send("Tag not found")
await ctx.send(found)
@ -37,7 +40,9 @@ class Tag:
if desc == "":
desc = "None"
em = discord.Embed(title='Available tags:', description=desc ,colour=discord.Colour(0x00FFFF))
em = discord.Embed(
title="Available tags:", description=desc, colour=discord.Colour(0x00FFFF)
)
await ctx.send(embed=em)
@ -49,7 +54,9 @@ class Tag:
return await ctx.send("You are not allowed to do this")
if tag_name is None or tag_info is None:
return await ctx.send("Please provide a tag name and the tag info. Do `help tag` for more info")
return await ctx.send(
"Please provide a tag name and the tag info. Do `help tag` for more info"
)
exists = False
for i in tags:
@ -57,7 +64,7 @@ class Tag:
exists = True
if not exists:
tags.update({tag_name : tag_info})
tags.update({tag_name: tag_info})
async with aiofiles.open("src/shared_libs/tags.json", "w") as fp:
json_data = json.dumps(tags)
@ -75,7 +82,9 @@ class Tag:
return await ctx.send("You are not allowed to do this")
if tag is None:
return await ctx.send("Please provide a tag name and the tag info. Do `help tag` for more info")
return await ctx.send(
"Please provide a tag name and the tag info. Do `help tag` for more info"
)
found = None
for i in tags:

View File

@ -6,8 +6,10 @@ import functools
def no_block(func):
"""Turns a blocking function into a non-blocking coroutine function."""
@functools.wraps(func)
async def no_blocking_handler(*args, **kwargs):
partial = functools.partial(func, *args, **kwargs)
return await asyncio.get_event_loop().run_in_executor(None, partial)
return no_blocking_handler

View File

@ -5,13 +5,15 @@ import json
import discord
import os
class LoadConfig:
"""
All config is collected here
"""
def __init__(self):
# Read our config file
with open('src/config/Config.json') as fp:
with open("src/config/Config.json") as fp:
self.config = json.load(fp)
# Initialize config
@ -23,7 +25,7 @@ class LoadConfig:
self.maintenance = self.config["maintenance"]
self.embed_color = discord.Colour(0x00FFFF)
self.error_color = discord.Colour(0xFF0000)
if self.maintenance == 'False':
if self.maintenance == "False":
self.maintenance = False
else:
self.maintenance = True

View File

@ -3,10 +3,23 @@ import asyncio
class DatabaseConnection:
def __init__(self, host: str='localhost', port: int=5432, database: str='', user: str='', password: str=''):
if user == '' or password == '' or database == '':
raise RuntimeError('Username or Password are blank')
self.kwargs = {'host': host, 'port': port, 'database': database, 'user': user, 'password': password}
def __init__(
self,
host: str = "localhost",
port: int = 5432,
database: str = "",
user: str = "",
password: str = "",
):
if user == "" or password == "" or database == "":
raise RuntimeError("Username or Password are blank")
self.kwargs = {
"host": host,
"port": port,
"database": database,
"user": user,
"password": password,
}
self._conn = None
asyncio.get_event_loop().run_until_complete(self.acquire())
self.fetchval = self._conn.fetchval

View File

@ -8,10 +8,10 @@ import inspect
import os
__all__ = ('in_here',)
__all__ = ("in_here",)
def in_here(first_path_bit: str, *path_bits: str, stack_depth: int=0) -> str:
def in_here(first_path_bit: str, *path_bits: str, stack_depth: int = 0) -> str:
"""
A somewhat voodooish and weird piece of code. This enables us to
directly refer to a file in the same directory as the code that
@ -40,11 +40,12 @@ def in_here(first_path_bit: str, *path_bits: str, stack_depth: int=0) -> str:
try:
frame = inspect.stack()[1 + stack_depth]
except IndexError:
raise RuntimeError('Could not find a stack record. Interpreter has '
'been shot.')
raise RuntimeError(
"Could not find a stack record. Interpreter has " "been shot."
)
else:
module = inspect.getmodule(frame[0])
assert hasattr(module, '__file__'), 'No `__file__\' attr, welp.'
assert hasattr(module, "__file__"), "No `__file__' attr, welp."
# https://docs.python.org/3/library/inspect.html#the-interpreter-stack
# If Python caches strings rather than copying when we move them

View File

@ -14,11 +14,11 @@ boast faster lookups.
import logging
__all__ = ('Loggable',)
__all__ = ("Loggable",)
class Loggable:
__slots__ = ('logger',)
__slots__ = ("logger",)
def __init_subclass__(cls, **_):
cls.logger = logging.getLogger(cls.__qualname__)

View File

@ -36,19 +36,21 @@ import typing
class Paginator:
def __init__(self,
def __init__(
self,
bot: discord.ext.commands.Bot,
*,
max_chars: int=1970,
max_lines: int=20,
prefix: str='```md',
suffix: str='```',
page_break: str='\uFFF8',
field_break: str='\uFFF7',
field_name_char: str='\uFFF6',
inline_char: str='\uFFF5',
max_line_length: int=100,
embed=False):
max_chars: int = 1970,
max_lines: int = 20,
prefix: str = "```md",
suffix: str = "```",
page_break: str = "\uFFF8",
field_break: str = "\uFFF7",
field_name_char: str = "\uFFF6",
inline_char: str = "\uFFF5",
max_line_length: int = 100,
embed=False,
):
_max_len = 6000 if embed else 1980
assert 0 < max_lines <= max_chars
assert 0 < max_line_length < 120
@ -56,9 +58,12 @@ class Paginator:
self._parts = list()
self._prefix = prefix
self._suffix = suffix
self._max_chars = max_chars if max_chars + len(prefix) + len(suffix) + 2 <= _max_len \
self._max_chars = (
max_chars
if max_chars + len(prefix) + len(suffix) + 2 <= _max_len
else _max_len - len(prefix) - len(suffix) - 2
self._max_lines = max_lines - (prefix + suffix).count('\n') + 1
)
self._max_lines = max_lines - (prefix + suffix).count("\n") + 1
self._page_break = page_break
self._max_line_length = max_line_length
self._pages = list()
@ -69,24 +74,27 @@ class Paginator:
self._field_break = field_break
self._field_name_char = field_name_char
self._inline_char = inline_char
self._embed_title = ''
self._embed_description = ''
self._embed_title = ""
self._embed_description = ""
self._embed_color = None
self._embed_thumbnail = None
self._embed_url = None
self._bot = bot
def set_embed_meta(self, title: str=None,
description: str=None,
color: discord.Colour=None,
thumbnail: str=None,
url: str=None):
def set_embed_meta(
self,
title: str = None,
description: str = None,
color: discord.Colour = None,
thumbnail: str = None,
url: str = None,
):
if title and len(title) > self._max_field_name:
raise RuntimeError('Provided Title is too long')
raise RuntimeError("Provided Title is too long")
else:
self._embed_title = title
if description and len(description) > self._max_description:
raise RuntimeError('Provided Description is too long')
raise RuntimeError("Provided Description is too long")
else:
self._embed_description = description
self._embed_color = color
@ -96,10 +104,10 @@ class Paginator:
def pages(self) -> typing.List[str]:
_pages = list()
_fields = list()
_page = ''
_page = ""
_lines = 0
_field_name = ''
_field_value = ''
_field_name = ""
_field_value = ""
_inline = False
def open_page():
@ -131,26 +139,29 @@ class Paginator:
if new_chars > self._max_chars:
close_page()
elif (_lines + (part.count('\n') + 1 or 1)) > self._max_lines:
elif (_lines + (part.count("\n") + 1 or 1)) > self._max_lines:
close_page()
_lines += (part.count('\n') + 1 or 1)
_page += '\n' + part
_lines += part.count("\n") + 1 or 1
_page += "\n" + part
else:
def open_field(name: str):
nonlocal _field_value, _field_name
_field_name = name
_field_value = self._prefix
def close_field(next_name: str=None):
def close_field(next_name: str = None):
nonlocal _field_name, _field_value, _fields
_field_value += self._suffix
if _field_value != self._prefix + self._suffix:
_fields.append({'name': _field_name, 'value': _field_value, 'inline': _inline})
_fields.append(
{"name": _field_name, "value": _field_value, "inline": _inline}
)
if next_name:
open_field(next_name)
open_field('\uFFF0')
open_field("\uFFF0")
for part in [str(p) for p in self._parts]:
if part == self._page_break:
@ -158,17 +169,17 @@ class Paginator:
continue
elif part == self._field_break:
if len(_fields) + 1 < 25:
close_field(next_name='\uFFF0')
close_field(next_name="\uFFF0")
else:
close_field()
close_page()
continue
if part.startswith(self._field_name_char):
part = part.replace(self._field_name_char, '')
part = part.replace(self._field_name_char, "")
if part.startswith(self._inline_char):
_inline = True
part = part.replace(self._inline_char, '')
part = part.replace(self._inline_char, "")
else:
_inline = False
if _field_value and _field_value != self._prefix:
@ -177,7 +188,7 @@ class Paginator:
_field_name = part
continue
_field_value += '\n' + part
_field_value += "\n" + part
close_field()
@ -188,14 +199,15 @@ class Paginator:
def process_pages(self) -> typing.List[str]:
_pages = self._pages or self.pages()
_len_pages = len(_pages)
_len_page_str = len(f'{_len_pages}/{_len_pages}')
_len_page_str = len(f"{_len_pages}/{_len_pages}")
if not self._embed:
for i, page in enumerate(_pages):
if len(page) + _len_page_str <= 2000:
_pages[i] = f'{i + 1}/{_len_pages}\n{page}'
_pages[i] = f"{i + 1}/{_len_pages}\n{page}"
else:
for i, page in enumerate(_pages):
em = discord.Embed(title=self._embed_title,
em = discord.Embed(
title=self._embed_title,
description=self._embed_description,
color=self._bot.embed_color,
)
@ -205,9 +217,11 @@ class Paginator:
em.url = self._embed_url
if self._embed_color:
em.colour = self._embed_color
em.set_footer(text=f'{i + 1}/{_len_pages}')
em.set_footer(text=f"{i + 1}/{_len_pages}")
for field in page:
em.add_field(name=field['name'], value=field['value'], inline=field['inline'])
em.add_field(
name=field["name"], value=field["value"], inline=field["inline"]
)
_pages[i] = em
return _pages
@ -218,51 +232,68 @@ class Paginator:
# noinspection PyProtectedMember
return self.__class__ == other.__class__ and self._parts == other._parts
def add_page_break(self, *, to_beginning: bool=False) -> None:
def add_page_break(self, *, to_beginning: bool = False) -> None:
self.add(self._page_break, to_beginning=to_beginning)
def add(self, item: typing.Any, *, to_beginning: bool=False, keep_intact: bool=False, truncate=False) -> None:
def add(
self,
item: typing.Any,
*,
to_beginning: bool = False,
keep_intact: bool = False,
truncate=False,
) -> None:
item = str(item)
i = 0
if not keep_intact and not item == self._page_break:
item_parts = item.strip('\n').split('\n')
item_parts = item.strip("\n").split("\n")
for part in item_parts:
if len(part) > self._max_line_length:
if not truncate:
length = 0
out_str = ''
out_str = ""
def close_line(line):
nonlocal i, out_str, length
self._parts.insert(i, out_str) if to_beginning else self._parts.append(out_str)
self._parts.insert(
i, out_str
) if to_beginning else self._parts.append(out_str)
i += 1
out_str = line + ' '
out_str = line + " "
length = len(out_str)
bits = part.split(' ')
bits = part.split(" ")
for bit in bits:
next_len = length + len(bit) + 1
if next_len <= self._max_line_length:
out_str += bit + ' '
out_str += bit + " "
length = next_len
elif len(bit) > self._max_line_length:
if out_str:
close_line(line='')
for out_str in [bit[i:i + self._max_line_length]
for i in range(0, len(bit), self._max_line_length)]:
close_line('')
close_line(line="")
for out_str in [
bit[i : i + self._max_line_length]
for i in range(0, len(bit), self._max_line_length)
]:
close_line("")
else:
close_line(bit)
close_line('')
close_line("")
else:
line = f'{part:.{self._max_line_length-3}}...'
self._parts.insert(i, line) if to_beginning else self._parts.append(line)
line = f"{part:.{self._max_line_length-3}}..."
self._parts.insert(
i, line
) if to_beginning else self._parts.append(line)
else:
self._parts.insert(i, part) if to_beginning else self._parts.append(part)
self._parts.insert(i, part) if to_beginning else self._parts.append(
part
)
i += 1
elif keep_intact and not item == self._page_break:
if len(item) >= self._max_chars or item.count('\n') > self._max_lines:
raise RuntimeError('{item} is too long to keep on a single page and is marked to keep intact.')
if len(item) >= self._max_chars or item.count("\n") > self._max_lines:
raise RuntimeError(
"{item} is too long to keep on a single page and is marked to keep intact."
)
if to_beginning:
self._parts.insert(0, item)
else:
@ -275,17 +306,23 @@ class Paginator:
class Book:
def __init__(self, pag: Paginator, ctx: typing.Tuple[typing.Optional[discord.Message],
def __init__(
self,
pag: Paginator,
ctx: typing.Tuple[
typing.Optional[discord.Message],
discord.TextChannel,
discord.ext.commands.Bot,
discord.Message]) -> None:
discord.Message,
],
) -> None:
self._pages = pag.process_pages()
self._len_pages = len(self._pages)
self._current_page = 0
self._message, self._channel, self._bot, self._calling_message = ctx
self._locked = True
if pag == Paginator(self._bot):
raise RuntimeError('Cannot create a book out of an empty Paginator.')
raise RuntimeError("Cannot create a book out of an empty Paginator.")
def advance_page(self) -> None:
self._current_page += 1
@ -300,14 +337,22 @@ class Book:
async def display_page(self) -> None:
if isinstance(self._pages[self._current_page], discord.Embed):
if self._message:
await self._message.edit(content=None, embed=self._pages[self._current_page])
await self._message.edit(
content=None, embed=self._pages[self._current_page]
)
else:
self._message = await self._channel.send(embed=self._pages[self._current_page])
self._message = await self._channel.send(
embed=self._pages[self._current_page]
)
else:
if self._message:
await self._message.edit(content=self._pages[self._current_page], embed=None)
await self._message.edit(
content=self._pages[self._current_page], embed=None
)
else:
self._message = await self._channel.send(self._pages[self._current_page])
self._message = await self._channel.send(
self._pages[self._current_page]
)
async def create_book(self) -> None:
# noinspection PyUnresolvedReferences
@ -315,12 +360,16 @@ class Book:
# noinspection PyShadowingNames
def check(reaction, user):
if self._locked:
return str(reaction.emoji) in self._bot.book_emojis.values() \
and user == self._calling_message.author \
return (
str(reaction.emoji) in self._bot.book_emojis.values()
and user == self._calling_message.author
and reaction.message.id == self._message.id
)
else:
return str(reaction.emoji) in self._bot.book_emojis.values() \
return (
str(reaction.emoji) in self._bot.book_emojis.values()
and reaction.message.id == self._message.id
)
await self.display_page()
@ -332,14 +381,16 @@ class Book:
pass
else:
try:
await self._message.add_reaction(self._bot.book_emojis['unlock'])
await self._message.add_reaction(self._bot.book_emojis['close'])
await self._message.add_reaction(self._bot.book_emojis["unlock"])
await self._message.add_reaction(self._bot.book_emojis["close"])
except (discord.Forbidden, KeyError):
pass
while True:
try:
reaction, user = await self._bot.wait_for('reaction_add', timeout=60, check=check)
reaction, user = await self._bot.wait_for(
"reaction_add", timeout=60, check=check
)
except asyncio.TimeoutError:
try:
await self._message.clear_reactions()
@ -348,34 +399,42 @@ class Book:
raise asyncio.CancelledError
else:
await self._message.remove_reaction(reaction, user)
if str(reaction.emoji) == self._bot.book_emojis['close']:
if str(reaction.emoji) == self._bot.book_emojis["close"]:
await self._calling_message.delete()
await self._message.delete()
raise asyncio.CancelledError
elif str(reaction.emoji) == self._bot.book_emojis['forward']:
elif str(reaction.emoji) == self._bot.book_emojis["forward"]:
self.advance_page()
elif str(reaction.emoji) == self._bot.book_emojis['back']:
elif str(reaction.emoji) == self._bot.book_emojis["back"]:
self.reverse_page()
elif str(reaction.emoji) == self._bot.book_emojis['end']:
elif str(reaction.emoji) == self._bot.book_emojis["end"]:
self._current_page = self._len_pages - 1
elif str(reaction.emoji) == self._bot.book_emojis['start']:
elif str(reaction.emoji) == self._bot.book_emojis["start"]:
self._current_page = 0
elif str(reaction.emoji) == self._bot.book_emojis['hash']:
m = await self._channel.send(f'Please enter a number in range 1 to {self._len_pages}')
elif str(reaction.emoji) == self._bot.book_emojis["hash"]:
m = await self._channel.send(
f"Please enter a number in range 1 to {self._len_pages}"
)
def num_check(message):
if self._locked:
return message.content.isdigit() \
and 0 < int(message.content) <= self._len_pages \
and message.author == self._calling_message.author
else:
return message.content.isdigit() \
return (
message.content.isdigit()
and 0 < int(message.content) <= self._len_pages
and message.author == self._calling_message.author
)
else:
return (
message.content.isdigit()
and 0 < int(message.content) <= self._len_pages
)
try:
msg = await self._bot.wait_for('message', timeout=30, check=num_check)
msg = await self._bot.wait_for(
"message", timeout=30, check=num_check
)
except asyncio.TimeoutError:
await m.edit(content='Message Timed out.')
await m.edit(content="Message Timed out.")
else:
self._current_page = int(msg.content) - 1
try:
@ -383,9 +442,11 @@ class Book:
await msg.delete()
except discord.Forbidden:
pass
elif str(reaction.emoji) == self._bot.book_emojis['unlock']:
elif str(reaction.emoji) == self._bot.book_emojis["unlock"]:
self._locked = False
await self._message.remove_reaction(reaction, self._channel.guild.me)
await self._message.remove_reaction(
reaction, self._channel.guild.me
)
continue
await self.display_page()

View File

@ -46,7 +46,7 @@ class Capturing(list):
sys.stdout = self._stdout
def to_list_of_str(items, out: list=list(), level=1, recurse=0):
def to_list_of_str(items, out: list = list(), level=1, recurse=0):
def rec_loop(item, key, out, level):
quote = '"'
if type(item) == list:
@ -60,40 +60,42 @@ def to_list_of_str(items, out: list=list(), level=1, recurse=0):
out = to_list_of_str(item, out, new_level, 1)
out.append(f'{" "*level}}}')
else:
out.append(f'{" "*level}{quote+key+quote+": " if key else ""}{repr(item)},')
out.append(
f'{" "*level}{quote+key+quote+": " if key else ""}{repr(item)},'
)
if type(items) == list:
if not recurse:
out = list()
out.append('[')
out.append("[")
for item in items:
rec_loop(item, None, out, level)
if not recurse:
out.append(']')
out.append("]")
elif type(items) == dict:
if not recurse:
out = list()
out.append('{')
out.append("{")
for key in items:
rec_loop(items[key], key, out, level)
if not recurse:
out.append('}')
out.append("}")
return out
def paginate(text, maxlen=1990):
paginator = Paginator(prefix='```py', max_size=maxlen+10)
paginator = Paginator(prefix="```py", max_size=maxlen + 10)
if type(text) == list:
data = to_list_of_str(text)
elif type(text) == dict:
data = to_list_of_str(text)
else:
data = str(text).split('\n')
data = str(text).split("\n")
for line in data:
if len(line) > maxlen:
n = maxlen
for l in [line[i:i+n] for i in range(0, len(line), n)]:
for l in [line[i : i + n] for i in range(0, len(line), n)]:
paginator.add_line(l)
else:
paginator.add_line(line)
@ -105,7 +107,8 @@ async def run_command(args):
process = await asyncio.create_subprocess_shell(
args,
# stdout must a pipe to be accessible as process.stdout
stdout=asyncio.subprocess.PIPE)
stdout=asyncio.subprocess.PIPE,
)
# Wait for the subprocess to finish
stdout, stderr = await process.communicate()
# Return stdout