Skip to content

Commit 387835f

Browse files
committed
fix various synchronization and event handling issues
1 parent 9c3aa2e commit 387835f

10 files changed

+62
-65
lines changed

tools/cabana/streams/abstractstream.cc

+10-4
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@ const CanData &AbstractStream::lastMessage(const MessageId &id) const {
128128
}
129129

130130
void AbstractStream::updateLastMsgsTo(double sec) {
131-
std::lock_guard lk(mutex_);
132131
current_sec_ = sec;
133132
uint64_t last_ts = toMonoTime(sec);
134133
std::unordered_map<MessageId, CanData> msgs;
@@ -160,10 +159,17 @@ void AbstractStream::updateLastMsgsTo(double sec) {
160159
std::any_of(messages_.cbegin(), messages_.cend(),
161160
[this](const auto &m) { return !last_msgs.count(m.first); });
162161
last_msgs = messages_;
163-
mutex_.unlock();
164-
165162
emit msgsReceived(nullptr, id_changed);
166-
resumeStream();
163+
164+
std::lock_guard lk(mutex_);
165+
seek_finished_ = true;
166+
seek_finished_cv_.notify_one();
167+
}
168+
169+
void AbstractStream::waitForSeekFinshed() {
170+
std::unique_lock lock(mutex_);
171+
seek_finished_cv_.wait(lock, [this]() { return seek_finished_; });
172+
seek_finished_ = false;
167173
}
168174

169175
const CanEvent *AbstractStream::newEvent(uint64_t mono_time, const cereal::CanData::Reader &c) {

tools/cabana/streams/abstractstream.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <algorithm>
44
#include <array>
5+
#include <condition_variable>
56
#include <memory>
67
#include <mutex>
78
#include <optional>
@@ -108,7 +109,7 @@ class AbstractStream : public QObject {
108109
void mergeEvents(const std::vector<const CanEvent *> &events);
109110
const CanEvent *newEvent(uint64_t mono_time, const cereal::CanData::Reader &c);
110111
void updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size);
111-
virtual void resumeStream() {}
112+
void waitForSeekFinshed();
112113
std::vector<const CanEvent *> all_events_;
113114
double current_sec_ = 0;
114115
std::optional<std::pair<double, double>> time_range_;
@@ -124,6 +125,8 @@ class AbstractStream : public QObject {
124125

125126
// Members accessed in multiple threads. (mutex protected)
126127
std::mutex mutex_;
128+
std::condition_variable seek_finished_cv_;
129+
bool seek_finished_ = false;
127130
std::set<MessageId> new_msgs_;
128131
std::unordered_map<MessageId, CanData> messages_;
129132
std::unordered_map<MessageId, std::vector<uint8_t>> masks_;

tools/cabana/streams/replaystream.cc

+5-2
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,12 @@ bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint
5353

5454
// Forward replay callbacks to corresponding Qt signals.
5555
replay->onSeeking = [this](double sec) { emit seeking(sec); };
56-
replay->onSeekedTo = [this](double sec) { emit seekedTo(sec); };
56+
replay->onSeekedTo = [this](double sec) {
57+
emit seekedTo(sec);
58+
waitForSeekFinshed();
59+
};
5760
replay->onQLogLoaded = [this](std::shared_ptr<LogReader> qlog) { emit qLogLoaded(qlog); };
58-
replay->onSegmentsMerged = [this]() { QMetaObject::invokeMethod(this, &ReplayStream::mergeSegments, Qt::QueuedConnection); };
61+
replay->onSegmentsMerged = [this]() { QMetaObject::invokeMethod(this, &ReplayStream::mergeSegments, Qt::BlockingQueuedConnection); };
5962

6063
bool success = replay->load();
6164
if (!success) {

tools/cabana/streams/replaystream.h

-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ class ReplayStream : public AbstractStream {
3232
inline float getSpeed() const { return replay->getSpeed(); }
3333
inline Replay *getReplay() const { return replay.get(); }
3434
inline bool isPaused() const override { return replay->isPaused(); }
35-
void resumeStream() override { return replay->resumeStream(); }
3635
void pause(bool pause) override;
3736

3837
signals:

tools/replay/replay.cc

+23-36
Original file line numberDiff line numberDiff line change
@@ -106,31 +106,27 @@ void Replay::seekTo(double seconds, bool relative) {
106106
rInfo("Seeking to %d s, segment %d", (int)target_time, target_segment);
107107
notifyEvent(onSeeking, target_time);
108108

109-
double seeked_to_sec = -1;
110109
interruptStream([&]() {
111110
current_segment_ = target_segment;
112111
cur_mono_time_ = route_start_ts_ + target_time * 1e9;
113-
seeking_to_ = target_time;
114-
115-
if (event_data_->isSegmentLoaded(target_segment)) {
116-
seeked_to_sec = *seeking_to_;
117-
seeking_to_.reset();
118-
}
112+
seeking_to_.store(target_time, std::memory_order_relaxed);
119113
return false;
120114
});
121115

122-
checkSeekProgress(seeked_to_sec);
123116
seg_mgr_->setCurrentSegment(target_segment);
117+
checkSeekProgress();
124118
}
125119

126-
void Replay::checkSeekProgress(double seeked_to_sec) {
127-
if (seeked_to_sec >= 0) {
128-
if (onSeekedTo) {
129-
onSeekedTo(seeked_to_sec);
130-
} else {
131-
interruptStream([]() { return true; });
132-
}
120+
void Replay::checkSeekProgress() {
121+
if (!seg_mgr_->getEventData()->isSegmentLoaded(current_segment_)) return;
122+
123+
double seek_to = seeking_to_.exchange(-1.0, std::memory_order_relaxed);
124+
if (seek_to >= 0 && onSeekedTo) {
125+
onSeekedTo(seek_to);
133126
}
127+
128+
// Resume the interrupted stream
129+
interruptStream([]() { return true; });
134130
}
135131

136132
void Replay::seekToFlag(FindFlag flag) {
@@ -152,29 +148,19 @@ void Replay::pause(bool pause) {
152148
void Replay::handleSegmentMerge() {
153149
if (exit_) return;
154150

155-
double seeked_to_sec = -1;
156-
interruptStream([&]() {
157-
event_data_ = seg_mgr_->getEventData();
158-
notifyEvent(onSegmentsMerged);
159-
160-
bool segment_loaded = event_data_->isSegmentLoaded(current_segment_);
161-
if (seeking_to_ && segment_loaded) {
162-
seeked_to_sec = *seeking_to_;
163-
seeking_to_.reset();
164-
return false;
165-
}
166-
return segment_loaded;
167-
});
168-
169-
checkSeekProgress(seeked_to_sec);
170-
if (!stream_thread_.joinable() && !event_data_->events.empty()) {
171-
startStream();
151+
auto event_data = seg_mgr_->getEventData();
152+
if (!stream_thread_.joinable() && !event_data->segments.empty()) {
153+
startStream(event_data->segments.begin()->second);
172154
}
155+
notifyEvent(onSegmentsMerged);
156+
157+
// Interrupt the stream to handle segment merge
158+
interruptStream([]() { return false; });
159+
checkSeekProgress();
173160
}
174161

175-
void Replay::startStream() {
176-
const auto &cur_segment = event_data_->segments.begin()->second;
177-
const auto &events = cur_segment->log->events;
162+
void Replay::startStream(const std::shared_ptr<Segment> segment) {
163+
const auto &events = segment->log->events;
178164
route_start_ts_ = events.front().mono_time;
179165
cur_mono_time_ += route_start_ts_ - 1;
180166

@@ -212,7 +198,7 @@ void Replay::startStream() {
212198
if (!hasFlag(REPLAY_FLAG_NO_VIPC)) {
213199
std::pair<int, int> camera_size[MAX_CAMERAS] = {};
214200
for (auto type : ALL_CAMERAS) {
215-
if (auto &fr = cur_segment->frames[type]) {
201+
if (auto &fr = segment->frames[type]) {
216202
camera_size[type] = {fr->width, fr->height};
217203
}
218204
}
@@ -271,6 +257,7 @@ void Replay::streamThread() {
271257
stream_cv_.wait(lk, [this]() { return exit_ || (events_ready_ && !interrupt_requested_); });
272258
if (exit_) break;
273259

260+
event_data_ = seg_mgr_->getEventData();
274261
const auto &events = event_data_->events;
275262
auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which, cur_mono_time_, {}));
276263
if (first == events.cend()) {

tools/replay/replay.h

+4-5
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,8 @@ class Replay {
5555
inline const std::string &carFingerprint() const { return car_fingerprint_; }
5656
inline const std::shared_ptr<std::vector<Timeline::Entry>> getTimeline() const { return timeline_.getEntries(); }
5757
inline const std::optional<Timeline::Entry> findAlertAtTime(double sec) const { return timeline_.findAlertAtTime(sec); }
58-
const std::shared_ptr<SegmentManager::EventData> getEventData() const { return event_data_; }
58+
const std::shared_ptr<SegmentManager::EventData> getEventData() const { return seg_mgr_->getEventData(); }
5959
void installEventFilter(std::function<bool(const Event *)> filter) { event_filter_ = filter; }
60-
void resumeStream() { interruptStream([]() { return true; }); }
6160

6261
// Event callback functions
6362
std::function<void()> onSegmentsMerged = nullptr;
@@ -68,15 +67,15 @@ class Replay {
6867
private:
6968
void setupServices(const std::vector<std::string> &allow, const std::vector<std::string> &block);
7069
void setupSegmentManager(bool has_filters);
71-
void startStream();
70+
void startStream(const std::shared_ptr<Segment> segment);
7271
void streamThread();
7372
void handleSegmentMerge();
7473
void interruptStream(const std::function<bool()>& update_fn);
7574
std::vector<Event>::const_iterator publishEvents(std::vector<Event>::const_iterator first,
7675
std::vector<Event>::const_iterator last);
7776
void publishMessage(const Event *e);
7877
void publishFrame(const Event *e);
79-
void checkSeekProgress(double seeked_to_sec);
78+
void checkSeekProgress();
8079

8180
std::unique_ptr<SegmentManager> seg_mgr_;
8281
Timeline timeline_;
@@ -87,7 +86,7 @@ class Replay {
8786
bool user_paused_ = false;
8887
std::condition_variable stream_cv_;
8988
int current_segment_ = 0;
90-
std::optional<double> seeking_to_;
89+
std::atomic<double> seeking_to_ = -1.0;
9190
std::atomic<bool> exit_ = false;
9291
std::atomic<bool> interrupt_requested_ = false;
9392
bool events_ready_ = false;

tools/replay/seg_mgr.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ bool SegmentManager::mergeSegments(const SegmentMap::iterator &begin, const Segm
105105
merged_event_data->segments[n] = segments_.at(n);
106106
}
107107

108-
event_data_ = merged_event_data;
108+
std::atomic_store(&event_data_, std::move(merged_event_data));
109109
merged_segments_ = segments_to_merge;
110110

111111
return true;

tools/replay/seg_mgr.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ class SegmentManager {
2121
};
2222

2323
SegmentManager(const std::string &route_name, uint32_t flags, const std::string &data_dir = "")
24-
: flags_(flags), route_(route_name, data_dir) {};
24+
: flags_(flags), route_(route_name, data_dir), event_data_(std::make_shared<EventData>()) {}
2525
~SegmentManager();
2626

2727
bool load();
2828
void setCurrentSegment(int seg_num);
2929
void setCallback(const std::function<void()> &callback) { onSegmentMergedCallback_ = callback; }
3030
void setFilters(const std::vector<bool> &filters) { filters_ = filters; }
31-
const std::shared_ptr<EventData> getEventData() const { return event_data_; }
31+
const std::shared_ptr<EventData> getEventData() const { return std::atomic_load(&event_data_); }
3232
bool hasSegment(int n) const { return segments_.find(n) != segments_.end(); }
3333

3434
Route route_;

tools/replay/timeline.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#include "tools/replay/timeline.h"
22

3-
#include <array>
43
#include <algorithm>
4+
#include <array>
55

66
#include "cereal/gen/cpp/log.capnp.h"
77

@@ -74,7 +74,7 @@ void Timeline::buildTimeline(const Route &route, uint64_t route_start_ts, bool l
7474
// Sort and finalize the timeline entries
7575
auto entries = std::make_shared<std::vector<Entry>>(staging_entries_);
7676
std::sort(entries->begin(), entries->end(), [](auto &a, auto &b) { return a.start_time < b.start_time; });
77-
timeline_entries_ = entries;
77+
std::atomic_store(&timeline_entries_, std::move(entries));
7878

7979
callback(log); // Notify the callback once the log is processed
8080
}

tools/replay/timeline.h

+11-11
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,20 @@ class Timeline {
2727
std::function<void(std::shared_ptr<LogReader>)> callback);
2828
std::optional<uint64_t> find(double cur_ts, FindFlag flag) const;
2929
std::optional<Entry> findAlertAtTime(double target_time) const;
30-
const std::shared_ptr<std::vector<Entry>> getEntries() const { return timeline_entries_; }
30+
const std::shared_ptr<std::vector<Entry>> getEntries() const { return std::atomic_load(&timeline_entries_); }
3131

3232
private:
33-
void buildTimeline(const Route &route, uint64_t route_start_ts, bool local_cache,
34-
std::function<void(std::shared_ptr<LogReader>)> callback);
35-
void updateEngagementStatus(const cereal::SelfdriveState::Reader &cs, std::optional<size_t> &idx, double seconds);
36-
void updateAlertStatus(const cereal::SelfdriveState::Reader &cs, std::optional<size_t> &idx, double seconds);
33+
void buildTimeline(const Route &route, uint64_t route_start_ts, bool local_cache,
34+
std::function<void(std::shared_ptr<LogReader>)> callback);
35+
void updateEngagementStatus(const cereal::SelfdriveState::Reader &cs, std::optional<size_t> &idx, double seconds);
36+
void updateAlertStatus(const cereal::SelfdriveState::Reader &cs, std::optional<size_t> &idx, double seconds);
3737

38-
std::thread thread_;
39-
std::atomic<bool> should_exit_ = false;
38+
std::thread thread_;
39+
std::atomic<bool> should_exit_ = false;
4040

41-
// Temporarily holds entries before they are sorted and finalized
42-
std::vector<Entry> staging_entries_;
41+
// Temporarily holds entries before they are sorted and finalized
42+
std::vector<Entry> staging_entries_;
4343

44-
// Final sorted timeline entries
45-
std::shared_ptr<std::vector<Entry>> timeline_entries_;
44+
// Final sorted timeline entries
45+
std::shared_ptr<std::vector<Entry>> timeline_entries_;
4646
};

0 commit comments

Comments
 (0)