Skip to content

Commit 7344916

Browse files
authored
[FLINK-36501][table-common] Remove all deprecated methods in CatalogFactory (#25579)
1 parent f0c8f18 commit 7344916

File tree

3 files changed

+34
-155
lines changed

3 files changed

+34
-155
lines changed

flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/CatalogFactory.java

+2-67
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,11 @@
1919
package org.apache.flink.table.factories;
2020

2121
import org.apache.flink.annotation.PublicEvolving;
22-
import org.apache.flink.configuration.ConfigOption;
2322
import org.apache.flink.configuration.ReadableConfig;
2423
import org.apache.flink.table.catalog.Catalog;
25-
import org.apache.flink.table.catalog.exceptions.CatalogException;
2624
import org.apache.flink.table.legacy.factories.TableFactory;
2725

28-
import java.util.List;
2926
import java.util.Map;
30-
import java.util.Set;
3127

3228
/**
3329
* A factory to create configured catalog instances based on string-based properties. See also
@@ -38,30 +34,15 @@
3834
* instead.
3935
*/
4036
@PublicEvolving
41-
public interface CatalogFactory extends TableFactory, Factory {
42-
43-
/**
44-
* Creates and configures a {@link Catalog} using the given properties.
45-
*
46-
* @param properties normalized properties describing an external catalog.
47-
* @return the configured catalog.
48-
* @deprecated Use {@link this#createCatalog(Context)} instead and implement {@link Factory}
49-
* instead of {@link TableFactory}.
50-
*/
51-
@Deprecated
52-
default Catalog createCatalog(String name, Map<String, String> properties) {
53-
throw new CatalogException("Catalog factories must implement createCatalog()");
54-
}
37+
public interface CatalogFactory extends Factory {
5538

5639
/**
5740
* Creates and configures a {@link Catalog} using the given context.
5841
*
5942
* <p>An implementation should perform validation and the discovery of further (nested)
6043
* factories in this method.
6144
*/
62-
default Catalog createCatalog(Context context) {
63-
throw new CatalogException("Catalog factories must implement createCatalog(Context)");
64-
}
45+
Catalog createCatalog(Context context);
6546

6647
/** Context provided when a catalog is created. */
6748
@PublicEvolving
@@ -86,50 +67,4 @@ interface Context {
8667
*/
8768
ClassLoader getClassLoader();
8869
}
89-
90-
default String factoryIdentifier() {
91-
if (requiredContext() == null || supportedProperties() == null) {
92-
throw new CatalogException("Catalog factories must implement factoryIdentifier()");
93-
}
94-
95-
return null;
96-
}
97-
98-
default Set<ConfigOption<?>> requiredOptions() {
99-
if (requiredContext() == null || supportedProperties() == null) {
100-
throw new CatalogException("Catalog factories must implement requiredOptions()");
101-
}
102-
103-
return null;
104-
}
105-
106-
default Set<ConfigOption<?>> optionalOptions() {
107-
if (requiredContext() == null || supportedProperties() == null) {
108-
throw new CatalogException("Catalog factories must implement optionalOptions()");
109-
}
110-
111-
return null;
112-
}
113-
114-
// --------------------------------------------------------------------------------------------
115-
// Default implementations for legacy {@link TableFactory} stack.
116-
// --------------------------------------------------------------------------------------------
117-
118-
/**
119-
* @deprecated Implement the {@link Factory} based stack instead.
120-
*/
121-
@Deprecated
122-
default Map<String, String> requiredContext() {
123-
// Default implementation for catalogs implementing the new {@link Factory} stack instead.
124-
return null;
125-
}
126-
127-
/**
128-
* @deprecated Implement the {@link Factory} based stack instead.
129-
*/
130-
@Deprecated
131-
default List<String> supportedProperties() {
132-
// Default implementation for catalogs implementing the new {@link Factory} stack instead.
133-
return null;
134-
}
13570
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java

+32-41
Original file line numberDiff line numberDiff line change
@@ -372,48 +372,39 @@ public static Catalog createCatalog(
372372
Map<String, String> options,
373373
ReadableConfig configuration,
374374
ClassLoader classLoader) {
375-
// Use the legacy mechanism first for compatibility
375+
final DefaultCatalogContext discoveryContext =
376+
new DefaultCatalogContext(catalogName, options, configuration, classLoader);
376377
try {
377-
final CatalogFactory legacyFactory =
378-
TableFactoryService.find(CatalogFactory.class, options, classLoader);
379-
return legacyFactory.createCatalog(catalogName, options);
380-
} catch (NoMatchingTableFactoryException e) {
381-
// No matching legacy factory found, try using the new stack
382-
383-
final DefaultCatalogContext discoveryContext =
384-
new DefaultCatalogContext(catalogName, options, configuration, classLoader);
385-
try {
386-
final CatalogFactory factory = getCatalogFactory(discoveryContext);
387-
388-
// The type option is only used for discovery, we don't actually want to forward it
389-
// to the catalog factory itself.
390-
final Map<String, String> factoryOptions =
391-
options.entrySet().stream()
392-
.filter(
393-
entry ->
394-
!CommonCatalogOptions.CATALOG_TYPE
395-
.key()
396-
.equals(entry.getKey()))
397-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
398-
final DefaultCatalogContext context =
399-
new DefaultCatalogContext(
400-
catalogName, factoryOptions, configuration, classLoader);
401-
return factory.createCatalog(context);
402-
} catch (Throwable t) {
403-
throw new ValidationException(
404-
String.format(
405-
"Unable to create catalog '%s'.%n%nCatalog options are:%n%s",
406-
catalogName,
407-
options.entrySet().stream()
408-
.map(
409-
optionEntry ->
410-
stringifyOption(
411-
optionEntry.getKey(),
412-
optionEntry.getValue()))
413-
.sorted()
414-
.collect(Collectors.joining("\n"))),
415-
t);
416-
}
378+
final CatalogFactory factory = getCatalogFactory(discoveryContext);
379+
380+
// The type option is only used for discovery, we don't actually want to forward it
381+
// to the catalog factory itself.
382+
final Map<String, String> factoryOptions =
383+
options.entrySet().stream()
384+
.filter(
385+
entry ->
386+
!CommonCatalogOptions.CATALOG_TYPE
387+
.key()
388+
.equals(entry.getKey()))
389+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
390+
final DefaultCatalogContext context =
391+
new DefaultCatalogContext(
392+
catalogName, factoryOptions, configuration, classLoader);
393+
return factory.createCatalog(context);
394+
} catch (Throwable t) {
395+
throw new ValidationException(
396+
String.format(
397+
"Unable to create catalog '%s'.%n%nCatalog options are:%n%s",
398+
catalogName,
399+
options.entrySet().stream()
400+
.map(
401+
optionEntry ->
402+
stringifyOption(
403+
optionEntry.getKey(),
404+
optionEntry.getValue()))
405+
.sorted()
406+
.collect(Collectors.joining("\n"))),
407+
t);
417408
}
418409
}
419410

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java

-47
Original file line numberDiff line numberDiff line change
@@ -98,53 +98,6 @@ void testDropCatalog() {
9898
assertThat(tableEnv.getCatalog(name)).isNotPresent();
9999
}
100100

101-
@Test
102-
void testCreateLegacyCatalogFromUserClassLoader() throws Exception {
103-
final String className = "UserCatalogFactory";
104-
URLClassLoader classLoader =
105-
ClassLoaderUtils.withRoot(TempDirUtils.newFolder(temporaryFolder))
106-
.addResource(
107-
"META-INF/services/org.apache.flink.table.legacy.factories.TableFactory",
108-
"UserCatalogFactory")
109-
.addClass(
110-
className,
111-
"import org.apache.flink.table.catalog.GenericInMemoryCatalog;\n"
112-
+ "import org.apache.flink.table.factories.CatalogFactory;\n"
113-
+ "import java.util.Collections;\n"
114-
+ "import org.apache.flink.table.catalog.Catalog;\n"
115-
+ "import java.util.HashMap;\n"
116-
+ "import java.util.List;\n"
117-
+ "import java.util.Map;\n"
118-
+ "\tpublic class UserCatalogFactory implements CatalogFactory {\n"
119-
+ "\t\t@Override\n"
120-
+ "\t\tpublic Catalog createCatalog(\n"
121-
+ "\t\t\t\tString name,\n"
122-
+ "\t\t\t\tMap<String, String> properties) {\n"
123-
+ "\t\t\treturn new GenericInMemoryCatalog(name);\n"
124-
+ "\t\t}\n"
125-
+ "\n"
126-
+ "\t\t@Override\n"
127-
+ "\t\tpublic Map<String, String> requiredContext() {\n"
128-
+ "\t\t\tHashMap<String, String> hashMap = new HashMap<>();\n"
129-
+ "\t\t\thashMap.put(\"type\", \"userCatalog\");\n"
130-
+ "\t\t\treturn hashMap;\n"
131-
+ "\t\t}\n"
132-
+ "\n"
133-
+ "\t\t@Override\n"
134-
+ "\t\tpublic List<String> supportedProperties() {\n"
135-
+ "\t\t\treturn Collections.emptyList();\n"
136-
+ "\t\t}\n"
137-
+ "\t}")
138-
.build();
139-
140-
try (TemporaryClassLoaderContext context = TemporaryClassLoaderContext.of(classLoader)) {
141-
TableEnvironment tableEnvironment = getTableEnvironment();
142-
tableEnvironment.executeSql("CREATE CATALOG cat WITH ('type'='userCatalog')");
143-
144-
assertThat(tableEnvironment.getCatalog("cat")).isPresent();
145-
}
146-
}
147-
148101
@Test
149102
void testCreateCatalogFromUserClassLoader() throws Exception {
150103
final String className = "UserCatalogFactory";

0 commit comments

Comments
 (0)