Skip to content

Commit 39bc4bb

Browse files
committed
Adding the 'SortRows' step before the merge join.
Changes made to resolve issue #29.
1 parent 833d1fe commit 39bc4bb

File tree

1 file changed

+50
-31
lines changed

1 file changed

+50
-31
lines changed

src/main/java/ca/sqlpower/architect/etl/kettle/KettleJob.java

Lines changed: 50 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.pentaho.di.core.NotePadMeta;
3737
import org.pentaho.di.core.database.DatabaseMeta;
3838
import org.pentaho.di.core.exception.KettleException;
39+
import org.pentaho.di.core.gui.Point;
3940
import org.pentaho.di.core.logging.LogWriter;
4041
import org.pentaho.di.core.util.EnvUtil;
4142
import org.pentaho.di.job.JobHopMeta;
@@ -56,6 +57,7 @@
5657
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
5758
import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta;
5859
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
60+
import org.pentaho.di.trans.steps.sort.SortRowsMeta;
5961
import ca.sqlpower.architect.ArchitectSession;
6062
import ca.sqlpower.architect.DepthFirstSearch;
6163
import ca.sqlpower.architect.ddl.DDLUtils;
@@ -329,43 +331,38 @@ public void doExport(List<SQLTable> tableList, SQLDatabase targetDB ) throws SQL
329331
tableInputMeta.setDatabaseMeta(databaseMeta);
330332
tableInputMeta.setSQL(tableMapping.get(sourceTable).toString());
331333
transMeta.addStep(stepMeta);
332-
inputSteps.add(stepMeta);
334+
//sort the Rows
335+
StepMeta sortedStepMeta = createSortRowsStep(transMeta,stepMeta);
336+
transMeta.addStep(sortedStepMeta);
337+
inputSteps.add(sortedStepMeta);
333338
}
334-
List<StepMeta> mergeSteps;
335-
336-
mergeSteps = createMergeJoins(settings.getJoinType(), transMeta, inputSteps,keyFields1, keyFields2);
337-
// boolean isInserUpdate = false;
338-
StepMeta stepMeta = null;
339+
340+
List<StepMeta> mergeSteps = createMergeJoins(settings.getJoinType(), transMeta, inputSteps,keyFields1, keyFields2);
341+
StepMeta outputStepMeta = null;
339342
if (settings.isInsertUpdate()) {
340343
InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
341-
insertUpdateMeta.setDefault();
344+
insertUpdateMeta.setDefault();
342345
insertUpdateMeta.setDatabaseMeta(targetDatabaseMeta);
343346
insertUpdateMeta.setTableName(table.getName());
344347
insertUpdateMeta.setSchemaName(settings.getSchemaName());
345-
stepMeta = new StepMeta("InsertUpdate", "Insert/Update " + table.getName(), insertUpdateMeta);
348+
outputStepMeta = new StepMeta("InsertUpdate", "Insert/Update " + table.getName(), insertUpdateMeta);
346349

347350
} else {
348351
TableOutputMeta tableOutputMeta = new TableOutputMeta();
349352
tableOutputMeta.setDatabaseMeta(targetDatabaseMeta);
350353
tableOutputMeta.setTablename(table.getName());
351354
tableOutputMeta.setSchemaName(settings.getSchemaName());
352-
stepMeta = new StepMeta("TableOutput", "Output to " + table.getName(), tableOutputMeta);
355+
outputStepMeta = new StepMeta("TableOutput", "Output to " + table.getName(), tableOutputMeta);
353356

354357
}
355-
if(stepMeta != null) {
356-
stepMeta.setDraw(true);
357-
stepMeta.setLocation((inputSteps.size()+1)*spacing, inputSteps.size()*spacing);
358-
transMeta.addStep(stepMeta);
358+
if(outputStepMeta != null) {
359+
outputStepMeta.setDraw(true);
360+
outputStepMeta.setLocation((inputSteps.size()+2)*spacing, (inputSteps.size())*spacing);
361+
transMeta.addStep(outputStepMeta);
359362
}
360363
if (inputSteps.size() > 1 ) {
361364
TransHopMeta transHopMeta =
362-
new TransHopMeta(mergeSteps.isEmpty()?inputSteps.get(0):mergeSteps.get(mergeSteps.size()-1), stepMeta);
363-
//Commented as it always disable the hop for merge join
364-
// if (!mergeSteps.isEmpty()) {
365-
// transMeta.addNote(new NotePadMeta("The final hop is disabled because the join types may need to be updated.",0,0,125,125));
366-
// tasksToDo.add("Enable the final hop in " + transMeta.getName() + " after correcting the merge joins.");
367-
// transHopMeta.setEnabled(false);
368-
// }
365+
new TransHopMeta(mergeSteps.isEmpty()?inputSteps.get(0):mergeSteps.get(mergeSteps.size()-1), outputStepMeta);
369366
transMeta.addTransHop(transHopMeta);
370367
transformations.add(transMeta);
371368
logger.debug("Added a Trnasformation job for table "+table.getName());
@@ -918,6 +915,27 @@ public Object[] createTestRepository() {
918915

919916
}
920917

918+
/**
919+
* Creates a SortRows step for the given input step, adds it to transMeta, and connects the input step to it with a hop.
920+
* @param transMeta
921+
* @param inputStep
922+
* @return the newly created SortRows step
923+
*/
924+
private StepMeta createSortRowsStep(TransMeta transMeta, StepMeta inputStep) {
925+
StepMeta stepMeta = null ;
926+
SortRowsMeta sortRowsMeta = new SortRowsMeta();
927+
sortRowsMeta.setDefault();
928+
Point location = inputStep.getLocation();
929+
stepMeta = new StepMeta("SortRows", "SortRows for " + inputStep.getName(), sortRowsMeta );
930+
stepMeta.setDraw(true);
931+
stepMeta.setLocation(location.x*2, location.y);
932+
transMeta.addStep(stepMeta);
933+
// add the hop
934+
TransHopMeta transHopMeta = new TransHopMeta(inputStep, stepMeta);
935+
transMeta.addTransHop(transHopMeta);
936+
return stepMeta;
937+
}
938+
921939
/**
922940
* This creates all of the MergeJoin kettle steps as well as their hops from
923941
* the steps in the inputSteps list. The MergeJoin steps are also put into the
@@ -941,19 +959,23 @@ List<StepMeta> createMergeJoins(int defaultJoinType, TransMeta transMeta, List<S
941959
logger.debug("Key_Field2 :"+Arrays.toString(keyField_2));
942960
mergeJoinMeta.setKeyFields1(keyField_1);
943961
mergeJoinMeta.setKeyFields2(keyField_2);
962+
// Strip the 'SortRows for ' prefix from the names of the input steps (the SortRows steps) so the merge join name is more meaningful.
963+
String table1_name = inputSteps.get(0).getName();
964+
table1_name = table1_name.replaceFirst("SortRows for ","");
965+
String table2_name = inputSteps.get(0).getName();
966+
table2_name = table2_name.replaceFirst("SortRows for ","");
944967
StepMeta stepMeta = new StepMeta("MergeJoin", "Join tables " +
945-
inputSteps.get(0).getName() + " and " +
946-
inputSteps.get(1).getName(), mergeJoinMeta);
968+
table1_name + " and " + table2_name, mergeJoinMeta);
947969
stepMeta.setDraw(true);
948-
stepMeta.setLocation(2*spacing, new Double(1.5*spacing).intValue());
970+
stepMeta.setLocation(3*spacing, new Double(1.5*spacing).intValue());
949971
transMeta.addStep(stepMeta);
950972
mergeSteps.add(stepMeta);
951973
TransHopMeta transHopMeta = new TransHopMeta(inputSteps.get(0), stepMeta);
952974
transMeta.addTransHop(transHopMeta);
953975
transHopMeta = new TransHopMeta(inputSteps.get(1), stepMeta);
954976
transMeta.addTransHop(transHopMeta);
955977
// Commented out because it disabled the final hop, so users opening the transformation in Pentaho received a warning about the hop.
956-
tasksToDo.add("Verify the merge join " + stepMeta.getName() + " does the correct merge.");
978+
tasksToDo.add("Verify the merge join " + stepMeta.getName() + " does the correct merge.");
957979
}
958980

959981
for (int i = 0; i < inputSteps.size()-2; i++) {
@@ -965,17 +987,14 @@ List<StepMeta> createMergeJoins(int defaultJoinType, TransMeta transMeta, List<S
965987
mergeJoinMeta.setStepMeta2(inputSteps.get(i+2));
966988
String[] keyField_1 = keyField1.get(i+2);
967989
String[] keyField_2 = keyField2.get(i+2);
968-
logger.debug("*** MergeJoin Join tables " +
969-
inputSteps.get(i+2).getName() + " and " +
970-
inputSteps.get(i+2).getName());
971-
logger.debug("*** Key_Field1 :"+Arrays.toString(keyField_1));
972-
logger.debug("*** Key_Field2 :"+Arrays.toString(keyField_2));
973990

974991
mergeJoinMeta.setKeyFields1(keyField_1);
975992
mergeJoinMeta.setKeyFields2(keyField_2);
976-
StepMeta stepMeta = new StepMeta("MergeJoin", "Join table " + inputSteps.get(i+2).getName(), mergeJoinMeta);
993+
String table1_name = inputSteps.get(i+2).getName();
994+
table1_name = table1_name.replaceFirst("SortRows for ","");
995+
StepMeta stepMeta = new StepMeta("MergeJoin", "Join table " + table1_name, mergeJoinMeta);
977996
stepMeta.setDraw(true);
978-
stepMeta.setLocation((i + 3) * spacing, new Double((i + 2.25) * spacing).intValue());
997+
stepMeta.setLocation((i + 4) * spacing, new Double((i + 2.25) * spacing).intValue());
979998
transMeta.addStep(stepMeta);
980999
mergeSteps.add(stepMeta);
9811000
TransHopMeta transHopMeta = new TransHopMeta(mergeSteps.get(i), stepMeta);

0 commit comments

Comments
 (0)