@@ -12,7 +12,7 @@ use lru_time_cache::LruCache;
12
12
use serde_json:: Value ;
13
13
14
14
use graph:: {
15
- ipfs_client:: { IpfsClient , StatApi , StatResponse } ,
15
+ ipfs_client:: { IpfsClient , StatApi } ,
16
16
prelude:: { LinkResolver as LinkResolverTrait , * } ,
17
17
} ;
18
18
@@ -55,7 +55,7 @@ async fn select_fastest_client_with_stat(
55
55
path : String ,
56
56
timeout : Duration ,
57
57
do_retry : bool ,
58
- ) -> Result < ( StatResponse , Arc < IpfsClient > ) , Error > {
58
+ ) -> Result < ( u64 , Arc < IpfsClient > ) , Error > {
59
59
let mut err: Option < Error > = None ;
60
60
61
61
let mut stats: FuturesUnordered < _ > = clients
@@ -67,7 +67,11 @@ async fn select_fastest_client_with_stat(
67
67
retry_policy ( do_retry, "IPFS stat" , & logger) . run ( move || {
68
68
let path = path. clone ( ) ;
69
69
let c = c. cheap_clone ( ) ;
70
- async move { c. stat ( api, path, timeout) . map_ok ( move |s| ( s, i) ) . await }
70
+ async move {
71
+ c. stat_size ( api, path, timeout)
72
+ . map_ok ( move |s| ( s, i) )
73
+ . await
74
+ }
71
75
} )
72
76
} )
73
77
. collect ( ) ;
@@ -90,18 +94,14 @@ async fn select_fastest_client_with_stat(
90
94
}
91
95
92
96
// Returns an error if the stat is bigger than `max_file_bytes`
93
- fn restrict_file_size (
94
- path : & str ,
95
- stat : & StatResponse ,
96
- max_file_bytes : & Option < u64 > ,
97
- ) -> Result < ( ) , Error > {
97
+ fn restrict_file_size ( path : & str , size : u64 , max_file_bytes : & Option < u64 > ) -> Result < ( ) , Error > {
98
98
if let Some ( max_file_bytes) = max_file_bytes {
99
- if stat . size > * max_file_bytes {
99
+ if size > * max_file_bytes {
100
100
return Err ( anyhow ! (
101
101
"IPFS file {} is too large. It can be at most {} bytes but is {} bytes" ,
102
102
path,
103
103
max_file_bytes,
104
- stat . size
104
+ size
105
105
) ) ;
106
106
}
107
107
}
@@ -172,10 +172,10 @@ impl LinkResolverTrait for LinkResolver {
172
172
}
173
173
trace ! ( logger, "IPFS cache miss" ; "hash" => & path) ;
174
174
175
- let ( stat , client) = select_fastest_client_with_stat (
175
+ let ( size , client) = select_fastest_client_with_stat (
176
176
self . clients . cheap_clone ( ) ,
177
177
logger. cheap_clone ( ) ,
178
- StatApi :: Dag ,
178
+ StatApi :: Files ,
179
179
path. clone ( ) ,
180
180
self . timeout ,
181
181
self . retry ,
@@ -184,44 +184,40 @@ impl LinkResolverTrait for LinkResolver {
184
184
185
185
let max_cache_file_size = self . env_vars . mappings . max_ipfs_cache_file_size ;
186
186
let max_file_size = self . env_vars . mappings . max_ipfs_file_bytes . map ( |n| n as u64 ) ;
187
- restrict_file_size ( & path, & stat , & max_file_size) ?;
187
+ restrict_file_size ( & path, size , & max_file_size) ?;
188
188
189
- let path = path. clone ( ) ;
190
- let this = self . clone ( ) ;
189
+ let req_path = path. clone ( ) ;
191
190
let timeout = self . timeout ;
192
- let logger = logger. clone ( ) ;
193
191
let data = retry_policy ( self . retry , "ipfs.cat" , & logger)
194
192
. run ( move || {
195
- let path = path . clone ( ) ;
193
+ let path = req_path . clone ( ) ;
196
194
let client = client. clone ( ) ;
197
- let this = this. clone ( ) ;
198
- let logger = logger. clone ( ) ;
199
- async move {
200
- let data = client. cat_all ( path. clone ( ) , timeout) . await ?. to_vec ( ) ;
201
-
202
- // Only cache files if they are not too large
203
- if data. len ( ) <= max_cache_file_size {
204
- let mut cache = this. cache . lock ( ) . unwrap ( ) ;
205
- if !cache. contains_key ( & path) {
206
- cache. insert ( path. to_owned ( ) , data. clone ( ) ) ;
207
- }
208
- } else {
209
- debug ! ( logger, "File too large for cache" ;
210
- "path" => path,
211
- "size" => data. len( )
212
- ) ;
213
- }
214
- Result :: < Vec < u8 > , reqwest:: Error > :: Ok ( data)
215
- }
195
+ async move { Ok ( client. cat_all ( path. clone ( ) , timeout) . await ?. to_vec ( ) ) }
216
196
} )
217
197
. await ?;
218
198
199
+ // The size reported by `files/stat` is not guaranteed to be exact, so check the limit again.
200
+ restrict_file_size ( & path, data. len ( ) as u64 , & max_file_size) ?;
201
+
202
+ // Only cache files if they are not too large
203
+ if data. len ( ) <= max_cache_file_size {
204
+ let mut cache = self . cache . lock ( ) . unwrap ( ) ;
205
+ if !cache. contains_key ( & path) {
206
+ cache. insert ( path. to_owned ( ) , data. clone ( ) ) ;
207
+ }
208
+ } else {
209
+ debug ! ( logger, "File too large for cache" ;
210
+ "path" => path,
211
+ "size" => data. len( )
212
+ ) ;
213
+ }
214
+
219
215
Ok ( data)
220
216
}
221
217
222
218
async fn get_block ( & self , logger : & Logger , link : & Link ) -> Result < Vec < u8 > , Error > {
223
219
trace ! ( logger, "IPFS block get" ; "hash" => & link. link) ;
224
- let ( stat , client) = select_fastest_client_with_stat (
220
+ let ( size , client) = select_fastest_client_with_stat (
225
221
self . clients . cheap_clone ( ) ,
226
222
logger. cheap_clone ( ) ,
227
223
StatApi :: Block ,
@@ -232,7 +228,7 @@ impl LinkResolverTrait for LinkResolver {
232
228
. await ?;
233
229
234
230
let max_file_size = self . env_vars . mappings . max_ipfs_file_bytes . map ( |n| n as u64 ) ;
235
- restrict_file_size ( & link. link , & stat , & max_file_size) ?;
231
+ restrict_file_size ( & link. link , size , & max_file_size) ?;
236
232
237
233
let link = link. link . clone ( ) ;
238
234
let data = retry_policy ( self . retry , "ipfs.getBlock" , & logger)
@@ -253,18 +249,18 @@ impl LinkResolverTrait for LinkResolver {
253
249
// Discard the `/ipfs/` prefix (if present) to get the hash.
254
250
let path = link. link . trim_start_matches ( "/ipfs/" ) ;
255
251
256
- let ( stat , client) = select_fastest_client_with_stat (
252
+ let ( size , client) = select_fastest_client_with_stat (
257
253
self . clients . cheap_clone ( ) ,
258
254
logger. cheap_clone ( ) ,
259
- StatApi :: Dag ,
255
+ StatApi :: Files ,
260
256
path. to_string ( ) ,
261
257
self . timeout ,
262
258
self . retry ,
263
259
)
264
260
. await ?;
265
261
266
262
let max_file_size = Some ( self . env_vars . mappings . max_ipfs_map_file_size as u64 ) ;
267
- restrict_file_size ( path, & stat , & max_file_size) ?;
263
+ restrict_file_size ( path, size , & max_file_size) ?;
268
264
269
265
let mut stream = client. cat ( path. to_string ( ) ) . await ?. fuse ( ) . boxed ( ) . compat ( ) ;
270
266
0 commit comments