Skip to content

Commit

Permalink
PSSim clean up and runs
Browse files Browse the repository at this point in the history
  • Loading branch information
pxsalehi committed Mar 21, 2017
1 parent 6852e42 commit e881f1b
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 104 deletions.
18 changes: 12 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@ Content-based publish/subscribe simulator

Input files:
- sim.config
- Topology
- Latencies
- publishers folder (optional)
- subscribers folder (optional)
- faults (optional)
- Publisher popularity (optional)
- Topology
- Latencies
- faults (optional)

Build with:
./install_local_jars.sh
./build.sh

Run with:
./pssim.sh sim_dir seed log_level (off/error/info)

example: ./pssim.sh src/test/resources/simulations/overlay50/ 53211815 off
4 changes: 1 addition & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
<resources>
<resource>
<directory>src/main</directory>
<excludes>
<exclude>**/*.java</exclude>
</excludes>
</resource>

</resources>
<plugins>
<plugin>
Expand Down
120 changes: 26 additions & 94 deletions src/main/java/de/tum/msrg/pubsub/PSSim.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public class PSSim implements Entity {
private long seed;
private String faultsFile;
private String resultFile;
private String topicStatsFile;
private String latenciesFile;
private String SIM_CONF_FILE_NAME = "sim.config";
private String TOPOLOGY_FILE_NAME = "topology.txt";
Expand Down Expand Up @@ -70,7 +69,6 @@ public PSSim(String simDir, long seed) throws ConfigParserException, RuntimeSimE
OverlayLatency.getInstance().initialize(latenciesFile, noOfBrokers);
faultsFile = FileUtils.joinFilePaths(simDir, FAULTS_FILE_NAME);
resultFile = FileUtils.joinFilePaths(simDir, "stats" + seed + ".txt");
topicStatsFile = FileUtils.joinFilePaths(simDir, "topicstats" + seed + ".txt");
System.out.println("Creating workload generator");
SimLogger.info("Creating workload generator");
loadGen = new WorkloadGenerator(config, topology, seed);
Expand Down Expand Up @@ -122,30 +120,6 @@ public void writeResults() {
System.out.println("******************************************************");
System.out.println(stats);
System.out.println("******************************************************");
// write topic stats
BufferedWriter topicStatsOut = null;
try {
topicStatsOut = new BufferedWriter(new FileWriter(topicStatsFile));
int[] pop = StatsCollector.getInstance().getPopularityPerTopic();
int[] deliv = StatsCollector.getInstance().getDeliveryPerTopic();
topicStatsOut.write("topic \t pop \t deliv\n");
long totalTreeDeliv = StatsCollector.getInstance().getTotalDeliveredPubCount() - StatsCollector.getInstance().getTotalPubsDeliveredViaGossip();
double delivCDF = 0.;
for(int c = 0; c < noOfTopics; ++c) {
double topicDelivPerc = (double)deliv[c] / totalTreeDeliv;
delivCDF += topicDelivPerc;
topicStatsOut.write(c + "\t" + pop[c] + "\t" + delivCDF + '\n');
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if(topicStatsOut != null)
topicStatsOut.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}


Expand All @@ -166,18 +140,22 @@ public static void main(String[] args) throws ConfigParserException, RuntimeSimE
}

private String getStatsAsString() {
return "Number of broker: " + overlay.getNoOfNodes() +
"\nNumber of subscribers : " + StatsCollector.getInstance().getNoOfSubscribers() +
return "Number of broker: "
+ overlay.getNoOfNodes() +
"\nNumber of subscribers : "
+ StatsCollector.getInstance().getNoOfSubscribers() +
"\nWorkload distribution: "
+ config.getStringConfig(ConfigKeys.WLOAD_PUBLISHER_POPULARITY) +
"\nZipf skewness: " + config.getFloatConfig(ConfigKeys.WLOAD_ZIPF_EXPONENT) +
"\nPub generated : " + StatsCollector.getInstance().getTotalGeneratedPubCount() +
"\nPub published : " + StatsCollector.getInstance().getTotalPublishedPubCount() +
"\nTotal pubs sent : " + StatsCollector.getInstance().getTotalSentPubCount() +
"\nGossips sent : " + StatsCollector.getInstance().getTotalSentGossipCount() +
"\nGossips sent before last delivery : "
+ StatsCollector.getInstance().getTotalGossipsSentsBeforeLastDelivery() +
"\nTotal messages sent : " + StatsCollector.getInstance().getTotalMsgsSent() +
"\nZipf skewness: "
+ config.getFloatConfig(ConfigKeys.WLOAD_ZIPF_EXPONENT) +
"\nPub generated : "
+ StatsCollector.getInstance().getTotalGeneratedPubCount() +
"\nPub published : "
+ StatsCollector.getInstance().getTotalPublishedPubCount() +
"\nTotal pubs sent : "
+ StatsCollector.getInstance().getTotalSentPubCount() +
"\nTotal messages sent : "
+ StatsCollector.getInstance().getTotalMsgsSent() +
"\nTotal packets sent : "
+ StatsCollector.getInstance().getTotalSentPacketCount() +
"\nPackets sent per broker 99th: "
Expand All @@ -187,72 +165,26 @@ private String getStatsAsString() {
"\nNo Of publisher brokers: " + StatsCollector.getInstance().getNoOfPublisherBrokers() +
"\nPackets sent per publisher avg: "
+ StatsCollector.getInstance().getAveragePacketSentCountPerPublisher() +
"\nGossips received: " + StatsCollector.getInstance().getTotalReceivedGossipCount() +
"\nTotal pubs delivered : "
+ StatsCollector.getInstance().getTotalDeliveredPubCount() +
"\nPercentage of local pub deliveries: "
+ StatsCollector.getInstance().getPercentageOfLocalDelivery() +
"\nPubs delivered via gossip: "
+ StatsCollector.getInstance().getTotalPubsDeliveredViaGossip() +
"\nPubs delivered via direct link: "
+ StatsCollector.getInstance().getTotalPubsDeliveredViaDirectLink() +
"\nPubs delivered via batching: "
+ StatsCollector.getInstance().getTotalPubsDeliveredViaBatch() +
"\nGossip delivery false positive rate: "
+ StatsCollector.getInstance().getGossipDeliveryFalsePositiveRate() +
"\nTree delivery latency 99th: "
+ StatsCollector.getInstance().get99thTreeDeliveryLatency() +
"\nTree delivery hopcount 99th: "
+ StatsCollector.getInstance().get99thTreeDeliveryHopcount() +
"\nGossip delivery latency 99th: "
+ StatsCollector.getInstance().getGossipDeliveryLatencyPercentile(0.99) +
"\nGossip delivery latency 90th: "
+ StatsCollector.getInstance().getGossipDeliveryLatencyPercentile(0.90) +
"\nGossip delivery hopcount 99th: "
+ StatsCollector.getInstance().get99thGossipDeliveryHopcount() +
"\nBatch delivery latency 99th: "
+ StatsCollector.getInstance().getBatchDeliveryLatencyPercentile(0.99) +
"\nBatch delivery latency 90th: "
+ StatsCollector.getInstance().getBatchDeliveryLatencyPercentile(0.90) +
"\nBatch delivery hopcount 99th: "
+ StatsCollector.getInstance().get99thBatchDeliveryHopcount() +
"\ndirect link delivery latency 99th: "
+ StatsCollector.getInstance().getDirectLinkDeliveryLatencyPercentile(0.99) +
"\ndirect link delivery latency 90th: "
+ StatsCollector.getInstance().getDirectLinkDeliveryLatencyPercentile(0.90) +
"\ndirect link delivery hopcount 99th: "
+ StatsCollector.getInstance().get99thDirectLinkDeliveryHopcount() +
"\nTotal delivery latency 99th: "
+ StatsCollector.getInstance().get99thTotalDeliveryLatency() +
"\nTotal delivery hopcount 99th: "
+ StatsCollector.getInstance().get99thTotalDeliveryHopcount() +
"\nAverage match count: " + StatsCollector.getInstance().getAverageUtilRatio().x +
"\nAverage avg path length: " + StatsCollector.getInstance().getAverageUtilRatio().y +
"\nAverage gossip per broker: "
+ StatsCollector.getInstance().getAvgGossipBeforeLastDeliveryPerBroker() +
"\nNumber of gossip groups: " + StatsCollector.getInstance().getNoOfGossipGroups() +
"\nAverage queue size: " + StatsCollector.getInstance().getAverageQueueSize() +
"\n99th max queue size: " + StatsCollector.getInstance().get99thQueueSize() +
"\nPerc. of direct out of tree delivery per publisher 99th: "
+ StatsCollector.getInstance().get99thDirectDeliveryPerPublisher() +
"\nNumber of direct ueberlinks 99th: "
+ StatsCollector.getInstance().get99thMsgsSentForEachPublishPerPublisher() +
"\nLast delivery : " + StatsCollector.getInstance().getLastPubDeliveryTime() +
"\nDelivery rate : " + StatsCollector.getInstance().getPubDeliveryRate() +
"\nPubs sent via gossip: "
+ StatsCollector.getInstance().getTotalPubsSentViaGossip() +
"\nPubs sent via batching: "
+ StatsCollector.getInstance().getTotalPubsSentViaBatching() +
"\nPubs sent via direct link: "
+ StatsCollector.getInstance().getTotalPubsSentViaDirectLink() +
"\nUnsuccessful retrieve from B0(root): "
+ StatsCollector.getInstance().getUnsuccessfulRetrieveCount(0) +
"\nAverage pub match per received gossip: "
+ StatsCollector.getInstance().getAverageMatchCountPerGossip() +
"\nAverage pub gain: " + StatsCollector.getInstance().getAveragePubGain() +
"\n99th perc. pub gain: " + StatsCollector.getInstance().getPercentilePubGain(0.99) +
"\n50th perc. pub gain: " + StatsCollector.getInstance().getPercentilePubGain(0.50) +
"\nAverage throughput: " + StatsCollector.getInstance().getAverageThroughput() +
"\nAverage match count: "
+ StatsCollector.getInstance().getAverageUtilRatio().x +
"\nAverage queue size: "
+ StatsCollector.getInstance().getAverageQueueSize() +
"\n99th max queue size: "
+ StatsCollector.getInstance().get99thQueueSize() +
"\nLast delivery : "
+ StatsCollector.getInstance().getLastPubDeliveryTime() +
"\nDelivery rate : "
+ StatsCollector.getInstance().getPubDeliveryRate() +
"\nAverage throughput: "
+ StatsCollector.getInstance().getAverageThroughput() +
"\nTotal pure forward msg count: "
+ StatsCollector.getInstance().getTotalPureForwardCount();
}
Expand Down
5 changes: 4 additions & 1 deletion todo
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
- update config file
- read publisher popularity from file
- add more sample sims to test/resources
- pass map to create batch factor (only for capacity manager)
- add input capacity
- config as singleton
- workload: load or generate
- cleanup workload, create small classes for each part
- support link drop rate
- support java 8
- support java 8
- tidy up distribution values

0 comments on commit e881f1b

Please sign in to comment.