Skip to content

Commit fb179aa

Browse files
authored
Merge branch 'dev' into dev
2 parents 36854d5 + 80c5f59 commit fb179aa

File tree

4 files changed

+51
-11
lines changed

4 files changed

+51
-11
lines changed

dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertEventFetcher.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import lombok.extern.slf4j.Slf4j;
2626

2727
import org.springframework.stereotype.Component;
28+
import org.springframework.transaction.annotation.Transactional;
2829

2930
@Slf4j
3031
@Component
@@ -40,7 +41,11 @@ public AlertEventFetcher(AlertHAServer alertHAServer,
4041
}
4142

4243
@Override
44+
@Transactional
4345
public List<Alert> fetchPendingEvent(int eventOffset) {
46+
// We use transaction here to ensure that if mysql is configured at master/slave mode, this query will be routed
47+
// to the master db.
48+
// Avoid we query from the slave and find the data is not the latest.
4449
return alertDao.listPendingAlerts(eventOffset);
4550
}
4651

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434

3535
import lombok.extern.slf4j.Slf4j;
3636

37-
import org.springframework.beans.factory.annotation.Autowired;
3837
import org.springframework.stereotype.Component;
38+
import org.springframework.transaction.support.TransactionTemplate;
3939

4040
/**
4141
* Use to watch the worker group from database and notify the change.
@@ -44,16 +44,22 @@
4444
@Component
4545
public class WorkerGroupChangeNotifier {
4646

47-
@Autowired
48-
private MasterConfig masterConfig;
47+
private final MasterConfig masterConfig;
48+
49+
private final TransactionTemplate transactionTemplate;
4950

5051
private final WorkerGroupDao workerGroupDao;
52+
5153
private final List<WorkerGroupListener> listeners = new CopyOnWriteArrayList<>();
5254

5355
private Map<String, WorkerGroup> workerGroupMap = new HashMap<>();
5456

55-
public WorkerGroupChangeNotifier(WorkerGroupDao workerGroupDao) {
57+
public WorkerGroupChangeNotifier(final MasterConfig masterConfig,
58+
final WorkerGroupDao workerGroupDao,
59+
final TransactionTemplate transactionTemplate) {
60+
this.masterConfig = masterConfig;
5661
this.workerGroupDao = workerGroupDao;
62+
this.transactionTemplate = transactionTemplate;
5763
}
5864

5965
public void start() {
@@ -85,10 +91,15 @@ Map<String, WorkerGroup> getWorkerGroupMap() {
8591
}
8692

8793
private MapComparator<String, WorkerGroup> detectChangedWorkerGroups() {
88-
Map<String, WorkerGroup> tmpWorkerGroupMap = workerGroupDao.queryAll()
89-
.stream()
90-
.collect(Collectors.toMap(WorkerGroup::getName, workerGroup -> workerGroup));
91-
return new MapComparator<>(workerGroupMap, tmpWorkerGroupMap);
94+
// We use transaction here to ensure that if mysql is configured at master/slave mode, this query will be routed
95+
// to the master db.
96+
// Avoid we query from the slave and find the data is not the latest.
97+
return transactionTemplate.execute(status -> {
98+
Map<String, WorkerGroup> tmpWorkerGroupMap = workerGroupDao.queryAll()
99+
.stream()
100+
.collect(Collectors.toMap(WorkerGroup::getName, workerGroup -> workerGroup));
101+
return new MapComparator<>(workerGroupMap, tmpWorkerGroupMap);
102+
});
92103
}
93104

94105
private void triggerListeners(MapComparator<String, WorkerGroup> mapComparator) {

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/IdSlotBasedCommandFetcher.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929
import lombok.extern.slf4j.Slf4j;
3030

31+
import org.springframework.transaction.annotation.Transactional;
32+
3133
/**
3234
* The command fetcher which is fetch commands by command id and slot.
3335
*/
@@ -48,7 +50,10 @@ public IdSlotBasedCommandFetcher(CommandFetchStrategy.IdSlotBasedFetchConfig idS
4850
this.commandDao = commandDao;
4951
}
5052

53+
// We use transaction here to ensure that if mysql is configured at master/slave mode, this query will be routed to
54+
// the master db.
5155
@Override
56+
@Transactional
5257
public List<Command> fetchCommands() {
5358
long scheduleStartTime = System.currentTimeMillis();
5459
if (!masterSlotManager.checkSlotValid()) {

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifierTest.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222

2323
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
2424
import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
25+
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
2526

2627
import java.util.List;
2728
import java.util.concurrent.atomic.AtomicBoolean;
2829

2930
import org.junit.jupiter.api.Test;
3031
import org.mockito.Mockito;
32+
import org.springframework.transaction.support.TransactionCallback;
33+
import org.springframework.transaction.support.TransactionTemplate;
3134

3235
import com.google.common.collect.Lists;
3336

@@ -36,7 +39,10 @@ class WorkerGroupChangeNotifierTest {
3639
@Test
3740
void detectWorkerGroupChanges_addedWorkerGroup() {
3841
WorkerGroupDao workerGroupDao = Mockito.mock(WorkerGroupDao.class);
39-
WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(workerGroupDao);
42+
WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(
43+
new MasterConfig(),
44+
workerGroupDao,
45+
new MockTransactionTemplate());
4046

4147
WorkerGroup workerGroup1 = WorkerGroup.builder()
4248
.name("workerGroup1")
@@ -74,7 +80,10 @@ public void onWorkerGroupChange(List<WorkerGroup> workerGroups) {
7480
@Test
7581
void detectWorkerGroupChanges_deleteWorkerGroup() {
7682
WorkerGroupDao workerGroupDao = Mockito.mock(WorkerGroupDao.class);
77-
WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(workerGroupDao);
83+
WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(
84+
new MasterConfig(),
85+
workerGroupDao,
86+
new MockTransactionTemplate());
7887

7988
WorkerGroup workerGroup1 = WorkerGroup.builder()
8089
.name("workerGroup1")
@@ -115,7 +124,10 @@ public void onWorkerGroupChange(List<WorkerGroup> workerGroups) {
115124
@Test
116125
void detectWorkerGroupChanges_updateWorkerGroup() {
117126
WorkerGroupDao workerGroupDao = Mockito.mock(WorkerGroupDao.class);
118-
WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(workerGroupDao);
127+
WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(
128+
new MasterConfig(),
129+
workerGroupDao,
130+
new MockTransactionTemplate());
119131

120132
WorkerGroup workerGroup1 = WorkerGroup.builder()
121133
.name("workerGroup1")
@@ -156,4 +168,11 @@ public void onWorkerGroupChange(List<WorkerGroup> workerGroups) {
156168
assertThat(workerGroupDeleted.get()).isFalse();
157169
assertThat(workerGroupChangeNotifier.getWorkerGroupMap()).containsEntry("workerGroup1", updatedWorkerGroup1);
158170
}
171+
172+
public static class MockTransactionTemplate extends TransactionTemplate {
173+
174+
public <T> T execute(final TransactionCallback<T> action) {
175+
return action.doInTransaction(null);
176+
}
177+
}
159178
}

0 commit comments

Comments
 (0)