Update rcon lib to not use asyncio

Rework ListPlayers endpoint to use the new lib
This commit is contained in:
Dustin Pianalto 2019-12-16 18:28:22 -09:00
parent bb15f92632
commit 658454cb94
3 changed files with 98 additions and 149 deletions

View File

@ -7,89 +7,79 @@ arcon_log = logging.getLogger('arcon_lib')
class ARKServer(rcon.RCONConnection):
def __init__(self, *args, monitor_chat: bool=False, server_chat_channel: int=None,
server_messages_channel: int=None, **kwargs):
def __init__(self, *args, monitor_chat: bool = False, server_chat_channel: int = None,
server_messages_channel: int = None, **kwargs):
self.monitor_chat = monitor_chat
self.server_chat_channel = server_chat_channel
self.server_messages_channel = server_messages_channel
super().__init__(*args, **kwargs)
async def run_command(self, command: str, multi_packet: bool=False, reconnect_counter: int=0) \
def run_command(self, command: str, multi_packet: bool = False) \
-> Union[rcon.RCONPacket, str]:
arcon_log.debug(f'Command requested: {command}')
if self.authenticated:
packet = rcon.RCONPacket(next(self.packet_id), rcon.SERVERDATA_EXECCOMMAND, command)
with await self.lock:
try:
arcon_log.debug(f'Sending packet {packet.packet_id}')
self.send_packet(packet)
arcon_log.debug(f'Packet Sent.')
except ConnectionResetError:
arcon_log.info(f'Connection to {self.host}:{self.port} lost')
finally:
arcon_log.debug(f'Waiting for response to packet {packet.packet_id}')
try:
arcon_log.debug(f'Sending packet {packet.packet_id}')
await self.send_packet(packet)
arcon_log.debug(f'Packet Sent.')
except ConnectionResetError:
arcon_log.info(f'Connection to {self.host}:{self.port} lost, Reconnecting...')
self.lock.release()
await self._reconnect_and_resend(packet)
await self.lock.acquire()
finally:
arcon_log.debug(f'Waiting for response to packet {packet.packet_id}')
try:
response = await self.read(packet, multi_packet=multi_packet)
except asyncio.TimeoutError as e:
if reconnect_counter > 5:
return 'Reached max reconnects. Closing connection.'
arcon_log.warning(f'No response received: {e}\nAttempting to reconnect #{reconnect_counter}')
self.lock.release()
await self._reconnect()
await self.lock.acquire()
response = await self.run_command(command=command, multi_packet=multi_packet,
reconnect_counter=reconnect_counter + 1)
response = self.read(packet, multi_packet=multi_packet)
except asyncio.TimeoutError as e:
arcon_log.warning(f'No response received: {e}\nPlease try again later.')
else:
arcon_log.debug(f'Response Received:\n{response.packet_type}:{response.packet_id}:{response.body}')
response.body = response.body.strip('\x00\x00').strip()
return response
response.body = response.body.strip('\x00\x00').strip()
return response
else:
return 'Server is not Authenticated. Please let the Admin know of this issue.'
async def getchat(self) -> str:
response = await self.run_command(command='getchat', multi_packet=True)
def getchat(self) -> str:
response = self.run_command(command='getchat', multi_packet=True)
return response.body if isinstance(response, rcon.RCONPacket) else response
async def saveworld(self) -> str:
response = await self.run_command(command='saveworld')
def saveworld(self) -> str:
response = self.run_command(command='saveworld')
return response.body if isinstance(response, rcon.RCONPacket) else response
async def serverchat(self, message: str) -> str:
response = await self.run_command(command=f'serverchat {message}')
def serverchat(self, message: str) -> str:
response = self.run_command(command=f'serverchat {message}')
return response.body if isinstance(response, rcon.RCONPacket) else response
async def broadcast(self, message: str) -> str:
response = await self.run_command(command=f'broadcast {message}')
def broadcast(self, message: str) -> str:
response = self.run_command(command=f'broadcast {message}')
return response.body if isinstance(response, rcon.RCONPacket) else response
async def listplayers(self) -> str:
response = await self.run_command(command=f'listplayers')
def listplayers(self) -> str:
response = self.run_command(command=f'listplayers')
return response.body if isinstance(response, rcon.RCONPacket) else response
async def whitelist(self, steam_id: str) -> str:
response = await self.run_command(command=f'AllowPlayerToJoinNoCheck {steam_id}')
def whitelist(self, steam_id: str) -> str:
response = self.run_command(command=f'AllowPlayerToJoinNoCheck {steam_id}')
return response.body if isinstance(response, rcon.RCONPacket) else response
async def ban_player(self, steam_id: int) -> str:
response = await self.run_command(command=f'BanPlayer {steam_id}')
def ban_player(self, steam_id: int) -> str:
response = self.run_command(command=f'BanPlayer {steam_id}')
return response.body if isinstance(response, rcon.RCONPacket) else response
async def unban_player(self, steam_id: int) -> str:
response = await self.run_command(command=f'UnbanPlayer {steam_id}')
def unban_player(self, steam_id: int) -> str:
response = self.run_command(command=f'UnbanPlayer {steam_id}')
return response.body if isinstance(response, rcon.RCONPacket) else response
async def kick_player(self, steam_id: int) -> str:
response = await self.run_command(command=f'KickPlayer {steam_id}')
def kick_player(self, steam_id: int) -> str:
response = self.run_command(command=f'KickPlayer {steam_id}')
return response.body if isinstance(response, rcon.RCONPacket) else response
async def stop_server(self) -> int:
saved = await self.saveworld()
def stop_server(self) -> int:
saved = self.saveworld()
if saved == 'World Saved':
await self.serverchat(saved)
await asyncio.sleep(10)
response = await self.run_command(command='DoExit')
self.serverchat(saved)
asyncio.sleep(10)
response = self.run_command(command='DoExit')
if response.body == 'Exiting...':
return 0
else:
@ -97,22 +87,22 @@ class ARKServer(rcon.RCONConnection):
else:
return 1
async def get_logs(self):
response = await self.run_command(command=f'GetGameLog', multi_packet=True)
def get_logs(self):
response = self.run_command(command=f'GetGameLog', multi_packet=True)
return response.body if isinstance(response, rcon.RCONPacket) else response
async def server_chat_to_steam_id(self, steam_id: int, message: str) -> str:
response = await self.run_command(command=f'ServerChatTo {steam_id} {message}')
def server_chat_to_steam_id(self, steam_id: int, message: str) -> str:
response = self.run_command(command=f'ServerChatTo {steam_id} {message}')
return response.body if isinstance(response, rcon.RCONPacket) else response
async def server_chat_to_player_name(self, player_name: str, message: str) -> str:
response = await self.run_command(command=f'ServerChatToPlayer "{player_name}" {message}')
def server_chat_to_player_name(self, player_name: str, message: str) -> str:
response = self.run_command(command=f'ServerChatToPlayer "{player_name}" {message}')
return response.body if isinstance(response, rcon.RCONPacket) else response
async def set_time_of_day(self, hour: int, minute: int=00, seconds: int=00) -> str:
response = await self.run_command(command=f'SetTimeOfDay {hour}:{minute}:{seconds}')
def set_time_of_day(self, hour: int, minute: int = 00, seconds: int = 00) -> str:
response = self.run_command(command=f'SetTimeOfDay {hour}:{minute}:{seconds}')
return response.body if isinstance(response, rcon.RCONPacket) else response
async def destroy_wild_dinos(self):
response = await self.run_command(command='DestroyWildDinos')
def destroy_wild_dinos(self):
response = self.run_command(command='DestroyWildDinos')
return response.body if isinstance(response, rcon.RCONPacket) else response

View File

@ -1,7 +1,8 @@
import asyncio
import logging
import itertools
import struct
import socket
import time
# Packet types
SERVERDATA_AUTH = 3
@ -15,7 +16,7 @@ rcon_log = logging.getLogger('rcon_lib')
class RCONPacket:
def __init__(self, packet_id: int=0, packet_type: int=-1, body: str=''):
def __init__(self, packet_id: int = 0, packet_type: int = -1, body: str = ''):
self.packet_id = packet_id
self.packet_type = packet_type
self.body = body
@ -40,7 +41,7 @@ class RCONPacket:
class RCONConnection:
"""Connection to an RCON server"""
def __init__(self, host: str, port: int, password: str='', single_packet: bool=False, loop=None):
def __init__(self, host: str, port: int, password: str = '', single_packet: bool = False):
"""Create a New RCON Connection
Parameters:
@ -50,134 +51,95 @@ class RCONConnection:
single_packet (bool): True for servers who don't give 0 length SERVERDATA_RESPONSE_VALUE requests
"""
self.host = host
self.host = socket.gethostbyname(host)
self.port = port
self.password = password
self.single_packet = single_packet
self.packet_id = itertools.count(1)
self.loop = loop or asyncio.get_event_loop()
self.reader = None
self.writer = None
self.lock = asyncio.Lock()
self.socket: socket.socket = None
self.authenticated = False
async def connect(self):
def connect(self):
"""Returns -1 if connection times out
Returns 1 if connection and auth are successful
Returns 0 if auth fails"""
try:
rcon_log.debug(f'Connecting to {self.host}:{self.port}...')
self.reader, self.writer = await asyncio.open_connection(self.host, self.port, loop=self.loop)
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
except TimeoutError as e:
rcon_log.error(f'Timeout error: {e}')
return -1
else:
rcon_log.debug('Connected. Attempting to Authenticate...')
auth_packet = RCONPacket(next(self.packet_id), SERVERDATA_AUTH, self.password)
with await self.lock:
await self.send_packet(auth_packet)
response = await self.read()
if response.packet_type == SERVERDATA_AUTH_RESPONSE and response.packet_id != -1:
rcon_log.debug(f'Authorized {response.packet_type}:{response.packet_id}:{response.body}')
self.authenticated = True
return 1
else:
rcon_log.debug(f'Not Authorized {response.packet_type}:{response.packet_id}:{response.body}')
self.authenticated = False
return 0
self.send_packet(auth_packet)
response = self.read()
if response.packet_type == SERVERDATA_AUTH_RESPONSE and response.packet_id != -1:
rcon_log.debug(f'Authorized {response.packet_type}:{response.packet_id}:{response.body}')
self.authenticated = True
return 1
else:
rcon_log.debug(f'Not Authorized {response.packet_type}:{response.packet_id}:{response.body}')
self.authenticated = False
return 0
async def _reconnect(self):
self.writer = None
self.reader = None
connected = await self.connect()
rcon_log.info(f'Connection completed with a return of {connected}')
if connected != -1:
rcon_log.info('Connected')
else:
rcon_log.warning('Connection Failed')
return connected
async def _reconnect_and_resend(self, packet):
connected = await self._reconnect()
if connected != -1:
await asyncio.sleep(0.1)
rcon_log.info(f'Re-sending packet {packet.packet_id}')
await self.send_packet(packet)
rcon_log.info(f'Packet Sent.')
return connected
else:
return connected
async def keep_alive(self):
while True:
await asyncio.sleep(60)
ka_packet = RCONPacket(next(self.packet_id), SERVERDATA_EXECCOMMAND, '')
try:
with await self.lock:
await asyncio.wait_for(self.send_packet(ka_packet), 10, loop=self.loop)
await asyncio.wait_for(self.read(ka_packet), 10, loop=self.loop)
except asyncio.TimeoutError:
self.reader = None
self.writer = None
await self.connect()
async def send_packet(self, packet):
def send_packet(self, packet):
if packet.size() > 4096:
rcon_log.error('Packet Size is larger than 4096 bytes. Cannot send packet.')
raise RuntimeWarning('Packet Size is larger than 4096 bytes. Cannot send packet.')
if self.writer is None:
await self.connect()
if self.socket is None:
self.connect()
rcon_log.debug(f'Sending Packet {packet.packet_id}: {packet.pack() if packet.packet_type is not SERVERDATA_AUTH else "Censored for Password Security."}')
self.writer.write(packet.pack())
await self.writer.drain()
self.socket.send(packet.pack())
rcon_log.debug(f'Packet {packet.packet_id} Sent.')
async def read(self, request: RCONPacket=None, multi_packet=False) -> RCONPacket:
def read(self, request: RCONPacket = None, multi_packet=False) -> RCONPacket:
rcon_log.debug(f'Waiting to receive response to packet {request.packet_id if request else None}')
response = RCONPacket()
try:
if request:
while response.packet_id != request.packet_id and response.packet_id < request.packet_id:
if multi_packet:
if request is None:
rcon_log.warning('A request packet is required to receive a multi packet response')
raise ValueError('A request packet is required to receive a multi packet response')
await asyncio.sleep(.01)
response = await self._receive_multi_packet()
time.sleep(.01)
response = self._receive_multi_packet()
rcon_log.debug(f'Received Multi-Packet response to packet {request.packet_id}:\n'
f'{response.packet_type}:{response.packet_id}:{response.body}')
else:
response = await self.receive_packet()
response = self.receive_packet()
rcon_log.debug(f'Received Single-Packet response to packet {request.packet_id}:\n'
f'{response.packet_type}:{response.packet_id}:{response.body}')
else:
response = await self.receive_packet()
response = self.receive_packet()
rcon_log.debug(f'Received Single-Packet response:\n'
f'{response.packet_type}:{response.packet_id}:{response.body}')
except struct.error as e:
rcon_log.error(f'Struct Error: {e}')
response = RCONPacket(body='Error receiving data from the server. Attempting to reconnect. '
response = RCONPacket(body='Error receiving data from the server. '
'Please try again in a little bit.')
self.lock.release()
await self._reconnect()
await self.lock.acquire()
except AttributeError as e:
rcon_log.error(f'Attribute Error: {e}')
response = RCONPacket(body='Error receiving data from the server. Attempting to reconnect. '
response = RCONPacket(body='Error receiving data from the server. '
'Please try again in a little bit.')
self.lock.release()
await self._reconnect()
await self.lock.acquire()
return response
async def receive_packet(self):
header = await self.reader.read(struct.calcsize('<3i'))
def receive_packet(self):
header = self.socket.recv(struct.calcsize('<3i'))
(packet_size, packet_id, packet_type) = struct.unpack('<3i', header)
body = await self.reader.read(packet_size - 8)
body = self.socket.recv(packet_size - 8)
return RCONPacket(packet_id, packet_type, body.decode('ascii'))
async def _receive_multi_packet(self):
header = await self.reader.read(struct.calcsize('<3i'))
def _receive_multi_packet(self):
header = self.socket.recv(struct.calcsize('<3i'))
(packet_size, packet_id, packet_type) = struct.unpack('<3i', header)
body = await self.reader.readuntil(separator=b'\x00\x00')
body = self._read_all(packet_size)
return RCONPacket(packet_id, packet_type, body.decode('ascii'))
def _read_all(self, chunk_size):
fragments = []
while True:
part = self.socket.recv(chunk_size)
fragments.append(part)
if len(part) < chunk_size:
break
return b''.join(fragments)

View File

@ -8,7 +8,7 @@ from .rcon_lib import arcon
from .models import RconServer
from .utils import create_error_response, create_success_response, create_rcon_response
from utils.api_utils import PaginatedAPIView
from geeksbot_web.utils.api_utils import PaginatedAPIView
from .serializers import RconServerSerializer
# Create your views here.
@ -59,13 +59,10 @@ class ListPlayers(PaginatedAPIView):
def get(self, request, guild_id, name, format=None):
server: RconServer = RconServer.get_server(guild_id, name)
if server:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop = asyncio.get_event_loop()
ark = arcon.ARKServer(host=server.ip, port=server.port, password=server.password, loop=loop)
connected = loop.run_until_complete(ark.connect())
connected = ark.connect()
if connected == 1:
resp = loop.run_until_complete(ark.listplayers())
resp = ark.listplayers()
if resp == 'No Players Connected':
return create_rcon_response(resp, status=status.HTTP_204_NO_CONTENT)
else: