Skip to content

Commit 90f27d4

Browse files
committed
Implement S3 transaction rollback for Neo4j handler
In case of failure, all added objects within the current query context are removed from S3. Signed-off-by: Steven Rojas <[email protected]>
1 parent 6060779 commit 90f27d4

File tree

1 file changed

+54
-42
lines changed

1 file changed

+54
-42
lines changed

src/QueryHandlerNeo4j.cc

Lines changed: 54 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -186,63 +186,75 @@ void QueryHandlerNeo4j::process_query(protobufs::queryMessage &proto_query,
186186
Json::Value json_responses;
187187
Json::Value cmd_result;
188188

189+
std::vector<std::string> images_log;
190+
189191
Json::Value root;
190192
int blob_count = 0;
191193
bool error = false;
192194

193-
rc = parse_commands(proto_query, root);
194-
// begin neo4j transaction
195-
tx = neoconn_pool->open_tx(conn, 10000, "w");
196-
for (int j = 0; j < root.size(); j++) {
197-
Json::Value neo4j_resp;
198-
std::string cypher;
195+
try {
196+
rc = parse_commands(proto_query, root);
197+
// begin neo4j transaction
198+
tx = neoconn_pool->open_tx(conn, 10000, "w");
199+
for (int j = 0; j < root.size(); j++) {
200+
Json::Value neo4j_resp;
201+
std::string cypher;
199202

200-
Json::Value &query = root[j];
201-
std::string cmd = query.getMemberNames()[0];
203+
Json::Value &query = root[j];
204+
std::string cmd = query.getMemberNames()[0];
202205

203206

204-
if (_rs_cmds.count(cmd) == 0) {
205-
std::cout<<"Command: " << cmd << "Does not exist!" << std::endl;
206-
}
207+
if (_rs_cmds.count(cmd) == 0) {
208+
std::cout<<"Command: " << cmd << "Does not exist!" << std::endl;
209+
}
207210

208-
Neo4jCommand *rscmd = _rs_cmds[cmd];
209-
cypher = query[cmd]["cypher"].asString();
211+
Neo4jCommand *rscmd = _rs_cmds[cmd];
212+
cypher = query[cmd]["cypher"].asString();
210213

211-
const std::string &blob =
212-
rscmd->need_blob(query) ? proto_query.blobs(blob_count++) : "";
214+
const std::string &blob =
215+
rscmd->need_blob(query) ? proto_query.blobs(blob_count++) : "";
213216

214-
rc = rscmd->data_processing(cypher, query, blob, 0, cmd_result);
215-
if (rc != 0) {
216-
error = true;
217-
proto_res.set_json(fastWriter.write(cmd_result));
218-
break;
219-
}
220-
res_stream = neoconn_pool->run_in_tx((char *)cypher.c_str(), tx);
221-
neo4j_resp = neoconn_pool->results_to_json(res_stream);
222-
query["cp_result"] = cmd_result;
223-
Json::Value resp_retval = rscmd->construct_responses(neo4j_resp, query, proto_res, blob);
224-
//THIS IS VERY CLUNKY and confusing, NEEDS TO BE REFACTORED
225-
if (neo4j_resp.isMember("metadata_res") && (cmd == "NeoAdd" || cmd == "NeoFind")) {
226-
resp_retval["metadata_res"] = neo4j_resp["metadata_res"];
227-
}
228-
json_responses.append(resp_retval);
217+
rc = rscmd->data_processing(cypher, query, blob, 0, cmd_result);
218+
if (rc != 0) {
219+
error = true;
220+
proto_res.set_json(fastWriter.write(cmd_result));
221+
break;
222+
}
223+
if (cmd_result.isMember("image_added")) {
224+
images_log.push_back(cmd_result["image_added"].asString());
225+
}
226+
res_stream = neoconn_pool->run_in_tx((char *)cypher.c_str(), tx);
227+
neo4j_resp = neoconn_pool->results_to_json(res_stream);
228+
query["cp_result"] = cmd_result;
229+
Json::Value resp_retval = rscmd->construct_responses(neo4j_resp, query, proto_res, blob);
230+
//THIS IS VERY CLUNKY and confusing, NEEDS TO BE REFACTORED
231+
if (neo4j_resp.isMember("metadata_res") && (cmd == "NeoAdd" || cmd == "NeoFind")) {
232+
resp_retval["metadata_res"] = neo4j_resp["metadata_res"];
233+
}
234+
json_responses.append(resp_retval);
229235

230236

231-
}
232-
proto_res.set_json(fastWriter.write(json_responses));
233-
// commit neo4j transaction, needs to be updated in future to account for
234-
// errors on response construction
235-
if (error == false) {
236-
rc = neoconn_pool->commit_tx(tx);
237-
238-
if(rc != 0){
239-
printf("Warning! Transaction Error: %d\n", rc);
240-
exit(1);
237+
}
238+
proto_res.set_json(fastWriter.write(json_responses));
239+
// commit neo4j transaction, needs to be updated in future to account for
240+
// errors on response construction
241+
if (error == false) {
242+
rc = neoconn_pool->commit_tx(tx);
243+
244+
if(rc != 0){
245+
printf("Warning! Transaction Error: %d\n", rc);
246+
exit(1);
247+
}
248+
241249
}
242250

251+
neoconn_pool->put_conn(conn);
252+
} catch(...) {
253+
VCL::RemoteConnection *connection = get_existing_connection();
254+
for (const std::string image : images_log) {
255+
connection->Remove_Object(image);
256+
}
243257
}
244-
245-
neoconn_pool->put_conn(conn);
246258
}
247259

248260
int QueryHandlerNeo4j::parse_commands(

0 commit comments

Comments
 (0)