Skip to content

Commit 21607c5

Browse files
author
davidecaroselli
committed
DataManager/Datastream renamed in BinaryLog
1 parent 34f3ca6 commit 21607c5

34 files changed

+267
-257
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ $ ./mmt status
6464
[Engine: "default"]
6565
REST API: running - 8045/translate
6666
Cluster: running - port 5016
67-
Datastream: running - localhost:9092
67+
Binary log: running - localhost:9092
6868
Database: running - localhost:9042
6969
```
7070

cli/mmt/engine.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,8 @@ def __init__(self, props=None):
304304
self.api_port = props['api']['port'] if 'api' in props else None
305305
self.api_root = props['api']['root'] if 'api' in props and 'root' in props['api'] else None
306306
self.cluster_port = props['cluster_port']
307-
self.datastream_host = props['datastream']['host'] if 'datastream' in props else None
308-
self.datastream_port = props['datastream']['port'] if 'datastream' in props else None
307+
self.binlog_host = props['binlog']['host'] if 'binlog' in props else None
308+
self.binlog_port = props['binlog']['port'] if 'binlog' in props else None
309309
self.database_host = props['database']['host'] if 'database' in props else None
310310
self.database_port = props['database']['port'] if 'database' in props else None
311311
self.embedded_services = [p for p in props['embedded_services']] if 'embedded_services' in props else None
@@ -342,7 +342,7 @@ def __init__(self, engine):
342342
self._log_file = engine.get_logfile('node', ensure=False)
343343
self._api = None
344344

345-
def start(self, api_port=None, cluster_port=None, datastream_port=None,
345+
def start(self, api_port=None, cluster_port=None, binlog_port=None,
346346
db_port=None, leader=None, verbosity=None, remote_debug=False, log_file=None):
347347
if log_file is not None:
348348
self._log_file = log_file
@@ -360,9 +360,9 @@ def start(self, api_port=None, cluster_port=None, datastream_port=None,
360360
args.append('--api-port')
361361
args.append(str(api_port))
362362

363-
if datastream_port is not None:
364-
args.append('--datastream-port')
365-
args.append(str(datastream_port))
363+
if binlog_port is not None:
364+
args.append('--binlog-port')
365+
args.append(str(binlog_port))
366366

367367
if db_port is not None:
368368
args.append('--db-port')

cli/server.py

+7-7
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ def parse_args_start(argv=None):
1515
parser.add_argument('--cluster-port', dest='cluster_port', metavar='CLUSTER_PORT',
1616
help='the network port used internally by the cluster for communication between '
1717
'Cluster nodes. (default is 5016)', default=None, type=int)
18-
parser.add_argument('--datastream-port', dest='datastream_port', metavar='DATASTREAM_PORT',
19-
help='the network port used by Datastream, currently implemented with Kafka '
20-
'(default is 9092', default=None, type=int)
18+
parser.add_argument('--binlog-port', '--datastream-port', dest='binlog_port', metavar='BINLOG_PORT',
19+
help='the network port used by BinaryLog, currently implemented with Kafka '
20+
'(default is 9092)', default=None, type=int)
2121
parser.add_argument('--db-port', dest='db_port', metavar='DB_PORT',
2222
help='the network port used by the DB, currently implemented with Cassandra '
23-
'(default is 9042', default=None, type=int)
23+
'(default is 9042)', default=None, type=int)
2424
parser.add_argument('--join-leader', dest='leader', metavar='NODE_IP', default=None,
2525
help='use this option to join this node to an existent cluster. '
2626
'NODE is the IP of the remote host to connect to.')
@@ -66,7 +66,7 @@ def main_start(argv=None):
6666
print('Starting engine "%s"...' % engine.name, end='', flush=True)
6767
node.start(api_port=args.api_port,
6868
cluster_port=args.cluster_port,
69-
datastream_port=args.datastream_port,
69+
binlog_port=args.binlog_port,
7070
db_port=args.db_port,
7171
leader=args.leader,
7272
verbosity=args.verbosity,
@@ -135,13 +135,13 @@ def main_status(argv=None):
135135
if node_running else 'stopped'
136136
cluster_s = ('running - port %d' % node_state.cluster_port) \
137137
if node_running else 'stopped'
138-
datastream_s = ('running - %s:%d' % (node_state.datastream_host, node_state.datastream_port)) \
138+
binlog_s = ('running - %s:%d' % (node_state.binlog_host, node_state.binlog_port)) \
139139
if node_running else 'stopped'
140140
database_s = ('running - %s:%d' % (node_state.database_host, node_state.database_port)) \
141141
if node_running else 'stopped'
142142

143143
print('[Engine: "%s"]' % engine.name)
144144
print(' REST API: %s' % rest_api_s)
145145
print(' Cluster: %s' % cluster_s)
146-
print(' Datastream: %s' % datastream_s)
146+
print(' Binary log: %s' % binlog_s)
147147
print(' Database: %s' % database_s)

src/backup-model/src/main/java/eu/modernmt/backup/BackupEngine.java

+19-20
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
package eu.modernmt.backup;
22

33
import eu.modernmt.backup.model.BackupMemory;
4-
import eu.modernmt.cluster.kafka.KafkaDataManager;
5-
import eu.modernmt.config.AnalyzerConfig;
6-
import eu.modernmt.config.DataStreamConfig;
4+
import eu.modernmt.cluster.kafka.KafkaBinaryLog;
5+
import eu.modernmt.config.BinaryLogConfig;
76
import eu.modernmt.config.NodeConfig;
87
import eu.modernmt.context.lucene.LuceneAnalyzer;
9-
import eu.modernmt.data.DataListener;
10-
import eu.modernmt.data.DataManager;
8+
import eu.modernmt.data.LogDataListener;
9+
import eu.modernmt.data.BinaryLog;
1110
import eu.modernmt.data.HostUnreachableException;
1211
import eu.modernmt.decoder.neural.memory.lucene.LuceneTranslationMemory;
1312
import eu.modernmt.hw.NetworkUtils;
@@ -34,23 +33,23 @@ public class BackupEngine {
3433

3534
private LuceneAnalyzer contextAnalyzer = null;
3635
private LuceneTranslationMemory memory = null;
37-
private DataManager dataManager = null;
36+
private BinaryLog binlog = null;
3837
private BackupMemory backupMemory = null;
3938

4039
public BackupEngine(NodeConfig config, File models) {
4140
this.uuid = UUID.randomUUID().toString();
4241
this.config = config;
4342
this.models = models;
4443

45-
DataStreamConfig dataStreamConfig = config.getDataStreamConfig();
44+
BinaryLogConfig binaryLogConfig = config.getBinaryLogConfig();
4645

47-
String[] hosts = dataStreamConfig.getHosts();
48-
boolean localDatastream = hosts.length == 1 && NetworkUtils.isLocalhost(hosts[0]);
49-
boolean embeddedDatastream = dataStreamConfig.isEmbedded();
46+
String[] hosts = binaryLogConfig.getHosts();
47+
boolean localBinaryLog = hosts.length == 1 && NetworkUtils.isLocalhost(hosts[0]);
48+
boolean embeddedBinaryLog = binaryLogConfig.isEmbedded();
5049

51-
if (embeddedDatastream && localDatastream) {
50+
if (embeddedBinaryLog && localBinaryLog) {
5251
String host = NetworkUtils.getMyIpv4Address();
53-
dataStreamConfig.setHost(host);
52+
binaryLogConfig.setHost(host);
5453
}
5554
}
5655

@@ -68,15 +67,15 @@ public void start() throws IOException {
6867
memory = new LuceneTranslationMemory(Paths.join(models, "memory"), 1);
6968
backupMemory = new BackupMemory(Paths.join(models, "backup"));
7069

71-
dataManager = new KafkaDataManager(config.getEngineConfig().getLanguageIndex(), new Preprocessor(), null, uuid, config.getDataStreamConfig());
72-
for (DataListener listener : contextAnalyzer.getDataListeners())
73-
dataManager.addDataListener(listener);
74-
dataManager.addDataListener(memory);
75-
dataManager.addDataListener(backupMemory);
70+
binlog = new KafkaBinaryLog(config.getEngineConfig().getLanguageIndex(), new Preprocessor(), null, uuid, config.getBinaryLogConfig());
71+
for (LogDataListener listener : contextAnalyzer.getDataListeners())
72+
binlog.addLogDataListener(listener);
73+
binlog.addLogDataListener(memory);
74+
binlog.addLogDataListener(backupMemory);
7675

7776
Map<Short, Long> positions;
7877
try {
79-
positions = dataManager.connect(60, TimeUnit.SECONDS, true, false);
78+
positions = binlog.connect(60, TimeUnit.SECONDS, true, false);
8079
} catch (HostUnreachableException e) {
8180
throw new IOException(e);
8281
}
@@ -90,7 +89,7 @@ public void stop() throws IOException {
9089
}
9190

9291
public void stop(boolean optimize) throws IOException {
93-
IOException dmError = close(dataManager);
92+
IOException dmError = close(binlog);
9493
IOException caError;
9594
IOException mError;
9695
IOException sError;
@@ -132,7 +131,7 @@ public void stop(boolean optimize) throws IOException {
132131
if (sError != null)
133132
throw sError;
134133

135-
dataManager = null;
134+
binlog = null;
136135
contextAnalyzer = null;
137136
memory = null;
138137
backupMemory = null;

src/backup-model/src/main/java/eu/modernmt/backup/model/BackupMemory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.Map;
2121
import java.util.function.Consumer;
2222

23-
public class BackupMemory implements Closeable, DataListener {
23+
public class BackupMemory implements Closeable, LogDataListener {
2424

2525
private final Logger logger = LogManager.getLogger(BackupMemory.class);
2626

src/command-line/src/main/java/eu/modernmt/cli/ClusterNodeMain.java

+18-18
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ private static class Args {
5454

5555
Option apiPort = Option.builder("a").longOpt("api-port").hasArg().type(Integer.class).required(false).build();
5656
Option clusterPort = Option.builder("p").longOpt("cluster-port").hasArg().type(Integer.class).required(false).build();
57-
Option datastreamPort = Option.builder().longOpt("datastream-port").hasArg().required(false).build();
57+
Option binlogPort = Option.builder().longOpt("binlog-port").hasArg().required(false).build();
5858
Option databasePort = Option.builder().longOpt("db-port").hasArg().required(false).build();
5959

6060
Option leader = Option.builder().longOpt("leader").hasArg().required(false).build();
@@ -69,7 +69,7 @@ private static class Args {
6969
cliOptions.addOption(verbosity);
7070
cliOptions.addOption(leader);
7171
cliOptions.addOption(logFile);
72-
cliOptions.addOption(datastreamPort);
72+
cliOptions.addOption(binlogPort);
7373
cliOptions.addOption(databasePort);
7474
}
7575

@@ -106,13 +106,13 @@ public Args(String[] args) throws ParseException, ConfigException {
106106
this.verbosity = verbosity == null ? 1 : Integer.parseInt(verbosity);
107107

108108
/* read the engine.xconf file and build the engineConfig, with its:
109-
network config, datastream config, database config, net-api config and net-join config */
109+
network config, binary-log config, database config, net-api config and net-join config */
110110
this.config = XMLConfigBuilder.build(Engine.getConfigFile(this.engine));
111111
this.config.getEngineConfig().setName(this.engine);
112112

113113
// Create the config objects based on the engine.xconf file
114114
NetworkConfig netConfig = this.config.getNetworkConfig();
115-
DataStreamConfig streamConfig = this.config.getDataStreamConfig();
115+
BinaryLogConfig binlogConfig = this.config.getBinaryLogConfig();
116116
DatabaseConfig dbConfig = this.config.getDatabaseConfig();
117117
ApiConfig apiConfig = netConfig.getApiConfig();
118118
JoinConfig joinConfig = netConfig.getJoinConfig();
@@ -128,9 +128,9 @@ public Args(String[] args) throws ParseException, ConfigException {
128128
String apiPort = cli.getOptionValue("api-port");
129129
if (apiPort != null)
130130
apiConfig.setPort(Integer.parseInt(apiPort));
131-
String datastreamPort = cli.getOptionValue("datastream-port");
132-
if (datastreamPort != null)
133-
streamConfig.setPort(Integer.parseInt(datastreamPort));
131+
String binlogPort = cli.getOptionValue("binlog-port");
132+
if (binlogPort != null)
133+
binlogConfig.setPort(Integer.parseInt(binlogPort));
134134
String databasePort = cli.getOptionValue("db-port");
135135
if (databasePort != null)
136136
dbConfig.setPort(Integer.parseInt(databasePort));
@@ -141,21 +141,21 @@ public Args(String[] args) throws ParseException, ConfigException {
141141
142142
If a leader was passed by command line with option --join-leader:
143143
- create a Member with its host and port try to use this Member only to join the cluster
144-
- the leader hosts the database and datastream, so set it as the host in datastreamconfig and databaseconfig
144+
- the leader hosts the database and binlog, so set it as the host in binaryLogConfig and databaseConfig
145145
146-
Assume that all the ports that the leader uses (cluster, datastream and cassandra ports) are the same as this node.
146+
Assume that all the ports that the leader uses (cluster, binlog and cassandra ports) are the same as this node.
147147
If they are not, then the configuration is wrong.
148148
149149
If no leader is passed by command line, do nothing.
150150
- The config object will use the list of members parsed from engine.xconf and try to join them in order.
151-
- use the database and datastream hosts and ports in engine.xconf for datastreamconfig and databaseconfig */
151+
- use the database and binlog hosts and ports in engine.xconf for binaryLogConfig and databaseConfig */
152152
String leader = cli.getOptionValue("leader");
153153

154154
if (leader != null) {
155155
JoinConfig.Member[] members = new JoinConfig.Member[1];
156156
members[0] = new JoinConfig.Member(leader, netConfig.getPort());
157157
joinConfig.setMembers(members);
158-
streamConfig.setHost(leader);
158+
binlogConfig.setHost(leader);
159159
dbConfig.setHost(leader);
160160
}
161161
}
@@ -207,7 +207,7 @@ private static class FileStatusListener implements ClusterNode.StatusListener {
207207
* - cluster port
208208
* - api config (root and port),
209209
* - database (host and port)
210-
* - datastream (host and port)
210+
* - binlog (host and port)
211211
*
212212
* @param file the node.properties JSON file
213213
* in which to store the definitive configuration
@@ -219,7 +219,7 @@ public FileStatusListener(File file, NodeConfig config) {
219219
NetworkConfig netConfig = config.getNetworkConfig();
220220
ApiConfig apiConfig = netConfig.getApiConfig();
221221
DatabaseConfig dbConfig = config.getDatabaseConfig();
222-
DataStreamConfig streamConfig = config.getDataStreamConfig();
222+
BinaryLogConfig binlogConfig = config.getBinaryLogConfig();
223223

224224
this.properties = new JsonObject();
225225

@@ -239,11 +239,11 @@ public FileStatusListener(File file, NodeConfig config) {
239239
this.properties.add("database", db);
240240
}
241241

242-
if (streamConfig.isEnabled()) {
243-
JsonObject stream = new JsonObject();
244-
stream.addProperty("port", streamConfig.getPort());
245-
stream.addProperty("host", StringUtils.join(streamConfig.getHosts(), ','));
246-
this.properties.add("datastream", stream);
242+
if (binlogConfig.isEnabled()) {
243+
JsonObject binlog = new JsonObject();
244+
binlog.addProperty("port", binlogConfig.getPort());
245+
binlog.addProperty("host", StringUtils.join(binlogConfig.getHosts(), ','));
246+
this.properties.add("binlog", binlog);
247247
}
248248

249249
this.properties.addProperty("cluster_port", netConfig.getPort());

src/commons/src/main/java/eu/modernmt/config/DataStreamConfig.java src/commons/src/main/java/eu/modernmt/config/BinaryLogConfig.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
* Created by davide on 04/01/17.
77
* Updated by andrearossi on 03/04/17
88
* <p>
9-
* This class embodies a configuration for a DataStream.
9+
* This class embodies a configuration for a BinaryLog.
1010
* It may be read from a configuration file (e.g. engineConf.xml)
1111
* (or obtained in some different way)
1212
*/
13-
public class DataStreamConfig {
13+
public class BinaryLogConfig {
1414

1515
private final NodeConfig parent;
1616
private boolean enabled = true;
@@ -19,7 +19,7 @@ public class DataStreamConfig {
1919
private int port = 9092;
2020
private String name = null;
2121

22-
public DataStreamConfig(NodeConfig parent) {
22+
public BinaryLogConfig(NodeConfig parent) {
2323
this.parent = parent;
2424
}
2525

@@ -31,7 +31,7 @@ public boolean isEnabled() {
3131
return enabled;
3232
}
3333

34-
public DataStreamConfig setEnabled(boolean enabled) {
34+
public BinaryLogConfig setEnabled(boolean enabled) {
3535
this.enabled = enabled;
3636
return this;
3737
}
@@ -40,7 +40,7 @@ public boolean isEmbedded() {
4040
return this.embedded;
4141
}
4242

43-
public DataStreamConfig setEmbedded(boolean embedded) {
43+
public BinaryLogConfig setEmbedded(boolean embedded) {
4444
this.embedded = embedded;
4545
return this;
4646
}
@@ -49,7 +49,7 @@ public int getPort() {
4949
return this.port;
5050
}
5151

52-
public DataStreamConfig setPort(int port) {
52+
public BinaryLogConfig setPort(int port) {
5353
this.port = port;
5454
return this;
5555
}
@@ -58,12 +58,12 @@ public String[] getHosts() {
5858
return this.hosts;
5959
}
6060

61-
public DataStreamConfig setHost(String host) {
61+
public BinaryLogConfig setHost(String host) {
6262
this.hosts = new String[]{host};
6363
return this;
6464
}
6565

66-
public DataStreamConfig setHosts(String[] hosts) {
66+
public BinaryLogConfig setHosts(String[] hosts) {
6767
this.hosts = hosts;
6868
return this;
6969
}
@@ -72,14 +72,14 @@ public String getName() {
7272
return name;
7373
}
7474

75-
public DataStreamConfig setName(String name) {
75+
public BinaryLogConfig setName(String name) {
7676
this.name = name;
7777
return this;
7878
}
7979

8080
@Override
8181
public String toString() {
82-
return "Datastream: " +
82+
return "Binlog: " +
8383
"enabled=" + enabled +
8484
", embedded=" + embedded +
8585
", hosts=" + StringUtils.join(hosts, ',') +

0 commit comments

Comments
 (0)