Skip to content

Commit 38cc650

Browse files
authored
HIVE-28824: Metastore should also respect the max thrift message size (#5699) (Zhihua Deng, reviewed by Denys Kuzmenko)
1 parent 3b562f7 commit 38cc650

File tree

8 files changed

+163
-108
lines changed

8 files changed

+163
-108
lines changed

itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdc.java

+23
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.hive.service.cli.session.HiveSessionHook;
3838
import org.apache.hive.service.cli.session.HiveSessionHookContext;
3939
import org.apache.hive.service.cli.session.SessionUtils;
40+
import org.apache.thrift.transport.TTransportException;
4041
import org.junit.After;
4142
import org.junit.AfterClass;
4243
import org.junit.Before;
@@ -263,6 +264,28 @@ public void testNegativeProxyAuth() throws Exception {
263264
.getConnection(miniHS2.getJdbcURL("default", ";hive.server2.proxy.user=" + MiniHiveKdc.HIVE_TEST_USER_2));
264265
}
265266

267+
@Test
268+
public void testHs2ThriftMaxMessageSize() throws Exception {
269+
HiveConf.setVar(miniHS2.getHiveConf(), HiveConf.ConfVars.HIVE_THRIFT_CLIENT_MAX_MESSAGE_SIZE, "512");
270+
assertEquals(512L,
271+
HiveConf.getSizeVar(miniHS2.getHiveConf(), HiveConf.ConfVars.HIVE_THRIFT_CLIENT_MAX_MESSAGE_SIZE));
272+
Connection conn = DriverManager.getConnection(miniHS2.getJdbcURL());
273+
Statement stmt = conn.createStatement();
274+
try {
275+
StringBuilder createTable = new StringBuilder("create external table tesths2thriftmaxmessagesize(");
276+
for (int i = 0; i < 100; i++) {
277+
createTable.append("abcdefghijklmnopqrstuvwxyz").append(i).append(" string, ");
278+
}
279+
createTable.append(" a int)");
280+
Throwable t = assertThrows(SQLException.class, () -> stmt.execute(createTable.toString())).getCause();
281+
assertTrue(t instanceof TTransportException);
282+
assertEquals(TTransportException.END_OF_FILE, ((TTransportException)t).getType());
283+
assertTrue(t.getMessage().contains("Socket is closed by peer"));
284+
} finally {
285+
miniHS2.getHiveConf().unset(ConfVars.HIVE_THRIFT_CLIENT_MAX_MESSAGE_SIZE.varname);
286+
}
287+
}
288+
266289
/**
267290
* Verify the config property value
268291
* @param propertyName

itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestRemoteHiveMetaStoreKerberos.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.hadoop.conf.Configuration;
2222
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
2323
import org.apache.hadoop.hive.metastore.TestRemoteHiveMetaStore;
24-
import org.apache.hadoop.hive.metastore.TestHiveMetaStore;
24+
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
2525
import org.apache.hadoop.hive.metastore.api.Partition;
2626
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
2727
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
@@ -92,6 +92,18 @@ public void testThriftMaxMessageSize() throws Throwable {
9292
assertNotNull(partitions);
9393
assertEquals("expected to receive the same number of partitions added", values.size(), partitions.size());
9494

95+
// Set the max massage size on Metastore
96+
MetastoreConf.setVar(conf, ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE, "1024");
97+
MetastoreConf.setVar(clientConf, ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE, "1048576000");
98+
try (HiveMetaStoreClient client1 = new HiveMetaStoreClient(clientConf)) {
99+
TTransportException te = assertThrows(TTransportException.class,
100+
() -> client1.alter_partitions(dbName, tblName, partitions, new EnvironmentContext()));
101+
assertEquals(TTransportException.END_OF_FILE, te.getType());
102+
assertTrue(te.getMessage().contains("Socket is closed by peer"));
103+
} finally {
104+
conf.unset(ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE.getVarname());
105+
}
106+
95107
cleanUp(dbName, tblName, typeName);
96108
}
97109

itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java

+12
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.junit.BeforeClass;
4141
import org.junit.Test;
4242

43+
import static org.junit.Assert.assertEquals;
4344
import static org.junit.Assert.assertThrows;
4445
import static org.junit.Assert.assertTrue;
4546

@@ -117,6 +118,17 @@ public void testHmsThriftMaxMessageSize() throws Exception {
117118
// Verify the Thrift library is enforcing the limit
118119
assertTrue(exceptionMessage.contains("MaxMessageSize reached"));
119120
limitedClient.close();
121+
122+
MetastoreConf.setVar(clientConf, MetastoreConf.ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE, "1048576000");
123+
MetastoreConf.setVar(miniHS2.getHiveConf(), MetastoreConf.ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE, "512");
124+
tblBuilder.setTableName("testThriftMaxMessageSize1");
125+
try (HiveMetaStoreClient client = new HiveMetaStoreClient(clientConf)) {
126+
TTransportException te = assertThrows(TTransportException.class, () -> tblBuilder.create(client, clientConf));
127+
assertEquals(TTransportException.END_OF_FILE, te.getType());
128+
assertTrue(te.getMessage().contains("Socket is closed by peer"));
129+
} finally {
130+
miniHS2.getHiveConf().unset(MetastoreConf.ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE.getVarname());
131+
}
120132
}
121133

122134
private Connection getConnection(String userName) throws Exception {

service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@
4747
import org.apache.thrift.server.TServerEventHandler;
4848
import org.apache.thrift.server.TThreadPoolServer;
4949
import org.apache.thrift.transport.TServerSocket;
50+
import org.apache.thrift.transport.TSocket;
5051
import org.apache.thrift.transport.TTransport;
52+
import org.apache.thrift.transport.TTransportException;
5153
import org.apache.thrift.transport.TTransportFactory;
5254

5355

@@ -100,7 +102,15 @@ protected void initServer() {
100102

101103
// Server args
102104
int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
103-
TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket).processorFactory(processorFactory)
105+
TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(new TServerSocket(serverSocket.getServerSocket()) {
106+
@Override
107+
public TSocket accept() throws TTransportException {
108+
TSocket ts = super.accept();
109+
int maxThriftMessageSize = (int) HiveConf.getSizeVar(
110+
hiveConf, HiveConf.ConfVars.HIVE_THRIFT_CLIENT_MAX_MESSAGE_SIZE);
111+
return HiveAuthUtils.configureThriftMaxMessageSize(ts, maxThriftMessageSize);
112+
}
113+
}).processorFactory(processorFactory)
104114
.transportFactory(transportFactory).protocolFactory(new TBinaryProtocol.Factory())
105115
.inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
106116
.executorService(executorService);

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java

+21-66
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.lang.reflect.Method;
2222
import java.util.concurrent.ExecutorService;
2323
import java.util.concurrent.SynchronousQueue;
24-
import org.apache.commons.cli.OptionBuilder;
2524
import org.apache.hadoop.conf.Configuration;
2625
import org.apache.hadoop.hive.common.ZKDeRegisterWatcher;
2726
import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
@@ -40,7 +39,6 @@
4039
import org.apache.hadoop.hive.metastore.metrics.Metrics;
4140
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
4241
import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
43-
import org.apache.hadoop.hive.metastore.utils.CommonCliOptions;
4442
import org.apache.hadoop.hive.metastore.utils.LogUtils;
4543
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
4644
import org.apache.hadoop.hive.metastore.utils.MetastoreVersionInfo;
@@ -61,7 +59,9 @@
6159
import org.apache.thrift.server.TServlet;
6260
import org.apache.thrift.server.TThreadPoolServer;
6361
import org.apache.thrift.transport.TServerSocket;
62+
import org.apache.thrift.transport.TSocket;
6463
import org.apache.thrift.transport.TTransport;
64+
import org.apache.thrift.transport.TTransportException;
6565
import org.apache.thrift.transport.TTransportFactory;
6666

6767
import org.eclipse.jetty.security.ConstraintMapping;
@@ -219,69 +219,14 @@ public static long renewDelegationToken(String tokenStrForm) throws IOException
219219
return delegationTokenManager.renewDelegationToken(tokenStrForm);
220220
}
221221

222-
/**
223-
* HiveMetaStore specific CLI
224-
*
225-
*/
226-
public static class HiveMetastoreCli extends CommonCliOptions {
227-
private int port;
228-
229-
@SuppressWarnings("static-access")
230-
HiveMetastoreCli(Configuration configuration) {
231-
super("hivemetastore", true);
232-
this.port = MetastoreConf.getIntVar(configuration, ConfVars.SERVER_PORT);
233-
234-
// -p port
235-
OPTIONS.addOption(OptionBuilder
236-
.hasArg()
237-
.withArgName("port")
238-
.withDescription("Hive Metastore port number, default:"
239-
+ this.port)
240-
.create('p'));
241-
242-
}
243-
@Override
244-
public void parse(String[] args) {
245-
super.parse(args);
246-
247-
// support the old syntax "hivemetastore [port]" but complain
248-
args = commandLine.getArgs();
249-
if (args.length > 0) {
250-
// complain about the deprecated syntax -- but still run
251-
System.err.println(
252-
"This usage has been deprecated, consider using the new command "
253-
+ "line syntax (run with -h to see usage information)");
254-
255-
this.port = Integer.parseInt(args[0]);
256-
}
257-
258-
// notice that command line options take precedence over the
259-
// deprecated (old style) naked args...
260-
261-
if (commandLine.hasOption('p')) {
262-
this.port = Integer.parseInt(commandLine.getOptionValue('p'));
263-
} else {
264-
// legacy handling
265-
String metastorePort = System.getenv("METASTORE_PORT");
266-
if (metastorePort != null) {
267-
this.port = Integer.parseInt(metastorePort);
268-
}
269-
}
270-
}
271-
272-
public int getPort() {
273-
return this.port;
274-
}
275-
}
276-
277222
/*
278223
Interface to encapsulate Http and binary thrift server for
279224
HiveMetastore
280225
*/
281226
private interface ThriftServer {
282-
public void start() throws Throwable;
283-
public boolean isRunning();
284-
public IHMSHandler getHandler();
227+
void start() throws Throwable;
228+
boolean isRunning();
229+
IHMSHandler getHandler();
285230
}
286231

287232
/**
@@ -316,7 +261,7 @@ public static void main(String[] args) throws Throwable {
316261
startupShutdownMessage(HiveMetaStore.class, args, LOG);
317262

318263
try {
319-
String msg = "Starting hive metastore on port " + cli.port;
264+
String msg = "Starting hive metastore on port " + cli.getPort();
320265
LOG.info(msg);
321266
if (cli.isVerbose()) {
322267
System.err.println(msg);
@@ -631,10 +576,6 @@ private static ThriftServer startBinaryMetastore(int port, HadoopThriftAuthBridg
631576
keyStorePassword, keyStoreType, keyStoreAlgorithm, sslVersionBlacklist);
632577
}
633578

634-
if (tcpKeepAlive) {
635-
serverSocket = new TServerSocketKeepAlive(serverSocket);
636-
}
637-
638579
ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
639580
60L, TimeUnit.SECONDS, new SynchronousQueue<>(), r -> {
640581
Thread thread = new Thread(r);
@@ -643,7 +584,21 @@ private static ThriftServer startBinaryMetastore(int port, HadoopThriftAuthBridg
643584
return thread;
644585
});
645586

646-
TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket)
587+
TThreadPoolServer.Args args =
588+
new TThreadPoolServer.Args(new TServerSocket(serverSocket.getServerSocket()) {
589+
@Override
590+
public TSocket accept() throws TTransportException {
591+
TSocket ts = super.accept();
592+
// get the limit from the configuration for every new connection
593+
int maxThriftMessageSize = (int) MetastoreConf.getSizeVar(
594+
conf, MetastoreConf.ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE);
595+
HMSHandler.LOG.debug("Thrift maxMessageSize = {}", maxThriftMessageSize);
596+
if (maxThriftMessageSize > 0) {
597+
ts.getConfiguration().setMaxMessageSize(maxThriftMessageSize);
598+
}
599+
return ts;
600+
}
601+
})
647602
.processor(processor)
648603
.transportFactory(transFactory)
649604
.protocolFactory(protocolFactory)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hive.metastore;
20+
21+
import org.apache.commons.cli.OptionBuilder;
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
24+
import org.apache.hadoop.hive.metastore.utils.CommonCliOptions;
25+
26+
/**
27+
* HiveMetaStore specific CLI
28+
*
29+
*/
30+
class HiveMetastoreCli extends CommonCliOptions {
31+
private int port;
32+
33+
@SuppressWarnings("static-access")
34+
HiveMetastoreCli(Configuration configuration) {
35+
super("hivemetastore", true);
36+
this.port = MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.SERVER_PORT);
37+
38+
// -p port
39+
OPTIONS.addOption(OptionBuilder
40+
.hasArg()
41+
.withArgName("port")
42+
.withDescription("Hive Metastore port number, default:"
43+
+ this.port)
44+
.create('p'));
45+
46+
}
47+
@Override
48+
public void parse(String[] args) {
49+
super.parse(args);
50+
51+
// support the old syntax "hivemetastore [port]" but complain
52+
args = commandLine.getArgs();
53+
if (args.length > 0) {
54+
// complain about the deprecated syntax -- but still run
55+
System.err.println(
56+
"This usage has been deprecated, consider using the new command "
57+
+ "line syntax (run with -h to see usage information)");
58+
59+
this.port = Integer.parseInt(args[0]);
60+
}
61+
62+
// notice that command line options take precedence over the
63+
// deprecated (old style) naked args...
64+
65+
if (commandLine.hasOption('p')) {
66+
this.port = Integer.parseInt(commandLine.getOptionValue('p'));
67+
} else {
68+
// legacy handling
69+
String metastorePort = System.getenv("METASTORE_PORT");
70+
if (metastorePort != null) {
71+
this.port = Integer.parseInt(metastorePort);
72+
}
73+
}
74+
}
75+
76+
public int getPort() {
77+
return this.port;
78+
}
79+
}

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TServerSocketKeepAlive.java

-36
This file was deleted.

0 commit comments

Comments
 (0)