Skip to content

Commit 21441db

Browse files
[b456377401] Cache Spark YARN applications
Collected Spark YARN applications need to be cached because an upcoming task will connect to the Spark History Server and fetch each application's source and Spark version from its event logs.
1 parent 9a0951a commit 21441db

File tree

5 files changed

+213
-26
lines changed

5 files changed

+213
-26
lines changed

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/ClouderaManagerHandle.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class ClouderaManagerHandle implements Handle {
3737

3838
private ImmutableList<ClouderaClusterDTO> clusters;
3939
private ImmutableList<ClouderaHostDTO> hosts;
40+
private ImmutableList<ClouderaYarnApplicationDTO> sparkYarnApplications;
4041

4142
public ClouderaManagerHandle(URI apiURI, CloseableHttpClient httpClient) {
4243
Preconditions.checkNotNull(apiURI, "Cloudera's apiURI can't be null.");
@@ -98,15 +99,29 @@ public synchronized ImmutableList<ClouderaHostDTO> getHosts() {
9899
}
99100

100101
public synchronized void initHostsIfNull(List<ClouderaHostDTO> hosts) {
101-
// Todo
102-
// Preconditions.checkNotNull(hosts, "Hosts can't be initialised to null list.");
103-
// Preconditions.checkArgument(!hosts.isEmpty(), "Hosts can't be initialised to empty list.");
102+
Preconditions.checkNotNull(hosts, "Hosts can't be initialised to null list.");
103+
Preconditions.checkArgument(!hosts.isEmpty(), "Hosts can't be initialised to empty list.");
104104

105105
if (this.hosts == null) {
106106
this.hosts = ImmutableList.copyOf(hosts);
107107
}
108108
}
109109

110+
@CheckForNull
111+
public synchronized ImmutableList<ClouderaYarnApplicationDTO> getSparkYarnApplications() {
112+
return sparkYarnApplications;
113+
}
114+
115+
public synchronized void initSparkYarnApplicationsIfNull(
116+
List<ClouderaYarnApplicationDTO> sparkYarnApplications) {
117+
Preconditions.checkNotNull(
118+
sparkYarnApplications, "Spark YARN applications can't be initialised to null list.");
119+
120+
if (this.sparkYarnApplications == null) {
121+
this.sparkYarnApplications = ImmutableList.copyOf(sparkYarnApplications);
122+
}
123+
}
124+
110125
@Override
111126
public void close() throws IOException {
112127
if (httpClient != null) {
@@ -145,4 +160,17 @@ public static ClouderaHostDTO create(String id, String name) {
145160

146161
abstract String getName();
147162
}
163+
164+
@AutoValue
165+
public abstract static class ClouderaYarnApplicationDTO {
166+
public static ClouderaYarnApplicationDTO create(String id, String clusterName) {
167+
return new AutoValue_ClouderaManagerHandle_ClouderaYarnApplicationDTO(id, clusterName);
168+
}
169+
170+
@CheckForNull
171+
@Nullable
172+
abstract String getId();
173+
174+
abstract String getClusterName();
175+
}
148176
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/ClouderaYarnApplicationTypeTask.java

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,19 @@
1616
*/
1717
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager;
1818

19+
import static com.google.common.collect.ImmutableList.toImmutableList;
20+
import static java.util.Arrays.stream;
21+
1922
import com.fasterxml.jackson.databind.JsonNode;
2023
import com.google.common.base.Preconditions;
2124
import com.google.common.collect.ImmutableList;
2225
import com.google.common.collect.ImmutableMap;
2326
import com.google.common.io.ByteSink;
2427
import com.google.edwmigration.dumper.application.dumper.MetadataDumperUsageException;
2528
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaClusterDTO;
29+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaYarnApplicationDTO;
2630
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.ApiYARNApplicationDTO;
31+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.model.YarnApplicationType;
2732
import com.google.edwmigration.dumper.application.dumper.task.TaskCategory;
2833
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
2934
import java.io.IOException;
@@ -34,7 +39,6 @@
3439
import java.util.HashSet;
3540
import java.util.List;
3641
import java.util.Set;
37-
import java.util.stream.Collectors;
3842
import java.util.stream.StreamSupport;
3943
import javax.annotation.Nonnull;
4044
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -44,10 +48,8 @@
4448
import org.slf4j.LoggerFactory;
4549

4650
public class ClouderaYarnApplicationTypeTask extends AbstractClouderaYarnApplicationTask {
47-
private static final Logger logger = LoggerFactory.getLogger(ClouderaYarnApplicationsTask.class);
48-
49-
private final ImmutableList<String> predefinedAppTypes =
50-
ImmutableList.of("MAPREDUCE", "SPARK", "Oozie Launcher");
51+
private static final Logger logger =
52+
LoggerFactory.getLogger(ClouderaYarnApplicationTypeTask.class);
5153

5254
public ClouderaYarnApplicationTypeTask(
5355
ZonedDateTime startDate, ZonedDateTime endDate, TaskCategory taskCategory) {
@@ -72,21 +74,28 @@ protected void doRun(
7274
new PaginatedClouderaYarnApplicationsLoader(
7375
handle, context.getArguments().getPaginationPageSize());
7476

77+
List<ClouderaYarnApplicationDTO> sparkYarnApplications = new ArrayList<>();
7578
try (Writer writer = sink.asCharSink(StandardCharsets.UTF_8).openBufferedStream()) {
7679
for (ClouderaClusterDTO cluster : clusters) {
7780
final String clusterName = cluster.getName();
78-
Set<String> yarnAppTypes = new HashSet<>(fetchYARNApplicationTypes(handle, clusterName));
79-
yarnAppTypes.addAll(predefinedAppTypes);
80-
yarnAppTypes.addAll(context.getArguments().getYarnApplicationTypes());
81-
for (String yarnAppType : yarnAppTypes) {
81+
for (String yarnAppType : getYarnApplicationTypes(context, handle, clusterName)) {
8282
logger.info(
8383
"Dump YARN applications with {} type from {} cluster", yarnAppType, clusterName);
8484
int loadedAppsCount =
8585
appLoader.load(
8686
clusterName,
8787
yarnAppType,
88-
yarnAppsPage ->
89-
writeYarnAppTypes(writer, yarnAppsPage, yarnAppType, clusterName));
88+
yarnAppsPage -> {
89+
writeYarnAppTypes(writer, yarnAppsPage, yarnAppType, clusterName);
90+
if (yarnAppType.equals(YarnApplicationType.SPARK.getValue())) {
91+
yarnAppsPage.stream()
92+
.map(
93+
yarnApp ->
94+
ClouderaYarnApplicationDTO.create(
95+
yarnApp.getApplicationId(), clusterName))
96+
.forEach(sparkYarnApplications::add);
97+
}
98+
});
9099
logger.info(
91100
"Dumped {} YARN applications with {} type from {} cluster",
92101
loadedAppsCount,
@@ -95,6 +104,7 @@ protected void doRun(
95104
}
96105
}
97106
}
107+
handle.initSparkYarnApplicationsIfNull(sparkYarnApplications);
98108
}
99109

100110
private void writeYarnAppTypes(
@@ -114,22 +124,37 @@ private void writeYarnAppTypes(
114124
}
115125
}
116126

117-
private List<String> fetchYARNApplicationTypes(ClouderaManagerHandle handle, String clusterName) {
118-
String yarnAppTypesUrl =
127+
private Set<String> getYarnApplicationTypes(
128+
TaskRunContext context, ClouderaManagerHandle handle, String clusterName) {
129+
Set<String> yarnApplicationTypes = new HashSet<>();
130+
ImmutableList<String> predefinedYarnAppTypes =
131+
stream(YarnApplicationType.values())
132+
.map(YarnApplicationType::getValue)
133+
.collect(toImmutableList());
134+
yarnApplicationTypes.addAll(predefinedYarnAppTypes);
135+
yarnApplicationTypes.addAll(fetchClusterServiceTypes(handle, clusterName));
136+
yarnApplicationTypes.addAll(context.getArguments().getYarnApplicationTypes());
137+
return yarnApplicationTypes;
138+
}
139+
140+
private ImmutableList<String> fetchClusterServiceTypes(
141+
ClouderaManagerHandle handle, String clusterName) {
142+
String serviceTypesUrl =
119143
handle.getApiURI().toString() + "clusters/" + clusterName + "/serviceTypes";
120144
CloseableHttpClient httpClient = handle.getHttpClient();
121-
try (CloseableHttpResponse appTypesResp = httpClient.execute(new HttpGet(yarnAppTypesUrl))) {
122-
int statusCode = appTypesResp.getStatusLine().getStatusCode();
145+
try (CloseableHttpResponse serviceTypesResp =
146+
httpClient.execute(new HttpGet(serviceTypesUrl))) {
147+
int statusCode = serviceTypesResp.getStatusLine().getStatusCode();
123148
if (!isStatusCodeOK(statusCode)) {
124149
throw new ClouderaConnectorException(
125150
String.format(
126151
"Cloudera API returned bad http status: %d. Message: %s",
127-
statusCode, readFromStream(appTypesResp.getEntity().getContent())));
152+
statusCode, readFromStream(serviceTypesResp.getEntity().getContent())));
128153
}
129-
JsonNode appTypesJson = readJsonTree(appTypesResp.getEntity().getContent());
130-
return StreamSupport.stream(appTypesJson.get("items").spliterator(), false)
154+
JsonNode serviceTypesJson = readJsonTree(serviceTypesResp.getEntity().getContent());
155+
return StreamSupport.stream(serviceTypesJson.get("items").spliterator(), false)
131156
.map(JsonNode::asText)
132-
.collect(Collectors.toList());
157+
.collect(toImmutableList());
133158
} catch (IOException ex) {
134159
throw new ClouderaConnectorException(ex.getMessage(), ex);
135160
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2022-2025 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.model;
18+
19+
/**
 * Predefined YARN application types. Each constant carries the string used as the application
 * type name.
 */
public enum YarnApplicationType {
  MAPREDUCE("MAPREDUCE"),
  SPARK("SPARK"),
  OOZIE_LAUNCHER("Oozie Launcher");

  /** Application type name associated with this constant. */
  private final String typeName;

  YarnApplicationType(String typeName) {
    this.typeName = typeName;
  }

  /** Returns the application type name associated with this constant. */
  public String getValue() {
    return typeName;
  }
}

dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/ClouderaManagerHandleTest.java

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import com.google.common.collect.ImmutableList;
2323
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaClusterDTO;
24+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaHostDTO;
25+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaYarnApplicationDTO;
2426
import java.net.URI;
2527
import java.util.ArrayList;
2628
import java.util.List;
@@ -131,19 +133,97 @@ public void initClustersTwice_throwsException() {
131133
assertEquals("The cluster already initialized!", exception.getMessage());
132134
}
133135

134-
// @Test todo
135-
public void initHostsWithNullOrEmpty_throwsException() {
136+
@Test
137+
public void initHosts_success() {
138+
ClouderaManagerHandle handle = new ClouderaManagerHandle(localhost, httpClient);
139+
List<ClouderaHostDTO> dtos = new ArrayList<>();
140+
dtos.add(ClouderaHostDTO.create("1", "first"));
141+
dtos.add(ClouderaHostDTO.create("2", "second"));
142+
143+
handle.initHostsIfNull(dtos);
144+
145+
assertEquals(dtos, handle.getHosts());
146+
}
147+
148+
@Test
149+
public void initHostsWithNull_throwsException() {
150+
ClouderaManagerHandle handle = new ClouderaManagerHandle(localhost, httpClient);
151+
152+
NullPointerException npe =
153+
assertThrows(NullPointerException.class, () -> handle.initHostsIfNull(null));
154+
155+
assertEquals("Hosts can't be initialised to null list.", npe.getMessage());
156+
}
157+
158+
@Test
159+
public void initHostsWithEmptyList_throwsException() {
136160
ClouderaManagerHandle handle = new ClouderaManagerHandle(localhost, httpClient);
137161

138162
IllegalArgumentException exception =
139163
assertThrows(
140164
IllegalArgumentException.class, () -> handle.initHostsIfNull(ImmutableList.of()));
141165

142166
assertEquals("Hosts can't be initialised to empty list.", exception.getMessage());
167+
}
168+
169+
@Test
170+
public void initHostsTwice_noEffect() {
171+
ClouderaManagerHandle handle = new ClouderaManagerHandle(localhost, httpClient);
172+
List<ClouderaHostDTO> first = new ArrayList<>();
173+
first.add(ClouderaHostDTO.create("1", "first"));
174+
first.add(ClouderaHostDTO.create("2", "second"));
175+
List<ClouderaHostDTO> second = new ArrayList<>();
176+
second.add(ClouderaHostDTO.create("3", "third"));
177+
second.add(ClouderaHostDTO.create("4", "fourth"));
178+
179+
handle.initHostsIfNull(first);
180+
181+
assertEquals(first, handle.getHosts());
182+
183+
handle.initHostsIfNull(second);
184+
185+
assertEquals(first, handle.getHosts());
186+
}
187+
188+
@Test
189+
public void initSparkYarnApplications_success() {
190+
ClouderaManagerHandle handle = new ClouderaManagerHandle(localhost, httpClient);
191+
List<ClouderaYarnApplicationDTO> dtos = new ArrayList<>();
192+
dtos.add(ClouderaYarnApplicationDTO.create("1", "clusterOne"));
193+
dtos.add(ClouderaYarnApplicationDTO.create("2", "clusterTwo"));
194+
195+
handle.initSparkYarnApplicationsIfNull(dtos);
196+
197+
assertEquals(dtos, handle.getSparkYarnApplications());
198+
}
199+
200+
@Test
201+
public void initSparkYarnApplicationsWithNull_throwsException() {
202+
ClouderaManagerHandle handle = new ClouderaManagerHandle(localhost, httpClient);
143203

144204
NullPointerException npe =
145-
assertThrows(NullPointerException.class, () -> handle.initHostsIfNull(null));
205+
assertThrows(
206+
NullPointerException.class, () -> handle.initSparkYarnApplicationsIfNull(null));
146207

147-
assertEquals("Hosts can't be initialised to null list.", npe.getMessage());
208+
assertEquals("Spark YARN applications can't be initialised to null list.", npe.getMessage());
209+
}
210+
211+
@Test
212+
public void initSparkYarnApplicationsTwice_noEffect() {
213+
ClouderaManagerHandle handle = new ClouderaManagerHandle(localhost, httpClient);
214+
List<ClouderaYarnApplicationDTO> first = new ArrayList<>();
215+
first.add(ClouderaYarnApplicationDTO.create("1", "clusterOne"));
216+
first.add(ClouderaYarnApplicationDTO.create("2", "clusterTwo"));
217+
List<ClouderaYarnApplicationDTO> second = new ArrayList<>();
218+
second.add(ClouderaYarnApplicationDTO.create("3", "clusterOne"));
219+
second.add(ClouderaYarnApplicationDTO.create("4", "clusterTwo"));
220+
221+
handle.initSparkYarnApplicationsIfNull(first);
222+
223+
assertEquals(first, handle.getSparkYarnApplications());
224+
225+
handle.initSparkYarnApplicationsIfNull(second);
226+
227+
assertEquals(first, handle.getSparkYarnApplications());
148228
}
149229
}

dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/ClouderaYarnApplicationTypeTaskTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@
3131
import com.github.tomakehurst.wiremock.WireMockServer;
3232
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
3333
import com.github.tomakehurst.wiremock.matching.StringValuePattern;
34+
import com.google.common.collect.ImmutableList;
3435
import com.google.common.io.ByteSink;
3536
import com.google.common.io.CharSink;
3637
import com.google.edwmigration.dumper.application.dumper.ConnectorArguments;
3738
import com.google.edwmigration.dumper.application.dumper.MetadataDumperUsageException;
3839
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaClusterDTO;
40+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaYarnApplicationDTO;
3941
import com.google.edwmigration.dumper.application.dumper.task.TaskCategory;
4042
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
4143
import java.io.Writer;
@@ -211,6 +213,25 @@ public void doRun_applicationTypeContainsSpace_queryParameterIsEncoded() throws
211213
"{\"yarnAppTypes\":[{\"applicationId\":\"oozie\",\"applicationType\":\"Oozie Launcher\",\"clusterName\":\"test-cluster\"}]}"));
212214
}
213215

216+
@Test
217+
public void doRun_initializedClusters_cachesSparkYarnApplications() throws Exception {
218+
initClusters(
219+
ClouderaClusterDTO.create("cluster-1", "first-cluster"),
220+
ClouderaClusterDTO.create("cluster-2", "second-cluster"));
221+
stubYARNApplicationTypesAPI("first-cluster", "{\"items\":[]}");
222+
stubYARNApplicationTypesAPI("second-cluster", "{\"items\":[]}");
223+
stubPredefinedApplicationTypes("first-cluster");
224+
stubPredefinedApplicationTypes("second-cluster");
225+
226+
task.doRun(context, sink, handle);
227+
228+
assertEquals(
229+
ImmutableList.of(
230+
ClouderaYarnApplicationDTO.create("spark-app", "first-cluster"),
231+
ClouderaYarnApplicationDTO.create("spark-app", "second-cluster")),
232+
handle.getSparkYarnApplications());
233+
}
234+
214235
/** Test helper: passes the given clusters, as a list, to {@code handle.initClusters}. */
private void initClusters(ClouderaClusterDTO... clusters) {
215236
handle.initClusters(Arrays.asList(clusters));
216237
}

0 commit comments

Comments
 (0)