@@ -20,19 +20,21 @@ seastar::logger plog("perf");
2020
2121class tester {
2222 std::chrono::seconds _duration;
23- unsigned _parallel;
2423 std::string _object_name;
2524 size_t _object_size;
2625 semaphore _mem;
2726 shared_ptr<s3::client> _client;
28- utils::estimated_histogram _reads_hist ;
27+ utils::estimated_histogram _latencies ;
2928 unsigned _errors = 0 ;
29+ unsigned _part_size_mb;
30+ bool _remove_file;
3031
31- static s3::endpoint_config_ptr make_config () {
32+ static s3::endpoint_config_ptr make_config (unsigned sockets ) {
3233 s3::endpoint_config cfg;
3334 cfg.port = 443 ;
3435 cfg.use_https = true ;
3536 cfg.region = tests::getenv_safe (" AWS_DEFAULT_REGION" );
37+ cfg.max_connections = sockets;
3638
3739 return make_lw_shared<s3::endpoint_config>(std::move (cfg));
3840 }
@@ -42,16 +44,20 @@ class tester {
4244 std::chrono::steady_clock::time_point now () const { return std::chrono::steady_clock::now (); }
4345
4446public:
45- tester (std::chrono::seconds dur, unsigned prl , size_t obj_size)
47+ tester (std::chrono::seconds dur, unsigned sockets, unsigned part_size, sstring object_name , size_t obj_size)
4648 : _duration(dur)
47- , _parallel(prl)
48- , _object_name(fmt::format(" /{}/perfobject-{}-{}" , tests::getenv_safe(" S3_BUCKET_FOR_TEST" ), ::getpid(), this_shard_id()))
49+ , _object_name(std::move(object_name))
4950 , _object_size(obj_size)
5051 , _mem(memory::stats().total_memory())
51- , _client(s3::client::make(tests::getenv_safe(" S3_SERVER_ADDRESS_FOR_TEST" ), make_config(), _mem))
52+ , _client(s3::client::make(tests::getenv_safe(" S3_SERVER_ADDRESS_FOR_TEST" ), make_config(sockets), _mem))
53+ , _part_size_mb(part_size)
54+ , _remove_file(false )
5255 {}
5356
54- future<> start () {
57+ private:
58+ future<> make_temporary_file () {
59+ _object_name = fmt::format (" /{}/perfobject-{}-{}" , tests::getenv_safe (" S3_BUCKET_FOR_TEST" ), ::getpid (), this_shard_id ());
60+ _remove_file = true ;
5561 plog.debug (" Creating {} of {} bytes" , _object_name, _object_size);
5662
5763 auto out = output_stream<char >(_client->make_upload_sink (_object_name));
@@ -74,35 +80,46 @@ class tester {
7480 }
7581 }
7682
77- private:
83+ public:
84+ future<> start () {
85+ if (_object_name.empty ()) {
86+ co_await make_temporary_file ();
87+ }
88+ }
7889
79- future<> do_run () {
90+ future<> run_download () {
91+ plog.info (" Downloading" );
8092 auto until = now () + _duration;
8193 uint64_t off = 0 ;
8294 do {
8395 auto start = now ();
8496 try {
8597 co_await _client->get_object_contiguous (_object_name, s3::range{off, chunk_size});
8698 off = (off + chunk_size) % (_object_size - chunk_size);
87- _reads_hist .add (std::chrono::duration_cast<std::chrono::milliseconds>(now () - start).count ());
99+ _latencies .add (std::chrono::duration_cast<std::chrono::milliseconds>(now () - start).count ());
88100 } catch (...) {
89101 _errors++;
90102 }
91103 } while (now () < until);
92104 }
93105
94- public:
95- future<> run () {
96- co_await coroutine::parallel_for_each (std::views::iota (0u , _parallel), [this ] (auto fnr) -> future<> {
97- plog.debug (" Running {} fiber" , fnr);
98- co_await seastar::sleep (std::chrono::milliseconds (fnr)); // make some discrepancy
99- co_await do_run ();
100- });
106+ future<> run_upload () {
107+ plog.info (" Uploading" );
108+ auto file_name = fs::path (_object_name);
109+ auto sz = co_await seastar::file_size (file_name.native ());
110+ _object_name = fmt::format (" /{}/{}" , tests::getenv_safe (" S3_BUCKET_FOR_TEST" ), file_name.filename ().native ());
111+ _remove_file = true ;
112+ auto start = now ();
113+ co_await _client->upload_file (file_name, _object_name, {}, _part_size_mb << 20 );
114+ auto time = std::chrono::duration_cast<std::chrono::duration<double >>(now () - start);
115+ plog.info (" Uploaded {}MB in {}s, speed {}MB/s" , sz >> 20 , time.count (), (sz >> 20 ) / time.count ());
101116 }
102117
103118 future<> stop () {
104- plog.debug (" Removing {}" , _object_name);
105- co_await _client->delete_object (_object_name);
119+ if (_remove_file) {
120+ plog.debug (" Removing {}" , _object_name);
121+ co_await _client->delete_object (_object_name);
122+ }
106123 co_await _client->close ();
107124
108125 auto print_percentiles = [] (const utils::estimated_histogram& hist) {
@@ -115,31 +132,41 @@ class tester {
115132 hist.percentile (1.0 )
116133 );
117134 };
118- plog.info (" reads total: {:5}, errors: {:5}; latencies: {}" , _reads_hist ._count , _errors, print_percentiles (_reads_hist ));
135+ plog.info (" requests total: {:5}, errors: {:5}; latencies: {}" , _latencies ._count , _errors, print_percentiles (_latencies ));
119136 }
120137};
121138
122139int main (int argc, char ** argv) {
123140 namespace bpo = boost::program_options;
124141 app_template app;
125142 app.add_options ()
143+ (" upload" , " test file upload" )
126144 (" duration" , bpo::value<unsigned >()->default_value (10 ), " seconds to run" )
127- (" parallel" , bpo::value<unsigned >()->default_value (1 ), " number of parallel fibers" )
145+ (" sockets" , bpo::value<unsigned >()->default_value (1 ), " maximum number of socket for http client" )
146+ (" part_size_mb" , bpo::value<unsigned >()->default_value (5 ), " part size" )
147+ (" object_name" , bpo::value<sstring>()->default_value (" " ), " use given object/file name" )
128148 (" object_size" , bpo::value<size_t >()->default_value (1 << 20 ), " size of test object" )
129149 ;
130150
131151 return app.run (argc, argv, [&app] () -> future<> {
132152 auto dur = std::chrono::seconds (app.configuration ()[" duration" ].as <unsigned >());
133- auto prl = app.configuration ()[" parallel" ].as <unsigned >();
153+ auto sks = app.configuration ()[" sockets" ].as <unsigned >();
154+ auto part_size = app.configuration ()[" part_size_mb" ].as <unsigned >();
155+ auto oname = app.configuration ()[" object_name" ].as <sstring>();
134156 auto osz = app.configuration ()[" object_size" ].as <size_t >();
157+ auto upload = app.configuration ().contains (" upload" );
135158 sharded<tester> test;
136159 plog.info (" Creating" );
137- co_await test.start (dur, prl , osz);
160+ co_await test.start (dur, sks, part_size, oname , osz);
138161 plog.info (" Starting" );
139162 co_await test.invoke_on_all (&tester::start);
140163 try {
141164 plog.info (" Running" );
142- co_await test.invoke_on_all (&tester::run);
165+ if (upload) {
166+ co_await test.invoke_on_all (&tester::run_upload);
167+ } else {
168+ co_await test.invoke_on_all (&tester::run_download);
169+ }
143170 } catch (...) {
144171 plog.error (" Error running: {}" , std::current_exception ());
145172 }
0 commit comments