Skip to content

Commit 94822ca

Browse files
committed
Add support for Amazon S3 bucket storage backend.
This adds support for using Amazon's S3 bucket as a storage backend via the `AmazonS3Store` module. It is only implemented for the `Lwt` concurrency library at this time since the underlying AWS-S3 library does not yet have support for `Eio`.
1 parent 722ab04 commit 94822ca

File tree

8 files changed

+250
-1
lines changed

8 files changed

+250
-1
lines changed

.github/workflows/build-and-test.yml

+29-1
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,41 @@ jobs:
2929
local-packages:
3030
- zarr.opam
3131

32+
env:
33+
AWS_ACCESS_KEY_ID: minioadmin
34+
AWS_SECRET_ACCESS_KEY: minioadmin
35+
36+
services:
37+
minio:
38+
image: fclairamb/minio-github-actions
39+
ports:
40+
- 9000:9000
41+
3242
name: Ocaml version - ${{ matrix.ocaml-compiler }} - ${{ matrix.os }}
3343
steps:
3444
- name: checkout
3545
uses: actions/checkout@v4
3646
with:
3747
fetch-depth: 2
3848

49+
- name: Setup Minio
50+
run: |
51+
mkdir ~/.aws
52+
echo '[default]' > ~/.aws/credentials
53+
echo 'aws_access_key_id = minioadmin' >> ~/.aws/credentials
54+
echo 'aws_secret_access_key = minioadmin' >> ~/.aws/credentials
55+
pip3 install minio
56+
python3 - <<'EOF'
57+
from minio import Minio
58+
minio = Minio(
59+
'localhost:9000',
60+
access_key='minioadmin',
61+
secret_key='minioadmin',
62+
secure=False
63+
)
64+
minio.make_bucket('test-bucket-lwt', location='us-east-1')
65+
EOF
66+
3967
- name: setup-ocaml
4068
uses: ocaml/setup-ocaml@v3
4169
with:
@@ -45,7 +73,7 @@ jobs:
4573
run: |
4674
opam install --deps-only --with-test --with-doc --yes zarr
4775
opam install bytesrw conf-zlib conf-zstd --yes
48-
opam install lwt --yes
76+
opam install lwt aws-s3-lwt --yes
4977
opam exec -- dune build zarr zarr-sync zarr-lwt
5078
5179
- name: setup ocaml-5-specific

Makefile

+5
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,8 @@ docs:
2626
.PHONY: view-docs
2727
view-docs: docs
2828
chromium _build/default/_doc/_html/index.html
29+
30+
.PHONY: minio
31+
minio:
32+
mkdir -p /tmp/minio/test-bucket-lwt
33+
docker run --rm -it -p 9000:9000 -v /tmp/minio:/minio minio/minio:latest server /minio

dune-project

+1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
(and (>= 4.14.0)))
6464
(zarr (= :version))
6565
(lwt (>= 2.5.1))
66+
(aws-s3-lwt (>= 4.8.1))
6667
(odoc :with-doc)
6768
(ounit2 :with-test)
6869
(ppx_deriving :with-test)

zarr-lwt.opam

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ depends: [
1313
"ocaml" {>= "4.14.0"}
1414
"zarr" {= version}
1515
"lwt" {>= "2.5.1"}
16+
"aws-s3-lwt" {>= "4.8.1"}
1617
"odoc" {with-doc}
1718
"ounit2" {with-test}
1819
"ppx_deriving" {with-test}

zarr-lwt/src/dune

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
(public_name zarr-lwt)
44
(libraries
55
zarr
6+
aws-s3-lwt
67
lwt
78
lwt.unix)
89
(ocamlopt_flags

zarr-lwt/src/storage.ml

+174
Original file line numberDiff line numberDiff line change
@@ -163,3 +163,177 @@ module FilesystemStore = struct
163163

164164
include Zarr.Storage.Make(IO)
165165
end
166+
167+
module AmazonS3Store = struct
168+
module Credentials = Aws_s3_lwt.Credentials
169+
module S3 = Aws_s3_lwt.S3
170+
171+
open Deferred.Infix
172+
open Deferred.Syntax
173+
174+
exception Request_failed of S3.error
175+
176+
let empty_content () = S3.{
177+
storage_class = Standard;
178+
meta_headers = None;
179+
etag = String.empty;
180+
key = String.empty;
181+
last_modified = 0.;
182+
size = 0
183+
}
184+
185+
let fold_or_catch ~not_found res =
186+
let return_or_raise r () = match r with
187+
| Ok v -> Deferred.return v
188+
| Error e -> raise (Request_failed e)
189+
and on_exception ~not_found = function
190+
| Request_failed S3.Not_found -> Lwt.return (not_found ())
191+
| exn -> raise exn
192+
in
193+
Lwt.catch (return_or_raise res) (on_exception ~not_found)
194+
195+
let raise_not_found k () = raise (Zarr.Storage.Key_not_found k)
196+
197+
let empty_Ls = Fun.const ([], S3.Ls.Done)
198+
199+
let fold_continuation ~return ~more = function
200+
| S3.Ls.Done -> Deferred.return return
201+
| S3.Ls.More continuation ->
202+
continuation () >>= fold_or_catch ~not_found:empty_Ls >>= fun (xs, cont) ->
203+
more xs cont
204+
205+
module IO = struct
206+
module Deferred = Deferred
207+
208+
type t =
209+
{retries : int
210+
;bucket : string
211+
;cred : Credentials.t
212+
;endpoint : Aws_s3.Region.endpoint}
213+
214+
let size t key =
215+
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
216+
let f ~endpoint () = S3.head ~bucket ~credentials ~key ~endpoint () in
217+
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
218+
let+ c = fold_or_catch ~not_found:empty_content res in
219+
c.size
220+
221+
let is_member t key =
222+
let+ size = size t key in
223+
if size = 0 then false else true
224+
225+
let get t key =
226+
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
227+
let f ~endpoint () = S3.get ~bucket ~credentials ~endpoint ~key () in
228+
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
229+
fold_or_catch ~not_found:(raise_not_found key) res
230+
231+
let get_partial_values t key ranges =
232+
let read_range t key (ofs, len) =
233+
let range = match len with
234+
| None -> S3.{first = Some ofs; last = None}
235+
| Some l -> S3.{first = Some ofs; last = Some (ofs + l - 1)}
236+
in
237+
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
238+
let f ~endpoint () = S3.get ~bucket ~credentials ~endpoint ~range ~key () in
239+
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
240+
let+ data = fold_or_catch ~not_found:(raise_not_found key) res in
241+
[data]
242+
in
243+
Deferred.concat_map (read_range t key) ranges
244+
245+
let set t key data =
246+
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
247+
let f ~endpoint () = S3.put ~bucket ~credentials ~endpoint ~data ~key () in
248+
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
249+
let* _ = fold_or_catch ~not_found:(Fun.const String.empty) res in
250+
Deferred.return_unit
251+
252+
let set_partial_values t key ?(append=false) rsv =
253+
let* size = size t key in
254+
let* ov = match size with
255+
| 0 -> Deferred.return String.empty
256+
| _ -> get t key
257+
in
258+
let f = if append || ov = String.empty then
259+
fun acc (_, v) -> acc ^ v else
260+
fun acc (rs, v) ->
261+
let s = Bytes.unsafe_of_string acc in
262+
Bytes.blit_string v 0 s rs String.(length v);
263+
Bytes.unsafe_to_string s
264+
in
265+
set t key (List.fold_left f ov rsv)
266+
267+
let erase t key =
268+
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
269+
let f ~endpoint () = S3.delete ~bucket ~credentials ~endpoint ~key () in
270+
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
271+
fold_or_catch ~not_found:(Fun.const ()) res
272+
273+
let rec delete_keys t cont () =
274+
let del t xs c = Deferred.iter (delete_content t) xs >>= delete_keys t c in
275+
fold_continuation ~return:() ~more:(del t) cont
276+
277+
and delete_content t S3.{key; _} = erase t key
278+
279+
and erase_prefix t prefix =
280+
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
281+
let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
282+
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
283+
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
284+
Deferred.iter (delete_content t) xs >>= delete_keys t rest
285+
286+
let rec list t =
287+
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
288+
let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint () in
289+
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
290+
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
291+
accumulate_keys (List.map content_key xs) rest
292+
293+
and content_key S3.{key; _} = key
294+
295+
and accumulate_keys acc cont =
296+
let append acc xs c = accumulate_keys (acc @ List.map content_key xs) c in
297+
fold_continuation ~return:acc ~more:(append acc) cont
298+
299+
module S = Set.Make(String)
300+
301+
let rec partition_keys prefix ((l, r) as acc) cont =
302+
let split ~acc ~prefix xs c = partition_keys prefix (List.fold_left (add prefix) acc xs) c in
303+
fold_continuation ~return:(l, S.elements r) ~more:(split ~acc ~prefix) cont
304+
305+
and add prefix (l, r) (c : S3.content) =
306+
let size = String.length prefix in
307+
if not (String.contains_from c.key size '/') then c.key :: l, r else
308+
l, S.add String.(sub c.key 0 @@ 1 + index_from c.key size '/') r
309+
310+
and list_dir t prefix =
311+
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
312+
let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
313+
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
314+
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
315+
let init = List.fold_left (add prefix) ([], S.empty) xs in
316+
partition_keys prefix init rest
317+
318+
let rec rename t prefix new_prefix =
319+
let upload t (k, v) = set t k v in
320+
let* xs = list t in
321+
let to_delete = List.filter (String.starts_with ~prefix) xs in
322+
let* data = Deferred.fold_left (rename_and_add ~t ~prefix ~new_prefix) [] to_delete in
323+
let* () = Deferred.iter (upload t) data in
324+
Deferred.iter (erase t) to_delete
325+
326+
and rename_and_add ~t ~prefix ~new_prefix acc k =
327+
let l = String.length prefix in
328+
let k' = new_prefix ^ String.sub k l (String.length k - l) in
329+
let+ a = get t k in (k', a) :: acc
330+
end
331+
332+
let with_open ?(scheme=`Http) ?(inet=`V4) ?(retries=3) ~region ~bucket ~profile f =
333+
let* res = Credentials.Helper.get_credentials ~profile () in
334+
let cred = Result.fold ~ok:Fun.id ~error:raise res in
335+
let endpoint = Aws_s3.Region.endpoint ~inet ~scheme region in
336+
f IO.{bucket; cred; endpoint; retries}
337+
338+
include Zarr.Storage.Make(IO)
339+
end

zarr-lwt/src/storage.mli

+32
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,35 @@ module FilesystemStore : sig
2020
2121
@raise Failure if [dir] is not a Zarr store path. *)
2222
end
23+
24+
(** An Lwt-aware Amazon S3 bucket storage backend for a Zarr V3 hierarchy. *)
25+
module AmazonS3Store : sig
26+
exception Request_failed of Aws_s3_lwt.S3.error
27+
28+
include Zarr.Storage.STORE with module Deferred = Deferred
29+
30+
val with_open :
31+
?scheme:[ `Http | `Https ] ->
32+
?inet:[ `V4 | `V6 ] ->
33+
?retries:int ->
34+
region:Aws_s3.Region.t ->
35+
bucket:string ->
36+
profile:string ->
37+
(t -> 'a Lwt.t) ->
38+
'a Lwt.t
39+
(** [with_open ~region ~bucket ~profile f] opens an S3 bucket store with
40+
bucket name [bucket] at region [region] using credentials specified by
41+
profile [profile]. The credentials are read locally from a [~/.aws/credentials]
42+
file or from an IAM service if the profile or file is not available.
43+
Function [f] is applied to the store's open handle and its output is
44+
returned to the caller.
45+
46+
{ul
47+
{- [scheme] is the HTTP scheme to use when connecting to S3, and must be
48+
one of [`Http | `Https]. Defaults to [`Http].}
49+
{- [inet] is the IP version and must be one of [`V4 | `V6]. Defaults to [`V4].}
50+
{- [retries] is the number of times to retry a request should it return an error.}
51+
}
52+
53+
@raise Request_failed if an error occurs while sending a request to the S3 service. *)
54+
end

zarr-lwt/test/test_lwt.ml

+7
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,18 @@ let _ =
141141
(Zarr.Storage.Not_a_filesystem_store fn)
142142
(fun () -> FilesystemStore.open_store fn);
143143

144+
(* ZipStore configuration *)
144145
let zpath = tmp_dir ^ ".zip" in
146+
(* AmazonS3Store configuration *)
147+
let region = Aws_s3.Region.minio ~port:9000 ~host:"localhost" ()
148+
and bucket = "test-bucket-lwt"
149+
and profile = "default" in
150+
145151
Lwt_main.run @@ Lwt.join
146152
[ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z)
147153
(* test just opening the now exisitant archive created by the previous test. *)
148154
;ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit)
155+
;AmazonS3Store.with_open ~region ~bucket ~profile (test_storage (module AmazonS3Store))
149156
;test_storage (module MemoryStore) @@ MemoryStore.create ()
150157
;test_storage (module FilesystemStore) s])
151158
])

0 commit comments

Comments
 (0)