-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathsed.py
126 lines (101 loc) · 3.57 KB
/
sed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import multiprocessing
import re
from collections import defaultdict, deque
from telethon import events
last_msgs = defaultdict(lambda: deque(maxlen=10))
last_replies = defaultdict(lambda: deque(maxlen=10))
SED_PATTERN = re.compile(r'^s/((?:\\/|[^/])+)/((?:\\/|[^/])*)/?(.*)')
PREFIX = '「sed」\n'
VOWELS = set("aeiouAEIOU")
class UnknownFlag(ValueError):
def __init__(self, flag):
super().__init__(f'unknown flag: {flag}')
self.flag = flag
def timeout(func, n, *args):
def proc(pf, po, *pa):
try:
po.send((pf(*pa), None))
except BaseException as e:
po.send((None, e))
inp, out = multiprocessing.Pipe()
p = multiprocessing.Process(target=proc, args=(func, out, *args))
p.start()
p.join(n)
if p.is_alive():
p.terminate()
p.join()
raise TimeoutError(f'Process {func} timed out after {n}')
elif inp.poll(n):
res, err = inp.recv()
if err:
raise err
else:
return res
else:
raise ChildProcessError('Process exited without sending data')
def build_substitute(pattern, repl, flag_str):
repl = repl.replace('\\/', '/').replace('\\0', '\\g<0>')
count = 1
flags = 0
for f in (flag_str or ''):
if f in 'Gg':
count = 0
continue
try:
flags |= getattr(re.RegexFlag, f.upper())
except AttributeError:
raise UnknownFlag(f) from None
def substitute(string):
if string.startswith(PREFIX):
string = string[len(PREFIX):]
s, i = re.subn(pattern, repl, string, count=count, flags=flags)
if i > 0:
return PREFIX + s
return substitute
async def init(bot):
@bot.on(events.NewMessage(pattern=SED_PATTERN))
async def handler(event):
if event.is_reply:
messages = [await event.get_reply_message()]
else:
messages = reversed(last_msgs[event.chat_id])
try:
substitute = build_substitute(*event.pattern_match.groups())
except UnknownFlag as e:
await event.reply(str(e))
return
for message in messages:
try:
new = timeout(substitute, 0.2, message.raw_text)
except TimeoutError:
await message.reply('are you… trying to DoS me?')
break
except Exception as e:
ex_string = str(e).strip()
if ex_string:
ex_name = type(e).__name__
v = ex_name[0] in VOWELS
await message.reply(f'you caused a{"n" if v else ""} {ex_name}, dummy')
else:
await message.reply(f'you caused "{ex_string}", dummy')
break
if new is None:
continue
try:
sent = await message.reply(new, parse_mode=None)
except Exception as e:
await message.reply(f'owh :(\n{e}')
else:
last_msgs[event.chat_id].append(sent)
where = event.chat_id if event.is_channel else None
last_replies[where].append((event.id, sent.id))
break
raise events.StopPropagation
@bot.on(events.NewMessage)
async def handler(event):
last_msgs[event.chat_id].append(event.message)
@bot.on(events.MessageDeleted)
async def handler(event):
for src, dst in last_replies[event.chat_id]:
if src in event.deleted_ids:
await bot.delete_messages(event.chat_id, dst)