-
Notifications
You must be signed in to change notification settings - Fork 86
Expand file tree
/
Copy pathBigQuerySelectDataset.java
More file actions
204 lines (175 loc) · 7.51 KB
/
BigQuerySelectDataset.java
File metadata and controls
204 lines (175 loc) · 7.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package io.cdap.plugin.gcp.bigquery.sqlengine;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.metrics.Metrics;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineException;
import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDataset;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
import io.cdap.plugin.gcp.bigquery.sqlengine.util.BigQuerySQLEngineUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
/**
 * SQL Dataset that represents the result of a "Select" operation, such as join, that is executed in BigQuery.
 *
 * <p>Instances are created via {@link #getInstance} and materialize their result into the configured
 * destination BigQuery table when {@link #execute()} is invoked.
 */
public class BigQuerySelectDataset implements SQLDataset, BigQuerySQLDataset {

  private static final Logger LOG = LoggerFactory.getLogger(BigQuerySelectDataset.class);

  private final String datasetName;
  private final Schema outputSchema;
  private final BigQuerySQLEngineConfig sqlEngineConfig;
  private final BigQuery bigQuery;
  private final String project;
  private final DatasetId bqDataset;
  private final String bqTable;
  private final String jobId;
  private final BigQueryJobType operation;
  private final String selectQuery;
  private final Metrics metrics;
  // Lazily populated row count; fetched from BigQuery on first call to getNumRows().
  private Long numRows;

  /**
   * Static factory for {@link BigQuerySelectDataset}.
   *
   * @param datasetName     logical name of the dataset within the pipeline
   * @param outputSchema    CDAP schema of the select result
   * @param sqlEngineConfig SQL engine configuration (supplies the BigQuery job priority)
   * @param bigQuery        BigQuery client used to run the job
   * @param project         project in which the BigQuery job runs
   * @param bqDataset       destination BigQuery dataset
   * @param bqTable         destination BigQuery table name
   * @param jobId           explicit BigQuery job ID, used so the job can be retried safely
   * @param jobType         type of job, used to tag the BigQuery job
   * @param selectQuery     SQL SELECT statement to execute
   * @param metrics         metrics collector for job statistics
   * @return a new {@link BigQuerySelectDataset}
   */
  public static BigQuerySelectDataset getInstance(String datasetName,
                                                  Schema outputSchema,
                                                  BigQuerySQLEngineConfig sqlEngineConfig,
                                                  BigQuery bigQuery,
                                                  String project,
                                                  DatasetId bqDataset,
                                                  String bqTable,
                                                  String jobId,
                                                  BigQueryJobType jobType,
                                                  String selectQuery,
                                                  Metrics metrics) {
    return new BigQuerySelectDataset(datasetName,
                                     outputSchema,
                                     sqlEngineConfig,
                                     bigQuery,
                                     project,
                                     bqDataset,
                                     bqTable,
                                     jobId,
                                     jobType,
                                     selectQuery,
                                     metrics);
  }

  private BigQuerySelectDataset(String datasetName,
                                Schema outputSchema,
                                BigQuerySQLEngineConfig sqlEngineConfig,
                                BigQuery bigQuery,
                                String project,
                                DatasetId bqDataset,
                                String bqTable,
                                String jobId,
                                BigQueryJobType operation,
                                String selectQuery,
                                Metrics metrics) {
    this.datasetName = datasetName;
    this.outputSchema = outputSchema;
    this.sqlEngineConfig = sqlEngineConfig;
    this.bigQuery = bigQuery;
    this.project = project;
    this.bqDataset = bqDataset;
    this.bqTable = bqTable;
    this.jobId = jobId;
    this.operation = operation;
    this.selectQuery = selectQuery;
    this.metrics = metrics;
  }

  /**
   * Runs the configured SELECT statement as a BigQuery job, appending the results into the
   * destination table (whose schema is first aligned with the pipeline's output schema).
   *
   * @return this dataset, for chaining
   * @throws SQLEngineException if the destination dataset cannot be found, the job is interrupted,
   *                            the job cannot be located after submission, or the job completes with an error
   */
  public BigQuerySelectDataset execute() {
    TableId destinationTable = TableId.of(bqDataset.getProject(), bqDataset.getDataset(), bqTable);

    // Get location for the target dataset. This way, the job will run in the same location as the dataset.
    // getDataset returns null when the dataset does not exist; fail with a clear message instead of an NPE.
    Dataset dataset = bigQuery.getDataset(bqDataset);
    if (dataset == null) {
      throw new SQLEngineException(String.format(
        "BigQuery Dataset '%s' was not found in Project '%s'", bqDataset.getDataset(), bqDataset.getProject()));
    }
    String location = dataset.getLocation();

    // Update destination table schema to match configured schema in the pipeline.
    updateTableSchema(destinationTable, outputSchema);

    LOG.info("Creating table `{}` using job: {} with SQL statement: {}", bqTable, jobId, selectQuery);

    // Run BigQuery job with supplied SQL statement, storing results in a new table.
    QueryJobConfiguration queryConfig =
      QueryJobConfiguration.newBuilder(selectQuery)
        .setDestinationTable(destinationTable)
        .setCreateDisposition(JobInfo.CreateDisposition.CREATE_NEVER)
        .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
        .setSchemaUpdateOptions(Collections.singletonList(JobInfo.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
        .setPriority(sqlEngineConfig.getJobPriority())
        .setLabels(BigQuerySQLEngineUtils.getJobTags(operation))
        .build();

    // Create a job ID so that we can safely retry.
    JobId bqJobId = JobId.newBuilder().setJob(jobId).setLocation(location).setProject(project).build();
    Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(bqJobId).build());

    // Wait for the query to complete.
    try {
      queryJob = queryJob.waitFor();
    } catch (InterruptedException ie) {
      // Restore the interrupt flag so callers further up the stack can observe the interruption.
      Thread.currentThread().interrupt();
      throw new SQLEngineException("Interrupted exception when executing Join operation", ie);
    }

    // Check for errors.
    if (queryJob == null) {
      throw new SQLEngineException("BigQuery job not found: " + jobId);
    } else if (queryJob.getStatus().getError() != null) {
      BigQuerySQLEngineUtils.logJobMetrics(queryJob, metrics);
      throw new SQLEngineException(String.format(
        "Error executing BigQuery Job: '%s' in Project '%s', Dataset '%s', Location '%s' : %s",
        jobId, project, bqDataset, location, queryJob.getStatus().getError().toString()));
    }

    LOG.info("Created BigQuery table `{}` using Job: {}", bqTable, jobId);
    BigQuerySQLEngineUtils.logJobMetrics(queryJob, metrics);
    return this;
  }

  @Override
  public String getDatasetName() {
    return datasetName;
  }

  @Override
  public Schema getSchema() {
    return outputSchema;
  }

  @Override
  public long getNumRows() {
    // Get the number of rows from BQ if not known at this time.
    if (numRows == null) {
      numRows = BigQuerySQLEngineUtils.getNumRows(bigQuery, bqDataset, bqTable);
    }
    return numRows;
  }

  @Override
  public String getBigQueryProject() {
    return bqDataset.getProject();
  }

  @Override
  public String getBigQueryDataset() {
    return bqDataset.getDataset();
  }

  @Override
  public String getBigQueryTable() {
    return bqTable;
  }

  @Override
  @Nullable
  public String getGCSPath() {
    // Select results live only in BigQuery; there is no staging GCS path for this dataset.
    return null;
  }

  @Override
  public String getJobId() {
    return jobId;
  }

  /**
   * Updates the destination table's definition so its schema matches the supplied CDAP schema.
   *
   * @param tableId identifier of the destination table
   * @param schema  CDAP schema to convert and apply
   * @throws SQLEngineException if the destination table does not exist
   */
  protected void updateTableSchema(TableId tableId, Schema schema) {
    // Get BigQuery schema for this table.
    com.google.cloud.bigquery.Schema bqSchema = BigQuerySinkUtils.convertCdapSchemaToBigQuerySchema(schema);

    // getTable returns null when the table does not exist; fail with a clear message instead of an NPE.
    Table table = bigQuery.getTable(tableId);
    if (table == null) {
      throw new SQLEngineException(String.format(
        "BigQuery Table '%s' was not found in Dataset '%s', Project '%s'",
        tableId.getTable(), tableId.getDataset(), tableId.getProject()));
    }

    // Update table definition to match the new schema.
    TableDefinition updatedDefinition = table.getDefinition().toBuilder().setSchema(bqSchema).build();
    Table updatedTable = table.toBuilder().setDefinition(updatedDefinition).build();
    bigQuery.update(updatedTable);
  }
}