Create command decorator
Create process_commands method Only allow coroutine functions as commands Finish add_command method
This commit is contained in:
parent
2ac0156c9b
commit
1d8b16add0
@ -9,6 +9,7 @@ from morpheus.core.utils import maybe_coroutine
|
|||||||
from morpheus.core.events import RoomEvent
|
from morpheus.core.events import RoomEvent
|
||||||
from morpheus.core.content import MessageContentBase
|
from morpheus.core.content import MessageContentBase
|
||||||
from .context import Context
|
from .context import Context
|
||||||
|
from .command import Command
|
||||||
|
|
||||||
|
|
||||||
class Bot(Client):
|
class Bot(Client):
|
||||||
@ -19,8 +20,7 @@ class Bot(Client):
|
|||||||
):
|
):
|
||||||
self.loop = asyncio.get_event_loop()
|
self.loop = asyncio.get_event_loop()
|
||||||
super(Bot, self).__init__(prefix=prefix, homeserver=homeserver)
|
super(Bot, self).__init__(prefix=prefix, homeserver=homeserver)
|
||||||
self.commands: Dict[str, dict] = {}
|
self.commands: Dict[str, Command] = {}
|
||||||
self.aliases: Dict[str, str] = {}
|
|
||||||
|
|
||||||
def run(self, user_id: str = None, password: str = None, token: str = None, loop: Optional[asyncio.AbstractEventLoop] = None):
|
def run(self, user_id: str = None, password: str = None, token: str = None, loop: Optional[asyncio.AbstractEventLoop] = None):
|
||||||
loop = loop or self.loop or asyncio.get_event_loop()
|
loop = loop or self.loop or asyncio.get_event_loop()
|
||||||
@ -70,32 +70,38 @@ class Bot(Client):
|
|||||||
if not ctx:
|
if not ctx:
|
||||||
return
|
return
|
||||||
|
|
||||||
command_name = self.aliases.get(ctx.called_with)
|
command = self.commands.get(ctx.called_with)
|
||||||
if not command_name:
|
if not command:
|
||||||
return
|
return
|
||||||
command_dict = self.commands.get(command_name)
|
await command.invoke(ctx, ctx.body.split(' '))
|
||||||
if not command_dict:
|
|
||||||
del self.aliases[ctx.called_with]
|
|
||||||
parser: ArgumentParser = command_dict['parser']
|
|
||||||
command = command_dict['command']
|
|
||||||
args = parser.parse_args(ctx.body.split(' '))
|
|
||||||
|
|
||||||
def listener(self, name=None):
|
def listener(self, name=None):
|
||||||
def decorator(func):
|
def decorator(func):
|
||||||
self.register_handler(name, func)
|
self.register_handler(name, func)
|
||||||
return decorator
|
return decorator
|
||||||
|
|
||||||
def add_command(self, name: str, func: callable):
|
def add_command(self, name: str, aliases: list, func: callable):
|
||||||
if not callable(func):
|
|
||||||
raise TypeError('Command function must be callable')
|
|
||||||
|
|
||||||
if not isawaitable(func):
|
|
||||||
raise TypeError('Command function must be a coroutine')
|
|
||||||
|
|
||||||
if not name:
|
if not name:
|
||||||
name = func.__name__
|
name = func.__name__
|
||||||
|
|
||||||
if name in self.commands:
|
if name.startswith('_'):
|
||||||
|
raise RuntimeWarning(f'Command names cannot start with an underscore')
|
||||||
|
|
||||||
|
if aliases is None:
|
||||||
|
aliases = []
|
||||||
|
|
||||||
|
if not isinstance(aliases, list) or any([not isinstance(alias, str) for alias in aliases]):
|
||||||
|
raise RuntimeWarning(f'Aliases must be a list of strings.')
|
||||||
|
|
||||||
|
if name in self.commands or any([alias in self.commands for alias in aliases]):
|
||||||
raise RuntimeWarning(f'Command {name} has already been registered')
|
raise RuntimeWarning(f'Command {name} has already been registered')
|
||||||
|
|
||||||
self.commands[name] = func
|
command = Command(func)
|
||||||
|
self.commands[name] = command
|
||||||
|
for alias in aliases:
|
||||||
|
self.commands[alias] = command
|
||||||
|
|
||||||
|
def command(self, name: Optional[str] = None, aliases: Optional[list] = None):
|
||||||
|
def decorator(func):
|
||||||
|
self.add_command(name=name, aliases=aliases, func=func)
|
||||||
|
return decorator
|
||||||
|
|||||||
@ -8,8 +8,8 @@ class Command:
|
|||||||
if not callable(function):
|
if not callable(function):
|
||||||
raise RuntimeError('The function to make a command from must be a callable')
|
raise RuntimeError('The function to make a command from must be a callable')
|
||||||
|
|
||||||
# if not inspect.isawaitable(function):
|
if not inspect.iscoroutinefunction(function):
|
||||||
# raise RuntimeError('The function to make a command from must be a coroutine')
|
raise RuntimeError('The function to make a command from must be a coroutine')
|
||||||
|
|
||||||
self.extension = extension
|
self.extension = extension
|
||||||
self.signature = inspect.signature(function)
|
self.signature = inspect.signature(function)
|
||||||
@ -53,7 +53,7 @@ class Command:
|
|||||||
|
|
||||||
return parser
|
return parser
|
||||||
|
|
||||||
def invoke(self, ctx, args_list):
|
async def invoke(self, ctx, args_list):
|
||||||
iterator = iter(self.signature.parameters.items())
|
iterator = iter(self.signature.parameters.items())
|
||||||
|
|
||||||
if self.extension:
|
if self.extension:
|
||||||
@ -78,4 +78,4 @@ class Command:
|
|||||||
else:
|
else:
|
||||||
kwargs[key] = params.__dict__[key]
|
kwargs[key] = params.__dict__[key]
|
||||||
|
|
||||||
self.function(ctx, *args, **kwargs)
|
await self.function(ctx, *args, **kwargs)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user