
Commit e76aa3c

Customize river to accommodate requests from UBB
1 parent c60e5cc commit e76aa3c

3 files changed, 136 insertions(+), 56 deletions(-)


src/main/java/org/elasticsearch/river/eea_rdf/RDFRiver.java

Lines changed: 11 additions & 3 deletions
@@ -19,8 +19,8 @@
 
 /**
  *
- * @author iulia
- *
+ * @author EEA
+ * Modified by Hemed 09-03-2015
  */
 public class RDFRiver extends AbstractRiverComponent implements River {
     private volatile Harvester harvester;
@@ -79,6 +79,12 @@ private void addHarvesterSettings(RiverSettings settings) {
             .rdfEndpoint(XContentMapValues.nodeStringValue(
                     rdfSettings.get("endpoint"),
                     EEASettings.DEFAULT_ENDPOINT))
+            .rdfNumberOfBulkActions(XContentMapValues.nodeLongValue(
+                    rdfSettings.get("bulkActions"),
+                    EEASettings.DEFAULT_NUMBER_OF_BULK_ACTIONS))
+            .rdfUpdateDocuments(XContentMapValues.nodeBooleanValue(
+                    rdfSettings.get("updateDocuments"),
+                    EEASettings.DEFAULT_UPDATE_DOCUMENTS))
             .rdfQueryType(XContentMapValues.nodeStringValue(
                     rdfSettings.get("queryType"),
                     EEASettings.DEFAULT_QUERYTYPE))
@@ -158,6 +164,8 @@ public void start() {
     public void close() {
         harvester.log("Closing EEA RDF river [" + riverName.name() + "]");
         harvester.setClose(true);
-        harvesterThread.interrupt();
+
+        if(harvesterThread != null)
+            harvesterThread.interrupt();
     }
 }
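For reference, a minimal sketch of how a river instance could be registered so that it picks up the new "bulkActions" and "updateDocuments" options. The river type name "eeaRDF", the settings key "eeaRDF", and the river name "ubb_rdf_river" are assumptions for illustration and are not part of this commit; the option names, the endpoint, and the defaults come from the diffs in this commit.

import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class RegisterRiverSketch {
    // Registers a river document under the _river index (Elasticsearch 1.x river API).
    public static void registerRiver(Client client) throws Exception {
        XContentBuilder meta = jsonBuilder()
            .startObject()
                .field("type", "eeaRDF")              // assumed river type name
                .startObject("eeaRDF")                // assumed settings key read by addHarvesterSettings()
                    .field("endpoint", "http://semantic.eea.europa.eu/sparql")
                    .field("queryType", "construct")
                    .field("bulkActions", 100)        // new option: execute the bulk every N actions
                    .field("updateDocuments", true)   // new option: partial update instead of re-indexing
                .endObject()
            .endObject();

        client.prepareIndex("_river", "ubb_rdf_river", "_meta")
              .setSource(meta)
              .execute().actionGet();
    }
}

If "bulkActions" or "updateDocuments" are omitted, the harvester falls back to EEASettings.DEFAULT_NUMBER_OF_BULK_ACTIONS (100) and EEASettings.DEFAULT_UPDATE_DOCUMENTS (false).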

src/main/java/org/elasticsearch/river/eea_rdf/settings/EEASettings.java

Lines changed: 12 additions & 10 deletions
@@ -4,16 +4,19 @@
 import java.util.List;
 /**
  *
- * @author iulia
+ * @author EEA
+ * Modified by Hemed, 09-03-2015
  *
  */
 public abstract class EEASettings {
 
     public final static String DEFAULT_INDEX_NAME = "rdfdata";
     public final static String DEFAULT_TYPE_NAME = "resource";
-    public final static int DEFAULT_BULK_SIZE = 30;
+    public final static int DEFAULT_NUMBER_OF_BULK_ACTIONS = 100;
+    public final static int DEFAULT_NUMBER_OF_RETRY = 5;
     public final static int DEFAULT_BULK_REQ = 30;
-    public final static List<String> DEFAULT_QUERIES = new ArrayList<String>();
+    public final static List<String>DEFAULT_QUERIES = new ArrayList<String>();
+    public final static String [] DEFAULT_URI_DESCRIPTION_ARRAY = {"http://www.w3.org/2000/01/rdf-schema#label"};
     public final static String DEFAULT_ENDPOINT = "http://semantic.eea.europa.eu/sparql";
     public final static String DEFAULT_QUERYTYPE = "construct";
     public final static String DEFAULT_PROPLIST = "[" +
@@ -24,15 +27,14 @@ public abstract class EEASettings {
     "\"http://purl.org/dc/terms/title\", " +
     "\"http://www.w3.org/1999/02/22-rdf-syntax-ns#about\", " +
     "\"language\", \"topic\"]";
-    public final static String DEFAULT_LIST_TYPE = "white";
+    public final static String DEFAULT_LIST_TYPE = "white";
     public final static Boolean DEFAULT_ADD_LANGUAGE = true;
-    public final static String DEFAULT_LANGUAGE = "\"en\"";
+    public final static String DEFAULT_LANGUAGE = "\"en\"";
     public final static Boolean DEFAULT_ADD_URI = true;
-    public final static String DEFAULT_URI_DESCRIPTION =
-        "[http://www.w3.org/2000/01/rdf-schema#label,"
-        + "http://purl.org/dc/terms/title]";
-    public final static String DEFAULT_SYNC_COND = "";
-    public final static String DEFAULT_SYNC_TIME_PROP =
+    public final static Boolean DEFAULT_UPDATE_DOCUMENTS = false;
+    public final static String DEFAULT_URI_DESCRIPTION = "[" + "http://www.w3.org/2000/01/rdf-schema#label" + "]";
+    public final static String DEFAULT_SYNC_COND = "";
+    public final static String DEFAULT_SYNC_TIME_PROP =
         "http://cr.eionet.europa.eu/ontologies/contreg.rdf#lastRefreshed";
     public final static Boolean DEFAULT_SYNC_OLD_DATA = false;
 
src/main/java/org/elasticsearch/river/eea_rdf/support/Harvester.java

Lines changed: 113 additions & 43 deletions
@@ -20,17 +20,19 @@
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.river.eea_rdf.settings.EEASettings;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.*;
 
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
 
 /**
  *
- * @author iulia
- *
+ * @author EEA <br>
+ * Customized to accommodate requests from the University of Bergen Library.
+ * Hemed, 09-03-2015
  */
 public class Harvester implements Runnable {
 
@@ -69,7 +71,9 @@ private enum QueryType {
     private String syncConditions;
     private String syncTimeProp;
     private Boolean syncOldData;
-
+    private Boolean updateDocuments;
+    private long numberOfBulkActions;
+
     private Client client;
     private String indexName;
     private String typeName;
@@ -376,6 +380,27 @@ public Harvester rdfSyncOldData(Boolean syncOldData) {
         this.syncOldData = syncOldData;
         return this;
     }
+
+    /**
+     * This flag is set if you want to update the documents instead of indexing.
+     * @param updateDocs
+     * @return
+     **/
+    public Harvester rdfUpdateDocuments(Boolean updateDocs){
+        this.updateDocuments = updateDocs;
+        return this;
+    }
+
+    /**
+     * @param bulkActions
+     * @return this object with numberOfBulkActions parameter set
+     **/
+    public Harvester rdfNumberOfBulkActions(long bulkActions)
+    {
+        this.numberOfBulkActions = bulkActions;
+        return this;
+    }
+
 
     public Harvester client(Client client) {
         this.client = client;
@@ -445,10 +470,11 @@ public void run() {
         if (success) {
             setLastUpdate(new Date(currentTime));
         }
-
-        client.admin().indices()
+
+
+        /**client.admin().indices()
             .prepareDeleteMapping("_river").setType(riverName)
-            .execute().actionGet();
+            .execute().actionGet();**/
     }
 
     public boolean runSync() {
@@ -602,6 +628,7 @@ private String getSyncQueryStr(Iterable<String> uris) {
     /**
      * Starts a harvester with predefined queries to synchronize with the
      * changes from the SPARQL endpoint
+     * @return
      */
     public boolean sync() {
         logger.info("Sync resources newer than {}", startTime);
@@ -670,7 +697,7 @@ public boolean sync() {
         for (String uri : rdfUrls) {
             currentBulk.add(uri);
 
-            if (currentBulk.size() == EEASettings.DEFAULT_BULK_SIZE) {
+            if (currentBulk.size() == EEASettings.DEFAULT_BULK_REQ) {
                 bulks.add(currentBulk);
                 currentBulk = new ArrayList<String>();
             }
@@ -725,6 +752,7 @@ public boolean sync() {
 
     /**
      * Starts the harvester for queries and/or URLs
+     * @return
      */
     public boolean runIndexAll() {
         logger.info(
@@ -835,7 +863,7 @@ private Model getModel(QueryExecution qexec) {
      * Add data to ES given a query execution service
      * @param qexec query execution service
      */
-    private void harvest(QueryExecution qexec) {
+    private void harvest(QueryExecution qexec) throws IOException {
         boolean retry;
         do {
             retry = false;
@@ -1019,18 +1047,21 @@ private Map<String, ArrayList<String>> getJsonMap(Resource rs, Set<Property> pro
 
         return jsonMap;
     }
-
-    /**
-     * Index all the resources in a Jena Model to ES
-     *
+
+    /**
+     * Index or update all the resources in a Jena Model to ES
+     * Note: Update works if the user has specified the flag "updateDocuments" to true in the river settings.
+     * By default it is set to false.
+     * By doing this, you can partial update the documents without full re-indexing.
      * @param model the model to index
      * @param bulkRequest a BulkRequestBuilder
      */
-    private void addModelToES(Model model, BulkRequestBuilder bulkRequest) {
+
+    private void addModelToES(Model model, BulkRequestBuilder bulkRequest) throws IOException {
         long startTime = System.currentTimeMillis();
         long bulkLength = 0;
         HashSet<Property> properties = new HashSet<Property>();
-
+
         StmtIterator iter = model.listStatements();
         while(iter.hasNext()) {
             Statement st = iter.nextStatement();
@@ -1051,12 +1082,20 @@ private void addModelToES(Model model, BulkRequestBuilder bulkRequest) {
             Resource rs = rsiter.nextResource();
             Map<String, ArrayList<String>> jsonMap = getJsonMap(rs, properties, model);
 
-            bulkRequest.add(client.prepareIndex(indexName, typeName, rs.toString())
-                .setSource(mapToString(jsonMap)));
-            bulkLength++;
-
-            // We want to execute the bulk for every DEFAULT_BULK_SIZE requests
-            if(bulkLength % EEASettings.DEFAULT_BULK_SIZE == 0) {
+            if(updateDocuments){
+                //If updateDocuments is set to true, then prepare to update this document
+                prepareUpdateDocument(bulkRequest, mapToString(jsonMap), rs.toString());
+            }
+            else{
+                //Otherwise, prepare to index this document
+                prepareIndexDocument(bulkRequest, mapToString(jsonMap), rs.toString());
+            }
+
+            bulkLength++;
+
+            // We want to execute the bulk for every numberOfBulkActions requests
+            if(bulkLength % numberOfBulkActions == 0) {
+
                 BulkResponse bulkResponse = bulkRequest.execute().actionGet();
                 // After executing, flush the BulkRequestBuilder.
                 bulkRequest = client.prepareBulk();
@@ -1076,11 +1115,15 @@ private void addModelToES(Model model, BulkRequestBuilder bulkRequest) {
             }
 
         }
-
-        // Show time taken to index the documents
-        logger.info("Indexed {} documents on {}/{} in {} seconds",
-            bulkLength, indexName, typeName,
-            (System.currentTimeMillis() - startTime)/ 1000.0);
+        //Show time taken to perfom the action
+        String actionPerformed = updateDocuments == true? "update: " : "index: ";
+        logger.info("\n==========================================="
+            +"\n\tTotal documents proccessed: " + bulkLength
+            + "\n\tIndex: " + indexName
+            + "\n\tType: " + typeName
+            + "\n\tTime taken to " + actionPerformed + (System.currentTimeMillis() - startTime)/1000.0
+            +" seconds"
+            +"\n===========================================");
     }
 
 
@@ -1092,7 +1135,6 @@ private void processBulkResponseFailure(BulkResponse response) {
         logger.warn("There was failures when executing bulk : " + response.buildFailureMessage());
 
         if(!logger.isDebugEnabled()) return;
-
         for(BulkItemResponse item: response.getItems()) {
             if (item.isFailed()) {
                 logger.debug("Error {} occured on index {}, type {}, id {} for {} operation "
@@ -1101,6 +1143,31 @@ private void processBulkResponseFailure(BulkResponse response) {
             }
         }
     }
+
+
+
+    /** Prepare update of a document in ElasticSearch. Given document ID, a document will be merged to the existing document
+     * with this ID, if document does not exist, no update will be performed and the DocumentMissingException will be thrown.
+     *
+     * This is useful if someone wants to update a partial document in ElasticSearch without full re-indexing.
+     * Hemed, 09-03-2015
+     **/
+    private void prepareUpdateDocument(BulkRequestBuilder bulkRequest , String documentToUpdate, String documentId){
+        bulkRequest.add(client
+            .prepareUpdate(indexName, typeName, documentId)
+            //Merge this document to the existing one of the same Id.
+            .setDoc(documentToUpdate) );
+    }
+
+
+    /**
+     * Prepare document to be bulk indexed in ElasticSearch
+     **/
+    private void prepareIndexDocument(BulkRequestBuilder bulkRequest, String documentToIndex, String documentId){
+        bulkRequest.add(client
+            .prepareIndex(indexName, typeName, documentId)
+            .setSource(documentToIndex));
+    }
 
     /**
      * Converts a map of results to a String JSON representation for it
@@ -1192,11 +1259,7 @@ private String getStringForResult(RDFNode node) {
      * @return a String value, either a label for the parameter or its value
      * if no label is obtained from the endpoint
      */
-    private String getLabelForUri(String uri) {
-        String result;
-        if (uriLabelCache.containsKey(uri)) {
-            return uriLabelCache.get(uri);
-        }
+    private String getLabelForUri(String uri){
         for(String prop:uriDescriptionList) {
             String innerQuery = "SELECT ?r WHERE {<" + uri + "> <" +
                 prop + "> ?r } LIMIT 1";
@@ -1207,25 +1270,32 @@ private String getLabelForUri(String uri) {
                 rdfEndpoint,
                 query);
             boolean keepTrying = true;
-            while(keepTrying) {
+            int numberOfRetry = 0;
+
+            //If the label is not available, retry querying the endpoint DEFAULT_NUMBER_OF_RETRY times,
+            //if no response, simply return resource URI instead of it's label.
+            while(keepTrying && numberOfRetry < EEASettings.DEFAULT_NUMBER_OF_RETRY)
+            {
                 keepTrying = false;
                 try {
                     ResultSet results = qexec.execSelect();
 
-                    if(results.hasNext()) {
+                    if(results.hasNext())
+                    {
                         QuerySolution sol = results.nextSolution();
-                        result = EEASettings.parseForJson(
+                        String result = EEASettings.parseForJson(
                             sol.getLiteral("r").getLexicalForm());
-                        if(!result.isEmpty()) {
-                            uriLabelCache.put(uri, result);
+                        if(!result.isEmpty())
                             return result;
-                        }
                     }
-                } catch(Exception e) {
-                    keepTrying = true;
-                    logger.warn("Could not get label for uri {}. Retrying.",
-                        uri);
-                } finally { qexec.close();}
+                } catch(Exception e){
+                    keepTrying = true;
+                    numberOfRetry++;
+                    logger.warn("Could not get label for uri {}. Retrying..." , uri);
+
+                    //Print the stack trace
+                    e.printStackTrace();
+                }finally { qexec.close();}
             }
         } catch (QueryParseException qpe) {
             logger.error("Exception for query {}. The label cannot be obtained",
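For intuition about the new update path in addModelToES(): prepareUpdate(...).setDoc(json) merges the supplied fields into the stored document instead of replacing it, and the bulk item fails with DocumentMissingException when no document with that ID exists yet. Below is a rough sketch of that merge behaviour, assuming a running Elasticsearch 1.x client; the document ID and field values are made up, while the index and type names are the defaults from EEASettings.

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;

public class PartialUpdateSketch {
    public static void example(Client client) {
        // Index a resource document: {"label": "Bergen", "title": "City of Bergen"}
        client.prepareIndex("rdfdata", "resource", "http://example.org/bergen")
              .setSource("{\"label\": \"Bergen\", \"title\": \"City of Bergen\"}")
              .execute().actionGet();

        // Partial update: only "language" is sent; "label" and "title" are preserved.
        BulkRequestBuilder bulk = client.prepareBulk();
        bulk.add(client.prepareUpdate("rdfdata", "resource", "http://example.org/bergen")
                       .setDoc("{\"language\": \"no\"}"));
        bulk.execute().actionGet();
        // Stored document is now {"label": "Bergen", "title": "City of Bergen", "language": "no"}
    }
}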
