diff --git a/.github/workflows/test-stack-reusable-workflow.yml b/.github/workflows/test-stack-reusable-workflow.yml
index 29903c17151..9ef64fa69d4 100644
--- a/.github/workflows/test-stack-reusable-workflow.yml
+++ b/.github/workflows/test-stack-reusable-workflow.yml
@@ -107,22 +107,22 @@ jobs:
if: ${{ inputs.skip-pypuppeteer == false }}
run: |
# Playwright via Sockpuppetbrowser fetch
- docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/fetchers/test_content.py'
- docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/test_errorhandling.py'
- docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/visualselector/test_fetch_data.py'
- docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/fetchers/test_custom_js_before_content.py'
+ docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 --live-server-wait=20 tests/fetchers/test_content.py'
+ docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 --live-server-wait=20 tests/test_errorhandling.py'
+ docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 --live-server-wait=20 tests/visualselector/test_fetch_data.py'
+ docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 --live-server-wait=20 tests/fetchers/test_custom_js_before_content.py'
- name: Pyppeteer and SocketPuppetBrowser - Headers and requests checks
if: ${{ inputs.skip-pypuppeteer == false }}
run: |
# Settings headers playwright tests - Call back in from Sockpuppetbrowser, check headers
- docker run --name "changedet" --hostname changedet --rm -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "FLASK_SERVER_NAME=changedet" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000?dumpio=true" --network changedet-network test-changedetectionio bash -c 'cd changedetectionio; pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/test_request.py'
+ docker run --name "changedet" --hostname changedet --rm -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "FLASK_SERVER_NAME=changedet" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000?dumpio=true" --network changedet-network test-changedetectionio bash -c 'cd changedetectionio; pytest --live-server-host=0.0.0.0 --live-server-port=5004 --live-server-wait=20 tests/test_request.py'
- name: Pyppeteer and SocketPuppetBrowser - Restock detection
if: ${{ inputs.skip-pypuppeteer == false }}
run: |
# restock detection via playwright - added name=changedet here so that playwright and sockpuppetbrowser can connect to it
- docker run --rm --name "changedet" -e "FLASK_SERVER_NAME=changedet" -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-port=5004 --live-server-host=0.0.0.0 tests/restock/test_restock.py'
+ docker run --rm --name "changedet" -e "FLASK_SERVER_NAME=changedet" -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-port=5004 --live-server-host=0.0.0.0 --live-server-wait=20 tests/restock/test_restock.py'
# SELENIUM
- name: Specific tests in built container for Selenium
@@ -132,7 +132,7 @@ jobs:
- name: Specific tests in built container for headers and requests checks with Selenium
run: |
- docker run --name "changedet" --hostname changedet --rm -e "FLASK_SERVER_NAME=changedet" -e "WEBDRIVER_URL=http://selenium:4444/wd/hub" --network changedet-network test-changedetectionio bash -c 'cd changedetectionio; pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/test_request.py'
+ docker run --name "changedet" --hostname changedet --rm -e "FLASK_SERVER_NAME=changedet" -e "WEBDRIVER_URL=http://selenium:4444/wd/hub" --network changedet-network test-changedetectionio bash -c 'cd changedetectionio; pytest --live-server-host=0.0.0.0 --live-server-port=5004 --live-server-wait=20 tests/test_request.py'
# OTHER STUFF
- name: Test SMTP notification mime types
diff --git a/changedetectionio/diff.py b/changedetectionio/diff.py
index 1fa9b60a4ba..cd83e8b43fd 100644
--- a/changedetectionio/diff.py
+++ b/changedetectionio/diff.py
@@ -43,19 +43,19 @@ def customSequenceMatcher(
yield before[alo:ahi]
elif include_removed and tag == 'delete':
if html_colour:
- yield [f'{line}' for line in same_slicer(before, alo, ahi)]
+ yield [f'{line}' for line in same_slicer(before, alo, ahi)]
else:
yield [f"(removed) {line}" for line in same_slicer(before, alo, ahi)] if include_change_type_prefix else same_slicer(before, alo, ahi)
elif include_replaced and tag == 'replace':
if html_colour:
- yield [f'{line}' for line in same_slicer(before, alo, ahi)] + \
- [f'{line}' for line in same_slicer(after, blo, bhi)]
+ yield [f'{line}' for line in same_slicer(before, alo, ahi)] + \
+ [f'{line}' for line in same_slicer(after, blo, bhi)]
else:
yield [f"(changed) {line}" for line in same_slicer(before, alo, ahi)] + \
[f"(into) {line}" for line in same_slicer(after, blo, bhi)] if include_change_type_prefix else same_slicer(before, alo, ahi) + same_slicer(after, blo, bhi)
elif include_added and tag == 'insert':
if html_colour:
- yield [f'{line}' for line in same_slicer(after, blo, bhi)]
+ yield [f'{line}' for line in same_slicer(after, blo, bhi)]
else:
yield [f"(added) {line}" for line in same_slicer(after, blo, bhi)] if include_change_type_prefix else same_slicer(after, blo, bhi)
diff --git a/changedetectionio/flask_app.py b/changedetectionio/flask_app.py
index f603e012171..fcf0d00e36a 100644
--- a/changedetectionio/flask_app.py
+++ b/changedetectionio/flask_app.py
@@ -12,6 +12,7 @@
import time
import timeago
+from .html_tools import escape_mixed_content
from .processors import find_processors, get_parent_module, get_custom_watch_obj_for_processor
from .safe_jinja import render as jinja_render
from changedetectionio.strtobool import strtobool
@@ -539,6 +540,9 @@ def ajax_callback_send_notification_test(watch_uuid=None):
import apprise
import random
from .apprise_asset import asset
+ from .notification import default_notification_format
+ from .update_worker import build_notification_object_for_watch
+
apobj = apprise.Apprise(asset=asset)
# so that the custom endpoints are registered
@@ -595,6 +599,8 @@ def ajax_callback_send_notification_test(watch_uuid=None):
# Only use if present, if not set in n_object it should use the default system value
if 'notification_format' in request.form and request.form['notification_format'].strip():
n_object['notification_format'] = request.form.get('notification_format', '').strip()
+ else:
+ n_object['notification_format'] = default_notification_format
if 'notification_title' in request.form and request.form['notification_title'].strip():
n_object['notification_title'] = request.form.get('notification_title', '').strip()
@@ -610,9 +616,14 @@ def ajax_callback_send_notification_test(watch_uuid=None):
else:
n_object['notification_body'] = "Test body"
- n_object['as_async'] = False
- n_object.update(watch.extra_notification_token_values())
+ n_object = build_notification_object_for_watch(watch, n_object, datastore.data['settings']['application'].get('notification_body'))
+
+ if n_object['notification_format'].startswith('HTML'):
+ n_object['notification_body'] = escape_mixed_content(n_object['notification_body'])
+
from .notification import process_notification
+ n_object['as_async'] = False
+ # Now we send the notification_body after everything is compiled
sent_obj = process_notification(n_object, datastore)
except Exception as e:
diff --git a/changedetectionio/html_tools.py b/changedetectionio/html_tools.py
index b710077f1e5..e21158d795c 100644
--- a/changedetectionio/html_tools.py
+++ b/changedetectionio/html_tools.py
@@ -500,3 +500,40 @@ def get_triggered_text(content, trigger_text):
i += 1
return triggered_text
+
+
+from bs4 import BeautifulSoup
+import html
+
+
+def escape_mixed_content(document):
+ import uuid
+ # Parse the document as HTML
+
+ # Generate a single random hash for placeholders
+ random_hash = f"__PLACEHOLDER_{uuid.uuid4().hex}__"
+ placeholder_map = []
+
+ #
to something else so we can preserve them
+ random_hash_br = f"__BR_{uuid.uuid4().hex}__"
+ document = document.replace('
', random_hash_br)
+
+ soup = BeautifulSoup(document, 'html.parser')
+
+ # Find all and
/
+ for tag in soup.find_all("span", class_="cdio"):
+ placeholder_map.append(str(tag)) # Save the tag as a string
+ tag.replace_with(random_hash) # Replace tag with the placeholder
+
+
+
+ # Escape the entire document
+ escaped_html = html.escape(str(soup))
+
+ # Restore all occurrences of placeholders with the original tags
+ for original_tag in placeholder_map:
+ escaped_html = escaped_html.replace(random_hash, original_tag, 1) # Replace one occurrence at a time
+
+ escaped_html = escaped_html.replace( random_hash_br, "
")
+ return escaped_html
+
diff --git a/changedetectionio/notification.py b/changedetectionio/notification.py
index 7eed328d129..8e156c77ac2 100644
--- a/changedetectionio/notification.py
+++ b/changedetectionio/notification.py
@@ -4,6 +4,7 @@
import apprise
from loguru import logger
+from changedetectionio.html_tools import escape_mixed_content
valid_tokens = {
'base_url': '',
@@ -85,6 +86,8 @@ def process_notification(n_object, datastore):
n_body = n_body.replace("\n", '
')
n_title = jinja_render(template_str=n_object.get('notification_title', ''), **notification_parameters)
+ if n_object['notification_format'].startswith('HTML'):
+ n_body = escape_mixed_content(n_body)
url = url.strip()
if url.startswith('#'):
@@ -161,7 +164,6 @@ def process_notification(n_object, datastore):
attach=n_object.get('screenshot', None)
)
-
# Returns empty string if nothing found, multi-line string otherwise
log_value = logs.getvalue()
diff --git a/changedetectionio/tests/test_notification.py b/changedetectionio/tests/test_notification.py
index a87e1b7e224..37fcdc58aa7 100644
--- a/changedetectionio/tests/test_notification.py
+++ b/changedetectionio/tests/test_notification.py
@@ -454,8 +454,18 @@ def test_global_send_test_notification(client, live_server, measure_memory_usage
assert b"Error: You must have atleast one watch configured for 'test notification' to work" in res.data
+ client.get(
+ url_for("form_delete", uuid="all"),
+ follow_redirects=True
+ )
+
def _test_color_notifications(client, notification_body_token):
+ client.get(
+ url_for("form_delete", uuid="all"),
+ follow_redirects=True
+ )
+
from changedetectionio.diff import ADDED_STYLE, REMOVED_STYLE
set_original_response()
@@ -494,9 +504,9 @@ def _test_color_notifications(client, notification_body_token):
wait_for_all_checks(client)
set_modified_response()
-
-
+
res = client.get(url_for("form_watch_checknow"), follow_redirects=True)
+
assert b'1 watches queued for rechecking.' in res.data
wait_for_all_checks(client)
@@ -504,7 +514,8 @@ def _test_color_notifications(client, notification_body_token):
with open("test-datastore/notification.txt", 'r') as f:
x = f.read()
- assert f'Which is across multiple lines' in x
+ assert f'Which is across multiple lines' in x
+ assert f'
' in x
client.get(
diff --git a/changedetectionio/update_worker.py b/changedetectionio/update_worker.py
index 28647bada95..fc73796527c 100644
--- a/changedetectionio/update_worker.py
+++ b/changedetectionio/update_worker.py
@@ -16,6 +16,77 @@
from loguru import logger
+def build_notification_object_for_watch(watch, n_object, default_app_settings_notification_format):
+ from changedetectionio import diff
+ from changedetectionio.notification import default_notification_format_for_watch
+
+ dates = []
+ trigger_text = ''
+
+ if watch:
+ watch_history = watch.history
+ dates = list(watch_history.keys())
+ trigger_text = watch.get('trigger_text', [])
+
+ # Add text that was triggered
+ if len(dates):
+ snapshot_contents = watch.get_history_snapshot(dates[-1])
+ else:
+ snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
+
+ # If we ended up here with "System default"
+ if n_object.get('notification_format') == default_notification_format_for_watch:
+ n_object['notification_format'] = default_app_settings_notification_format
+
+ html_colour_enable = False
+ # HTML needs linebreak, but MarkDown and Text can use a linefeed
+ if n_object.get('notification_format') == 'HTML':
+ line_feed_sep = "
"
+ # Snapshot will be plaintext on the disk, convert to some kind of HTML
+ snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
+ elif n_object.get('notification_format') == 'HTML Color':
+ line_feed_sep = "
"
+ # Snapshot will be plaintext on the disk, convert to some kind of HTML
+ snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
+ html_colour_enable = True
+ else:
+ line_feed_sep = "\n"
+
+ triggered_text = ''
+ if len(trigger_text):
+ from . import html_tools
+ triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text)
+ if triggered_text:
+ triggered_text = line_feed_sep.join(triggered_text)
+
+ # Could be called as a 'test notification' with only 1 snapshot available
+ prev_snapshot = "Example text: example test\nExample text: change detection is cool\nExample text: some more examples\n"
+ current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples"
+
+ if len(dates) > 1:
+ prev_snapshot = watch.get_history_snapshot(dates[-2])
+ current_snapshot = watch.get_history_snapshot(dates[-1])
+
+ n_object.update({
+ 'current_snapshot': snapshot_contents,
+ 'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
+ 'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep),
+ 'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep,
+ html_colour=html_colour_enable),
+ 'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True),
+ 'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep),
+ 'notification_timestamp': time.time(),
+ 'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None,
+ 'triggered_text': triggered_text,
+ 'uuid': watch.get('uuid') if watch else None,
+ 'watch_url': watch.get('url') if watch else None,
+ })
+
+ if watch:
+ n_object.update(watch.extra_notification_token_values())
+
+ return n_object
+
class update_worker(threading.Thread):
current_uuid = None
@@ -27,75 +98,8 @@ def __init__(self, q, notification_q, app, datastore, *args, **kwargs):
super().__init__(*args, **kwargs)
def queue_notification_for_watch(self, notification_q, n_object, watch):
- from changedetectionio import diff
- from changedetectionio.notification import default_notification_format_for_watch
-
- dates = []
- trigger_text = ''
-
now = time.time()
-
- if watch:
- watch_history = watch.history
- dates = list(watch_history.keys())
- trigger_text = watch.get('trigger_text', [])
-
- # Add text that was triggered
- if len(dates):
- snapshot_contents = watch.get_history_snapshot(dates[-1])
- else:
- snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
-
- # If we ended up here with "System default"
- if n_object.get('notification_format') == default_notification_format_for_watch:
- n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format')
-
- html_colour_enable = False
- # HTML needs linebreak, but MarkDown and Text can use a linefeed
- if n_object.get('notification_format') == 'HTML':
- line_feed_sep = "
"
- # Snapshot will be plaintext on the disk, convert to some kind of HTML
- snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
- elif n_object.get('notification_format') == 'HTML Color':
- line_feed_sep = "
"
- # Snapshot will be plaintext on the disk, convert to some kind of HTML
- snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
- html_colour_enable = True
- else:
- line_feed_sep = "\n"
-
- triggered_text = ''
- if len(trigger_text):
- from . import html_tools
- triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text)
- if triggered_text:
- triggered_text = line_feed_sep.join(triggered_text)
-
- # Could be called as a 'test notification' with only 1 snapshot available
- prev_snapshot = "Example text: example test\nExample text: change detection is cool\nExample text: some more examples\n"
- current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples"
-
- if len(dates) > 1:
- prev_snapshot = watch.get_history_snapshot(dates[-2])
- current_snapshot = watch.get_history_snapshot(dates[-1])
-
- n_object.update({
- 'current_snapshot': snapshot_contents,
- 'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
- 'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep),
- 'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
- 'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True),
- 'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep),
- 'notification_timestamp': now,
- 'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None,
- 'triggered_text': triggered_text,
- 'uuid': watch.get('uuid') if watch else None,
- 'watch_url': watch.get('url') if watch else None,
- })
-
- if watch:
- n_object.update(watch.extra_notification_token_values())
-
+ n_object = build_notification_object_for_watch(watch, n_object, self.datastore.data['settings']['application'].get('notification_format'))
logger.trace(f"Main rendered notification placeholders (diff_added etc) calculated in {time.time()-now:.3f}s")
logger.debug("Queued notification for sending")
notification_q.put(n_object)