Skip to content

Commit 0f13686

Browse files
committed
Merge branch 'dev' of https://github.com/datasophon/datasophon into dev
2 parents cab0780 + 499e32d commit 0f13686

File tree

11 files changed

+148
-5
lines changed

11 files changed

+148
-5
lines changed

datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public static void init() throws UnknownHostException, NoSuchAlgorithmException
6767
getActorRefName(ServiceRoleCheckActor.class));
6868
ActorRef hostCheckActor =
6969
actorSystem.actorOf(Props.create(HostCheckActor.class), getActorRefName(HostCheckActor.class));
70+
actorSystem.actorOf(Props.create(MasterNodeProcessingActor.class),
71+
getActorRefName(MasterNodeProcessingActor.class));
7072

7173
actorSystem.scheduler().schedule(
7274
FiniteDuration.apply(60L, TimeUnit.SECONDS),
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.datasophon.api.master;
2+
3+
import akka.actor.UntypedActor;
4+
import com.datasophon.common.command.OlapOpsType;
5+
import com.datasophon.common.command.OlapSqlExecCommand;
6+
import com.datasophon.common.utils.ExecResult;
7+
import com.datasophon.common.utils.OlapUtils;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import java.util.concurrent.TimeUnit;
12+
13+
public class MasterNodeProcessingActor extends UntypedActor {
14+
15+
private static final Logger logger = LoggerFactory.getLogger(MasterNodeProcessingActor.class);
16+
17+
@Override
18+
public void onReceive(Object message) throws Throwable {
19+
if (message instanceof OlapSqlExecCommand) {
20+
OlapSqlExecCommand command = (OlapSqlExecCommand) message;
21+
ExecResult execResult = OlapOpsType.ADD_BE.equals(command.getOpsType())
22+
? OlapUtils.addBackendBySqlClient(command.getFeMaster(), command.getHostName())
23+
: OlapUtils.addFollowerBySqlClient(command.getFeMaster(), command.getHostName());
24+
String tip = OlapOpsType.ADD_BE.equals(command.getOpsType()) ? "backend" : "follower";
25+
if (execResult.getExecResult()) {
26+
logger.info(command.getHostName() + " " + tip + " be added success");
27+
} else {
28+
logger.info(command.getHostName() + " " + tip + " be added failed");
29+
}
30+
int tryTimes = 0;
31+
while (!execResult.getExecResult() && tryTimes < 3) {
32+
try {
33+
TimeUnit.SECONDS.sleep(10L);
34+
execResult = OlapOpsType.ADD_BE.equals(command.getOpsType())
35+
? OlapUtils.addBackendBySqlClient(command.getFeMaster(), command.getHostName())
36+
: OlapUtils.addFollowerBySqlClient(command.getFeMaster(), command.getHostName());
37+
if (execResult.getExecResult()) {
38+
logger.info(command.getHostName() + " " + tip + " be added success");
39+
break;
40+
} else {
41+
logger.info(command.getHostName() + " " + tip + " be added failed");
42+
}
43+
tryTimes++;
44+
} catch (InterruptedException e) {
45+
logger.info("The SR operate be sleep operation failed");
46+
}
47+
}
48+
} else {
49+
unhandled(message);
50+
}
51+
}
52+
}

datasophon-api/src/main/java/com/datasophon/api/master/handler/service/ServiceStartHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import com.datasophon.api.load.GlobalVariables;
2121
import com.datasophon.api.master.ActorUtils;
22+
import com.datasophon.common.Constants;
23+
import com.datasophon.common.cache.CacheUtils;
2224
import com.datasophon.common.command.ServiceRoleOperateCommand;
2325
import com.datasophon.common.enums.ServiceRoleType;
2426
import com.datasophon.common.model.ServiceRoleInfo;
@@ -56,6 +58,7 @@ public ExecResult handlerRequest(ServiceRoleInfo serviceRoleInfo) throws Excepti
5658
serviceRoleOperateCommand.setSlave(serviceRoleInfo.isSlave());
5759
serviceRoleOperateCommand.setCommandType(serviceRoleInfo.getCommandType());
5860
serviceRoleOperateCommand.setMasterHost(serviceRoleInfo.getMasterHost());
61+
serviceRoleOperateCommand.setManagerHost(CacheUtils.getString(Constants.HOSTNAME));
5962

6063
logger.info("service master host is {}", serviceRoleInfo.getMasterHost());
6164

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.datasophon.common.command;
2+
3+
/**
4+
* @Enum OlapOpsType
5+
* @Author: 张大伟
6+
* @Date: 2023/4/24 21:28
7+
* @Version: 1.0
8+
*/
9+
public enum OlapOpsType {
10+
11+
ADD_BE,
12+
ADD_FE
13+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.datasophon.common.command;
2+
3+
import java.io.Serializable;
4+
5+
import lombok.Data;
6+
7+
@Data
8+
public class OlapSqlExecCommand implements Serializable {
9+
10+
private OlapOpsType opsType;
11+
12+
private String feMaster;
13+
14+
private String hostName;
15+
}

datasophon-common/src/main/java/com/datasophon/common/command/ServiceRoleOperateCommand.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public class ServiceRoleOperateCommand extends BaseCommand implements Serializab
4040

4141
private String masterHost;
4242

43+
private String managerHost;
44+
4345
private Boolean enableRangerPlugin;
4446

4547
private RunAs runAs;

datasophon-common/src/main/java/com/datasophon/common/utils/OlapUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public static ExecResult addFollowerBySqlClient(String feMaster,
7070
return ShellUtils.exceShell(sqlCommand);
7171
}
7272

73-
public static ExecResult addBackendSqlClient(String feMaster,
73+
public static ExecResult addBackendBySqlClient(String feMaster,
7474
String hostname) {
7575
String sqlCommand =
7676
"mysql -h"

datasophon-worker/src/main/java/com/datasophon/worker/WorkerApplicationServer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.datasophon.common.utils.ShellUtils;
2929
import com.datasophon.worker.actor.RemoteEventActor;
3030
import com.datasophon.worker.actor.WorkerActor;
31+
import com.datasophon.worker.utils.ActorUtils;
3132
import com.datasophon.worker.utils.UnixUtils;
3233

3334
import java.net.InetAddress;
@@ -77,6 +78,7 @@ public static void main(String[] args) throws UnknownHostException {
7778
CacheUtils.put(Constants.HOSTNAME, hostname);
7879
// init actor
7980
ActorSystem system = initActor(hostname);
81+
ActorUtils.setActorSystem(system);
8082

8183
subscribeRemoteEvent(system);
8284

datasophon-worker/src/main/java/com/datasophon/worker/strategy/BEHandlerStrategy.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,19 @@
1717

1818
package com.datasophon.worker.strategy;
1919

20+
import akka.actor.ActorRef;
2021
import com.datasophon.common.Constants;
2122
import com.datasophon.common.cache.CacheUtils;
23+
import com.datasophon.common.command.OlapOpsType;
24+
import com.datasophon.common.command.OlapSqlExecCommand;
2225
import com.datasophon.common.command.ServiceRoleOperateCommand;
2326
import com.datasophon.common.enums.CommandType;
2427
import com.datasophon.common.utils.ExecResult;
2528
import com.datasophon.common.utils.OlapUtils;
2629
import com.datasophon.common.utils.ShellUtils;
2730
import com.datasophon.common.utils.ThrowableUtils;
2831
import com.datasophon.worker.handler.ServiceHandler;
32+
import com.datasophon.worker.utils.ActorUtils;
2933
import org.slf4j.Logger;
3034
import org.slf4j.LoggerFactory;
3135

@@ -48,8 +52,13 @@ public ExecResult handler(ServiceRoleOperateCommand command) {
4852
command.getDecompressPackageName(), command.getRunAs());
4953
if (startResult.getExecResult()) {
5054
try {
51-
OlapUtils.addBackend(command.getMasterHost(), CacheUtils.getString(Constants.HOSTNAME));
52-
} catch (SQLException | ClassNotFoundException e) {
55+
OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand();
56+
sqlExecCommand.setFeMaster(command.getMasterHost());
57+
sqlExecCommand.setHostName(CacheUtils.getString(Constants.HOSTNAME));
58+
sqlExecCommand.setOpsType(OlapOpsType.ADD_BE);
59+
ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor")
60+
.tell(sqlExecCommand, ActorRef.noSender());
61+
} catch (Exception e) {
5362
logger.info("add backend failed {}", ThrowableUtils.getStackTrace(e));
5463
}
5564
logger.info("slave be start success");

datasophon-worker/src/main/java/com/datasophon/worker/strategy/FEHandlerStrategy.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818
package com.datasophon.worker.strategy;
1919

20+
import akka.actor.ActorRef;
2021
import com.datasophon.common.Constants;
2122
import com.datasophon.common.cache.CacheUtils;
23+
import com.datasophon.common.command.OlapOpsType;
24+
import com.datasophon.common.command.OlapSqlExecCommand;
2225
import com.datasophon.common.command.ServiceRoleOperateCommand;
2326
import com.datasophon.common.enums.CommandType;
2427
import com.datasophon.common.model.ServiceRoleRunner;
@@ -30,6 +33,7 @@
3033
import java.sql.SQLException;
3134
import java.util.ArrayList;
3235

36+
import com.datasophon.worker.utils.ActorUtils;
3337
import org.slf4j.Logger;
3438
import org.slf4j.LoggerFactory;
3539

@@ -58,8 +62,13 @@ public ExecResult handler(ServiceRoleOperateCommand command) {
5862
if (startResult.getExecResult()) {
5963
// add follower
6064
try {
61-
OlapUtils.addFollower(command.getMasterHost(), CacheUtils.getString(Constants.HOSTNAME));
62-
} catch (SQLException | ClassNotFoundException e) {
65+
OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand();
66+
sqlExecCommand.setFeMaster(command.getMasterHost());
67+
sqlExecCommand.setHostName(CacheUtils.getString(Constants.HOSTNAME));
68+
sqlExecCommand.setOpsType(OlapOpsType.ADD_FE);
69+
ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor")
70+
.tell(sqlExecCommand, ActorRef.noSender());
71+
} catch (Exception e) {
6372
logger.info("add slave fe failed {}", ThrowableUtils.getStackTrace(e));
6473
}
6574
logger.info("slave fe start success");

0 commit comments

Comments
 (0)