
Commit 9d260fb

all: Make number of copy workers configurable

Parent: 716138a

3 files changed, 18 insertions(+), 2 deletions(-)

Diff for: docs/environment-variables.md (+4)

@@ -231,6 +231,10 @@ those.
   timeout is hit, the batch size is reset to 1 so we can be sure that
   batches stay below `GRAPH_STORE_BATCH_TARGET_DURATION` and the smaller
   batch is retried. Value is in seconds and defaults to unlimited.
+- `GRAPH_STORE_BATCH_WORKERS`: The number of workers to use for batch
+  operations. If there are idle connections, each subgraph copy operation
+  will use up to this many workers to copy tables in parallel. Defaults
+  to 1 and must be at least 1.
 - `GRAPH_START_BLOCK`: block hash:block number where the forked subgraph will start indexing at.
 - `GRAPH_FORK_BASE`: api url for where the graph node will fork from, use `https://api.thegraph.com/subgraphs/id/`
   for the hosted service.
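For illustration, here is a minimal standalone sketch of the semantics the new variable documents: unset falls back to 1, and values below 1 are rejected. The helper is hypothetical, not graph-node code:

```rust
use std::env;

// Hypothetical standalone helper mirroring the documented behavior of
// GRAPH_STORE_BATCH_WORKERS: unset defaults to 1, and 0 is rejected.
fn batch_workers() -> Result<usize, String> {
    let raw = env::var("GRAPH_STORE_BATCH_WORKERS").unwrap_or_else(|_| "1".into());
    let n: usize = raw
        .parse()
        .map_err(|e| format!("GRAPH_STORE_BATCH_WORKERS: {e}"))?;
    if n < 1 {
        return Err("GRAPH_STORE_BATCH_WORKERS must be at least 1".into());
    }
    Ok(n)
}

fn main() {
    // With the variable unset this prints Ok(1), the default.
    println!("{:?}", batch_workers());
}
```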

Diff for: graph/src/env/store.rs (+12)

@@ -85,6 +85,12 @@ pub struct EnvVarsStore {
     /// this. Set by `GRAPH_STORE_BATCH_TIMEOUT`. Unlimited by default
     pub batch_timeout: Option<Duration>,
 
+    /// The number of workers to use for batch operations. If there are idle
+    /// connections, each subgraph copy operation will use up to this many
+    /// workers to copy tables in parallel. Defaults to 1 and must be at
+    /// least 1.
+    pub batch_workers: usize,
+
     /// Prune tables where we will remove at least this fraction of entity
     /// versions by rebuilding the table. Set by
     /// `GRAPH_STORE_HISTORY_REBUILD_THRESHOLD`. The default is 0.5

@@ -175,6 +181,7 @@ impl TryFrom<InnerStore> for EnvVarsStore {
             write_queue_size: x.write_queue_size,
             batch_target_duration: Duration::from_secs(x.batch_target_duration_in_secs),
             batch_timeout: x.batch_timeout_in_secs.map(Duration::from_secs),
+            batch_workers: x.batch_workers,
             rebuild_threshold: x.rebuild_threshold.0,
             delete_threshold: x.delete_threshold.0,
             history_slack_factor: x.history_slack_factor.0,

@@ -194,6 +201,9 @@ impl TryFrom<InnerStore> for EnvVarsStore {
                 );
             }
         }
+        if vars.batch_workers < 1 {
+            bail!("GRAPH_STORE_BATCH_WORKERS must be at least 1");
+        }
         Ok(vars)
     }
 }

@@ -239,6 +249,8 @@ pub struct InnerStore {
     batch_target_duration_in_secs: u64,
     #[envconfig(from = "GRAPH_STORE_BATCH_TIMEOUT")]
     batch_timeout_in_secs: Option<u64>,
+    #[envconfig(from = "GRAPH_STORE_BATCH_WORKERS", default = "1")]
+    batch_workers: usize,
     #[envconfig(from = "GRAPH_STORE_HISTORY_REBUILD_THRESHOLD", default = "0.5")]
     rebuild_threshold: ZeroToOneF64,
     #[envconfig(from = "GRAPH_STORE_HISTORY_DELETE_THRESHOLD", default = "0.05")]

Diff for: store/postgres/src/copy.rs (+2 -2)

@@ -840,7 +840,7 @@ impl Connection {
             logger,
             conn,
             pool,
-            workers: 5,
+            workers: ENV_VARS.store.batch_workers,
             src,
             dst,
             target_block,

@@ -945,7 +945,7 @@ impl Connection {
         // and there are more tables to be copied, we can start more
         // workers, up to `self.workers` many
         let mut workers = Vec::new();
-        while !state.unfinished.is_empty() && !workers.is_empty() {
+        while !state.unfinished.is_empty() || !workers.is_empty() {
             // We usually add at least one job here, except if we are out of
             // tables to copy. In that case, we go through the `while` loop
             // every time one of the tables we are currently copying
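The second hunk also fixes the loop condition: with `&&`, the body could never run at all, since `workers` is empty on entry, and `||` is what keeps the loop alive while tables remain to be started or copies are still in flight. A simplified sketch of that scheduling shape, using toy types rather than graph-node's actual worker futures:

```rust
use std::collections::VecDeque;

// Toy model of the copy loop: `unfinished` holds tables waiting to be
// copied, `workers` holds copies currently in flight.
fn copy_all(mut unfinished: VecDeque<&str>, max_workers: usize) {
    let mut workers: Vec<&str> = Vec::new();
    // `||` is essential here: `workers` starts out empty, so `&&` would
    // skip the loop entirely; and once `unfinished` drains we still have
    // to wait for the in-flight copies to finish.
    while !unfinished.is_empty() || !workers.is_empty() {
        // Start more copies, up to `max_workers` at a time.
        while workers.len() < max_workers {
            match unfinished.pop_front() {
                Some(table) => workers.push(table),
                None => break,
            }
        }
        // Wait for one copy to complete (here we simply retire one).
        if let Some(done) = workers.pop() {
            println!("copied {done}");
        }
    }
}

fn main() {
    // Table names are made up for the example.
    copy_all(VecDeque::from(["block_ptr", "entities", "poi"]), 2);
}
```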
