-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathcounting.py
222 lines (196 loc) · 8.5 KB
/
counting.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import logging
from datetime import datetime
from decimal import Decimal
from discord import User
from discord.ext import commands
from discord.ext.commands import Bot, BucketType, Cog, Context, cooldown
from discord.errors import NotFound
from sqlalchemy.exc import SQLAlchemyError
from models import db_session
from models.counting import CountingRun, CountingUser
from models.user import User as UserModel
from utils import get_database_user_from_id, is_decimal, get_name_string
# Full help text; passed as ``help=`` to the ``counting`` command group below.
LONG_HELP_TEXT = """
Starts a counting game where each player must name the next number in the sequence until someone names an invalid number
"""
# One-line summary; passed as ``brief=`` to the ``counting`` command group below.
SHORT_HELP_TEXT = """Starts a counting game"""
# Cog implementing the counting game plus its leaderboard/statistics commands.
#
# NOTE: this is deliberately a comment rather than a class docstring, because
# discord.py uses a Cog's docstring as the cog description shown in help output.
class Counting(Cog):
    def __init__(self, bot: Bot):
        self.bot = bot
        # True while a game is in progress; only one game runs at a time.
        self.currently_playing = False
        # Channel hosting the current game, or None when no game is running.
        self.channel = None

    @commands.hybrid_group(help=LONG_HELP_TEXT, brief=SHORT_HELP_TEXT)
    async def counting(self, ctx: Context):
        """Start a counting game in the invoking channel when no subcommand is used."""
        # If user does not use a subcommand assume they want to play the game
        if ctx.invoked_subcommand:
            return

        if self.currently_playing:
            channel = (
                "this channel"
                if self.channel == ctx.message.channel
                else self.channel.mention
            )
            await ctx.send(f"There is already a game being played in {channel}!")
            return

        self.currently_playing = True
        self.channel = ctx.channel
        try:
            await self._play_game(ctx)
        finally:
            # Reset the cog's state however the game ended (normally or by an
            # exception), so a crash can never leave the cog stuck "playing".
            self.currently_playing = False
            self.channel = None

    async def _play_game(self, ctx: Context):
        """Run a single game in ``self.channel`` and then record its result.

        The first decimal message in the channel fixes the step. Afterwards
        every decimal message from someone other than the previous player must
        equal the running count plus the step; the first genuine mistake ends
        the game.
        """
        started_at = datetime.utcnow()
        await ctx.send("The game begins!")

        def check_dec(m):
            return m.channel == self.channel and is_decimal(m.content)

        # We need to determine what the step is.
        # It will be the first decimal number sent in the same channel.
        msg = await self.bot.wait_for("message", check=check_dec)
        await msg.add_reaction("✅")
        step = Decimal(msg.content)
        # The count starts at 0, so after the opening message it equals the step.
        count = step
        # The number of successful replies in a row (the opening message counts).
        length = 1

        # Maps user id -> number of correct replies.
        # NB no points for the first user - the initial message cannot be wrong
        players = {}
        # The last accepted message: gives both the previous player and the
        # message id used to detect deletions.
        last_message = msg
        # Id of the player whose wrong answer ended the game, if any.
        loser_id = None

        def check_dec_player(m):
            # Reads the *current* last_message each time it is called, so it
            # only needs to be defined once rather than once per iteration.
            return check_dec(m) and m.author != last_message.author

        while self.currently_playing:
            # Wait for the next numeric message sent by a different person in the same channel
            msg = await self.bot.wait_for("message", check=check_dec_player)
            value = Decimal(msg.content)
            players.setdefault(msg.author.id, 0)

            if value == count + step:
                # If the number is correct, increase the count and length.
                count += step
                length += 1
                last_message = msg
                players[msg.author.id] += 1
                await msg.add_reaction("✅")
                continue

            # Otherwise, check whether the previous message still exists. Only
            # the fetch lives in the try so that a NotFound raised by anything
            # else cannot be mistaken for the previous message being deleted.
            try:
                await ctx.fetch_message(last_message.id)
            except NotFound:
                # The previous message was deleted, so the mistake is forgiven
                # and the deleter is named instead.
                # NOTE(review): last_message stays the deleted message until a
                # correct reply arrives, so further wrong answers are forgiven
                # too - confirm whether that is intended.
                await ctx.send(f"Oops! {get_name_string(last_message)} deleted their message. The next number is {count + step}")
                continue

            # The previous message is still there, so this was a genuine mistake.
            loser_id = msg.author.id
            try:
                await msg.add_reaction("❌")
            except NotFound:
                # The wrong message itself is already gone; the game still ends.
                pass
            await ctx.send(
                f"Gone wrong at {count}! The next number was {count + step}.\n"
                f"This chain lasted {length} consecutive messages."
            )
            break

        await self._record_run(ctx, started_at, length, step, players, loser_id)

    async def _record_run(self, ctx: Context, started_at, length, step, players, loser_id):
        """Persist a finished run and each participant's tallies.

        Args:
            ctx: Context used to report whether recording succeeded.
            started_at: Timestamp taken when the game began.
            length: Number of consecutive correct messages in the run.
            step: The step fixed by the opening message.
            players: Mapping of user id to correct replies in this run.
            loser_id: Id of the user who gave the wrong answer, or None.
        """
        ended_at = datetime.utcnow()
        try:
            # Save this run to the database
            run = CountingRun(
                started_at=started_at,
                ended_at=ended_at,
                length=length,
                step=step,
            )
            db_session.add(run)

            # Save the players who played into the database
            for player, correct in players.items():
                wrong = 1 if player == loser_id else 0
                db_user = get_database_user_from_id(player)
                # If we can't find the user, skip
                if not db_user:
                    continue

                # See if they've taken part in the counting game before
                counting_user = (
                    db_session.query(CountingUser)
                    .filter(CountingUser.user_id == db_user.id)
                    .one_or_none()
                )
                if counting_user is None:
                    # Create a new entry
                    counting_user = CountingUser(
                        user_id=db_user.id, correct_replies=correct, wrong_replies=wrong
                    )
                else:
                    counting_user.correct_replies += correct
                    counting_user.wrong_replies += wrong
                db_session.add(counting_user)

            db_session.commit()
        except SQLAlchemyError as e:
            # Covers the queries above as well as the commit, so the session is
            # always rolled back after a database failure.
            db_session.rollback()
            logging.exception(e)
            await ctx.send("Something went wrong. The run could not be recorded.")
        else:
            await ctx.send("Run recorded!")

    @counting.command(help="Show the top 5 users in the counting game")
    async def leaderboard(self, ctx: Context):
        """Send the five players with the most correct replies."""
        top5 = (
            db_session.query(CountingUser)
            .order_by(CountingUser.correct_replies.desc())
            .limit(5)
            .all()
        )
        message = ["Here are the top 5 users by correct answers: ", ""]
        for i, c_user in enumerate(top5):
            # CountingUser.user_id (not its own primary key) references UserModel.id.
            db_user = (
                db_session.query(UserModel)
                .filter(UserModel.id == c_user.user_id)
                .first()
            )
            username = db_user.username if db_user is not None else "Unknown user"
            message.append(
                f"• #{i + 1}. **{username}**: {c_user.correct_replies}✅ {c_user.wrong_replies}❌"
            )
        await ctx.send("\n".join(message))

    @counting.command(help="Show the top 5 longest runs recorded")
    async def top(self, ctx: Context):
        """Send the five longest recorded runs."""
        top5 = (
            db_session.query(CountingRun)
            .order_by(CountingRun.length.desc())
            .limit(5)
            .all()
        )
        # Heading followed by an empty entry, giving a blank line before the list.
        message = ["Here are the top 5 longest runs:", ""]
        for i, c_run in enumerate(top5):
            start = c_run.started_at.strftime("%x %X")
            end = c_run.ended_at.strftime("%x %X")
            message.append(
                f"• #{i + 1}. **length {c_run.length}**, step {c_run.step}, took place {start}-{end}"
            )
        await ctx.send("\n".join(message))

    @cooldown(2, 30, BucketType.user)
    @counting.command(help="Look up a user's stats")
    async def user(self, ctx: Context, user: User):
        """Send the correct/wrong reply totals recorded for the given Discord user."""
        db_user = (
            db_session.query(UserModel).filter(UserModel.user_uid == user.id).first()
        )
        if db_user is None:
            await ctx.send("Could not find user!")
            return

        c_user = (
            db_session.query(CountingUser)
            .filter(CountingUser.user_id == db_user.id)
            .first()
        )
        if c_user is None:
            await ctx.send("User has not played any counting games!")
            return

        await ctx.send(
            f"**{db_user.username}**: {c_user.correct_replies}✅ {c_user.wrong_replies}❌"
        )

    @counting.command(help="Look up your own stats")
    async def me(self, ctx: Context):
        """Send the invoking user's own statistics by delegating to the user command."""
        await self.user(ctx, ctx.author)

    @user.error
    async def user_error(self, ctx: Context, err):
        """Report a failure of the user stats command back to the channel."""
        # Game state is intentionally left alone: a failed stats lookup (for
        # example hitting the cooldown) must not end a game that is running.
        await ctx.send(err)

    @counting.error
    async def counting_error(self, ctx: Context, err):
        """Log a failure of the counting command."""
        # Game state is reset by the try/finally in `counting` itself, so it is
        # not touched here; a failed second invocation must not end a game that
        # is still running elsewhere.
        logging.error("The counting command failed", exc_info=err)
async def setup(bot: Bot):
    """Create a Counting cog bound to ``bot`` and add it to the bot."""
    counting_cog = Counting(bot)
    await bot.add_cog(counting_cog)