Skip to content

[release/5.x] Cherry pick: Fix empty committed snapshots: Restore behaviour of writing to temporary uncommitted file and renaming (#7029) #7034

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [5.0.18]

[5.0.18]: https://github.com/microsoft/CCF/releases/tag/5.0.18

### Fixed

- CCF will no longer create in-progress snapshot files with a `.committed` suffix. It will only rename files to `.committed` when they are complete and ready for reading (#7029).

## [5.0.17]

[5.0.17]: https://github.com/microsoft/CCF/releases/tag/5.0.17
Expand Down
19 changes: 18 additions & 1 deletion python/src/ccf/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ def digest(algo, data):
return h.finalize()


def is_snapshot_file_committed(file_name):
    """
    Report whether a snapshot file name carries the committed suffix.

    :param file_name: Snapshot file name to inspect.
    :return: True if ``file_name`` ends with ``COMMITTED_FILE_SUFFIX``,
        False otherwise.
    """
    has_committed_suffix = file_name.endswith(COMMITTED_FILE_SUFFIX)
    return has_committed_suffix


def unpack(stream, fmt):
size = struct.calcsize(fmt)
buf = stream.read(size)
Expand Down Expand Up @@ -789,14 +793,23 @@ def __init__(self, filename: str):
self._filename = filename
self._file_size = os.path.getsize(filename)

if self._file_size == 0:
raise InvalidSnapshotException(f"{filename} is currently empty")

entry_start_pos = super()._read_header()

# 1.x snapshots do not include evidence
if self.is_committed() and not self.is_snapshot_file_1_x():
receipt_pos = entry_start_pos + self._header.size
receipt_bytes = _peek_all(self._file, pos=receipt_pos)

receipt = json.loads(receipt_bytes.decode("utf-8"))
try:
receipt = json.loads(receipt_bytes.decode("utf-8"))
except json.decoder.JSONDecodeError as e:
raise InvalidSnapshotException(
f"Cannot read receipt from snapshot {os.path.basename(self._filename)}: Receipt starts at {receipt_pos} (file is {self._file_size} bytes), and contains {receipt_bytes}"
) from e

# Receipts included in snapshots always contain leaf components,
# including a claims digest and commit evidence, from 2.0.0-rc0 onwards.
# This verification code deliberately does not support snapshots
Expand Down Expand Up @@ -1074,3 +1087,7 @@ class UntrustedNodeException(Exception):

class UnknownTransaction(Exception):
    """No transaction exists in the ledger at the requested seqno."""


class InvalidSnapshotException(Exception):
    """
    The given snapshot file is invalid and cannot be parsed.

    Raised, for example, when the file is empty or when its receipt
    cannot be decoded as JSON.
    """
83 changes: 56 additions & 27 deletions src/host/snapshots.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,48 +254,77 @@ namespace asynchost
{
if (snapshot_idx == it->first)
{
// e.g. snapshot_100_105.committed
// e.g. snapshot_100_105
auto file_name = fmt::format(
"{}{}{}{}{}{}",
"{}{}{}{}{}",
snapshot_file_prefix,
snapshot_idx_delimiter,
it->first,
snapshot_idx_delimiter,
it->second.evidence_idx,
snapshot_committed_suffix);
it->second.evidence_idx);
auto full_snapshot_path = snapshot_dir / file_name;

if (fs::exists(full_snapshot_path))
int snapshot_fd = open(
full_snapshot_path.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0664);
if (snapshot_fd == -1)
{
// In the case that a file with this name already exists, keep
// existing file and drop pending snapshot
LOG_FAIL_FMT(
"Cannot write snapshot as file already exists: {}", file_name);
}
else
{
std::ofstream snapshot_file(
full_snapshot_path, std::ios::app | std::ios::binary);
if (!snapshot_file.good())
if (errno == EEXIST)
{
// In the case that a file with this name already exists, keep
// existing file and drop pending snapshot
LOG_FAIL_FMT(
"Cannot write snapshot: error opening file {}", file_name);
"Cannot write snapshot as file already exists: {}",
file_name);
}
else
{
const auto& snapshot = it->second.snapshot;
snapshot_file.write(
reinterpret_cast<const char*>(snapshot->data()),
snapshot->size());
snapshot_file.write(
reinterpret_cast<const char*>(receipt_data), receipt_size);

LOG_INFO_FMT(
"New snapshot file written to {} [{} bytes]",
file_name,
static_cast<size_t>(snapshot_file.tellp()));
LOG_FAIL_FMT(
"Cannot write snapshot: error ({}) opening file {}",
errno,
file_name);
}
}
else
{
const auto& snapshot = it->second.snapshot;

#define THROW_ON_ERROR(x) \
do \
{ \
auto rc = x; \
if (rc == -1) \
{ \
throw std::runtime_error(fmt::format( \
"Error ({}) writing snapshot {} in " #x, errno, file_name)); \
} \
} while (0)

THROW_ON_ERROR(
write(snapshot_fd, snapshot->data(), snapshot->size()));
THROW_ON_ERROR(write(snapshot_fd, receipt_data, receipt_size));

THROW_ON_ERROR(fsync(snapshot_fd));
THROW_ON_ERROR(close(snapshot_fd));

#undef THROW_ON_ERROR

LOG_INFO_FMT(
"New snapshot file written to {} [{} bytes]",
file_name,
snapshot->size() + receipt_size);

// e.g. snapshot_100_105.committed
const auto committed_file_name =
fmt::format("{}{}", file_name, snapshot_committed_suffix);
const auto full_committed_path =
snapshot_dir / committed_file_name;

files::rename(full_snapshot_path, full_committed_path);
LOG_INFO_FMT(
"Renamed temporary snapshot {} to committed {}",
file_name,
committed_file_name);
}

pending_snapshots.erase(it);

Expand Down
65 changes: 57 additions & 8 deletions tests/e2e_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import infra.snp as snp
from cryptography import x509
from cryptography.hazmat.backends import default_backend
import infra.concurrency

from loguru import logger as LOG

Expand Down Expand Up @@ -49,14 +50,62 @@ def test_save_committed_ledger_files(network, args):


def test_parse_snapshot_file(network, args):
primary, _ = network.find_primary()
network.txs.issue(network, number_txs=args.snapshot_tx_interval * 2)
committed_snapshots_dir = network.get_committed_snapshots(primary)
for snapshot in os.listdir(committed_snapshots_dir):
with ccf.ledger.Snapshot(os.path.join(committed_snapshots_dir, snapshot)) as s:
assert len(
s.get_public_domain().get_tables()
), "No public table in snapshot"
class ReaderThread(infra.concurrency.StoppableThread):
def __init__(self, network):
super().__init__(name="reader")
primary, _ = network.find_primary()
self.snapshots_dir = os.path.join(
primary.remote.remote.root,
primary.remote.snapshots_dir_name,
)

def run(self):
seen = set()
while not self.is_stopped():
for snapshot in os.listdir(self.snapshots_dir):
if (
ccf.ledger.is_snapshot_file_committed(snapshot)
and snapshot not in seen
):
seen.add(snapshot)
with ccf.ledger.Snapshot(
os.path.join(self.snapshots_dir, snapshot)
) as s:
assert len(
s.get_public_domain().get_tables()
), "No public table in snapshot"
LOG.success(f"Successfully parsed snapshot: {snapshot}")
LOG.info(f"Tested {len(seen)} snapshots")
assert len(seen) > 0, "No snapshots seen, so this tested nothing"

class WriterThread(infra.concurrency.StoppableThread):
def __init__(self, network, reader):
super().__init__(name="writer")
self.primary, _ = network.find_primary()
self.member = network.consortium.get_any_active_member()
self.reader = reader

def run(self):
while not self.is_stopped() and self.reader.is_alive():
self.member.update_ack_state_digest(self.primary)

reader_thread = ReaderThread(network)
reader_thread.start()

writer_thread = WriterThread(network, reader_thread)
writer_thread.start()

# When this test was added, the original failure reproduced 100% of the time within 0.5s.
# The fix has been manually verified across multi-minute runs.
# 5s is a plausible run time for CI and should still provide convincing coverage.
time.sleep(5)

writer_thread.stop()
writer_thread.join()

reader_thread.stop()
reader_thread.join()

return network


Expand Down