diff --git a/.changeset/shiny-ties-relate.md b/.changeset/shiny-ties-relate.md new file mode 100644 index 0000000000..ed15098fec --- /dev/null +++ b/.changeset/shiny-ties-relate.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Simplify configuration by giving all options a default. Now the only required setting is the db connection parameters. diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 1e08d6a7a7..319c0f481d 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -81,28 +81,20 @@ otel_simple_processor = config :opentelemetry, processors: [otel_batch_processor, otel_simple_processor] |> Enum.reject(&is_nil/1) -connection_opts = - if Config.config_env() == :test do - [ - hostname: "localhost", - port: 54321, - username: "postgres", - password: "password", - database: "postgres", - sslmode: :disable - ] - else - {:ok, database_url_config} = - env!("DATABASE_URL", :string) - |> Electric.ConfigParser.parse_postgresql_uri() - - database_ipv6_config = - env!("DATABASE_USE_IPV6", :boolean, false) - - database_url_config ++ [ipv6: database_ipv6_config] - end - -config :electric, connection_opts: Electric.Utils.obfuscate_password(connection_opts) +# only pre-configure the connection opts in test env +# for others, rely on `DATABASE_URL` +if Config.config_env() == :test do + config :electric, + connection_opts: + Electric.Utils.obfuscate_password( + hostname: "localhost", + port: 54321, + username: "postgres", + password: "password", + database: "postgres", + sslmode: :disable + ) +end enable_integration_testing = env!("ENABLE_INTEGRATION_TESTING", :boolean, false) cache_max_age = env!("CACHE_MAX_AGE", :integer, 60) diff --git a/packages/sync-service/lib/electric.ex b/packages/sync-service/lib/electric.ex index 61c98f9fc1..9c1404f07d 100644 --- a/packages/sync-service/lib/electric.ex +++ b/packages/sync-service/lib/electric.ex @@ -6,7 +6,7 @@ defmodule Electric do """ @spec instance_id() :: binary | no_return def instance_id do - Application.fetch_env!(:electric, :instance_id) + Application.get_env(:electric, :instance_id, :default) end @type relation :: {schema :: String.t(), table :: String.t()} diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index efb2f7974f..91090ab849 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -18,7 +18,7 @@ defmodule Electric.Application do def start(_type, _args) do :erlang.system_flag(:backtrace_depth, 50) - config = configure() + config = Electric.Application.Configuration.load() shape_log_collector = Electric.Replication.ShapeLogCollector.name(config.electric_instance_id) @@ -70,13 +70,13 @@ defmodule Electric.Application do get_service_status: &Electric.ServiceStatus.check/0, inspector: config.inspector, long_poll_timeout: 20_000, - max_age: Application.fetch_env!(:electric, :cache_max_age), - stale_age: Application.fetch_env!(:electric, :cache_stale_age), + max_age: Application.get_env(:electric, :cache_max_age, 60), + stale_age: Application.get_env(:electric, :cache_stale_age, 60 * 5), allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false)}, - port: Application.fetch_env!(:electric, :service_port), + port: Application.get_env(:electric, :service_port, 3000), thousand_island_options: http_listener_options()} ], - prometheus_endpoint(Application.fetch_env!(:electric, :prometheus_port)), + prometheus_endpoint(Application.get_env(:electric, :prometheus_port, nil)), [{Electric.Connection.Supervisor, connection_manager_opts}] ]) @@ -86,65 +86,6 @@ defmodule Electric.Application do ) end - # This function is called once in the application's start() callback. It reads configuration - # from the OTP application env, runs some pre-processing functions and stores the processed - # configuration as a single map using `:persistent_term`. - defp configure do - electric_instance_id = Application.fetch_env!(:electric, :electric_instance_id) - - {storage_module, storage_in_opts} = Application.fetch_env!(:electric, :storage) - storage_opts = storage_module.shared_opts(storage_in_opts) - storage = {storage_module, storage_opts} - - {kv_module, kv_fun, kv_params} = Application.fetch_env!(:electric, :persistent_kv) - persistent_kv = apply(kv_module, kv_fun, [kv_params]) - - replication_stream_id = Application.fetch_env!(:electric, :replication_stream_id) - publication_name = "electric_publication_#{replication_stream_id}" - slot_name = "electric_slot_#{replication_stream_id}" - - get_pg_version_fn = fn -> - Electric.Connection.Manager.get_pg_version(Electric.Connection.Manager) - end - - prepare_tables_mfa = - {Electric.Postgres.Configuration, :configure_tables_for_replication!, - [get_pg_version_fn, publication_name]} - - inspector = - {Electric.Postgres.Inspector.EtsInspector, server: Electric.Postgres.Inspector.EtsInspector} - - shape_cache_opts = [ - electric_instance_id: electric_instance_id, - storage: storage, - inspector: inspector, - prepare_tables_fn: prepare_tables_mfa, - chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold), - log_producer: Electric.Replication.ShapeLogCollector.name(electric_instance_id), - consumer_supervisor: Electric.Shapes.ConsumerSupervisor.name(electric_instance_id), - registry: Registry.ShapeChanges - ] - - config = %Electric.Application.Configuration{ - electric_instance_id: electric_instance_id, - storage: storage, - persistent_kv: persistent_kv, - connection_opts: Application.fetch_env!(:electric, :connection_opts), - replication_opts: %{ - stream_id: replication_stream_id, - publication_name: publication_name, - slot_name: slot_name - }, - pool_opts: %{ - size: Application.fetch_env!(:electric, :db_pool_size) - }, - inspector: inspector, - shape_cache_opts: shape_cache_opts - } - - Electric.Application.Configuration.save(config) - end - defp prometheus_endpoint(nil), do: [] defp prometheus_endpoint(port) do diff --git a/packages/sync-service/lib/electric/application/configuration.ex b/packages/sync-service/lib/electric/application/configuration.ex index 36cbd45c64..17c92da17e 100644 --- a/packages/sync-service/lib/electric/application/configuration.ex +++ b/packages/sync-service/lib/electric/application/configuration.ex @@ -19,19 +19,122 @@ defmodule Electric.Application.Configuration do @persistent_key __MODULE__ - @spec save(t) :: t - def save(config) do - # Make sure the application configuration is only stored once. + # This function is called once in the application's start() callback. It reads configuration + # from the OTP application env, runs some pre-processing functions and stores the processed + # configuration as a single map using `:persistent_term`. + @spec load() :: t() + def load do try do - _ = :persistent_term.get(@persistent_key) - raise "Trying to overwrite previously stored application configuration" + # Make sure the application configuration is only stored once. + _config = :persistent_term.get(@persistent_key) rescue ArgumentError -> - :ok = :persistent_term.put(@persistent_key, config) - config + build() |> save() end end - @spec get :: t + defp build do + electric_instance_id = Application.get_env(:electric, :electric_instance_id, :default) + + {storage_module, storage_in_opts} = + Application.get_env(:electric, :storage, default_storage(electric_instance_id)) + + storage_opts = storage_module.shared_opts(storage_in_opts) + storage = {storage_module, storage_opts} + + {kv_module, kv_fun, kv_params} = + Application.get_env(:electric, :persistent_kv, default_persistent_kv()) + + persistent_kv = apply(kv_module, kv_fun, [kv_params]) + + replication_stream_id = Application.get_env(:electric, :replication_stream_id, "default") + publication_name = "electric_publication_#{replication_stream_id}" + slot_name = "electric_slot_#{replication_stream_id}" + + get_pg_version_fn = fn -> + Electric.Connection.Manager.get_pg_version(Electric.Connection.Manager) + end + + prepare_tables_mfa = + {Electric.Postgres.Configuration, :configure_tables_for_replication!, + [get_pg_version_fn, publication_name]} + + inspector = + {Electric.Postgres.Inspector.EtsInspector, server: Electric.Postgres.Inspector.EtsInspector} + + shape_cache_opts = [ + electric_instance_id: electric_instance_id, + storage: storage, + inspector: inspector, + prepare_tables_fn: prepare_tables_mfa, + chunk_bytes_threshold: + Application.get_env( + :electric, + :chunk_bytes_threshold, + Electric.ShapeCache.LogChunker.default_chunk_size_threshold() + ), + log_producer: Electric.Replication.ShapeLogCollector.name(electric_instance_id), + consumer_supervisor: Electric.Shapes.ConsumerSupervisor.name(electric_instance_id), + registry: Registry.ShapeChanges + ] + + %Electric.Application.Configuration{ + electric_instance_id: electric_instance_id, + storage: storage, + persistent_kv: persistent_kv, + connection_opts: connection_opts(), + replication_opts: %{ + stream_id: replication_stream_id, + publication_name: publication_name, + slot_name: slot_name + }, + pool_opts: %{ + size: Application.get_env(:electric, :db_pool_size, 20) + }, + inspector: inspector, + shape_cache_opts: shape_cache_opts + } + end + + defp connection_opts do + if connection_opts = Application.get_env(:electric, :connection_opts, nil) do + connection_opts + else + database_url = Dotenvy.env!("DATABASE_URL", :string) + + case Electric.ConfigParser.parse_postgresql_uri(database_url) |> dbg do + {:ok, database_url_config} -> + database_ipv6_config = + Dotenvy.env!("DATABASE_USE_IPV6", :boolean, false) + + database_url_config ++ [ipv6: database_ipv6_config] + + {:error, reason} -> + raise RuntimeError, message: "Invalid DATABASE_URL: #{reason}" + end + end + |> Electric.Utils.obfuscate_password() + end + + defp default_persistent_kv do + {Electric.PersistentKV.Filesystem, :new!, root: storage_dir("state")} + end + + defp default_storage(instance_id) do + {Electric.ShapeCache.FileStorage, + storage_dir: storage_dir("shapes"), electric_instance_id: instance_id} + end + + defp storage_dir(sub_dir) do + Path.join("./persistent", sub_dir) + end + + @spec save(t()) :: t() + defp save(config) do + :ok = :persistent_term.put(@persistent_key, config) + config + end + + @spec get() :: t() def get, do: :persistent_term.get(@persistent_key) end diff --git a/packages/sync-service/lib/electric/telemetry.ex b/packages/sync-service/lib/electric/telemetry.ex index 36068fbb62..543f430cf5 100644 --- a/packages/sync-service/lib/electric/telemetry.ex +++ b/packages/sync-service/lib/electric/telemetry.ex @@ -1,5 +1,6 @@ defmodule Electric.Telemetry do use Supervisor + import Telemetry.Metrics def start_link(init_arg) do @@ -12,8 +13,8 @@ defmodule Electric.Telemetry do ] children - |> add_statsd_reporter(Application.fetch_env!(:electric, :telemetry_statsd_host)) - |> add_prometheus_reporter(Application.fetch_env!(:electric, :prometheus_port)) + |> add_statsd_reporter(Application.get_env(:electric, :telemetry_statsd_host, nil)) + |> add_prometheus_reporter(Application.get_env(:electric, :prometheus_port, nil)) |> Supervisor.init(strategy: :one_for_one) end