diff --git a/evap/evaluation/management/commands/tools.py b/evap/evaluation/management/commands/tools.py index fef8f38d0a..ee07ec9e4a 100644 --- a/evap/evaluation/management/commands/tools.py +++ b/evap/evaluation/management/commands/tools.py @@ -35,7 +35,7 @@ def log_exceptions(cls): class NewClass(cls): def handle(self, *args, **options): try: - super().handle(args, options) + super().handle(*args, **options) except Exception: logger.exception("Management command '%s' failed. Traceback follows: ", sys.argv[1]) raise diff --git a/evap/evaluation/migrations/0147_course_cms_id_evaluation_cms_id.py b/evap/evaluation/migrations/0147_course_cms_id_evaluation_cms_id.py new file mode 100644 index 0000000000..932db7ca95 --- /dev/null +++ b/evap/evaluation/migrations/0147_course_cms_id_evaluation_cms_id.py @@ -0,0 +1,27 @@ +# Generated by Django 5.0.4 on 2024-05-13 20:59 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("evaluation", "0146_grade_reminder_template"), + ] + + operations = [ + migrations.AddField( + model_name="course", + name="cms_id", + field=models.CharField( + blank=True, max_length=255, null=True, unique=True, verbose_name="campus management system id" + ), + ), + migrations.AddField( + model_name="evaluation", + name="cms_id", + field=models.CharField( + blank=True, max_length=255, null=True, unique=True, verbose_name="campus management system id" + ), + ), + ] diff --git a/evap/evaluation/models.py b/evap/evaluation/models.py index 84ab25ce73..5003e2e1b1 100644 --- a/evap/evaluation/models.py +++ b/evap/evaluation/models.py @@ -331,6 +331,11 @@ class Course(LoggedModel): # grade publishers can set this to True, then the course will be handled as if final grades have already been uploaded gets_no_grade_documents = models.BooleanField(verbose_name=_("gets no grade documents"), default=False) + # unique reference for import from campus management system + cms_id = models.CharField( + verbose_name=_("campus management system id"), blank=True, null=True, unique=True, max_length=255 + ) + class Meta: unique_together = [ ["semester", "name_de"], @@ -468,6 +473,11 @@ class State(models.IntegerChoices): verbose_name=_("wait for grade upload before publishing"), default=True ) + # unique reference for import from campus management system + cms_id = models.CharField( + verbose_name=_("campus management system id"), blank=True, null=True, unique=True, max_length=255 + ) + class TextAnswerReviewState(Enum): do_not_call_in_templates = True # pylint: disable=invalid-name NO_TEXTANSWERS = auto() diff --git a/evap/staff/forms.py b/evap/staff/forms.py index e41b01aa6d..c036520412 100644 --- a/evap/staff/forms.py +++ b/evap/staff/forms.py @@ -310,6 +310,7 @@ def __init__(self, data=None, *, instance: Course): "_voter_count", "voters", "votetimestamp", + "cms_id", } CONTRIBUTION_COPIED_FIELDS = { diff --git a/evap/staff/importers/json.py b/evap/staff/importers/json.py new file mode 100644 index 0000000000..0a69730b65 --- /dev/null +++ b/evap/staff/importers/json.py @@ -0,0 +1,328 @@ +import json +import logging +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from typing import Any, TypedDict + +from django.conf import settings +from django.core.mail import EmailMultiAlternatives +from django.db import transaction +from django.utils.timezone import now + +from evap.evaluation.models import Contribution, Course, CourseType, Evaluation, Program, Semester, UserProfile +from evap.evaluation.tools import clean_email +from evap.staff.tools import update_or_create_with_changes, update_with_changes + +logger = logging.getLogger("import") + + +class ImportStudent(TypedDict): + gguid: str + email: str + name: str + christianname: str + + +class ImportLecturer(TypedDict): + gguid: str + email: str + name: str + christianname: str + titlefront: str + + +class ImportCourse(TypedDict): + cprid: str + scale: str + + +class ImportRelated(TypedDict): + gguid: str + + +class ImportAppointment(TypedDict): + begin: str + end: str + + +class ImportEvent(TypedDict): + gguid: str + lvnr: int + title: str + title_en: str + type: str + isexam: bool + courses: list[ImportCourse] + relatedevents: ImportRelated + appointments: list[ImportAppointment] + lecturers: list[ImportRelated] + students: list[ImportRelated] + + +class ImportDict(TypedDict): + students: list[ImportStudent] + lecturers: list[ImportLecturer] + events: list[ImportEvent] + + +@dataclass +class NameChange: + old_last_name: str + old_first_name_given: str + new_last_name: str + new_first_name_given: str + email: str + + +@dataclass +class ImportStatistics: + name_changes: list[NameChange] = field(default_factory=list) + new_courses: list[Course] = field(default_factory=list) + new_evaluations: list[Evaluation] = field(default_factory=list) + updated_courses: list[Course] = field(default_factory=list) + updated_evaluations: list[Evaluation] = field(default_factory=list) + attempted_changes: list[Evaluation] = field(default_factory=list) + + @staticmethod + def _make_heading(heading: str, separator: str = "-") -> str: + return f"{heading}\n{separator * len(heading)}\n" + + @staticmethod + def _make_total(total: int) -> str: + return f"({total} in total)\n\n" + + @staticmethod + def _make_stats(heading: str, objects: list) -> str: + log = ImportStatistics._make_heading(heading) + log += ImportStatistics._make_total(len(objects)) + for obj in objects: + log += f"- {obj}\n" + log += "\n" + return log + + def get_log(self) -> str: + log = self._make_heading("JSON IMPORTER REPORT", "=") + log += "\n" + log += f"Import finished at {now()}\n\n" + + log += self._make_heading("Name Changes") + log += self._make_total(len(self.name_changes)) + for name_change in self.name_changes: + log += f"- {name_change.old_first_name_given} {name_change.old_last_name} → {name_change.new_first_name_given} {name_change.new_last_name} (email: {name_change.email})\n" + + log += self._make_stats("New Courses", self.new_courses) + log += self._make_stats("New Evaluations", self.new_evaluations) + log += self._make_stats("Updated Courses", self.updated_courses) + log += self._make_stats("Updated Evaluations", self.updated_evaluations) + log += self._make_stats("Attempted Changes", self.attempted_changes) + + return log + + def send_mail(self): + subject = "[EvaP] JSON importer log" + + managers = UserProfile.objects.filter(groups__name="Manager", email__isnull=False) + if not managers: + return + mail = EmailMultiAlternatives( + subject, + self.get_log(), + settings.SERVER_EMAIL, + [manager.email for manager in managers], + ) + mail.send() + + +class JSONImporter: + DATETIME_FORMAT = "%d.%m.%Y %H:%M" + + def __init__(self, semester: Semester) -> None: + self.semester = semester + self.user_profile_map: dict[str, UserProfile] = {} + self.course_type_cache: dict[str, CourseType] = {} + self.program_cache: dict[str, Program] = {} + self.course_map: dict[str, Course] = {} + self.statistics = ImportStatistics() + + def _get_course_type(self, name: str) -> CourseType: + if name in self.course_type_cache: + return self.course_type_cache[name] + + course_type, __ = CourseType.objects.get_or_create(name_de=name, defaults={"name_en": name}) + self.course_type_cache[name] = course_type + return course_type + + def _get_program(self, name: str) -> Program: + if name in self.program_cache: + return self.program_cache[name] + + program, __ = Program.objects.get_or_create(name_de=name, defaults={"name_en": name}) + self.program_cache[name] = program + return program + + def _get_user_profiles(self, data: list[ImportRelated]) -> list[UserProfile]: + return [self.user_profile_map[related["gguid"]] for related in data] + + def _create_name_change_from_changes(self, user_profile: UserProfile, changes: dict[str, tuple[Any, Any]]) -> None: + change = NameChange( + old_last_name=changes["last_name"][0] if changes.get("last_name") else user_profile.last_name, + old_first_name_given=( + changes["first_name_given"][0] if changes.get("first_name_given") else user_profile.first_name_given + ), + new_last_name=user_profile.last_name, + new_first_name_given=user_profile.first_name_given, + email=user_profile.email, + ) + self.statistics.name_changes.append(change) + + def _import_students(self, data: list[ImportStudent]) -> None: + for entry in data: + email = clean_email(entry["email"]) + user_profile, __, changes = update_or_create_with_changes( + UserProfile, + email=email, + defaults={"last_name": entry["name"], "first_name_given": entry["christianname"]}, + ) + if changes: + self._create_name_change_from_changes(user_profile, changes) + + self.user_profile_map[entry["gguid"]] = user_profile + + def _import_lecturers(self, data: list[ImportLecturer]) -> None: + for entry in data: + email = clean_email(entry["email"]) + user_profile, __, changes = update_or_create_with_changes( + UserProfile, + email=email, + defaults={ + "last_name": entry["name"], + "first_name_given": entry["christianname"], + "title": entry["titlefront"], + }, + ) + if changes: + self._create_name_change_from_changes(user_profile, changes) + + self.user_profile_map[entry["gguid"]] = user_profile + + def _import_course(self, data: ImportEvent) -> Course: + course_type = self._get_course_type(data["type"]) + programs = [self._get_program(c["cprid"]) for c in data["courses"]] + responsibles = self._get_user_profiles(data["lecturers"]) + course, created, changes = update_or_create_with_changes( + Course, + semester=self.semester, + cms_id=data["gguid"], + defaults={"name_de": data["title"], "name_en": data["title_en"], "type": course_type}, + ) + course.programs.set(programs) + course.responsibles.set(responsibles) + + if changes: + self.statistics.updated_courses.append(course) + if created: + self.statistics.new_courses.append(course) + + self.course_map[data["gguid"]] = course + + return course + + # pylint: disable=too-many-locals + def _import_evaluation(self, course: Course, data: ImportEvent) -> Evaluation: + course_end = datetime.strptime(data["appointments"][0]["end"], self.DATETIME_FORMAT) + + if data["isexam"]: + # Set evaluation time frame of three days for exam evaluations: + evaluation_start_datetime = course_end.replace(hour=8, minute=0, second=0, microsecond=0) + timedelta( + days=1 + ) + evaluation_end_date = (course_end + timedelta(days=3)).date() + + name_de = "Klausur" + name_en = "Exam" + else: + # Set evaluation time frame of two weeks for normal evaluations: + # Start datetime is at 8:00 am on the monday in the week before the event ends + evaluation_start_datetime = course_end.replace(hour=8, minute=0, second=0, microsecond=0) - timedelta( + weeks=1, days=course_end.weekday() + ) + # End date is on the sunday in the week the event ends + evaluation_end_date = (course_end + timedelta(days=6 - course_end.weekday())).date() + + name_de, name_en = "", "" + + # If events are graded for any program, wait for grade upload before publishing + wait_for_grade_upload_before_publishing = any(grade["scale"] for grade in data["courses"]) + + participants = self._get_user_profiles(data["students"]) + + defaults = { + "name_de": name_de, + "name_en": name_en, + "vote_start_datetime": evaluation_start_datetime, + "vote_end_date": evaluation_end_date, + "wait_for_grade_upload_before_publishing": wait_for_grade_upload_before_publishing, + } + evaluation, created = Evaluation.objects.get_or_create( + course=course, + cms_id=data["gguid"], + defaults=defaults, + ) + if evaluation.state < Evaluation.State.APPROVED: + direct_changes = update_with_changes(evaluation, defaults) + + participant_changes = set(evaluation.participants.all()) != set(participants) + evaluation.participants.set(participants) + + any_lecturers_changed = False + for lecturer in data["lecturers"]: + __, lecturer_created, lecturer_changes = self._import_contribution(evaluation, lecturer) + if lecturer_changes or lecturer_created: + any_lecturers_changed = True + + if direct_changes or participant_changes or any_lecturers_changed: + self.statistics.updated_evaluations.append(evaluation) + else: + self.statistics.attempted_changes.append(evaluation) + + if created: + self.statistics.new_evaluations.append(evaluation) + + return evaluation + + def _import_contribution( + self, evaluation: Evaluation, data: ImportRelated + ) -> tuple[Contribution, bool, dict[str, tuple[any, any]]]: + user_profile = self.user_profile_map[data["gguid"]] + + contribution, created, changes = update_or_create_with_changes( + Contribution, + evaluation=evaluation, + contributor=user_profile, + ) + return contribution, created, changes + + def _import_events(self, data: list[ImportEvent]) -> None: + # Divide in two lists so corresponding courses are imported before their exams + non_exam_events = (event for event in data if not event["isexam"]) + exam_events = (event for event in data if event["isexam"]) + + for event in non_exam_events: + course = self._import_course(event) + + self._import_evaluation(course, event) + + for event in exam_events: + course = self.course_map[event["relatedevents"]["gguid"]] + + self._import_evaluation(course, event) + + @transaction.atomic + def import_dict(self, data: ImportDict) -> None: + self._import_students(data["students"]) + self._import_lecturers(data["lecturers"]) + self._import_events(data["events"]) + self.statistics.send_mail() + + def import_json(self, data: str) -> None: + self.import_dict(json.loads(data)) diff --git a/evap/staff/management/commands/__init__.py b/evap/staff/management/commands/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/evap/staff/management/commands/json_import.py b/evap/staff/management/commands/json_import.py new file mode 100644 index 0000000000..3c7c7e1e78 --- /dev/null +++ b/evap/staff/management/commands/json_import.py @@ -0,0 +1,29 @@ +import logging + +from django.core.management.base import BaseCommand + +from evap.evaluation.management.commands.tools import log_exceptions +from evap.evaluation.models import Semester +from evap.staff.importers.json import JSONImporter + +logger = logging.getLogger(__name__) + + +@log_exceptions +class Command(BaseCommand): + help = "Import enrollments from JSON file." + + def add_arguments(self, parser): + # Positional arguments + parser.add_argument("semester", type=int) + parser.add_argument("file", type=str) + + def handle(self, *args, **options): + try: + semester = Semester.objects.get(pk=options["semester"]) + except Semester.DoesNotExist: + self.stdout.write(self.style.ERROR("Semester does not exist.")) + return + + with open(options["file"]) as file: + JSONImporter(semester).import_json(file.read()) diff --git a/evap/staff/tests/test_json_importer.py b/evap/staff/tests/test_json_importer.py new file mode 100644 index 0000000000..e82ab17130 --- /dev/null +++ b/evap/staff/tests/test_json_importer.py @@ -0,0 +1,307 @@ +import json +import os +from datetime import date, datetime +from io import StringIO +from tempfile import TemporaryDirectory +from unittest.mock import patch + +from django.core import mail +from django.core.management import call_command +from django.test import TestCase +from model_bakery import baker + +from evap.evaluation.models import Contribution, Course, Evaluation, Questionnaire, Semester, UserProfile +from evap.evaluation.tests.tools import make_manager +from evap.staff.importers.json import ImportDict, JSONImporter, NameChange + +EXAMPLE_DATA: ImportDict = { + "students": [ + {"gguid": "0x1", "email": "1@example.com", "name": "1", "christianname": "1"}, + {"gguid": "0x2", "email": "2@example.com", "name": "2", "christianname": "2"}, + ], + "lecturers": [ + {"gguid": "0x3", "email": "3@example.com", "name": "3", "christianname": "3", "titlefront": "Prof. Dr."}, + {"gguid": "0x4", "email": "4@example.com", "name": "4", "christianname": "4", "titlefront": "Dr."}, + ], + "events": [ + { + "gguid": "0x5", + "lvnr": 1, + "title": "Prozessorientierte Informationssysteme", + "title_en": "Process-oriented information systems", + "type": "Vorlesung", + "isexam": False, + "courses": [ + {"cprid": "BA-Inf", "scale": "GRADE_PARTICIPATION"}, + {"cprid": "MA-Inf", "scale": "GRADE_PARTICIPATION"}, + ], + "relatedevents": {"gguid": "0x6"}, + "appointments": [{"begin": "15.04.2024 10:15", "end": "15.07.2024 11:45"}], + "lecturers": [{"gguid": "0x3"}], + "students": [{"gguid": "0x1"}, {"gguid": "0x2"}], + }, + { + "gguid": "0x6", + "lvnr": 2, + "title": "Prozessorientierte Informationssysteme", + "title_en": "Process-oriented information systems", + "type": "Klausur", + "isexam": True, + "courses": [ + {"cprid": "BA-Inf", "scale": ""}, + {"cprid": "MA-Inf", "scale": ""}, + ], + "relatedevents": {"gguid": "0x5"}, + "appointments": [{"begin": "29.07.2024 10:15", "end": "29.07.2024 11:45"}], + "lecturers": [{"gguid": "0x3"}, {"gguid": "0x4"}], + "students": [{"gguid": "0x1"}, {"gguid": "0x2"}], + }, + ], +} +EXAMPLE_JSON = json.dumps(EXAMPLE_DATA) + + +class TestImportUserProfiles(TestCase): + def setUp(self): + self.students = EXAMPLE_DATA["students"] + self.lecturers = EXAMPLE_DATA["lecturers"] + + self.semester = baker.make(Semester) + + def test_import_students(self): + self.assertEqual(UserProfile.objects.count(), 0) + + importer = JSONImporter(self.semester) + importer._import_students(self.students) + + user_profiles = UserProfile.objects.all() + + for i, user_profile in enumerate(user_profiles.order_by("email")): + self.assertEqual(user_profile.email, self.students[i]["email"]) + self.assertEqual(user_profile.last_name, self.students[i]["name"]) + self.assertEqual(user_profile.first_name_given, self.students[i]["christianname"]) + + self.assertEqual(importer.statistics.name_changes, []) + + def test_import_existing_students(self): + user_profile = baker.make( + UserProfile, email=self.students[0]["email"], last_name="Doe", first_name_given="Jane" + ) + + importer = JSONImporter(self.semester) + importer._import_students(self.students) + + self.assertEqual(UserProfile.objects.count(), 2) + + user_profile.refresh_from_db() + + self.assertEqual(user_profile.email, self.students[0]["email"]) + self.assertEqual(user_profile.last_name, self.students[0]["name"]) + self.assertEqual(user_profile.first_name_given, self.students[0]["christianname"]) + + self.assertEqual( + importer.statistics.name_changes, + [ + NameChange( + old_last_name="Doe", + old_first_name_given="Jane", + new_last_name=self.students[0]["name"], + new_first_name_given=self.students[0]["christianname"], + email=self.students[0]["email"], + ) + ], + ) + + def test_import_lecturers(self): + self.assertEqual(UserProfile.objects.count(), 0) + + importer = JSONImporter(self.semester) + importer._import_lecturers(self.lecturers) + + user_profiles = UserProfile.objects.all() + + for i, user_profile in enumerate(user_profiles.order_by("email")): + self.assertEqual(user_profile.email, self.lecturers[i]["email"]) + self.assertEqual(user_profile.last_name, self.lecturers[i]["name"]) + self.assertEqual(user_profile.first_name_given, self.lecturers[i]["christianname"]) + self.assertEqual(user_profile.title, self.lecturers[i]["titlefront"]) + + self.assertEqual(importer.statistics.name_changes, []) + + def test_import_existing_lecturers(self): + user_profile = baker.make( + UserProfile, email=self.lecturers[0]["email"], last_name="Doe", first_name_given="Jane" + ) + + importer = JSONImporter(self.semester) + importer._import_lecturers(self.lecturers) + + self.assertEqual(UserProfile.objects.count(), 2) + + user_profile.refresh_from_db() + + self.assertEqual(user_profile.email, self.lecturers[0]["email"]) + self.assertEqual(user_profile.last_name, self.lecturers[0]["name"]) + self.assertEqual(user_profile.first_name_given, self.lecturers[0]["christianname"]) + self.assertEqual(user_profile.title, self.lecturers[0]["titlefront"]) + + self.assertEqual( + importer.statistics.name_changes, + [ + NameChange( + old_last_name="Doe", + old_first_name_given="Jane", + new_last_name=self.lecturers[0]["name"], + new_first_name_given=self.lecturers[0]["christianname"], + email=self.lecturers[0]["email"], + ) + ], + ) + + +class TestImportEvents(TestCase): + def setUp(self): + self.semester = baker.make(Semester) + + def _import(self): + importer = JSONImporter(self.semester) + importer.import_json(EXAMPLE_JSON) + return importer + + def test_import_courses(self): + importer = self._import() + + self.assertEqual(Course.objects.count(), 1) + course = Course.objects.first() + + self.assertEqual(course.semester, self.semester) + self.assertEqual(course.cms_id, EXAMPLE_DATA["events"][0]["gguid"]) + self.assertEqual(course.name_de, EXAMPLE_DATA["events"][0]["title"]) + self.assertEqual(course.name_en, EXAMPLE_DATA["events"][0]["title_en"]) + self.assertEqual(course.type.name_de, EXAMPLE_DATA["events"][0]["type"]) + self.assertSetEqual( + {d.name_de for d in course.programs.all()}, {d["cprid"] for d in EXAMPLE_DATA["events"][0]["courses"]} + ) + self.assertSetEqual( + set(course.responsibles.values_list("email", flat=True)), + {"3@example.com"}, + ) + + main_evaluation = Evaluation.objects.get(name_en="") + self.assertEqual(main_evaluation.course, course) + self.assertEqual(main_evaluation.name_de, "") + self.assertEqual(main_evaluation.name_en, "") + # [{"begin": "15.04.2024 10:15", "end": "15.07.2024 11:45"}] + self.assertEqual(main_evaluation.vote_start_datetime, datetime(2024, 7, 8, 8, 0)) + self.assertEqual(main_evaluation.vote_end_date, date(2024, 7, 21)) + self.assertSetEqual( + set(main_evaluation.participants.values_list("email", flat=True)), + {"1@example.com", "2@example.com"}, + ) + self.assertTrue(main_evaluation.wait_for_grade_upload_before_publishing) + + self.assertEqual(Contribution.objects.filter(evaluation=main_evaluation).count(), 2) + self.assertSetEqual( + set( + Contribution.objects.filter(evaluation=main_evaluation, contributor__isnull=False).values_list( + "contributor__email", flat=True + ) + ), + {"3@example.com"}, + ) + + exam_evaluation = Evaluation.objects.get(name_en="Exam") + self.assertEqual(exam_evaluation.course, course) + self.assertEqual(exam_evaluation.name_de, "Klausur") + self.assertEqual(exam_evaluation.name_en, "Exam") + # [{"begin": "29.07.2024 10:15", "end": "29.07.2024 11:45"}] + self.assertEqual(exam_evaluation.vote_start_datetime, datetime(2024, 7, 30, 8, 0)) + self.assertEqual(exam_evaluation.vote_end_date, date(2024, 8, 1)) + self.assertSetEqual( + set(exam_evaluation.participants.values_list("email", flat=True)), + {"1@example.com", "2@example.com"}, + ) + self.assertFalse(exam_evaluation.wait_for_grade_upload_before_publishing) + + self.assertEqual(Contribution.objects.filter(evaluation=exam_evaluation).count(), 3) + self.assertSetEqual( + set( + Contribution.objects.filter(evaluation=exam_evaluation, contributor__isnull=False).values_list( + "contributor__email", flat=True + ) + ), + {"3@example.com", "4@example.com"}, + ) + + self.assertEqual(len(importer.statistics.new_courses), 1) + self.assertEqual(len(importer.statistics.new_evaluations), 2) + + def test_import_courses_evaluation_approved(self): + self._import() + + evaluation = Evaluation.objects.get(name_en="") + + evaluation.name_en = "Test" + evaluation.save() + + importer = self._import() + + evaluation = Evaluation.objects.get(pk=evaluation.pk) + + self.assertEqual(evaluation.name_en, "") + self.assertEqual(len(importer.statistics.attempted_changes), 0) + + evaluation.general_contribution.questionnaires.add( + baker.make(Questionnaire, type=Questionnaire.Type.CONTRIBUTOR) + ) + evaluation.manager_approve() + evaluation.name_en = "Test" + evaluation.save() + + importer = self._import() + + evaluation = Evaluation.objects.get(pk=evaluation.pk) + + self.assertEqual(evaluation.name_en, "Test") + + self.assertEqual(len(importer.statistics.attempted_changes), 1) + + def test_import_courses_update(self): + self._import() + + self.assertEqual(Course.objects.count(), 1) + course = Course.objects.all()[0] + course.name_de = "Doe" + course.name_en = "Jane" + course.save() + + importer = self._import() + + course.refresh_from_db() + + self.assertEqual(course.name_de, EXAMPLE_DATA["events"][0]["title"]) + self.assertEqual(course.name_en, EXAMPLE_DATA["events"][0]["title_en"]) + + self.assertEqual(len(importer.statistics.updated_courses), 1) + self.assertEqual(len(importer.statistics.new_courses), 0) + + def test_importer_log_email_sent(self): + manager = make_manager() + + self._import() + + self.assertEqual(len(mail.outbox), 1) + self.assertEqual(mail.outbox[0].subject, "[EvaP] JSON importer log") + self.assertEqual(mail.outbox[0].recipients(), [manager.email]) + + @patch("evap.staff.importers.json.JSONImporter.import_json") + def test_management_command(self, mock_import_json): + output = StringIO() + + with TemporaryDirectory() as temp_dir: + test_filename = os.path.join(temp_dir, "test.json") + with open(test_filename, "w", encoding="utf-8") as f: + f.write(EXAMPLE_JSON) + call_command("json_import", self.semester.id, test_filename, stdout=output) + + mock_import_json.assert_called_once_with(EXAMPLE_JSON) diff --git a/evap/staff/tools.py b/evap/staff/tools.py index 23b10fe42c..4e75f97951 100644 --- a/evap/staff/tools.py +++ b/evap/staff/tools.py @@ -2,13 +2,14 @@ from collections.abc import Iterable from datetime import date, datetime, timedelta from enum import Enum +from typing import Any, TypeVar from django.conf import settings from django.contrib import messages from django.contrib.auth.models import Group from django.core.exceptions import SuspiciousOperation from django.db import transaction -from django.db.models import Count +from django.db.models import Count, Model from django.urls import reverse from django.utils.html import escape, format_html, format_html_join from django.utils.safestring import SafeString @@ -381,3 +382,41 @@ def user_edit_link(user_id): reverse("staff:user_edit", kwargs={"user_id": user_id}), _("edit user"), ) + + +T = TypeVar("T") + + +def update_or_create_with_changes( + model: type[T], + defaults=None, + **kwargs, +) -> tuple[T, bool, dict[str, tuple[Any, Any]]]: + """Do update_or_create and track changed values.""" + + if not defaults: + defaults = {} + + obj, created = model.objects.get_or_create(**kwargs, defaults=defaults) + + if created: + return obj, True, {} + + changes = update_with_changes(obj, defaults) + + return obj, False, changes + + +def update_with_changes(obj: Model, defaults: dict[str, any]) -> dict[str, tuple[Any, Any]]: + """Update a model instance and track changed values.""" + + changes = {} + for key, value in defaults.items(): + if getattr(obj, key) != value: + changes[key] = (getattr(obj, key), value) + setattr(obj, key, value) + + if changes: + obj.save() + + return changes