Merge branch 'neko404notfound-development' into development

This commit is contained in:
davfsa 2018-06-21 19:50:13 +02:00
commit 6b8bda9223
42 changed files with 2064 additions and 1024 deletions

7
.gitattributes vendored Normal file
View File

@ -0,0 +1,7 @@
*.py text eol=lf
*.js text eol=lf
*.sh text eol=lf
*.json text eol=lf
*.png binary
*.jpg binary

3
.gitignore vendored
View File

@ -120,3 +120,6 @@ docker-compose.yml
node_modules/ node_modules/
*.xml *.xml
*.iml *.iml
# Lock file, I guess.
package-lock.json

View File

@ -16,4 +16,4 @@ RUN python3.6 -m pip install --upgrade pip && \
python3.6 -m pip install -r requirements.txt && \ python3.6 -m pip install -r requirements.txt && \
python3.6 -m pip install -U git+https://github.com/Rapptz/discord.py@rewrite#egg=discord.py[voice] python3.6 -m pip install -U git+https://github.com/Rapptz/discord.py@rewrite#egg=discord.py[voice]
cmd ["python3.6","-m","src"] cmd ["python3.6","-m","sebimachine"]

59
package-lock.json generated
View File

@ -1,59 +0,0 @@
{
"name": "sebi-machine",
"version": "1.0.0",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
"async-limiter": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
"integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
},
"discord.js": {
"version": "11.3.2",
"resolved": "https://registry.npmjs.org/discord.js/-/discord.js-11.3.2.tgz",
"integrity": "sha512-Abw9CTMX3Jb47IeRffqx2VNSnXl/OsTdQzhvbw/JnqCyqc2imAocc7pX2HoRmgKd8CgSqsjBFBneusz/E16e6A==",
"requires": {
"long": "^4.0.0",
"prism-media": "^0.0.2",
"snekfetch": "^3.6.4",
"tweetnacl": "^1.0.0",
"ws": "^4.0.0"
}
},
"long": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz",
"integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA=="
},
"prism-media": {
"version": "0.0.2",
"resolved": "https://registry.npmjs.org/prism-media/-/prism-media-0.0.2.tgz",
"integrity": "sha512-L6yc8P5NVG35ivzvfI7bcTYzqFV+K8gTfX9YaJbmIFfMXTs71RMnAupvTQPTCteGsiOy9QcNLkQyWjAafY/hCQ=="
},
"safe-buffer": {
"version": "5.1.2",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
"integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g=="
},
"snekfetch": {
"version": "3.6.4",
"resolved": "https://registry.npmjs.org/snekfetch/-/snekfetch-3.6.4.tgz",
"integrity": "sha512-NjxjITIj04Ffqid5lqr7XdgwM7X61c/Dns073Ly170bPQHLm6jkmelye/eglS++1nfTWktpP6Y2bFXjdPlQqdw=="
},
"tweetnacl": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/tweetnacl/-/tweetnacl-1.0.0.tgz",
"integrity": "sha1-cT2LgY2kIGh0C/aDhtBHnmb8ins="
},
"ws": {
"version": "4.1.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-4.1.0.tgz",
"integrity": "sha512-ZGh/8kF9rrRNffkLFV4AzhvooEclrOH0xaugmqGsIfFgOE/pIz4fMc4Ef+5HSQqTEug2S9JZIWDR47duDSLfaA==",
"requires": {
"async-limiter": "~1.0.0",
"safe-buffer": "~5.1.0"
}
}
}
}

View File

@ -11,7 +11,7 @@
"url": "git+https://github.com/dustinpianalto/Sebi-Machine.git" "url": "git+https://github.com/dustinpianalto/Sebi-Machine.git"
}, },
"author": "", "author": "",
"license": "ISC", "license": "MIT",
"bugs": { "bugs": {
"url": "https://github.com/dustinpianalto/Sebi-Machine/issues" "url": "https://github.com/dustinpianalto/Sebi-Machine/issues"
}, },

View File

@ -12,3 +12,5 @@ opuslib
dataclasses dataclasses
PyNaCl PyNaCl
youtube_dl youtube_dl
# Duh!
git+https://github.com/rapptz/discord.py@rewrite

View File

@ -4,14 +4,39 @@
# Esp: added a trap here, as it otherwise attempts to restart when given # Esp: added a trap here, as it otherwise attempts to restart when given
# the interrupt signal. This is really annoying over SSH when I have # the interrupt signal. This is really annoying over SSH when I have
# a 1-second lag anyway. # a 1-second lag anyway.
trap "echo 'Received interrupt. Exiting.'; exit 0" SIGINT
trap "echo 'Received interrupt. Exiting.'; exit 0" SIGINT SIGTERM
# Also loads the venv if it is present. # Also loads the venv if it is present.
[ -d .venv/bin ] && source .venv/bin/activate && echo "Entered venv." || echo "No venv detected." [ -d .venv/bin ] && source .venv/bin/activate && echo "Entered venv." || echo "No venv detected."
until python -m src; do function git-try-pull() {
git pull --all
}
FAIL_COUNTER=0
while true; do
if [ ${FAIL_COUNTER} -eq 4 ]; then
echo -e "\e[0;31mFailed four times in a row. Trying to repull.\e[0m"
git-try-pull
let FAIL_COUNTER=0
fi
# Just respawn repeatedly until sigint.
python3.6 -m src
EXIT_STATUS=${?}
if [ ${EXIT_STATUS} -ne 0 ]; then
let FAIL_COUNTER=${FAIL_COUNTER}+1
else
let FAIL_COUNTER=0
fi
# Added colouring to ensure the date of shutdown and the exit code stands # Added colouring to ensure the date of shutdown and the exit code stands
# out from the other clutter in the traceback that might have been output. # out from the other clutter in the traceback that might have been output.
echo -e "\e[0;31m[$(date --utc)]\e[0m Sebi-Machine shutdown with error \e[0;31m$?\e[0m. Restarting..." >&2 echo -e "\e[0;31m[$(date --utc)]\e[0m Sebi-Machine shutdown with error \e[0;31m${EXIT_STATUS}\e[0m. Restarting..." >&2
sleep 1 sleep 1
done done

15
sebimachine/__init__.py Normal file
View File

@ -0,0 +1,15 @@
#!/usr/bin/env python3.6
# -*- coding: utf-8 -*-
"""
Sebi-Machine.
"""
__author__ = "Dusty.P"
__contributors__ = (__author__, "Neko404NotFound", "Dusty.P", "davfsa", "YashKandalkar")
__license__ = "MIT"
__title__ = "Sebi-Machine"
__version__ = "0.0.1"
__repository__ = f"https://github.com/{__author__}/{__title__}"
__url__ = __repository__

176
sebimachine/__main__.py Normal file
View File

@ -0,0 +1,176 @@
# !/usr/bin/python
# -*- coding: utf8 -*-
"""
App entry point.
Something meaningful here, eventually.
"""
import asyncio
import json
import logging
import os
import random
import sys
import traceback
from typing import Dict
import discord
from discord.ext import commands
from .config.config import LoadConfig
from .shared_libs import database
from .shared_libs.ioutils import in_here
from .shared_libs.loggable import Loggable
# Init logging to output on INFO level to stderr.
logging.basicConfig(level="INFO")
# If uvloop is installed, change to that eventloop policy as it
# is more efficient
try:
# https://stackoverflow.com/a/45700730
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
logging.warning("Detected Windows. Changing event loop to ProactorEventLoop.")
else:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
del uvloop
except BaseException as ex:
logging.warning(
f"Could not load uvloop. {type(ex).__qualname__}: {ex};",
"reverting to default impl.",
)
else:
logging.info(f"Using uvloop for asyncio event loop policy.")
# Bot Class
# Might be worth moving this to it's own file?
class SebiMachine(commands.Bot, LoadConfig, Loggable):
"""This discord is dedicated to http://www.discord.gg/GWdhBSp"""
def __init__(self):
# Initialize and attach config / settings
LoadConfig.__init__(self)
commands.Bot.__init__(self, command_prefix=self.defaultprefix)
with open(in_here("config", "PrivateConfig.json")) as fp:
self.bot_secrets = json.load(fp)
self.db_con = database.DatabaseConnection(**self.bot_secrets["db-con"])
self.book_emojis: Dict[str, str] = {
"unlock": "🔓",
"start": "",
"back": "",
"hash": "#\N{COMBINING ENCLOSING KEYCAP}",
"forward": "",
"end": "",
"close": "🇽",
}
# Load plugins
# Add your cog file name in this list
with open(in_here("extensions.txt")) as cog_file:
cogs = cog_file.readlines()
for cog in cogs:
# Could this just be replaced with `strip()`?
cog = cog.replace("\n", "")
self.load_extension(f"src.cogs.{cog}")
self.logger.info(f"Loaded: {cog}")
async def on_ready(self):
"""On ready function"""
self.maintenance and self.logger.warning("MAINTENANCE ACTIVE")
with open(f"src/config/reboot", "r") as f:
reboot = f.readlines()
if int(reboot[0]) == 1:
await self.get_channel(int(reboot[1])).send("Restart Finished.")
with open(f"src/config/reboot", "w") as f:
f.write(f"0")
async def on_command_error(self, ctx, error):
"""
The event triggered when an error is raised while invoking a command.
ctx : Context
error : Exception
"""
jokes = [
"I'm a bit tipsy, I took to many screenshots...",
"I am rushing to the 24/7 store to get myself anti-bug spray...",
"Organizing turtle race...",
"There is no better place then 127.0.0.1...",
"Recycling Hex Decimal...",
"No worry, I get fixed :^)...",
"R.I.P, press F for respect...",
"The bug repellent dit not work...",
"You found a bug in the program. Unfortunately the joke did not fit here, better luck next time...",
]
# CommandErrors triggered by other propagating errors tend to get wrapped. This means
# if we have a cause, we should probably consider unwrapping that so we get a useful
# message.
# If command is not found, return
em = discord.Embed(colour=self.error_color)
if isinstance(error, discord.ext.commands.errors.CommandNotFound):
em.title = "Command Not Found"
em.description = f"{ctx.prefix}{ctx.invoked_with} is not a valid command."
else:
error = error.__cause__ or error
tb = traceback.format_exception(
type(error), error, error.__traceback__, limit=2, chain=False
)
tb = "".join(tb)
joke = random.choice(jokes)
fmt = (
f"**`{self.defaultprefix}{ctx.command}`**\n{joke}\n\n**{type(error).__name__}:**:\n```py\n{tb}\n```"
)
em.title = (
f"**{type(error).__name__}** in command {ctx.prefix}{ctx.command}"
)
em.description = str(error)
await ctx.send(embed=em)
async def on_message(self, message):
# Make sure people can't change the username
if message.guild:
if message.guild.me.display_name != self.display_name:
try:
await message.guild.me.edit(nick=self.display_name)
except:
pass
else:
if (
"exec" in message.content
or "repl" in message.content
or "token" in message.content
) and message.author != self.user:
await self.get_user(351794468870946827).send(
f"{message.author.name} ({message.author.id}) is using me "
f"in DMs\n{message.content}"
)
# If author is a bot, ignore the message
if message.author.bot:
return
# Make sure the command get processed as if it was typed with lowercase
# Split message.content one first space
command = message.content.split(None, 1)
if command:
command[0] = command[0].lower()
message.content = " ".join(command)
message.content = " ".join(command)
# process command
await self.process_commands(message)
client = SebiMachine()
client.run(client.bot_secrets["bot-key"])

View File

Before

Width:  |  Height:  |  Size: 16 KiB

After

Width:  |  Height:  |  Size: 16 KiB

View File

@ -0,0 +1,94 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
import asyncio
from discord.ext import commands
import discord
class BasicCommands:
def __init__(self, bot):
self.bot = bot
@commands.command()
async def tutorial(self, ctx):
await ctx.send(
f"Hello, {ctx.author.display_name}. Welcome to Sebi's Bot Tutorials. \nFirst off, would you like a quick walkthrough on the server channels?"
)
channel_list = {
"channel-1": self.bot.get_channel(333149949883842561).mention,
"d.py-rewrite-start": self.bot.get_channel(386419285439938560).mention,
"js-klasa-start": self.bot.get_channel(341816240186064897).mention,
"d.js": self.bot.get_channel(436771798303113217).mention,
}
bots_channels = (
self.bot.get_channel(339112602867204097).mention,
self.bot.get_channel(411586546551095296).mention,
)
help_channels = (
self.bot.get_channel(425315253153300488).mention,
self.bot.get_channel(392215236612194305).mention,
self.bot.get_channel(351034776985141250).mention,
)
def check(m):
return (
True
if m.author.id == ctx.author.id and m.channel.id == ctx.channel.id
else False
)
msg = await self.bot.wait_for("message", check=check, timeout=15)
agree = ("yes", "yep", "yesn't", "ya", "ye")
if msg is None:
await ctx.send(
"Sorry, {ctx.author.mention}, you didn't reply on time. You can run the command again when you're free :)"
)
else:
if msg.content.lower() in agree:
async with ctx.typing():
await ctx.send("Alrighty-Roo... Check your DMs!")
await ctx.author.send("Alrighty-Roo...")
await ctx.author.send(
f"To start making your bot from scratch, you first need to head over to {channel_list['channel-1']}"
" (Regardless of the language you're gonna use)."
)
await asyncio.sleep(0.5)
await ctx.author.send(
f"After you have a bot account, you can either continue with {channel_list['d.py-rewrite-start']}"
f"if you want to make a bot in discord.py rewrite __or__ go to {channel_list['js-klasa-start']} or "
f"{channel_list['d.js']} for making a bot in JavaScript."
)
await ctx.author.send(
"...Read all the tutorials and still need help? You have two ways to get help."
)
await asyncio.sleep(1.5)
await ctx.author.send(
"**Method-1**\nThis is the best method of getting help. You help yourself.\n"
f"To do so, head over to a bots dedicated channel (either {bots_channels[0]} or {bots_channels[1]})"
" and type `?rtfm rewrite thing_you_want_help_with`.\nThis will trigger the bot R.Danny Bot and will"
"give you links on your query on the official discord.py rewrite docs. *PS: Let the page completely load*"
)
await asyncio.sleep(5)
await ctx.author.send(
"**Method-2**\nIf you haven't found anything useful with Method-1, feel free to ask your question "
f"in any of the related help channels. ({', '.join(help_channels)})\nMay the force be with you!!"
)
else:
return await ctx.send(
"Session terminated. You can run this command again whenever you want."
)
def setup(bot):
bot.add_cog(BasicCommands(bot))

View File

@ -0,0 +1,228 @@
import discord
from discord.ext import commands
class BotManager:
def __init__(self, bot):
self.bot = bot
async def on_member_join(self, member):
# If the member is not a bot
if member.bot is False:
return
else:
# The member is a bot
await member.add_roles(discord.utils.get(member.guild.roles, name="Bots"))
try:
await member.edit(
nick="["
+ await self.bot.db_con.fetch(
"select prefix from bots where id = $1", member.id
)
+ "] "
+ member.name
)
except:
pass
async def on_member_remove(self, member):
# If the member is not a bot
if member.bot is False:
return
else:
# The member is a bot
await self.bot.db_con.execute("DELETE FROM bots WHERE id = $1", member.id)
@commands.command()
async def invite(self, ctx, bot=None, prefix=None):
bot = await ctx.bot.get_user_info(bot)
if not bot:
raise Warning(
"You must include the id of the bot you are trying to invite... Be exact."
)
if not bot.bot:
raise Warning("You can only invite bots.")
if not prefix:
raise Warning("Please provide a prefix")
# Make sure that the bot has not been invited already and it is not being tested
if (
await self.bot.db_con.fetch(
"select count(*) from bots where id = $1", bot.id
)
== 1
):
raise Warning("The bot has already been invited or is being tested")
await self.bot.db_con.execute(
"insert into bots (id, owner, prefix) values ($1, $2, $3)",
bot.id,
ctx.author.id,
prefix,
)
em = discord.Embed(colour=self.bot.embed_color)
em.title = "Hello {},".format(ctx.author.name)
em.description = "Thanks for inviting your bot! It will be tested and invited shortly. " "Please open your DMs if they are not already so the bot can contact " "you to inform you about the progress of the bot!"
await ctx.send(embed=em)
em = discord.Embed(title="Bot invite", colour=discord.Color(0x363941))
em.set_thumbnail(url=bot.avatar_url)
em.add_field(name="Bot name", value=bot.name)
em.add_field(name="Bot id", value="`" + str(bot.id) + "`")
em.add_field(name="Bot owner", value=ctx.author.mention)
em.add_field(name="Bot prefix", value="`" + prefix + "`")
await ctx.bot.get_channel(448803675574370304).send(embed=em)
@commands.command(name="claim", aliases=["makemine", "gimme"])
@commands.cooldown(1, 5, commands.BucketType.user)
async def _claim_bot(
self,
ctx,
bot: discord.Member = None,
prefix: str = None,
owner: discord.Member = None,
):
if not bot:
raise Warning(
"You must include the name of the bot you are trying to claim... Be exact."
)
if not bot.bot:
raise Warning("You can only claim bots.")
if not prefix:
if bot.display_name.startswith("["):
prefix = bot.display_name.split("]")[0].strip("[")
else:
raise Warning("Prefix not provided and can't be found in bot nick.")
if owner is not None and ctx.author.guild_permissions.manage_roles:
author_id = owner.id
else:
author_id = ctx.author.id
em = discord.Embed()
if (
await self.bot.db_con.fetchval(
"select count(*) from bots where owner = $1", author_id
)
>= 10
):
em.colour = self.bot.error_color
em.title = "Too Many Bots Claimed"
em.description = "Each person is limited to claiming 10 bots as that is how " "many bots are allowed by the Discord API per user."
return await ctx.send(embed=em)
existing = await self.bot.db_con.fetchrow(
"select * from bots where id = $1", bot.id
)
if not existing:
await self.bot.db_con.execute(
"insert into bots (id, owner, prefix) values ($1, $2, $3)",
bot.id,
author_id,
prefix,
)
em.colour = self.bot.embed_color
em.title = "Bot Claimed"
em.description = f"You have claimed {bot.display_name} with a prefix of {prefix}\n" f"If there is an error please run command again to correct the prefix,\n" f"or {ctx.prefix}unclaim {bot.mention} to unclaim the bot."
elif existing["owner"] and existing["owner"] != author_id:
em.colour = self.bot.error_color
em.title = "Bot Already Claimed"
em.description = "This bot has already been claimed by someone else.\n" "If this is actually your bot please let the guild Administrators know."
elif existing["owner"] and existing["owner"] == author_id:
em.colour = self.bot.embed_color
em.title = "Bot Already Claimed"
em.description = "You have already claimed this bot.\n" "If the prefix you provided is different from what is already in the database" " it will be updated for you."
if existing["prefix"] != prefix:
await self.bot.db_con.execute(
"update bots set prefix = $1 where id = $2", prefix, bot.id
)
elif not existing["owner"]:
await self.bot.db_con.execute(
"update bots set owner = $1, prefix = $2 where id = $3",
author_id,
prefix,
bot.id,
)
em.colour = self.bot.embed_color
em.title = "Bot Claimed"
em.description = f"You have claimed {bot.display_name} with a prefix of {prefix}\n" f"If there is an error please run command again to correct the prefix,\n" f"or {ctx.prefix}unclaim {bot.mention} to unclaim the bot."
else:
em.colour = self.bot.error_color
em.title = "Something Went Wrong..."
await ctx.send(embed=em)
@commands.command(name="unclaim")
@commands.cooldown(1, 5, commands.BucketType.user)
async def _unclaim_bot(self, ctx, bot: discord.Member = None):
if not bot:
raise Warning(
"You must include the name of the bot you are trying to claim... Be exact."
)
if not bot.bot:
raise Warning("You can only unclaim bots.")
em = discord.Embed()
existing = await self.bot.db_con.fetchrow(
"select * from bots where id = $1", bot.id
)
if not existing or not existing["owner"]:
em.colour = self.bot.error_color
em.title = "Bot Not Found"
em.description = "That bot is not claimed"
elif (
existing["owner"] != ctx.author.id
and not ctx.author.guild_permissions.manage_roles
):
em.colour = self.bot.error_color
em.title = "Not Claimed By You"
em.description = "That bot is claimed by someone else.\n" "You can't unclaim someone else's bot"
else:
await self.bot.db_con.execute(
"update bots set owner = null where id = $1", bot.id
)
em.colour = self.bot.embed_color
em.title = "Bot Unclaimed"
em.description = f"You have unclaimed {bot.display_name}\n" f"If this is an error please reclaim using\n" f'{ctx.prefix}claim {bot.mention} {existing["prefix"]}'
await ctx.send(embed=em)
@commands.command(name="listclaims", aliases=["claimed", "mybots"])
@commands.cooldown(1, 5, commands.BucketType.user)
async def _claimed_bots(self, ctx, usr: discord.Member = None):
if usr is None:
usr = ctx.author
bots = await self.bot.db_con.fetch(
"select * from bots where owner = $1", usr.id
)
if bots:
em = discord.Embed(
title=f"{usr.display_name} has claimed the following bots:",
colour=self.bot.embed_color,
)
for bot in bots:
member = ctx.guild.get_member(int(bot["id"]))
em.add_field(
name=member.display_name,
value=f'Stored Prefix: {bot["prefix"]}',
inline=False,
)
else:
em = discord.Embed(
title="You have not claimed any bots.", colour=self.bot.embed_color
)
await ctx.send(embed=em)
@commands.command(name="whowns")
async def _whowns(self, ctx, bot: discord.Member):
if not bot.bot:
await ctx.send("this commands only for bots")
else:
owner = await self.bot.db_con.fetchrow(
"select * from bots where id = $1", bot.id
)
await ctx.send(ctx.guild.get_member(owner["owner"]).display_name)
def setup(bot):
bot.add_cog(BotManager(bot))

248
sebimachine/cogs/code.py Normal file
View File

@ -0,0 +1,248 @@
from discord.ext import commands
import traceback
import discord
import inspect
import textwrap
from contextlib import redirect_stdout
import io
class REPL:
"""Python in Discords"""
def __init__(self, bot):
self.bot = bot
self._last_result = None
self.sessions = set()
def cleanup_code(self, content):
"""
Automatically removes code blocks from the code.
"""
# remove ```py\n```
if content.startswith("```") and content.endswith("```"):
return "\n".join(content.split("\n")[1:-1])
# remove `foo`
return content.strip("` \n")
def get_syntax_error(self, e):
if e.text is None:
return "{0.__class__.__name__}: {0}".format(e)
return "{0.text}{1:>{0.offset}}\n{2}: {0}".format(e, "^", type(e).__name__)
@commands.command(name="exec")
async def _eval(self, ctx, *, body: str = None):
"""
Execute python code in discord chat.
Only the owner of this bot can use this command.
Alias:
- exec
Usage:
- exec < python code >
Example:
- exec print(546132)
"""
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
if body is None:
return await ctx.send(
"Please, use\n"
f'`{self.bot.config["prefix"]}exec`\n\n'
"\n`\\`\\`\\`py\n[python code]\n\\`\\`\\`\n"
"to get the most out of the command"
)
env = {
"bot": self.bot,
"ctx": ctx,
"channel": ctx.message.channel,
"author": ctx.message.author,
"server": ctx.message.guild,
"message": ctx.message,
"_": self._last_result,
}
env.update(globals())
body = self.cleanup_code(body)
stdout = io.StringIO()
to_compile = "async def func():\n%s" % textwrap.indent(body, " ")
try:
exec(to_compile, env)
except SyntaxError as e:
try:
await ctx.send(f"```py\n{self.get_syntax_error(e)}\n```")
except Exception as e:
error = [
self.get_syntax_error(e)[i : i + 2000]
for i in range(0, len(self.get_syntax_error(e)), 2000)
]
for i in error:
await ctx.send(f"```py\n{i}\n```")
func = env["func"]
try:
with redirect_stdout(stdout):
ret = await func()
except Exception as e:
value = stdout.getvalue()
try:
await ctx.send(f"```py\n{value}{traceback.format_exc()}\n```")
except Exception as e:
error = [value[i : i + 2000] for i in range(0, len(value), 2000)]
for i in error:
await ctx.send(f"```py\n{i}\n```")
tracebackerror = [
traceback.format_exc()[i : i + 2000]
for i in range(0, len(traceback.format_exc()), 2000)
]
for i in tracebackerror:
await ctx.send(f"```py\n{i}\n```")
else:
value = stdout.getvalue()
if ret is None:
if value:
try:
await ctx.send(f"```py\n{value}\n```")
except Exception as e:
code = [value[i : i + 1980] for i in range(0, len(value), 1980)]
for i in code:
await ctx.send(f"```py\n{i}\n```")
else:
self._last_result = ret
try:
code = [value[i : i + 1980] for i in range(0, len(value), 1980)]
for i in code:
await ctx.send(f"```py\n{i}\n```")
except Exception as e:
code = [value[i : i + 1980] for i in range(0, len(value), 1980)]
for i in code:
await ctx.send(f"```py\n{i}\n```")
modifyd_ret = [ret[i : i + 1980] for i in range(0, len(ret), 1980)]
for i in modifyd_ret:
await ctx.send(f"```py\n{i}\n```")
@commands.command(hidden=True)
async def repl(self, ctx):
"""
Start a interactive python shell in chat.
Only the owner of this bot can use this command.
Usage:
- repl < python code >
Example:
- repl print(205554)
"""
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
msg = ctx.message
variables = {
"ctx": ctx,
"bot": self.bot,
"message": msg,
"server": msg.guild,
"channel": msg.channel,
"author": msg.author,
"_": None,
}
if msg.channel.id in self.sessions:
msg = await ctx.send(
"Already running a REPL session in this channel. Exit it with `quit`."
)
self.sessions.add(msg.channel.id)
await ctx.send("Enter code to execute or evaluate. `exit()` or `quit` to exit.")
while True:
response = await self.bot.wait_for(
"message",
check=lambda m: m.content.startswith("`")
and m.author == ctx.author
and m.channel == ctx.channel,
)
cleaned = self.cleanup_code(response.content)
if cleaned in ("quit", "exit", "exit()"):
msg = await ctx.send("Exiting.")
self.sessions.remove(msg.channel.id)
return
executor = exec
if cleaned.count("\n") == 0:
# single statement, potentially 'eval'
try:
code = compile(cleaned, "<repl session>", "eval")
except SyntaxError:
pass
else:
executor = eval
if executor is exec:
try:
code = compile(cleaned, "<repl session>", "exec")
except SyntaxError as e:
try:
await ctx.send(f"```Python\n{self.get_syntax_error(e)}\n```")
except Exception as e:
error = [
self.get_syntax_error(e)[i : i + 2000]
for i in range(0, len(self.get_syntax_error(e)), 2000)
]
for i in error:
await ctx.send(f"```Python\n{i}\n```")
variables["message"] = response
fmt = None
stdout = io.StringIO()
try:
with redirect_stdout(stdout):
result = executor(code, variables)
if inspect.isawaitable(result):
result = await result
except Exception as e:
value = stdout.getvalue()
await ctx.send(f"```Python\n{value}{traceback.format_exc()}\n```")
continue
else:
value = stdout.getvalue()
if result is not None:
fmt = "{}{}".format(value, result)
variables["_"] = result
elif value:
fmt = value
try:
if fmt is not None:
if len(fmt) > 1980:
code = [fmt[i : i + 1980] for i in range(0, len(fmt), 1980)]
for i in code:
await ctx.send(f"```py\n{i}\n```")
else:
await ctx.send(fmt)
except discord.Forbidden:
pass
except discord.HTTPException as e:
await ctx.send(f"Unexpected error: `{e}`")
def setup(bot):
bot.add_cog(REPL(bot))

View File

@ -7,20 +7,24 @@ import traceback
import aiofiles import aiofiles
import os import os
class Upload: class Upload:
""" """
CogName should be the name of the cog CogName should be the name of the cog
""" """
def __init__(self, bot): def __init__(self, bot):
self.bot = bot self.bot = bot
print('upload loaded') print("upload loaded")
@commands.command() @commands.command()
async def reload(self, ctx, *, extension: str): async def reload(self, ctx, *, extension: str):
"""Reload an extension.""" """Reload an extension."""
await ctx.trigger_typing() await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist: if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10) return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
extension = extension.lower() extension = extension.lower()
try: try:
@ -28,16 +32,18 @@ class Upload:
self.bot.load_extension("src.cogs.{}".format(extension)) self.bot.load_extension("src.cogs.{}".format(extension))
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
await ctx.send(f'Could not reload `{extension}` -> `{e}`') await ctx.send(f"Could not reload `{extension}` -> `{e}`")
else: else:
await ctx.send(f'Reloaded `{extension}`.') await ctx.send(f"Reloaded `{extension}`.")
@commands.command() @commands.command()
async def reloadall(self, ctx): async def reloadall(self, ctx):
"""Reload all extensions.""" """Reload all extensions."""
await ctx.trigger_typing() await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist: if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10) return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
try: try:
for extension in self.bot.extensions: for extension in self.bot.extensions:
@ -52,7 +58,9 @@ class Upload:
"""Unload an extension.""" """Unload an extension."""
await ctx.trigger_typing() await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist: if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10) return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
extension = extension.lower() extension = extension.lower()
try: try:
@ -61,36 +69,42 @@ class Upload:
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
if ctx.message.author.id not in self.bot.owner_list: if ctx.message.author.id not in self.bot.owner_list:
await ctx.send(f'Could not unload `{extension}` -> `{e}`') await ctx.send(f"Could not unload `{extension}` -> `{e}`")
else: else:
await ctx.send(f'Unloaded `{extension}`.') await ctx.send(f"Unloaded `{extension}`.")
@commands.command() @commands.command()
async def load(self, ctx, *, extension: str): async def load(self, ctx, *, extension: str):
"""Load an extension.""" """Load an extension."""
await ctx.trigger_typing() await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist: if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10) return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
extension = extension.lower() extension = extension.lower()
try: try:
self.bot.load_extension("src.cogs.{}".format(extension)) self.bot.load_extension("src.cogs.{}".format(extension))
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
await ctx.send(f'Could not load `{extension}` -> `{e}`') await ctx.send(f"Could not load `{extension}` -> `{e}`")
else: else:
await ctx.send(f'Loaded `{extension}`.') await ctx.send(f"Loaded `{extension}`.")
@commands.command() @commands.command()
async def permunload(self, ctx, extension=None): async def permunload(self, ctx, extension=None):
"""Disables permanently a cog.""" """Disables permanently a cog."""
await ctx.trigger_typing() await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist: if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10) return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
if cog is None: if cog is None:
return await ctx.send("Please provide a extension. Do `help permunload` for more info") return await ctx.send(
"Please provide a extension. Do `help permunload` for more info"
)
extension = extension.lower() extension = extension.lower()
@ -118,10 +132,12 @@ class Upload:
@commands.command(hidden=True) @commands.command(hidden=True)
async def reboot(self, ctx): async def reboot(self, ctx):
if ctx.author.id not in self.bot.ownerlist: if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10) return await ctx.send(
await ctx.send('Sebi-Machine is restarting.') "Only my contributors can use me like this :blush:", delete_after=10
with open(f'src/config/reboot', 'w') as f: )
f.write(f'1\n{ctx.channel.id}') await ctx.send("Sebi-Machine is restarting.")
with open(f"src/config/reboot", "w") as f:
f.write(f"1\n{ctx.channel.id}")
# noinspection PyProtectedMember # noinspection PyProtectedMember
os._exit(1) os._exit(1)

View File

@ -4,10 +4,12 @@
from discord.ext import commands from discord.ext import commands
import discord import discord
class CogName: class CogName:
""" """
CogName should be the name of the cog CogName should be the name of the cog
""" """
def __init__(self, bot): def __init__(self, bot):
self.bot = bot self.bot = bot
@ -15,9 +17,9 @@ class CogName:
async def ping(self, ctx): async def ping(self, ctx):
"""Say pong""" """Say pong"""
now = ctx.message.created_at now = ctx.message.created_at
msg = await ctx.send('Pong') msg = await ctx.send("Pong")
sub = msg.created_at - now sub = msg.created_at - now
await msg.edit(content=f'🏓Pong, **{sub.total_seconds() * 1000}ms**') await msg.edit(content=f"🏓Pong, **{sub.total_seconds() * 1000}ms**")
def setup(bot): def setup(bot):

View File

@ -6,10 +6,12 @@ import discord
import random import random
import aiohttp import aiohttp
class Fun: class Fun:
""" """
CogName should be the name of the cog CogName should be the name of the cog
""" """
def __init__(self, bot): def __init__(self, bot):
self.bot = bot self.bot = bot
@ -23,10 +25,10 @@ class Fun:
- sebisauce - sebisauce
""" """
await ctx.trigger_typing() await ctx.trigger_typing()
url = 'http://ikbengeslaagd.com/API/sebisauce.json' url = "http://ikbengeslaagd.com/API/sebisauce.json"
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get(url) as response: async with session.get(url) as response:
source = await response.json(encoding='utf8') source = await response.json(encoding="utf8")
total_sebi = 0 total_sebi = 0
for key in dict.keys(source): for key in dict.keys(source):
@ -34,11 +36,12 @@ class Fun:
im = random.randint(0, int(total_sebi) - 1) im = random.randint(0, int(total_sebi) - 1)
await ctx.send(embed=discord.Embed( await ctx.send(
title='\t', embed=discord.Embed(
description='\t', title="\t", description="\t", color=self.bot.embed_color
color=self.bot.embed_color).set_image( ).set_image(url=source[str(im)])
url=source[str(im)])) )
def setup(bot): def setup(bot):
bot.add_cog(Fun(bot)) bot.add_cog(Fun(bot))

View File

@ -29,8 +29,10 @@ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import discord import discord
from discord.ext import commands from discord.ext import commands
from src.shared_libs.utils import paginate, run_command from sebimachine.shared_libs.utils import paginate, run_command
from src.shared_libs.loggable import Loggable from sebimachine.shared_libs.loggable import Loggable
from sebimachine import __url__
import asyncio import asyncio
@ -41,53 +43,73 @@ class Git(Loggable):
@commands.group(case_insensitive=True, invoke_without_command=True) @commands.group(case_insensitive=True, invoke_without_command=True)
async def git(self, ctx): async def git(self, ctx):
"""Run help git for more info""" """Run help git for more info"""
await ctx.send('https://github.com/dustinpianalto/Sebi-Machine/') # await ctx.send("https://github.com/dustinpianalto/Sebi-Machine/")
await ctx.send(__url__ or "No URL specified in __init__.py")
@commands.command(case_insensitive=True, brief='Gets the Trello link.') @commands.command(case_insensitive=True, brief="Gets the Trello link.")
async def trello(self, ctx): async def trello(self, ctx):
await ctx.send('<https://trello.com/b/x02goBbW/sebis-bot-tutorial-roadmap>') await ctx.send("<https://trello.com/b/x02goBbW/sebis-bot-tutorial-roadmap>")
@git.command() @git.command()
async def pull(self, ctx): async def pull(self, ctx):
self.logger.warning('Invoking git-pull') self.logger.warning("Invoking git-pull")
await ctx.trigger_typing() await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist: if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10) return await ctx.send(
em = discord.Embed(style='rich', "Only my contributors can use me like this :blush:", delete_after=10
title=f'Git Pull', )
color=self.bot.embed_color) em = discord.Embed(style="rich", title=f"Git Pull", color=self.bot.embed_color)
em.set_thumbnail(url=f'{ctx.guild.me.avatar_url}') em.set_thumbnail(url=f"{ctx.guild.me.avatar_url}")
# Pretty sure you can just do await run_command() if that is async, # Pretty sure you can just do await run_command() if that is async,
# or run in a TPE otherwise. # or run in a TPE otherwise.
result = await asyncio.wait_for(self.bot.loop.create_task( result = (
run_command('git fetch --all')), 120) + '\n' await asyncio.wait_for(
result += await asyncio.wait_for(self.bot.loop.create_task( self.bot.loop.create_task(run_command("git fetch --all")), 120
run_command('git reset --hard origin/$(git rev-parse ' )
'--symbolic-full-name --abbrev-ref HEAD)')), + "\n"
120) + '\n\n' )
result += await asyncio.wait_for(self.bot.loop.create_task( result += (
run_command('git show --stat | sed "s/.*@.*[.].*/ /g"')), 10) await asyncio.wait_for(
self.bot.loop.create_task(
run_command(
"git reset --hard origin/$(git rev-parse "
"--symbolic-full-name --abbrev-ref HEAD)"
)
),
120,
)
+ "\n\n"
)
result += await asyncio.wait_for(
self.bot.loop.create_task(
run_command('git show --stat | sed "s/.*@.*[.].*/ /g"')
),
10,
)
results = paginate(result, maxlen=1014) results = paginate(result, maxlen=1014)
for page in results[:5]: for page in results[:5]:
em.add_field(name='\uFFF0', value=f'{page}') em.add_field(name="\uFFF0", value=f"{page}")
await ctx.send(embed=em) await ctx.send(embed=em)
@git.command() @git.command()
async def status(self, ctx): async def status(self, ctx):
await ctx.trigger_typing() await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist: if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10) return await ctx.send(
em = discord.Embed(style='rich', "Only my contributors can use me like this :blush:", delete_after=10
title=f'Git Status', )
color=self.bot.embed_color) em = discord.Embed(
em.set_thumbnail(url=f'{ctx.guild.me.avatar_url}') style="rich", title=f"Git Status", color=self.bot.embed_color
result = await asyncio.wait_for(self.bot.loop.create_task( )
run_command('git status')), 10) em.set_thumbnail(url=f"{ctx.guild.me.avatar_url}")
result = await asyncio.wait_for(
self.bot.loop.create_task(run_command("git status")), 10
)
results = paginate(result, maxlen=1014) results = paginate(result, maxlen=1014)
for page in results[:5]: for page in results[:5]:
em.add_field(name='\uFFF0', value=f'{page}') em.add_field(name="\uFFF0", value=f"{page}")
await ctx.send(embed=em) await ctx.send(embed=em)

View File

@ -0,0 +1,78 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
from discord.ext import commands
import discord
class Moderation:
"""
Moderation Commands
"""
def __init__(self, bot):
self.bot = bot
@commands.command()
async def kick(self, ctx, member: discord.Member = None):
"""
Kick a discord member from your server.
Only contributors can use this command.
Usage:
- kick <discord.member>
"""
await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
if member is None:
await ctx.send("Are you sure you are capable of this command?")
try:
await member.kick()
await ctx.send(
f"You kicked **`{member.name}`** from **`{ctx.guild.name}`**"
)
except Exception as e:
await ctx.send(
"You may not use this command, as you do not have permission to do so:\n\n**`{ctx.guild.name}`**"
f"\n\n```py\n{e}\n```"
)
@commands.command()
async def ban(self, ctx, member: discord.Member = None):
"""
Ban a discord member from your server.
Only contributors can use this command.
Usage:
- ban <discord.member>
"""
await ctx.trigger_typing()
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send(
"Only my contributors can use me like this :blush:", delete_after=10
)
if member is None:
await ctx.send("Are you sure you are capable of this command?")
try:
await member.ban()
await ctx.send(
f"You banned **`{member.name}`** from **`{ctx.guild.name}`**"
)
except Exception as e:
await ctx.send(
"You may not use this command, as you do not have permission to do so:\n\n**`{ctx.guild.name}`**"
f"\n\n```py\n{e}\n```"
)
def setup(bot):
bot.add_cog(Moderation(bot))

329
sebimachine/cogs/music.py Normal file
View File

@ -0,0 +1,329 @@
import asyncio
import inspect
import traceback
import weakref
from typing import Dict
import async_timeout
import dataclasses
import discord
from discord.ext import commands
import youtube_dl
# noinspection PyUnresolvedReferences,PyUnresolvedReferences,PyPackageRequirements
from .utils import noblock
YT_DL_OPTS = {
"format": "ogg[abr>0]/bestaudio/best",
"ignoreerrors": True,
"default_search": "auto",
"source_address": "0.0.0.0",
"quiet": True,
}
# Let it be waiting on an empty queue for about 30 minutes
# before closing the connection from being idle.
IDLE_FOR = 60 * 30
@dataclasses.dataclass(repr=True)
class Request:
"""Track request."""
who: discord.Member
what: str # Referral
title: str # Video title
actual_url: str # Actual URL to play
def __str__(self):
return self.title
def __hash__(self):
return hash(str(self.who.id) + self.what)
# noinspection PyBroadException
class Session:
"""
Each player being run is a session; (E.g. if you open a player in one server and I did in another).
Sessions will have a queue, an event that can fire to stop the current track and move on, and a voice
channel to bind to. This is defined as the voice channel the owner of the session was in when they made the channel.
To create a session, call ``Session.new_session``. Do not call the constructor directly.
Attributes:
ctx: discord.ext.commands.Context
The context of the original command invocation we are creating a session for.
loop: asyncio.AbstractEventLoop
The event loop to run this in.
voice_client: discord.VoiceClient
Voice client we are streaming audio through.
queue: asyncio.Queue
Track queue.
"""
@classmethod
async def new_session(cls, ctx: commands.Context):
"""
Helper to make a new session. Invoke constructor using this, as it handles any errors. It also ensures
we connect immediately.
"""
try:
s = cls(ctx)
await s.connect()
except Exception as ex:
traceback.print_exc()
await ctx.send(
f"I couldn't connect! Reason: {str(ex) or type(ex).__qualname__}"
)
return None
else:
return s
def __init__(self, ctx: commands.Context) -> None:
"""Create a new session."""
if ctx.author.voice is None:
raise RuntimeError("Please enter a voice channel I have access to first.")
# Holds the tasks currently running associated with this.
self.voice_channel = ctx.author.voice.channel
self.ctx: commands.Context = ctx
self.voice_client: discord.VoiceClient = None
self.loop: asyncio.AbstractEventLoop = weakref.proxy(self.ctx.bot.loop)
self.queue = asyncio.Queue()
# Lock-based event to allow firing a handler to advance to the next track.
self._start_next_track_event = asyncio.Event()
self._on_stop_event = asyncio.Event()
self._player: asyncio.Task = None
self._track: asyncio.Task = None
@property
def is_connected(self) -> bool:
return self.voice_client and self.voice_client.is_connected()
async def connect(self) -> None:
"""Connects to the VC."""
if not self.is_connected and not self._player:
# noinspection PyUnresolvedReferences
self.voice_client = await self.voice_channel.connect()
self._start_next_track_event.clear()
self._player = self.__spawn_player()
else:
raise RuntimeError("I already have a voice client/player running.")
async def disconnect(self) -> None:
"""Disconnects from the VC."""
await self.voice_client.disconnect()
self.voice_client = None
def __spawn_player(self) -> asyncio.Task:
"""Starts a new player."""
async def player():
try:
while True:
# Wait on an empty queue for a finite period of time.
with async_timeout.timeout(IDLE_FOR):
request = await self.queue.get()
await self.ctx.send(
f"Playing `{request}` requested by {request.who}"
)
# Clear the skip event if it is set.
self._start_next_track_event.clear()
# Start the player if it was a valid request, else continue to the next track.
if not self.__play(request.actual_url):
await self.ctx.send(
f"{request.referral} was a bad request and was skipped."
)
continue
await self._start_next_track_event.wait()
if self.voice_client.is_playing():
self.voice_client.stop()
except asyncio.CancelledError:
# Hit when someone kills the player using stop().
print("Requested to stop player", repr(self))
except asyncio.TimeoutError:
await self.ctx.send("Was idle for too long...")
print("Player queue was empty for too long and was stopped", repr(self))
except Exception:
traceback.print_exc()
finally:
if self.voice_client.is_playing():
await self.voice_client.stop()
if self.is_connected:
await self.disconnect()
return self.loop.create_task(player())
def __play(self, url):
"""Tries to play the given URL. If it fails, we return False, else we return True."""
try:
ffmpeg_player = discord.FFmpegPCMAudio(url)
# Play the stream. After we finish, either from being cancelled or otherwise, fire the
# skip track event to start the next track.
self.voice_client.play(
ffmpeg_player, after=lambda error: self._start_next_track_event.set()
)
except Exception:
traceback.print_exc()
return False
else:
return True
def skip(self):
"""Request to skip track."""
self._start_next_track_event.set()
def stop(self):
"""Request to stop playing."""
if self._player:
self._player.cancel()
self._on_stop_event.set()
self._on_stop_event.clear()
def on_exit(self, func):
"""Decorates a function to invoke it on exit."""
async def callback():
await self._on_stop_event.wait()
inspect.iscoroutinefunction(func) and await func() or func()
self.loop.create_task(callback())
return func
# noinspection PyBroadException
class PlayerCog:
def __init__(self):
self.sessions: Dict[discord.Guild, Session] = {}
# noinspection PyMethodMayBeStatic
async def __local_check(self, ctx):
return ctx.guild
@commands.command()
async def join(self, ctx):
if ctx.guild not in self.sessions:
p = await Session.new_session(ctx)
if p:
self.sessions[ctx.guild] = p
@p.on_exit
def when_terminated():
try:
self.sessions.pop(ctx.guild)
finally:
return
await ctx.send("*hacker voice*\n**I'm in.**", delete_after=15)
else:
await ctx.send(
f"I am already playing in {self.sessions[ctx.guild].voice_channel.mention}"
)
# noinspection PyNestedDecorators
@staticmethod
@noblock.no_block
def _get_video_meta(referral):
downloader = youtube_dl.YoutubeDL(YT_DL_OPTS)
info = downloader.extract_info(referral, download=False)
return info
@commands.command()
async def queue(self, ctx):
if ctx.guild not in self.sessions:
return await ctx.send("Please join me into a voice channel first.")
sesh = self.sessions[ctx.guild]
if sesh.queue.empty():
return await ctx.send(
"There is nothing in the queue at the moment!\n\n"
"Add something by running `<>play https://url` or `<>play search term`!"
)
# We cannot faff around with the actual queue so make a shallow copy of the internal
# non-async dequeue.
# noinspection PyProtectedMember
agenda = sesh.queue._queue.copy()
message = ["**Queue**"]
for i, item in enumerate(list(agenda)[:15]):
message.append(f"`{i+1: >2}: {item.title} ({item.who})`")
if len(agenda) >= 15:
message.append("")
message.append(f"There are {len(agenda)} items in the queue currently.")
await ctx.send("\n".join(message)[:2000])
@commands.command()
async def play(self, ctx, *, referral):
if ctx.guild not in self.sessions:
return await ctx.send("Please join me into a voice channel first.")
try:
try:
info = await self._get_video_meta(referral)
# If it was interpreted as a search, it appears this happens?
# The documentation is so nice.
if info.get("_type") == "playlist":
info = info["entries"][0]
# ...wait... did I say nice? I meant "non existent."
url = info["url"]
title = info.get("title") or referral
except IndexError:
return await ctx.send("No results...", delete_after=15)
except Exception as ex:
return await ctx.send(
f"Couldn't add this to the queue... reason: {ex!s}"
)
await self.sessions[ctx.guild].queue.put(
Request(ctx.author, referral, title, url)
)
await ctx.send(f"Okay. Queued `{title or referral}`.")
except KeyError:
await ctx.send("I am not playing in this server.")
@commands.command()
async def stop(self, ctx):
try:
await self.sessions[ctx.guild].stop()
except KeyError:
await ctx.send("I am not playing in this server.")
except TypeError:
await ctx.send("I wasn't playing anything, but okay.", delete_after=15)
@commands.command()
async def skip(self, ctx):
try:
self.sessions[ctx.guild].skip()
try:
await ctx.message.add_reaction("\N{OK HAND SIGN}")
except discord.Forbidden:
await ctx.send("\N{OK HAND SIGN}")
except KeyError:
await ctx.send("I am not playing in this server.")
@commands.command()
async def disconnect(self, ctx):
await self.sessions[ctx.guild].stop()
await self.disconnect()
def setup(bot):
bot.add_cog(PlayerCog())

106
sebimachine/cogs/tag.py Normal file
View File

@ -0,0 +1,106 @@
import discord
from discord.ext import commands
import json
import aiofiles
import asyncio
class Tag:
def __init__(self, bot):
self.bot = bot
with open("src/shared_libs/tags.json", "r") as fp:
json_data = fp.read()
global tags
tags = json.loads(json_data)
@commands.group(case_insensitive=True, invoke_without_command=True)
async def tag(self, ctx, tag=None):
"""Gets a tag"""
await ctx.trigger_typing()
if tag is None:
return await ctx.send(
"Please provide a argument. Do `help tag` for more info"
)
found = tags.get(tag, None)
if found is None:
return await ctx.send("Tag not found")
await ctx.send(found)
@tag.command(case_insensitive=True)
async def list(self, ctx):
"""Lists available tags"""
await ctx.trigger_typing()
desc = ""
for i in tags:
desc = desc + i + "\n"
if desc == "":
desc = "None"
em = discord.Embed(
title="Available tags:", description=desc, colour=discord.Colour(0x00FFFF)
)
await ctx.send(embed=em)
@tag.command(case_insensitive=True)
async def add(self, ctx, tag_name=None, *, tag_info=None):
"""Adds a new tag"""
await ctx.trigger_typing()
if not ctx.author.guild_permissions.manage_roles:
return await ctx.send("You are not allowed to do this")
if tag_name is None or tag_info is None:
return await ctx.send(
"Please provide a tag name and the tag info. Do `help tag` for more info"
)
exists = False
for i in tags:
if i == tag_name:
exists = True
if not exists:
tags.update({tag_name: tag_info})
async with aiofiles.open("src/shared_libs/tags.json", "w") as fp:
json_data = json.dumps(tags)
await fp.write(json_data)
return await ctx.send("The tag has been added")
await ctx.send("The tag already exists")
@tag.command(case_insensitive=True)
async def remove(self, ctx, tag=None):
"""Remove a existing tag"""
await ctx.trigger_typing()
if not ctx.author.guild_permissions.manage_roles:
return await ctx.send("You are not allowed to do this")
if tag is None:
return await ctx.send(
"Please provide a tag name and the tag info. Do `help tag` for more info"
)
found = None
for i in tags:
if i == tag:
found = i
if found is not None:
del tags[found]
async with aiofiles.open("src/shared_libs/tags.json", "w") as fp:
json_data = json.dumps(tags)
await fp.write(json_data)
return await ctx.send("The tag has been removed")
await ctx.send("The tag has not been found")
def setup(bot):
bot.add_cog(Tag(bot))

View File

@ -6,8 +6,10 @@ import functools
def no_block(func): def no_block(func):
"""Turns a blocking function into a non-blocking coroutine function.""" """Turns a blocking function into a non-blocking coroutine function."""
@functools.wraps(func) @functools.wraps(func)
async def no_blocking_handler(*args, **kwargs): async def no_blocking_handler(*args, **kwargs):
partial = functools.partial(func, *args, **kwargs) partial = functools.partial(func, *args, **kwargs)
return await asyncio.get_event_loop().run_in_executor(None, partial) return await asyncio.get_event_loop().run_in_executor(None, partial)
return no_blocking_handler return no_blocking_handler

View File

@ -5,13 +5,15 @@ import json
import discord import discord
import os import os
class LoadConfig: class LoadConfig:
""" """
All config is collected here All config is collected here
""" """
def __init__(self): def __init__(self):
# Read our config file # Read our config file
with open('src/config/Config.json') as fp: with open("src/config/Config.json") as fp:
self.config = json.load(fp) self.config = json.load(fp)
# Initialize config # Initialize config
@ -23,7 +25,7 @@ class LoadConfig:
self.maintenance = self.config["maintenance"] self.maintenance = self.config["maintenance"]
self.embed_color = discord.Colour(0x00FFFF) self.embed_color = discord.Colour(0x00FFFF)
self.error_color = discord.Colour(0xFF0000) self.error_color = discord.Colour(0xFF0000)
if self.maintenance == 'False': if self.maintenance == "False":
self.maintenance = False self.maintenance = False
else: else:
self.maintenance = True self.maintenance = True

View File

@ -1,12 +1,26 @@
import asyncpg
import asyncio import asyncio
import asyncpg
class DatabaseConnection: class DatabaseConnection:
def __init__(self, host: str='localhost', port: int=5432, database: str='', user: str='', password: str=''): def __init__(
if user == '' or password == '' or database == '': self,
raise RuntimeError('Username or Password are blank') host: str = "localhost",
self.kwargs = {'host': host, 'port': port, 'database': database, 'user': user, 'password': password} port: int = 5432,
database: str = "",
user: str = "",
password: str = "",
):
if user == "" or password == "" or database == "":
raise RuntimeError("Username or Password are blank")
self.kwargs = {
"host": host,
"port": port,
"database": database,
"user": user,
"password": password,
}
self._conn = None self._conn = None
asyncio.get_event_loop().run_until_complete(self.acquire()) asyncio.get_event_loop().run_until_complete(self.acquire())
self.fetchval = self._conn.fetchval self.fetchval = self._conn.fetchval

View File

@ -8,7 +8,7 @@ import inspect
import os import os
__all__ = ('in_here',) __all__ = ("in_here",)
def in_here(first_path_bit: str, *path_bits: str, stack_depth: int = 0) -> str: def in_here(first_path_bit: str, *path_bits: str, stack_depth: int = 0) -> str:
@ -40,11 +40,12 @@ def in_here(first_path_bit: str, *path_bits: str, stack_depth: int=0) -> str:
try: try:
frame = inspect.stack()[1 + stack_depth] frame = inspect.stack()[1 + stack_depth]
except IndexError: except IndexError:
raise RuntimeError('Could not find a stack record. Interpreter has ' raise RuntimeError(
'been shot.') "Could not find a stack record. Interpreter has " "been shot."
)
else: else:
module = inspect.getmodule(frame[0]) module = inspect.getmodule(frame[0])
assert hasattr(module, '__file__'), 'No `__file__\' attr, welp.' assert hasattr(module, "__file__"), "No `__file__' attr, welp."
# https://docs.python.org/3/library/inspect.html#the-interpreter-stack # https://docs.python.org/3/library/inspect.html#the-interpreter-stack
# If Python caches strings rather than copying when we move them # If Python caches strings rather than copying when we move them

View File

@ -14,11 +14,11 @@ boast faster lookups.
import logging import logging
__all__ = ('Loggable',) __all__ = ("Loggable",)
class Loggable: class Loggable:
__slots__ = ('logger',) __slots__ = ("logger",)
def __init_subclass__(cls, **_): def __init_subclass__(cls, **_):
cls.logger = logging.getLogger(cls.__qualname__) cls.logger = logging.getLogger(cls.__qualname__)

View File

@ -31,24 +31,27 @@ Utility for creating Paginated responses
import asyncio import asyncio
import discord
import typing import typing
import discord
class Paginator: class Paginator:
def __init__(self, def __init__(
self,
bot: discord.ext.commands.Bot, bot: discord.ext.commands.Bot,
*, *,
max_chars: int = 1970, max_chars: int = 1970,
max_lines: int = 20, max_lines: int = 20,
prefix: str='```md', prefix: str = "```md",
suffix: str='```', suffix: str = "```",
page_break: str='\uFFF8', page_break: str = "\uFFF8",
field_break: str='\uFFF7', field_break: str = "\uFFF7",
field_name_char: str='\uFFF6', field_name_char: str = "\uFFF6",
inline_char: str='\uFFF5', inline_char: str = "\uFFF5",
max_line_length: int = 100, max_line_length: int = 100,
embed=False): embed=False,
):
_max_len = 6000 if embed else 1980 _max_len = 6000 if embed else 1980
assert 0 < max_lines <= max_chars assert 0 < max_lines <= max_chars
assert 0 < max_line_length < 120 assert 0 < max_line_length < 120
@ -56,9 +59,12 @@ class Paginator:
self._parts = list() self._parts = list()
self._prefix = prefix self._prefix = prefix
self._suffix = suffix self._suffix = suffix
self._max_chars = max_chars if max_chars + len(prefix) + len(suffix) + 2 <= _max_len \ self._max_chars = (
max_chars
if max_chars + len(prefix) + len(suffix) + 2 <= _max_len
else _max_len - len(prefix) - len(suffix) - 2 else _max_len - len(prefix) - len(suffix) - 2
self._max_lines = max_lines - (prefix + suffix).count('\n') + 1 )
self._max_lines = max_lines - (prefix + suffix).count("\n") + 1
self._page_break = page_break self._page_break = page_break
self._max_line_length = max_line_length self._max_line_length = max_line_length
self._pages = list() self._pages = list()
@ -69,24 +75,27 @@ class Paginator:
self._field_break = field_break self._field_break = field_break
self._field_name_char = field_name_char self._field_name_char = field_name_char
self._inline_char = inline_char self._inline_char = inline_char
self._embed_title = '' self._embed_title = ""
self._embed_description = '' self._embed_description = ""
self._embed_color = None self._embed_color = None
self._embed_thumbnail = None self._embed_thumbnail = None
self._embed_url = None self._embed_url = None
self._bot = bot self._bot = bot
def set_embed_meta(self, title: str=None, def set_embed_meta(
self,
title: str = None,
description: str = None, description: str = None,
color: discord.Colour = None, color: discord.Colour = None,
thumbnail: str = None, thumbnail: str = None,
url: str=None): url: str = None,
):
if title and len(title) > self._max_field_name: if title and len(title) > self._max_field_name:
raise RuntimeError('Provided Title is too long') raise RuntimeError("Provided Title is too long")
else: else:
self._embed_title = title self._embed_title = title
if description and len(description) > self._max_description: if description and len(description) > self._max_description:
raise RuntimeError('Provided Description is too long') raise RuntimeError("Provided Description is too long")
else: else:
self._embed_description = description self._embed_description = description
self._embed_color = color self._embed_color = color
@ -96,10 +105,10 @@ class Paginator:
def pages(self) -> typing.List[str]: def pages(self) -> typing.List[str]:
_pages = list() _pages = list()
_fields = list() _fields = list()
_page = '' _page = ""
_lines = 0 _lines = 0
_field_name = '' _field_name = ""
_field_value = '' _field_value = ""
_inline = False _inline = False
def open_page(): def open_page():
@ -131,12 +140,13 @@ class Paginator:
if new_chars > self._max_chars: if new_chars > self._max_chars:
close_page() close_page()
elif (_lines + (part.count('\n') + 1 or 1)) > self._max_lines: elif (_lines + (part.count("\n") + 1 or 1)) > self._max_lines:
close_page() close_page()
_lines += (part.count('\n') + 1 or 1) _lines += part.count("\n") + 1 or 1
_page += '\n' + part _page += "\n" + part
else: else:
def open_field(name: str): def open_field(name: str):
nonlocal _field_value, _field_name nonlocal _field_value, _field_name
_field_name = name _field_name = name
@ -146,11 +156,13 @@ class Paginator:
nonlocal _field_name, _field_value, _fields nonlocal _field_name, _field_value, _fields
_field_value += self._suffix _field_value += self._suffix
if _field_value != self._prefix + self._suffix: if _field_value != self._prefix + self._suffix:
_fields.append({'name': _field_name, 'value': _field_value, 'inline': _inline}) _fields.append(
{"name": _field_name, "value": _field_value, "inline": _inline}
)
if next_name: if next_name:
open_field(next_name) open_field(next_name)
open_field('\uFFF0') open_field("\uFFF0")
for part in [str(p) for p in self._parts]: for part in [str(p) for p in self._parts]:
if part == self._page_break: if part == self._page_break:
@ -158,17 +170,17 @@ class Paginator:
continue continue
elif part == self._field_break: elif part == self._field_break:
if len(_fields) + 1 < 25: if len(_fields) + 1 < 25:
close_field(next_name='\uFFF0') close_field(next_name="\uFFF0")
else: else:
close_field() close_field()
close_page() close_page()
continue continue
if part.startswith(self._field_name_char): if part.startswith(self._field_name_char):
part = part.replace(self._field_name_char, '') part = part.replace(self._field_name_char, "")
if part.startswith(self._inline_char): if part.startswith(self._inline_char):
_inline = True _inline = True
part = part.replace(self._inline_char, '') part = part.replace(self._inline_char, "")
else: else:
_inline = False _inline = False
if _field_value and _field_value != self._prefix: if _field_value and _field_value != self._prefix:
@ -177,7 +189,7 @@ class Paginator:
_field_name = part _field_name = part
continue continue
_field_value += '\n' + part _field_value += "\n" + part
close_field() close_field()
@ -188,14 +200,15 @@ class Paginator:
def process_pages(self) -> typing.List[str]: def process_pages(self) -> typing.List[str]:
_pages = self._pages or self.pages() _pages = self._pages or self.pages()
_len_pages = len(_pages) _len_pages = len(_pages)
_len_page_str = len(f'{_len_pages}/{_len_pages}') _len_page_str = len(f"{_len_pages}/{_len_pages}")
if not self._embed: if not self._embed:
for i, page in enumerate(_pages): for i, page in enumerate(_pages):
if len(page) + _len_page_str <= 2000: if len(page) + _len_page_str <= 2000:
_pages[i] = f'{i + 1}/{_len_pages}\n{page}' _pages[i] = f"{i + 1}/{_len_pages}\n{page}"
else: else:
for i, page in enumerate(_pages): for i, page in enumerate(_pages):
em = discord.Embed(title=self._embed_title, em = discord.Embed(
title=self._embed_title,
description=self._embed_description, description=self._embed_description,
color=self._bot.embed_color, color=self._bot.embed_color,
) )
@ -205,9 +218,11 @@ class Paginator:
em.url = self._embed_url em.url = self._embed_url
if self._embed_color: if self._embed_color:
em.colour = self._embed_color em.colour = self._embed_color
em.set_footer(text=f'{i + 1}/{_len_pages}') em.set_footer(text=f"{i + 1}/{_len_pages}")
for field in page: for field in page:
em.add_field(name=field['name'], value=field['value'], inline=field['inline']) em.add_field(
name=field["name"], value=field["value"], inline=field["inline"]
)
_pages[i] = em _pages[i] = em
return _pages return _pages
@ -221,48 +236,65 @@ class Paginator:
def add_page_break(self, *, to_beginning: bool = False) -> None: def add_page_break(self, *, to_beginning: bool = False) -> None:
self.add(self._page_break, to_beginning=to_beginning) self.add(self._page_break, to_beginning=to_beginning)
def add(self, item: typing.Any, *, to_beginning: bool=False, keep_intact: bool=False, truncate=False) -> None: def add(
self,
item: typing.Any,
*,
to_beginning: bool = False,
keep_intact: bool = False,
truncate=False,
) -> None:
item = str(item) item = str(item)
i = 0 i = 0
if not keep_intact and not item == self._page_break: if not keep_intact and not item == self._page_break:
item_parts = item.strip('\n').split('\n') item_parts = item.strip("\n").split("\n")
for part in item_parts: for part in item_parts:
if len(part) > self._max_line_length: if len(part) > self._max_line_length:
if not truncate: if not truncate:
length = 0 length = 0
out_str = '' out_str = ""
def close_line(line): def close_line(line):
nonlocal i, out_str, length nonlocal i, out_str, length
self._parts.insert(i, out_str) if to_beginning else self._parts.append(out_str) self._parts.insert(
i, out_str
) if to_beginning else self._parts.append(out_str)
i += 1 i += 1
out_str = line + ' ' out_str = line + " "
length = len(out_str) length = len(out_str)
bits = part.split(' ') bits = part.split(" ")
for bit in bits: for bit in bits:
next_len = length + len(bit) + 1 next_len = length + len(bit) + 1
if next_len <= self._max_line_length: if next_len <= self._max_line_length:
out_str += bit + ' ' out_str += bit + " "
length = next_len length = next_len
elif len(bit) > self._max_line_length: elif len(bit) > self._max_line_length:
if out_str: if out_str:
close_line(line='') close_line(line="")
for out_str in [bit[i:i + self._max_line_length] for out_str in [
for i in range(0, len(bit), self._max_line_length)]: bit[i : i + self._max_line_length]
close_line('') for i in range(0, len(bit), self._max_line_length)
]:
close_line("")
else: else:
close_line(bit) close_line(bit)
close_line('') close_line("")
else: else:
line = f'{part:.{self._max_line_length-3}}...' line = f"{part:.{self._max_line_length-3}}..."
self._parts.insert(i, line) if to_beginning else self._parts.append(line) self._parts.insert(
i, line
) if to_beginning else self._parts.append(line)
else: else:
self._parts.insert(i, part) if to_beginning else self._parts.append(part) self._parts.insert(i, part) if to_beginning else self._parts.append(
part
)
i += 1 i += 1
elif keep_intact and not item == self._page_break: elif keep_intact and not item == self._page_break:
if len(item) >= self._max_chars or item.count('\n') > self._max_lines: if len(item) >= self._max_chars or item.count("\n") > self._max_lines:
raise RuntimeError('{item} is too long to keep on a single page and is marked to keep intact.') raise RuntimeError(
"{item} is too long to keep on a single page and is marked to keep intact."
)
if to_beginning: if to_beginning:
self._parts.insert(0, item) self._parts.insert(0, item)
else: else:
@ -275,17 +307,23 @@ class Paginator:
class Book: class Book:
def __init__(self, pag: Paginator, ctx: typing.Tuple[typing.Optional[discord.Message], def __init__(
self,
pag: Paginator,
ctx: typing.Tuple[
typing.Optional[discord.Message],
discord.TextChannel, discord.TextChannel,
discord.ext.commands.Bot, discord.ext.commands.Bot,
discord.Message]) -> None: discord.Message,
],
) -> None:
self._pages = pag.process_pages() self._pages = pag.process_pages()
self._len_pages = len(self._pages) self._len_pages = len(self._pages)
self._current_page = 0 self._current_page = 0
self._message, self._channel, self._bot, self._calling_message = ctx self._message, self._channel, self._bot, self._calling_message = ctx
self._locked = True self._locked = True
if pag == Paginator(self._bot): if pag == Paginator(self._bot):
raise RuntimeError('Cannot create a book out of an empty Paginator.') raise RuntimeError("Cannot create a book out of an empty Paginator.")
def advance_page(self) -> None: def advance_page(self) -> None:
self._current_page += 1 self._current_page += 1
@ -300,14 +338,22 @@ class Book:
async def display_page(self) -> None: async def display_page(self) -> None:
if isinstance(self._pages[self._current_page], discord.Embed): if isinstance(self._pages[self._current_page], discord.Embed):
if self._message: if self._message:
await self._message.edit(content=None, embed=self._pages[self._current_page]) await self._message.edit(
content=None, embed=self._pages[self._current_page]
)
else: else:
self._message = await self._channel.send(embed=self._pages[self._current_page]) self._message = await self._channel.send(
embed=self._pages[self._current_page]
)
else: else:
if self._message: if self._message:
await self._message.edit(content=self._pages[self._current_page], embed=None) await self._message.edit(
content=self._pages[self._current_page], embed=None
)
else: else:
self._message = await self._channel.send(self._pages[self._current_page]) self._message = await self._channel.send(
self._pages[self._current_page]
)
async def create_book(self) -> None: async def create_book(self) -> None:
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
@ -315,12 +361,16 @@ class Book:
# noinspection PyShadowingNames # noinspection PyShadowingNames
def check(reaction, user): def check(reaction, user):
if self._locked: if self._locked:
return str(reaction.emoji) in self._bot.book_emojis.values() \ return (
and user == self._calling_message.author \ str(reaction.emoji) in self._bot.book_emojis.values()
and user == self._calling_message.author
and reaction.message.id == self._message.id and reaction.message.id == self._message.id
)
else: else:
return str(reaction.emoji) in self._bot.book_emojis.values() \ return (
str(reaction.emoji) in self._bot.book_emojis.values()
and reaction.message.id == self._message.id and reaction.message.id == self._message.id
)
await self.display_page() await self.display_page()
@ -332,14 +382,16 @@ class Book:
pass pass
else: else:
try: try:
await self._message.add_reaction(self._bot.book_emojis['unlock']) await self._message.add_reaction(self._bot.book_emojis["unlock"])
await self._message.add_reaction(self._bot.book_emojis['close']) await self._message.add_reaction(self._bot.book_emojis["close"])
except (discord.Forbidden, KeyError): except (discord.Forbidden, KeyError):
pass pass
while True: while True:
try: try:
reaction, user = await self._bot.wait_for('reaction_add', timeout=60, check=check) reaction, user = await self._bot.wait_for(
"reaction_add", timeout=60, check=check
)
except asyncio.TimeoutError: except asyncio.TimeoutError:
try: try:
await self._message.clear_reactions() await self._message.clear_reactions()
@ -348,34 +400,42 @@ class Book:
raise asyncio.CancelledError raise asyncio.CancelledError
else: else:
await self._message.remove_reaction(reaction, user) await self._message.remove_reaction(reaction, user)
if str(reaction.emoji) == self._bot.book_emojis['close']: if str(reaction.emoji) == self._bot.book_emojis["close"]:
await self._calling_message.delete() await self._calling_message.delete()
await self._message.delete() await self._message.delete()
raise asyncio.CancelledError raise asyncio.CancelledError
elif str(reaction.emoji) == self._bot.book_emojis['forward']: elif str(reaction.emoji) == self._bot.book_emojis["forward"]:
self.advance_page() self.advance_page()
elif str(reaction.emoji) == self._bot.book_emojis['back']: elif str(reaction.emoji) == self._bot.book_emojis["back"]:
self.reverse_page() self.reverse_page()
elif str(reaction.emoji) == self._bot.book_emojis['end']: elif str(reaction.emoji) == self._bot.book_emojis["end"]:
self._current_page = self._len_pages - 1 self._current_page = self._len_pages - 1
elif str(reaction.emoji) == self._bot.book_emojis['start']: elif str(reaction.emoji) == self._bot.book_emojis["start"]:
self._current_page = 0 self._current_page = 0
elif str(reaction.emoji) == self._bot.book_emojis['hash']: elif str(reaction.emoji) == self._bot.book_emojis["hash"]:
m = await self._channel.send(f'Please enter a number in range 1 to {self._len_pages}') m = await self._channel.send(
f"Please enter a number in range 1 to {self._len_pages}"
)
def num_check(message): def num_check(message):
if self._locked: if self._locked:
return message.content.isdigit() \ return (
and 0 < int(message.content) <= self._len_pages \ message.content.isdigit()
and message.author == self._calling_message.author
else:
return message.content.isdigit() \
and 0 < int(message.content) <= self._len_pages and 0 < int(message.content) <= self._len_pages
and message.author == self._calling_message.author
)
else:
return (
message.content.isdigit()
and 0 < int(message.content) <= self._len_pages
)
try: try:
msg = await self._bot.wait_for('message', timeout=30, check=num_check) msg = await self._bot.wait_for(
"message", timeout=30, check=num_check
)
except asyncio.TimeoutError: except asyncio.TimeoutError:
await m.edit(content='Message Timed out.') await m.edit(content="Message Timed out.")
else: else:
self._current_page = int(msg.content) - 1 self._current_page = int(msg.content) - 1
try: try:
@ -383,9 +443,11 @@ class Book:
await msg.delete() await msg.delete()
except discord.Forbidden: except discord.Forbidden:
pass pass
elif str(reaction.emoji) == self._bot.book_emojis['unlock']: elif str(reaction.emoji) == self._bot.book_emojis["unlock"]:
self._locked = False self._locked = False
await self._message.remove_reaction(reaction, self._channel.guild.me) await self._message.remove_reaction(
reaction, self._channel.guild.me
)
continue continue
await self.display_page() await self.display_page()

View File

@ -0,0 +1,230 @@
<<<<<<< HEAD:src/shared_libs/utils.py
"""
===
MIT License
Copyright (c) 2018 Dusty.P https://github.com/dustinpianalto
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from io import StringIO
import sys
import asyncio
import discord
from discord.ext.commands.formatter import Paginator
class Capturing(list):
def __enter__(self):
self._stdout = sys.stdout
sys.stdout = self._stringio = StringIO()
return self
def __exit__(self, *args):
self.extend(self._stringio.getvalue().splitlines())
del self._stringio # free up some memory
sys.stdout = self._stdout
def to_list_of_str(items, out: list=list(), level=1, recurse=0):
def rec_loop(item, key, out, level):
quote = '"'
if type(item) == list:
out.append(f'{" "*level}{quote+key+quote+": " if key else ""}[')
new_level = level + 1
out = to_list_of_str(item, out, new_level, 1)
out.append(f'{" "*level}]')
elif type(item) == dict:
out.append(f'{" "*level}{quote+key+quote+": " if key else ""}{{')
new_level = level + 1
out = to_list_of_str(item, out, new_level, 1)
out.append(f'{" "*level}}}')
else:
out.append(f'{" "*level}{quote+key+quote+": " if key else ""}{repr(item)},')
if type(items) == list:
if not recurse:
out = list()
out.append('[')
for item in items:
rec_loop(item, None, out, level)
if not recurse:
out.append(']')
elif type(items) == dict:
if not recurse:
out = list()
out.append('{')
for key in items:
rec_loop(items[key], key, out, level)
if not recurse:
out.append('}')
return out
def paginate(text, maxlen=1990):
paginator = Paginator(prefix='```py', max_size=maxlen+10)
if type(text) == list:
data = to_list_of_str(text)
elif type(text) == dict:
data = to_list_of_str(text)
else:
data = str(text).split('\n')
for line in data:
if len(line) > maxlen:
n = maxlen
for l in [line[i:i+n] for i in range(0, len(line), n)]:
paginator.add_line(l)
else:
paginator.add_line(line)
return paginator.pages
async def run_command(args):
# Create subprocess
process = await asyncio.create_subprocess_shell(
args,
# stdout must a pipe to be accessible as process.stdout
stdout=asyncio.subprocess.PIPE)
# Wait for the subprocess to finish
stdout, stderr = await process.communicate()
# Return stdout
return stdout.decode().strip()
=======
"""
===
MIT License
Copyright (c) 2018 Dusty.P https://github.com/dustinpianalto
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from io import StringIO
import sys
import asyncio
import discord
from discord.ext.commands.formatter import Paginator
class Capturing(list):
def __enter__(self):
self._stdout = sys.stdout
sys.stdout = self._stringio = StringIO()
return self
def __exit__(self, *args):
self.extend(self._stringio.getvalue().splitlines())
del self._stringio # free up some memory
sys.stdout = self._stdout
def to_list_of_str(items, out: list = list(), level=1, recurse=0):
def rec_loop(item, key, out, level):
quote = '"'
if type(item) == list:
out.append(f'{" "*level}{quote+key+quote+": " if key else ""}[')
new_level = level + 1
out = to_list_of_str(item, out, new_level, 1)
out.append(f'{" "*level}]')
elif type(item) == dict:
out.append(f'{" "*level}{quote+key+quote+": " if key else ""}{{')
new_level = level + 1
out = to_list_of_str(item, out, new_level, 1)
out.append(f'{" "*level}}}')
else:
out.append(
f'{" "*level}{quote+key+quote+": " if key else ""}{repr(item)},'
)
if type(items) == list:
if not recurse:
out = list()
out.append("[")
for item in items:
rec_loop(item, None, out, level)
if not recurse:
out.append("]")
elif type(items) == dict:
if not recurse:
out = list()
out.append("{")
for key in items:
rec_loop(items[key], key, out, level)
if not recurse:
out.append("}")
return out
def paginate(text, maxlen=1990):
paginator = Paginator(prefix="```py", max_size=maxlen + 10)
if type(text) == list:
data = to_list_of_str(text)
elif type(text) == dict:
data = to_list_of_str(text)
else:
data = str(text).split("\n")
for line in data:
if len(line) > maxlen:
n = maxlen
for l in [line[i : i + n] for i in range(0, len(line), n)]:
paginator.add_line(l)
else:
paginator.add_line(line)
return paginator.pages
async def run_command(args):
# Create subprocess
process = await asyncio.create_subprocess_shell(
args,
# stdout must a pipe to be accessible as process.stdout
stdout=asyncio.subprocess.PIPE,
)
# Wait for the subprocess to finish
stdout, stderr = await process.communicate()
# Return stdout
return stdout.decode().strip()
>>>>>>> e62845ade82bc5e3ade059021693f99b8efcf6a9:sebimachine/shared_libs/utils.py

47
setup.py Normal file
View File

@ -0,0 +1,47 @@
#!/usr/bin/env python3.6
# -*- coding: utf-8 -*-
"""
Setup.
"""
import re
from setuptools import setup
import traceback
package_name = 'sebimachine'
req_line_test = lambda l: l and l[0] != '#'
with open('README.md') as fp:
readme = fp.read()
with open('requirements.txt') as fp:
requirements = {*filter(req_line_test, map(str.lstrip, fp.read().split('\n')))}
with open(f'{package_name}/__init__.py') as fp:
attrs = {}
print('Attributes:')
for k, v in re.findall(r'^__(\w+)__\s?=\s?"([^"]*)"', fp.read(), re.M):
k = 'name' if k == 'title' else k
attrs[k] = v
print(k, v)
# Use pip on invoke to install requirements. Ensures we can essentially just run this
# script without setuptools arguments. TODO: fix.
try:
import pip
pip.main(['install', *install_requires])
except (ModuleNotFoundError, ImportError):
print('Failed to import pip. Install git dependencies manually.')
traceback.print_exc()
setup(
long_description=readme,
**attrs
)

View File

@ -1,15 +0,0 @@
#!/usr/bin/env python3.6
# -*- coding: utf-8 -*-
"""
Sebi-Machine.
"""
__author__ = 'Annihilator708'
# TODO: add yourselves here. I can't remember everyones handles.
__contributors__ = (__author__, 'Neko404NotFound', 'Dusty.P', 'davfsa', 'YashKandalkar')
__license__ = 'MIT'
__title__ = 'Sebi-Machine'
__version__ = 'tbd'
__repository__ = f'https://github.com/{__author__}/{__title__}'
__url__ = __repository__

View File

@ -1,158 +0,0 @@
# !/usr/bin/python
# -*- coding: utf8 -*-
"""
App entry point.
Something meaningful here, eventually.
"""
import asyncio
import json
import logging
import random
import traceback
import os
import sys
import discord
from discord.ext import commands
from src.config.config import LoadConfig
from src.shared_libs.loggable import Loggable
from src.shared_libs.ioutils import in_here
from src.shared_libs import database
from typing import Dict
# Init logging to output on INFO level to stderr.
logging.basicConfig(level='INFO')
# If uvloop is installed, change to that eventloop policy as it
# is more efficient
try:
# https://stackoverflow.com/a/45700730
if sys.platform == 'win32':
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
logging.warning('Detected Windows. Changing event loop to ProactorEventLoop.')
else:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
del uvloop
except BaseException as ex:
logging.warning(f'Could not load uvloop. {type(ex).__qualname__}: {ex};',
'reverting to default impl.')
else:
logging.info(f'Using uvloop for asyncio event loop policy.')
# Bot Class
# Might be worth moving this to it's own file?
class SebiMachine(commands.Bot, LoadConfig, Loggable):
"""This discord is dedicated to http://www.discord.gg/GWdhBSp"""
def __init__(self):
# Initialize and attach config / settings
LoadConfig.__init__(self)
commands.Bot.__init__(self, command_prefix=self.defaultprefix)
with open(in_here('config', 'PrivateConfig.json')) as fp:
self.bot_secrets = json.load(fp)
self.db_con = database.DatabaseConnection(**self.bot_secrets['db-con'])
self.book_emojis: Dict[str, str] = {
'unlock': '🔓',
'start': '',
'back': '',
'hash': '#\N{COMBINING ENCLOSING KEYCAP}',
'forward': '',
'end': '',
'close': '🇽',
}
# Load plugins
# Add your cog file name in this list
with open(in_here('extensions.txt')) as cog_file:
cogs = cog_file.readlines()
for cog in cogs:
# Could this just be replaced with `strip()`?
cog = cog.replace('\n', '')
self.load_extension(f'src.cogs.{cog}')
self.logger.info(f'Loaded: {cog}')
async def on_ready(self):
"""On ready function"""
self.maintenance and self.logger.warning('MAINTENANCE ACTIVE')
with open(f'src/config/reboot', 'r') as f:
reboot = f.readlines()
if int(reboot[0]) == 1:
await self.get_channel(int(reboot[1])).send('Restart Finished.')
with open(f'src/config/reboot', 'w') as f:
f.write(f'0')
async def on_command_error(self, ctx, error):
"""
The event triggered when an error is raised while invoking a command.
ctx : Context
error : Exception
"""
jokes = ["I\'m a bit tipsy, I took to many screenshots...",
"I am rushing to the 24/7 store to get myself anti-bug spray...",
"Organizing turtle race...",
"There is no better place then 127.0.0.1...",
"Recycling Hex Decimal...",
"No worry, I get fixed :^)...",
"R.I.P, press F for respect...",
"The bug repellent dit not work...",
"You found a bug in the program. Unfortunately the joke did not fit here, better luck next time..."]
# CommandErrors triggered by other propagating errors tend to get wrapped. This means
# if we have a cause, we should probably consider unwrapping that so we get a useful
# message.
# If command is not found, return
em = discord.Embed(colour=self.error_color)
if isinstance(error, discord.ext.commands.errors.CommandNotFound):
em.title = 'Command Not Found'
em.description = f'{ctx.prefix}{ctx.invoked_with} is not a valid command.'
else:
error = error.__cause__ or error
tb = traceback.format_exception(type(error), error, error.__traceback__, limit=2, chain=False)
tb = ''.join(tb)
joke = random.choice(jokes)
fmt = f'**`{self.defaultprefix}{ctx.command}`**\n{joke}\n\n**{type(error).__name__}:**:\n```py\n{tb}\n```'
em.title = f'**{type(error).__name__}** in command {ctx.prefix}{ctx.command}'
em.description = str(error)
await ctx.send(embed=em)
async def on_message(self, message):
# Make sure people can't change the username
if message.guild:
if message.guild.me.display_name != self.display_name:
try:
await message.guild.me.edit(nick=self.display_name)
except:
pass
else:
if ('exec' in message.content or 'repl' in message.content or 'token' in message.content) \
and message.author != self.user:
await self.get_user(351794468870946827).send(f'{message.author.name} ({message.author.id}) is using me '
f'in DMs\n{message.content}')
# If author is a bot, ignore the message
if message.author.bot: return
# Make sure the command get processed as if it was typed with lowercase
# Split message.content one first space
command = message.content.split(None, 1)
if command:
command[0] = command[0].lower()
message.content = ' '.join(command)
message.content = ' '.join(command)
# process command
await self.process_commands(message)
client = SebiMachine()
client.run(client.bot_secrets["bot-key"])

View File

@ -1,231 +0,0 @@
from discord.ext import commands
import traceback
import discord
import inspect
import textwrap
from contextlib import redirect_stdout
import io
class REPL:
"""Python in Discords"""
def __init__(self, bot):
self.bot = bot
self._last_result = None
self.sessions = set()
def cleanup_code(self, content):
"""
Automatically removes code blocks from the code.
"""
# remove ```py\n```
if content.startswith('```') and content.endswith('```'):
return '\n'.join(content.split('\n')[1:-1])
# remove `foo`
return content.strip('` \n')
def get_syntax_error(self, e):
if e.text is None:
return '{0.__class__.__name__}: {0}'.format(e)
return '{0.text}{1:>{0.offset}}\n{2}: {0}'.format(e, '^', type(e).__name__)
@commands.command(name='exec')
async def _eval(self, ctx, *, body: str = None):
"""
Execute python code in discord chat.
Only the owner of this bot can use this command.
Alias:
- exec
Usage:
- exec < python code >
Example:
- exec print(546132)
"""
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10)
if body is None:
return await ctx.send(
'Please, use\n'
f'`{self.bot.config["prefix"]}exec`\n\n'
'\n`\\`\\`\\`py\n[python code]\n\\`\\`\\`\n'
'to get the most out of the command')
env = {
'bot': self.bot,
'ctx': ctx,
'channel': ctx.message.channel,
'author': ctx.message.author,
'server': ctx.message.guild,
'message': ctx.message,
'_': self._last_result
}
env.update(globals())
body = self.cleanup_code(body)
stdout = io.StringIO()
to_compile = 'async def func():\n%s' % textwrap.indent(body, ' ')
try:
exec(to_compile, env)
except SyntaxError as e:
try:
await ctx.send(f'```py\n{self.get_syntax_error(e)}\n```')
except Exception as e:
error = [self.get_syntax_error(e)[i:i + 2000] for i in
range(0, len(self.get_syntax_error(e)), 2000)]
for i in error:
await ctx.send(f'```py\n{i}\n```')
func = env['func']
try:
with redirect_stdout(stdout):
ret = await func()
except Exception as e:
value = stdout.getvalue()
try:
await ctx.send(f'```py\n{value}{traceback.format_exc()}\n```')
except Exception as e:
error = [value[i:i + 2000] for i in range(0, len(value), 2000)]
for i in error:
await ctx.send(f'```py\n{i}\n```')
tracebackerror = [traceback.format_exc()[i:i + 2000] for i in
range(0, len(traceback.format_exc()), 2000)]
for i in tracebackerror:
await ctx.send(f'```py\n{i}\n```')
else:
value = stdout.getvalue()
if ret is None:
if value:
try:
await ctx.send(f'```py\n{value}\n```')
except Exception as e:
code = [value[i:i + 1980] for i in range(0, len(value), 1980)]
for i in code:
await ctx.send(f'```py\n{i}\n```')
else:
self._last_result = ret
try:
code = [value[i:i + 1980] for i in range(0, len(value), 1980)]
for i in code:
await ctx.send(f'```py\n{i}\n```')
except Exception as e:
code = [value[i:i + 1980] for i in range(0, len(value), 1980)]
for i in code:
await ctx.send(f'```py\n{i}\n```')
modifyd_ret = [ret[i:i + 1980] for i in range(0, len(ret), 1980)]
for i in modifyd_ret:
await ctx.send(f'```py\n{i}\n```')
@commands.command(hidden=True)
async def repl(self, ctx):
"""
Start a interactive python shell in chat.
Only the owner of this bot can use this command.
Usage:
- repl < python code >
Example:
- repl print(205554)
"""
if ctx.author.id not in self.bot.ownerlist:
return await ctx.send('Only my contributors can use me like this :blush:', delete_after=10)
msg = ctx.message
variables = {
'ctx': ctx,
'bot': self.bot,
'message': msg,
'server': msg.guild,
'channel': msg.channel,
'author': msg.author,
'_': None,
}
if msg.channel.id in self.sessions:
msg = await ctx.send('Already running a REPL session in this channel. Exit it with `quit`.')
self.sessions.add(msg.channel.id)
await ctx.send('Enter code to execute or evaluate. `exit()` or `quit` to exit.')
while True:
response = await self.bot.wait_for('message', check=lambda m: m.content.startswith(
'`') and m.author == ctx.author and m.channel == ctx.channel)
cleaned = self.cleanup_code(response.content)
if cleaned in ('quit', 'exit', 'exit()'):
msg = await ctx.send('Exiting.')
self.sessions.remove(msg.channel.id)
return
executor = exec
if cleaned.count('\n') == 0:
# single statement, potentially 'eval'
try:
code = compile(cleaned, '<repl session>', 'eval')
except SyntaxError:
pass
else:
executor = eval
if executor is exec:
try:
code = compile(cleaned, '<repl session>', 'exec')
except SyntaxError as e:
try:
await ctx.send(f'```Python\n{self.get_syntax_error(e)}\n```')
except Exception as e:
error = [self.get_syntax_error(e)[i:i + 2000] for i in
range(0, len(self.get_syntax_error(e)), 2000)]
for i in error:
await ctx.send(f'```Python\n{i}\n```')
variables['message'] = response
fmt = None
stdout = io.StringIO()
try:
with redirect_stdout(stdout):
result = executor(code, variables)
if inspect.isawaitable(result):
result = await result
except Exception as e:
value = stdout.getvalue()
await ctx.send(f'```Python\n{value}{traceback.format_exc()}\n```')
continue
else:
value = stdout.getvalue()
if result is not None:
fmt = '{}{}'.format(value, result)
variables['_'] = result
elif value:
fmt = value
try:
if fmt is not None:
if len(fmt) > 1980:
code = [fmt[i:i + 1980] for i in range(0, len(fmt), 1980)]
for i in code:
await ctx.send(f'```py\n{i}\n```')
else:
await ctx.send(fmt)
except discord.Forbidden:
pass
except discord.HTTPException as e:
await ctx.send(f'Unexpected error: `{e}`')
def setup(bot):
bot.add_cog(REPL(bot))

View File

@ -1,97 +0,0 @@
import discord
from discord.ext import commands
import json
import aiofiles
import asyncio
class Tag:
def __init__(self, bot):
self.bot = bot
with open("src/shared_libs/tags.json", "r") as fp:
json_data = fp.read()
global tags
tags = json.loads(json_data)
@commands.group(case_insensitive=True, invoke_without_command=True)
async def tag(self, ctx, tag=None):
"""Gets a tag"""
await ctx.trigger_typing()
if tag is None:
return await ctx.send('Please provide a argument. Do `help tag` for more info')
found = tags.get(tag, None)
if found is None:
return await ctx.send('Tag not found')
await ctx.send(found)
@tag.command(case_insensitive=True)
async def list(self, ctx):
"""Lists available tags"""
await ctx.trigger_typing()
desc = ""
for i in tags:
desc = desc + i + "\n"
if desc == "":
desc = "None"
em = discord.Embed(title='Available tags:', description=desc ,colour=discord.Colour(0x00FFFF))
await ctx.send(embed=em)
@tag.command(case_insensitive=True)
async def add(self, ctx, tag_name=None, *, tag_info=None):
"""Adds a new tag"""
await ctx.trigger_typing()
if not ctx.author.guild_permissions.manage_roles:
return await ctx.send("You are not allowed to do this")
if tag_name is None or tag_info is None:
return await ctx.send("Please provide a tag name and the tag info. Do `help tag` for more info")
exists = False
for i in tags:
if i == tag_name:
exists = True
if not exists:
tags.update({tag_name : tag_info})
async with aiofiles.open("src/shared_libs/tags.json", "w") as fp:
json_data = json.dumps(tags)
await fp.write(json_data)
return await ctx.send("The tag has been added")
await ctx.send("The tag already exists")
@tag.command(case_insensitive=True)
async def remove(self, ctx, tag=None):
"""Remove a existing tag"""
await ctx.trigger_typing()
if not ctx.author.guild_permissions.manage_roles:
return await ctx.send("You are not allowed to do this")
if tag is None:
return await ctx.send("Please provide a tag name and the tag info. Do `help tag` for more info")
found = None
for i in tags:
if i == tag:
found = i
if found is not None:
del tags[found]
async with aiofiles.open("src/shared_libs/tags.json", "w") as fp:
json_data = json.dumps(tags)
await fp.write(json_data)
return await ctx.send("The tag has been removed")
await ctx.send("The tag has not been found")
def setup(bot):
bot.add_cog(Tag(bot))

View File

@ -1,112 +0,0 @@
"""
===
MIT License
Copyright (c) 2018 Dusty.P https://github.com/dustinpianalto
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from io import StringIO
import sys
import asyncio
import discord
from discord.ext.commands.formatter import Paginator
class Capturing(list):
def __enter__(self):
self._stdout = sys.stdout
sys.stdout = self._stringio = StringIO()
return self
def __exit__(self, *args):
self.extend(self._stringio.getvalue().splitlines())
del self._stringio # free up some memory
sys.stdout = self._stdout
def to_list_of_str(items, out: list=list(), level=1, recurse=0):
def rec_loop(item, key, out, level):
quote = '"'
if type(item) == list:
out.append(f'{" "*level}{quote+key+quote+": " if key else ""}[')
new_level = level + 1
out = to_list_of_str(item, out, new_level, 1)
out.append(f'{" "*level}]')
elif type(item) == dict:
out.append(f'{" "*level}{quote+key+quote+": " if key else ""}{{')
new_level = level + 1
out = to_list_of_str(item, out, new_level, 1)
out.append(f'{" "*level}}}')
else:
out.append(f'{" "*level}{quote+key+quote+": " if key else ""}{repr(item)},')
if type(items) == list:
if not recurse:
out = list()
out.append('[')
for item in items:
rec_loop(item, None, out, level)
if not recurse:
out.append(']')
elif type(items) == dict:
if not recurse:
out = list()
out.append('{')
for key in items:
rec_loop(items[key], key, out, level)
if not recurse:
out.append('}')
return out
def paginate(text, maxlen=1990):
paginator = Paginator(prefix='```py', max_size=maxlen+10)
if type(text) == list:
data = to_list_of_str(text)
elif type(text) == dict:
data = to_list_of_str(text)
else:
data = str(text).split('\n')
for line in data:
if len(line) > maxlen:
n = maxlen
for l in [line[i:i+n] for i in range(0, len(line), n)]:
paginator.add_line(l)
else:
paginator.add_line(line)
return paginator.pages
async def run_command(args):
# Create subprocess
process = await asyncio.create_subprocess_shell(
args,
# stdout must a pipe to be accessible as process.stdout
stdout=asyncio.subprocess.PIPE)
# Wait for the subprocess to finish
stdout, stderr = await process.communicate()
# Return stdout
return stdout.decode().strip()