Skip to content

Commit

Permalink
Simplify configuration of the electric app
Browse files Browse the repository at this point in the history
When electric is included as a dependency then the configuration done in
our `config/runtime.exs` is not used, and the application needs to set
electric up itself.

Because we've made every configuration option required in the code
(relying on runtime.exs) this is onerous.

But, because every option except the db connection actually has sensible
defaults, we can simplify this and reduce the required configuration
settings down to the db connection stuff.

The only downside to this is that now we have default values for the
parameters in two locations, runtime.exs and with every
`Application.get_env/3` call.

I took the opportunity to re-factor the configuration stuff and move it
out of the application.ex module.
  • Loading branch information
magnetised committed Oct 25, 2024
1 parent 7de9f1d commit bcce918
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 97 deletions.
5 changes: 5 additions & 0 deletions .changeset/shiny-ties-relate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Simplify configuration by giving all options a default. Now the only required setting is the db connection parameters.
36 changes: 14 additions & 22 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -81,28 +81,20 @@ otel_simple_processor =
config :opentelemetry,
processors: [otel_batch_processor, otel_simple_processor] |> Enum.reject(&is_nil/1)

connection_opts =
if Config.config_env() == :test do
[
hostname: "localhost",
port: 54321,
username: "postgres",
password: "password",
database: "postgres",
sslmode: :disable
]
else
{:ok, database_url_config} =
env!("DATABASE_URL", :string)
|> Electric.ConfigParser.parse_postgresql_uri()

database_ipv6_config =
env!("DATABASE_USE_IPV6", :boolean, false)

database_url_config ++ [ipv6: database_ipv6_config]
end

config :electric, connection_opts: Electric.Utils.obfuscate_password(connection_opts)
# only pre-configure the connection opts in test env
# for others, rely on `DATABASE_URL`
if Config.config_env() == :test do
config :electric,
connection_opts:
Electric.Utils.obfuscate_password(
hostname: "localhost",
port: 54321,
username: "postgres",
password: "password",
database: "postgres",
sslmode: :disable
)
end

enable_integration_testing = env!("ENABLE_INTEGRATION_TESTING", :boolean, false)
cache_max_age = env!("CACHE_MAX_AGE", :integer, 60)
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Electric do
"""
@spec instance_id() :: binary | no_return
def instance_id do
Application.fetch_env!(:electric, :instance_id)
Application.get_env(:electric, :instance_id, :default)
end

@type relation :: {schema :: String.t(), table :: String.t()}
Expand Down
69 changes: 5 additions & 64 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Electric.Application do
def start(_type, _args) do
:erlang.system_flag(:backtrace_depth, 50)

config = configure()
config = Electric.Application.Configuration.load()

shape_log_collector = Electric.Replication.ShapeLogCollector.name(config.electric_instance_id)

Expand Down Expand Up @@ -70,13 +70,13 @@ defmodule Electric.Application do
get_service_status: &Electric.ServiceStatus.check/0,
inspector: config.inspector,
long_poll_timeout: 20_000,
max_age: Application.fetch_env!(:electric, :cache_max_age),
stale_age: Application.fetch_env!(:electric, :cache_stale_age),
max_age: Application.get_env(:electric, :cache_max_age, 60),
stale_age: Application.get_env(:electric, :cache_stale_age, 60 * 5),
allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false)},
port: Application.fetch_env!(:electric, :service_port),
port: Application.get_env(:electric, :service_port, 3000),
thousand_island_options: http_listener_options()}
],
prometheus_endpoint(Application.fetch_env!(:electric, :prometheus_port)),
prometheus_endpoint(Application.get_env(:electric, :prometheus_port, nil)),
[{Electric.Connection.Supervisor, connection_manager_opts}]
])

Expand All @@ -86,65 +86,6 @@ defmodule Electric.Application do
)
end

# This function is called once in the application's start() callback. It reads configuration
# from the OTP application env, runs some pre-processing functions and stores the processed
# configuration as a single map using `:persistent_term`.
defp configure do
electric_instance_id = Application.fetch_env!(:electric, :electric_instance_id)

{storage_module, storage_in_opts} = Application.fetch_env!(:electric, :storage)
storage_opts = storage_module.shared_opts(storage_in_opts)
storage = {storage_module, storage_opts}

{kv_module, kv_fun, kv_params} = Application.fetch_env!(:electric, :persistent_kv)
persistent_kv = apply(kv_module, kv_fun, [kv_params])

replication_stream_id = Application.fetch_env!(:electric, :replication_stream_id)
publication_name = "electric_publication_#{replication_stream_id}"
slot_name = "electric_slot_#{replication_stream_id}"

get_pg_version_fn = fn ->
Electric.Connection.Manager.get_pg_version(Electric.Connection.Manager)
end

prepare_tables_mfa =
{Electric.Postgres.Configuration, :configure_tables_for_replication!,
[get_pg_version_fn, publication_name]}

inspector =
{Electric.Postgres.Inspector.EtsInspector, server: Electric.Postgres.Inspector.EtsInspector}

shape_cache_opts = [
electric_instance_id: electric_instance_id,
storage: storage,
inspector: inspector,
prepare_tables_fn: prepare_tables_mfa,
chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold),
log_producer: Electric.Replication.ShapeLogCollector.name(electric_instance_id),
consumer_supervisor: Electric.Shapes.ConsumerSupervisor.name(electric_instance_id),
registry: Registry.ShapeChanges
]

config = %Electric.Application.Configuration{
electric_instance_id: electric_instance_id,
storage: storage,
persistent_kv: persistent_kv,
connection_opts: Application.fetch_env!(:electric, :connection_opts),
replication_opts: %{
stream_id: replication_stream_id,
publication_name: publication_name,
slot_name: slot_name
},
pool_opts: %{
size: Application.fetch_env!(:electric, :db_pool_size)
},
inspector: inspector,
shape_cache_opts: shape_cache_opts
}

Electric.Application.Configuration.save(config)
end

defp prometheus_endpoint(nil), do: []

defp prometheus_endpoint(port) do
Expand Down
119 changes: 111 additions & 8 deletions packages/sync-service/lib/electric/application/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,122 @@ defmodule Electric.Application.Configuration do

@persistent_key __MODULE__

@spec save(t) :: t
def save(config) do
# Make sure the application configuration is only stored once.
# This function is called once in the application's start() callback. It reads configuration
# from the OTP application env, runs some pre-processing functions and stores the processed
# configuration as a single map using `:persistent_term`.
@spec load() :: t()
def load do
try do
_ = :persistent_term.get(@persistent_key)
raise "Trying to overwrite previously stored application configuration"
# Make sure the application configuration is only stored once.
_config = :persistent_term.get(@persistent_key)
rescue
ArgumentError ->
:ok = :persistent_term.put(@persistent_key, config)
config
build() |> save()
end
end

@spec get :: t
defp build do
electric_instance_id = Application.get_env(:electric, :electric_instance_id, :default)

{storage_module, storage_in_opts} =
Application.get_env(:electric, :storage, default_storage(electric_instance_id))

storage_opts = storage_module.shared_opts(storage_in_opts)
storage = {storage_module, storage_opts}

{kv_module, kv_fun, kv_params} =
Application.get_env(:electric, :persistent_kv, default_persistent_kv())

persistent_kv = apply(kv_module, kv_fun, [kv_params])

replication_stream_id = Application.get_env(:electric, :replication_stream_id, "default")
publication_name = "electric_publication_#{replication_stream_id}"
slot_name = "electric_slot_#{replication_stream_id}"

get_pg_version_fn = fn ->
Electric.Connection.Manager.get_pg_version(Electric.Connection.Manager)
end

prepare_tables_mfa =
{Electric.Postgres.Configuration, :configure_tables_for_replication!,
[get_pg_version_fn, publication_name]}

inspector =
{Electric.Postgres.Inspector.EtsInspector, server: Electric.Postgres.Inspector.EtsInspector}

shape_cache_opts = [
electric_instance_id: electric_instance_id,
storage: storage,
inspector: inspector,
prepare_tables_fn: prepare_tables_mfa,
chunk_bytes_threshold:
Application.get_env(
:electric,
:chunk_bytes_threshold,
Electric.ShapeCache.LogChunker.default_chunk_size_threshold()
),
log_producer: Electric.Replication.ShapeLogCollector.name(electric_instance_id),
consumer_supervisor: Electric.Shapes.ConsumerSupervisor.name(electric_instance_id),
registry: Registry.ShapeChanges
]

%Electric.Application.Configuration{
electric_instance_id: electric_instance_id,
storage: storage,
persistent_kv: persistent_kv,
connection_opts: connection_opts(),
replication_opts: %{
stream_id: replication_stream_id,
publication_name: publication_name,
slot_name: slot_name
},
pool_opts: %{
size: Application.get_env(:electric, :db_pool_size, 20)
},
inspector: inspector,
shape_cache_opts: shape_cache_opts
}
end

defp connection_opts do
if connection_opts = Application.get_env(:electric, :connection_opts, nil) do
connection_opts
else
database_url = Dotenvy.env!("DATABASE_URL", :string)

case Electric.ConfigParser.parse_postgresql_uri(database_url) |> dbg do
{:ok, database_url_config} ->
database_ipv6_config =
Dotenvy.env!("DATABASE_USE_IPV6", :boolean, false)

database_url_config ++ [ipv6: database_ipv6_config]

{:error, reason} ->
raise RuntimeError, message: "Invalid DATABASE_URL: #{reason}"
end
end
|> Electric.Utils.obfuscate_password()
end

defp default_persistent_kv do
{Electric.PersistentKV.Filesystem, :new!, root: storage_dir("state")}
end

defp default_storage(instance_id) do
{Electric.ShapeCache.FileStorage,
storage_dir: storage_dir("shapes"), electric_instance_id: instance_id}
end

defp storage_dir(sub_dir) do
Path.join("./persistent", sub_dir)
end

@spec save(t()) :: t()
defp save(config) do
:ok = :persistent_term.put(@persistent_key, config)
config
end

@spec get() :: t()
def get, do: :persistent_term.get(@persistent_key)
end
5 changes: 3 additions & 2 deletions packages/sync-service/lib/electric/telemetry.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule Electric.Telemetry do
use Supervisor

import Telemetry.Metrics

def start_link(init_arg) do
Expand All @@ -12,8 +13,8 @@ defmodule Electric.Telemetry do
]

children
|> add_statsd_reporter(Application.fetch_env!(:electric, :telemetry_statsd_host))
|> add_prometheus_reporter(Application.fetch_env!(:electric, :prometheus_port))
|> add_statsd_reporter(Application.get_env(:electric, :telemetry_statsd_host, nil))
|> add_prometheus_reporter(Application.get_env(:electric, :prometheus_port, nil))
|> Supervisor.init(strategy: :one_for_one)
end

Expand Down

0 comments on commit bcce918

Please sign in to comment.