// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import java.util.concurrent.ThreadLocalRandom

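// Regression test for external catalogs backed by OSS-HDFS storage. It covers
// Paimon (filesystem), Iceberg (hadoop and HMS), and Hive (HMS) catalogs, each
// configured with the legacy oss.* storage properties, the legacy properties
// plus the 'oss.hdfs.enabled' switch, and the new oss.hdfs.* properties.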
suite("oss_hdfs_catalog_test", "p2,external,new_catalog_property") {
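    // End-to-end DDL/DML round trip: creates a catalog from the given property
    // string, creates a database (optionally at an explicit 'location'),
    // creates a table, inserts and reads back one row, then drops everything.
    //   catalogProperties - property list spliced into CREATE CATALOG
    //   prefix            - used to build unique catalog/db/table names
    //   dbLocation        - optional storage path for the database, or null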
    def testQueryAndInsert = { String catalogProperties, String prefix, String dbLocation ->

        def catalog_name = "${prefix}_catalog"
        sql """
            DROP CATALOG IF EXISTS ${catalog_name};
        """
        sql """
            CREATE CATALOG IF NOT EXISTS ${catalog_name} PROPERTIES (
                ${catalogProperties}
            );
        """
        sql """
            switch ${catalog_name};
        """

        def db_name = prefix + "_db" + System.currentTimeMillis() + ThreadLocalRandom.current().nextInt(1000)
        sql """
            DROP DATABASE IF EXISTS ${db_name} FORCE;
        """
        if (dbLocation == null) {
            sql """
                CREATE DATABASE IF NOT EXISTS ${db_name};
            """
        } else {
            sql """
                CREATE DATABASE IF NOT EXISTS ${db_name}
                PROPERTIES ('location'='${dbLocation}');
            """
        }

        def dbResult = sql """
            show databases like "${db_name}";
        """
        assert dbResult.size() == 1

        sql """
            use ${db_name};
        """
        def table_name = prefix + ThreadLocalRandom.current().nextInt(1000) + "_table"
        sql """
            CREATE TABLE ${table_name} (
                user_id BIGINT COMMENT "user id",
                name VARCHAR(20) COMMENT "name",
                age INT COMMENT "age"
            );
        """
        sql """
            insert into ${table_name} values (1, 'a', 10);
        """
        // query
        def queryResult = sql """
            SELECT * FROM ${table_name};
        """
        assert queryResult.size() == 1

        sql """
            DROP TABLE ${table_name};
        """
        sql """
            DROP DATABASE ${db_name} FORCE;
        """

        def dropResult = sql """
            show databases like "${db_name}";
        """
        assert dropResult.size() == 0
    }

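    // Read-only check: creates a catalog from the given property string, runs
    // SELECT count(1) on an existing table, and asserts the expected row count.
    // force_jni_scanner toggles between the JNI scanner and the native reader.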
    def testQuery = { String catalog_name, String catalogProperties, String queryTable, String expectedCount, boolean force_jni_scanner = false ->

        sql """
            DROP CATALOG IF EXISTS ${catalog_name};
        """
        sql """set force_jni_scanner=${force_jni_scanner};"""
        sql """
            CREATE CATALOG IF NOT EXISTS ${catalog_name} PROPERTIES (
                ${catalogProperties}
            );
        """
        sql """
            switch ${catalog_name};
        """

        def queryResult = sql """
            SELECT count(1) FROM ${queryTable};
        """
        assert queryResult[0][0] == expectedCount.toInteger()

        sql """
            DROP CATALOG IF EXISTS ${catalog_name};
        """
    }

    /* *************** OSS_HDFS ******************* */
    String oss_hdfs_ak = context.config.otherConfigs.get("ossHdfsAk")
    String oss_hdfs_sk = context.config.otherConfigs.get("ossHdfsSk")
    String oss_hdfs_endpoint = context.config.otherConfigs.get("ossHdfsEndpoint")
    String oss_hdfs_bucket = context.config.otherConfigs.get("ossHdfsBucket")
    String oss_hdfs_parent_path = "${oss_hdfs_bucket}/refactor-test"
    String oss_hdfs_region = context.config.otherConfigs.get("ossHdfsRegion")

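    // Three property bundles for reaching the same OSS-HDFS storage, which the
    // test expects to behave identically: the legacy oss.* keys on their own,
    // the legacy keys combined with the 'oss.hdfs.enabled' switch, and the
    // newer dedicated oss.hdfs.* keys (which also carry the region).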
    String old_oss_hdfs_storage_properties = """
        'oss.access_key' = '${oss_hdfs_ak}',
        'oss.secret_key' = '${oss_hdfs_sk}',
        'oss.endpoint' = '${oss_hdfs_endpoint}'
    """
    String usingOSSHDFSProps = """
        'oss.hdfs.enabled'='true',
    """
    String new_oss_hdfs_storage_properties = """
        'fs.oss.support' = 'true',
        'oss.hdfs.access_key' = '${oss_hdfs_ak}',
        'oss.hdfs.secret_key' = '${oss_hdfs_sk}',
        'oss.hdfs.endpoint' = '${oss_hdfs_endpoint}',
        'oss.hdfs.region'='${oss_hdfs_region}'
    """

    // **************** Paimon FILESYSTEM ON OSS_HDFS *******************
    String paimon_fs_warehouse = context.config.otherConfigs.get("paimonFsWarehouseOnOssHdfs")
    String query_table_paimon_fs = context.config.otherConfigs.get("paimonFsWarehouseOnOssHdfsQueryTable")
    String query_count_paimon_fs = context.config.otherConfigs.get("paimonFsWarehouseOnOssHdfsQueryCount")
    String paimon_file_system_catalog_properties = """
        'type'='paimon',
        'paimon.catalog.type'='filesystem',
        'warehouse'='${paimon_fs_warehouse}',
    """
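    // Each property style is queried twice: once forcing the JNI scanner, once without.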
    testQuery("paimon_fs_oss_hdfs_catalog", paimon_file_system_catalog_properties + old_oss_hdfs_storage_properties, query_table_paimon_fs, query_count_paimon_fs, true)
    testQuery("paimon_fs_oss_hdfs_catalog", paimon_file_system_catalog_properties + old_oss_hdfs_storage_properties, query_table_paimon_fs, query_count_paimon_fs, false)
    testQuery("paimon_fs_oss_hdfs_region_catalog", paimon_file_system_catalog_properties + usingOSSHDFSProps + old_oss_hdfs_storage_properties, query_table_paimon_fs, query_count_paimon_fs, true)
    testQuery("paimon_fs_oss_hdfs_region_catalog", paimon_file_system_catalog_properties + usingOSSHDFSProps + old_oss_hdfs_storage_properties, query_table_paimon_fs, query_count_paimon_fs, false)
    testQuery("paimon_fs_oss_hdfs_new_catalog", paimon_file_system_catalog_properties + new_oss_hdfs_storage_properties, query_table_paimon_fs, query_count_paimon_fs, true)
    testQuery("paimon_fs_oss_hdfs_new_catalog", paimon_file_system_catalog_properties + new_oss_hdfs_storage_properties, query_table_paimon_fs, query_count_paimon_fs, false)
    // **************** ICEBERG FILESYSTEM ON OSS_HDFS *******************
    String iceberg_file_system_catalog_properties = """
        'type'='iceberg',
        'iceberg.catalog.type'='hadoop',
    """

    String warehouse = """
        'warehouse' = 'oss://${oss_hdfs_parent_path}/iceberg-fs-oss-hdfs-warehouse',
    """
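    // One insert/query round trip per property style.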
    testQueryAndInsert(iceberg_file_system_catalog_properties + warehouse + old_oss_hdfs_storage_properties, "iceberg_fs_on_oss_hdfs", null)
    testQueryAndInsert(iceberg_file_system_catalog_properties + warehouse + usingOSSHDFSProps + old_oss_hdfs_storage_properties, "iceberg_fs_on_oss_hdfs_region", null)
    testQueryAndInsert(iceberg_file_system_catalog_properties + warehouse + new_oss_hdfs_storage_properties, "iceberg_fs_on_oss_hdfs", null)

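    // The remaining cases need a reachable Hive Metastore, so they are gated
    // behind an explicit config switch.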
    String enable_oss_hdfs_hms_catalog_test = context.config.otherConfigs.get("enableOssHdfsHmsCatalogTest")
    if (enable_oss_hdfs_hms_catalog_test == null || !enable_oss_hdfs_hms_catalog_test.equalsIgnoreCase("true")) {
        return
    }

    /* HMS props */
    String hms_props = context.config.otherConfigs.get("emrHmsProps")
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
    String hmsuri = "thrift://${externalEnvIp}:9083"
    String hms_properties = """
        "type"="hms",
        ${hms_props},
    """
    // **************** HIVE HMS ON OSS_HDFS *******************

    String db_location
    // Give each database a unique OSS-HDFS location so reruns never collide.
    db_location = "oss://${oss_hdfs_parent_path}/hive/hms/" + System.currentTimeMillis()
    testQueryAndInsert(hms_properties + old_oss_hdfs_storage_properties, "hive_hms_oss_hdfs_test_old", db_location)
    db_location = "oss://${oss_hdfs_parent_path}/hive/hms/" + System.currentTimeMillis()
    testQueryAndInsert(hms_properties + usingOSSHDFSProps + old_oss_hdfs_storage_properties, "hive_hms_oss_hdfs_test_old", db_location)
    db_location = "oss://${oss_hdfs_parent_path}/hive/hms/" + System.currentTimeMillis()
    testQueryAndInsert(hms_properties + new_oss_hdfs_storage_properties, "hive_hms_oss_hdfs_test_new", db_location)

    // **************** ICEBERG HMS ON OSS_HDFS *******************
    String iceberg_hms_type_prop = """
        'type'='iceberg',
        'iceberg.catalog.type'='hms',
        'hive.metastore.uris'='${hmsuri}',
    """

    // Basic HMS with OSS storage
    warehouse = """
        'warehouse' = 'oss://${oss_hdfs_parent_path}/iceberg-hms-oss-hdfs-warehouse/',
    """
    testQueryAndInsert(iceberg_hms_type_prop + warehouse + old_oss_hdfs_storage_properties, "iceberg_hms_on_oss_hdfs_old", null)
    testQueryAndInsert(iceberg_hms_type_prop + usingOSSHDFSProps + warehouse + old_oss_hdfs_storage_properties, "iceberg_hms_on_oss_hdfs_old", null)
    testQueryAndInsert(iceberg_hms_type_prop + warehouse + new_oss_hdfs_storage_properties, "iceberg_hms_on_oss_hdfs_new", null)

}