Skip to content

Commit

Permalink
fix(datasource): datasource sync failed (#3829)
Browse files Browse the repository at this point in the history
* fix sync ds

* code format
  • Loading branch information
MarkPotato777 authored Nov 13, 2024
1 parent beee9c9 commit 502eb06
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public ThreadPoolTaskExecutor syncDatabaseTaskExecutor() {
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(5);
executor.setTaskDecorator(new TraceDecorator<>());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
log.info("syncDatabaseTaskExecutor initialized");
return executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.oceanbase.odc.service.connection;

import java.util.Collections;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -49,6 +50,8 @@ public void syncDatabases() {
if (CollectionUtils.isEmpty(orgDataSources)) {
return;
}
Collections.shuffle(orgDataSources);
log.info("Start to sync datasources, size={}", orgDataSources.size());
for (ConnectionConfig dataSource : orgDataSources) {
try {
databaseSyncManager.submitSyncDataSourceTask(dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.oceanbase.odc.service.connection.database;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -27,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -72,6 +72,7 @@
import com.oceanbase.odc.core.shared.exception.BadRequestException;
import com.oceanbase.odc.core.shared.exception.ConflictException;
import com.oceanbase.odc.core.shared.exception.NotFoundException;
import com.oceanbase.odc.core.shared.exception.UnexpectedException;
import com.oceanbase.odc.metadb.connection.DatabaseEntity;
import com.oceanbase.odc.metadb.connection.DatabaseRepository;
import com.oceanbase.odc.metadb.connection.DatabaseSpecs;
Expand Down Expand Up @@ -115,6 +116,7 @@
import com.oceanbase.odc.service.iam.ResourceRoleService;
import com.oceanbase.odc.service.iam.UserService;
import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
import com.oceanbase.odc.service.iam.model.Organization;
import com.oceanbase.odc.service.iam.model.User;
import com.oceanbase.odc.service.iam.model.UserResourceRole;
import com.oceanbase.odc.service.monitor.datasource.GetConnectionFailedEventListener;
Expand Down Expand Up @@ -512,19 +514,24 @@ public Boolean internalSyncDataSourceSchemas(@NonNull Long dataSourceId) throws
throw new ConflictException(ErrorCodes.ResourceSynchronizing,
new Object[] {ResourceType.ODC_DATABASE.getLocalizedMessage()}, "Can not acquire jdbc lock");
}
ConnectionConfig connection;
Optional<Organization> organizationOpt = Optional.empty();
try {
ConnectionConfig connection = connectionService.getForConnectionSkipPermissionCheck(dataSourceId);
connection = connectionService.getForConnectionSkipPermissionCheck(dataSourceId);
horizontalDataPermissionValidator.checkCurrentOrganization(connection);
organizationService.get(connection.getOrganizationId()).ifPresent(organization -> {
if (organization.getType() == OrganizationType.INDIVIDUAL) {
syncIndividualDataSources(connection);
} else {
syncTeamDataSources(connection);
}
});
organizationOpt = organizationService.get(connection.getOrganizationId());
Organization organization =
organizationOpt.orElseThrow(() -> new UnexpectedException("Organization not found"));
if (organization.getType() == OrganizationType.INDIVIDUAL) {
syncIndividualDataSources(connection);
} else {
syncTeamDataSources(connection);
}
connectionSyncHistoryService.upsert(connection.getId(), ConnectionSyncResult.SUCCESS,
connection.getOrganizationId(), null, null);
return true;
} catch (Exception ex) {
log.warn("Sync database failed, dataSourceId={}, errorMessage={}", dataSourceId, ex.getLocalizedMessage());
handleSyncException(ex, dataSourceId, organizationOpt);
return false;
} finally {
lock.unlock();
Expand All @@ -535,7 +542,8 @@ public int updateEnvironmentIdByConnectionId(@NotNull Long environmentId, @NotNu
return databaseRepository.setEnvironmentIdByConnectionId(environmentId, connectionId);
}

private void syncTeamDataSources(ConnectionConfig connection) {
private void syncTeamDataSources(ConnectionConfig connection)
throws ExecutionException, InterruptedException, TimeoutException {
Long currentProjectId = connection.getProjectId();
boolean blockExcludeSchemas = dbSchemaSyncProperties.isBlockExclusionsWhenSyncDbToProject();
List<String> excludeSchemas = dbSchemaSyncProperties.getExcludeSchemas(connection.getDialectType());
Expand Down Expand Up @@ -633,20 +641,6 @@ private void syncTeamDataSources(ConnectionConfig connection) {
}
connectionSyncHistoryService.upsert(connection.getId(), ConnectionSyncResult.SUCCESS,
connection.getOrganizationId(), null, null);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
String errorMessage = e.getMessage();
Throwable rootCause = e.getCause();
ConnectionSyncErrorReason failedReason = ConnectionSyncErrorReason.UNKNOWN;
log.warn("Failed to obtain the connection, errorMessage={}", errorMessage);
if (rootCause instanceof SQLException
&& StringUtils.containsIgnoreCase(errorMessage, "cluster not exist")) {
failedReason = ConnectionSyncErrorReason.CLUSTER_NOT_EXISTS;
deleteDatabaseIfClusterNotExists((SQLException) rootCause,
connection.getId(), "update connect_database set is_existed = 0 where connection_id=?");
}
connectionSyncHistoryService.upsert(connection.getId(), ConnectionSyncResult.FAILURE,
connection.getOrganizationId(), failedReason,
rootCause.getMessage());
} finally {
try {
executorService.shutdownNow();
Expand Down Expand Up @@ -685,7 +679,8 @@ private Long getProjectId(DatabaseEntity database, Long currentProjectId, List<S
return projectId;
}

private void syncIndividualDataSources(ConnectionConfig connection) {
private void syncIndividualDataSources(ConnectionConfig connection)
throws ExecutionException, InterruptedException, TimeoutException {
DataSource individualDataSource = getDataSourceFactory(connection).getDataSource();
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<Set<String>> future = executorService.submit(() -> {
Expand Down Expand Up @@ -733,20 +728,6 @@ private void syncIndividualDataSources(ConnectionConfig connection) {
}
connectionSyncHistoryService.upsert(connection.getId(), ConnectionSyncResult.SUCCESS,
connection.getOrganizationId(), null, null);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
String errorMessage = e.getMessage();
Throwable rootCause = e.getCause();
ConnectionSyncErrorReason failedReason = ConnectionSyncErrorReason.UNKNOWN;
log.warn("Failed to obtain the connection, errorMessage={}", errorMessage);
if (rootCause instanceof SQLException
&& StringUtils.containsIgnoreCase(errorMessage, "cluster not exist")) {
failedReason = ConnectionSyncErrorReason.CLUSTER_NOT_EXISTS;
deleteDatabaseIfClusterNotExists((SQLException) rootCause,
connection.getId(), "delete from connect_database where connection_id=?");
}
connectionSyncHistoryService.upsert(connection.getId(), ConnectionSyncResult.FAILURE,
connection.getOrganizationId(), failedReason,
rootCause.getMessage());
} finally {
try {
executorService.shutdownNow();
Expand Down Expand Up @@ -1056,19 +1037,44 @@ private List<UserResourceRole> buildUserResourceRoles(Collection<Long> databaseI
return userResourceRoles;
}

private void handleSyncException(@NonNull Exception ex, @NonNull Long dataSourceId,
@NonNull Optional<Organization> organizationOpt) {
String errorMessage = ex.getMessage();
log.warn("Sync database failed, dataSourceId={}, errorMessage={}", dataSourceId, errorMessage);
if (!organizationOpt.isPresent()) {
return;
}
Organization organization = organizationOpt.get();
ConnectionSyncErrorReason failedReason = ConnectionSyncErrorReason.UNKNOWN;
if (StringUtils.containsIgnoreCase(errorMessage, "cluster not exist")) {
failedReason = ConnectionSyncErrorReason.CLUSTER_NOT_EXISTS;
deleteDatabaseIfInstanceNotExists(dataSourceId, organization.getType());
} else if (StringUtils.containsIgnoreCase(errorMessage, "No tenants found") || StringUtils
.containsIgnoreCase(errorMessage, "tenant expected 1 but was")) {
failedReason = ConnectionSyncErrorReason.TENANT_NOT_EXISTS;
deleteDatabaseIfInstanceNotExists(dataSourceId, organization.getType());
}
connectionSyncHistoryService.upsert(dataSourceId, ConnectionSyncResult.FAILURE, organization.getId(),
failedReason, errorMessage);
}

private void deleteDatabaseIfClusterNotExists(SQLException e, Long connectionId, String deleteSql) {
if (StringUtils.containsIgnoreCase(e.getMessage(), "cluster not exist")) {
log.info(
"Cluster not exist, set existed to false for all databases in this data source, data source id = {}",
connectionId);
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
try {
jdbcTemplate.update(deleteSql, new Object[] {connectionId});
} catch (Exception ex) {
log.warn("Failed to delete databases when cluster not exist, errorMessage={}",
ex.getLocalizedMessage());
}
private void deleteDatabaseIfInstanceNotExists(Long connectionId, OrganizationType organizationType) {
log.info(
"Cluster or tenant not exist, set existed to false for all databases in this data source, data source id = {}",
connectionId);
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
String deleteSql;
if (organizationType == OrganizationType.INDIVIDUAL) {
deleteSql = "delete from connect_database where connection_id=?";
} else {
deleteSql = "update connect_database set is_existed = 0 where connection_id=?";
}
try {
jdbcTemplate.update(deleteSql, connectionId);
} catch (Exception ex) {
log.warn("Failed to delete databases when cluster not exist, errorMessage={}",
ex.getLocalizedMessage());
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@

public enum ConnectionSyncErrorReason {
CLUSTER_NOT_EXISTS,
TENANT_NOT_EXISTS,
UNKNOWN,
}

0 comments on commit 502eb06

Please sign in to comment.