@@ -54,13 +54,6 @@ public class DataCheckerDao {
5454 private static final String SQL_SOURCE_TYPE_JOB_PARTITION =
5555 "SELECT * FROM DBS d JOIN TBLS t ON t.DB_ID = d.DB_ID JOIN PARTITIONS p ON p.TBL_ID = t.TBL_ID WHERE d.NAME=? AND t.TBL_NAME=? AND p.PART_NAME=?" ;
5656
57- private static final String SQL_SOURCE_TYPE_BDP =
58- "SELECT * FROM desktop_bdapimport WHERE bdap_db_name = ? AND bdap_table_name = ? AND target_partition_name = ? AND status = '1';" ;
59-
60- private static final String SQL_SOURCE_TYPE_BDP_WITH_TIME_CONDITION =
61- "SELECT * FROM desktop_bdapimport WHERE bdap_db_name = ? AND bdap_table_name = ? AND target_partition_name = ? " +
62- "AND (UNIX_TIMESTAMP() - UNIX_TIMESTAMP(STR_TO_DATE(modify_time, '%Y-%m-%d %H:%i:%s'))) <= ? AND status = '1';" ;
63-
6457 private static final String SQL_DOPS_CHECK_TABLE =
6558 "SELECT * FROM dops_clean_task_list WHERE db_name = ? AND tb_name = ? AND part_name is null AND task_state NOT IN (10,13) order by order_id desc limit 1" ;
6659 private static final String SQL_DOPS_CHECK_PARTITION =
@@ -72,7 +65,6 @@ public class DataCheckerDao {
7265 private static final String MASK_SOURCE_TYPE = "maskdb" ;
7366
7467 private static DataSource jobDS ;
75- private static DataSource bdpDS ;
7668
7769 private static DataSource dopsDS ;
7870 private static volatile DataCheckerDao instance ;
@@ -96,13 +88,6 @@ public boolean validateTableStatusFunction(Properties props, Logger log, DataChe
9688 return false ;
9789 }
9890 }
99- if (bdpDS == null ) {
100- bdpDS = DataDruidFactory .getBDPInstance (props , log );
101- if (bdpDS == null ) {
102- log .warn ("Error getting job Druid DataSource instance" );
103- return false ;
104- }
105- }
10691 boolean systemCheck = Boolean .valueOf (props .getProperty (DataChecker .QUALITIS_SWITCH ));
10792 if (systemCheck && dopsDS == null ) {
10893 dopsDS = DataDruidFactory .getDopsInstance (props , log );//通过alibaba的druid数据库连接池获取JOB数据库连接
@@ -122,7 +107,7 @@ public boolean validateTableStatusFunction(Properties props, Logger log, DataChe
122107 }
123108 log .info ("(DataChecker info) database table partition info : " + dataCheckerInfo );
124109 long waitTime = Long .valueOf (props .getProperty (DataChecker .WAIT_TIME , "1" )) * 3600 * 1000 ;
125- int queryFrequency = Integer .valueOf (props .getProperty (DataChecker .QUERY_FREQUENCY , "30000 " ));
110+ int queryFrequency = Integer .valueOf (props .getProperty (DataChecker .QUERY_FREQUENCY , "60000 " ));
126111// String timeScape = props.getProperty(DataChecker.TIME_SCAPE, "NULL");
127112 log .info ("(DataChecker info) wait time : " + waitTime );
128113 log .info ("(DataChecker info) query frequency : " + queryFrequency );
@@ -134,13 +119,12 @@ public boolean validateTableStatusFunction(Properties props, Logger log, DataChe
134119 });
135120 QualitisUtil qualitisUtil = new QualitisUtil (props );
136121 try (Connection jobConn = jobDS .getConnection ();
137- Connection bdpConn = bdpDS .getConnection ();
138122 Connection dopsConn = dopsDS != null ? dopsDS .getConnection () : null ) {
139123 List <Boolean > allCheckRes = dataObjectList
140124 .parallelStream ()
141125 .map (proObjectMap -> {
142126 log .info ("Begin to Check dataObject:" + proObjectMap .entrySet ().toString ());
143- boolean checkRes = getDataCheckResult (proObjectMap , jobConn , bdpConn , dopsConn , props , log ,action ,qualitisUtil );
127+ boolean checkRes = getDataCheckResult (proObjectMap , jobConn , dopsConn , props , log ,action ,qualitisUtil );
144128 if (null != action .getExecutionRequestRefContext ()) {
145129 if (checkRes ) {
146130 action .getExecutionRequestRefContext ().appendLog ("Database table partition info : " + proObjectMap .get (DataChecker .DATA_OBJECT ) + " has arrived" );
@@ -178,7 +162,6 @@ public boolean validateTableStatusFunction(Properties props, Logger log, DataChe
178162
179163 private boolean getDataCheckResult (Map <String , String > proObjectMap ,
180164 Connection jobConn ,
181- Connection bdpConn ,
182165 Connection dopsConn ,
183166 Properties props ,
184167 Logger log ,
@@ -231,7 +214,7 @@ private boolean getDataCheckResult(Map<String, String> proObjectMap,
231214 }
232215 log .info ("start to check maskis" );
233216 proObjectMap .put (DataChecker .SOURCE_TYPE , MASK_SOURCE_TYPE );
234- normalCheck = ( getBdpTotalCount ( dataObject , bdpConn , log , props ) > 0 || "success" .equals (fetchMaskCode (dataObject , log , props ).get ("maskStatus" ) ));
217+ normalCheck = "success" .equals (fetchMaskCode (dataObject , log , props ).get ("maskStatus" ));
235218 if (null != action .getExecutionRequestRefContext ()){
236219 action .getExecutionRequestRefContext ().appendLog (dataObjectStr +" check maskis end,check result:" +normalCheck );
237220 }
@@ -316,25 +299,6 @@ private PreparedStatement getJobStatement(Connection conn, CheckDataObject dataO
316299 }
317300 }
318301
319- /**
320- * 构造查询maskis的查询
321- */
322- private PreparedStatement getBdpStatement (Connection conn , CheckDataObject dataObject , String timeScape ) throws SQLException {
323- PreparedStatement pstmt = null ;
324- if (timeScape .equals ("NULL" )) {
325- pstmt = conn .prepareCall (SQL_SOURCE_TYPE_BDP );
326- } else {
327- pstmt = conn .prepareCall (SQL_SOURCE_TYPE_BDP_WITH_TIME_CONDITION );
328- pstmt .setInt (4 , Integer .valueOf (timeScape ) * 3600 );
329- }
330- if (dataObject .getPartitionName () == null ) {
331- dataObject .setPartitionName ("" );
332- }
333- pstmt .setString (1 , dataObject .getDbName ());
334- pstmt .setString (2 , dataObject .getTableName ());
335- pstmt .setString (3 , dataObject .getPartitionName ());
336- return pstmt ;
337- }
338302
339303 /**
340304 * 构造查询dops库的查询
@@ -414,27 +378,6 @@ private long getJobTotalCount(CheckDataObject dataObject, Connection conn, Logge
414378 }
415379 }
416380
417- /**
418- * 查mask db
419- */
420- private long getBdpTotalCount (CheckDataObject dataObject , Connection conn , Logger log , Properties props ) {
421- String timeScape = props .getOrDefault (DataChecker .TIME_SCAPE , "NULL" ).toString ();
422- log .info ("-------------------------------------- search bdp data " );
423- log .info ("-------------------------------------- dataObject: " + dataObject .toString ());
424- try (PreparedStatement pstmt = getBdpStatement (conn , dataObject , timeScape )) {
425- ResultSet rs = pstmt .executeQuery ();
426- long ret = 0L ;
427- while (rs .next ()) {
428- ret ++;
429- }
430- // long ret=rs.last() ? rs.getRow() : 0;
431- log .info ("-------------------------------------- bdp data result:" +ret );
432- return ret ;
433- } catch (SQLException e ) {
434- log .error ("fetch data from bdp error" , e );
435- return 0 ;
436- }
437- }
438381
439382 /**
440383 * - 返回0表示未找到任何记录 ;
0 commit comments