This repository has been archived by the owner on Sep 29, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 16
/
tiny-matrix-bot.py
executable file
·183 lines (167 loc) · 6.57 KB
/
tiny-matrix-bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#!/usr/bin/env python3
# pylint: disable=broad-except
# pylint: disable=consider-using-f-string
# pylint: disable=invalid-name
# pylint: disable=missing-docstring
# pylint: disable=too-few-public-methods
import asyncio
import os
import re
import subprocess
import sys
import time
import traceback
import nio
class TinyMatrixBot:
accept_invites = None
access_token = None
homeserver = None
proxy = None
scripts_path = None
user_id = None
_client = None
_initial_sync_done = False
_last_event_timestamp = time.time() * 1000
_scripts = None
def _run_script(self, script_path, script_env=None):
script_name = os.path.basename(script_path)
print(f"running script {script_name} with env {script_env}")
env = os.environ.copy()
if script_env:
env.update(script_env)
try:
run = subprocess.run(
[script_path],
env=env,
stdout=subprocess.PIPE,
check=False,
universal_newlines=True,
)
except Exception:
print(traceback.format_exc().strip())
return False
if run.returncode != 0:
print(" non-zero exit code")
return False
output = run.stdout.strip()
if not output:
print(" no output")
return False
return output
def _load_scripts(self, scripts_path):
scripts = {}
for file in os.listdir(scripts_path):
script_path = os.path.join(self.scripts_path, file)
script_name = os.path.basename(script_path)
if script_name[0] == ".":
continue
if not os.access(script_path, os.R_OK):
print(f"script {script_name} is not readable")
continue
if not os.access(script_path, os.X_OK):
print(f"script {script_name} is not executable")
continue
script_regex = self._run_script(script_path, {"CONFIG": "1"})
if not script_regex:
print(f"script {script_name} loading failed")
continue
print(f"script {script_name} loaded with regex {script_regex}")
scripts.update({script_path: script_regex})
return scripts
def __init__(self):
required_env_vars = ["TMB_HOMESERVER", "TMB_ACCESS_TOKEN", "TMB_USER_ID"]
for env_var in os.environ:
if not env_var.startswith("TMB_"):
continue
if env_var in required_env_vars:
required_env_vars.remove(env_var)
setattr(self, env_var.lower()[4:], os.environ[env_var])
if required_env_vars:
raise Exception("missing {}".format(", ".join(required_env_vars)))
if self.accept_invites is None:
self.accept_invites = ":{}$".format(re.escape(self.user_id.split(":")[1]))
if self.scripts_path is None:
self.scripts_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"scripts-enabled"
)
if os.path.isdir(self.scripts_path):
self._scripts = self._load_scripts(self.scripts_path)
async def _on_error(self, response):
if self._client:
await self._client.close()
raise Exception(response)
async def _on_sync(self, _response):
if not self._initial_sync_done:
self._initial_sync_done = True
for room_id in self._client.rooms:
print(f"joined room {room_id}")
print("initial sync done, ready for work")
async def _on_invite(self, room, event):
if not re.search(self.accept_invites, event.sender, re.IGNORECASE):
print(f"invite from {event.sender} to {room.room_id} rejected")
await self._client.room_leave(room.room_id)
else:
print(f"invite from {event.sender} to {room.room_id} accepted")
await self._client.join(room.room_id)
async def _on_message(self, room, event):
await self._client.update_receipt_marker(room.room_id, event.event_id)
if event.sender == self._client.user_id:
return
if event.server_timestamp <= self._last_event_timestamp:
return
self._last_event_timestamp = event.server_timestamp
if not self._scripts:
print("no scripts")
return
for script_path, script_regex in self._scripts.items():
if not re.search(script_regex, event.body, re.IGNORECASE):
continue
script_name = os.path.basename(script_path)
print(f"script {script_name} triggered in {room.room_id}")
script_output = self._run_script(
script_path,
{
"TMB_ROOM_ID": room.room_id,
"TMB_SENDER": event.sender,
"TMB_BODY": event.body,
},
)
if not script_output:
continue
print(f"sending message to {room.room_id}")
await self._client.room_typing(room.room_id, True)
for message_body in script_output.split("\n\n"):
time.sleep(0.8)
await self._client.room_send(
room_id=room.room_id,
message_type="m.room.message",
content={"msgtype": "m.text", "body": message_body},
)
await self._client.room_typing(room.room_id, False)
async def run(self):
print(f"connecting to {self.homeserver}")
self._client = nio.AsyncClient(self.homeserver, proxy=self.proxy)
self._client.access_token = self.access_token
self._client.device_id = "TinyMatrixBot"
self._client.user_id = self.user_id
self._client.add_response_callback(self._on_error, nio.SyncError)
self._client.add_response_callback(self._on_sync, nio.SyncResponse)
self._client.add_event_callback(self._on_invite, nio.InviteMemberEvent)
self._client.add_event_callback(self._on_message, nio.RoomMessageText)
await self._client.sync_forever(timeout=30000)
await self._client.close()
if __name__ == "__main__":
asyncio_debug = False
if "TMB_DEBUG" in os.environ:
import logging
logging.basicConfig(level=logging.DEBUG)
asyncio_debug = True
try:
TMB = TinyMatrixBot()
asyncio.run(TMB.run(), debug=asyncio_debug)
except Exception:
print(traceback.format_exc().strip())
sys.exit(1)
except KeyboardInterrupt:
sys.exit(0)