Skip to content

Commit 18e5d62

Browse files
committed
add identity janitor
1 parent 7b3c863 commit 18e5d62

File tree

6 files changed

+254
-24
lines changed

6 files changed

+254
-24
lines changed

config/config.exs

+20-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,26 @@ config :vrhose,
1414
config :vrhose,
1515
ecto_repos: [VRHose.Repo]
1616

17-
config :vrhose, VRHose.Repo, database: "vrhose_#{Mix.env()}.db"
17+
repos = [
18+
VRHose.Repo,
19+
VRHose.Repo.Replica1,
20+
VRHose.Repo.Replica2,
21+
VRHose.Repo.Replica3,
22+
VRHose.Repo.Replica4,
23+
VRHose.Repo.JanitorReplica
24+
]
25+
26+
for repo <- repos do
27+
config :vrhose, repo,
28+
cache_size: -8_000,
29+
pool_size: 1,
30+
auto_vacuum: :incremental,
31+
telemetry_prefix: [:vrhose, :repo],
32+
telemetry_event: [VRHose.Repo.Instrumenter],
33+
queue_target: 500,
34+
queue_interval: 2000,
35+
database: "vrhose_#{Mix.env()}.db"
36+
end
1837

1938
# Configures the endpoint
2039
config :vrhose, VRHoseWeb.Endpoint,

lib/vrhose/application.ex

+67-17
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,25 @@ defmodule VRHose.Application do
2121

2222
children =
2323
[
24-
VRHose.Repo,
25-
VRHoseWeb.Telemetry,
26-
{VRHose.QuickLeader, name: VRHose.QuickLeader},
27-
{Finch,
28-
name: VRHose.Finch,
29-
pools: %{
30-
:default => [size: 50, count: 50]
31-
}},
32-
{DNSCluster, query: Application.get_env(:vrhose, :dns_cluster_query) || :ignore},
33-
{Phoenix.PubSub, name: VRHose.PubSub},
34-
{
35-
Registry,
36-
# , partitions: System.schedulers_online()},
37-
keys: :duplicate, name: Registry.Timeliners
38-
},
39-
{ExHashRing.Ring, name: VRHose.Hydrator.Ring}
24+
VRHoseWeb.Telemetry
4025
] ++
26+
repos() ++
27+
[
28+
{VRHose.QuickLeader, name: VRHose.QuickLeader},
29+
{Finch,
30+
name: VRHose.Finch,
31+
pools: %{
32+
:default => [size: 50, count: 50]
33+
}},
34+
{DNSCluster, query: Application.get_env(:vrhose, :dns_cluster_query) || :ignore},
35+
{Phoenix.PubSub, name: VRHose.PubSub},
36+
{
37+
Registry,
38+
# , partitions: System.schedulers_online()},
39+
keys: :duplicate, name: Registry.Timeliners
40+
},
41+
{ExHashRing.Ring, name: VRHose.Hydrator.Ring}
42+
] ++
4143
hydration_workers() ++
4244
[
4345
{VRHose.Ingestor, name: {:global, VRHose.Ingestor}},
@@ -53,16 +55,51 @@ defmodule VRHose.Application do
5355
id: "websocket"
5456
},
5557
VRHoseWeb.Endpoint
56-
] ++ timeliner_workers()
58+
] ++ timeliner_workers() ++ janitor_workers()
5759

5860
start_telemetry()
61+
IO.inspect(children, label: "application tree")
5962

6063
# See https://hexdocs.pm/elixir/Supervisor.html
6164
# for other strategies and supported options
6265
opts = [strategy: :one_for_one, name: VRHose.Supervisor, max_restarts: 10]
6366
Supervisor.start_link(children, opts)
6467
end
6568

69+
def primaries() do
70+
Application.fetch_env!(:vrhose, :ecto_repos)
71+
end
72+
73+
defp repos() do
74+
Application.fetch_env!(:vrhose, :ecto_repos)
75+
|> Enum.map(fn primary ->
76+
primary
77+
|> to_string
78+
|> then(fn
79+
"Elixir.VRHose.Repo" <> _ ->
80+
spec = primary.repo_spec()
81+
[primary] ++ spec.read_replicas ++ spec.dedicated_replicas
82+
83+
_ ->
84+
[]
85+
end)
86+
end)
87+
|> Enum.reduce(fn x, acc -> x ++ acc end)
88+
|> Enum.map(fn repo ->
89+
case Application.fetch_env(:vrhose, repo) do
90+
:error ->
91+
raise RuntimeError, "Repo #{repo} not configured"
92+
93+
{:ok, cfg} ->
94+
if Access.get(cfg, :database) == nil do
95+
raise RuntimeError, "Repo #{repo} not configured. missing database"
96+
end
97+
98+
repo
99+
end
100+
end)
101+
end
102+
66103
def hydration_workers() do
67104
1..20
68105
|> Enum.map(fn i ->
@@ -129,6 +166,19 @@ defmodule VRHose.Application do
129166
Logger.info("telemetry started!")
130167
end
131168

169+
defp janitor_specs do
170+
[
171+
[VRHose.Identity.Janitor, [every: 8 * 60, jitter: -60..60]]
172+
]
173+
end
174+
175+
defp janitor_workers do
176+
janitor_specs()
177+
|> Enum.map(fn [module, opts] ->
178+
VRHose.Tinycron.new(module, opts)
179+
end)
180+
end
181+
132182
# Tell Phoenix to update the endpoint configuration
133183
# whenever the application is updated.
134184
@impl true

lib/vrhose/identity.ex

+44-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ defmodule VRHose.Identity do
2222

2323
def one(did) do
2424
query = from(s in __MODULE__, where: s.did == ^did, select: s)
25-
Repo.one(query, log: false)
25+
Repo.replica(did).one(query, log: false)
2626
end
2727

2828
def fake(did) do
@@ -54,4 +54,47 @@ defmodule VRHose.Identity do
5454
log: false
5555
)
5656
end
57+
58+
defmodule Janitor do
59+
require Logger
60+
61+
import Ecto.Query
62+
alias VRHose.Identity
63+
alias VRHose.Repo.JanitorReplica
64+
65+
def tick() do
66+
Logger.info("cleaning identities...")
67+
68+
expiry_time =
69+
NaiveDateTime.utc_now()
70+
|> NaiveDateTime.add(-1, :day)
71+
|> DateTime.from_naive!("Etc/UTC")
72+
|> DateTime.to_unix()
73+
74+
deleted_count =
75+
from(s in Identity,
76+
where:
77+
fragment("unixepoch(?)", s.inserted_at) <
78+
^expiry_time,
79+
limit: 1000
80+
)
81+
|> JanitorReplica.all()
82+
|> Enum.chunk_every(10)
83+
|> Enum.map(fn chunk ->
84+
chunk
85+
|> Enum.map(fn identity ->
86+
Repo.delete(identity)
87+
1
88+
end)
89+
|> then(fn count ->
90+
:timer.sleep(1500)
91+
count
92+
end)
93+
|> Enum.sum()
94+
end)
95+
|> Enum.sum()
96+
97+
Logger.info("deleted #{deleted_count} identities")
98+
end
99+
end
57100
end

lib/vrhose/repo.ex

+17-5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,21 @@
11
defmodule VRHose.Repo do
2-
use Ecto.Repo,
3-
otp_app: :vrhose,
4-
adapter: Ecto.Adapters.SQLite3,
5-
pool_size: 1,
6-
loggers: [VRHose.Repo.Instrumenter, Ecto.LogEntry]
2+
use VRHose.Repo.Base,
3+
primary: VRHose.Repo,
4+
read_replicas: [
5+
VRHose.Repo.Replica1,
6+
VRHose.Repo.Replica2,
7+
VRHose.Repo.Replica3,
8+
VRHose.Repo.Replica4
9+
],
10+
dedicated_replicas: [
11+
VRHose.Repo.JanitorReplica
12+
]
13+
14+
# use Ecto.Repo,
15+
# otp_app: :vrhose,
16+
# adapter: Ecto.Adapters.SQLite3,
17+
# pool_size: 1,
18+
# loggers: [VRHose.Repo.Instrumenter, Ecto.LogEntry]
719

820
defmodule Instrumenter do
921
use Prometheus.EctoInstrumenter

lib/vrhose/repo_base.ex

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
defmodule VRHose.Repo.Base do
2+
defmacro __using__(opts) do
3+
quote bind_quoted: [opts: opts] do
4+
use Ecto.Repo,
5+
otp_app: :vrhose,
6+
adapter: Ecto.Adapters.SQLite3,
7+
8+
# sqlite does not do multi-writer. pool_size is effectively one,
9+
# if it's larger than one, then Database Busy errors haunt you
10+
# the trick to make concurrency happen is to create "read replicas"
11+
# that are effectively a pool of readers. this works because we're in WAL mode
12+
pool_size: 1,
13+
loggers: [VRHose.Repo.Instrumenter, Ecto.LogEntry]
14+
15+
@read_replicas opts[:read_replicas]
16+
@dedicated_replicas opts[:dedicated_replicas]
17+
18+
def repo_spec do
19+
%{read_replicas: @read_replicas, dedicated_replicas: @dedicated_replicas}
20+
end
21+
22+
def replica() do
23+
Enum.random(@read_replicas)
24+
end
25+
26+
def replica(identifier)
27+
when is_number(identifier) or is_bitstring(identifier) or is_atom(identifier) do
28+
@read_replicas |> Enum.at(rem(identifier |> :erlang.phash2(), length(@read_replicas)))
29+
end
30+
31+
for repo <- @read_replicas ++ @dedicated_replicas do
32+
default_dynamic_repo =
33+
if Mix.env() == :test do
34+
opts[:primary]
35+
else
36+
repo
37+
end
38+
39+
defmodule repo do
40+
use Ecto.Repo,
41+
otp_app: :vrhose,
42+
adapter: Ecto.Adapters.SQLite3,
43+
pool_size: 1,
44+
loggers: [VRHose.Repo.Instrumenter, Ecto.LogEntry],
45+
read_only: true,
46+
default_dynamic_repo: default_dynamic_repo
47+
end
48+
end
49+
end
50+
end
51+
end

lib/vrhose/tinycron.ex

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
defmodule VRHose.Tinycron do
2+
use GenServer
3+
require Logger
4+
5+
def new(module, opts \\ []) do
6+
%{
7+
id: module,
8+
start: {__MODULE__, :start_link, [module, opts |> Keyword.put(:name, module)]}
9+
}
10+
end
11+
12+
def start_link(module, opts \\ []) do
13+
GenServer.start_link(__MODULE__, [module, opts], opts)
14+
end
15+
16+
def noop(mod, value) do
17+
GenServer.cast(mod, {:noop, value})
18+
end
19+
20+
@impl true
21+
def init([module, opts]) do
22+
state = %{module: module, opts: opts}
23+
schedule_work(state)
24+
{:ok, state}
25+
end
26+
27+
@impl true
28+
def handle_info(:work, %{module: module} = state) do
29+
unless state |> Map.get(:noop, false) do
30+
Logger.debug("running #{inspect(state.module)}")
31+
module.tick()
32+
end
33+
34+
schedule_work(state)
35+
{:noreply, state}
36+
end
37+
38+
@impl true
39+
def handle_cast({:noop, value}, state) do
40+
{:noreply, state |> Map.put(:noop, value)}
41+
end
42+
43+
defp schedule_work(state) do
44+
every_seconds = state.opts |> Keyword.get(:every) || 10
45+
jitter_seconds_range = state.opts |> Keyword.get(:jitter) || -2..2
46+
first..last//step = jitter_seconds_range
47+
# turn it into milliseconds for greater jitter possibilities
48+
jitter_milliseconds_range = (first * 1000)..(last * 1000)//step
49+
50+
# prevent jitter from creating negative next_tick_time by doing max(0, next_tick_time)
51+
next_tick_time = max(0, every_seconds * 1000 + Enum.random(jitter_milliseconds_range))
52+
Logger.debug("scheduling #{inspect(state.module)} in #{next_tick_time}ms")
53+
Process.send_after(self(), :work, next_tick_time)
54+
end
55+
end

0 commit comments

Comments
 (0)