From 8d8a1274abed2f7c35c605249eb9f053e783774f Mon Sep 17 00:00:00 2001 From: Dawei Ma Date: Tue, 11 Sep 2018 14:25:10 +0800 Subject: [PATCH] Push message to mongoDB --- build.gradle | 1 + conf/config.properties | 4 +++ src/org/loklak/data/DAO.java | 12 +++++++ src/org/loklak/data/MongoDBManager.java | 42 +++++++++++++++++++++++++ src/org/loklak/harvester/Post.java | 11 ++++++- 5 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 src/org/loklak/data/MongoDBManager.java diff --git a/build.gradle b/build.gradle index 3f44f2d28..084592a6d 100644 --- a/build.gradle +++ b/build.gradle @@ -136,6 +136,7 @@ dependencies { compile group: 'xerces', name: 'xerces', version: '2.4.0' compile group: 'org.unbescape', name: 'unbescape', version: '1.0' compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0' + compile 'org.mongodb:mongodb-driver:3.4.3' } tasks.withType(Javadoc) { diff --git a/conf/config.properties b/conf/config.properties index bb9199296..00628ec37 100644 --- a/conf/config.properties +++ b/conf/config.properties @@ -301,3 +301,7 @@ dump.write_enabled = true # Stream stream.enabled = false stream.mqtt.address = tcp://127.0.0.1:1883 + +# MongoDB +db.mongo.enabled = false +db.mongo.address = mongodb://127.0.0.1:27017 diff --git a/src/org/loklak/data/DAO.java b/src/org/loklak/data/DAO.java index 3f3843876..3a8b273f6 100644 --- a/src/org/loklak/data/DAO.java +++ b/src/org/loklak/data/DAO.java @@ -175,7 +175,10 @@ public class DAO { public static TimelineCache timelineCache; public static MQTTPublisher mqttPublisher = null; + public static MongoDBManager mongoDBManager = null; public static boolean streamEnabled = false; + public static boolean mongoDBEnabled = false; + public static String twitterChannel = "twitter"; public static List randomTerms = new ArrayList<>(); public static enum IndexName { @@ -419,6 +422,13 @@ public static void init(Map configMap, Path dataPath) throws Exc mqttPublisher = new MQTTPublisher(mqttAddress); } + // Connect to mongoDB database + String mongoAddress = getConfig("db.mongo.address", "mongodb://127.0.0.1:27017"); + mongoDBEnabled = getConfig("db.mongo.enabled", false); + if (mongoDBEnabled) { + mongoDBManager = new MongoDBManager(mongoAddress); + } + // finally wait for healthy status of elasticsearch shards ClusterHealthStatus required_status = ClusterHealthStatus.fromString(config.get("elasticsearch_requiredClusterHealthStatus")); boolean ok; @@ -881,6 +891,8 @@ private static Set writeMessageBulkDump(Collection mws) for (MessageWrapper mw: mws) try { mw.t.publishToMQTT(); + //Store message string to mongoDB document + mw.t.saveToMongoDB(DAO.twitterChannel); if (!created.contains(mw.t.getPostId())) continue; synchronized (DAO.class) { diff --git a/src/org/loklak/data/MongoDBManager.java b/src/org/loklak/data/MongoDBManager.java new file mode 100644 index 000000000..cf2f547f1 --- /dev/null +++ b/src/org/loklak/data/MongoDBManager.java @@ -0,0 +1,42 @@ +package org.loklak.data; + +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import org.bson.Document; + +/** + * @author dawei.ma + * @date 2018/9/6 19:45 + */ +public class MongoDBManager { + private MongoClient mongoClient; + private String clientId; + private MongoDatabase database; + + public MongoDBManager(String address, String clientId) { + this.mongoClient = new MongoClient(new MongoClientURI(address)); + this.clientId = clientId; + this.database = mongoClient.getDatabase(clientId); + } + + public MongoDBManager(String address) { + this(address, "loklak_server"); + } + + public MongoClient getMongoClient() { + return mongoClient; + } + + public String getClientId() { + return clientId; + } + + public void saveChannelMessage(String channel, String message) { + MongoCollection collection = database.getCollection(channel); + Document doc = Document.parse(message); + collection.insertOne(doc); + } +} + diff --git a/src/org/loklak/harvester/Post.java b/src/org/loklak/harvester/Post.java index 7670a3c3b..2f0e1a63a 100644 --- a/src/org/loklak/harvester/Post.java +++ b/src/org/loklak/harvester/Post.java @@ -164,7 +164,16 @@ public final void publishToMQTT() { DAO.mqttPublisher.publish(this.getStreamChannels(), this.toString()); } } - + + /** + * Publish data to MongoDB + */ + public final void saveToMongoDB(String channel) { + if (DAO.mongoDBManager != null) { + DAO.mongoDBManager.saveChannelMessage(channel, this.toString()); + } + } + public void setTimestamp(long timestamp) { this.put("timestamp_id", timestamp); this.timestamp = timestamp;