Skip to content

Commit ee93eaf

Browse files
committed
Updated to export join keys for steps while creating a MergeJoins.
issue #26
1 parent 6ee9e7f commit ee93eaf

File tree

3 files changed

+109
-43
lines changed

3 files changed

+109
-43
lines changed

regress/ca/sqlpower/architect/etl/kettle/CreateKettleJobTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public void testOutputToXMLFileException() throws IOException {
235235
public void testCreateMergeJoinsNoInputs() {
236236
KettleJob job = new KettleJob(session);
237237
TransMeta transMeta = new TransMeta();
238-
List<StepMeta> mergeSteps = job.createMergeJoins(0, transMeta, new ArrayList<StepMeta>());
238+
List<StepMeta> mergeSteps = job.createMergeJoins(0, transMeta, new ArrayList<StepMeta>(), new ArrayList<String[]>(),new ArrayList<String[]>());
239239
assertEquals(0, mergeSteps.size());
240240
assertEquals(0, transMeta.nrSteps());
241241
}
@@ -245,7 +245,7 @@ public void testCreateMergeJoinsOneInput() {
245245
TransMeta transMeta = new TransMeta();
246246
List<StepMeta> inputSteps = new ArrayList<StepMeta>();
247247
inputSteps.add(new StepMeta());
248-
List<StepMeta> mergeSteps = job.createMergeJoins(0, transMeta, inputSteps);
248+
List<StepMeta> mergeSteps = job.createMergeJoins(0, transMeta, inputSteps, new ArrayList<String[]>(),new ArrayList<String[]>());
249249
assertEquals(0, mergeSteps.size());
250250
assertEquals(0, transMeta.nrSteps());
251251
assertEquals(0, transMeta.nrTransHops());
@@ -263,7 +263,7 @@ public void testCreateMergeJoinsTwoInputs() {
263263
input2.setName("input2");
264264
inputSteps.add(input2);
265265
transMeta.addStep(input2);
266-
List<StepMeta> mergeSteps = job.createMergeJoins(0, transMeta, inputSteps);
266+
List<StepMeta> mergeSteps = job.createMergeJoins(0, transMeta, inputSteps,new ArrayList<String[]>(),new ArrayList<String[]>());
267267
assertEquals(1, mergeSteps.size());
268268
assertEquals(3, transMeta.nrSteps());
269269
assertEquals(2, transMeta.nrTransHops());
@@ -281,7 +281,7 @@ public void testCreateMergeJoinsThreeInputs() {
281281
input2.setName("input2");
282282
inputSteps.add(input2);
283283
transMeta.addStep(input2);
284-
List<StepMeta> mergeSteps = job.createMergeJoins(0, transMeta, inputSteps);
284+
List<StepMeta> mergeSteps = job.createMergeJoins(0, transMeta, inputSteps, new ArrayList<String[]>(),new ArrayList<String[]>());
285285
assertEquals(1, mergeSteps.size());
286286
assertEquals(3, transMeta.nrSteps());
287287
assertEquals(2, transMeta.nrTransHops());

src/main/java/ca/sqlpower/architect/etl/kettle/KettleJob.java

Lines changed: 101 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import java.sql.Connection;
2727
import java.sql.SQLException;
2828
import java.util.ArrayList;
29+
import java.util.Arrays;
2930
import java.util.LinkedHashMap;
31+
import java.util.LinkedList;
3032
import java.util.List;
3133
import java.util.Map;
3234

@@ -64,6 +66,8 @@
6466
import ca.sqlpower.sqlobject.SQLColumn;
6567
import ca.sqlpower.sqlobject.SQLDatabase;
6668
import ca.sqlpower.sqlobject.SQLObjectException;
69+
import ca.sqlpower.sqlobject.SQLRelationship;
70+
import ca.sqlpower.sqlobject.SQLRelationship.ColumnMapping;
6771
import ca.sqlpower.sqlobject.SQLTable;
6872
import ca.sqlpower.util.Monitorable;
6973
import ca.sqlpower.util.MonitorableImpl;
@@ -191,6 +195,8 @@ public void doExport(List<SQLTable> tableList, SQLDatabase targetDB ) throws SQL
191195
List<SQLColumn> columnList = table.getColumns();
192196
List<String> noMappingForColumn = new ArrayList<String>();
193197
List<StepMeta> inputSteps = new ArrayList<StepMeta>();
198+
List<String[]> keyFields1 = new ArrayList<String[]>();
199+
List<String[]> keyFields2 = new ArrayList<String[]>();
194200
JDBCDataSourceType dsType = targetDB.getDataSource().getParentType();
195201
boolean isQuoting = dsType.getSupportsQuotingName();
196202
String ddlGeneratorClass = dsType.getDDLGeneratorClass();
@@ -233,15 +239,17 @@ public void doExport(List<SQLTable> tableList, SQLDatabase targetDB ) throws SQL
233239
//else if quoting for PostgresDDLGenerator
234240
sourceColumn = "\""+sourceColumn+"\"";
235241
}
236-
if (!tableMapping.containsKey(sourceTable)) {
237-
StringBuffer buffer = new StringBuffer();
238-
buffer.append("SELECT ");
239-
buffer.append(sourceColumn);
240-
buffer.append(" AS ").append(columnName);
241-
tableMapping.put(sourceTable, buffer);
242-
} else {
243-
tableMapping.get(sourceTable).append(", ").append
244-
(sourceColumn).append(" AS ").append(columnName);
242+
if(column.getSourceColumn() != null ) {
243+
if (!tableMapping.containsKey(sourceTable)) {
244+
StringBuffer buffer = new StringBuffer();
245+
buffer.append("SELECT ");
246+
buffer.append(sourceColumn);
247+
buffer.append(" AS ").append(columnName);
248+
tableMapping.put(sourceTable, buffer);
249+
} else {
250+
tableMapping.get(sourceTable).append(", ").append
251+
(sourceColumn).append(" AS ").append(columnName);
252+
}
245253
}
246254
}
247255

@@ -252,12 +260,15 @@ public void doExport(List<SQLTable> tableList, SQLDatabase targetDB ) throws SQL
252260
continue;
253261
} else {
254262
StringBuffer buffer = new StringBuffer();
255-
buffer.append("There is no source for the column(s): ");
256-
for (String noMapForCol: noMappingForColumn) {
257-
buffer.append(noMapForCol).append(" ");
263+
if(noMappingForColumn.size() > 0) {
264+
buffer.append("There is no source for the column(s): ");
265+
for (String noMapForCol: noMappingForColumn) {
266+
buffer.append(noMapForCol).append(" ");
267+
}
268+
269+
tasksToDo.add(buffer.toString() + " for the table " + table.getName());
270+
transMeta.addNote(new NotePadMeta(buffer.toString(), 0, 150, 125, 125));
258271
}
259-
tasksToDo.add(buffer.toString() + " for the table " + table.getName());
260-
transMeta.addNote(new NotePadMeta(buffer.toString(), 0, 150, 125, 125));
261272
}
262273
}
263274

@@ -267,6 +278,38 @@ public void doExport(List<SQLTable> tableList, SQLDatabase targetDB ) throws SQL
267278
}
268279

269280
for (SQLTable sourceTable: tableMapping.keySet()) {
281+
List<String> keys1 = new LinkedList<String>();
282+
List<String> keys2 = new LinkedList<String>();
283+
/**
284+
* Exported keys are different for table in Database then table id PlayPen when user create new Relationship
285+
* manually which doesn't exists in database table.
286+
* Here When user create a kettle job it based on Playpen tables. User might create new relationship. So to get the correct exported keys
287+
* for Playpen Tables we are getting the table whose parent is PlayPen.
288+
* So here tables in tableMapping Map has the table from database (parent is database). While table from the tableList has the parent as a PlayPen.
289+
* So even though the table is dragged from database it can have different exported keys (after dragging it in the playpen)specially when user create new relationship manually.
290+
*/
291+
SQLTable playpenTable = null;
292+
for(SQLTable pTable :tableList) {
293+
if (sourceTable.getName().equalsIgnoreCase(pTable.getName())) {
294+
playpenTable = pTable;
295+
break;
296+
}
297+
}
298+
logger.debug("playpenTable name:" + (playpenTable != null ?playpenTable.getName(): "null"));
299+
if (playpenTable != null) {
300+
for (SQLRelationship exportedKeys : playpenTable.getExportedKeys()) {
301+
for (ColumnMapping mapping: exportedKeys.getMappings()) {
302+
SQLColumn pkCol = mapping.getPkColumn();
303+
SQLColumn fkCol = mapping.getFkColumn();
304+
if(pkCol != null && fkCol!= null) {
305+
keys1.add(pkCol.getName());
306+
keys2.add(fkCol.getName());
307+
}
308+
}
309+
}
310+
}
311+
keyFields1.add(keys1.toArray(new String[keys1.size()]));
312+
keyFields2.add(keys2.toArray(new String[keys2.size()]));
270313
JDBCDataSource source = sourceTable.getParentDatabase().getDataSource();
271314
DatabaseMeta databaseMeta = addDatabaseConnection(databaseNames, source);
272315
transMeta.addDatabase(databaseMeta);
@@ -281,10 +324,9 @@ public void doExport(List<SQLTable> tableList, SQLDatabase targetDB ) throws SQL
281324
transMeta.addStep(stepMeta);
282325
inputSteps.add(stepMeta);
283326
}
284-
285327
List<StepMeta> mergeSteps;
286-
mergeSteps = createMergeJoins(settings.getJoinType(), transMeta, inputSteps);
287-
328+
329+
mergeSteps = createMergeJoins(settings.getJoinType(), transMeta, inputSteps,keyFields1, keyFields2);
288330
TableOutputMeta tableOutputMeta = new TableOutputMeta();
289331
tableOutputMeta.setDatabaseMeta(targetDatabaseMeta);
290332
tableOutputMeta.setTablename(table.getName());
@@ -293,23 +335,27 @@ public void doExport(List<SQLTable> tableList, SQLDatabase targetDB ) throws SQL
293335
stepMeta.setDraw(true);
294336
stepMeta.setLocation((inputSteps.size()+1)*spacing, inputSteps.size()*spacing);
295337
transMeta.addStep(stepMeta);
338+
if (inputSteps.size() >0 ) {
296339
TransHopMeta transHopMeta =
297340
new TransHopMeta(mergeSteps.isEmpty()?inputSteps.get(0):mergeSteps.get(mergeSteps.size()-1), stepMeta);
298-
if (!mergeSteps.isEmpty()) {
299-
transMeta.addNote(new NotePadMeta("The final hop is disabled because the join types may need to be updated.",0,0,125,125));
300-
tasksToDo.add("Enable the final hop in " + transMeta.getName() + " after correcting the merge joins.");
301-
transHopMeta.setEnabled(false);
302-
}
341+
//Commented as it always disable the hop for merge join
342+
// if (!mergeSteps.isEmpty()) {
343+
// transMeta.addNote(new NotePadMeta("The final hop is disabled because the join types may need to be updated.",0,0,125,125));
344+
// tasksToDo.add("Enable the final hop in " + transMeta.getName() + " after correcting the merge joins.");
345+
// transHopMeta.setEnabled(false);
346+
// }
303347
transMeta.addTransHop(transHopMeta);
304348

305349
transformations.add(transMeta);
306-
350+
logger.debug("Added a Trnasformation job for table "+table.getName());
351+
}
352+
}
307353
if (monitor.isCancelled()) {
308354
cancel();
309355
return;
310356
}
311357

312-
}
358+
// }
313359

314360
if (!noTransTables.isEmpty()) {
315361
StringBuffer buffer = new StringBuffer();
@@ -360,10 +406,10 @@ public void doExport(List<SQLTable> tableList, SQLDatabase targetDB ) throws SQL
360406
successEntry.setLocation(i*spacing, spacing);
361407
successEntry.setDrawn();
362408
jm.addJobEntry(successEntry);
363-
364-
JobHopMeta hop = new JobHopMeta(oldJobEntry, successEntry);
365-
jm.addJobHop(hop);
366-
409+
if(oldJobEntry != null) {
410+
JobHopMeta hop = new JobHopMeta(oldJobEntry, successEntry);
411+
jm.addJobHop(hop);
412+
}
367413
if (monitor.isCancelled()) {
368414
cancel();
369415
return;
@@ -373,7 +419,7 @@ public void doExport(List<SQLTable> tableList, SQLDatabase targetDB ) throws SQL
373419
jobname += "_"+getJob_no();
374420
}
375421
jm.setName(jobname);
376-
// System.out.println("setting job name: "+jobname);
422+
logger.debug("setting job name: "+jobname);
377423
if (settings.isSavingToFile()) {
378424
outputToXML(transformations, jm);
379425
} else {
@@ -452,7 +498,7 @@ public void doSplitedJobExport(List<SQLTable> tableList, SQLDatabase targetDB )
452498
return;
453499
}
454500

455-
jm.setName( settings.getJobName());
501+
jm.setName(settings.getJobName());
456502
job_no=0;
457503
if (settings.isSavingToFile()) {
458504
jobOutputToXML(jobMetaList, jm);
@@ -484,7 +530,7 @@ private void jobOutputToXML(List<JobMeta> jmList, JobMeta jm) throws IOException
484530
for (int i = 1; i < jm.nrJobEntries() -1; i++) {
485531
JobEntryJob jobs = (JobEntryJob)(jm.getJobEntry(i).getEntry());
486532
jobs.setFileName(getJobFilePath(jobs.getName()));
487-
System.out.println("\n jobOutputToXML::jobs fileName: "+jobs.getFileName());
533+
logger.debug("jobOutputToXML::jobs fileName: "+jobs.getFileName());
488534
}
489535

490536
String fileName = settings.getFilePath() ;
@@ -608,7 +654,7 @@ void outputToXML(List<TransMeta> transformations, JobMeta job) throws IOExceptio
608654

609655
for (TransMeta transMeta : transformations) {
610656
File file = new File(getTransFilePath(transMeta.getName()));
611-
System.out.println("\n transformation file: "+file.getAbsolutePath());
657+
logger.debug("transformation file: "+file.getAbsolutePath());
612658
transMeta.setFilename(file.getName());
613659
try {
614660
outputs.put(file, transMeta.getXML());
@@ -859,7 +905,7 @@ public Object[] createTestRepository() {
859905
* the steps in the inputSteps list. The MergeJoin steps are also put into the
860906
* TransMeta. This method is package private for testing purposes.
861907
*/
862-
List<StepMeta> createMergeJoins(int defaultJoinType, TransMeta transMeta, List<StepMeta> inputSteps) {
908+
List<StepMeta> createMergeJoins(int defaultJoinType, TransMeta transMeta, List<StepMeta> inputSteps, List<String[]> keyField1, List<String[]> keyField2) {
863909
List<StepMeta> mergeSteps = new ArrayList<StepMeta>();
864910
if (inputSteps.size() > 1) {
865911
MergeJoinMeta mergeJoinMeta = new MergeJoinMeta();
@@ -868,8 +914,15 @@ List<StepMeta> createMergeJoins(int defaultJoinType, TransMeta transMeta, List<S
868914
mergeJoinMeta.setStepMeta1(inputSteps.get(0));
869915
mergeJoinMeta.setStepName2(inputSteps.get(1).getName());
870916
mergeJoinMeta.setStepMeta2(inputSteps.get(1));
871-
mergeJoinMeta.setKeyFields1(new String[]{});
872-
mergeJoinMeta.setKeyFields2(new String[]{});
917+
String[] keyField_1 = keyField1.get(0);
918+
String[] keyField_2 = keyField2.get(0);
919+
logger.debug("MergeJoin Join tables " +
920+
inputSteps.get(0).getName() + " and " +
921+
inputSteps.get(1).getName());
922+
logger.debug("Key_Field1 :"+Arrays.toString(keyField_1));
923+
logger.debug("Key_Field2 :"+Arrays.toString(keyField_2));
924+
mergeJoinMeta.setKeyFields1(keyField_1);
925+
mergeJoinMeta.setKeyFields2(keyField_2);
873926
StepMeta stepMeta = new StepMeta("MergeJoin", "Join tables " +
874927
inputSteps.get(0).getName() + " and " +
875928
inputSteps.get(1).getName(), mergeJoinMeta);
@@ -881,7 +934,8 @@ List<StepMeta> createMergeJoins(int defaultJoinType, TransMeta transMeta, List<S
881934
transMeta.addTransHop(transHopMeta);
882935
transHopMeta = new TransHopMeta(inputSteps.get(1), stepMeta);
883936
transMeta.addTransHop(transHopMeta);
884-
tasksToDo.add("Verify the merge join " + stepMeta.getName() + " does the correct merge.");
937+
//commenting it disable the final hop. So when user open transformation in Pentaho they received warning about hop
938+
tasksToDo.add("Verify the merge join " + stepMeta.getName() + " does the correct merge.");
885939
}
886940

887941
for (int i = 0; i < inputSteps.size()-2; i++) {
@@ -891,8 +945,16 @@ List<StepMeta> createMergeJoins(int defaultJoinType, TransMeta transMeta, List<S
891945
mergeJoinMeta.setStepMeta1(mergeSteps.get(i));
892946
mergeJoinMeta.setStepName2(inputSteps.get(i+2).getName());
893947
mergeJoinMeta.setStepMeta2(inputSteps.get(i+2));
894-
mergeJoinMeta.setKeyFields1(new String[]{});
895-
mergeJoinMeta.setKeyFields2(new String[]{});
948+
String[] keyField_1 = keyField1.get(i+2);
949+
String[] keyField_2 = keyField2.get(i+2);
950+
logger.debug("*** MergeJoin Join tables " +
951+
inputSteps.get(i+2).getName() + " and " +
952+
inputSteps.get(i+2).getName());
953+
logger.debug("*** Key_Field1 :"+Arrays.toString(keyField_1));
954+
logger.debug("*** Key_Field2 :"+Arrays.toString(keyField_2));
955+
956+
mergeJoinMeta.setKeyFields1(keyField_1);
957+
mergeJoinMeta.setKeyFields2(keyField_2);
896958
StepMeta stepMeta = new StepMeta("MergeJoin", "Join table " + inputSteps.get(i+2).getName(), mergeJoinMeta);
897959
stepMeta.setDraw(true);
898960
stepMeta.setLocation((i + 3) * spacing, new Double((i + 2.25) * spacing).intValue());
@@ -902,6 +964,7 @@ List<StepMeta> createMergeJoins(int defaultJoinType, TransMeta transMeta, List<S
902964
transMeta.addTransHop(transHopMeta);
903965
transHopMeta = new TransHopMeta(inputSteps.get(i+2), stepMeta);
904966
transMeta.addTransHop(transHopMeta);
967+
//commenting it disable the final hop. So when user open transformation in Pentaho they received warning about hop
905968
tasksToDo.add("Verify the merge join " + stepMeta.getName() + " does the correct merge.");
906969
}
907970
return mergeSteps;

src/main/java/ca/sqlpower/architect/swingui/action/KettleJobAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,10 @@ public void actionPerformed(ActionEvent arg0) {
165165
toDoListDialog.add(builder.getPanel());
166166
toDoListDialog.pack();
167167
toDoListDialog.setLocationRelativeTo(frame);
168-
toDoListDialog.setVisible(true);
168+
//Show only if there is a task to do
169+
if(tasksToDo.size() > 0) {
170+
toDoListDialog.setVisible(true);
171+
}
169172
}
170173
};
171174

0 commit comments

Comments
 (0)