Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Edit messages instead of deleting and recreating #25

Merged
merged 2 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ _deps
.vs
*.project
*.workspace
discord_bot.json
/zerotier
3 changes: 3 additions & 0 deletions discord_bot.json.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"token": "<private>"
}
233 changes: 112 additions & 121 deletions discord_bot.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,36 @@
from typing import List, Dict, Any, Optional
import discord
import json
import time
import asyncio
from collections import deque
import datetime
import discord
import json
import logging
import math
import re
import warnings
from typing import Optional
import time
from typing import Any, Deque, Dict, List, Optional

# Constants
DISCORD_CHANNEL_ID = 1061483226767556719
logger = logging.getLogger(__name__)

intents = discord.Intents.default()
intents.message_content = True

client = discord.Client(intents=intents)
current_online = 0
global_online_list_message: Optional[discord.Message] = None
global_channel: Optional[discord.TextChannel] = None
gameTTL = 120 # games are marked as active for x seconds every time they show up
channel: Optional[discord.TextChannel] = None

config: Dict[str, Any] = {
'channel': 1061483226767556719,
'game_ttl': 120,
'refresh_seconds': 60,
'banlist_file': './banlist',
'gamelist_program': './devilutionx-gamelist'
}

def escape_discord_formatting_characters(text: str) -> str:
return re.sub(r'([-\\*_#|~:@[\]()<>`])', r'\\\1', text)


def format_game(game: Dict[str, Any]) -> str:
global gameTTL
ended = time.time() - game['last_seen'] >= gameTTL
def format_game_message(game: Dict[str, Any]) -> str:
ended = time.time() - game['last_seen'] >= config['game_ttl']
text = '';
if ended:
text += '~~' + game['id'].upper() + '~~'
Expand Down Expand Up @@ -103,37 +105,10 @@ def format_game(game: Dict[str, Any]) -> str:
return text


async def update_status_message() -> None:
global current_online
global global_channel
global global_online_list_message
if global_online_list_message is not None:
try:
await global_online_list_message.delete()
except discord.errors.NotFound:
pass
global_online_list_message = None
text = 'There are currently **' + str(current_online) + '** public games.'
def format_status_message(current_online: int) -> str:
if current_online == 1:
text = 'There is currently **' + str(current_online) + '** public game.'
assert isinstance(global_channel, discord.TextChannel)
global_online_list_message = await global_channel.send(text)


async def update_game_message(game_id: str) -> None:
global global_channel
text = format_game(game_list[game_id])
if 'message' in game_list[game_id]:
message = game_list[game_id]['message'];
if isinstance(message, discord.Message):
if message.content != text:
try:
await message.edit(content=text)
except discord.errors.NotFound:
pass
return
assert isinstance(global_channel, discord.TextChannel)
game_list[game_id]['message'] = await global_channel.send(text)
return 'There is currently **' + str(current_online) + '** public game.'
return 'There are currently **' + str(current_online) + '** public games.'


def format_time_delta(minutes: int) -> str:
Expand All @@ -157,30 +132,6 @@ def format_time_delta(minutes: int) -> str:
return text


async def end_game_message(game_id: str) -> None:
if 'message' in game_list[game_id]:
message = game_list[game_id]['message'];
if not isinstance(message, discord.Message):
return
try:
await message.edit(content=format_game(game_list[game_id]))
except discord.errors.NotFound:
pass


async def remove_game_messages(game_ids: List[str]) -> None:
for gameId in game_ids:
if 'message' in game_list[gameId]:
message = game_list[gameId]['message'];
if not isinstance(message, discord.Message):
continue
try:
await message.delete()
except discord.errors.NotFound:
pass
del game_list[gameId]['message']


def any_player_name_is_invalid(players: List[str]) -> bool:
for name in players:
# using the same restricted character list as DevilutionX, see
Expand All @@ -199,36 +150,50 @@ def any_player_name_is_invalid(players: List[str]) -> bool:


def any_player_name_contains_a_banned_word(players: List[str]) -> bool:
with open('./banlist', 'r') as ban_list_file:
words = set([line.strip().upper() for line in ban_list_file.read().split('\n') if line.strip()])
if config['banlist_file'] != '':
try:
with open(config['banlist_file'], 'r') as ban_list_file:
words = set([line.strip().upper() for line in ban_list_file.read().split('\n') if line.strip()])

for name in players:
for word in words:
if word in name.upper():
return True
for name in players:
for word in words:
if word in name.upper():
return True
except:
logger.warn('Unable to load banlist file')

return False


game_list: Dict[str, Dict[str, Any]] = {}
background_task_running = 0
async def update_message(message: discord.Message, text: str) -> Optional[discord.Message]:
if message.content != text:
try:
await message.edit(content=text)
except discord.errors.NotFound:
return None
return message


async def send_message(text: str) -> discord.Message:
assert isinstance(channel, discord.TextChannel)
return await channel.send(text)


async def background_task() -> None:
global gameTTL
global current_online
known_games: Dict[str, Dict[str, Any]] = {}
active_messages: Deque[discord.Message] = deque()

last_refresh = 0.0
refresh_seconds = 60 # refresh gamelist every x seconds
while True:
try:
sleep_time = refresh_seconds - (time.time() - last_refresh)
sleep_time = config['refresh_seconds'] - (time.time() - last_refresh)
if sleep_time > 0:
await asyncio.sleep(sleep_time)
last_refresh = time.time()

# Call the external program and get the output
proc = await asyncio.create_subprocess_shell(
'./devilutionx-gamelist',
config['gamelist_program'],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)

Expand All @@ -244,61 +209,87 @@ async def background_task() -> None:
# Load the output as a JSON list
games = json.loads(output)

ct = datetime.datetime.now()
print('[' + str(ct) + '] Refreshing game list - ' + str(len(games)) + ' games')
logger.info('Refreshing game list - ' + str(len(games)) + ' games')

for game in games:
if any_player_name_is_invalid(game['players']) or any_player_name_contains_a_banned_word(game['players']):
continue

key = game['id'].upper()
if key in game_list:
game_list[key]['players'] = game['players']
game_list[key]['last_seen'] = time.time()
continue
if key in known_games:
known_games[key]['players'] = game['players']
else:
known_games[key] = game
known_games[key]['first_seen'] = time.time()

game_list[key] = game
game_list[key]['first_seen'] = time.time()
game_list[key]['last_seen'] = time.time()
known_games[key]['last_seen'] = time.time()

ended_games = []
for key, game in game_list.items():
if time.time() - game['last_seen'] < gameTTL:
continue
ended_games.append(key)
await end_game_message(key)
ended_games = [key for key, game in known_games.items() if time.time() - game['last_seen'] >= config['game_ttl']]

for key in ended_games:
del game_list[key]

if len(ended_games) != 0:
await remove_game_messages(list(game_list.keys()))

for gameId in game_list.keys():
await update_game_message(gameId)

if (current_online == len(game_list)) or len(ended_games) != 0:
continue

current_online = len(game_list)
await update_status_message()

activity = discord.Activity(name='Games online: '+str(current_online), type=discord.ActivityType.watching)
if active_messages:
await update_message(active_messages.popleft(), format_game_message(known_games[key]))
del known_games[key]

message_index = 0
for game in known_games.values():
message_text = format_game_message(game)
if message_index < len(active_messages):
message = await update_message(active_messages[message_index], message_text)
assert message is not None
else:
message = await send_message(message_text)
assert message is not None
active_messages.append(message)
message_index += 1

game_count = len(known_games)
if (len(active_messages) <= game_count):
message = await send_message(format_status_message(game_count))
assert message is not None
active_messages.append(message)
else:
await update_message(active_messages[game_count], format_status_message(game_count))

activity = discord.Activity(name='Games online: '+str(game_count), type=discord.ActivityType.watching)
await client.change_presence(activity=activity)
except discord.errors.DiscordServerError as server_error:
warnings.warn(repr(server_error))
logger.warn(repr(server_error))


@client.event
async def on_ready() -> None:
print(f'We have logged in as {client.user}')
global global_channel
channel = client.get_channel(DISCORD_CHANNEL_ID)
assert isinstance(channel, discord.TextChannel)
global_channel = channel
logger.info(f'We have logged in as {client.user}')

maybeChannel = client.get_channel(config['channel'])
assert isinstance(maybeChannel, discord.TextChannel)

global channel
channel = maybeChannel
await background_task()

with open('./discord_bot_token', 'r') as file:
token = file.readline()

client.run(token)
def run(runtimeConfig: Dict[str, Any]) -> None:
assert 'token' in runtimeConfig

for key, value in runtimeConfig.items():
config[key] = value

client.run(config['token'])


def main() -> None:
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('[{asctime}] [{levelname:<8}] {name}: {message}', '%Y-%m-%d %H:%M:%S', style='{')
handler.setFormatter(formatter)
logger.addHandler(handler)

with open('./discord_bot.json', 'r') as file:
runtimeConfig = json.load(file)

run(runtimeConfig)


if __name__ == '__main__':
main()
Loading