Skip to content

Commit

Permalink
[change] Renamed celery tasks with leading underscore #569
Browse files Browse the repository at this point in the history
Closes #569
  • Loading branch information
pandafy committed Apr 4, 2024
1 parent 7bc8cb0 commit cd21359
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 13 deletions.
4 changes: 2 additions & 2 deletions openwisp_monitoring/device/base/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from ...db import device_data_query, timeseries_db
from ...monitoring.signals import threshold_crossed
from ...monitoring.tasks import timeseries_write
from ...monitoring.tasks import _timeseries_write
from ...settings import CACHE_TIMEOUT
from .. import settings as app_settings
from .. import tasks
Expand Down Expand Up @@ -275,7 +275,7 @@ def save_data(self, time=None):
self._transform_data()
time = time or now()
options = dict(tags={'pk': self.pk}, timestamp=time, retention_policy=SHORT_RP)
timeseries_write(name=self.__key, values={'data': self.json()}, **options)
_timeseries_write(name=self.__key, values={'data': self.json()}, **options)
cache_key = get_device_cache_key(device=self, context='current-data')
# cache current data to allow getting it without querying the timeseries DB
cache.set(
Expand Down
6 changes: 3 additions & 3 deletions openwisp_monitoring/monitoring/base/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
)
from ..exceptions import InvalidChartConfigException, InvalidMetricConfigException
from ..signals import pre_metric_write, threshold_crossed
from ..tasks import delete_timeseries, timeseries_batch_write, timeseries_write
from ..tasks import _timeseries_batch_write, _timeseries_write, delete_timeseries

User = get_user_model()
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -457,7 +457,7 @@ def write(
{'value': extra_values[self.alert_field]}
)
if write:
timeseries_write(name=self.key, values=values, **options)
_timeseries_write(name=self.key, values=values, **options)
return {'name': self.key, 'values': values, **options}

@classmethod
Expand All @@ -469,7 +469,7 @@ def batch_write(cls, raw_data):
write_data.append(metric.write(**kwargs, write=False))
except ValueError as error:
error_dict[metric.key] = str(error)
timeseries_batch_write(write_data)
_timeseries_batch_write(write_data)
if error_dict:
raise ValueError(error_dict)

Expand Down
16 changes: 8 additions & 8 deletions openwisp_monitoring/monitoring/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def _metric_post_write(name, values, metric, check_threshold_kwargs, **kwargs):
autoretry_for=(TimeseriesWriteException,),
**RETRY_OPTIONS
)
def _timeseries_write(
def timeseries_write(
self, name, values, metric=None, check_threshold_kwargs=None, **kwargs
):
"""
Expand All @@ -49,15 +49,15 @@ def _timeseries_write(
_metric_post_write(name, values, metric, check_threshold_kwargs, **kwargs)


def timeseries_write(name, values, metric=None, check_threshold_kwargs=None, **kwargs):
def _timeseries_write(name, values, metric=None, check_threshold_kwargs=None, **kwargs):
"""
If the timeseries database is using UDP to write data,
then write data synchronously.
"""
if timeseries_db.use_udp:
func = _timeseries_write
func = timeseries_write
else:
func = _timeseries_write.delay
func = timeseries_write.delay
metric = metric.pk if metric else None
func(
name=name,
Expand All @@ -74,7 +74,7 @@ def timeseries_write(name, values, metric=None, check_threshold_kwargs=None, **k
autoretry_for=(TimeseriesWriteException,),
**RETRY_OPTIONS
)
def _timeseries_batch_write(self, data):
def timeseries_batch_write(self, data):
"""
Similar to timeseries_write function above, but operates on
list of metric data (batch operation)
Expand All @@ -84,17 +84,17 @@ def _timeseries_batch_write(self, data):
_metric_post_write(**metric_data)


def timeseries_batch_write(data):
def _timeseries_batch_write(data):
"""
If the timeseries database is using UDP to write data,
then write data synchronously.
"""
if timeseries_db.use_udp:
_timeseries_batch_write(data=data)
timeseries_batch_write(data=data)
else:
for item in data:
item['metric'] = item['metric'].pk
_timeseries_batch_write.delay(data=data)
timeseries_batch_write.delay(data=data)


@shared_task(base=OpenwispCeleryTask)
Expand Down

0 comments on commit cd21359

Please sign in to comment.