Skip to content

Commit

Permalink
[Fix-16884] Using transaction when fetch command to avoid fetch comma…
Browse files Browse the repository at this point in the history
…nd from slave database (#16885)
  • Loading branch information
ruanwenjun authored Dec 8, 2024
1 parent 0d9dba2 commit 80c5f59
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Component
Expand All @@ -40,7 +41,11 @@ public AlertEventFetcher(AlertHAServer alertHAServer,
}

@Override
@Transactional
public List<Alert> fetchPendingEvent(int eventOffset) {
// We use transaction here to ensure that if mysql is configured at master/slave mode, this query will be routed
// to the master db.
// Avoid we query from the slave and find the data is not the latest.
return alertDao.listPendingAlerts(eventOffset);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate;

/**
* Use to watch the worker group from database and notify the change.
Expand All @@ -44,16 +44,22 @@
@Component
public class WorkerGroupChangeNotifier {

@Autowired
private MasterConfig masterConfig;
private final MasterConfig masterConfig;

private final TransactionTemplate transactionTemplate;

private final WorkerGroupDao workerGroupDao;

private final List<WorkerGroupListener> listeners = new CopyOnWriteArrayList<>();

private Map<String, WorkerGroup> workerGroupMap = new HashMap<>();

public WorkerGroupChangeNotifier(WorkerGroupDao workerGroupDao) {
public WorkerGroupChangeNotifier(final MasterConfig masterConfig,
final WorkerGroupDao workerGroupDao,
final TransactionTemplate transactionTemplate) {
this.masterConfig = masterConfig;
this.workerGroupDao = workerGroupDao;
this.transactionTemplate = transactionTemplate;
}

public void start() {
Expand Down Expand Up @@ -85,10 +91,15 @@ Map<String, WorkerGroup> getWorkerGroupMap() {
}

private MapComparator<String, WorkerGroup> detectChangedWorkerGroups() {
Map<String, WorkerGroup> tmpWorkerGroupMap = workerGroupDao.queryAll()
.stream()
.collect(Collectors.toMap(WorkerGroup::getName, workerGroup -> workerGroup));
return new MapComparator<>(workerGroupMap, tmpWorkerGroupMap);
// We use transaction here to ensure that if mysql is configured at master/slave mode, this query will be routed
// to the master db.
// Avoid we query from the slave and find the data is not the latest.
return transactionTemplate.execute(status -> {
Map<String, WorkerGroup> tmpWorkerGroupMap = workerGroupDao.queryAll()
.stream()
.collect(Collectors.toMap(WorkerGroup::getName, workerGroup -> workerGroup));
return new MapComparator<>(workerGroupMap, tmpWorkerGroupMap);
});
}

private void triggerListeners(MapComparator<String, WorkerGroup> mapComparator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import lombok.extern.slf4j.Slf4j;

import org.springframework.transaction.annotation.Transactional;

/**
* The command fetcher which is fetch commands by command id and slot.
*/
Expand All @@ -48,7 +50,10 @@ public IdSlotBasedCommandFetcher(CommandFetchStrategy.IdSlotBasedFetchConfig idS
this.commandDao = commandDao;
}

// We use transaction here to ensure that if mysql is configured at master/slave mode, this query will be routed to
// the master db.
@Override
@Transactional
public List<Command> fetchCommands() {
long scheduleStartTime = System.currentTimeMillis();
if (!masterSlotManager.checkSlotValid()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@

import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;

import com.google.common.collect.Lists;

Expand All @@ -36,7 +39,10 @@ class WorkerGroupChangeNotifierTest {
@Test
void detectWorkerGroupChanges_addedWorkerGroup() {
WorkerGroupDao workerGroupDao = Mockito.mock(WorkerGroupDao.class);
WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(workerGroupDao);
WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(
new MasterConfig(),
workerGroupDao,
new MockTransactionTemplate());

WorkerGroup workerGroup1 = WorkerGroup.builder()
.name("workerGroup1")
Expand Down Expand Up @@ -74,7 +80,10 @@ public void onWorkerGroupChange(List<WorkerGroup> workerGroups) {
@Test
void detectWorkerGroupChanges_deleteWorkerGroup() {
WorkerGroupDao workerGroupDao = Mockito.mock(WorkerGroupDao.class);
WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(workerGroupDao);
WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(
new MasterConfig(),
workerGroupDao,
new MockTransactionTemplate());

WorkerGroup workerGroup1 = WorkerGroup.builder()
.name("workerGroup1")
Expand Down Expand Up @@ -115,7 +124,10 @@ public void onWorkerGroupChange(List<WorkerGroup> workerGroups) {
@Test
void detectWorkerGroupChanges_updateWorkerGroup() {
WorkerGroupDao workerGroupDao = Mockito.mock(WorkerGroupDao.class);
WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(workerGroupDao);
WorkerGroupChangeNotifier workerGroupChangeNotifier = new WorkerGroupChangeNotifier(
new MasterConfig(),
workerGroupDao,
new MockTransactionTemplate());

WorkerGroup workerGroup1 = WorkerGroup.builder()
.name("workerGroup1")
Expand Down Expand Up @@ -156,4 +168,11 @@ public void onWorkerGroupChange(List<WorkerGroup> workerGroups) {
assertThat(workerGroupDeleted.get()).isFalse();
assertThat(workerGroupChangeNotifier.getWorkerGroupMap()).containsEntry("workerGroup1", updatedWorkerGroup1);
}

public static class MockTransactionTemplate extends TransactionTemplate {

public <T> T execute(final TransactionCallback<T> action) {
return action.doInTransaction(null);
}
}
}

0 comments on commit 80c5f59

Please sign in to comment.