From 658454cb945114415d4fdfc1a73df67685a4f8f6 Mon Sep 17 00:00:00 2001 From: Dustin Pianalto Date: Mon, 16 Dec 2019 18:28:22 -0900 Subject: [PATCH] Update rcon lib to not use asyncio Rework ListPlayers endpoint to use the new lib --- geeksbot_web/rcon/rcon_lib/arcon.py | 110 +++++++++++------------- geeksbot_web/rcon/rcon_lib/rcon.py | 128 ++++++++++------------------ geeksbot_web/rcon/views.py | 9 +- 3 files changed, 98 insertions(+), 149 deletions(-) diff --git a/geeksbot_web/rcon/rcon_lib/arcon.py b/geeksbot_web/rcon/rcon_lib/arcon.py index 71d776a..5841ea1 100644 --- a/geeksbot_web/rcon/rcon_lib/arcon.py +++ b/geeksbot_web/rcon/rcon_lib/arcon.py @@ -7,89 +7,79 @@ arcon_log = logging.getLogger('arcon_lib') class ARKServer(rcon.RCONConnection): - def __init__(self, *args, monitor_chat: bool=False, server_chat_channel: int=None, - server_messages_channel: int=None, **kwargs): + def __init__(self, *args, monitor_chat: bool = False, server_chat_channel: int = None, + server_messages_channel: int = None, **kwargs): self.monitor_chat = monitor_chat self.server_chat_channel = server_chat_channel self.server_messages_channel = server_messages_channel super().__init__(*args, **kwargs) - async def run_command(self, command: str, multi_packet: bool=False, reconnect_counter: int=0) \ + def run_command(self, command: str, multi_packet: bool = False) \ -> Union[rcon.RCONPacket, str]: arcon_log.debug(f'Command requested: {command}') if self.authenticated: packet = rcon.RCONPacket(next(self.packet_id), rcon.SERVERDATA_EXECCOMMAND, command) - with await self.lock: + try: + arcon_log.debug(f'Sending packet {packet.packet_id}') + self.send_packet(packet) + arcon_log.debug(f'Packet Sent.') + except ConnectionResetError: + arcon_log.info(f'Connection to {self.host}:{self.port} lost') + finally: + arcon_log.debug(f'Waiting for response to packet {packet.packet_id}') try: - arcon_log.debug(f'Sending packet {packet.packet_id}') - await self.send_packet(packet) - arcon_log.debug(f'Packet Sent.') - except ConnectionResetError: - arcon_log.info(f'Connection to {self.host}:{self.port} lost, Reconnecting...') - self.lock.release() - await self._reconnect_and_resend(packet) - await self.lock.acquire() - finally: - arcon_log.debug(f'Waiting for response to packet {packet.packet_id}') - try: - response = await self.read(packet, multi_packet=multi_packet) - except asyncio.TimeoutError as e: - if reconnect_counter > 5: - return 'Reached max reconnects. Closing connection.' - arcon_log.warning(f'No response received: {e}\nAttempting to reconnect #{reconnect_counter}') - self.lock.release() - await self._reconnect() - await self.lock.acquire() - response = await self.run_command(command=command, multi_packet=multi_packet, - reconnect_counter=reconnect_counter + 1) + response = self.read(packet, multi_packet=multi_packet) + except asyncio.TimeoutError as e: + arcon_log.warning(f'No response received: {e}\nPlease try again later.') + else: arcon_log.debug(f'Response Received:\n{response.packet_type}:{response.packet_id}:{response.body}') - response.body = response.body.strip('\x00\x00').strip() - return response + response.body = response.body.strip('\x00\x00').strip() + return response else: return 'Server is not Authenticated. Please let the Admin know of this issue.' - async def getchat(self) -> str: - response = await self.run_command(command='getchat', multi_packet=True) + def getchat(self) -> str: + response = self.run_command(command='getchat', multi_packet=True) return response.body if isinstance(response, rcon.RCONPacket) else response - async def saveworld(self) -> str: - response = await self.run_command(command='saveworld') + def saveworld(self) -> str: + response = self.run_command(command='saveworld') return response.body if isinstance(response, rcon.RCONPacket) else response - async def serverchat(self, message: str) -> str: - response = await self.run_command(command=f'serverchat {message}') + def serverchat(self, message: str) -> str: + response = self.run_command(command=f'serverchat {message}') return response.body if isinstance(response, rcon.RCONPacket) else response - async def broadcast(self, message: str) -> str: - response = await self.run_command(command=f'broadcast {message}') + def broadcast(self, message: str) -> str: + response = self.run_command(command=f'broadcast {message}') return response.body if isinstance(response, rcon.RCONPacket) else response - async def listplayers(self) -> str: - response = await self.run_command(command=f'listplayers') + def listplayers(self) -> str: + response = self.run_command(command=f'listplayers') return response.body if isinstance(response, rcon.RCONPacket) else response - async def whitelist(self, steam_id: str) -> str: - response = await self.run_command(command=f'AllowPlayerToJoinNoCheck {steam_id}') + def whitelist(self, steam_id: str) -> str: + response = self.run_command(command=f'AllowPlayerToJoinNoCheck {steam_id}') return response.body if isinstance(response, rcon.RCONPacket) else response - async def ban_player(self, steam_id: int) -> str: - response = await self.run_command(command=f'BanPlayer {steam_id}') + def ban_player(self, steam_id: int) -> str: + response = self.run_command(command=f'BanPlayer {steam_id}') return response.body if isinstance(response, rcon.RCONPacket) else response - async def unban_player(self, steam_id: int) -> str: - response = await self.run_command(command=f'UnbanPlayer {steam_id}') + def unban_player(self, steam_id: int) -> str: + response = self.run_command(command=f'UnbanPlayer {steam_id}') return response.body if isinstance(response, rcon.RCONPacket) else response - async def kick_player(self, steam_id: int) -> str: - response = await self.run_command(command=f'KickPlayer {steam_id}') + def kick_player(self, steam_id: int) -> str: + response = self.run_command(command=f'KickPlayer {steam_id}') return response.body if isinstance(response, rcon.RCONPacket) else response - async def stop_server(self) -> int: - saved = await self.saveworld() + def stop_server(self) -> int: + saved = self.saveworld() if saved == 'World Saved': - await self.serverchat(saved) - await asyncio.sleep(10) - response = await self.run_command(command='DoExit') + self.serverchat(saved) + asyncio.sleep(10) + response = self.run_command(command='DoExit') if response.body == 'Exiting...': return 0 else: @@ -97,22 +87,22 @@ class ARKServer(rcon.RCONConnection): else: return 1 - async def get_logs(self): - response = await self.run_command(command=f'GetGameLog', multi_packet=True) + def get_logs(self): + response = self.run_command(command=f'GetGameLog', multi_packet=True) return response.body if isinstance(response, rcon.RCONPacket) else response - async def server_chat_to_steam_id(self, steam_id: int, message: str) -> str: - response = await self.run_command(command=f'ServerChatTo {steam_id} {message}') + def server_chat_to_steam_id(self, steam_id: int, message: str) -> str: + response = self.run_command(command=f'ServerChatTo {steam_id} {message}') return response.body if isinstance(response, rcon.RCONPacket) else response - async def server_chat_to_player_name(self, player_name: str, message: str) -> str: - response = await self.run_command(command=f'ServerChatToPlayer "{player_name}" {message}') + def server_chat_to_player_name(self, player_name: str, message: str) -> str: + response = self.run_command(command=f'ServerChatToPlayer "{player_name}" {message}') return response.body if isinstance(response, rcon.RCONPacket) else response - async def set_time_of_day(self, hour: int, minute: int=00, seconds: int=00) -> str: - response = await self.run_command(command=f'SetTimeOfDay {hour}:{minute}:{seconds}') + def set_time_of_day(self, hour: int, minute: int = 00, seconds: int = 00) -> str: + response = self.run_command(command=f'SetTimeOfDay {hour}:{minute}:{seconds}') return response.body if isinstance(response, rcon.RCONPacket) else response - async def destroy_wild_dinos(self): - response = await self.run_command(command='DestroyWildDinos') + def destroy_wild_dinos(self): + response = self.run_command(command='DestroyWildDinos') return response.body if isinstance(response, rcon.RCONPacket) else response diff --git a/geeksbot_web/rcon/rcon_lib/rcon.py b/geeksbot_web/rcon/rcon_lib/rcon.py index deeafee..d8e3719 100644 --- a/geeksbot_web/rcon/rcon_lib/rcon.py +++ b/geeksbot_web/rcon/rcon_lib/rcon.py @@ -1,7 +1,8 @@ -import asyncio import logging import itertools import struct +import socket +import time # Packet types SERVERDATA_AUTH = 3 @@ -15,7 +16,7 @@ rcon_log = logging.getLogger('rcon_lib') class RCONPacket: - def __init__(self, packet_id: int=0, packet_type: int=-1, body: str=''): + def __init__(self, packet_id: int = 0, packet_type: int = -1, body: str = ''): self.packet_id = packet_id self.packet_type = packet_type self.body = body @@ -40,7 +41,7 @@ class RCONPacket: class RCONConnection: """Connection to an RCON server""" - def __init__(self, host: str, port: int, password: str='', single_packet: bool=False, loop=None): + def __init__(self, host: str, port: int, password: str = '', single_packet: bool = False): """Create a New RCON Connection Parameters: @@ -50,134 +51,95 @@ class RCONConnection: single_packet (bool): True for servers who don't give 0 length SERVERDATA_RESPONSE_VALUE requests """ - self.host = host + self.host = socket.gethostbyname(host) self.port = port self.password = password self.single_packet = single_packet self.packet_id = itertools.count(1) - self.loop = loop or asyncio.get_event_loop() - self.reader = None - self.writer = None - self.lock = asyncio.Lock() + self.socket: socket.socket = None self.authenticated = False - async def connect(self): + def connect(self): """Returns -1 if connection times out Returns 1 if connection and auth are successful Returns 0 if auth fails""" try: rcon_log.debug(f'Connecting to {self.host}:{self.port}...') - self.reader, self.writer = await asyncio.open_connection(self.host, self.port, loop=self.loop) + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect((self.host, self.port)) except TimeoutError as e: rcon_log.error(f'Timeout error: {e}') return -1 else: rcon_log.debug('Connected. Attempting to Authenticate...') auth_packet = RCONPacket(next(self.packet_id), SERVERDATA_AUTH, self.password) - with await self.lock: - await self.send_packet(auth_packet) - response = await self.read() - if response.packet_type == SERVERDATA_AUTH_RESPONSE and response.packet_id != -1: - rcon_log.debug(f'Authorized {response.packet_type}:{response.packet_id}:{response.body}') - self.authenticated = True - return 1 - else: - rcon_log.debug(f'Not Authorized {response.packet_type}:{response.packet_id}:{response.body}') - self.authenticated = False - return 0 + self.send_packet(auth_packet) + response = self.read() + if response.packet_type == SERVERDATA_AUTH_RESPONSE and response.packet_id != -1: + rcon_log.debug(f'Authorized {response.packet_type}:{response.packet_id}:{response.body}') + self.authenticated = True + return 1 + else: + rcon_log.debug(f'Not Authorized {response.packet_type}:{response.packet_id}:{response.body}') + self.authenticated = False + return 0 - async def _reconnect(self): - self.writer = None - self.reader = None - connected = await self.connect() - rcon_log.info(f'Connection completed with a return of {connected}') - if connected != -1: - rcon_log.info('Connected') - else: - rcon_log.warning('Connection Failed') - return connected - - async def _reconnect_and_resend(self, packet): - connected = await self._reconnect() - if connected != -1: - await asyncio.sleep(0.1) - rcon_log.info(f'Re-sending packet {packet.packet_id}') - await self.send_packet(packet) - rcon_log.info(f'Packet Sent.') - return connected - else: - return connected - - async def keep_alive(self): - while True: - await asyncio.sleep(60) - ka_packet = RCONPacket(next(self.packet_id), SERVERDATA_EXECCOMMAND, '') - try: - with await self.lock: - await asyncio.wait_for(self.send_packet(ka_packet), 10, loop=self.loop) - await asyncio.wait_for(self.read(ka_packet), 10, loop=self.loop) - except asyncio.TimeoutError: - self.reader = None - self.writer = None - await self.connect() - - async def send_packet(self, packet): + def send_packet(self, packet): if packet.size() > 4096: rcon_log.error('Packet Size is larger than 4096 bytes. Cannot send packet.') raise RuntimeWarning('Packet Size is larger than 4096 bytes. Cannot send packet.') - if self.writer is None: - await self.connect() + if self.socket is None: + self.connect() rcon_log.debug(f'Sending Packet {packet.packet_id}: {packet.pack() if packet.packet_type is not SERVERDATA_AUTH else "Censored for Password Security."}') - self.writer.write(packet.pack()) - await self.writer.drain() + self.socket.send(packet.pack()) rcon_log.debug(f'Packet {packet.packet_id} Sent.') - async def read(self, request: RCONPacket=None, multi_packet=False) -> RCONPacket: + def read(self, request: RCONPacket = None, multi_packet=False) -> RCONPacket: rcon_log.debug(f'Waiting to receive response to packet {request.packet_id if request else None}') response = RCONPacket() try: if request: while response.packet_id != request.packet_id and response.packet_id < request.packet_id: if multi_packet: - if request is None: - rcon_log.warning('A request packet is required to receive a multi packet response') - raise ValueError('A request packet is required to receive a multi packet response') - await asyncio.sleep(.01) - response = await self._receive_multi_packet() + time.sleep(.01) + response = self._receive_multi_packet() rcon_log.debug(f'Received Multi-Packet response to packet {request.packet_id}:\n' f'{response.packet_type}:{response.packet_id}:{response.body}') else: - response = await self.receive_packet() + response = self.receive_packet() rcon_log.debug(f'Received Single-Packet response to packet {request.packet_id}:\n' f'{response.packet_type}:{response.packet_id}:{response.body}') else: - response = await self.receive_packet() + response = self.receive_packet() rcon_log.debug(f'Received Single-Packet response:\n' f'{response.packet_type}:{response.packet_id}:{response.body}') except struct.error as e: rcon_log.error(f'Struct Error: {e}') - response = RCONPacket(body='Error receiving data from the server. Attempting to reconnect. ' + response = RCONPacket(body='Error receiving data from the server. ' 'Please try again in a little bit.') - self.lock.release() - await self._reconnect() - await self.lock.acquire() except AttributeError as e: rcon_log.error(f'Attribute Error: {e}') - response = RCONPacket(body='Error receiving data from the server. Attempting to reconnect. ' + response = RCONPacket(body='Error receiving data from the server. ' 'Please try again in a little bit.') - self.lock.release() - await self._reconnect() - await self.lock.acquire() return response - async def receive_packet(self): - header = await self.reader.read(struct.calcsize('<3i')) + def receive_packet(self): + header = self.socket.recv(struct.calcsize('<3i')) (packet_size, packet_id, packet_type) = struct.unpack('<3i', header) - body = await self.reader.read(packet_size - 8) + body = self.socket.recv(packet_size - 8) return RCONPacket(packet_id, packet_type, body.decode('ascii')) - async def _receive_multi_packet(self): - header = await self.reader.read(struct.calcsize('<3i')) + def _receive_multi_packet(self): + header = self.socket.recv(struct.calcsize('<3i')) (packet_size, packet_id, packet_type) = struct.unpack('<3i', header) - body = await self.reader.readuntil(separator=b'\x00\x00') + body = self._read_all(packet_size) return RCONPacket(packet_id, packet_type, body.decode('ascii')) + + def _read_all(self, chunk_size): + fragments = [] + while True: + part = self.socket.recv(chunk_size) + fragments.append(part) + if len(part) < chunk_size: + break + return b''.join(fragments) diff --git a/geeksbot_web/rcon/views.py b/geeksbot_web/rcon/views.py index e259919..fbbac3d 100644 --- a/geeksbot_web/rcon/views.py +++ b/geeksbot_web/rcon/views.py @@ -8,7 +8,7 @@ from .rcon_lib import arcon from .models import RconServer from .utils import create_error_response, create_success_response, create_rcon_response -from utils.api_utils import PaginatedAPIView +from geeksbot_web.utils.api_utils import PaginatedAPIView from .serializers import RconServerSerializer # Create your views here. @@ -59,13 +59,10 @@ class ListPlayers(PaginatedAPIView): def get(self, request, guild_id, name, format=None): server: RconServer = RconServer.get_server(guild_id, name) if server: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop = asyncio.get_event_loop() ark = arcon.ARKServer(host=server.ip, port=server.port, password=server.password, loop=loop) - connected = loop.run_until_complete(ark.connect()) + connected = ark.connect() if connected == 1: - resp = loop.run_until_complete(ark.listplayers()) + resp = ark.listplayers() if resp == 'No Players Connected': return create_rcon_response(resp, status=status.HTTP_204_NO_CONTENT) else: