Skip to content

Commit

Permalink
[Feature][Zeta]Add jar path precheck when job submit on master (#7976)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangshenghang authored Nov 7, 2024
1 parent fe03f0f commit 1df6b83
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
env {
parallelism = 1
job.mode = "BATCH"
jars = "file:///tmp/jars/mysql-connector-java-8.0.16.jar"
}

source {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
env {
parallelism = 1
job.mode = "BATCH"
jars = "file:///tmp/jars/mysql-connector-java-8.0.16.jar"
}

source {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.engine.common.exception;

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;

public enum ClassLoaderErrorCode implements SeaTunnelErrorCode {
NOT_FOUND_JAR("NOT-FOUND-JAR", "Jar package not found");

private final String code;
private final String description;

ClassLoaderErrorCode(String code, String description) {
this.code = code;
this.description = description;
}

@Override
public String getCode() {
return code;
}

@Override
public String getDescription() {
return description;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.common.exception;

import org.apache.seatunnel.common.exception.ExceptionParamsUtil;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;

import java.util.HashMap;

public class ClassLoaderException extends SeaTunnelEngineException {

public ClassLoaderException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
super(seaTunnelErrorCode.getErrorMessage() + " - " + errorMessage);
ExceptionParamsUtil.assertParamsMatchWithDescription(
seaTunnelErrorCode.getDescription(), new HashMap<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@

package org.apache.seatunnel.engine.core.classloader;

import org.apache.seatunnel.engine.common.exception.ClassLoaderErrorCode;
import org.apache.seatunnel.engine.common.exception.ClassLoaderException;
import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;

import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.net.URL;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -34,14 +41,17 @@ public class DefaultClassLoaderService implements ClassLoaderService {
private final boolean cacheMode;
private final Map<Long, Map<String, ClassLoader>> classLoaderCache;
private final Map<Long, Map<String, AtomicInteger>> classLoaderReferenceCount;
private final NodeEngine nodeEngine;

public DefaultClassLoaderService(boolean cacheMode) {
public DefaultClassLoaderService(boolean cacheMode, NodeEngine nodeEngine) {
this.cacheMode = cacheMode;
this.nodeEngine = nodeEngine;
classLoaderCache = new ConcurrentHashMap<>();
classLoaderReferenceCount = new ConcurrentHashMap<>();
log.info("start classloader service" + (cacheMode ? " with cache mode" : ""));
}

@SneakyThrows
@Override
public synchronized ClassLoader getClassLoader(long jobId, Collection<URL> jars) {
log.debug("Get classloader for job {} with jars {}", jobId, jars);
Expand All @@ -59,6 +69,24 @@ public synchronized ClassLoader getClassLoader(long jobId, Collection<URL> jars)
classLoaderReferenceCount.get(jobId).get(key).incrementAndGet();
return classLoaderMap.get(key);
} else {
if (Objects.nonNull(nodeEngine)) {
for (URL jar : jars) {
File file = new File(jar.toURI().getPath());
if (!file.exists()) {
String host =
((NodeEngineImpl) nodeEngine).getNode().getThisAddress().getHost();
throw new ClassLoaderException(
ClassLoaderErrorCode.NOT_FOUND_JAR,
"The jar file "
+ jar
+ " can not be found in node "
+ host
+ ", please ensure that the deployment paths of SeaTunnel on different nodes are consistent.");
}
}
} else {
log.debug("Run the test class without file checking");
}
ClassLoader classLoader = new SeaTunnelChildFirstClassLoader(jars);
log.info("Create classloader for job {} with jars {}", jobId, jars);
classLoaderMap.put(key, classLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public abstract class AbstractClassLoaderServiceTest {

@BeforeEach
void setUp() {
classLoaderService = new DefaultClassLoaderService(cacheMode());
classLoaderService = new DefaultClassLoaderService(cacheMode(), null);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@

package org.apache.seatunnel.engine.core.classloader;

import org.apache.seatunnel.engine.common.exception.ClassLoaderException;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import com.google.common.collect.Lists;
import com.hazelcast.cluster.Address;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.spi.impl.NodeEngineImpl;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

Expand Down Expand Up @@ -100,4 +108,45 @@ void testRecycleClassLoaderFromThread() throws MalformedURLException, Interrupte
Thread.sleep(2000);
Assertions.assertFalse(thread.isAlive());
}

@Test
void testPreCheckJar() throws IOException {

// Mocking Node and NodeEngineImpl for testing
Node mockNode = Mockito.mock(Node.class);
Mockito.when(mockNode.getThisAddress()).thenReturn(new Address("localhost", 5801));
NodeEngineImpl mockNodeEngine = Mockito.mock(NodeEngineImpl.class);
Mockito.when(mockNodeEngine.getNode()).thenReturn(mockNode);
// Creating DefaultClassLoaderService object for testing
DefaultClassLoaderService defaultClassLoaderService =
new DefaultClassLoaderService(cacheMode(), mockNodeEngine);
// Test case to check ClassLoaderException when file is not found
Assertions.assertThrows(
ClassLoaderException.class,
() -> {
try {
defaultClassLoaderService.getClassLoader(
3L, Lists.newArrayList(new URL("file:/fake.jar")));
} catch (ClassLoaderException e) {
Assertions.assertTrue(
e.getMessage()
.contains(
"The jar file file:/fake.jar can not be found in node localhost, please ensure that the deployment paths of SeaTunnel on different nodes are consistent."));
throw e;
}
});

// Creating a temporary jar file for testing
File tempJar = File.createTempFile("console", ".jar");
String tempJarPath = tempJar.toURI().toURL().toString();

// Test case to check successful class loader creation with existing jar file
Assertions.assertDoesNotThrow(
() ->
defaultClassLoaderService.getClassLoader(
3L, Lists.newArrayList(new URL(tempJarPath))));

// Deleting the temporary jar file after test
tempJar.delete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void init(NodeEngine engine, Properties hzProperties) {

classLoaderService =
new DefaultClassLoaderService(
seaTunnelConfig.getEngineConfig().isClassloaderCacheMode());
seaTunnelConfig.getEngineConfig().isClassloaderCacheMode(), nodeEngine);

eventService = new EventService(nodeEngine);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import com.hazelcast.internal.serialization.Data;
import lombok.NonNull;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
Expand Down Expand Up @@ -167,7 +167,12 @@ public void testFinish() {
}

@Test
public void testClassloaderSplit() throws MalformedURLException {
public void testClassloaderSplit() throws IOException {
File console = File.createTempFile("console", ".jar");
File fake = File.createTempFile("fake", ".jar");
String consoleFile = console.toURI().toURL().toString();
String fakeFile = fake.toURI().toURL().toString();

TaskExecutionService taskExecutionService = server.getTaskExecutionService();

long sleepTime = 300;
Expand All @@ -190,8 +195,8 @@ public void testClassloaderSplit() throws MalformedURLException {
nodeEngine.getSerializationService().toData(testTask1),
nodeEngine.getSerializationService().toData(testTask2)),
Arrays.asList(
Collections.singleton(new URL("file://fake.jar")),
Collections.singleton(new URL("file://console.jar"))),
Collections.singleton(new URL(fakeFile)),
Collections.singleton(new URL(consoleFile))),
Arrays.asList(emptySet(), emptySet()));

Data data = nodeEngine.getSerializationService().toData(taskGroupImmutableInformation);
Expand All @@ -203,24 +208,27 @@ public void testClassloaderSplit() throws MalformedURLException {
TaskGroupContext taskGroupContext =
taskExecutionService.getActiveExecutionContext(location);
Assertions.assertIterableEquals(
Collections.singleton(new URL("file://fake.jar")),
Collections.singleton(new URL(fakeFile)),
taskGroupContext.getJars().get(testTask1.getTaskID()));
Assertions.assertIterableEquals(
Collections.singleton(new URL("file://console.jar")),
Collections.singleton(new URL(consoleFile)),
taskGroupContext.getJars().get(testTask2.getTaskID()));

Assertions.assertIterableEquals(
Collections.singletonList(new URL("file://fake.jar")),
Collections.singletonList(new URL(fakeFile)),
Arrays.asList(
((URLClassLoader) taskGroupContext.getClassLoader(testTask1.getTaskID()))
.getURLs()));
Assertions.assertIterableEquals(
Collections.singletonList(new URL("file://console.jar")),
Collections.singletonList(new URL(consoleFile)),
Arrays.asList(
((URLClassLoader) taskGroupContext.getClassLoader(testTask2.getTaskID()))
.getURLs()));

taskExecutionService.cancelTaskGroup(location);

fake.delete();
console.delete();
}

/** Test task execution time is the same as the timer timeout */
Expand Down

0 comments on commit 1df6b83

Please sign in to comment.