|
21 | 21 | import org.elasticsearch.cluster.ProjectState;
|
22 | 22 | import org.elasticsearch.cluster.block.ClusterBlockException;
|
23 | 23 | import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
24 |
| -import org.elasticsearch.cluster.metadata.DataStream; |
25 | 24 | import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
26 |
| -import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService; |
| 25 | +import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; |
27 | 26 | import org.elasticsearch.cluster.metadata.ProjectMetadata;
|
28 | 27 | import org.elasticsearch.cluster.project.ProjectResolver;
|
29 | 28 | import org.elasticsearch.cluster.service.ClusterService;
|
|
32 | 31 | import org.elasticsearch.common.settings.Settings;
|
33 | 32 | import org.elasticsearch.common.util.concurrent.EsExecutors;
|
34 | 33 | import org.elasticsearch.core.SuppressForbidden;
|
35 |
| -import org.elasticsearch.index.Index; |
36 | 34 | import org.elasticsearch.indices.SystemIndices;
|
37 | 35 | import org.elasticsearch.injection.guice.Inject;
|
38 |
| -import org.elasticsearch.snapshots.SnapshotInProgressException; |
39 |
| -import org.elasticsearch.snapshots.SnapshotsService; |
40 | 36 | import org.elasticsearch.tasks.Task;
|
41 | 37 | import org.elasticsearch.threadpool.ThreadPool;
|
42 | 38 | import org.elasticsearch.transport.TransportService;
|
|
46 | 42 | import java.util.List;
|
47 | 43 | import java.util.Set;
|
48 | 44 | import java.util.function.Consumer;
|
| 45 | +import java.util.stream.Collectors; |
49 | 46 |
|
50 | 47 | import static org.elasticsearch.action.datastreams.DataStreamsActionUtil.getDataStreamNames;
|
51 | 48 |
|
@@ -155,34 +152,11 @@ static ClusterState removeDataStream(
|
155 | 152 | }
|
156 | 153 | }
|
157 | 154 |
|
158 |
| - Set<String> snapshottingDataStreams = SnapshotsService.snapshottingDataStreams(projectState, dataStreams); |
159 |
| - if (snapshottingDataStreams.isEmpty() == false) { |
160 |
| - throw new SnapshotInProgressException( |
161 |
| - "Cannot delete data streams that are being snapshotted: " |
162 |
| - + snapshottingDataStreams |
163 |
| - + ". Try again after snapshot finishes or cancel the currently running snapshot." |
164 |
| - ); |
165 |
| - } |
166 |
| - |
167 |
| - Set<Index> backingIndicesToRemove = new HashSet<>(); |
168 |
| - for (String dataStreamName : dataStreams) { |
169 |
| - DataStream dataStream = project.dataStreams().get(dataStreamName); |
170 |
| - assert dataStream != null; |
171 |
| - backingIndicesToRemove.addAll(dataStream.getIndices()); |
172 |
| - backingIndicesToRemove.addAll(dataStream.getFailureIndices()); |
173 |
| - } |
174 |
| - |
175 |
| - // first delete the data streams and then the indices: |
176 |
| - // (this to avoid data stream validation from failing when deleting an index that is part of a data stream |
177 |
| - // without updating the data stream) |
178 |
| - // TODO: change order when delete index api also updates the data stream the index to be removed is member of |
179 |
| - ClusterState newState = projectState.updatedState(builder -> { |
180 |
| - for (String ds : dataStreams) { |
181 |
| - LOGGER.info("removing data stream [{}]", ds); |
182 |
| - builder.removeDataStream(ds); |
183 |
| - } |
184 |
| - }); |
185 |
| - return MetadataDeleteIndexService.deleteIndices(newState.projectState(projectState.projectId()), backingIndicesToRemove, settings); |
| 155 | + return MetadataDataStreamsService.deleteDataStreams( |
| 156 | + projectState, |
| 157 | + dataStreams.stream().map(project.dataStreams()::get).collect(Collectors.toSet()), |
| 158 | + settings |
| 159 | + ); |
186 | 160 | }
|
187 | 161 |
|
188 | 162 | @Override
|
|
0 commit comments