forked from quickwit-oss/tantivy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserializer.rs
93 lines (83 loc) · 3.38 KB
/
serializer.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use std::io::{self, Write};
use common::{BinarySerializable, CountingWriter, VInt};
use crate::positions::COMPRESSION_BLOCK_SIZE;
use crate::postings::compression::{BlockEncoder, VIntEncoder};
/// The `PositionSerializer` is in charge of serializing all of the positions
/// of all of the terms of a given field.
///
/// Positions are supplied as deltas through `write_positions_delta`, which may
/// be called more than once per term. Calling `close_term` ends the current
/// term and writes its encoded data to the underlying writer.
pub struct PositionSerializer<W: io::Write> {
/// Encoder used by `flush_block` to compress both full and partial blocks.
block_encoder: BlockEncoder,
/// Destination writer, wrapped so that the number of bytes written can be
/// reported by `written_bytes`.
positions_wrt: CountingWriter<W>,
/// Encoded bytes of the current term; written out and cleared by
/// `close_term`.
positions_buffer: Vec<u8>,
/// Position deltas received but not yet encoded. Flushed as soon as it holds
/// `COMPRESSION_BLOCK_SIZE` values, or when the term is closed.
block: Vec<u32>,
/// One bit width per full block encoded for the current term; written out
/// and cleared by `close_term`.
bit_widths: Vec<u8>,
}
impl<W: io::Write> PositionSerializer<W> {
    /// Creates a new `PositionSerializer` writing into the given `positions_wrt`.
    pub fn new(positions_wrt: W) -> PositionSerializer<W> {
        PositionSerializer {
            block_encoder: BlockEncoder::new(),
            positions_wrt: CountingWriter::wrap(positions_wrt),
            positions_buffer: Vec::with_capacity(128_000),
            // `block` is flushed as soon as it reaches one compression block,
            // so it never needs more room than that.
            block: Vec::with_capacity(COMPRESSION_BLOCK_SIZE),
            bit_widths: Vec::new(),
        }
    }

    /// Returns the number of bytes written in the positions write object
    /// at this point.
    ///
    /// When called before writing the positions of a term, this value is used
    /// as start offset.
    /// When called after writing the positions of a term, this value is used
    /// as end offset.
    ///
    /// Data of the current term is only handed to the writer by
    /// [`close_term`](Self::close_term); values buffered before that call are
    /// not reflected here.
    pub fn written_bytes(&self) -> u64 {
        self.positions_wrt.written_bytes()
    }

    /// Number of values that can still be appended to `block` before it holds
    /// a full compression block.
    fn remaining_block_len(&self) -> usize {
        COMPRESSION_BLOCK_SIZE - self.block.len()
    }

    /// Writes all of the given positions delta.
    ///
    /// The values are appended to the pending block; every time the block
    /// reaches `COMPRESSION_BLOCK_SIZE` values it is encoded and emptied.
    /// May be called several times for the same term.
    pub fn write_positions_delta(&mut self, mut positions_delta: &[u32]) {
        while !positions_delta.is_empty() {
            let num_to_write = self.remaining_block_len().min(positions_delta.len());
            let (head, tail) = positions_delta.split_at(num_to_write);
            self.block.extend_from_slice(head);
            positions_delta = tail;
            if self.remaining_block_len() == 0 {
                self.flush_block();
            }
        }
    }

    /// Encodes the pending block into `positions_buffer` and empties it.
    ///
    /// A full block is compressed with `compress_block_unsorted` and its bit
    /// width is recorded in `bit_widths`. A shorter block is encoded with
    /// `compress_vint_unsorted` and records no bit width. An empty block is a
    /// no-op.
    fn flush_block(&mut self) {
        if self.block.is_empty() {
            return;
        }
        // encode the positions in the block
        if self.block.len() == COMPRESSION_BLOCK_SIZE {
            let (bit_width, block_encoded): (u8, &[u8]) = self
                .block_encoder
                .compress_block_unsorted(&self.block[..], false);
            self.bit_widths.push(bit_width);
            self.positions_buffer.extend_from_slice(block_encoded);
        } else {
            debug_assert!(self.block.len() < COMPRESSION_BLOCK_SIZE);
            let block_vint_encoded = self.block_encoder.compress_vint_unsorted(&self.block[..]);
            self.positions_buffer.extend_from_slice(block_vint_encoded);
        }
        self.block.clear();
    }

    /// Close the positions for the current term.
    ///
    /// Flushes the pending block, then writes to the underlying writer, in
    /// this order: the number of recorded bit widths as a `VInt`, the bit
    /// widths themselves, and the encoded positions. The per-term buffers are
    /// then cleared so that the next term starts from a clean state.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error reported by the underlying writer. In that
    /// case the per-term buffers are left as they were.
    pub fn close_term(&mut self) -> io::Result<()> {
        self.flush_block();
        VInt(self.bit_widths.len() as u64).serialize(&mut self.positions_wrt)?;
        self.positions_wrt.write_all(&self.bit_widths[..])?;
        self.positions_wrt.write_all(&self.positions_buffer)?;
        self.bit_widths.clear();
        self.positions_buffer.clear();
        Ok(())
    }

    /// Consumes the serializer and flushes the underlying writer.
    ///
    /// This does not call [`close_term`](Self::close_term): positions that
    /// were written for a term that has not been closed are not emitted.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the underlying writer's `flush`.
    pub fn close(mut self) -> io::Result<()> {
        self.positions_wrt.flush()
    }
}