
Commit f06aa95

HIVE-28817: Rebuilding a materialized view stored in Iceberg fails when schema has varchar column (Krisztian Kasa, reviewed by Laszlo Bodor)
1 parent 2ad2c5a commit f06aa95

9 files changed (+391 -29 lines)


iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreLock.java

Lines changed: 14 additions & 15 deletions
@@ -21,8 +21,6 @@

 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.Executors;
@@ -33,6 +31,8 @@
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockLevel;
 import org.apache.hadoop.hive.metastore.api.LockRequest;
@@ -42,12 +42,12 @@
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.iceberg.ClientPool;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iceberg.util.Tasks;
 import org.apache.thrift.TException;
@@ -91,6 +91,7 @@ public class MetastoreLock implements HiveLock {
   private final long lockHeartbeatIntervalTime;
   private final ScheduledExecutorService exitingScheduledExecutorService;
   private final String agentInfo;
+  private final Configuration conf;

   private Optional<Long> hmsLockId = Optional.empty();
   private ReentrantLock jvmLock = null;
@@ -102,6 +103,7 @@ public MetastoreLock(Configuration conf, ClientPool<IMetaStoreClient, TException
     this.fullName = catalogName + "." + databaseName + "." + tableName;
     this.databaseName = databaseName;
     this.tableName = tableName;
+    this.conf = conf;

     this.lockAcquireTimeout =
         conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
@@ -268,21 +270,18 @@ private long acquireLock() throws LockException {
   private LockInfo createLock() throws LockException {
     LockInfo lockInfo = new LockInfo();

-    String hostName;
-    try {
-      hostName = InetAddress.getLocalHost().getHostName();
-    } catch (UnknownHostException uhe) {
-      throw new LockException(uhe, "Error generating host name");
-    }
-
     LockComponent lockComponent =
         new LockComponent(LockType.EXCL_WRITE, LockLevel.TABLE, databaseName);
+    lockComponent.setOperationType(DataOperationType.NO_TXN);
     lockComponent.setTablename(tableName);
-    LockRequest lockRequest =
-        new LockRequest(
-            Lists.newArrayList(lockComponent),
-            HiveHadoopUtil.currentUser(),
-            hostName);
+
+    // An open ACID transaction might exist when the SQL statement involves both native and Iceberg tables.
+    // Use it's txn id.
+    LockRequest lockRequest = new LockRequestBuilder(null)
+        .setTransactionId(conf.getLong(hive_metastoreConstants.TXN_ID, 0L))
+        .setUser(HiveHadoopUtil.currentUser())
+        .addLockComponent(lockComponent)
+        .build();

     // Only works in Hive 2 or later.
     if (HiveVersion.min(HiveVersion.HIVE_2)) {
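
For readers skimming the diff above: the hostname-based LockRequest construction is replaced by LockRequestBuilder so the lock can be tied to an already-open ACID transaction whose id travels in the job Configuration under hive_metastoreConstants.TXN_ID. The following stand-alone sketch (not part of the commit) only illustrates that builder chain outside of MetastoreLock; the Configuration value, database, table and user names are made-up examples, and printing the request is purely for demonstration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;

public class LockRequestSketch {
  public static void main(String[] args) {
    // In the real code path the Configuration comes from the job and TXN_ID is set by
    // HiveIcebergMetaHook / HiveIcebergOutputCommitter; 42L is only an example value.
    Configuration conf = new Configuration();
    conf.setLong(hive_metastoreConstants.TXN_ID, 42L);

    // Exclusive write lock on an example table, marked NO_TXN as in the new createLock().
    LockComponent component = new LockComponent(LockType.EXCL_WRITE, LockLevel.TABLE, "default");
    component.setOperationType(DataOperationType.NO_TXN);
    component.setTablename("mat1");

    // Builder chain mirroring the diff: the request carries the open transaction id
    // (0L when no transaction is open) instead of identifying the lock only by host/user.
    LockRequest request = new LockRequestBuilder(null)
        .setTransactionId(conf.getLong(hive_metastoreConstants.TXN_ID, 0L))
        .setUser(System.getProperty("user.name"))
        .addLockComponent(component)
        .build();

    System.out.println(request);
  }
}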

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java

Lines changed: 1 addition & 0 deletions
@@ -414,6 +414,7 @@ public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, E
     }

     try {
+      conf.setLong(hive_metastoreConstants.TXN_ID, IcebergAcidUtil.getTxnId());
       commitLock.lock();
       doPreAlterTable(hmsTable, context);
     } catch (Exception e) {

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java

Lines changed: 2 additions & 0 deletions
@@ -43,6 +43,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context.Operation;
 import org.apache.hadoop.hive.ql.Context.RewritePolicy;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -456,6 +457,7 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output

     for (JobContext jobContext : jobContexts) {
       JobConf conf = jobContext.getJobConf();
+      conf.setLong(hive_metastoreConstants.TXN_ID, IcebergAcidUtil.getTxnId());
       table = Optional.ofNullable(table).orElse(Catalogs.loadTable(conf, catalogProperties));
       branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF);
       snapshotId = getSnapshotId(outputTable.table, branchName);

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java

Lines changed: 15 additions & 0 deletions
@@ -28,7 +28,9 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.PositionDeleteInfo;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionKey;
@@ -391,4 +393,17 @@ public T build() {
     }
   }

+  static long getTxnId() {
+    SessionState sessionState = SessionState.get();
+    if (sessionState == null) {
+      return 0L;
+    }
+
+    HiveTxnManager txnManager = sessionState.getTxnMgr();
+    if (txnManager == null) {
+      return 0L;
+    }
+
+    return txnManager.getCurrentTxnId();
+  }
 }
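
Taken together with the two setLong calls above, the flow is: IcebergAcidUtil.getTxnId() reads the current transaction id from the session's HiveTxnManager (falling back to 0L when there is no SessionState or transaction manager), the hooks stash it in the job Configuration under hive_metastoreConstants.TXN_ID, and MetastoreLock reads it back with a 0L default. Below is a minimal sketch of that conf round trip only; the txn-id lookup is stubbed out because the real helper needs a live Hive session, and the class name is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;

public class TxnIdRoundTripSketch {
  // Stand-in for IcebergAcidUtil.getTxnId(); outside a Hive session there is no
  // SessionState, so the helper would return 0L, meaning "no open ACID transaction".
  static long getTxnIdStub() {
    return 0L;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Producer side (HiveIcebergMetaHook.preAlterTable / HiveIcebergOutputCommitter.commitTable):
    conf.setLong(hive_metastoreConstants.TXN_ID, getTxnIdStub());

    // Consumer side (MetastoreLock.createLock): the 0L default keeps the old behaviour
    // when no transaction id was propagated.
    long txnId = conf.getLong(hive_metastoreConstants.TXN_ID, 0L);
    System.out.println("txn id carried through the job conf: " + txnId);
  }
}
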
Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+-- MV source table has varchar column.
+-- SORT_QUERY_RESULTS
+
+set hive.explain.user=false;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set iceberg.mr.schema.auto.conversion=true;
+
+create table t1 (a int, b varchar(256), c char(100), d int not null) stored as orc tblproperties ('transactional'='true');
+
+insert into t1 values (1, 'Alfred', 'Alfred', 1);
+
+create materialized view mat1 stored by iceberg stored as orc tblproperties ('format-version'='2') as
+select b, c, d, sum(a) from t1 group by b, c, d;
+
+insert into t1 values (4, 'Jane', 'Jane', 4);
+
+explain
+alter materialized view mat1 rebuild;
+
+alter materialized view mat1 rebuild;
+
+select * from mat1;
