Skip to content

Commit 176e4b1

Browse files
committed
Added logic to improve metrics for BQ Write operations
1 parent 6e585a0 commit 176e4b1

1 file changed

Lines changed: 29 additions & 11 deletions

File tree

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import java.lang.reflect.Type;
5252
import java.util.ArrayList;
5353
import java.util.Arrays;
54-
import java.util.Collections;
5554
import java.util.List;
5655
import java.util.Map;
5756
import java.util.Objects;
@@ -70,7 +69,8 @@ public class BigQueryWrite {
7069
public static final String SQL_OUTPUT_CONFIG = "config";
7170
public static final String SQL_OUTPUT_FIELDS = "fields";
7271
public static final String SQL_OUTPUT_SCHEMA = "schema";
73-
private static final Type LIST_OF_STRINGS_TYPE = new TypeToken<ArrayList<String>>() { }.getType();
72+
private static final Type LIST_OF_STRINGS_TYPE = new TypeToken<ArrayList<String>>() {
73+
}.getType();
7474

7575
private final BigQuerySQLEngineConfig sqlEngineConfig;
7676
private final BigQuery bigQuery;
@@ -167,7 +167,10 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
167167

168168
// Ensure both datasets are in the same location.
169169
if (!Objects.equals(srcDataset.getLocation(), destDataset.getLocation())) {
170-
LOG.warn("Direct table copy is only supported if both datasets are in the same location.");
170+
LOG.warn("Direct table copy is only supported if both datasets are in the same location. "
171+
+ "'{}' is '{}' , '{}' is '{}' .",
172+
sourceDatasetId.getDataset(), srcDataset.getLocation(),
173+
destinationDatasetId.getDataset(), destDataset.getLocation());
171174
return SQLWriteResult.unsupported(datasetName);
172175
}
173176

@@ -177,6 +180,9 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
177180
return SQLWriteResult.unsupported(datasetName);
178181
}
179182

183+
// Get source table instance
184+
Table srcTable = bigQuery.getTable(sourceTableId);
185+
180186
// Get destination table instance
181187
Table destTable = bigQuery.getTable(destinationTableId);
182188

@@ -219,7 +225,7 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
219225

220226
// Wait for the query to complete.
221227
queryJob = queryJob.waitFor();
222-
JobStatistics.QueryStatistics statistics = queryJob.getStatistics();
228+
JobStatistics.QueryStatistics queryJobStats = queryJob.getStatistics();
223229

224230
// Check for errors
225231
if (queryJob.getStatus().getError() != null) {
@@ -229,8 +235,11 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
229235
return SQLWriteResult.faiure(datasetName);
230236
}
231237

232-
long numRows = statistics.getNumDmlAffectedRows();
233-
LOG.info("Copied {} records from {}.{}.{} to {}.{}.{}", numRows,
238+
// Number of rows is taken from the job statistics if available.
239+
// If not, we use the number of source table records.
240+
long numRows = queryJobStats != null && queryJobStats.getNumDmlAffectedRows() != null ?
241+
queryJobStats.getNumDmlAffectedRows() : srcTable.getNumRows().longValue();
242+
LOG.info("Executed copy operation for {} records from {}.{}.{} to {}.{}.{}", numRows,
234243
sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
235244
destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable());
236245

@@ -240,8 +249,9 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
240249

241250
/**
242251
* Relax table fields based on the supplied schema
252+
*
243253
* @param schema schema to use when relaxing
244-
* @param table the destination table to relax
254+
* @param table the destination table to relax
245255
*/
246256
protected void relaxTableSchema(Schema schema, Table table) {
247257
com.google.cloud.bigquery.Schema bqSchema = BigQuerySinkUtils.convertCdapSchemaToBigQuerySchema(schema);
@@ -252,9 +262,10 @@ protected void relaxTableSchema(Schema schema, Table table) {
252262

253263
/**
254264
* Create a new BigQuery table based on the supplied schema and table identifier
255-
* @param schema schema to use for this table
256-
* @param tableId identifier for the new table
257-
* @param sinkConfig Sink configuration used to define this table
265+
*
266+
* @param schema schema to use for this table
267+
* @param tableId identifier for the new table
268+
* @param sinkConfig Sink configuration used to define this table
258269
* @param newDestinationTable Atomic reference to this new table. Used to delete this table if the execution fails.
259270
*/
260271
protected void createTable(Schema schema,
@@ -268,7 +279,7 @@ protected void createTable(Schema schema,
268279
tableDefinitionBuilder.setSchema(bqSchema);
269280

270281
// Configure partitioning options
271-
switch(sinkConfig.getPartitioningType()) {
282+
switch (sinkConfig.getPartitioningType()) {
272283
case TIME:
273284
tableDefinitionBuilder.setTimePartitioning(getTimePartitioning(sinkConfig));
274285
break;
@@ -306,6 +317,7 @@ protected void createTable(Schema schema,
306317

307318
/**
308319
* Try to delete this table while handling exception
320+
*
309321
* @param table the table identified for the table we want to delete.
310322
*/
311323
protected void tryDeleteTable(TableId table) {
@@ -381,6 +393,7 @@ protected QueryJobConfiguration.Builder getUpdateUpsertQueryJobBuilder(TableId s
381393

382394
/**
383395
* Build time partitioning configuration based on the BigQuery Sink configuration.
396+
*
384397
* @param config sink configuration to use
385398
* @return Time Partitioning configuration
386399
*/
@@ -398,6 +411,7 @@ protected TimePartitioning getTimePartitioning(BigQuerySinkConfig config) {
398411

399412
/**
400413
* Build range partitioning configuration based on the BigQuery Sink configuration.
414+
*
401415
* @param config sink configuration to use
402416
* @return Range Partitioning configuration
403417
*/
@@ -416,6 +430,7 @@ protected RangePartitioning getRangePartitioning(BigQuerySinkConfig config) {
416430

417431
/**
418432
* Build range used for partitioning configuration
433+
*
419434
* @param config sink configuration to use
420435
* @return Range configuration
421436
*/
@@ -433,6 +448,7 @@ protected RangePartitioning.Range getRangePartitioningRange(BigQuerySinkConfig c
433448

434449
/**
435450
* Get the list of fields to use for clustering based on the supplied sink configuration
451+
*
436452
* @param config sink configuration to use
437453
* @return List containing all clustering order fields.
438454
*/
@@ -446,6 +462,7 @@ List<String> getClusteringOrderFields(BigQuerySinkConfig config) {
446462

447463
/**
448464
* Get the clustering information for a list of clustering fields
465+
*
449466
* @param clusteringFields list of clustering fields to use
450467
* @return Clustering configuration
451468
*/
@@ -457,6 +474,7 @@ protected Clustering getClustering(List<String> clusteringFields) {
457474

458475
/**
459476
* Get encryption configuration for the supplied sink configuration
477+
*
460478
* @param config sink configuration to use
461479
* @return Encryption configuration
462480
*/

0 commit comments

Comments
 (0)