Skip to content

Commit

Permalink
Ensure request-aware config reader in user object when using config w…
Browse files Browse the repository at this point in the history
…rapper
  • Loading branch information
stijn-uva committed Sep 17, 2024
1 parent bbe79e4 commit b21866d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 42 deletions.
4 changes: 4 additions & 0 deletions common/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ def __init__(self, config, user=None, tags=None, request=None):
self.tags = tags
self.request = request

# this ensures the user object in turn reads from the wrapper
if self.user:
self.user.with_config(self)


def set(self, *args, **kwargs):
"""
Expand Down
109 changes: 67 additions & 42 deletions common/lib/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from email.mime.text import MIMEText
from common.lib.helpers import send_email
from common.lib.exceptions import DataSetException
from common.config_manager import config
from common.config_manager import config as global_config


class User:
Expand All @@ -28,12 +28,13 @@ class User:
is_authenticated = False
is_active = False
is_anonymous = True
config = None
db = None

name = "anonymous"

@staticmethod
def get_by_login(db, name, password):
def get_by_login(db, name, password, config=None):
"""
Get user object, if login is correct
Expand All @@ -43,6 +44,8 @@ def get_by_login(db, name, password):
:param db: Database connection object
:param name: User name
:param password: User password
:param config: Configuration manager. Can be used for request-aware user objects using ConfigWrapper. Empty to
use a global configuration manager.
:return: User object, or `None` if login was invalid
"""
user = db.fetchone("SELECT * FROM users WHERE name = %s", (name,))
Expand All @@ -54,30 +57,34 @@ def get_by_login(db, name, password):
return None
else:
# valid login!
return User(db, user, authenticated=True)
return User(db, user, authenticated=True, config=config)

@staticmethod
def get_by_name(db, name):
def get_by_name(db, name, config=None):
"""
Get user object for given user name
:param db: Database connection object
:param str name: Username to get object for
:param config: Configuration manager. Can be used for request-aware user objects using ConfigWrapper. Empty to
use a global configuration manager.
:return: User object, or `None` for invalid user name
"""
user = db.fetchone("SELECT * FROM users WHERE name = %s", (name,))
if not user:
return None
else:
return User(db, user)
return User(db, user, config=config)

@staticmethod
def get_by_token(db, token):
def get_by_token(db, token, config=None):
"""
Get user object for given token, if token is valid
:param db: Database connection object
:param str token: Token to get object for
:param config: Configuration manager. Can be used for request-aware user objects using ConfigWrapper. Empty to
use a global configuration manager.
:return: User object, or `None` for invalid token
"""
user = db.fetchone(
Expand All @@ -86,36 +93,9 @@ def get_by_token(db, token):
if not user:
return None
else:
return User(db, user)
return User(db, user, config=config)

def can_access_dataset(self, dataset, role=None):
"""
Check if this user should be able to access a given dataset.
This depends mostly on the dataset's owner, which should match the
user if the dataset is private. If the dataset is not private, or
if the user is an admin or the dataset is private but assigned to
an anonymous user, the dataset can be accessed.
:param dataset: The dataset to check access to
:return bool:
"""
if not dataset.is_private:
return True

elif self.is_admin:
return True

elif dataset.is_accessible_by(self, role=role):
return True

elif dataset.get_owners == ("anonymous",):
return True

else:
return False

def __init__(self, db, data, authenticated=False):
def __init__(self, db, data, authenticated=False, config=None):
"""
Instantiate user object
Expand All @@ -127,6 +107,9 @@ def __init__(self, db, data, authenticated=False):
"""
self.db = db
self.data = data

self.config = config if config else global_config

try:
self.userdata = json.loads(self.data["userdata"])
except (TypeError, json.JSONDecodeError):
Expand Down Expand Up @@ -170,7 +153,7 @@ def get_name(self):
if self.data["name"] == "anonymous":
return "Anonymous"
elif self.data["name"] == "autologin":
return config.get("flask.autologin.name")
return self.config.get("flask.autologin.name")
else:
return self.data["name"]

Expand All @@ -184,6 +167,21 @@ def get_token(self):
"""
return self.generate_token(regenerate=False)

def with_config(self, config):
"""
Connect user to configuration manager
By default, the user object reads from the global configuration
manager. For frontend operations it may be desireable to use a
request-aware configuration manager, but this is only available after
the user has been instantiated. This method can thus be used to connect
the user to that config manager later when it is available.
:param config: Configuration manager object
:return:
"""
self.config = config

def clear_token(self):
"""
Reset password rest token
Expand All @@ -195,6 +193,33 @@ def clear_token(self):
"""
self.db.update("users", data={"register_token": "", "timestamp_token": 0}, where={"name": self.get_id()})

def can_access_dataset(self, dataset, role=None):
"""
Check if this user should be able to access a given dataset.
This depends mostly on the dataset's owner, which should match the
user if the dataset is private. If the dataset is not private, or
if the user is an admin or the dataset is private but assigned to
an anonymous user, the dataset can be accessed.
:param dataset: The dataset to check access to
:return bool:
"""
if not dataset.is_private:
return True

elif self.is_admin:
return True

elif dataset.is_accessible_by(self, role=role):
return True

elif dataset.get_owners == ("anonymous",):
return True

else:
return False

@property
def is_special(self):
"""
Expand Down Expand Up @@ -246,7 +271,7 @@ def email_token(self, new=False):
account?
:return str: Link for the user to set their password with
"""
if not config.get('mail.server'):
if not self.config.get('mail.server'):
raise RuntimeError("No e-mail server configured. 4CAT cannot send any e-mails.")

if self.is_special:
Expand All @@ -258,14 +283,14 @@ def email_token(self, new=False):
register_token = self.generate_token(regenerate=True)

# prepare welcome e-mail
sender = config.get('mail.noreply')
sender = self.config.get('mail.noreply')
message = MIMEMultipart("alternative")
message["From"] = sender
message["To"] = username

# the actual e-mail...
url_base = config.get("flask.server_name")
protocol = "https" if config.get("flask.https") else "http"
url_base = self.config.get("flask.server_name")
protocol = "https" if self.config.get("flask.https") else "http"
url = "%s://%s/reset-password/?token=%s" % (protocol, url_base, register_token)

# we use slightly different e-mails depending on whether this is the first time setting a password
Expand Down Expand Up @@ -408,7 +433,7 @@ def get_notifications(self):
:return list: Notifications, as a list of dictionaries
"""
tag_recipients = ["!everyone", *[f"!{tag}" for tag in self.data["tags"]]]
tag_recipients = ["!everyone", *[f"!{tag}" for tag in self.config.get_active_tags(self)]]
if self.is_admin:
# for backwards compatibility - used to be called '!admins' even if the tag is 'admin'
tag_recipients.append("!admins")
Expand Down Expand Up @@ -457,7 +482,7 @@ def sort_user_tags(self):
tags = self.data["tags"]
sorted_tags = []

for tag in config.get("flask.tag_order"):
for tag in self.config.get("flask.tag_order"):
if tag in tags:
sorted_tags.append(tag)

Expand Down

0 comments on commit b21866d

Please sign in to comment.