Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re #2866 - Make sure any HTML type notifications have their content escaped - except for our added/remove/changed markup #2893

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions changedetectionio/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@ def customSequenceMatcher(
yield before[alo:ahi]
elif include_removed and tag == 'delete':
if html_colour:
yield [f'<span style="{REMOVED_STYLE}">{line}</span>' for line in same_slicer(before, alo, ahi)]
yield [f'<span class="cdio" style="{REMOVED_STYLE}">{line}</span>' for line in same_slicer(before, alo, ahi)]
else:
yield [f"(removed) {line}" for line in same_slicer(before, alo, ahi)] if include_change_type_prefix else same_slicer(before, alo, ahi)
elif include_replaced and tag == 'replace':
if html_colour:
yield [f'<span style="{REMOVED_STYLE}">{line}</span>' for line in same_slicer(before, alo, ahi)] + \
[f'<span style="{ADDED_STYLE}">{line}</span>' for line in same_slicer(after, blo, bhi)]
yield [f'<span class="cdio" style="{REMOVED_STYLE}">{line}</span>' for line in same_slicer(before, alo, ahi)] + \
[f'<span class="cdio" style="{ADDED_STYLE}">{line}</span>' for line in same_slicer(after, blo, bhi)]
else:
yield [f"(changed) {line}" for line in same_slicer(before, alo, ahi)] + \
[f"(into) {line}" for line in same_slicer(after, blo, bhi)] if include_change_type_prefix else same_slicer(before, alo, ahi) + same_slicer(after, blo, bhi)
elif include_added and tag == 'insert':
if html_colour:
yield [f'<span style="{ADDED_STYLE}">{line}</span>' for line in same_slicer(after, blo, bhi)]
yield [f'<span class="cdio" style="{ADDED_STYLE}">{line}</span>' for line in same_slicer(after, blo, bhi)]
else:
yield [f"(added) {line}" for line in same_slicer(after, blo, bhi)] if include_change_type_prefix else same_slicer(after, blo, bhi)

Expand Down
11 changes: 9 additions & 2 deletions changedetectionio/flask_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import time
import timeago

from .html_tools import escape_mixed_content
from .processors import find_processors, get_parent_module, get_custom_watch_obj_for_processor
from .safe_jinja import render as jinja_render
from changedetectionio.strtobool import strtobool
Expand Down Expand Up @@ -539,6 +540,7 @@ def ajax_callback_send_notification_test(watch_uuid=None):
import apprise
import random
from .apprise_asset import asset
from .update_worker import build_notification_object_for_watch
apobj = apprise.Apprise(asset=asset)

# so that the custom endpoints are registered
Expand Down Expand Up @@ -610,9 +612,14 @@ def ajax_callback_send_notification_test(watch_uuid=None):
else:
n_object['notification_body'] = "Test body"

n_object['as_async'] = False
n_object.update(watch.extra_notification_token_values())
n_object = build_notification_object_for_watch(watch, n_object, datastore.data['settings']['application'].get('notification_body'))

if n_object['notification_format'].startswith('HTML'):
n_object['notification_body'] = escape_mixed_content(n_object['notification_body'])

from .notification import process_notification
n_object['as_async'] = False
# Now we send the notification_body after everything is compiled
sent_obj = process_notification(n_object, datastore)

except Exception as e:
Expand Down
37 changes: 37 additions & 0 deletions changedetectionio/html_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,3 +500,40 @@ def get_triggered_text(content, trigger_text):
i += 1

return triggered_text


from bs4 import BeautifulSoup
import html


def escape_mixed_content(document):
import uuid
# Parse the document as HTML

# Generate a single random hash for placeholders
random_hash = f"__PLACEHOLDER_{uuid.uuid4().hex}__"
placeholder_map = []

# <br> to something else so we can preserve them
random_hash_br = f"__BR_{uuid.uuid4().hex}__"
document = document.replace('<br>', random_hash_br)

soup = BeautifulSoup(document, 'html.parser')

# Find all <span class="cdio"> and <br>/<br/>
for tag in soup.find_all("span", class_="cdio"):
placeholder_map.append(str(tag)) # Save the tag as a string
tag.replace_with(random_hash) # Replace tag with the placeholder



# Escape the entire document
escaped_html = html.escape(str(soup))

# Restore all occurrences of placeholders with the original tags
for original_tag in placeholder_map:
escaped_html = escaped_html.replace(random_hash, original_tag, 1) # Replace one occurrence at a time

escaped_html = escaped_html.replace( random_hash_br, "<br>")
return escaped_html

4 changes: 3 additions & 1 deletion changedetectionio/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import apprise
from loguru import logger

from changedetectionio.html_tools import escape_mixed_content

valid_tokens = {
'base_url': '',
Expand Down Expand Up @@ -85,6 +86,8 @@ def process_notification(n_object, datastore):
n_body = n_body.replace("\n", '<br>')

n_title = jinja_render(template_str=n_object.get('notification_title', ''), **notification_parameters)
if n_object['notification_format'].startswith('HTML'):
n_body = escape_mixed_content(n_body)

url = url.strip()
if url.startswith('#'):
Expand Down Expand Up @@ -161,7 +164,6 @@ def process_notification(n_object, datastore):
attach=n_object.get('screenshot', None)
)


# Returns empty string if nothing found, multi-line string otherwise
log_value = logs.getvalue()

Expand Down
3 changes: 2 additions & 1 deletion changedetectionio/tests/test_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,8 @@ def _test_color_notifications(client, notification_body_token):

with open("test-datastore/notification.txt", 'r') as f:
x = f.read()
assert f'<span style="{REMOVED_STYLE}">Which is across multiple lines' in x
assert f'<span class="cdio" style="{REMOVED_STYLE}">Which is across multiple lines' in x
assert f'<br>' in x


client.get(
Expand Down
140 changes: 72 additions & 68 deletions changedetectionio/update_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,77 @@

from loguru import logger

def build_notification_object_for_watch(watch, n_object, default_app_settings_notification_format):
from changedetectionio import diff
from changedetectionio.notification import default_notification_format_for_watch

dates = []
trigger_text = ''

if watch:
watch_history = watch.history
dates = list(watch_history.keys())
trigger_text = watch.get('trigger_text', [])

# Add text that was triggered
if len(dates):
snapshot_contents = watch.get_history_snapshot(dates[-1])
else:
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."

# If we ended up here with "System default"
if n_object.get('notification_format') == default_notification_format_for_watch:
n_object['notification_format'] = default_app_settings_notification_format

html_colour_enable = False
# HTML needs linebreak, but MarkDown and Text can use a linefeed
if n_object.get('notification_format') == 'HTML':
line_feed_sep = "<br>"
# Snapshot will be plaintext on the disk, convert to some kind of HTML
snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
elif n_object.get('notification_format') == 'HTML Color':
line_feed_sep = "<br>"
# Snapshot will be plaintext on the disk, convert to some kind of HTML
snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
html_colour_enable = True
else:
line_feed_sep = "\n"

triggered_text = ''
if len(trigger_text):
from . import html_tools
triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text)
if triggered_text:
triggered_text = line_feed_sep.join(triggered_text)

# Could be called as a 'test notification' with only 1 snapshot available
prev_snapshot = "Example text: example test\nExample text: change detection is cool\nExample text: some more examples\n"
current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples"

if len(dates) > 1:
prev_snapshot = watch.get_history_snapshot(dates[-2])
current_snapshot = watch.get_history_snapshot(dates[-1])

n_object.update({
'current_snapshot': snapshot_contents,
'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep),
'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep,
html_colour=html_colour_enable),
'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True),
'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep),
'notification_timestamp': time.time(),
'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None,
'triggered_text': triggered_text,
'uuid': watch.get('uuid') if watch else None,
'watch_url': watch.get('url') if watch else None,
})

if watch:
n_object.update(watch.extra_notification_token_values())

return n_object

class update_worker(threading.Thread):
current_uuid = None

Expand All @@ -27,75 +98,8 @@ def __init__(self, q, notification_q, app, datastore, *args, **kwargs):
super().__init__(*args, **kwargs)

def queue_notification_for_watch(self, notification_q, n_object, watch):
from changedetectionio import diff
from changedetectionio.notification import default_notification_format_for_watch

dates = []
trigger_text = ''

now = time.time()

if watch:
watch_history = watch.history
dates = list(watch_history.keys())
trigger_text = watch.get('trigger_text', [])

# Add text that was triggered
if len(dates):
snapshot_contents = watch.get_history_snapshot(dates[-1])
else:
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."

# If we ended up here with "System default"
if n_object.get('notification_format') == default_notification_format_for_watch:
n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format')

html_colour_enable = False
# HTML needs linebreak, but MarkDown and Text can use a linefeed
if n_object.get('notification_format') == 'HTML':
line_feed_sep = "<br>"
# Snapshot will be plaintext on the disk, convert to some kind of HTML
snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
elif n_object.get('notification_format') == 'HTML Color':
line_feed_sep = "<br>"
# Snapshot will be plaintext on the disk, convert to some kind of HTML
snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
html_colour_enable = True
else:
line_feed_sep = "\n"

triggered_text = ''
if len(trigger_text):
from . import html_tools
triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text)
if triggered_text:
triggered_text = line_feed_sep.join(triggered_text)

# Could be called as a 'test notification' with only 1 snapshot available
prev_snapshot = "Example text: example test\nExample text: change detection is cool\nExample text: some more examples\n"
current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples"

if len(dates) > 1:
prev_snapshot = watch.get_history_snapshot(dates[-2])
current_snapshot = watch.get_history_snapshot(dates[-1])

n_object.update({
'current_snapshot': snapshot_contents,
'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep),
'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True),
'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep),
'notification_timestamp': now,
'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None,
'triggered_text': triggered_text,
'uuid': watch.get('uuid') if watch else None,
'watch_url': watch.get('url') if watch else None,
})

if watch:
n_object.update(watch.extra_notification_token_values())

n_object = build_notification_object_for_watch(watch, n_object, self.datastore.data['settings']['application'].get('notification_format'))
logger.trace(f"Main rendered notification placeholders (diff_added etc) calculated in {time.time()-now:.3f}s")
logger.debug("Queued notification for sending")
notification_q.put(n_object)
Expand Down
Loading