Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/perfetto/ext/base/string_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ bool StartsWithAny(const std::string& str,
const std::vector<std::string>& prefixes);
bool Contains(const std::string& haystack, const std::string& needle);
bool Contains(const std::string& haystack, char needle);
bool Contains(const std::vector<std::string>& haystack,
const std::string& needle);
size_t Find(const StringView& needle, const StringView& haystack);
bool CaseInsensitiveEqual(const std::string& first, const std::string& second);
std::string Join(const std::vector<std::string>& parts,
Expand Down
9 changes: 6 additions & 3 deletions src/base/http/http_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,12 @@ size_t HttpServer::ParseOneHttpRequest(HttpServerConnection* conn) {
if (IsOriginAllowed(hdr_value))
conn->origin_allowed_ = hdr_value.ToStdString();
} else if (hdr_name.CaseInsensitiveEq("connection")) {
conn->keepalive_ = hdr_value.CaseInsensitiveEq("keep-alive");
http_req.is_websocket_handshake =
hdr_value.CaseInsensitiveEq("upgrade");
auto values = SplitString(hdr_value.ToStdString(), ",");
for (auto& value : values) {
value = ToLower(TrimWhitespace(value));
}
conn->keepalive_ = Contains(values, "keep-alive");
http_req.is_websocket_handshake = Contains(values, "upgrade");
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/base/string_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ bool Contains(const std::string& haystack, const char needle) {
return haystack.find(needle) != std::string::npos;
}

bool Contains(const std::vector<std::string>& haystack,
const std::string& needle) {
return std::find(haystack.begin(), haystack.end(), needle) != haystack.end();
}

size_t Find(const StringView& needle, const StringView& haystack) {
if (needle.empty())
return 0;
Expand Down
7 changes: 7 additions & 0 deletions src/base/string_utils_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,13 @@ TEST(StringUtilsTest, Contains) {
EXPECT_FALSE(Contains("abc", "abcd"));
EXPECT_FALSE(Contains("", "a"));
EXPECT_FALSE(Contains("", "abc"));
auto values = std::vector<std::string>{"abc", "def"};
EXPECT_TRUE(Contains(values, "abc"));
EXPECT_TRUE(Contains(values, "def"));
EXPECT_FALSE(Contains(values, "abcdef"));
EXPECT_FALSE(Contains(values, "ab"));
EXPECT_FALSE(Contains(values, "ef"));
EXPECT_FALSE(Contains(std::vector<std::string>{}, "abcdef"));
}

TEST(StringUtilsTest, Find) {
Expand Down
25 changes: 20 additions & 5 deletions ui/src/trace_processor/http_rpc_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import protos from '../protos';
import {fetchWithTimeout} from '../base/http_utils';
import {assertExists} from '../base/logging';
import {assertExists, reportError} from '../base/logging';
import {EngineBase} from '../trace_processor/engine';

const RPC_CONNECT_TIMEOUT_MS = 2000;
Expand All @@ -32,6 +32,8 @@ export class HttpRpcEngine extends EngineBase {
private websocket?: WebSocket;
private connected = false;
private disposed = false;
private queue: Blob[] = [];
private isProcessingQueue = false;

// Can be changed by frontend/index.ts when passing ?rpc_port=1234 .
static rpcPort = '9001';
Expand Down Expand Up @@ -86,11 +88,24 @@ export class HttpRpcEngine extends EngineBase {
}

private onWebsocketMessage(e: MessageEvent) {
assertExists(e.data as Blob)
.arrayBuffer()
.then((buf) => {
const blob = assertExists(e.data as Blob);
this.queue.push(blob);
this.processQueue();
}

private async processQueue() {
if (this.isProcessingQueue) return;
this.isProcessingQueue = true;
while (this.queue.length > 0) {
try {
const blob = assertExists(this.queue.shift());
const buf = await blob.arrayBuffer();
super.onRpcResponseBytes(new Uint8Array(buf));
});
} catch (e) {
reportError(e);
}
}
this.isProcessingQueue = false;
}

static async checkConnection(): Promise<HttpRpcState> {
Expand Down
Loading