Skip to content

Commit caf6cf8

Browse files
authored
[test][spark] Add alter with incompatible col type test case (#6689)
1 parent 1e6eb24 commit caf6cf8

File tree

4 files changed

+79
-14
lines changed

4 files changed

+79
-14
lines changed

paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java

Lines changed: 15 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -115,20 +115,24 @@ private static void setup() {
115115
private TServer server;
116116
private HiveMetaStore.HMSHandler baseHandler;
117117

118+
public void start(Configuration configuration, int port) {
119+
start(configuration, DEFAULT_POOL_SIZE, port);
120+
}
121+
118122
/**
119123
* Starts a TestHiveMetastore with the default connection pool size (5) and the default
120124
* HiveConf.
121125
*/
122126
public void start(int port) {
123-
start(new HiveConf(new Configuration(), TestHiveMetastore.class), DEFAULT_POOL_SIZE, port);
127+
start(new Configuration(), DEFAULT_POOL_SIZE, port);
124128
}
125129

126130
/**
127131
* Starts a TestHiveMetastore with the default connection pool size (5) and the default
128132
* HiveConf.
129133
*/
130134
public void start() {
131-
start(new HiveConf(new Configuration(), TestHiveMetastore.class), DEFAULT_POOL_SIZE, 9083);
135+
start(new Configuration(), DEFAULT_POOL_SIZE, 9083);
132136
}
133137

134138
/**
@@ -137,13 +141,11 @@ public void start() {
137141
* @param conf The hive configuration to use
138142
* @param poolSize The number of threads in the executor pool
139143
*/
140-
public void start(HiveConf conf, int poolSize, int portNum) {
144+
public void start(Configuration conf, int poolSize, int portNum) {
141145
try {
142146
TServerSocket socket = new TServerSocket(portNum);
143147
int port = socket.getServerSocket().getLocalPort();
144-
initConf(conf, port);
145-
146-
this.hiveConf = conf;
148+
this.hiveConf = initConf(conf, port);
147149
this.server = newThriftServer(socket, poolSize, hiveConf);
148150
this.executorService = Executors.newSingleThreadExecutor();
149151
this.executorService.submit(() -> server.serve());
@@ -211,16 +213,17 @@ private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf con
211213
return new TThreadPoolServer(args);
212214
}
213215

214-
private void initConf(HiveConf conf, int port) {
215-
conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port);
216-
conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouseDir());
217-
conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
218-
conf.set(
216+
private HiveConf initConf(Configuration conf, int port) {
217+
conf.setIfUnset(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port);
218+
conf.setIfUnset(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouseDir());
219+
conf.setIfUnset(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
220+
conf.setIfUnset(
219221
HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname,
220222
"false");
221-
conf.set(
223+
conf.setIfUnset(
222224
HiveConf.ConfVars.HIVE_IN_TEST.varname,
223225
HiveConf.ConfVars.HIVE_IN_TEST.getDefaultValue());
226+
return new HiveConf(conf, TestHiveMetastore.class);
224227
}
225228

226229
private static void setupMetastoreDB(String dbURL) throws SQLException, IOException {

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ package org.apache.paimon.spark
2020

2121
import org.apache.paimon.catalog.{Catalog, DelegateCatalog}
2222
import org.apache.paimon.hive.{HiveCatalog, TestHiveMetastore}
23+
import org.apache.paimon.spark.PaimonHiveTestBase._
2324
import org.apache.paimon.table.FileStoreTable
2425

2526
import org.apache.hadoop.conf.Configuration
@@ -41,7 +42,7 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
4142

4243
protected val hiveDbName: String = "test_hive"
4344

44-
val hiveUri: String = PaimonHiveTestBase.hiveUri
45+
protected def configuration: Configuration = new Configuration
4546

4647
/**
4748
* Add spark_catalog ([[SparkGenericCatalog]] in hive) and paimon_hive ([[SparkCatalog]] in hive)
@@ -61,7 +62,7 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
6162
}
6263

6364
override protected def beforeAll(): Unit = {
64-
testHiveMetastore.start(PaimonHiveTestBase.hivePort)
65+
testHiveMetastore.start(configuration, hivePort)
6566
super.beforeAll()
6667
spark.sql(s"USE $sparkCatalogName")
6768
spark.sql(s"CREATE DATABASE IF NOT EXISTS $hiveDbName")
Lines changed: 60 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.sql
20+
21+
import org.apache.paimon.spark.PaimonHiveTestBase
22+
23+
import org.apache.hadoop.conf.Configuration
24+
25+
/** Runs the incompatible-column-type DDL tests with the metastore check enabled. */
class DDLWithDisallowIncompatibleColType extends DDLWithIncompatibleColType {
  override val disallowIncompatible: Boolean = true
}
28+
29+
/** Runs the incompatible-column-type DDL tests with the metastore check disabled. */
class DDLWithAllowIncompatibleColType extends DDLWithIncompatibleColType {
  override val disallowIncompatible: Boolean = false
}
32+
33+
/**
 * Verifies ALTER TABLE behaviour when the Hive metastore's incompatible-column-type
 * check is toggled through `hive.metastore.disallow.incompatible.col.type.changes`.
 */
abstract class DDLWithIncompatibleColType extends PaimonHiveTestBase {

  /** Whether the metastore rejects incompatible column type changes in this run. */
  def disallowIncompatible: Boolean

  override def configuration: Configuration = {
    val hadoopConf = super.configuration
    hadoopConf.set(
      "hive.metastore.disallow.incompatible.col.type.changes",
      disallowIncompatible.toString)
    hadoopConf
  }

  test("Paimon DDL with hive catalog: alter with incompatible col type") {
    withTable("t") {
      spark.sql("CREATE TABLE t (a INT, b INT, c STRUCT<f1: INT>) USING paimon")
      if (!disallowIncompatible) {
        // With the check disabled the drop is accepted by the metastore.
        spark.sql("ALTER TABLE t DROP COLUMN b")
      } else {
        // With the check enabled the same drop must be rejected.
        val error = intercept[Exception](spark.sql("ALTER TABLE t DROP COLUMN b"))
        assert(
          error.getMessage.contains(
            "The following columns have types incompatible with the existing columns"))
      }
    }
  }
}

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.{DelegateCatalog, Identifier}
2222
import org.apache.paimon.fs.Path
2323
import org.apache.paimon.hive.HiveCatalog
2424
import org.apache.paimon.spark.PaimonHiveTestBase
25+
import org.apache.paimon.spark.PaimonHiveTestBase.hiveUri
2526
import org.apache.paimon.table.FormatTable
2627
import org.apache.paimon.utils.CompressUtils
2728

0 commit comments

Comments (0)