Skip to content

Commit a1be12b

Browse files
authored
Merge pull request #167 from altmannmarcelo/metadata_for_mrbr
Optional Metadata fix for Minimal Row Based Replication
2 parents dd636c6 + 119672f commit a1be12b

File tree

3 files changed

+91
-58
lines changed

3 files changed

+91
-58
lines changed

src/binlog/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1157,6 +1157,39 @@ mod tests {
11571157
assert_eq!(output, ev.data());
11581158
}
11591159

1160+
// https://github.com/blackbeam/rust_mysql_common/issues/162
1161+
if file_path.file_name().unwrap() == "minimal_row_metadata.000001" {
1162+
let event_data = ev.read_data().unwrap();
1163+
match event_data {
1164+
Some(EventData::RowsEvent(ev)) => {
1165+
let table_map_event =
1166+
binlog_file.reader().get_tme(ev.table_id()).unwrap();
1167+
let row = ev.rows(table_map_event).next().unwrap().unwrap();
1168+
let after_image = row.1.unwrap();
1169+
after_image
1170+
.columns()
1171+
.iter()
1172+
.enumerate()
1173+
.for_each(|(i, column)| {
1174+
if column.name_str() == "@2" {
1175+
assert_eq!(
1176+
column.character_set(),
1177+
CollationId::UTF8MB4_0900_AI_CI as u16
1178+
);
1179+
} else if column.name_str() == "@4" {
1180+
match after_image.as_ref(i).unwrap() {
1181+
BinlogValue::Value(val) => {
1182+
assert_eq!(val, &Value::Int(3230202323));
1183+
}
1184+
_ => panic!("Expected a value"),
1185+
}
1186+
}
1187+
});
1188+
}
1189+
_ => (),
1190+
}
1191+
}
1192+
11601193
ev_pos = ev_end;
11611194
}
11621195
}

src/binlog/row.rs

Lines changed: 58 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -176,67 +176,67 @@ impl<'de> MyDeserialize<'de> for BinlogRow {
176176
let mut column_name_iter = opt_meta_extractor.iter_column_name();
177177

178178
for i in 0..(num_columns as usize) {
179-
// check if column is in columns list
180-
if cols.get(i).as_deref().copied().unwrap_or(false) {
181-
let column_type = table_info.get_column_type(i);
182-
183-
// TableMapEvent must define column type for the current column.
184-
let column_type = match column_type {
185-
Ok(Some(ty)) => ty,
186-
Ok(None) => {
187-
return Err(io::Error::new(io::ErrorKind::InvalidData, "No column type"));
188-
}
189-
Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidData, e)),
190-
};
191-
192-
let column_meta = table_info.get_column_metadata(i).unwrap_or(&[]);
193-
194-
let is_partial = column_type == ColumnType::MYSQL_TYPE_JSON
195-
&& partial_cols
196-
.as_mut()
197-
.and_then(|bits| bits.next().as_deref().copied())
198-
.unwrap_or(false);
199-
200-
let is_unsigned = column_type
201-
.is_numeric_type()
202-
.then(|| signedness_iterator.next())
203-
.flatten()
204-
.unwrap_or_default();
205-
206-
let charset = if column_type.is_character_type() {
207-
charset_iter.next().transpose()?.unwrap_or_default()
208-
} else if column_type.is_enum_or_set_type() {
209-
enum_and_set_charset_iter
210-
.next()
211-
.transpose()?
212-
.unwrap_or_default()
213-
} else {
214-
Default::default()
215-
};
216-
217-
let column_name_raw = column_name_iter.next().transpose()?;
218-
let column_name = column_name_raw
219-
.as_ref()
220-
.map(|x| Cow::Borrowed(x.name_raw()))
221-
.unwrap_or_else(|| {
222-
// default column name is `@<i>` where i is a column offset in a table
223-
Cow::Owned(format!("@{}", i).into())
224-
});
225-
226-
let mut column_flags = ColumnFlags::empty();
227-
228-
if is_unsigned {
229-
column_flags |= ColumnFlags::UNSIGNED_FLAG;
230-
}
179+
let column_type = table_info.get_column_type(i);
231180

232-
if primary_key_iter
233-
.next_if(|next| next.is_err() || next.as_ref().ok() == Some(&(i as u64)))
234-
.transpose()?
235-
.is_some()
236-
{
237-
column_flags |= ColumnFlags::PRI_KEY_FLAG;
181+
// TableMapEvent must define column type for the current column.
182+
let column_type = match column_type {
183+
Ok(Some(ty)) => ty,
184+
Ok(None) => {
185+
return Err(io::Error::new(io::ErrorKind::InvalidData, "No column type"));
238186
}
187+
Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidData, e)),
188+
};
189+
190+
let column_meta = table_info.get_column_metadata(i).unwrap_or(&[]);
191+
192+
let is_partial = column_type == ColumnType::MYSQL_TYPE_JSON
193+
&& partial_cols
194+
.as_mut()
195+
.and_then(|bits| bits.next().as_deref().copied())
196+
.unwrap_or(false);
197+
198+
let is_unsigned = column_type
199+
.is_numeric_type()
200+
.then(|| signedness_iterator.next())
201+
.flatten()
202+
.unwrap_or_default();
203+
204+
let charset = if column_type.is_character_type() {
205+
charset_iter.next().transpose()?.unwrap_or_default()
206+
} else if column_type.is_enum_or_set_type() {
207+
enum_and_set_charset_iter
208+
.next()
209+
.transpose()?
210+
.unwrap_or_default()
211+
} else {
212+
Default::default()
213+
};
214+
215+
let column_name_raw = column_name_iter.next().transpose()?;
216+
let column_name = column_name_raw
217+
.as_ref()
218+
.map(|x| Cow::Borrowed(x.name_raw()))
219+
.unwrap_or_else(|| {
220+
// default column name is `@<i>` where i is a column offset in a table
221+
Cow::Owned(format!("@{}", i).into())
222+
});
223+
224+
let mut column_flags = ColumnFlags::empty();
225+
226+
if is_unsigned {
227+
column_flags |= ColumnFlags::UNSIGNED_FLAG;
228+
}
229+
230+
if primary_key_iter
231+
.next_if(|next| next.is_err() || next.as_ref().ok() == Some(&(i as u64)))
232+
.transpose()?
233+
.is_some()
234+
{
235+
column_flags |= ColumnFlags::PRI_KEY_FLAG;
236+
}
239237

238+
// check if column is in columns list
239+
if cols.get(i).as_deref().copied().unwrap_or(false) {
240240
let column = Column::new(column_type)
241241
.with_schema(table_info.database_name_raw())
242242
.with_table(table_info.table_name_raw())
495 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)