Skip to content

Commit c82f662

Browse files
committed
Reduce I/O amount while writing/reading bucket files in the reduction step
1 parent d07b6c2 commit c82f662

File tree

5 files changed

+249
-156
lines changed

5 files changed

+249
-156
lines changed

src/file_abstractions.h

+82
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636

3737
#include <zlib.h>
3838

39+
#include "types.h"
40+
#include "util_log.h"
41+
3942
// We need zlib >= 1.2.5 for transparent writing
4043
#if ZLIB_VERNUM < 0x1250
4144
#error zlib >= 1.2.5 is required
@@ -47,6 +50,68 @@
4750
// Never write a compressed file ("transparent" writing)
4851
#define GZFILE_WRITE_MODE "wTb"
4952

53+
template <int bytenum>
54+
struct integer_type {
55+
};
56+
57+
template <>
58+
struct integer_type<1> {
59+
typedef uint8_t type;
60+
};
61+
62+
template <>
63+
struct integer_type<2> {
64+
typedef uint16_t type;
65+
};
66+
template <>
67+
struct integer_type<4> {
68+
typedef uint32_t type;
69+
};
70+
// Shift 64 is war
71+
// template <>
72+
// struct integer_type<8> {
73+
// typedef uint64_t type;
74+
// };
75+
76+
template <int byte_num>
77+
void writelen(gzFile fout, BWTPosition len)
78+
{
79+
static char intbuff[byte_num*64];
80+
const uint8_t shift_amount= (byte_num * 8 -1);
81+
const BWTPosition mask= ((1ull << shift_amount) -1);
82+
unsigned int bpos= 0;
83+
do
84+
{
85+
typename integer_type<byte_num>::type towrite=(len & mask);
86+
len >>= shift_amount;
87+
if (len)
88+
towrite |= (1ull << shift_amount);
89+
memcpy(intbuff+bpos, (char *)&towrite, byte_num);
90+
bpos += byte_num;
91+
}
92+
while(len);
93+
gzwrite(fout, intbuff, sizeof(char)*bpos);
94+
}
95+
96+
template <int byte_num>
97+
BWTPosition readlen(gzFile fin)
98+
{
99+
const unsigned int shift_amount= (byte_num * 8 -1);
100+
const BWTPosition mask= ((1ull << shift_amount) -1);
101+
BWTPosition p =0;
102+
typename integer_type<byte_num>::type partialread =0;
103+
uint8_t iter=0;
104+
do{
105+
partialread =0;
106+
const size_t gcount= gzread(fin, (char *)(&partialread), byte_num);
107+
if(gcount != byte_num)
108+
return 0;
109+
p |= ((partialread & mask) << (shift_amount * iter));
110+
++iter;
111+
}while(partialread & (1ull << shift_amount));
112+
return p;
113+
}
114+
50115
class gzip_file_t {
51116
private:
52117
gzFile file;
@@ -102,6 +167,10 @@ class gzip_file_t {
102167
return result;
103168
}
104169

170+
bool is_open() const {
171+
return (file != Z_NULL);
172+
}
173+
105174
template <typename TFileValue>
106175
bool read(TFileValue& value) {
107176
const int byteread= gzread(file, (void*)&value, sizeof(TFileValue) );
@@ -126,6 +195,19 @@ class gzip_file_t {
126195
return bytewritten/sizeof(TFileValue);
127196
}
128197

198+
template <typename TFileValue, int byte_num>
199+
void write_compressed(const TFileValue value) {
200+
static_assert(std::is_integral<TFileValue>::value && sizeof(TFileValue)<=sizeof(BWTPosition),
201+
"write_compressed can be used only for integral types of size at most that of BWTPosition");
202+
writelen<byte_num>(file, value);
203+
}
204+
205+
template <typename TFileValue, int byte_num>
206+
void read_compressed(TFileValue& value) {
207+
static_assert(std::is_integral<TFileValue>::value && sizeof(TFileValue)<=sizeof(BWTPosition),
208+
"read_compressed can be used only for integral types of size at most that of BWTPosition");
209+
value= readlen<byte_num>(file);
210+
}
129211
};
130212

131213

src/sgbuild/redbuild.cpp

+46-10
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ struct GZipBuilder {
5050

5151
file_t* operator()(const std::string& name)
5252
{
53-
return new gzip_file_t(name, WRITE_MODE);
53+
file_t* res= new gzip_file_t(name, WRITE_MODE);
54+
_FAIL_IF(!res->is_open());
55+
return res;
5456
}
5557

5658
};
@@ -247,6 +249,46 @@ parse_cmds(const int argc, const char** argv)
247249
return opts;
248250
}
249251

252+
bool write_bucket_elem(gzip_file_t& out_bucketfile,
253+
SequenceNumber arc[4],
254+
BWTPosition label[2],
255+
SequenceLength l) {
256+
out_bucketfile.write(arc[0]);
257+
out_bucketfile.write_compressed<SequenceNumber, 1>(arc[1]-arc[0]);
258+
out_bucketfile.write(arc[2]);
259+
out_bucketfile.write_compressed<SequenceNumber, 1>(arc[3]-arc[2]);
260+
out_bucketfile.write(label[0]);
261+
out_bucketfile.write_compressed<BWTPosition, 2>(label[1]-label[0]);
262+
out_bucketfile.write(l);
263+
return true;
264+
}
265+
266+
bool read_bucket_elem(gzip_file_t& in_bucketfile,
267+
SequenceNumber arc[4],
268+
BWTPosition label[2],
269+
SequenceLength& l) {
270+
SequenceNumber diff_arc1(0), diff_arc2(0);
271+
BWTPosition diff_label(0);
272+
273+
bool res= in_bucketfile.read(arc[0]);
274+
if (!res) return false;
275+
in_bucketfile.read_compressed<SequenceNumber, 1>(diff_arc1);
276+
arc[1]= arc[0]+diff_arc1;
277+
278+
res= in_bucketfile.read(arc[2]);
279+
if (!res) return false;
280+
in_bucketfile.read_compressed<SequenceNumber, 1>(diff_arc2);
281+
arc[3]= arc[2]+diff_arc2;
282+
283+
res= in_bucketfile.read(label[0]);
284+
if (!res) return false;
285+
in_bucketfile.read_compressed<BWTPosition, 2>(diff_label);
286+
label[1]= label[0]+diff_label;
287+
288+
return in_bucketfile.read(l);
289+
}
290+
291+
250292
void multiplex_intervals(vector<gzip_file_t*>& in_arc_files,
251293
vector<gzip_file_t*>& in_label_files,
252294
bucket_outfilemanager_t& out_bucketfiles,
@@ -266,17 +308,13 @@ void multiplex_intervals(vector<gzip_file_t*>& in_arc_files,
266308
orig_sourceend= sourceend;
267309
sourceend= (bucket_no+1)*max_bucket_length;
268310
while (sourceend<orig_sourceend) {
269-
out_bucketfiles[bucket_no].write(arc);
270-
out_bucketfiles[bucket_no].write(label);
271-
out_bucketfiles[bucket_no].write(l);
311+
write_bucket_elem(out_bucketfiles[bucket_no], arc, label, l);
272312
sourcebegin= sourceend;
273313
sourceend += max_bucket_length;
274314
++bucket_no;
275315
}
276316
sourceend= orig_sourceend;
277-
out_bucketfiles[bucket_no].write(arc);
278-
out_bucketfiles[bucket_no].write(label);
279-
out_bucketfiles[bucket_no].write(l);
317+
write_bucket_elem(out_bucketfiles[bucket_no], arc, label, l);
280318
}
281319
}
282320
}
@@ -296,9 +334,7 @@ void reduce_bucket(EndPosManager& eomgr,
296334
SequenceLength l;
297335
SequenceNumber &destbegin= arc[0], &destend= arc[1];
298336
SequenceNumber &sourcebegin= arc[2], &sourceend= arc[3];
299-
while (in_bucketfile.read(reinterpret_cast<char*>(&arc), sizeof(arc)) &&
300-
in_bucketfile.read(reinterpret_cast<char*>(&label), sizeof(label)) &&
301-
in_bucketfile.read(reinterpret_cast<char*>(&l), sizeof(l))) {
337+
while (read_bucket_elem(in_bucketfile, arc, label, l)) {
302338
_FAIL_IF((sourceend - base_vertex > bucket_length));
303339
for(SequenceNumber src= sourcebegin; src < sourceend; ++src) {
304340
for(SequenceNumber dst= destbegin; dst < destend; ++dst) {

src/util.cpp

-62
Original file line numberDiff line numberDiff line change
@@ -55,68 +55,6 @@ NuclConv::NuclConv() {
5555
}
5656

5757

58-
template <int bytenum>
59-
struct integer_type {
60-
};
61-
62-
template <>
63-
struct integer_type<1> {
64-
typedef uint8_t type;
65-
};
66-
67-
template <>
68-
struct integer_type<2> {
69-
typedef uint16_t type;
70-
};
71-
template <>
72-
struct integer_type<4> {
73-
typedef uint32_t type;
74-
};
75-
// Shift 64 is war
76-
// template <>
77-
// struct integer_type<8> {
78-
// typedef uint64_t type;
79-
// };
80-
81-
template <int byte_num>
82-
void writelen(gzFile fout, BWTPosition len)
83-
{
84-
static char intbuff[byte_num*64];
85-
const uint8_t shift_amount= (byte_num * 8 -1);
86-
const BWTPosition mask= ((1ull << shift_amount) -1);
87-
unsigned int bpos= 0;
88-
do
89-
{
90-
typename integer_type<byte_num>::type towrite=(len & mask);
91-
len >>= shift_amount;
92-
if (len)
93-
towrite |= (1ull << shift_amount);
94-
memcpy(intbuff+bpos, (char *)&towrite, byte_num);
95-
bpos += byte_num;
96-
}
97-
while(len);
98-
gzwrite(fout, intbuff, sizeof(char)*bpos);
99-
}
100-
101-
template <int byte_num>
102-
BWTPosition readlen(gzFile fin)
103-
{
104-
const unsigned int shift_amount= (byte_num * 8 -1);
105-
const BWTPosition mask= ((1ull << shift_amount) -1);
106-
BWTPosition p =0;
107-
typename integer_type<byte_num>::type partialread =0;
108-
uint8_t iter=0;
109-
do{
110-
partialread =0;
111-
const size_t gcount= gzread(fin, (char *)(&partialread), byte_num);
112-
if(gcount != byte_num)
113-
return 0;
114-
p |= ((partialread & mask) << (shift_amount * iter));
115-
++iter;
116-
}while(partialread & (1ull << shift_amount));
117-
return p;
118-
}
119-
12058
void
12159
write_interval( gzFile fout, const QInterval& i )
12260
{

src/util.h

+2-84
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
#include "q_interval.h"
5151
#include "arcInterval.h"
5252
#include "edgeLabelInterval.h"
53+
#include "file_abstractions.h"
54+
#include "util_log.h"
5355

5456
using std::ofstream;
5557
using std::ifstream;
@@ -103,89 +105,5 @@ bool read_interval ( gzFile , EdgeLabelInterval& );
103105
// Return a string representing the current tim
104106
std::string now( const char* format );
105107

106-
#ifdef DEBUG
107-
#define DEBUG_LOG(s) \
108-
do { \
109-
std::cerr << "DBG:" \
110-
<< __FILE__ << ":" \
111-
<< std::setiosflags (std::ios::left) << std::setw(4) \
112-
<< __LINE__ \
113-
<< " " << s << std::endl; \
114-
} while (0)
115-
#else
116-
#define DEBUG_LOG(s) do {} while(0)
117-
#endif
118-
119-
#ifdef DEBUG_VERBOSE
120-
#define DEBUG_LOG_VERBOSE(s) DEBUG_LOG(s)
121-
#else
122-
#define DEBUG_LOG_VERBOSE(s) do {} while(0)
123-
#endif
124-
125-
126-
127-
#define QUOTE(str) #str
128-
#define EXPAND_AND_QUOTE(str) QUOTE(str)
129-
130-
#define _MY_FAIL \
131-
do { \
132-
DEBUG_LOG("Failure."); \
133-
throw std::logic_error("Failure at " + \
134-
std::string(__FILE__) + ":" + \
135-
std::string(EXPAND_AND_QUOTE(__LINE__)) ); \
136-
} while (0)
137-
138-
#define _FAIL_AND_LOG( _MESSAGE_ ) \
139-
do { \
140-
DEBUG_LOG( _MESSAGE_ ); \
141-
throw std::logic_error(_MESSAGE_); \
142-
} while (0)
143-
144-
#define _FAIL_IF( _TEST_ ) \
145-
do { \
146-
if (_TEST_) { \
147-
DEBUG_LOG("Condition '" << \
148-
EXPAND_AND_QUOTE(_TEST_) << \
149-
"' verified."); \
150-
_MY_FAIL; \
151-
} \
152-
} while (0)
153-
154-
155-
156-
#ifdef DEBUG // DEBUG is ON
157-
158-
#define _FAIL_IF_DBG( _TEST_ ) \
159-
do { \
160-
if (_TEST_) { \
161-
DEBUG_LOG("Condition '" << \
162-
EXPAND_AND_QUOTE(_TEST_) << \
163-
"' verified."); \
164-
_MY_FAIL; \
165-
} \
166-
} while (0)
167-
168-
#define PERFORM_AND_CHECK( TEST ) \
169-
do { \
170-
bool __test_perform_##__LINE__ = TEST; \
171-
if (!__test_perform_##__LINE__) { \
172-
DEBUG_LOG("Error while performing '" << \
173-
EXPAND_AND_QUOTE(TEST) << "'."); \
174-
_MY_FAIL; \
175-
} \
176-
} while(0)
177-
178-
#else // DEBUG is OFF
179-
180-
#define _FAIL_IF_DBG( _TEST_ ) \
181-
do { \
182-
} while (0)
183-
184-
#define PERFORM_AND_CHECK( TEST ) \
185-
do { \
186-
TEST; \
187-
} while(0)
188-
189-
#endif
190108

191109
#endif

0 commit comments

Comments
 (0)