Skip to content

Commit

Permalink
SINGA-21 Code review
Browse files Browse the repository at this point in the history
review data_shard.h, data_shard.cc
  -- refine Next() functions for reading data shards
  -- reformat

add unit test for data shard
  • Loading branch information
wangsheng1001 authored and nudles committed Jun 24, 2015
1 parent aefc2d4 commit 28ac509
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 229 deletions.
99 changes: 52 additions & 47 deletions include/utils/data_shard.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
#ifndef INCLUDE_UTILS_SHARD_H_
#define INCLUDE_UTILS_SHARD_H_
#ifndef SINGA_UTILS_DATA_SHARD_H_
#define SINGA_UTILS_DATA_SHARD_H_

#include <google/protobuf/message.h>
#include <fstream>
#include <string>
#include <unordered_set>


using google::protobuf::Message;

namespace singa {

/**
Expand All @@ -22,8 +19,8 @@ namespace singa {
* encoded as [key_len key record_len val] (key_len and record_len are of type
* uint32 and give the size in bytes of the key and the record respectively).
*
* When Shard obj is created, it will remove the last key if the record size and
* key size do not match because the last write of tuple crashed.
* When Shard obj is created, it will remove the last key if the record size
* and key size do not match because the last write of tuple crashed.
*
* TODO
* 1. split one shard into multiple shards.
Expand All @@ -33,54 +30,58 @@ namespace singa {
class DataShard {
public:
enum {
//!< read only mode used in training
kRead=0,
//!< write mode used in creating shard (will overwrite previous one)
kCreate=1,
//!< append mode, e.g. used when previous creating crashes
kAppend=2
// read only mode used in training
kRead = 0,
// write mode used in creating shard (will overwrite previous one)
kCreate = 1,
// append mode, e.g. used when previous creating crashes
kAppend = 2
};

public:
/**
* Init the shard obj.
* @folder shard folder (path excluding shard.dat) on worker node
* @mode shard open mode, Shard::kRead, Shard::kWrite or Shard::kAppend
* @bufsize batch bufsize bytes data for every disk op (read or write),
*
* @param folder Shard folder (path excluding shard.dat) on worker node
* @param mode Shard open mode: DataShard::kRead, kCreate or kAppend
* @param capacity Buffer size in bytes batched for every disk op (read or
* write), default is 100MB
*/
DataShard(std::string folder, char mode, int capacity=104857600);
DataShard(const std::string& folder, int mode);
DataShard(const std::string& folder, int mode, int capacity);
~DataShard();

/**
* read next tuple from the shard.
* @key key
* @param val record of type Message
* @return true if read success otherwise false, e.g., the tuple was not
* inserted completely.
*
* @param key Tuple key
* @param val Record of type Message
* @return false if the read fails, e.g., the tuple was not inserted
* completely.
*/
bool Next(std::string *key, Message* val);
bool Next(std::string* key, google::protobuf::Message* val);
/**
* read next tuple from the shard.
* @key key tuple key
* @param val record of type string
* @return true if read success otherwise false, e.g., the tuple was not
* inserted completely.
*
* @param key Tuple key
* @param val Record of type string
* @return false if the read fails, e.g., the tuple was not inserted
* completely.
*/
bool Next(std::string *key, std::string* val);

bool Next(std::string* key, std::string* val);
/**
* Append one tuple to the shard.
*
* @param key e.g., image path
* @param tuple record to append
* @return reture if sucess, otherwise false, e.g., inserted before
* @return false if unsuccessful, e.g., the key was inserted before
*/
bool Insert(const std::string& key, const Message& tuple);
bool Insert(const std::string& key, const google::protobuf::Message& tuple);
/**
* Append one tuple to the shard.
*
* @param key e.g., image path
* @param tuple record to append
* @return reture if sucess, otherwise false, e.g., inserted before
* @return false if unsuccessful, e.g., the key was inserted before
*/
bool Insert(const std::string& key, const std::string& tuple);
/**
Expand All @@ -92,54 +93,58 @@ class DataShard {
* Flush buffered data to disk.
* Used only for kCreate or kAppend.
*/
void Flush() ;
void Flush();
/**
* Iterate through all tuples to get the num of all tuples.
*
* @return num of tuples
*/
const int Count();
int Count();
/**
* @return path to shard file
*/
const std::string path(){
return path_;
}
inline std::string path() { return path_; }

protected:
/**
* Read the next key and prepare buffer for reading value.
*
* @param key
* @return length (i.e., bytes) of value field.
*/
int Next(std::string *key);
int Next(std::string* key);
/**
* Setup the disk pointer to the right position for append in case that
* the previous write crashed.
*
* @param path shard path.
* @return offset (end pos) of the last success written record.
*/
int PrepareForAppend(std::string path);
int PrepareForAppend(const std::string& path);
/**
* Read data from disk if the current data in the buffer is not a full field.
*
* @param size size of the next field.
*/
bool PrepareNextField(int size);

private:
char mode_;
std::string path_;
char mode_ = 0;
std::string path_ = "";
// either ifstream or ofstream
std::fstream fdat_;
// to avoid replicated record
std::unordered_set<std::string> keys_;
// internal buffer
char* buf_;
char* buf_ = nullptr;
// offset inside the buf_
int offset_;
int offset_ = 0;
// allocated bytes for the buf_
int capacity_;
int capacity_ = 0;
// bytes in buf_, used in reading
int bufsize_;
int bufsize_ = 0;
};
} /* singa */
#endif // INCLUDE_UTILS_SHARD_H_

} // namespace singa

#endif // SINGA_UTILS_DATA_SHARD_H_
30 changes: 10 additions & 20 deletions src/test/test_cluster.cc
Original file line number Diff line number Diff line change
@@ -1,46 +1,36 @@
#include <fstream>
#include "gtest/gtest.h"
#include "proto/cluster.pb.h"
#include "utils/cluster.h"

using namespace singa;

//string folder="src/test/data/";
std::string host = "localhost:2181";

string host="localhost:2181";

void zk_cb(void *contest){
LOG(INFO) << "zk callback: " << (char *)contest;
void zk_cb(void *contest) {
LOG(INFO) << "zk callback: " << static_cast<char *>(contest);
}

TEST(CluserRuntimeTest, GroupManagement){
TEST(CluserRuntimeTest, GroupManagement) {
ClusterRuntime* rt = new ZKClusterRT(host);
ASSERT_EQ(rt->Init(), true);

ASSERT_EQ(rt->sWatchSGroup(1, 1, zk_cb, "test call back"), true);

ASSERT_EQ(rt->wJoinSGroup(1, 1, 1), true);
ASSERT_EQ(rt->wJoinSGroup(1, 2, 1), true);

ASSERT_EQ(rt->wLeaveSGroup(1, 2, 1), true);
ASSERT_EQ(rt->wLeaveSGroup(1, 1, 1), true);

ASSERT_EQ(rt->WatchSGroup(1, 1, zk_cb, "test call back"), true);
ASSERT_EQ(rt->JoinSGroup(1, 1, 1), true);
ASSERT_EQ(rt->JoinSGroup(1, 2, 1), true);
ASSERT_EQ(rt->LeaveSGroup(1, 2, 1), true);
ASSERT_EQ(rt->LeaveSGroup(1, 1, 1), true);
sleep(3);
delete rt;
}

TEST(CluserRuntimeTest, ProcessManagement){
TEST(CluserRuntimeTest, ProcessManagement) {
ClusterRuntime* rt = new ZKClusterRT(host);
ASSERT_EQ(rt->Init(), true);

ASSERT_EQ(rt->RegistProc("1.2.3.4:5"), 0);
ASSERT_EQ(rt->RegistProc("1.2.3.4:6"), 1);
ASSERT_EQ(rt->RegistProc("1.2.3.4:7"), 2);

ASSERT_NE(rt->GetProcHost(0), "");
ASSERT_NE(rt->GetProcHost(1), "");
ASSERT_NE(rt->GetProcHost(2), "");

sleep(3);
delete rt;
}
Expand Down
64 changes: 64 additions & 0 deletions src/test/test_shard.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#include <sys/stat.h>

#include "gtest/gtest.h"
#include "utils/data_shard.h"

// Keys of the five tuples written to, and read back from, the test shard.
std::string key[]{
    "firstkey",
    "secondkey",
    "3key",
    "key4",
    "key5",
};
// Record payloads; the tests store tuple[i] under key[i].
std::string tuple[]{
    "firsttuple",
    "2th-tuple",
    "thridtuple",
    "tuple4",
    "tuple5",
};

using namespace singa;

// Creates the shard folder, opens a shard in kCreate mode and writes the
// first three key/tuple pairs, then flushes them to disk.
TEST(DataShardTest, CreateDataShard) {
  std::string shard_dir = "src/test/data/shard_test";
  mkdir(shard_dir.c_str(), 0755);
  DataShard shard(shard_dir, DataShard::kCreate, 50);
  // Tuples 3 and 4 are left for the AppendDataShard test.
  for (int i = 0; i < 3; ++i) {
    shard.Insert(key[i], tuple[i]);
  }
  shard.Flush();
}

// Re-opens the shard in kAppend mode and adds the remaining two key/tuple
// pairs, then flushes them to disk.
TEST(DataShardTest, AppendDataShard) {
  std::string shard_dir = "src/test/data/shard_test";
  DataShard shard(shard_dir, DataShard::kAppend, 50);
  for (int i = 3; i < 5; ++i) {
    shard.Insert(key[i], tuple[i]);
  }
  shard.Flush();
}

// Opens the shard read-only and checks that Count() reports all five tuples
// inserted by the create and append tests.
TEST(DataShardTest, CountDataShard) {
  std::string shard_dir = "src/test/data/shard_test";
  DataShard shard(shard_dir, DataShard::kRead, 50);
  ASSERT_EQ(5, shard.Count());
}

// Reads the shard back in order: all five tuples must be readable, the
// sixth Next() must fail, and SeekToFirst() must make the first tuple
// readable again.
TEST(DataShardTest, ReadDataShard) {
  std::string shard_dir = "src/test/data/shard_test";
  DataShard shard(shard_dir, DataShard::kRead, 50);
  std::string read_key, read_val;
  for (int i = 0; i < 5; ++i) {
    ASSERT_TRUE(shard.Next(&read_key, &read_val));
    // Tuples 2 and 3 are only checked for a successful read; their contents
    // are not compared.
    if (i == 2 || i == 3) {
      continue;
    }
    ASSERT_STREQ(key[i].c_str(), read_key.c_str());
    ASSERT_STREQ(tuple[i].c_str(), read_val.c_str());
  }
  // The shard holds exactly five tuples, so the next read must fail.
  ASSERT_FALSE(shard.Next(&read_key, &read_val));
  shard.SeekToFirst();
  ASSERT_TRUE(shard.Next(&read_key, &read_val));
  ASSERT_STREQ(key[0].c_str(), read_key.c_str());
  ASSERT_STREQ(tuple[0].c_str(), read_val.c_str());
}
2 changes: 1 addition & 1 deletion src/utils/cluster_rt.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ bool ZKClusterRT::CreateZKNode(const char* path, const char* val, int flag,
}
// copy the node name to output
if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)) {
snprintf(output, strlen(buf), "%s", buf);
strcpy(output, buf);
}
if (ret == ZOK) {
LOG(INFO) << "created zookeeper node " << buf
Expand Down
Loading

0 comments on commit 28ac509

Please sign in to comment.