Comparing changes

base repository: mozilla-services/autopush-rs
base: 1.74.2
head repository: mozilla-services/autopush-rs
compare: master

Commits on Mar 20, 2025

  1. chore: tag 1.74.2 (#864)

    #### Chore
    
    * tag 1.74.1 (#862)
    ([d07ee88](d07ee88))
    
    #### Bug Fixes
    
    * compare tracking keys in raw form. (#863)
    ([49a466b](49a466b))
    * Autoendpoint should be less strict about config options (#859)
    ([8598172](8598172))
    jrconlin authored Mar 20, 2025
    cc0d8ee

Commits on Mar 21, 2025

  1. bug: Add diagnostic for stage (#865)

    This is a temporary PR that should not go to production. This PR adds
    several diagnostics for issues that we're seeing on the stage server
    which include:
    
    1) Valid URL encoded tracking keys are being rejected by the base64
       decoder
    2) Trackable requests coming from FxA do not appear to be detected
    
    This adds some quick diagnostic code which will allow us to determine
    the potential discrepancies.
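For illustration, a minimal sketch of the decode mismatch being diagnosed, assuming the `base64` 0.22 crate pinned in this workspace's Cargo.toml; the function and fallback strategy here are illustrative, not the actual diagnostic code:

```rust
use base64::{
    engine::general_purpose::{STANDARD, URL_SAFE_NO_PAD},
    Engine as _,
};

/// Hypothetical example: a tracking key encoded with the URL-safe base64
/// alphabet ('-' and '_') is rejected by the STANDARD engine, so try the
/// URL-safe alphabet first and fall back to the standard one.
fn decode_tracking_key(raw: &str) -> Result<Vec<u8>, base64::DecodeError> {
    URL_SAFE_NO_PAD
        .decode(raw.trim_end_matches('='))
        .or_else(|_| STANDARD.decode(raw))
}
```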
    jrconlin authored Mar 21, 2025
    470ea1c
  2. chore: tag 1.74.3 (#866)

    #### Chore
    
    * tag 1.74.2 (#864)
    ([cc0d8ee](cc0d8ee))
    
    #### Bug Fixes
    
    * Add diagnostic for stage (#865)
    ([470ea1c](470ea1c))
    jrconlin authored Mar 21, 2025
    d22de1d

Commits on Mar 24, 2025

  1. feat: remove stage key debugging code (#867)

    e4c153e

Commits on Mar 28, 2025

  1. chore: Version update for March 2025 (#892)

    **Note** I am not yet updating the edition. I am not going to poke that
    dragon for a bit.
    jrconlin authored Mar 28, 2025
    9380c22

Commits on Apr 9, 2025

  1. bug: Post release fixes for Reliability (#893)

    Address a number of post landing concerns:
    
    * Fix the `garbage_collect` method to use the correct Redis table
    * Have the Redis connection report to error logging (these are not
    service-critical errors, since they relate to reporting)
    * Make some of the logging messages more consistent and easier to read.
    jrconlin authored Apr 9, 2025
    a9a181e

Commits on Apr 23, 2025

  1. feat: Switch to using async/pooled Redis (#896)

    A _lot_ of the files touched here simply convert calls that were
    previously sync to async, plus the ripple effects of that change.
    
    Closes: [SYNC-4706](https://mozilla-hub.atlassian.net/browse/SYNC-4706)
    jrconlin authored Apr 23, 2025
    bb492e7

Commits on Apr 25, 2025

  1. feat: normalize durations and timestamps (#837)

    taddes authored Apr 25, 2025
    7018f25

Commits on May 8, 2025

  1. feat: Remove magic strings and add enum variants instead

    6b4e83c

Commits on May 9, 2025

  1. code reviews

    gruberb committed May 9, 2025
    56c48ea
  2. Formatting Cargo.toml

    gruberb committed May 9, 2025
    a35b68a

Commits on May 12, 2025

  1. feat: Remove magic strings and add enum variants instead (#903)

    Closes: #897
    
    - Added a `BroadcastErrorType` enum
    - Added a `MessageType` enum with a conversion from
    `ClientMessage`/`ServerMessage` to `MessageType`
    - Added a `MetricName` enum
    - To work easier with Metrics, I added an extension trait in
    `autopush-common/metrics.rs`, with methods `inc`, `inc_raw`,
    `inc_with_tags`. The `inc` and `inc_with_tags` methods accept a
    `MetricName`
    - This replaces the `cadence::CountedExt` trait with our own
    `StatsdClientExt` trait with almost the same methods.
    
    Since it replaces a lot of text, and it's my first PR, I might have
    missed something. But the local tests all pass.
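For context, a rough sketch of the pattern this PR describes; the variant and method names follow the usage visible in the `megaphone.rs` diff further below, but the exact signatures here are assumptions:

```rust
use cadence::{Counter, CountedExt, MetricBuilder, StatsdClient};

/// Two example variants, mapped back to the old magic strings.
pub enum MetricName {
    MegaphoneUpdaterOk,
    MegaphoneUpdaterError,
}

impl MetricName {
    fn as_str(&self) -> &'static str {
        match self {
            MetricName::MegaphoneUpdaterOk => "megaphone.updater.ok",
            MetricName::MegaphoneUpdaterError => "megaphone.updater.error",
        }
    }
}

/// Extension trait so call sites pass a `MetricName` instead of a string.
pub trait StatsdClientExt {
    fn incr(&self, metric: MetricName);
    fn incr_with_tags(&self, metric: MetricName) -> MetricBuilder<'_, '_, Counter>;
}

impl StatsdClientExt for StatsdClient {
    fn incr(&self, metric: MetricName) {
        // Ignore send errors, matching cadence's fire-and-forget style.
        let _ = CountedExt::incr(self, metric.as_str());
    }
    fn incr_with_tags(&self, metric: MetricName) -> MetricBuilder<'_, '_, Counter> {
        CountedExt::incr_with_tags(self, metric.as_str())
    }
}
```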
    gruberb authored May 12, 2025
    537513a

Commits on May 14, 2025

  1. bug: sentry middleware should use tags w/ metrics (#901)

    96cc713

Commits on May 16, 2025

  1. chore: remove unused dependencies (#922)

    Closes: #921
    gruberb authored May 16, 2025
    1c47ad6

Commits on May 23, 2025

  1. feat: Clean up terminal states (#899)

    Address a potential problem where final message states may grow
    without bound.
    This also updates `reliability_report.py` to write to a Google
    Storage Bucket.
    
    
    Closes: #PUSH-12 / #895
    jrconlin authored May 23, 2025
    791e875

Commits on May 27, 2025

  1. chore: tag 1.75.0 (#973)

    #### Features
    
    * Clean up terminal states (#899)
    ([791e875](791e875))
    * Remove magic strings and add enum variants instead (#903)
    ([537513a](537513a))
    * Remove magic strings and add enum variants instead
    ([6b4e83c](6b4e83c))
    * normalize durations and timestamps (#837)
    ([7018f25](7018f25))
    * Switch to using async/pooled Redis (#896)
    ([bb492e7](bb492e7))
    * remove stage key debugging code (#867)
    ([e4c153e](e4c153e))
    
    #### Bug Fixes
    
    * sentry middleware should use tags w/ metrics (#901)
    ([96cc713](96cc713))
    * Post release fixes for Reliability (#893)
    ([a9a181e](a9a181e))
    
    #### Chore
    
    * remove unused dependencies (#922)
    ([1c47ad6](1c47ad6))
    * Version update for March 2025 (#892)
    ([9380c22](9380c22))
    * tag 1.74.3 (#866)
    ([d22de1d](d22de1d))
    jrconlin authored May 27, 2025
    4c77b14
  2. bug: Include image deploy script in CI (#974)

    Update several dependencies
    jrconlin authored May 27, 2025
    1d5d0b0

Commits on May 28, 2025

  1. bug: Fix typo in circleci config (#976)

    autocomplete is not smart
    jrconlin authored May 28, 2025
    4d259b4

Commits on May 29, 2025

  1. chore: tag 1.75.2 (#977)

    ### Bug Fixes
    
    * Fix typo in circleci/config.yml (#976)
    * Include image deploy script in CI (#974)
    jrconlin authored May 29, 2025
    9dc9d77

Commits on Jun 4, 2025

  1. bug: Use full paths in compiled docker image (#981)

    Deployment is reporting that it can't find the executable, so we now
    specify the full paths to all elements.
    jrconlin authored Jun 4, 2025
    1104856

Commits on Jun 5, 2025

  1. chore: tag 1.75.3 (#982)

    #### Chore
    
    * tag 1.75.2 (#977)
    ([9dc9d77](9dc9d77))
    
    #### Bug Fixes
    
    * Use full paths in compiled docker image (#981)
    ([1104856](1104856))
    * Fix typo in circleci config (#976)
    ([4d259b4](4d259b4))
    jrconlin authored Jun 5, 2025
    4175c21

Commits on Jun 8, 2025

  1. bug: Do not set the `GOOGLE_APPLICATION_CREDENTIALS` in the reliability docker image (#984)
    
    I'm getting a credential error with this, because I believe that the
    credentials are not being found.
    Going to remove this to see if there's a default set that is passed to
    the docker image.
    jrconlin authored Jun 8, 2025
    20cf810

Commits on Jun 10, 2025

  1. bug: Fix the report args. (#987)

    Due to a few last-minute recommendations, the args fell out of sync.
    
    Taking the opportunity to fix up some of the arguments to make better
    lexicographic sense.
    jrconlin authored Jun 10, 2025
    393207b

Commits on Jun 11, 2025

  1. bug: address Python3.12 issues (#989)

    * pyredis does not understand special `-1` values, use literals
    * add handlers for bucket create/access
    * fix zrange score/value switch
    
    note: I got bastion server access running properly, so I was able to
    test this against stage rather than locally; hence some of these fixes.
    
    The bastion doesn't have bucket access, so there may still need to be
    some fixes for the report writer.
    jrconlin authored Jun 11, 2025
    4e1e708

Commits on Jun 16, 2025

  1. feat: Improve endpoint reliability check (#991)

    * Add some hinting to `health` to report vapid key signatures (to ensure
    that key values are propagating)
    * Add db check to autoendpoint `health` check
    * Try a fix for off counts in `internal_record`
       * wrap the action in a transaction (with retries; see the sketch below)
       * Add missing old state removal (not sure when/how that got dropped)
       * Add unit test for `internal_record`
    * Add `LOCK_` prefix for redis lock record (because otherwise it's
    confusing)
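A minimal sketch of the transaction-with-retries shape, using the synchronous `redis` API for brevity; `MAX_TRANSACTION_LOOP` comes from the new `autopush-common/src/redis_util.rs`, but its value and this function are assumptions:

```rust
use redis::Commands;

const MAX_TRANSACTION_LOOP: usize = 10; // assumed default

/// Optimistic WATCH/MULTI/EXEC loop: retry if the watched key changed.
fn incr_count_in_txn(con: &mut redis::Connection, key: &str) -> redis::RedisResult<i64> {
    for _ in 0..MAX_TRANSACTION_LOOP {
        redis::cmd("WATCH").arg(key).query::<()>(con)?;
        let current: i64 = con.get(key).unwrap_or(0);
        // An aborted EXEC (the watched key changed) surfaces as `None`.
        let result: Option<(i64,)> = redis::pipe()
            .atomic()
            .set(key, current + 1)
            .ignore()
            .get(key)
            .query(con)?;
        if let Some((val,)) = result {
            return Ok(val);
        }
    }
    Err((redis::ErrorKind::TryAgain, "transaction retries exhausted").into())
}
```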
    
    ---------
    
    Co-authored-by: Philip Jenvey <pjenvey@underboss.org>
    jrconlin and pjenvey authored Jun 16, 2025
    e658a2f

Commits on Jun 18, 2025

  1. chore: tag 1.75.8 (#995)

    **Note:** 1.75.8 is an administrative tag due to outside process
    errors with the 1.75.7 release. Complex systems are complex, and lots
    of moving parts means lots of things can break in weird ways.
    Sometimes it's just better to start over.
    
    #### Chore
    
    * tag 1.75.3 (#982)
    
    #### Bug Fixes
    
    * bug: address Python3.12 issues (#989)
    ([4e1e708](4e1e708))
    * bug: Fix the report args. (#987)
    ([393207b](393207b))
    * bug: Do not set the `GOOGLE_APPLICATION_CREDENTIALS` in the reliability
    docker image (#984)
    ([20cf810](20cf810))
    
    #### Features
    
    * feat: Improve endpoint reliability check (#991)
    ([e658a2f](e658a2f))
    
    **note: 1.75.4 - 1.75.6 were not released. This version contains the
    cumulative fixes.**
    jrconlin authored Jun 18, 2025
    893733b

Commits on Jun 26, 2025

  1. chore: tag 1.75.9 (#998)

    #### Chore
    
    * tag 1.75.8 (#995)
    ([893733b](893733b))
    
    #### Bug Fixes
    
    * Address potential double counting reliability (#997)
    ([854071e](854071e))
    jrconlin authored Jun 26, 2025
    dc141dd
  2. bug: Fix order of expiry key by using a struct. (#999)

    The expiry key order was flipped, causing the counts table to be flooded
    by incorrect decremental values.
    
    (sigh)
    jrconlin authored Jun 26, 2025
    7dd4c44

Commits on Jun 27, 2025

  1. chore: tag 1.75.10 (#1000)

    #### Bug Fixes
    
    * Fix order of expiry key by using a struct. (#999)
    ([7dd4c44](7dd4c44))
    
    #### Chore
    
    * tag 1.75.9 (#998)
    ([dc141dd](dc141dd))
    jrconlin authored Jun 27, 2025
    fe12e71

Commits on Jun 30, 2025

  1. docs: diagram the reliability state transitions (#1001)

    fac8cfa
  2. docs: mdbook-mermaid now needed (#1002)

    and upgrade mdbook tools
    
    Issue PUSH-536
    pjenvey authored Jun 30, 2025
    3a9c997

Commits on Jul 1, 2025

  1. docs: install mdbook-mermaid js assets on build (#1003)

    ac76ab2
  2. chore: Clippy updates for 1.88.0 (#1004)

    clippy now complains about non-inline variables in `format!()` macros
    
    Co-authored-by: Philip Jenvey <pjenvey@underboss.org>
    jrconlin and pjenvey authored Jul 1, 2025
    05037ad

Commits on Jul 17, 2025

  1. feat: Add VAPID header to load testing (#992)

    31f21a9
  2. bug: Skip previously deleted records (#1012)

    This patch shuffles some internal actions around to ensure that the bulk
    of operations succeed, even if there are internal failures.
    
    I do not expect this to fully resolve PUSH-541, but I cannot determine
    any additional errors until this issue is resolved.
    
    Issue: PUSH-541
    jrconlin authored Jul 17, 2025
    43c3ad7

Commits on Jul 24, 2025

  1. bug: Store message state in Redis to solve for some reliability state management issues (#1005)
    
    Move the individual message state to Redis. This is to attempt to stop
    some reliability counts from trending negative. This is after the code
    paths were scrutinized to ensure that the struct cloning was minimized,
    and some states (notably, ones where there was a hand-off between active
    services) continued to trend negative.
    
    Unfortunately, the currently available version of Redis is 7.2, so many
    of the auto-expiring commands (like
    [HSETEX](https://redis.io/docs/latest/commands/hsetex/)) are not present,
    meaning that we'll have to use more keys and a traditional `SET` with an
    `EX`piration set. Fortunately, that should not be a problem, since
    there appears to be plenty of available memory and CPU on the active
    Redis servers.
    
    The new approach _only_ updates counts if the prior state matches what
    has been last recorded.
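In sketch form, under the Redis 7.2 constraint described above (the key scheme and function are hypothetical, and the real code performs this inside a transaction rather than a bare check-then-set):

```rust
use redis::AsyncCommands;

/// Move a message's state from `old` to `new` only if `old` is still what
/// was last recorded; otherwise skip the count update entirely.
async fn transition_state(
    con: &mut redis::aio::MultiplexedConnection,
    msg_id: &str,
    old: &str,
    new: &str,
    ttl_secs: u64,
) -> redis::RedisResult<bool> {
    let key = format!("state.{msg_id}"); // hypothetical per-message key
    let current: Option<String> = con.get(&key).await?;
    if current.as_deref() != Some(old) {
        return Ok(false);
    }
    // Plain SET with an EXpiration, standing in for the missing HSETEX.
    let _: () = con.set_ex(&key, new, ttl_secs).await?;
    Ok(true)
}
```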
    
    Closes: PUSH-546
    jrconlin authored Jul 24, 2025
    204163f
  2. chore: tag 1.76.0 (#1015)

    #### Bug Fixes
    
    * Store message state in Redis to solve for some reliability state
    management issues (#1005)
    ([204163f](204163f))
    * Skip previously deleted records (#1012)
    ([43c3ad7](43c3ad7))
    
    #### Features
    
    * Add VAPID header to load testing (#992)
    ([31f21a9](31f21a9))
    
    #### Chore
    
    * Clippy updates for 1.88.0 (#1004)
    ([05037ad](05037ad))
    * tag 1.75.10 (#1000)
    ([fe12e71](fe12e71))
    
    #### Doc
    
    * install mdbook-mermaid js assets on build (#1003)
    ([ac76ab2](ac76ab2))
    * mdbook-mermaid now needed (#1002)
    ([3a9c997](3a9c997))
    * diagram the reliability state transitions (#1001)
    ([fac8cfa](fac8cfa))
    jrconlin authored Jul 24, 2025
    93ccd4d

Commits on Aug 5, 2025

  1. bug: restore state_key generation outside of pipe (#1016)

    Also added some additional trace statements to dump out the Redis
    commands to aid in debugging
    
    Closes [PUSH-564](https://mozilla-hub.atlassian.net/browse/PUSH-564)
    jrconlin authored Aug 5, 2025
    f640074

Commits on Aug 7, 2025

  1. bug: Use new state if error in record_reliability (#1018)

    This includes some fix-ups for rust 1.89.0
    
    Closes: PUSH-567
    jrconlin authored Aug 7, 2025
    3157fdd
  2. chore: tag 1.77.1 (#1019)

    #### Bug Fixes
    
    * Use new state if error in record_reliability (#1018)
    ([3157fdd](3157fdd))
    jrconlin authored Aug 7, 2025
    236244e

Commits on Aug 12, 2025

  1. fix: filter non-state entries from report (#1022)

    Due to a previous bug, invalid states were being recorded into Redis.
    This was causing too many metrics to be created in Prometheus. Add a
    filter to prevent those invalid states from being reported.
    
    In addition, autoendpoint would not properly start up due to timeouts
    possibly related to frequent retries for reliability data. Instead of
    retrying, just fail the operation and report the state transition to the
    logs for further analysis.
    
    Closes [PUSH-570](https://mozilla-hub.atlassian.net/browse/PUSH-570)
    
    jrconlin authored Aug 12, 2025
    3b014e4
  2. chore: tag 1.77.2 (#1023)

    #### Chore
    
    * tag 1.77.1 (#1019)
    ([236244e](236244e))
    
    #### Bug Fixes
    
    * filter non-state entries from report (#1022)
    ([3b014e4](3b014e4))
    jrconlin authored Aug 12, 2025
    796674d

Commits on Aug 19, 2025

  1. fix: Only attempt to create a new state if the old is not set (#1025)

    5f48b4f
Showing with 4,090 additions and 1,883 deletions.
  1. +66 −3 .circleci/config.yml
  2. +4 −9 .github/workflows/publish_docs.yml
  3. +2 −0 .prettierignore
  4. +198 −0 CHANGELOG.md
  5. +980 −793 Cargo.lock
  6. +44 −39 Cargo.toml
  7. +1 −1 Dockerfile
  8. +4 −33 autoconnect/Cargo.toml
  9. +3 −2 autoconnect/autoconnect-common/Cargo.toml
  10. +14 −3 autoconnect/autoconnect-common/src/broadcast.rs
  11. +7 −3 autoconnect/autoconnect-common/src/megaphone.rs
  12. +64 −3 autoconnect/autoconnect-common/src/protocol.rs
  13. +4 −0 autoconnect/autoconnect-common/src/registry.rs
  14. +19 −4 autoconnect/autoconnect-common/src/test_support.rs
  15. +3 −0 autoconnect/autoconnect-settings/Cargo.toml
  16. +9 −3 autoconnect/autoconnect-settings/src/app_state.rs
  17. +10 −6 autoconnect/autoconnect-settings/src/lib.rs
  18. +1 −8 autoconnect/autoconnect-web/Cargo.toml
  19. +23 −5 autoconnect/autoconnect-web/src/dockerflow.rs
  20. +3 −1 autoconnect/autoconnect-web/src/routes.rs
  21. +23 −20 autoconnect/autoconnect-web/src/test.rs
  22. +3 −0 autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml
  23. +36 −0 autoconnect/autoconnect-ws/autoconnect-ws-sm/src/error.rs
  24. +37 −24 autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_client_msg.rs
  25. +14 −8 autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_server_notif.rs
  26. +39 −26 autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs
  27. +6 −7 autoconnect/autoconnect-ws/src/test.rs
  28. +13 −27 autoendpoint/Cargo.toml
  29. +5 −2 autoendpoint/src/extractors/notification.rs
  30. +9 −3 autoendpoint/src/extractors/notification_headers.rs
  31. +19 −14 autoendpoint/src/extractors/subscription.rs
  32. +5 −3 autoendpoint/src/extractors/user.rs
  33. +4 −2 autoendpoint/src/headers/vapid.rs
  34. +7 −10 autoendpoint/src/metrics.rs
  35. +7 −3 autoendpoint/src/routers/apns/router.rs
  36. +9 −7 autoendpoint/src/routers/common.rs
  37. +1 −1 autoendpoint/src/routers/fcm/client.rs
  38. +8 −4 autoendpoint/src/routers/fcm/router.rs
  39. +15 −10 autoendpoint/src/routers/webpush.rs
  40. +57 −8 autoendpoint/src/routes/health.rs
  41. +16 −7 autoendpoint/src/routes/registration.rs
  42. +8 −2 autoendpoint/src/server.rs
  43. +36 −3 autoendpoint/src/settings.rs
  44. +12 −9 autopush-common/Cargo.toml
  45. +22 −16 autopush-common/src/db/bigtable/bigtable_client/mod.rs
  46. +1 −1 autopush-common/src/db/bigtable/mod.rs
  47. +7 −13 autopush-common/src/db/bigtable/pool.rs
  48. +1 −1 autopush-common/src/db/mod.rs
  49. +2 −2 autopush-common/src/db/models.rs
  50. +2 −0 autopush-common/src/db/routing.rs
  51. +2 −2 autopush-common/src/endpoint.rs
  52. +40 −31 autopush-common/src/lib.rs
  53. +2 −2 autopush-common/src/logging.rs
  54. +169 −0 autopush-common/src/metric_name.rs
  55. +32 −3 autopush-common/src/metrics.rs
  56. +20 −8 autopush-common/src/middleware/sentry.rs
  57. +12 −1 autopush-common/src/notification.rs
  58. +44 −0 autopush-common/src/redis_util.rs
  59. +819 −103 autopush-common/src/reliability.rs
  60. +1 −1 autopush-common/src/sentry.rs
  61. +0 −2 autopush-common/src/util/mod.rs
  62. +1 −2 autopush-common/src/util/timing.rs
  63. +6 −0 docs/book.toml
  64. +3 −0 docs/make_book.sh
  65. +1 −1 docs/notes.md
  66. +1 −0 docs/src/SUMMARY.md
  67. +2 −0 docs/src/index.md
  68. +1 −1 docs/src/install.md
  69. +40 −2 docs/src/reliability.md
  70. +19 −0 scripts/reliability/Dockerfile
  71. +12 −0 scripts/reliability/README.md
  72. +26 −0 scripts/reliability/app.yaml
  73. +22 −0 scripts/reliability/pyproject.toml
  74. +661 −0 scripts/reliability/reliability_report.py
  75. 0 scripts/{ → reliability}/templates/reliable_report_template.md
  76. +0 −258 scripts/reliability_cron.py
  77. +0 −267 scripts/reliability_report.py
  78. +2 −1 tests/integration/Dockerfile
  79. +5 −2 tests/load/Dockerfile
  80. +18 −17 tests/load/README.md
  81. +0 −1 tests/load/docker-compose.yml
  82. +27 −0 tests/load/keys/README.md
  83. +5 −0 tests/load/keys/private_key.pem
  84. +4 −0 tests/load/keys/public_key.pem
  85. +1 −0 tests/load/keys/public_key.x962
  86. +2 −0 tests/load/kubernetes-config/locust-worker-controller.yml
  87. +45 −1 tests/load/locustfiles/locustfile.py
  88. +1 −0 tests/load/setup_k8s.sh
  89. +160 −28 tests/poetry.lock
  90. +1 −0 tests/pyproject.toml
69 changes: 66 additions & 3 deletions .circleci/config.yml
@@ -27,7 +27,7 @@ executors:
audit-executor:
docker:
# NOTE: update version for all # RUST_VER
- image: rust:1.85
- image: rust:1.86
auth:
username: $DOCKER_USER
password: $DOCKER_PASS
@@ -71,6 +71,12 @@ executors:
auth:
username: $DOCKER_USER
password: $DOCKER_PASS
build-reliability-cron:
docker:
- image: docker:18.03.0-ce
auth:
username: $DOCKER_USER
password: $DOCKER_PASS
build-test-container-executor:
docker:
- image: cimg/base:2025.02
@@ -108,7 +114,7 @@ commands:
apt install build-essential curl libstdc++6 libstdc++-12-dev clang libssl-dev pkg-config -y
apt install cmake -y
# RUST_VER
curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain 1.85 -y
curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain 1.86 -y
export PATH=$PATH:$HOME/.cargo/bin
echo 'export PATH=$PATH:$HOME/.cargo/bin' >> $BASH_ENV
rustc --version
@@ -363,6 +369,40 @@ jobs:
paths:
- <<parameters.image>>.tar

# Create the reliability cron docker image. This is a singleton that does some
# clean-up and reporting for the Push Reliability task.
build-reliability-cron:
executor: build-reliability-cron
resource_class: small
working_directory: /app
parameters:
image:
type: string
tag:
type: string
steps:
# Install these packages before checkout because git may not exist or work
- run:
name: Install Docker build dependencies
command: apk add --no-cache openssh-client git clang
- checkout
- setup_remote_docker
- docker_login
- run:
name: Build cron docker image
command: |
docker build --tag <<parameters.image>>:<<parameters.tag>> -f ./scripts/reliability/Dockerfile ./scripts/reliability
- run:
name: Save cron docker image
# note: deploy always expects the image to be saved in /tmp/cache
command: |
mkdir -p /tmp/cache
docker save -o /tmp/cache/<<parameters.image>>.tar "<<parameters.image>>"
- persist_to_workspace:
root: /tmp/cache
paths:
- <<parameters.image>>.tar

build-test-container:
executor: build-test-container-executor
parameters:
@@ -515,6 +555,17 @@ workflows:
filters:
tags:
only: /.*/

- build-reliability-cron:
name: build-reliability-cron
image: autopush-reliability-cron
tag: latest
filters:
tags:
only: /.*/
branches:
only: master

- build-test-container:
name: Build Load Test Image
image: autopush-load-tests
@@ -539,7 +590,7 @@ workflows:
tags:
only: /.*/

# Comment out the following two sections for local CircleCI testing.
# Comment out the following three sections for local CircleCI testing.
- deploy:
name: deploy-autoconnect
image: autoconnect
@@ -567,6 +618,18 @@ workflows:
only: /.*/
branches:
only: master

- deploy:
name: deploy-reliability-cron
image: autopush-reliability-cron
requires:
- build-reliability-cron
filters:
tags:
only: /.*/
branches:
only: master

- deploy:
name: Push Load Test Image
image: autopush-load-tests
13 changes: 4 additions & 9 deletions .github/workflows/publish_docs.yml
@@ -14,8 +14,8 @@ jobs:
build:
runs-on: ubuntu-latest
env:
MDBOOK_ENV: 0.4.40
MERMAID_ENV: 0.13.0
MDBOOK_ENV: 0.4.51
MERMAID_ENV: 0.15.0
DEST_DIR: /home/runner/.cargo/bin
steps:
- uses: actions/checkout@v4
@@ -32,13 +32,8 @@ jobs:
export PATH=$PATH:$DEST_DIR
curl -sSL "https://github.com/rust-lang/mdBook/releases/download/v$MDBOOK_ENV/mdbook-v$MDBOOK_ENV-x86_64-unknown-linux-gnu.tar.gz" | tar -xz --directory $DEST_DIR
curl -sSL "https://github.com/badboy/mdBook-mermaid/releases/download/v$MERMAID_ENV/mdbook-mermaid-v$MERMAID_ENV-x86_64-unknown-linux-gnu.tar.gz" | tar -xz --directory $DEST_DIR
- name: Build the main book
run: cd docs && mdbook build
- name: Build API docs
run: cargo doc --all-features --workspace --no-deps
# bring just the built docs over into the artifact (there's a lot of build detritus)
- name: Copy cargo docs to API dir
run: mkdir -p docs/output/api && cp -r target/doc/* docs/output/api
- name: Build docs
run: cd docs && sh make_book.sh
- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
2 changes: 2 additions & 0 deletions .prettierignore
@@ -0,0 +1,2 @@
# `clog` uses '* ' as the bullet prefix. Changing this would cause the bulk of the file to be reformatted.
CHANGELOG.md
198 changes: 198 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,201 @@
<a name="1.77.3"></a>
## 1.77.3 (2025-08-19)


#### Bug Fixes

* Only attempt to create a new state if the old is not set (#1025) ([5f48b4f5](https://github.com/mozilla-services/autopush-rs/commit/5f48b4f5aa859c45fa3ae00c8db7c6d3bed8e77b))

#### Chore

* tag 1.77.2 (#1023) ([796674db](https://github.com/mozilla-services/autopush-rs/commit/796674db6fabd11c5b6eae22b08cd204292e464f))



<a name="1.77.2"></a>
## 1.77.2 (2025-08-12)


#### Chore

* tag 1.77.1 (#1019) ([236244e9](https://github.com/mozilla-services/autopush-rs/commit/236244e9f018ff4ac589a2836fc7922bdf4ecc94))

#### Bug Fixes

* filter non-state entries from report (#1022) ([3b014e43](https://github.com/mozilla-services/autopush-rs/commit/3b014e4329a7b9dbab0769013cf3ea231a0adedd))



<a name="1.77.1"></a>
## 1.77.1 (2025-08-07)

#### Bug Fixes

* Use new state if error in record_reliability (#1018) ([3157fdd4ace371139323efc40f906614c5ec8a8a](https://github.com/mozilla-services/autopush-rs/commit/3157fdd4ace371139323efc40f906614c5ec8a8a))



<a name="1.76.0"></a>
## 1.76.0 (2025-07-24)


#### Bug Fixes

* Store message state in Redis to solve for some reliability state management issues (#1005) ([204163f2](https://github.com/mozilla-services/autopush-rs/commit/204163f2b0d4ebe47ab4ae47ba3b42798220a647))
* Skip previously deleted records (#1012) ([43c3ad74](https://github.com/mozilla-services/autopush-rs/commit/43c3ad74293438f90884ada65bd063086a87f965))

#### Features

* Add VAPID header to load testing (#992) ([31f21a9d](https://github.com/mozilla-services/autopush-rs/commit/31f21a9d9b93cbe5176c1442c8e5e2eb48c16746))

#### Chore

* Clippy updates for 1.88.0 (#1004) ([05037ad0](https://github.com/mozilla-services/autopush-rs/commit/05037ad0f9d033b019e1c1927f5751125247c02b))
* tag 1.75.10 (#1000) ([fe12e719](https://github.com/mozilla-services/autopush-rs/commit/fe12e71990983ce89b9ed03375fb04492d3b8713))

#### Doc

* install mdbook-mermaid js assets on build (#1003) ([ac76ab26](https://github.com/mozilla-services/autopush-rs/commit/ac76ab26aa3caf0ce2eb7e49d169046cdc5de792))
* mdbook-mermaid now needed (#1002) ([3a9c997c](https://github.com/mozilla-services/autopush-rs/commit/3a9c997c113e9f9db2cb88e88b86f62dde20e577))
* diagram the reliability state transitions (#1001) ([fac8cfa5](https://github.com/mozilla-services/autopush-rs/commit/fac8cfa5ec7e3b9c34162a3e7a616668cfd80fa2))



<a name="1.75.10"></a>
## 1.75.10 (2025-06-26)


#### Bug Fixes

* Fix order of expiry key by using a struct. (#999) ([7dd4c44e](https://github.com/mozilla-services/autopush-rs/commit/7dd4c44efb1c7d1ed117c8fd5073032a71e9351c))

#### Chore

* tag 1.75.9 (#998) ([dc141ddc](https://github.com/mozilla-services/autopush-rs/commit/dc141ddc3bacfefe00b1d6777582eeed22601e19))



<a name="1.75.9"></a>
## 1.75.9 (2025-06-25)


#### Chore

* tag 1.75.8 (#995) ([893733bf](https://github.com/mozilla-services/autopush-rs/commit/893733bf7472612f1751035f43a7bef0013778e6))

#### Bug Fixes

* Address potential double counting reliability (#997) ([854071e9](https://github.com/mozilla-services/autopush-rs/commit/854071e9a4260dbef39cd7a4d3108b895eb93cfd))



<a name="1.75.8"></a>
## 1.75.8 (2025-06-18)

**Note:** 1.75.8 is an administrative tag due to outside process
errors with the 1.75.7 release. Complex systems are complex, and lots
of moving parts means lots of things can break in weird ways.
Sometimes it's just better to start over.

<a name="1.75.7"></a>
## 1.75.7 (2025-06-17)

#### Chore

* tag 1.75.3 (#982)

#### Bug Fixes

* bug: address Python3.12 issues (#989) ([4e1e7080](https://github.com/mozilla-services/autopush-rs/commit/4e1e7080f8790c790e2486ce61f09452474a1c15))
* bug: Fix the report args. (#987) ([393207bf](https://github.com/mozilla-services/autopush-rs/commit/393207bfc0f8cbb5c6bac564b1f614ea607c7d0d))
* bug: Do not set the `GOOGLE_APPLICATION_CREDENTIALS` in the reliability docker image (#984) ([20cf810a](https://github.com/mozilla-services/autopush-rs/commit/20cf810a571aac84ca08d983b9e5ecd32cddae7b))

#### Features

* feat: Improve endpoint reliability check (#991) ([e658a2f8](https://github.com/mozilla-services/autopush-rs/commit/e658a2f8524a0f4c075520525759790c04f42a6a))

**note: 1.75.4 - 1.75.6 were not released. This version contains the cumulative fixes.**


<a name="1.75.3"></a>
## 1.75.3 (2025-06-04)


#### Chore

* tag 1.75.2 (#977) ([9dc9d774](https://github.com/mozilla-services/autopush-rs/commit/9dc9d7749b4418615faf095d171a6ce2840e775d))

#### Bug Fixes

* Use full paths in compiled docker image (#981) ([11048566](https://github.com/mozilla-services/autopush-rs/commit/1104856605ad35d6113ecd5d1945accee9bab1c9))
* Fix typo in circleci config (#976) ([4d259b42](https://github.com/mozilla-services/autopush-rs/commit/4d259b42a66563ce92952b4e385c1f55aab90b1b))



<a name="1.75.2"></a>
## 1.75.2 (2025-05-28)


### Bug Fixes

* Fix typo in circleci/config.yml (#976)
* Include image deploy script in CI (#974)

<a name="1.75.0"></a>
## 1.75.0 (2025-05-23)


#### Features

* Clean up terminal states (#899) ([791e8751](https://github.com/mozilla-services/autopush-rs/commit/791e8751d9f3425a099addc90d074e8f68b260bb))
* Remove magic strings and add enum variants instead (#903) ([537513ab](https://github.com/mozilla-services/autopush-rs/commit/537513ab4f3b4789a3e2912a64b425a263def08f))
* Remove magic strings and add enum variants instead ([6b4e83ce](https://github.com/mozilla-services/autopush-rs/commit/6b4e83ce0c1145811467dff100cc2fbc28766de9))
* normalize durations and timestamps (#837) ([7018f25d](https://github.com/mozilla-services/autopush-rs/commit/7018f25d9bc21415c63176cd57968330d645b5c4))
* Switch to using async/pooled Redis (#896) ([bb492e7d](https://github.com/mozilla-services/autopush-rs/commit/bb492e7d72a5586dc874f006494240e758b551c5))
* remove stage key debugging code (#867) ([e4c153e7](https://github.com/mozilla-services/autopush-rs/commit/e4c153e7dece3e7cc938ca672bd9cab547bdd7e8))

#### Bug Fixes

* sentry middleware should use tags w/ metrics (#901) ([96cc7138](https://github.com/mozilla-services/autopush-rs/commit/96cc7138e354878dc0d7e7661dc28f945a8ec638))
* Post release fixes for Reliability (#893) ([a9a181e2](https://github.com/mozilla-services/autopush-rs/commit/a9a181e2cdab25f18d73b0e1b0f16d189df8447d))

#### Chore

* remove unused dependencies (#922) ([1c47ad6d](https://github.com/mozilla-services/autopush-rs/commit/1c47ad6db855ecd88380128ea7b6faba2bfbbb8d))
* Version update for March 2025 (#892) ([9380c22f](https://github.com/mozilla-services/autopush-rs/commit/9380c22f9cf166efbd47e854897ddaccd59aef86))
* tag 1.74.3 (#866) ([d22de1dc](https://github.com/mozilla-services/autopush-rs/commit/d22de1dc40c5822845fb4b5f0c4b8568dc946883))



<a name="1.74.3"></a>
## 1.74.3 (2025-03-21)


#### Chore

* tag 1.74.2 (#864) ([cc0d8ee4](https://github.com/mozilla-services/autopush-rs/commit/cc0d8ee4ee7072a32ccc03243d6e9ce60897aa50))

#### Bug Fixes

* Add diagnostic for stage (#865) ([470ea1c3](https://github.com/mozilla-services/autopush-rs/commit/470ea1c3556d7bb68a8d947d153a67969eba218e))



<a name="1.74.2"></a>
## 1.74.2 (2025-03-20)


#### Chore

* tag 1.74.1 (#862) ([d07ee886](https://github.com/mozilla-services/autopush-rs/commit/d07ee8866a04bda426508e59d1b3fa4657b1f3f1))

#### Bug Fixes

* compare tracking keys in raw form. (#863) ([49a466b8](https://github.com/mozilla-services/autopush-rs/commit/49a466b89aba59640654c84e884f6839364ad636))
* Autoendpoint should be less strict about config options (#859) ([8598172d](https://github.com/mozilla-services/autopush-rs/commit/8598172d84a7139f2038f9b95ec1bca0413b3c9f))



<a name="1.74.1"></a>
## 1.74.1 (2025-03-18)

1,773 changes: 980 additions & 793 deletions Cargo.lock

Large diffs are not rendered by default.

83 changes: 44 additions & 39 deletions Cargo.toml
@@ -1,86 +1,87 @@
[workspace]
members = [
"autopush-common",
"autoendpoint",
"autoconnect",
"autoconnect/autoconnect-common",
"autoconnect/autoconnect-settings",
"autoconnect/autoconnect-web",
"autoconnect/autoconnect-ws",
"autoconnect/autoconnect-ws/autoconnect-ws-sm",
"autopush-common",
"autoendpoint",
"autoconnect",
"autoconnect/autoconnect-common",
"autoconnect/autoconnect-settings",
"autoconnect/autoconnect-web",
"autoconnect/autoconnect-ws",
"autoconnect/autoconnect-ws/autoconnect-ws-sm",
]
resolver = "2"

[workspace.package]
version = "1.74.1"
version = "1.77.3"
authors = [
"Ben Bangert <ben@groovie.org>",
"JR Conlin <jrconlin@mozilla.com>",
"Phil Jenvey <pjenvey@underboss.org>",
"Alex Crichton <alex@alexcrichton.com>",
"Mark Drobnak <mdrobnak@mozilla.com>",
"Ben Bangert <ben@groovie.org>",
"JR Conlin <jrconlin@mozilla.com>",
"Phil Jenvey <pjenvey@underboss.org>",
"Alex Crichton <alex@alexcrichton.com>",
"Mark Drobnak <mdrobnak@mozilla.com>",
]
edition = "2021"
rust-version = "1.86.0"

[workspace.dependencies]
# ideally, this would contain any crates that are shared between crates.
# there are some lingering code interdependencies that prevent that, but it should
# remain a goal for the overall refactor.

deadpool = { version = "0.10", features = ["managed", "rt_tokio_1"] }
deadpool = { version = "0.12", features = ["managed", "rt_tokio_1"] }
actix = "0.13"
actix-cors = "0.7"
actix-http = "3.2"
actix-http = "3.11"
actix-rt = "2.7"
actix-test = "0.1"
actix-web = "4.2"
actix-web = "4.11"
actix-ws = "0.3"
backtrace = "0.3"
base64 = "0.22"
bytes = "1.4"
bytestring = "1.2"
cadence = "1.2"
chrono = "0.4"
config = "0.14"
ctor = "0.2"
config = "0.15"
ctor = "0.4"
docopt = "1.1"
env_logger = "0.11"
fernet = "0.2.0"
futures = { version = "0.3", features = ["compat"] }
futures-util = { version = "0.3", features = [
"async-await",
"compat",
"sink",
"io",
"async-await",
"compat",
"sink",
"io",
] }
futures-locks = "0.7"
hex = "0.4.2"
httparse = "1.3"
hyper = "1.2"
hyper = "1.6"
lazy_static = "1.4"
log = { version = "0.4", features = [
"max_level_debug",
"release_max_level_info",
"max_level_debug",
"release_max_level_info",
] }
mockall = "0.12"
mockall = "0.13"
mozsvc-common = "0.2"
openssl = { version = "0.10" }
rand = "0.8"
rand = "0.9"
regex = "1.4"
reqwest = { version = "0.12", features = ["json", "blocking"] }
sentry = { version = "0.32", features = [
"debug-logs",
sentry = { version = "0.38", features = [
"debug-logs",
] } # Using debug-logs avoids https://github.com/getsentry/sentry-rust/issues/237
sentry-core = { version = "0.32" }
sentry-actix = "0.32"
sentry-backtrace = "0.32"
sentry-core = { version = "0.38" }
sentry-actix = "0.38"
sentry-backtrace = "0.38"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
slog = { version = "2.7", features = [
"dynamic-keys",
"max_level_trace",
"release_max_level_info",
"dynamic-keys",
"max_level_trace",
"release_max_level_info",
] }
slog-async = "2.6"
slog-envlogger = "2.2.0"
@@ -89,13 +90,14 @@ slog-scope = "4.4"
slog-stdlog = "4.1"
slog-term = "2.6"
strum = { version = "0.27", features = ["derive"] }
thiserror = "1.0"
tokio = "1.38"
strum_macros = "0.27"
thiserror = "2.0"
tokio = "1.45"
tokio-compat-02 = "0.2"
tokio-core = "0.1"
tokio-io = "0.1"
tokio-openssl = "0.6"
uuid = { version = "1.1", features = ["serde", "v4"] }
uuid = { version = "1.17", features = ["serde", "v4"] }
url = "2.5"

autoconnect = { path = "./autoconnect" }
@@ -106,5 +108,8 @@ autoconnect_ws = { path = "./autoconnect/autoconnect-ws" }
autoconnect_ws_clientsm = { path = "./autoconnect/autoconnect-ws/autoconnect-ws-clientsm" }
autopush_common = { path = "./autopush-common", features = ["bigtable"] }

[workspace.lints.clippy]
result_large_err = "allow" # 1.87.0 will flag ApiError as too large.

[profile.release]
debug = 1
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,6 +1,6 @@
# NOTE: Ensure builder's Rust version matches CI's in .circleci/config.yml
# RUST_VER
FROM rust:1.85-bookworm AS builder
FROM rust:1.86-bookworm AS builder
ARG CRATE

ADD . /app
37 changes: 4 additions & 33 deletions autoconnect/Cargo.toml
@@ -7,51 +7,22 @@ edition.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
actix.workspace = true
actix-cors.workspace = true
actix-http.workspace = true
actix-rt.workspace = true
actix-test.workspace = true
actix-server = "2.3"
actix-service = "2.0"
actix-web.workspace = true
#actix-web-actors.workspace = true
actix-ws.workspace = true
bytestring.workspace = true
cadence.workspace = true
config.workspace = true
docopt = "1.1"
env_logger.workspace = true
fernet.workspace = true
futures.workspace = true
futures-locks.workspace = true
futures-util.workspace = true
hex.workspace = true
lazy_static.workspace = true
log.workspace = true
mozsvc-common.workspace = true
reqwest.workspace = true
sentry.workspace = true
sentry-actix.workspace = true
sentry-core.workspace = true
serde.workspace = true
serde_derive.workspace = true
serde_json.workspace = true
slog.workspace = true
slog-async.workspace = true
slog-mozlog-json.workspace = true
slog-scope.workspace = true
slog-stdlog.workspace = true
slog-term.workspace = true
uuid.workspace = true

autoconnect_ws.workspace = true
autoconnect_common.workspace = true
autoconnect_settings.workspace = true
autoconnect_web.workspace = true
autoconnect_ws.workspace = true
autopush_common.workspace = true

actix-server = "2.3"
actix-service = "2.0"
docopt = "1.1"

[features]
default = ["bigtable", "reliable_report"]
bigtable = ["autopush_common/bigtable", "autoconnect_settings/bigtable"]
5 changes: 3 additions & 2 deletions autoconnect/autoconnect-common/Cargo.toml
@@ -8,6 +8,7 @@ version.workspace = true

[dependencies]
actix-web.workspace = true
bytestring.workspace = true
cadence.workspace = true
futures.workspace = true
futures-locks.workspace = true
@@ -18,11 +19,11 @@ sentry.workspace = true
serde.workspace = true
serde_derive.workspace = true
serde_json.workspace = true
slog.workspace = true
slog-scope.workspace = true
strum.workspace = true
strum_macros.workspace = true
uuid.workspace = true


autopush_common.workspace = true

[features]
17 changes: 14 additions & 3 deletions autoconnect/autoconnect-common/src/broadcast.rs
@@ -13,6 +13,7 @@
use std::collections::HashMap;

use serde_derive::{Deserialize, Serialize};
use strum_macros::{AsRefStr, Display};

use autopush_common::errors::{ApcErrorKind, Result};

@@ -72,6 +73,12 @@ struct BroadcastRevision {
broadcast: BroadcastKey,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr, Display)]
pub enum BroadcastErrorKind {
#[strum(serialize = "Broadcast not found")]
NotFound,
}

/// A provided Broadcast/Version used for `BroadcastSubsInit`, client comparisons, and outgoing
/// deltas
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
@@ -85,7 +92,7 @@ impl Broadcast {
pub fn error(self) -> Broadcast {
Broadcast {
broadcast_id: self.broadcast_id,
version: "Broadcast not found".to_string(),
version: BroadcastErrorKind::NotFound.to_string(),
}
}
}
@@ -191,7 +198,9 @@ impl BroadcastChangeTracker {
let key = self
.broadcast_registry
.lookup_key(&broadcast.broadcast_id)
.ok_or(ApcErrorKind::BroadcastError("Broadcast not found".into()))?;
.ok_or(ApcErrorKind::BroadcastError(
BroadcastErrorKind::NotFound.to_string(),
))?;

if let Some(ver) = self.broadcast_versions.get_mut(&key) {
if *ver == broadcast.version {
@@ -200,7 +209,9 @@ impl BroadcastChangeTracker {
*ver = broadcast.version;
} else {
trace!("📢 Not found: {b_id}");
return Err(ApcErrorKind::BroadcastError("Broadcast not found".into()).into());
return Err(
ApcErrorKind::BroadcastError(BroadcastErrorKind::NotFound.to_string()).into(),
);
}

trace!("📢 New version of {b_id}");
10 changes: 7 additions & 3 deletions autoconnect/autoconnect-common/src/megaphone.rs
@@ -1,7 +1,9 @@
use std::{collections::HashMap, error::Error, io, sync::Arc, time::Duration};

use actix_web::rt;
use cadence::{CountedExt, StatsdClient};
use autopush_common::metric_name::MetricName;
use autopush_common::metrics::StatsdClientExt;
use cadence::StatsdClient;
use serde_derive::Deserialize;
use tokio::sync::RwLock;

@@ -39,7 +41,9 @@ pub async fn init_and_spawn_megaphone_updater(
if let Err(e) = updater(&broadcaster, &http, &url, &token).await {
report_updater_error(&metrics, e);
} else {
metrics.incr_with_tags("megaphone.updater.ok").send();
metrics
.incr_with_tags(MetricName::MegaphoneUpdaterOk)
.send();
}
}
});
@@ -59,7 +63,7 @@ fn report_updater_error(metrics: &Arc<StatsdClient>, err: reqwest::Error) {
"unknown"
};
metrics
.incr_with_tags("megaphone.updater.error")
.incr_with_tags(MetricName::MegaphoneUpdaterError)
.with_tag("reason", reason)
.send();
if reason == "unknown" {
67 changes: 64 additions & 3 deletions autoconnect/autoconnect-common/src/protocol.rs
@@ -10,10 +10,44 @@ use std::collections::HashMap;
use std::str::FromStr;

use serde_derive::{Deserialize, Serialize};
use strum_macros::{AsRefStr, Display, EnumString};
use uuid::Uuid;

use autopush_common::notification::Notification;

/// Message types for WebPush protocol messages.
///
/// This enum should be used instead of string literals when referring to message types.
/// String serialization is handled automatically via the strum traits.
///
/// Example:
/// ```
/// use autoconnect_common::protocol::MessageType;
///
/// let message_type = MessageType::Hello;
/// let message_str = message_type.as_ref(); // Returns "hello"
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, AsRefStr, Display, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum MessageType {
Hello,
Register,
Unregister,
BroadcastSubscribe,
Ack,
Nack,
Ping,
Notification,
Broadcast,
}

impl MessageType {
/// Returns the expected message type string for error messages
pub fn expected_msg(&self) -> String {
format!(r#"Expected messageType="{}""#, self.as_ref())
}
}

#[derive(Debug, Eq, PartialEq, Serialize)]
#[serde(untagged)]
pub enum BroadcastValue {
@@ -69,6 +103,21 @@ pub enum ClientMessage {
Ping,
}

impl ClientMessage {
/// Get the message type of this message
pub fn message_type(&self) -> MessageType {
match self {
ClientMessage::Hello { .. } => MessageType::Hello,
ClientMessage::Register { .. } => MessageType::Register,
ClientMessage::Unregister { .. } => MessageType::Unregister,
ClientMessage::BroadcastSubscribe { .. } => MessageType::BroadcastSubscribe,
ClientMessage::Ack { .. } => MessageType::Ack,
ClientMessage::Nack { .. } => MessageType::Nack,
ClientMessage::Ping => MessageType::Ping,
}
}
}

impl FromStr for ClientMessage {
type Err = serde_json::error::Error;

@@ -141,11 +190,23 @@ pub enum ServerMessage {
}

impl ServerMessage {
/// Get the message type of this message
pub fn message_type(&self) -> MessageType {
match self {
ServerMessage::Hello { .. } => MessageType::Hello,
ServerMessage::Register { .. } => MessageType::Register,
ServerMessage::Unregister { .. } => MessageType::Unregister,
ServerMessage::Broadcast { .. } => MessageType::Broadcast,
ServerMessage::Notification(..) => MessageType::Notification,
ServerMessage::Ping => MessageType::Ping,
}
}

pub fn to_json(&self) -> Result<String, serde_json::error::Error> {
match self {
// clients recognize {"messageType": "ping"} but traditionally both
// client/server send the empty object version
ServerMessage::Ping => Ok("{}".to_owned()),
// Both client and server understand the verbose `{"messageType": "ping"}` and the abbreviated `{}`
// as valid ping messages. The server defaults to the shorter `{}` form.
ServerMessage::Ping => Ok("{}".to_string()),
_ => serde_json::to_string(self),
}
}
4 changes: 4 additions & 0 deletions autoconnect/autoconnect-common/src/registry.rs
@@ -92,4 +92,8 @@ impl ClientRegistry {
}
Err(ApcErrorKind::GeneralError("User not connected".into()).into())
}

pub async fn count(&self) -> usize {
self.clients.read().await.len()
}
}
23 changes: 19 additions & 4 deletions autoconnect/autoconnect-common/src/test_support.rs
@@ -1,5 +1,7 @@
use bytestring::ByteString;
use uuid::Uuid;

use crate::protocol::MessageType;
use autopush_common::{
db::{mock::MockDbClient, User},
util::timing::ms_since_epoch,
@@ -13,10 +15,23 @@ pub const DUMMY_CHID: Uuid = Uuid::from_u128(0xdeadbeef_0000_0000_abad_1dea00000

/// A minimal websocket Push "hello" message, used by an unregistered UA with
/// no existing channel subscriptions
pub const HELLO: &str = r#"{"messageType": "hello", "use_webpush": true}"#;
/// A post initial registration response
pub const HELLO_AGAIN: &str = r#"{"messageType": "hello", "use_webpush": true,
"uaid": "deadbeef-0000-0000-deca-fbad00000000"}"#;
pub fn hello_json() -> ByteString {
format!(
r#"{{"messageType": "{}", "use_webpush": true}}"#,
MessageType::Hello.as_ref()
)
.into()
}

pub fn hello_again_json() -> ByteString {
format!(
r#"{{"messageType": "{}", "use_webpush": true,
"uaid": "{}"}}"#,
MessageType::Hello.as_ref(),
DUMMY_UAID
)
.into()
}

pub const CURRENT_MONTH: &str = "message_2018_06";

3 changes: 3 additions & 0 deletions autoconnect/autoconnect-settings/Cargo.toml
@@ -26,3 +26,6 @@ autopush_common.workspace = true
bigtable = ["autopush_common/bigtable"]
emulator = ["bigtable"]
reliable_report = ["autopush_common/reliable_report"]

[lints]
workspace = true
12 changes: 9 additions & 3 deletions autoconnect/autoconnect-settings/src/app_state.rs
@@ -92,14 +92,20 @@ impl AppState {

#[cfg(feature = "reliable_report")]
let reliability = Arc::new(
PushReliability::new(&settings.reliability_dsn, db.clone()).map_err(|e| {
ConfigError::Message(format!("Could not start Reliability connection: {:?}", e))
PushReliability::new(
&settings.reliability_dsn,
db.clone(),
&metrics,
settings.reliability_retry_count,
)
.map_err(|e| {
ConfigError::Message(format!("Could not start Reliability connection: {e:?}"))
})?,
);
let http = reqwest::Client::builder()
.timeout(Duration::from_secs(1))
.build()
.unwrap_or_else(|e| panic!("Error while building reqwest::Client: {}", e));
.unwrap_or_else(|e| panic!("Error while building reqwest::Client: {e}"));
let broadcaster = Arc::new(RwLock::new(BroadcastChangeTracker::new(Vec::new())));

let router_url = settings.router_url();
16 changes: 10 additions & 6 deletions autoconnect/autoconnect-settings/src/lib.rs
@@ -113,6 +113,9 @@ pub struct Settings {
/// storage system. See [Connection Parameters](https://docs.rs/redis/latest/redis/#connection-parameters)
/// for details.
pub reliability_dsn: Option<String>,
#[cfg(feature = "reliable_report")]
/// Max number of retries for Redis transactions
pub reliability_retry_count: usize,
}

impl Default for Settings {
@@ -146,6 +149,8 @@ impl Default for Settings {
actix_workers: None,
#[cfg(feature = "reliable_report")]
reliability_dsn: None,
#[cfg(feature = "reliable_report")]
reliability_retry_count: autopush_common::redis_util::MAX_TRANSACTION_LOOP,
}
}
}
@@ -198,7 +203,7 @@ impl Settings {
if let Some(ref hostname) = self.hostname {
if self.resolve_hostname {
resolve_ip(hostname)
.unwrap_or_else(|_| panic!("Failed to resolve provided hostname: {}", hostname))
.unwrap_or_else(|_| panic!("Failed to resolve provided hostname: {hostname}"))
} else {
hostname.clone()
}
@@ -213,8 +218,7 @@ impl Settings {
let non_zero = |val: Duration, name| {
if val.is_zero() {
return Err(ConfigError::Message(format!(
"Invalid {}_{}: cannot be 0",
ENV_PREFIX, name
"Invalid {ENV_PREFIX}_{name}: cannot be 0"
)));
}
Ok(())
@@ -303,9 +307,9 @@ mod tests {
fn test_default_settings() {
// Test that the Config works the way we expect it to.
use std::env;
let port = format!("{}__PORT", ENV_PREFIX).to_uppercase();
let msg_limit = format!("{}__MSG_LIMIT", ENV_PREFIX).to_uppercase();
let fernet = format!("{}__CRYPTO_KEY", ENV_PREFIX).to_uppercase();
let port = format!("{ENV_PREFIX}__PORT").to_uppercase();
let msg_limit = format!("{ENV_PREFIX}__MSG_LIMIT").to_uppercase();
let fernet = format!("{ENV_PREFIX}__CRYPTO_KEY").to_uppercase();

let v1 = env::var(&port);
let v2 = env::var(&msg_limit);
9 changes: 1 addition & 8 deletions autoconnect/autoconnect-web/Cargo.toml
@@ -5,22 +5,15 @@ edition.workspace = true
version.workspace = true

[dependencies]
actix-web.workspace = true
actix-http.workspace = true
actix-rt.workspace = true
actix-ws.workspace = true
backtrace.workspace = true
bytes.workspace = true
bytestring.workspace = true
cadence.workspace = true
actix-web.workspace = true
futures-util.workspace = true
reqwest.workspace = true
serde_json.workspace = true
slog-scope.workspace = true
thiserror.workspace = true
uuid.workspace = true


autoconnect_common.workspace = true
autoconnect_settings.workspace = true
autoconnect_ws.workspace = true
28 changes: 23 additions & 5 deletions autoconnect/autoconnect-web/src/dockerflow.rs
@@ -8,6 +8,8 @@ use actix_web::{
use serde_json::json;

use autoconnect_settings::AppState;
use autopush_common::metric_name::MetricName;
use autopush_common::metrics::StatsdClientExt;

use crate::error::ApiError;

@@ -27,19 +29,35 @@ pub fn config(config: &mut web::ServiceConfig) {

/// Handle the `/health` and `/__heartbeat__` routes
pub async fn health_route(state: Data<AppState>) -> Json<serde_json::Value> {
let healthy = state
#[allow(unused_mut)]
let mut health = json!({
"status": if state
.db
.health_check()
.await
.map_err(|e| {
error!("Autoconnect Health Error: {:?}", e);
e
})
.is_ok();
Json(json!({
"status": if healthy { "OK" } else { "ERROR" },
.is_ok() { "OK" } else {"ERROR"},
"version": env!("CARGO_PKG_VERSION"),
}))
"connections": state.clients.count().await
});

#[cfg(feature = "reliable_report")]
{
health["reliability"] = json!(state.reliability.health_check().await.unwrap_or_else(|e| {
state
.metrics
.incr_with_tags(MetricName::ErrorRedisUnavailable)
.with_tag("application", "autoconnect")
.send();
error!("🔍🟥 Reliability reporting down: {:?}", e);
"ERROR"
}));
}

Json(health)
}

/// Handle the `/status` route
4 changes: 3 additions & 1 deletion autoconnect/autoconnect-web/src/routes.rs
@@ -43,9 +43,11 @@ pub async fn push_route(
.await;
}
// Attempt to send the notification to the UA using WebSocket protocol, or store on failure.
// NOTE: Since this clones the notification, there is a potential to
// double count the reliability state.
let result = app_state
.clients
.notify(uaid.into_inner(), notif.clone())
.notify(uaid.into_inner(), notif.clone_without_reliability_state())
.await;
if result.is_ok() {
#[cfg(feature = "reliable_report")]
43 changes: 23 additions & 20 deletions autoconnect/autoconnect-web/src/test.rs
@@ -6,7 +6,10 @@ use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tokio::io::{AsyncRead, AsyncWrite};

use autoconnect_common::test_support::{hello_again_db, hello_db, DUMMY_UAID, HELLO, HELLO_AGAIN};
use autoconnect_common::protocol::MessageType;
use autoconnect_common::test_support::{
hello_again_db, hello_again_json, hello_db, hello_json, DUMMY_UAID,
};
use autoconnect_settings::{AppState, Settings};
use autopush_common::notification::Notification;

@@ -28,7 +31,7 @@ async fn json_msg(
) -> serde_json::Value {
let item = framed.next().await.unwrap().unwrap();
let ws::Frame::Text(bytes) = item else {
panic!("Expected Text not: {:#?}", item);
panic!("Expected Text not: {item:#?}");
};
serde_json::from_slice(&bytes).unwrap()
}
@@ -41,10 +44,10 @@ pub async fn hello_new_user() {
});

let mut framed = srv.ws().await.unwrap();
framed.send(ws::Message::Text(HELLO.into())).await.unwrap();
framed.send(ws::Message::Text(hello_json())).await.unwrap();

let msg = json_msg(&mut framed).await;
assert_eq!(msg["messageType"], "hello");
assert_eq!(msg["messageType"], MessageType::Hello.as_ref());
assert_eq!(msg["status"], 200);
// Ensure that the outbound response to the client includes the
// `use_webpush` flag set to `true`
@@ -63,12 +66,12 @@ pub async fn hello_again() {

let mut framed = srv.ws().await.unwrap();
framed
.send(ws::Message::Text(HELLO_AGAIN.into()))
.send(ws::Message::Text(hello_again_json()))
.await
.unwrap();

let msg = json_msg(&mut framed).await;
assert_eq!(msg["messageType"], "hello");
assert_eq!(msg["messageType"], MessageType::Hello.as_ref());
assert_eq!(msg["uaid"], DUMMY_UAID.as_simple().to_string());
}

@@ -78,13 +81,13 @@ pub async fn unsupported_websocket_message() {

let mut framed = srv.ws().await.unwrap();
framed
.send(ws::Message::Binary(HELLO.into()))
.send(ws::Message::Binary(hello_json().into_bytes()))
.await
.unwrap();

let item = framed.next().await.unwrap().unwrap();
let ws::Frame::Close(Some(close_reason)) = item else {
panic!("Expected Close(Some(..)) not {:#?}", item);
panic!("Expected Close(Some(..)) not {item:#?}");
};
assert_eq!(close_reason.code, actix_http::ws::CloseCode::Unsupported);
assert!(framed.next().await.is_none());
@@ -98,16 +101,16 @@ pub async fn invalid_webpush_message() {
});

let mut framed = srv.ws().await.unwrap();
framed.send(ws::Message::Text(HELLO.into())).await.unwrap();
framed.send(ws::Message::Text(hello_json())).await.unwrap();

let msg = json_msg(&mut framed).await;
assert_eq!(msg["status"], 200);

framed.send(ws::Message::Text(HELLO.into())).await.unwrap();
framed.send(ws::Message::Text(hello_json())).await.unwrap();

let item = framed.next().await.unwrap().unwrap();
let ws::Frame::Close(Some(close_reason)) = item else {
panic!("Expected Close(Some(..)) not {:#?}", item);
panic!("Expected Close(Some(..)) not {item:#?}");
};
assert_eq!(close_reason.code, actix_http::ws::CloseCode::Error);
assert!(framed.next().await.is_none());
@@ -123,14 +126,14 @@ pub async fn malformed_webpush_message() {
let mut framed = srv.ws().await.unwrap();
framed
.send(ws::Message::Text(
json!({"messageType": "foo"}).to_string().into(),
json!({"messageType": "foo"}).to_string().into(), // Intentionally using invalid message type for test
))
.await
.unwrap();

let item = framed.next().await.unwrap().unwrap();
let ws::Frame::Close(Some(close_reason)) = item else {
panic!("Expected Close(Some(..)) not {:#?}", item);
panic!("Expected Close(Some(..)) not {item:#?}");
};
assert_eq!(close_reason.code, actix_http::ws::CloseCode::Error);
assert_eq!(close_reason.description.unwrap(), "Json");
@@ -147,12 +150,12 @@ pub async fn direct_notif() {

let mut framed = srv.ws().await.unwrap();
framed
.send(ws::Message::Text(HELLO_AGAIN.into()))
.send(ws::Message::Text(hello_again_json()))
.await
.unwrap();

let msg = json_msg(&mut framed).await;
assert_eq!(msg["messageType"], "hello");
assert_eq!(msg["messageType"], MessageType::Hello.as_ref());

app_state
.clients
@@ -168,7 +171,7 @@ pub async fn direct_notif() {

// Is a small sleep/tick needed here?
let msg = json_msg(&mut framed).await;
assert_eq!(msg["messageType"], "notification");
assert_eq!(msg["messageType"], MessageType::Notification.as_ref());
assert_eq!(msg["data"], "foo");
}

@@ -190,7 +193,7 @@ pub async fn broadcast_after_ping() {
.add_broadcast(("foo/bar".to_owned(), "v1".to_owned()).into());
let mut srv = test_server(app_state.clone());

let hello = json!({"messageType": "hello", "use_webpush": true,
let hello = json!({"messageType": MessageType::Hello.as_ref(), "use_webpush": true,
"broadcasts": {"foo/bar": "v1"}});
let mut framed = srv.ws().await.unwrap();
framed
@@ -199,7 +202,7 @@ pub async fn broadcast_after_ping() {
.unwrap();

let msg = json_msg(&mut framed).await;
assert_eq!(msg["messageType"], "hello");
assert_eq!(msg["messageType"], MessageType::Hello.as_ref());
let broadcasts = msg["broadcasts"]
.as_object()
.expect("!broadcasts.is_object()");
@@ -209,7 +212,7 @@ pub async fn broadcast_after_ping() {
tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
let item = framed.next().await.unwrap().unwrap();
let ws::Frame::Ping(payload) = item else {
panic!("Expected Ping not: {:#?}", item);
panic!("Expected Ping not: {item:#?}");
};

broadcaster
@@ -223,7 +226,7 @@ pub async fn broadcast_after_ping() {
tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
let msg = json_msg(&mut framed).await;
assert_eq!(msg.as_object().map_or(0, |o| o.len()), 2);
assert_eq!(msg["messageType"], "broadcast");
assert_eq!(msg["messageType"], MessageType::Broadcast.as_ref());
let broadcasts = msg["broadcasts"]
.as_object()
.expect("!broadcasts.is_object()");
3 changes: 3 additions & 0 deletions autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml
@@ -33,3 +33,6 @@ autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
reliable_report = []

[lints]
workspace = true
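Note: `workspace = true` defers lint configuration to the workspace root. A minimal sketch of the corresponding root-level table (assumed for illustration; the actual lint set is not part of this diff):

    [workspace.lints.rust]
    unused_qualifications = "warn"

    [workspace.lints.clippy]
    unwrap_used = "warn"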
36 changes: 36 additions & 0 deletions autoconnect/autoconnect-ws/autoconnect-ws-sm/src/error.rs
@@ -3,8 +3,27 @@ use std::{error::Error, fmt};
use actix_ws::CloseCode;
use backtrace::Backtrace;

use autoconnect_common::protocol::{ClientMessage, MessageType, ServerMessage};
use autopush_common::{db::error::DbError, errors::ApcError, errors::ReportableError};

/// Trait for types that can provide a MessageType
pub trait MessageTypeProvider {
/// Returns the message type of this object
fn message_type(&self) -> MessageType;
}

impl MessageTypeProvider for ClientMessage {
fn message_type(&self) -> MessageType {
self.message_type()
}
}

impl MessageTypeProvider for ServerMessage {
fn message_type(&self) -> MessageType {
self.message_type()
}
}

/// WebSocket state machine errors
#[derive(Debug)]
pub struct SMError {
@@ -48,6 +67,23 @@ impl SMError {
pub fn invalid_message(description: String) -> Self {
SMErrorKind::InvalidMessage(description).into()
}

/// Creates an invalid message error for an expected message type
pub fn expected_message_type(expected: MessageType) -> Self {
SMErrorKind::InvalidMessage(expected.expected_msg()).into()
}

/// Validates a message is of the expected type, returning an error if not
pub fn validate_message_type<T>(expected: MessageType, msg: &T) -> Result<(), Self>
where
T: MessageTypeProvider,
{
if msg.message_type() == expected {
Ok(())
} else {
Err(Self::expected_message_type(expected))
}
}
}

impl ReportableError for SMError {
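Note: together, `MessageTypeProvider` and `validate_message_type` let a call site assert the expected type for either message direction. A minimal sketch of a call site, assuming only the items defined above (the helper name is hypothetical):

    /// Reject anything that is not a Hello, for client or server messages alike.
    fn require_hello<T: MessageTypeProvider>(msg: &T) -> Result<(), SMError> {
        SMError::validate_message_type(MessageType::Hello, msg)
    }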
@@ -1,13 +1,15 @@
use std::collections::HashMap;

use cadence::CountedExt;
use uuid::Uuid;

use autoconnect_common::{
broadcast::Broadcast,
protocol::{BroadcastValue, ClientAck, ClientMessage, ServerMessage},
protocol::{BroadcastValue, ClientAck, ClientMessage, MessageType, ServerMessage},
};
use autopush_common::{
endpoint::make_endpoint, metric_name::MetricName, metrics::StatsdClientExt,
util::sec_since_epoch,
};
use autopush_common::{endpoint::make_endpoint, util::sec_since_epoch};

use super::WebPushClient;
use crate::error::{SMError, SMErrorKind};
@@ -52,6 +54,7 @@ impl WebPushClient {
"uaid" => &self.uaid.to_string(),
"channel_id" => &channel_id_str,
"key" => &key,
"message_type" => MessageType::Register.as_ref(),
);
let channel_id = Uuid::try_parse(&channel_id_str).map_err(|_| {
SMError::invalid_message(format!("Invalid channelID: {channel_id_str}"))
@@ -64,7 +67,10 @@ impl WebPushClient {

let (status, push_endpoint) = match self.do_register(&channel_id, key).await {
Ok(endpoint) => {
let _ = self.app_state.metrics.incr("ua.command.register");
let _ = self
.app_state
.metrics
.incr(MetricName::UaCommand(MessageType::Register.to_string()));
self.stats.registers += 1;
(200, endpoint)
}
@@ -123,6 +129,7 @@ impl WebPushClient {
"uaid" => &self.uaid.to_string(),
"channel_id" => &channel_id.to_string(),
"code" => &code,
"message_type" => MessageType::Unregister.as_ref(),
);
// TODO: (copied from previous state machine) unregister should check
// the format of channel_id like register does
@@ -136,7 +143,7 @@ impl WebPushClient {
Ok(_) => {
self.app_state
.metrics
.incr_with_tags("ua.command.unregister")
.incr_with_tags(MetricName::UaCommand(MessageType::Unregister.to_string()))
.with_tag("code", &code.unwrap_or(200).to_string())
.send();
self.stats.unregisters += 1;
@@ -155,7 +162,7 @@ impl WebPushClient {
&mut self,
broadcasts: HashMap<String, String>,
) -> Result<Option<ServerMessage>, SMError> {
trace!("WebPushClient:broadcast_subscribe");
trace!("WebPushClient:broadcast_subscribe"; "message_type" => MessageType::BroadcastSubscribe.as_ref());
let broadcasts = Broadcast::from_hashmap(broadcasts);
let mut response: HashMap<String, BroadcastValue> = HashMap::new();

@@ -178,8 +185,11 @@ impl WebPushClient {

/// Acknowledge receipt of one or more Push Notifications
async fn ack(&mut self, updates: &[ClientAck]) -> Result<Vec<ServerMessage>, SMError> {
trace!("✅ WebPushClient:ack");
let _ = self.app_state.metrics.incr("ua.command.ack");
trace!("✅ WebPushClient:ack"; "message_type" => MessageType::Ack.as_ref());
let _ = self
.app_state
.metrics
.incr(MetricName::UaCommand(MessageType::Ack.to_string()));

for notif in updates {
// Check the list of unacked "direct" (unstored) notifications. We only want to
@@ -210,13 +220,14 @@ impl WebPushClient {
if let Some(pos) = pos {
debug!(
"✅ Ack (Stored)";
"channel_id" => notif.channel_id.as_hyphenated().to_string(),
"version" => &notif.version
"channel_id" => notif.channel_id.as_hyphenated().to_string(),
"version" => &notif.version,
"message_type" => MessageType::Ack.as_ref()
);
// Get the stored notification record.
let n = &mut self.ack_state.unacked_stored_notifs[pos];
let is_topic = n.topic.is_some();
debug!("✅ Ack notif: {:?}", &n);
let acked_notification = &mut self.ack_state.unacked_stored_notifs[pos];
let is_topic = acked_notification.topic.is_some();
debug!("✅ Ack notif: {:?}", &acked_notification);
// Only force delete Topic messages, since they don't have a timestamp.
// Other messages persist in the database, to be eventually cleaned up by their
// TTL. We will need to update the `CurrentTimestamp` field for the channel
@@ -225,17 +236,18 @@ impl WebPushClient {
if is_topic {
debug!(
"✅ WebPushClient:ack removing Stored, sort_key: {}",
&n.chidmessageid()
&acked_notification.chidmessageid()
);
self.app_state
.db
.remove_message(&self.uaid, &n.chidmessageid())
.remove_message(&self.uaid, &acked_notification.chidmessageid())
.await?;
// NOTE: timestamp messages may still be in a state of flux: they're not fully
// ack'd (removed/unable to be resurrected) until increment_storage is called,
// so their reliability is recorded there
#[cfg(feature = "reliable_report")]
n.record_reliability(&self.app_state.reliability, notif.reliability_state())
acked_notification
.record_reliability(&self.app_state.reliability, notif.reliability_state())
.await;
}
let n = self.ack_state.unacked_stored_notifs.remove(pos);
@@ -260,14 +272,14 @@ impl WebPushClient {
/// Negative Acknowledgement (a Client error occurred) of one or more Push
/// Notifications
fn nack(&mut self, code: Option<i32>) {
trace!("WebPushClient:nack");
trace!("WebPushClient:nack"; "message_type" => MessageType::Nack.as_ref());
// only metric codes expected from the client (or 0)
let code = code
.and_then(|code| (301..=303).contains(&code).then_some(code))
.unwrap_or(0);
self.app_state
.metrics
.incr_with_tags("ua.command.nack")
.incr_with_tags(MetricName::UaCommand(MessageType::Nack.to_string()))
.with_tag("code", &code.to_string())
.send();
self.stats.nacks += 1;
@@ -278,7 +290,7 @@ impl WebPushClient {
/// Note this is the WebPush Protocol level's Ping: this differs from the
/// lower level WebSocket Ping frame (handled by the `webpush_ws` handler).
fn ping(&mut self) -> Result<ServerMessage, SMError> {
trace!("WebPushClient:ping");
trace!("WebPushClient:ping"; "message_type" => MessageType::Ping.as_ref());
// TODO: why is this 45 vs the minute described in the comment below? Also,
// 45 should be a setting
// Clients shouldn't ping more than once per minute or we disconnect them
@@ -299,17 +311,18 @@ impl WebPushClient {
/// method) before proceeding to read the next batch (or potential other
/// actions such as `reset_uaid`).
async fn post_process_all_acked(&mut self) -> Result<Vec<ServerMessage>, SMError> {
trace!("▶️ WebPushClient:post_process_all_acked");
trace!("▶️ WebPushClient:post_process_all_acked"; "message_type" => MessageType::Notification.as_ref());
let flags = &self.flags;
if flags.check_storage {
if flags.increment_storage {
debug!(
"▶️ WebPushClient:post_process_all_acked check_storage && increment_storage"
"▶️ WebPushClient:post_process_all_acked check_storage && increment_storage";
"message_type" => MessageType::Notification.as_ref()
);
self.increment_storage().await?;
}

debug!("▶️ WebPushClient:post_process_all_acked check_storage");
debug!("▶️ WebPushClient:post_process_all_acked check_storage"; "message_type" => MessageType::Notification.as_ref());
let smsgs = self.check_storage_loop().await?;
if !smsgs.is_empty() {
debug_assert!(self.flags.check_storage);
@@ -327,10 +340,10 @@ impl WebPushClient {
debug_assert!(!self.ack_state.unacked_notifs());
let flags = &self.flags;
if flags.old_record_version {
debug!("▶️ WebPushClient:post_process_all_acked; resetting uaid");
debug!("▶️ WebPushClient:post_process_all_acked; resetting uaid"; "message_type" => MessageType::Notification.as_ref());
self.app_state
.metrics
.incr_with_tags("ua.expiration")
.incr_with_tags(MetricName::UaExpiration)
.with_tag("reason", "old_record_version")
.send();
self.app_state.db.remove_user(&self.uaid).await?;
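Note: throughout this file the string metrics (e.g. "ua.command.register") are replaced by `MetricName::UaCommand(...)`. A hedged sketch of the assumed equivalence (a `Display` impl on `MetricName` is an assumption, not shown in this diff):

    // Assumed: the enum-based metric renders to the same dotted name
    // that the string literal produced before this change.
    let name = MetricName::UaCommand(MessageType::Register.to_string());
    debug_assert_eq!(name.to_string(), "ua.command.register");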
@@ -1,10 +1,11 @@
use std::mem;

use cadence::{Counted, CountedExt};
use cadence::Counted;

use autoconnect_common::protocol::{ServerMessage, ServerNotification};
use autopush_common::{
db::CheckStorageResponse, notification::Notification, util::sec_since_epoch,
db::CheckStorageResponse, metric_name::MetricName, metrics::StatsdClientExt,
notification::Notification, util::sec_since_epoch,
};

use super::WebPushClient;
@@ -14,7 +15,7 @@ impl WebPushClient {
/// Handle a `ServerNotification` for this user
///
/// `ServerNotification::Disconnect` is emitted by the same autoconnect
/// node recieving it when a User has logged into that same node twice to
/// node receiving it when a User has logged into that same node twice to
/// "Ghost" (disconnect) the first user's session for its second session.
///
/// Other variants are emitted by autoendpoint
@@ -42,11 +43,16 @@ impl WebPushClient {
/// Send a Direct Push Notification to this user
fn notif(&mut self, notif: Notification) -> Result<ServerMessage, SMError> {
trace!("WebPushClient::notif Sending a direct notif");
// The notification we return here is sent directly to the client.
// No reliability state is recorded.
let response = notif.clone();
if notif.ttl != 0 {
self.ack_state.unacked_direct_notifs.push(notif.clone());
// Consume the original notification by adding it to the
// unacked stack. This will eventually record the state.
self.ack_state.unacked_direct_notifs.push(notif);
}
self.emit_send_metrics(&notif, "Direct");
Ok(ServerMessage::Notification(notif))
self.emit_send_metrics(&response, "Direct");
Ok(ServerMessage::Notification(response))
}

/// Top level read of Push Notifications from storage
@@ -351,7 +357,7 @@ impl WebPushClient {
// trigger a re-register
self.app_state
.metrics
.incr_with_tags("ua.expiration")
.incr_with_tags(MetricName::UaExpiration)
.with_tag("reason", "too_many_messages")
.send();
self.app_state.db.remove_user(&self.uaid).await?;
@@ -365,7 +371,7 @@ impl WebPushClient {
let metrics = &self.app_state.metrics;
let ua_info = &self.ua_info;
metrics
.incr_with_tags("ua.notification.sent")
.incr_with_tags(MetricName::UaNotificationSent)
.with_tag("source", source)
.with_tag("topic", &notif.topic.is_some().to_string())
.with_tag("os", &ua_info.metrics_os)
65 changes: 39 additions & 26 deletions autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs
@@ -1,15 +1,17 @@
use std::{collections::HashMap, fmt, sync::Arc};

use cadence::{CountedExt, Histogrammed};
use cadence::Histogrammed;
use uuid::Uuid;

use autoconnect_common::{
broadcast::{Broadcast, BroadcastSubs, BroadcastSubsInit},
protocol::{BroadcastValue, ClientMessage, ServerMessage},
protocol::{BroadcastValue, ClientMessage, MessageType, ServerMessage},
};
use autoconnect_settings::{AppState, Settings};
use autopush_common::{
db::{User, USER_RECORD_VERSION},
metric_name::MetricName,
metrics::StatsdClientExt,
util::{ms_since_epoch, ms_utc_midnight},
};

@@ -53,18 +55,22 @@ impl UnidentifiedClient {
msg: ClientMessage,
) -> Result<(WebPushClient, impl IntoIterator<Item = ServerMessage>), SMError> {
trace!("❓UnidentifiedClient::on_client_msg");
// Validate we received a Hello message before proceeding
SMError::validate_message_type(MessageType::Hello, &msg)?;

// Extract fields from the Hello message
let ClientMessage::Hello {
uaid,
broadcasts,
_channel_ids,
} = msg
else {
return Err(SMError::invalid_message(
r#"Expected messageType="hello""#.to_owned(),
));
// This should never happen due to the validate_message_type check above
return Err(SMError::expected_message_type(MessageType::Hello));
};
debug!(
"👋UnidentifiedClient::on_client_msg Hello from uaid?: {:?}",
"👋UnidentifiedClient::on_client_msg {} from uaid?: {:?}",
MessageType::Hello.as_ref(),
uaid
);

@@ -78,12 +84,14 @@ impl UnidentifiedClient {
} = self.get_or_create_user(original_uaid).await?;
let uaid = user.uaid;
debug!(
"💬UnidentifiedClient::on_client_msg Hello! uaid: {} existing_user: {}",
uaid, existing_user,
"💬UnidentifiedClient::on_client_msg {}! uaid: {} existing_user: {}",
MessageType::Hello.as_ref(),
uaid,
existing_user,
);
self.app_state
.metrics
.incr_with_tags("ua.command.hello")
.incr_with_tags(MetricName::UaCommand(MessageType::Hello.to_string()))
.with_tag("uaid", {
if existing_user {
"existing"
@@ -134,7 +142,10 @@ impl UnidentifiedClient {

/// Lookup a User or return a new User record if the lookup failed
async fn get_or_create_user(&self, uaid: Option<Uuid>) -> Result<GetOrCreateUser, SMError> {
trace!("❓UnidentifiedClient::get_or_create_user");
trace!(
"❓UnidentifiedClient::get_or_create_user for {}",
MessageType::Hello.as_ref()
);
let connected_at = ms_since_epoch();

if let Some(uaid) = uaid {
@@ -149,12 +160,12 @@ impl UnidentifiedClient {
};
user.node_id = Some(self.app_state.router_url.to_owned());
if user.connected_at > connected_at {
let _ = self.app_state.metrics.incr("ua.already_connected");
let _ = self.app_state.metrics.incr(MetricName::UaAlreadyConnected);
return Err(SMErrorKind::AlreadyConnected.into());
}
user.connected_at = connected_at;
if !self.app_state.db.update_user(&mut user).await? {
let _ = self.app_state.metrics.incr("ua.already_connected");
let _ = self.app_state.metrics.incr(MetricName::UaAlreadyConnected);
return Err(SMErrorKind::AlreadyConnected.into());
}
return Ok(GetOrCreateUser {
@@ -186,7 +197,10 @@ impl UnidentifiedClient {
&self,
broadcasts: &[Broadcast],
) -> (BroadcastSubs, HashMap<String, BroadcastValue>) {
trace!("UnidentifiedClient::broadcast_init");
trace!(
"UnidentifiedClient::broadcast_init for {}",
MessageType::Hello.as_ref()
);
let bc = self.app_state.broadcaster.read().await;
let BroadcastSubsInit(broadcast_subs, delta) = bc.broadcast_delta(broadcasts);
let mut response = Broadcast::vec_into_hashmap(delta);
@@ -210,11 +224,12 @@ struct GetOrCreateUser {

#[cfg(test)]
mod tests {
use std::{str::FromStr, sync::Arc};
use std::str::FromStr;
use std::sync::Arc;

use autoconnect_common::{
protocol::ClientMessage,
test_support::{hello_again_db, hello_db, DUMMY_CHID, DUMMY_UAID, UA},
protocol::{ClientMessage, MessageType},
test_support::{hello_again_db, hello_again_json, hello_db, DUMMY_CHID, DUMMY_UAID, UA},
};
use autoconnect_settings::AppState;

@@ -233,14 +248,18 @@ mod tests {

#[tokio::test]
async fn reject_not_hello() {
// Test with Ping message
let client = uclient(Default::default());
let err = client
.on_client_msg(ClientMessage::Ping)
.await
.err()
.unwrap();
assert!(matches!(err.kind, SMErrorKind::InvalidMessage(_)));
// Verify error message contains expected message type
assert!(format!("{err}").contains(MessageType::Hello.as_ref()));

// Test with Register message
let client = uclient(Default::default());
let err = client
.on_client_msg(ClientMessage::Register {
@@ -251,6 +270,8 @@ mod tests {
.err()
.unwrap();
assert!(matches!(err.kind, SMErrorKind::InvalidMessage(_)));
// Verify error message contains expected message type
assert!(format!("{err}").contains(MessageType::Hello.as_ref()));
}

#[tokio::test]
@@ -259,16 +280,8 @@ mod tests {
db: hello_again_db(DUMMY_UAID).into_boxed_arc(),
..Default::default()
});
// Use a constructed JSON structure here to capture the sort of input we expect,
// which may not match what we derive into.
let js = serde_json::json!({
"messageType": "hello",
"uaid": DUMMY_UAID,
"use_webpush": true,
"channelIDs": [],
"broadcasts": {}
})
.to_string();
// Use hello_again_json helper which properly uses MessageType enum
let js = hello_again_json();
let msg: ClientMessage = serde_json::from_str(&js).unwrap();
client.on_client_msg(msg).await.expect("Hello failed");
}
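Note: `hello_again_json()` replaces the hand-built JSON above. A hypothetical reconstruction of what the test-support helper produces, mirroring the removed literal (the real definition lives in autoconnect_common::test_support):

    fn hello_again_json() -> String {
        serde_json::json!({
            "messageType": MessageType::Hello.as_ref(),
            "uaid": DUMMY_UAID,
            "use_webpush": true,
            "channelIDs": [],
            "broadcasts": {}
        })
        .to_string()
    }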
13 changes: 6 additions & 7 deletions autoconnect/autoconnect-ws/src/test.rs
@@ -5,7 +5,7 @@ use futures::pin_mut;

use autoconnect_common::{
protocol::ServerMessage,
test_support::{hello_db, HELLO, UA},
test_support::{hello_db, hello_json, UA},
};
use autoconnect_settings::{AppState, Settings};
use autoconnect_ws_sm::UnidentifiedClient;
@@ -28,10 +28,9 @@ async fn handshake_timeout() {
..Settings::test_settings()
};
let client = uclient(AppState::from_settings(settings).unwrap());

let s = stream! {
tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
yield Ok(actix_ws::Message::Text(HELLO.into()));
yield Ok(actix_ws::Message::Text(hello_json()));
};
pin_mut!(s);
let err = webpush_ws(client, &mut MockSession::new(), s)
@@ -55,7 +54,7 @@ async fn basic() {
session.expect_ping().never();

let s = futures::stream::iter(vec![
Ok(actix_ws::Message::Text(HELLO.into())),
Ok(actix_ws::Message::Text(hello_json())),
Ok(actix_ws::Message::Nop),
]);
webpush_ws(client, &mut session, s)
@@ -78,7 +77,7 @@ async fn websocket_ping() {
session.expect_ping().times(1).return_once(|_| Ok(()));

let s = stream! {
yield Ok(actix_ws::Message::Text(HELLO.into()));
yield Ok(actix_ws::Message::Text(hello_json()));
tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
};
pin_mut!(s);
@@ -103,7 +102,7 @@ async fn auto_ping_timeout() {
session.expect_ping().times(1).return_once(|_| Ok(()));

let s = stream! {
yield Ok(actix_ws::Message::Text(HELLO.into()));
yield Ok(actix_ws::Message::Text(hello_json()));
tokio::time::sleep(Duration::from_secs_f32(0.35)).await;
};
pin_mut!(s);
@@ -127,7 +126,7 @@ async fn auto_ping_timeout_after_pong() {
session.expect_ping().times(2).returning(|_| Ok(()));

let s = stream! {
yield Ok(actix_ws::Message::Text(HELLO.into()));
yield Ok(actix_ws::Message::Text(hello_json()));
tokio::time::sleep(Duration::from_secs_f32(0.2)).await;
yield Ok(actix_ws::Message::Pong("".into()));
tokio::time::sleep(Duration::from_secs_f32(0.35)).await;
40 changes: 13 additions & 27 deletions autoendpoint/Cargo.toml
@@ -5,57 +5,40 @@ authors.workspace = true
edition.workspace = true

[dependencies]
a2 = "0.10"
actix-cors.workspace = true
actix-http.workspace = true
actix-web.workspace = true
actix-rt.workspace = true
actix-cors.workspace = true
actix-web.workspace = true
async-trait = "0.1"
backtrace.workspace = true
base64.workspace = true
cadence.workspace = true
config.workspace = true
chrono.workspace = true
config.workspace = true
docopt.workspace = true
fernet.workspace = true
futures.workspace = true
futures-util.workspace = true
hex.workspace = true
jsonwebtoken = "9.3.0"
lazy_static.workspace = true
log.workspace = true
openssl.workspace = true
rand.workspace = true
regex.workspace = true
reqwest.workspace = true
sentry.workspace = true
sentry-actix.workspace = true
sentry-core.workspace = true
serde.workspace = true
serde_derive.workspace = true
serde_json.workspace = true
slog.workspace = true
slog-async.workspace = true
slog-mozlog-json.workspace = true
slog-scope.workspace = true
slog-stdlog.workspace = true
slog-term.workspace = true
thiserror.workspace = true
tokio.workspace = true
url.workspace = true
uuid.workspace = true
validator = "0.20"
validator_derive = "0.20"
yup-oauth2 = "8.1"

a2 = { version = "0.10" }
bytebuffer = "2.1"
again = { version = "0.1.2", default-features = false, features = [
"log",
"rand",
] }
async-trait = "0.1"
autopush_common = { path = "../autopush-common" }
jsonwebtoken = "9.3.0"
validator = "0.19"
validator_derive = "0.19"

yup-oauth2 = "8.1"
# Updating to 9+ requires configuring rust-tls (See https://github.com/dermesser/yup-oauth2/issues/235)
# Updating to 9+ requires configuring rust-tls (See https://github.com/dermesser/yup-oauth2/issues/235) <- this ticket is resolved, update
# yup-oauth2 = { version = "10.0.1", features = ["hyper-rustls"] }

# For mockito test debugging
@@ -85,3 +68,6 @@ stub = []
log_vapid = []

reliable_report = ["autopush_common/reliable_report"]

[lints]
workspace = true
7 changes: 5 additions & 2 deletions autoendpoint/src/extractors/notification.rs
@@ -30,7 +30,6 @@ pub struct Notification {
/// The current state the message was in (if tracked)
pub reliable_state: Option<autopush_common::reliability::ReliabilityState>,
#[cfg(feature = "reliable_report")]
#[cfg(feature = "reliable_report")]
pub reliability_id: Option<String>,
}

@@ -234,6 +233,10 @@ impl Notification {
&self.reliable_state,
Some(self.timestamp + self.headers.ttl as u64),
)
.await;
.await
.inspect_err(|e| {
warn!("🔍⚠️ Unable to record reliability state log: {:?}", e);
})
.unwrap_or(Some(state))
}
}
12 changes: 9 additions & 3 deletions autoendpoint/src/extractors/notification_headers.rs
@@ -70,7 +70,7 @@ impl NotificationHeaders {
// Enforce a maximum TTL, but don't error
// NOTE: In order to trap for negative TTLs, this should be a
// signed value, otherwise we will error out with NO_TTL.
.map(|ttl| min(ttl, MAX_NOTIFICATION_TTL as i64))
.map(|ttl| min(ttl, MAX_NOTIFICATION_TTL.num_seconds()))
.ok_or(ApiErrorKind::NoTTL)?;
let topic = get_owned_header(req, "topic");

@@ -218,6 +218,7 @@ mod tests {
use crate::error::{ApiErrorKind, ApiResult};
use actix_web::test::TestRequest;
use autopush_common::MAX_NOTIFICATION_TTL;
use chrono::TimeDelta;

/// Assert that a result is a validation error and check its serialization
/// against the JSON value.
@@ -283,12 +284,17 @@ mod tests {
#[test]
fn maximum_ttl() {
let req = TestRequest::post()
.insert_header(("TTL", (MAX_NOTIFICATION_TTL + 1).to_string()))
.insert_header((
"TTL",
(MAX_NOTIFICATION_TTL + TimeDelta::seconds(1))
.num_seconds()
.to_string(),
))
.to_http_request();
let result = NotificationHeaders::from_request(&req, false);

assert!(result.is_ok());
assert_eq!(result.unwrap().ttl, MAX_NOTIFICATION_TTL as i64);
assert_eq!(result.unwrap().ttl, MAX_NOTIFICATION_TTL.num_seconds());
}
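Note: a standalone illustration of the clamp this test exercises, assuming MAX_NOTIFICATION_TTL is a chrono::TimeDelta (the 30-day value is a stand-in, not the real constant):

    use chrono::TimeDelta;

    let max_ttl = TimeDelta::days(30); // stand-in for MAX_NOTIFICATION_TTL
    let requested: i64 = max_ttl.num_seconds() + 1;
    let stored = std::cmp::min(requested, max_ttl.num_seconds());
    assert_eq!(stored, max_ttl.num_seconds());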

/// A valid topic results in no errors
33 changes: 19 additions & 14 deletions autoendpoint/src/extractors/subscription.rs
@@ -4,11 +4,13 @@ use std::error::Error;
use actix_web::{dev::Payload, web::Data, FromRequest, HttpRequest};
use autopush_common::{
db::User,
metric_name::MetricName,
metrics::StatsdClientExt,
tags::Tags,
util::{b64_decode_std, b64_decode_url},
};

use cadence::{CountedExt, StatsdClient};
use cadence::StatsdClient;
use futures::{future::LocalBoxFuture, FutureExt};
use jsonwebtoken::{Algorithm, DecodingKey, Validation};
use openssl::hash::MessageDigest;
@@ -62,23 +64,26 @@ impl FromRequest for Subscription {
.map_err(|e| {
// Since we're decrypting an endpoint, we get a lot of spam links.
// This can fill our logs.
trace!("🔐 fernet: {:?}", e);
trace!("🔐 fernet: {:?}", e.to_string());
ApiErrorKind::InvalidToken
})?;

// Parse VAPID and extract public key.
let vapid: Option<VapidHeaderWithKey> = parse_vapid(&token_info, &app_state.metrics)?
.map(|vapid| extract_public_key(vapid, &token_info))
.transpose()?;
trace!("raw vapid: {:?}", &vapid);
trace!("🔍 raw vapid: {:?}", &vapid);
// Validate the VAPID JWT token, fetch the claims, and record the version
if let Some(with_key) = &vapid {
// Validate the VAPID JWT token and record the version
validate_vapid_jwt(with_key, &app_state.settings, &app_state.metrics)?;
app_state.metrics.incr(&format!(
"updates.vapid.draft{:02}",
// Use the UpdatesVapidDraft metric with a formatted version
let vapid_version_metric = format!(
"{}{:02}",
MetricName::UpdatesVapidDraft.as_ref(),
with_key.vapid.version()
))?;
);
app_state.metrics.incr_raw(&vapid_version_metric)?;
};
// If this is a known VAPID key, create a reliability_id from
// either the content of the vapid assertions, or the request
@@ -104,7 +109,7 @@ impl FromRequest for Subscription {
.insert("error".to_owned(), e.as_metric().to_owned());
app_state
.metrics
.incr_with_tags("notification.auth.error")
.incr_with_tags(MetricName::NotificationAuthError)
.with_tag("error", e.as_metric())
.send();
})
@@ -113,7 +118,7 @@ impl FromRequest for Subscription {
info!("VAPID sub: {sub}");
};
// For now, record that we had a good (?) VAPID sub,
app_state.metrics.incr("notification.auth.ok")?;
app_state.metrics.incr(MetricName::NotificationAuthOk)?;
};

match token_info.api_version {
@@ -144,7 +149,7 @@ impl FromRequest for Subscription {

app_state
.metrics
.incr(&format!("updates.vapid.draft{:02}", vapid.vapid.version()))?;
.incr_raw(&format!("updates.vapid.draft{:02}", vapid.vapid.version()))?;
}

Ok(Subscription {
@@ -184,13 +189,13 @@ fn parse_vapid(token_info: &TokenInfo, metrics: &StatsdClient) -> ApiResult<Opti

let vapid = VapidHeader::parse(auth_header).inspect_err(|e| {
metrics
.incr_with_tags("notification.auth.error")
.incr_with_tags(MetricName::NotificationAuthError)
.with_tag("error", e.as_metric())
.send();
})?;

metrics
.incr_with_tags("notification.auth")
.incr_with_tags(MetricName::NotificationAuth)
.with_tag("vapid", &vapid.version().to_string())
.with_tag("scheme", &vapid.scheme)
.send();
@@ -318,7 +323,7 @@ fn validate_vapid_jwt(
// NOTE: This will fail if `exp` is specified as anything other than a numeric, or if a required field is empty
jsonwebtoken::errors::ErrorKind::Json(e) => {
metrics
.incr_with_tags("notification.auth.bad_vapid.json")
.incr_with_tags(MetricName::NotificationAuthBadVapidJson)
.with_tag(
"error",
match e.classify() {
@@ -344,7 +349,7 @@ fn validate_vapid_jwt(
return Err(VapidError::InvalidAudience.into());
}
jsonwebtoken::errors::ErrorKind::MissingRequiredClaim(e) => {
return Err(VapidError::InvalidVapid(format!("Missing required {}", e)).into());
return Err(VapidError::InvalidVapid(format!("Missing required {e}")).into());
}
_ => {
// Attempt to match up the majority of ErrorKind variants.
@@ -369,7 +374,7 @@ fn validate_vapid_jwt(
"Other".to_owned()
};
metrics
.incr_with_tags("notification.auth.bad_vapid.other")
.incr_with_tags(MetricName::NotificationAuthBadVapidOther)
.with_tag("error", &label)
.send();
error!("Bad Aud: Unexpected VAPID error: {:?}", &e);
8 changes: 5 additions & 3 deletions autoendpoint/src/extractors/user.rs
@@ -5,7 +5,9 @@ use crate::extractors::routers::RouterType;
use crate::server::AppState;
use actix_http::StatusCode;
use autopush_common::db::{client::DbClient, User};
use cadence::{CountedExt, StatsdClient};
use autopush_common::metric_name::MetricName;
use autopush_common::metrics::StatsdClientExt;
use cadence::StatsdClient;
use uuid::Uuid;

/// Perform some validations on the user, including:
@@ -37,7 +39,7 @@ pub async fn validate_user(
// record the bridge error for accounting reasons.
app_state
.metrics
.incr_with_tags("notification.bridge.error")
.incr_with_tags(MetricName::NotificationBridgeError)
.with_tag("platform", "gcm")
.with_tag("reason", "gcm_kill")
.with_tag("error", &StatusCode::GONE.to_string())
@@ -68,7 +70,7 @@ async fn validate_webpush_user(user: &User, channel_id: &Uuid, db: &dyn DbClient
/// Drop a user and increment associated metric
pub async fn drop_user(uaid: Uuid, db: &dyn DbClient, metrics: &StatsdClient) -> ApiResult<()> {
metrics
.incr_with_tags("updates.drop_user")
.incr_with_tags(MetricName::UpdatesDropUser)
.with_tag("errno", "102")
.send();

6 changes: 4 additions & 2 deletions autoendpoint/src/headers/vapid.rs
@@ -3,11 +3,12 @@ use std::collections::HashMap;
use std::fmt;

use base64::Engine;
use chrono::TimeDelta;
use serde::{Deserialize, Serialize};
use thiserror::Error;

use crate::headers::util::split_key_value;
use autopush_common::util::{sec_since_epoch, ONE_DAY_IN_SECONDS};
use autopush_common::util::sec_since_epoch;

pub const ALLOWED_SCHEMES: [&str; 3] = ["bearer", "webpush", "vapid"];

@@ -37,8 +38,9 @@ impl Default for VapidClaims {
}

impl VapidClaims {
/// Returns default expiration of one day from creation (in seconds).
pub fn default_exp() -> u64 {
sec_since_epoch() + ONE_DAY_IN_SECONDS
sec_since_epoch() + TimeDelta::days(1).num_seconds() as u64
}
}

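Note: a quick check of the new expiry arithmetic, assuming `sec_since_epoch()` returns the current UNIX time in seconds as a u64:

    use chrono::TimeDelta;

    fn default_exp_from(now_secs: u64) -> u64 {
        now_secs + TimeDelta::days(1).num_seconds() as u64
    }

    assert_eq!(default_exp_from(0), 86_400); // one day in seconds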
17 changes: 7 additions & 10 deletions autoendpoint/src/metrics.rs
@@ -1,10 +1,10 @@
use std::{sync::Arc, time::Instant};

use actix_web::{dev::Payload, web::Data, FromRequest, HttpMessage, HttpRequest};
use cadence::{CountedExt, Metric, MetricError, NopMetricSink, StatsdClient, Timed};
use cadence::{Metric, MetricError, NopMetricSink, StatsdClient, Timed};
use futures::future;

use autopush_common::tags::Tags;
use autopush_common::{metric_name::MetricName, metrics::StatsdClientExt, tags::Tags};

use crate::{error::ApiError, server::AppState, settings::Settings};

@@ -128,28 +128,25 @@ impl Metrics {
}

// increment a counter with no tags data.
pub fn incr(self, label: &str) {
self.incr_with_tags(label, None)
pub fn incr(self, metric: MetricName) {
self.incr_with_tags(metric, None)
}

pub fn incr_with_tags(self, label: &str, tags: Option<Tags>) {
pub fn incr_with_tags(self, metric: MetricName, tags: Option<Tags>) {
if let Some(client) = self.client.as_ref() {
let mut tagged = client.incr_with_tags(label);
let mut tagged = client.incr_with_tags(metric.clone());
let mut mtags = self.tags.clone().unwrap_or_default();
if let Some(t) = tags {
mtags.tags.extend(t.tags)
}
let tag_keys = mtags.tags.keys();
for key in tag_keys.clone() {
// REALLY wants a static here, or at least a well-defined ref.
tagged = tagged.with_tag(key, mtags.tags.get(key).unwrap());
}
// Include any "hard coded" tags.
// incr = incr.with_tag("version", env!("CARGO_PKG_VERSION"));
match tagged.try_send() {
Err(e) => {
// eat the metric, but log the error
warn!("⚠️ Metric {} error: {:?}", label, e; mtags);
warn!("⚠️ Metric {} error: {:?}", metric.as_ref(), e; mtags);
}
Ok(v) => trace!("☑️ {:?}", v.as_metric_str()),
}
10 changes: 7 additions & 3 deletions autoendpoint/src/routers/apns/router.rs
@@ -501,7 +501,7 @@ impl Router for ApnsRouter {
// mutable, but we are also essentially consuming the
// notification; nothing else should modify it.
notification
.record_reliability(&self.reliability, ReliabilityState::Transmitted)
.record_reliability(&self.reliability, ReliabilityState::BridgeTransmitted)
.await;
}

@@ -530,7 +530,7 @@ mod tests {
use autopush_common::db::client::DbClient;
use autopush_common::db::mock::MockDbClient;
#[cfg(feature = "reliable_report")]
use autopush_common::reliability::PushReliability;
use autopush_common::{redis_util::MAX_TRANSACTION_LOOP, reliability::PushReliability};
use cadence::StatsdClient;
use mockall::predicate;
use std::collections::HashMap;
@@ -576,6 +576,8 @@ mod tests {

/// Create a router for testing, using the given APNS client
fn make_router(client: MockApnsClient, db: Box<dyn DbClient>) -> ApnsRouter {
let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());

ApnsRouter {
clients: {
let mut map = HashMap::new();
@@ -593,7 +595,9 @@ mod tests {
metrics: Arc::new(StatsdClient::from_sink("autopush", cadence::NopMetricSink)),
db: db.clone(),
#[cfg(feature = "reliable_report")]
reliability: Arc::new(PushReliability::new(&None, db.clone()).unwrap()),
reliability: Arc::new(
PushReliability::new(&None, db.clone(), &metrics, MAX_TRANSACTION_LOOP).unwrap(),
),
}
}

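Note: `PushReliability::new` now takes a metrics client and a retry budget. A sketch of the call as used in these tests (parameter roles inferred from the call sites, not confirmed elsewhere in this diff):

    let reliability = PushReliability::new(
        &None,                // reliability_dsn
        db.clone(),           // boxed DbClient handle
        &metrics,             // Arc<StatsdClient>
        MAX_TRANSACTION_LOOP, // max Redis transaction retries
    )
    .unwrap();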
16 changes: 9 additions & 7 deletions autoendpoint/src/routers/common.rs
@@ -4,8 +4,10 @@ use crate::headers::vapid::VapidHeaderWithKey;
use crate::routers::RouterError;
use actix_web::http::StatusCode;
use autopush_common::db::client::DbClient;
use autopush_common::metric_name::MetricName;
use autopush_common::metrics::StatsdClientExt;
use autopush_common::util::InsertOpt;
use cadence::{Counted, CountedExt, StatsdClient, Timed};
use cadence::{Counted, StatsdClient, Timed};
use std::collections::HashMap;
use uuid::Uuid;

@@ -129,13 +131,13 @@ pub async fn handle_error(
metrics,
platform,
app_id,
&format!("upstream_{}", status),
&format!("upstream_{status}"),
error.status(),
error.errno(),
),

_ => {
warn!("Unknown error while sending bridge request: {}", error);
warn!("Unknown error while sending bridge request: {error}");
incr_error_metric(
metrics,
platform,
@@ -170,7 +172,7 @@ pub fn incr_error_metric(
) {
// I'd love to extract the status and errno from the passed ApiError, but a2 error handling makes that impossible.
metrics
.incr_with_tags("notification.bridge.error")
.incr_with_tags(MetricName::NotificationBridgeError)
.with_tag("platform", platform)
.with_tag("app_id", app_id)
.with_tag("reason", reason)
@@ -187,13 +189,13 @@ pub fn incr_success_metrics(
notification: &Notification,
) {
metrics
.incr_with_tags("notification.bridge.sent")
.incr_with_tags(MetricName::NotificationBridgeSent)
.with_tag("platform", platform)
.with_tag("app_id", app_id)
.send();
metrics
.count_with_tags(
"notification.message_data",
MetricName::NotificationMessageData.as_ref(),
notification.data.as_ref().map(String::len).unwrap_or(0) as i64,
)
.with_tag("platform", platform)
@@ -202,7 +204,7 @@ pub fn incr_success_metrics(
.send();
metrics
.time_with_tags(
"notification.total_request_time",
MetricName::NotificationTotalRequestTime.as_ref(),
(autopush_common::util::sec_since_epoch() - notification.timestamp) * 1000,
)
.with_tag("platform", platform)
2 changes: 1 addition & 1 deletion autoendpoint/src/routers/fcm/client.rs
@@ -116,7 +116,7 @@ impl FcmClient {
let response = self
.http_client
.post(self.endpoint.clone())
.header("Authorization", format!("Bearer {}", token))
.header("Authorization", format!("Bearer {token}"))
.header("Content-Type", "application/json")
.json(&message)
.timeout(self.timeout)
12 changes: 8 additions & 4 deletions autoendpoint/src/routers/fcm/router.rs
@@ -164,7 +164,7 @@ impl Router for FcmRouter {

let (routing_token, app_id) =
self.routing_info(router_data, &notification.subscription.user.uaid)?;
let ttl = MAX_FCM_NOTIFICATION_TTL
let ttl = (MAX_FCM_NOTIFICATION_TTL.num_seconds() as u64)
.min(self.settings.min_ttl.max(notification.headers.ttl as u64));

// Send the notification to FCM
@@ -199,7 +199,7 @@ impl Router for FcmRouter {
// mutable, but we are also essentially consuming the
// notification; nothing else should modify it.
notification
.record_reliability(&self.reliability, ReliabilityState::Transmitted)
.record_reliability(&self.reliability, ReliabilityState::BridgeTransmitted)
.await;
// Sent successfully, update metrics and make response
trace!("Send request was successful");
@@ -231,7 +231,7 @@ mod tests {
use autopush_common::db::client::DbClient;
use autopush_common::db::mock::MockDbClient;
#[cfg(feature = "reliable_report")]
use autopush_common::reliability::PushReliability;
use autopush_common::{redis_util::MAX_TRANSACTION_LOOP, reliability::PushReliability};
use std::sync::Arc;

use cadence::StatsdClient;
@@ -249,6 +249,8 @@ mod tests {
db: Box<dyn DbClient>,
) -> FcmRouter {
let url = &server.url();
let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());

FcmRouter::new(
FcmSettings {
base_url: Url::parse(url).unwrap(),
@@ -271,7 +273,9 @@ mod tests {
Arc::new(StatsdClient::from_sink("autopush", cadence::NopMetricSink)),
db.clone(),
#[cfg(feature = "reliable_report")]
Arc::new(PushReliability::new(&None, db.clone()).unwrap()),
Arc::new(
PushReliability::new(&None, db.clone(), &metrics, MAX_TRANSACTION_LOOP).unwrap(),
),
)
.await
.unwrap()
25 changes: 15 additions & 10 deletions autoendpoint/src/routers/webpush.rs
@@ -1,7 +1,7 @@
use async_trait::async_trait;
#[cfg(feature = "reliable_report")]
use autopush_common::reliability::PushReliability;
use cadence::{Counted, CountedExt, StatsdClient, Timed};
use cadence::{Counted, StatsdClient, Timed};
use reqwest::{Response, StatusCode};
use serde_json::Value;
use std::collections::{hash_map::RandomState, HashMap};
@@ -15,6 +15,8 @@ use crate::headers::vapid::VapidHeaderWithKey;
use crate::routers::{Router, RouterError, RouterResponse};

use autopush_common::db::{client::DbClient, User};
use autopush_common::metric_name::MetricName;
use autopush_common::metrics::StatsdClientExt;

/// The router for desktop user agents.
///
@@ -87,10 +89,10 @@ impl Router for WebPushRouter {
Err(error) => {
if let ApiErrorKind::ReqwestError(error) = &error.kind {
if error.is_timeout() {
self.metrics.incr("error.node.timeout")?;
self.metrics.incr(MetricName::ErrorNodeTimeout)?;
};
if error.is_connect() {
self.metrics.incr("error.node.connect")?;
self.metrics.incr(MetricName::ErrorNodeConnect)?;
};
};
debug!("✉ Error while sending webpush notification: {}", error);
@@ -102,7 +104,7 @@ impl Router for WebPushRouter {
// Couldn't send the message! So revert to the prior state if we have one
if let Some(revert_state) = revert_state {
trace!(
"🔎 Revert {:?} from {:?} to {:?}",
"🔎⚠️ Revert {:?} from {:?} to {:?}",
&notification.reliability_id,
&notification.reliable_state,
revert_state
@@ -120,7 +122,7 @@ impl Router for WebPushRouter {
delivered, dropping it"
);
self.metrics
.incr_with_tags("notification.message.expired")
.incr_with_tags(MetricName::NotificationMessageExpired)
// TODO: include `internal` if meta is set.
.with_tag("topic", &topic)
.send();
@@ -175,7 +177,7 @@ impl Router for WebPushRouter {
trace!("✉ Node has delivered the message");
self.metrics
.time_with_tags(
"notification.total_request_time",
MetricName::NotificationTotalRequestTime.as_ref(),
(notification.timestamp - autopush_common::util::sec_since_epoch())
* 1000,
)
@@ -282,7 +284,7 @@ impl WebPushRouter {
/// Remove the node ID from a user. This is done if the user is no longer
/// connected to the node.
async fn remove_node_id(&self, user: &User, node_id: &str) -> ApiResult<()> {
self.metrics.incr("updates.client.host_gone").ok();
self.metrics.incr(MetricName::UpdatesClientHostGone).ok();
let removed = self
.db
.remove_node_id(&user.uaid, node_id, user.connected_at, &user.version)
@@ -314,7 +316,7 @@ impl WebPushRouter {
) -> RouterResponse {
self.metrics
.count_with_tags(
"notification.message_data",
MetricName::NotificationMessageData.as_ref(),
notification.data.as_ref().map(String::len).unwrap_or(0) as i64,
)
.with_tag("destination", destination_tag)
@@ -350,19 +352,22 @@ mod test {
use crate::headers::vapid::VapidClaims;
use autopush_common::errors::ReportableError;
#[cfg(feature = "reliable_report")]
use autopush_common::reliability::PushReliability;
use autopush_common::{redis_util::MAX_TRANSACTION_LOOP, reliability::PushReliability};

use super::*;
use autopush_common::db::mock::MockDbClient;

fn make_router(db: Box<dyn DbClient>) -> WebPushRouter {
let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build());
WebPushRouter {
db: db.clone(),
metrics: Arc::new(StatsdClient::from_sink("autopush", cadence::NopMetricSink)),
http: reqwest::Client::new(),
endpoint_url: Url::parse("http://localhost:8080/").unwrap(),
#[cfg(feature = "reliable_report")]
reliability: Arc::new(PushReliability::new(&None, db).unwrap()),
reliability: Arc::new(
PushReliability::new(&None, db, &metrics, MAX_TRANSACTION_LOOP).unwrap(),
),
}
}

65 changes: 57 additions & 8 deletions autoendpoint/src/routes/health.rs
@@ -9,10 +9,12 @@ use actix_web::{
use reqwest::StatusCode;
use serde_json::json;

use autopush_common::db::error::DbResult;

use crate::error::{ApiErrorKind, ApiResult};
use crate::server::AppState;
use autopush_common::metric_name::MetricName;
use autopush_common::metrics::StatsdClientExt;
use autopush_common::util::b64_encode_url;
use autopush_common::{db::error::DbResult, errors::ApcError};

/// Handle the `/health` and `/__heartbeat__` routes
pub async fn health_route(state: Data<AppState>) -> Json<serde_json::Value> {
@@ -22,13 +24,60 @@ pub async fn health_route(state: Data<AppState>) -> Json<serde_json::Value> {
routers.insert("apns", state.apns_router.active());
routers.insert("fcm", state.fcm_router.active());

let health = json!({
"status": "OK",
"version": env!("CARGO_PKG_VERSION"),
"router_table": router_health,
"message_table": message_health,
"routers": routers});
let mut health = json!({
"status": if state
.db
.health_check()
.await
.map_err(|e| {
error!("Autoendpoint health error: {:?}", e);
e
})
.is_ok() {
"OK"
} else {
"ERROR"
},
"version": env!("CARGO_PKG_VERSION"),
"router_table": router_health,
"message_table": message_health,
"routers": routers,
});

#[cfg(feature = "reliable_report")]
{
let reliability_health: Result<String, ApcError> = state
.reliability
.health_check()
.await
.map(|_| {
let keys: Vec<String> = state
.settings
.tracking_keys()
.unwrap_or_default()
.iter()
.map(|k|
// Hint at the key values
b64_encode_url(k)[..8].to_string())
.collect();
if keys.is_empty() {
Ok("NO_TRACKING_KEYS".to_owned())
} else {
Ok(format!("OK: {}", keys.join(",")))
}
})
.unwrap_or_else(|e| {
// Record that Redis is down.
state
.metrics
.incr_with_tags(MetricName::ReliabilityErrorRedisUnavailable)
.with_tag("application", "autoendpoint")
.send();
error!("🔍🟥 Reliability reporting down: {:?}", e);
Ok("STORE_ERROR".to_owned())
});
health["reliability"] = json!(reliability_health);
}
Json(health)
}
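Note: for reference, a successful response from this route might look like the following sketch (field values are illustrative; "reliability" only appears with the reliable_report feature, and the exact serialization of its Result wrapper is not shown in this diff):

    {
      "status": "OK",
      "version": "1.75.0",
      "router_table": "OK",
      "message_table": "OK",
      "routers": { "apns": true, "fcm": true },
      "reliability": "OK: QkxiWlR2"
    }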

23 changes: 16 additions & 7 deletions autoendpoint/src/routes/registration.rs
@@ -1,6 +1,6 @@
use actix_web::web::{Data, Json};
use actix_web::{HttpRequest, HttpResponse};
use cadence::{CountedExt, Histogrammed, StatsdClient};
use cadence::{Histogrammed, StatsdClient};
use uuid::Uuid;

use crate::error::{ApiErrorKind, ApiResult};
@@ -15,6 +15,8 @@ use crate::server::AppState;

use autopush_common::db::User;
use autopush_common::endpoint::make_endpoint;
use autopush_common::metric_name::MetricName;
use autopush_common::metrics::StatsdClientExt;

/// Handle the `POST /v1/{router_type}/{app_id}/registration` route
pub async fn register_uaid_route(
@@ -32,7 +34,7 @@ pub async fn register_uaid_route(
trace!("🌍 token = {}", router_data_input.token);
let router = routers.get(path_args.router_type);
let router_data = router.register(&router_data_input, &path_args.app_id)?;
incr_metric("ua.command.register", &app_state.metrics, &request);
incr_metric(MetricName::UaCommandRegister, &app_state.metrics, &request);

// Register user and channel in database
let user = User::builder()
@@ -185,7 +187,7 @@ pub async fn get_channels_route(
// use the "metrics" version since we need to consider cardinality.
app_state
.metrics
.incr_with_tags("ua.connection.check")
.incr_with_tags(MetricName::UaConnectionCheck)
.with_tag("os", &os)
.with_tag("browser", &browser)
.send();
@@ -194,7 +196,10 @@ pub async fn get_channels_route(

app_state
.metrics
.histogram_with_tags("ua.connection.channel_count", channel_ids.len() as u64)
.histogram_with_tags(
MetricName::UaConnectionChannelCount.as_ref(),
channel_ids.len() as u64,
)
.with_tag_value("mobile")
.send();

@@ -220,7 +225,11 @@ pub async fn unregister_channel_route(
let uaid = path_args.user.uaid;
debug!("🌍 Unregistering CHID {channel_id} for UAID {uaid}");

incr_metric("ua.command.unregister", &app_state.metrics, &request);
incr_metric(
MetricName::UaCommandUnregister,
&app_state.metrics,
&request,
);
let channel_did_exist = app_state.db.remove_channel(&uaid, &channel_id).await?;

if channel_did_exist {
@@ -232,9 +241,9 @@ pub async fn unregister_channel_route(
}

/// Increment a metric with data from the request
fn incr_metric(name: &str, metrics: &StatsdClient, request: &HttpRequest) {
fn incr_metric(metric: MetricName, metrics: &StatsdClient, request: &HttpRequest) {
metrics
.incr_with_tags(name)
.incr_with_tags(metric)
.with_tag(
"user_agent",
get_header(request, "User-Agent").unwrap_or("unknown"),
10 changes: 8 additions & 2 deletions autoendpoint/src/server.rs
@@ -94,8 +94,14 @@ impl Server {
};
#[cfg(feature = "reliable_report")]
let reliability = Arc::new(
PushReliability::new(&settings.reliability_dsn, db.clone()).map_err(|e| {
ApiErrorKind::General(format!("Could not initialize Reliability Report: {:?}", e))
PushReliability::new(
&settings.reliability_dsn,
db.clone(),
&metrics,
settings.reliability_retry_count,
)
.map_err(|e| {
ApiErrorKind::General(format!("Could not initialize Reliability Report: {e:?}"))
})?,
);
let http = reqwest::ClientBuilder::new()
39 changes: 36 additions & 3 deletions autoendpoint/src/settings.rs
@@ -63,6 +63,9 @@ pub struct Settings {
/// storage system. See [Connection Parameters](https://docs.rs/redis/latest/redis/#connection-parameters)
/// for details.
pub reliability_dsn: Option<String>,
#[cfg(feature = "reliable_report")]
/// Max number of retries for Redis transactions
pub reliability_retry_count: usize,
}

impl Default for Settings {
@@ -96,6 +99,8 @@ impl Default for Settings {
stub: StubSettings::default(),
#[cfg(feature = "reliable_report")]
reliability_dsn: None,
#[cfg(feature = "reliable_report")]
reliability_retry_count: autopush_common::redis_util::MAX_TRANSACTION_LOOP,
}
}
}
@@ -184,7 +189,7 @@ impl Settings {
for v in Self::read_list_from_str(keys, "Invalid AUTOEND_TRACKING_KEYS") {
result.push(
util::b64_decode(v)
.map_err(|e| ConfigError::Message(format!("Invalid tracking key: {:?}", e)))?,
.map_err(|e| ConfigError::Message(format!("Invalid tracking key: {e:?}")))?,
);
}
trace!("🔍 tracking_keys: {result:?}");
@@ -210,17 +215,19 @@ impl VapidTracker {
pub fn is_trackable(&self, vapid: &VapidHeaderWithKey) -> bool {
// ideally, [Settings.with_env_and_config_file()] does the work of pre-populating
// the Settings.tracking_vapid_pubs cache, but we can't rely on that.

let key = match util::b64_decode(&vapid.public_key) {
Ok(v) => v,
Err(e) => {
// This error is not fatal, and should not happen often. During preliminary
// runs, however, we do want to try and spot them.
warn!("VAPID: tracker failure {e}");
warn!("🔍 VAPID: tracker failure {e}");
return false;
}
};
let result = self.0.contains(&key);
debug!("🔍 Checking {:?} {}", util::b64_encode_url(&key), {

debug!("🔍 Checking {:?} {}", &vapid.public_key, {
if result {
"Match!"
} else {
@@ -361,6 +368,32 @@ mod tests {
Ok(())
}

#[test]
fn test_multi_tracking_keys() -> ApiResult<()> {
// Handle the case where the settings may use Standard encoding instead of Base64 encoding.
let settings = Settings{
tracking_keys: r#"[BLbZTvXsQr0rdvLQr73ETRcseSpoof5xV83NiPK9U-Qi00DjNJct1N6EZtTBMD0uh-nNjtLAxik1XP9CZXrKtTg,BHDgfiL1hz4oIBFaxxS9jkzyAVing-W9jjt_7WUeFjWS5Invalid5EjC8TQKddJNP3iow7UW6u8JE3t7u_y3Plc]"#.to_owned(),
..Default::default()
};

let test_header = VapidHeaderWithKey {
vapid: VapidHeader {
scheme: "".to_owned(),
token: "".to_owned(),
version_data: crate::headers::vapid::VapidVersionData::Version1,
},
public_key: "BLbZTvXsQr0rdvLQr73ETRcseSpoof5xV83NiPK9U-Qi00DjNJct1N6EZtTBMD0uh-nNjtLAxik1XP9CZXrKtTg".to_owned()
};

let key_set = settings.tracking_keys().unwrap();
assert!(!key_set.is_empty());

let reliability = VapidTracker(key_set);
assert!(reliability.is_trackable(&test_header));

Ok(())
}

#[test]
fn test_reliability_id() -> ApiResult<()> {
let mut headers = HeaderMap::new();
21 changes: 12 additions & 9 deletions autopush-common/Cargo.toml
@@ -19,14 +19,10 @@ deadpool.workspace = true
fernet.workspace = true
futures.workspace = true
futures-util.workspace = true
hex.workspace = true
httparse.workspace = true
lazy_static.workspace = true
mockall.workspace = true
openssl.workspace = true
rand.workspace = true
regex.workspace = true
reqwest.workspace = true
sentry-backtrace.workspace = true
sentry.workspace = true
serde.workspace = true
@@ -40,14 +36,15 @@ slog-scope.workspace = true
slog-stdlog.workspace = true
slog-term.workspace = true
strum.workspace = true
strum_macros.workspace = true
thiserror.workspace = true
uuid.workspace = true
url.workspace = true

again = "0.1"
async-trait = "0.1"
derive_builder = "0.20"
gethostname = "0.4"
gethostname = "1.0"
num_cpus = "1.16"
woothee = "0.13"

@@ -62,13 +59,14 @@ prometheus-client = { version = "0.23", optional = true }
protobuf = { version = "=2.28.0", optional = true } # grpcio does not support protobuf 3+
form_urlencoded = { version = "1.2", optional = true }

redis = { version = "0.28", optional = true }
deadpool-redis = { version = "0.21", optional = true }
redis-module = { version = "2.0", optional = true }
redis = { version = "0.31", optional = true }

[dev-dependencies]
actix-rt = "2.8"
mockito = "0.31"
redis-test = "0.8"
mockito = "0.31" # mockito > 0.31 will require significant reworking of the tests and is not critical yet.
redis-test = { version = "0.11", features = ["aio"] }
tempfile = "3.2.0"
tokio = { workspace = true, features = ["macros"] }

@@ -86,4 +84,9 @@ bigtable = [
emulator = [
"bigtable",
] # used for testing big table, requires an external bigtable emulator running.
reliable_report = ["dep:redis", "dep:prometheus-client", "dep:redis-module"]
reliable_report = [
"dep:redis",
"dep:prometheus-client",
"dep:redis-module",
"dep:deadpool-redis",
]
38 changes: 22 additions & 16 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
@@ -8,7 +8,8 @@ use std::time::{Duration, SystemTime};

use again::RetryPolicy;
use async_trait::async_trait;
use cadence::{CountedExt, StatsdClient};
use cadence::StatsdClient;
use chrono::TimeDelta;
use futures_util::StreamExt;
use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest;
use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
@@ -27,6 +28,8 @@ use crate::db::{
error::{DbError, DbResult},
DbSettings, Notification, User, MAX_ROUTER_TTL, USER_RECORD_VERSION,
};
use crate::metric_name::MetricName;
use crate::metrics::StatsdClientExt;

pub use self::metadata::MetadataBuilder;
use self::row::{Row, RowCells};
@@ -55,7 +58,9 @@ const MESSAGE_TOPIC_FAMILY: &str = "message_topic";
#[cfg(feature = "reliable_report")]
const RELIABLE_LOG_FAMILY: &str = "reliability";
#[cfg(feature = "reliable_report")]
const RELIABLE_LOG_TTL: u64 = crate::db::MAX_NOTIFICATION_TTL * 2;
/// The maximum TTL for reliability logging (60 days).
/// In most use cases, this is converted to seconds via `.num_seconds()`.
pub const RELIABLE_LOG_TTL: TimeDelta = TimeDelta::days(60);

pub(crate) const RETRY_COUNT: usize = 5;

@@ -270,7 +275,7 @@ fn retryable_internal_err(status: &RpcStatus) -> bool {

pub fn metric(metrics: &Arc<StatsdClient>, err_type: &str, code: Option<&str>) {
let mut metric = metrics
.incr_with_tags("database.retry")
.incr_with_tags(MetricName::DatabaseRetry)
.with_tag("error", err_type)
.with_tag("type", "bigtable");
if let Some(code) = code {
@@ -303,7 +308,7 @@ pub fn retryable_grpcio_err(metrics: &Arc<StatsdClient>) -> impl Fn(&grpcio::Err
metric(
metrics,
"CallFailure",
Some(&format!("{:?}", grpc_call_status)),
Some(&format!("{grpc_call_status:?}")),
);
}
retry
@@ -768,10 +773,7 @@ impl BigTableClientImpl {
"reliable_state",
)?)
.map_err(|e| {
DbError::DeserializeString(format!(
"Could not parse reliable_state {:?}",
e
))
DbError::DeserializeString(format!("Could not parse reliable_state {e:?}"))
})?,
);
}
@@ -799,7 +801,8 @@ impl BigTableClientImpl {
fn user_to_row(&self, user: &User, version: &Uuid) -> Row {
let row_key = user.uaid.simple().to_string();
let mut row = Row::new(row_key);
let expiry = std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL);
let expiry =
std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL.num_seconds() as u64);

let mut cells: Vec<cell::Cell> = vec![
cell::Cell {
@@ -1005,7 +1008,7 @@ impl DbClient for BigTableClientImpl {
// https://github.com/mozilla-services/autopush-rs/pull/640
trace!("🉑 Dropping an incomplete user record for {}", row_key);
self.metrics
.incr_with_tags("database.drop_user")
.incr_with_tags(MetricName::DatabaseDropUser)
.with_tag("reason", "incomplete_record")
.send();
self.remove_user(uaid).await?;
@@ -1081,7 +1084,8 @@ impl DbClient for BigTableClientImpl {
// easy/efficient
let row_key = uaid.simple().to_string();
let mut row = Row::new(row_key);
let expiry = std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL);
let expiry =
std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL.num_seconds() as u64);

// Note: updating the version column isn't necessary here because this
// write only adds a new (or updates an existing) column with a 0 byte
@@ -1124,7 +1128,8 @@ impl DbClient for BigTableClientImpl {

// and write a new version cell
let mut row = Row::new(row_key);
let expiry = std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL);
let expiry =
std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL.num_seconds() as u64);
row.cells
.insert(ROUTER_FAMILY.to_owned(), vec![new_version_cell(expiry)]);
mutations.extend(self.get_mutations(row.cells)?);
@@ -1256,7 +1261,7 @@ impl DbClient for BigTableClientImpl {
self.write_row(row).await?;

self.metrics
.incr_with_tags("notification.message.stored")
.incr_with_tags(MetricName::NotificationMessageStored)
.with_tag("topic", &is_topic.to_string())
.with_tag("database", &self.name())
.send();
@@ -1295,7 +1300,8 @@ impl DbClient for BigTableClientImpl {
&row_key,
timestamp.to_be_bytes().to_vec()
);
let expiry = std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL);
let expiry =
std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL.num_seconds() as u64);
let mut row = Row::new(row_key.clone());

row.cells.insert(
@@ -1327,7 +1333,7 @@ impl DbClient for BigTableClientImpl {
debug!("🉑🔥 Deleting message {}", &row_key);
self.delete_row(&row_key).await?;
self.metrics
.incr_with_tags("notification.message.deleted")
.incr_with_tags(MetricName::NotificationMessageDeleted)
.with_tag("database", &self.name())
.send();
Ok(())
@@ -1478,7 +1484,7 @@ impl DbClient for BigTableClientImpl {
let row_key = reliability_id.to_owned();

let mut row = Row::new(row_key);
let expiry = SystemTime::now() + Duration::from_secs(RELIABLE_LOG_TTL);
let expiry = SystemTime::now() + Duration::from_secs(RELIABLE_LOG_TTL.num_seconds() as u64);

// Log the latest transition time for this id.
let cells: Vec<cell::Cell> = vec![cell::Cell {
2 changes: 1 addition & 1 deletion autopush-common/src/db/bigtable/mod.rs
@@ -155,7 +155,7 @@ impl TryFrom<&str> for BigTableDbSettings {
type Error = DbError;
fn try_from(setting_string: &str) -> Result<Self, Self::Error> {
let mut me: Self = serde_json::from_str(setting_string)
.map_err(|e| DbError::General(format!("Could not parse DdbSettings: {:?}", e)))?;
.map_err(|e| DbError::General(format!("Could not parse DdbSettings: {e:?}")))?;

if me.table_name.starts_with('/') {
return Err(DbError::ConnectionError(
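
A self-contained sketch of the TryFrom pattern in this hunk: deserialize the JSON settings blob, then validate the table name. The struct below carries only the one field needed for illustration, not the crate's full settings type (assumes the serde and serde_json crates):

    use serde::Deserialize;

    #[derive(Deserialize)]
    struct BigTableDbSettings {
        #[serde(default)]
        table_name: String,
    }

    fn parse_settings(setting_string: &str) -> Result<BigTableDbSettings, String> {
        let me: BigTableDbSettings = serde_json::from_str(setting_string)
            .map_err(|e| format!("Could not parse DdbSettings: {e:?}"))?;
        // Bigtable table names are resource paths ("projects/.../tables/..."),
        // so a leading '/' indicates a malformed value.
        if me.table_name.starts_with('/') {
            return Err("table_name must not start with '/'".to_owned());
        }
        Ok(me)
    }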
20 changes: 7 additions & 13 deletions autopush-common/src/db/bigtable/pool.rs
@@ -5,7 +5,6 @@ use std::{
};

use actix_web::rt;
use async_trait::async_trait;
use cadence::StatsdClient;
use deadpool::managed::{Manager, PoolConfig, PoolError, QueueMode, RecycleError, Timeouts};
use grpcio::{Channel, ChannelBuilder, ChannelCredentials, EnvBuilder};
@@ -66,24 +65,21 @@ impl BigTablePool {
let bt_settings = BigTableDbSettings::try_from(settings.db_settings.as_str())?;
debug!("🉑 DSN: {}", &endpoint);
// Url::parse() doesn't know how to handle the `grpc:` scheme, so it returns "null".
let parsed = url::Url::parse(endpoint).map_err(|e| {
DbError::ConnectionError(format!("Invalid DSN: {:?} : {:?}", endpoint, e))
})?;
let parsed = url::Url::parse(endpoint)
.map_err(|e| DbError::ConnectionError(format!("Invalid DSN: {endpoint:?} : {e:?}")))?;
let connection = format!(
"{}:{}",
parsed
.host_str()
.ok_or_else(|| DbError::ConnectionError(format!(
"Invalid DSN: Unparsable host {:?}",
endpoint
"Invalid DSN: Unparsable host {endpoint:?}"
)))?,
parsed.port().unwrap_or(DEFAULT_GRPC_PORT)
);
// Make sure the path is empty.
if !parsed.path().is_empty() {
return Err(DbError::ConnectionError(format!(
"Invalid DSN: Table paths belong in AUTO*_DB_SETTINGS `tab: {:?}",
endpoint
"Invalid DSN: Table paths belong in AUTO*_DB_SETTINGS `tab: {endpoint:?}`"
)));
}
debug!("🉑 connection string {}", &connection);
@@ -175,11 +171,9 @@ impl fmt::Debug for BigtableClientManager {
}
}

#[async_trait]
impl Manager for BigtableClientManager {
type Error = BigTableError;
type Type = BigtableDb;
// TODO: Deadpool 0.11+ introduces new lifetime constraints.

/// Create a new Bigtable Client with its own channel.
/// `BigtableClient` is the most atomic unit we can get.
@@ -203,14 +197,14 @@ impl Manager for BigtableClientManager {
if let Some(timeout) = self.settings.database_pool_connection_ttl {
if Instant::now() - metrics.created > timeout {
debug!("🏊 Recycle requested (old).");
return Err(RecycleError::Message("Connection too old".to_owned()));
return Err(RecycleError::message("Connection too old"));
}
}
if let Some(timeout) = self.settings.database_pool_max_idle {
if let Some(recycled) = metrics.recycled {
if Instant::now() - recycled > timeout {
debug!("🏊 Recycle requested (idle).");
return Err(RecycleError::Message("Connection too idle".to_owned()));
return Err(RecycleError::message("Connection too idle"));
}
}
}
@@ -221,7 +215,7 @@ impl Manager for BigtableClientManager {
.inspect_err(|e| debug!("🏊 Recycle requested (health). {:?}", e))?
{
debug!("🏊 Health check failed");
return Err(RecycleError::Message("Health check failed".to_owned()));
return Err(RecycleError::message("Health check failed"));
}

Ok(())
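
The recycle hunks above swap the removed RecycleError::Message variant for the RecycleError::message constructor and keep two freshness guards. A dependency-free sketch of just those guards, with `created` and `recycled` mirroring the fields read from deadpool's connection metrics:

    use std::time::{Duration, Instant};

    fn check_freshness(
        created: Instant,
        recycled: Option<Instant>,
        connection_ttl: Option<Duration>,
        max_idle: Option<Duration>,
    ) -> Result<(), &'static str> {
        // Too old: total connection lifetime exceeded.
        if let Some(ttl) = connection_ttl {
            if Instant::now() - created > ttl {
                return Err("Connection too old");
            }
        }
        // Too idle: too long since the connection was last recycled.
        if let (Some(idle), Some(recycled)) = (max_idle, recycled) {
            if Instant::now() - recycled > idle {
                return Err("Connection too idle");
            }
        }
        Ok(())
    }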
2 changes: 1 addition & 1 deletion autopush-common/src/db/mod.rs
@@ -31,7 +31,7 @@ pub use reporter::spawn_pool_periodic_reporter;

use crate::notification::Notification;
use crate::util::timing::ms_since_epoch;
use crate::{MAX_NOTIFICATION_TTL, MAX_ROUTER_TTL};
use crate::MAX_ROUTER_TTL;
use models::RangeKey;

pub const USER_RECORD_VERSION: u64 = 1;
4 changes: 2 additions & 2 deletions autopush-common/src/db/models.rs
@@ -67,8 +67,8 @@ impl RangeKey {
pub(crate) fn parse_chidmessageid(key: &str) -> Result<RangeKey> {
lazy_static! {
static ref RE: RegexSet = RegexSet::new([
format!("^{}:\\S+:\\S+$", TOPIC_NOTIFICATION_PREFIX).as_str(),
format!("^{}:\\d+:\\S+$", STANDARD_NOTIFICATION_PREFIX).as_str(),
format!("^{TOPIC_NOTIFICATION_PREFIX}:\\S+:\\S+$").as_str(),
format!("^{STANDARD_NOTIFICATION_PREFIX}:\\d+:\\S+$").as_str(),
"^\\S{3,}:\\S+$"
])
.unwrap();
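
A runnable sketch of the pattern set above (assumes the regex crate; the prefix values "01" and "02" are assumptions, since the real constants are defined elsewhere):

    use regex::RegexSet;

    const TOPIC_NOTIFICATION_PREFIX: &str = "01"; // assumed
    const STANDARD_NOTIFICATION_PREFIX: &str = "02"; // assumed

    fn classify(key: &str) -> Option<usize> {
        let set = RegexSet::new([
            format!("^{TOPIC_NOTIFICATION_PREFIX}:\\S+:\\S+$"),
            format!("^{STANDARD_NOTIFICATION_PREFIX}:\\d+:\\S+$"),
            "^\\S{3,}:\\S+$".to_owned(),
        ])
        .unwrap();
        // RegexSet reports every matching pattern; taking the lowest index
        // gives the more specific topic/standard forms precedence over the
        // legacy catch-all.
        set.matches(key).iter().next()
    }

    fn main() {
        assert_eq!(classify("01:some-topic:chid"), Some(0));
        assert_eq!(classify("02:1700000000000:chid"), Some(1));
    }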
2 changes: 2 additions & 0 deletions autopush-common/src/db/routing.rs
@@ -1,3 +1,5 @@
// Rust 1.89.0 flags `StorageType` as unused even with `bigtable` set as a default feature.
#[allow(dead_code)]
#[derive(Clone, Eq, PartialEq, Debug)]
pub(crate) enum StorageType {
BigTable,
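
The attribute above silences Rust 1.89's dead-code lint for variants that only a feature-gated backend constructs. A minimal sketch of the situation; the second variant below is assumed purely for illustration, since only BigTable is visible in this diff:

    #[allow(dead_code)]
    #[derive(Clone, Eq, PartialEq, Debug)]
    enum StorageType {
        BigTable,
        None, // assumed additional variant, for illustration only
    }

    fn main() {
        // Without the allow, a build that never constructs `None` would warn.
        let st = StorageType::BigTable;
        assert_eq!(st, StorageType::BigTable);
    }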
4 changes: 2 additions & 2 deletions autopush-common/src/endpoint.rs
@@ -34,13 +34,13 @@ pub fn make_endpoint(
base.extend(key_digest.iter());
let encrypted = fernet.encrypt(&base).trim_matches('=').to_string();
let final_url = root.join(&format!("v2/{encrypted}")).map_err(|e| {
ApcErrorKind::GeneralError(format!("Encrypted endpoint data is not URL-safe {:?}", e))
ApcErrorKind::GeneralError(format!("Encrypted endpoint data is not URL-safe {e:?}"))
})?;
Ok(final_url.to_string())
} else {
let encrypted = fernet.encrypt(&base).trim_matches('=').to_string();
let final_url = root.join(&format!("v1/{encrypted}")).map_err(|e| {
ApcErrorKind::GeneralError(format!("Encrypted endpoint data is not URL-safe {:?}", e))
ApcErrorKind::GeneralError(format!("Encrypted endpoint data is not URL-safe {e:?}"))
})?;
Ok(final_url.to_string())
}
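
Both branches above follow the same shape; only the path version differs. A hedged sketch of that shared shape (assumes the fernet and url crates; `base` would already hold the uaid/channel material, with the v2 branch appending a key digest before this point, and the root URL below is illustrative):

    use fernet::Fernet;
    use url::Url;

    fn endpoint_url(root: &Url, fernet: &Fernet, base: &[u8], version: u8) -> Result<String, String> {
        // Fernet tokens are URL-safe base64; trailing '=' padding is stripped
        // to keep the endpoint compact.
        let encrypted = fernet.encrypt(base).trim_matches('=').to_string();
        let final_url = root
            .join(&format!("v{version}/{encrypted}"))
            .map_err(|e| format!("Encrypted endpoint data is not URL-safe {e:?}"))?;
        Ok(final_url.to_string())
    }

    fn main() {
        let root = Url::parse("https://push.example.com/").unwrap();
        let fernet = Fernet::new(&Fernet::generate_key()).unwrap();
        println!("{}", endpoint_url(&root, &fernet, b"example", 1).unwrap());
    }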