1- package org .elasticsearch .river .eea_rdf ;
1+ package org .elasticsearch .river .ubb ;
22
33import org .elasticsearch .client .Client ;
44import org .elasticsearch .common .inject .Inject ;
77import org .elasticsearch .common .util .concurrent .EsExecutors ;
88import org .elasticsearch .common .xcontent .support .XContentMapValues ;
99import org .elasticsearch .river .*;
10- import org .elasticsearch .river .eea_rdf .settings .EEASettings ;
11- import org .elasticsearch .river .eea_rdf .support .ContextFactory ;
12- import org .elasticsearch .river .eea_rdf .support .Harvester ;
13- import org .elasticsearch .river .eea_rdf .support .JsonFileLoader ;
10+ import org .elasticsearch .river .ubb .settings .Defaults ;
11+ import org .elasticsearch .river .ubb .support .ContextFactory ;
12+ import org .elasticsearch .river .ubb .support .Harvester ;
13+ import org .elasticsearch .river .ubb .support .JsonFileLoader ;
1414
1515import java .util .Arrays ;
1616import java .util .List ;
1717import java .util .Map ;
1818
1919/**
20- * @author EEA
20+ * @author European Environment Agency (EEA)
21+ * @author Hemed Al Ruwehy
2122 * <p>
2223 * Modified by Hemed Ali Al Ruwehy, The University of Bergen Library
2324 * @since 09-03-2015
@@ -35,7 +36,7 @@ public RDFRiver(RiverName riverName,
3536 super (riverName , settings );
3637 harvester = new Harvester ();
3738 harvester .client (client ).riverName (riverName .name ());
38- addHarvesterSettings (settings );
39+ buildHarvester (settings );
3940 }
4041
4142 /**
@@ -46,6 +47,19 @@ private static Map<String, Object> extractSettings(RiverSettings settings, Strin
4647 return (Map <String , Object >) settings .settings ().get (key );
4748 }
4849
50+ /**
51+ * Type casting accessors for river settings
52+ **/
53+ @ SuppressWarnings ("unchecked" )
54+ private static Map <String , Object > extractSettings (RiverSettings settings ) {
55+ if (settings .settings ().containsKey (Defaults .EEA_SETTINGS_KEY )) {
56+ return (Map <String , Object >) settings .settings ().get (Defaults .EEA_SETTINGS_KEY );
57+ }
58+ throw new IllegalArgumentException (String .
59+ format ("No key in the river settings. Expected \" %s\" " , Defaults .EEA_SETTINGS_KEY ));
60+ }
61+
62+
4963 @ SuppressWarnings ("unchecked" )
5064 private static Map <String , String > getStrStrMapFromSettings (Map <String , Object > settings , String key ) {
5165 return (Map <String , String >) settings .get (key );
@@ -63,7 +77,7 @@ private static Map<String, String> loadProperties(Map<String, Object> settings,
6377
6478 private static String loadContext (Map <String , Object > settings , String key ) {
6579 Object values = settings .get (key );
66- logger .info ("Reading context from: " + values );
80+ logger .info ("Reading from context : " + values );
6781 return new JsonFileLoader ().resolveToString (values .toString ());
6882 }
6983
@@ -78,13 +92,11 @@ private static List<String> getStrListFromSettings(Map<String, Object> settings,
7892 return (List <String >) settings .get (key );
7993 }
8094
81- private void addHarvesterSettings (RiverSettings settings ) {
82- if (!settings .settings ().containsKey (EEASettings .RIVER_SETTINGS_KEY )) {
83- throw new IllegalArgumentException (
84- String .format ("There is no \" %s\" key in the river settings." , EEASettings .RIVER_SETTINGS_KEY ));
85- }
86-
87- Map <String , Object > rdfSettings = extractSettings (settings , EEASettings .RIVER_SETTINGS_KEY );
95+ /**
96+ * Builds harvester with the provided settings
97+ */
98+ private void buildHarvester (RiverSettings settings ) {
99+ Map <String , Object > rdfSettings = extractSettings (settings );
88100 harvester .rdfIndexType (XContentMapValues .nodeStringValue (
89101 rdfSettings .get ("indexType" ), "full" ))
90102 .rdfStartTime (XContentMapValues .nodeStringValue (
@@ -99,55 +111,57 @@ private void addHarvesterSettings(RiverSettings settings) {
99111 rdfSettings .get ("queryPath" ), "" ))
100112 .rdfNumberOfBulkActions (XContentMapValues .nodeLongValue (
101113 rdfSettings .get ("bulkActions" ),
102- EEASettings .DEFAULT_NUMBER_OF_BULK_ACTIONS ))
114+ Defaults .DEFAULT_NUMBER_OF_BULK_ACTIONS ))
103115 .rdfUpdateDocuments (XContentMapValues .nodeBooleanValue (
104116 rdfSettings .get ("updateDocuments" ),
105- EEASettings .DEFAULT_UPDATE_DOCUMENTS ))
117+ Defaults .DEFAULT_UPDATE_DOCUMENTS ))
106118 .rdfQueryType (XContentMapValues .nodeStringValue (
107119 rdfSettings .get ("queryType" ),
108- EEASettings .DEFAULT_QUERYTYPE ))
120+ Defaults .DEFAULT_QUERYTYPE ))
109121 .rdfListType (XContentMapValues .nodeStringValue (
110122 rdfSettings .get ("listtype" ),
111- EEASettings .DEFAULT_LIST_TYPE ))
123+ Defaults .DEFAULT_LIST_TYPE ))
112124 .rdfAddLanguage (XContentMapValues .nodeBooleanValue (
113125 rdfSettings .get ("addLanguage" ),
114- EEASettings .DEFAULT_ADD_LANGUAGE ))
126+ Defaults .DEFAULT_ADD_LANGUAGE ))
115127 .rdfLanguage (XContentMapValues .nodeStringValue (
116128 rdfSettings .get ("language" ),
117- EEASettings .DEFAULT_LANGUAGE ))
129+ Defaults .DEFAULT_LANGUAGE ))
118130 .rdfAddUriForResource (XContentMapValues .nodeBooleanValue (
119131 rdfSettings .get ("includeResourceURI" ),
120- EEASettings .DEFAULT_ADD_URI ))
132+ Defaults .DEFAULT_ADD_URI ))
121133 .removeIllegalCharsForSuggestion (XContentMapValues .nodeBooleanValue (
122134 rdfSettings .get ("removeIllegalCharsForSuggestion" ),
123135 true ))
124136 .deleteRiverAfterCreation (XContentMapValues .nodeBooleanValue (
125137 rdfSettings .get ("deleteRiverAfterCreation" ), false ))
138+ .generateSortLabel (XContentMapValues .nodeBooleanValue (
139+ rdfSettings .get ("generateSortLabel" ), false ))
126140 .maxSuggestInputLength (XContentMapValues .nodeIntegerValue (
127141 rdfSettings .get ("maxSuggestInputLength" ),
128- EEASettings .DEFAULT_MAX_SUGGEST_INPUT_LENGTH ))
142+ Defaults .DEFAULT_MAX_SUGGEST_INPUT_LENGTH ))
129143 /*.rdfURIDescription(XContentMapValues.nodeStringValue(
130144 rdfSettings.get("uriDescription"),
131145 EEASettings.DEFAULT_URI_DESCRIPTION))
132146 */
133147 .rdfSyncConditions (XContentMapValues .nodeStringValue (
134148 rdfSettings .get ("syncConditions" ),
135- EEASettings .DEFAULT_SYNC_COND ))
149+ Defaults .DEFAULT_SYNC_COND ))
136150 /*.rdfContextProp(XContentMapValues.nodeStringValue(
137151 rdfSettings.get("context"), ""))
138152 */
139153 .rdfSyncTimeProp (XContentMapValues .nodeStringValue (
140154 rdfSettings .get ("syncTimeProp" ),
141- EEASettings .DEFAULT_SYNC_TIME_PROP ))
155+ Defaults .DEFAULT_SYNC_TIME_PROP ))
142156 .rdfSyncOldData (XContentMapValues .nodeBooleanValue (
143157 rdfSettings .get ("syncOldData" ),
144- EEASettings .DEFAULT_SYNC_OLD_DATA ));
158+ Defaults .DEFAULT_SYNC_OLD_DATA ));
145159
146160 if (rdfSettings .containsKey ("uriDescription" )) {
147161 harvester .rdfURIDescription (getStrListFromSettings (rdfSettings , "uriDescription" ));
148162 } else {
149163 //Convert the default array to List
150- List <String > defaultUriList = Arrays .asList (EEASettings .DEFAULT_URI_DESCRIPTION );
164+ List <String > defaultUriList = Arrays .asList (Defaults .DEFAULT_URI_DESCRIPTION );
151165 harvester .rdfURIDescription (defaultUriList );
152166 }
153167 if (rdfSettings .containsKey ("proplist" )) {
@@ -156,7 +170,7 @@ private void addHarvesterSettings(RiverSettings settings) {
156170 if (rdfSettings .containsKey ("query" )) {
157171 harvester .rdfQuery (getStrListFromSettings (rdfSettings , "query" ));
158172 } else {
159- harvester .rdfQuery (EEASettings .DEFAULT_QUERIES );
173+ harvester .rdfQuery (Defaults .DEFAULT_QUERIES );
160174 }
161175 /*if (rdfSettings.containsKey("normProp")) {
162176 harvester.rdfNormalizationProp(getStrStrMapFromSettings(rdfSettings, "normProp"));
@@ -181,29 +195,28 @@ private void addHarvesterSettings(RiverSettings settings) {
181195 }
182196 if (settings .settings ().containsKey ("index" )) {
183197 Map <String , Object > indexSettings = extractSettings (settings , "index" );
184- harvester .index (XContentMapValues .nodeStringValue (
185- indexSettings .get ("index" ),
186- EEASettings .DEFAULT_INDEX_NAME ))
187- .type (XContentMapValues .nodeStringValue (
188- indexSettings .get ("type" ),
189- EEASettings .DEFAULT_TYPE_NAME ));
198+ harvester .index (XContentMapValues .nodeStringValue (indexSettings .get ("index" ),
199+ Defaults .DEFAULT_INDEX_NAME ))
200+ .type (XContentMapValues .nodeStringValue (indexSettings .get ("type" ),
201+ Defaults .DEFAULT_TYPE_NAME ));
190202 } else {
191- harvester .index (EEASettings .DEFAULT_INDEX_NAME )
192- .type (EEASettings .DEFAULT_TYPE_NAME );
203+ harvester .index (Defaults .DEFAULT_INDEX_NAME ).type (Defaults .DEFAULT_TYPE_NAME );
193204 }
194205 }
195206
196207 @ Override
197208 public void start () {
209+ harvester .log ("Starting river [" + riverName .name () + "]" );
210+ harvester .timeStarted (System .currentTimeMillis ());
198211 harvesterThread = EsExecutors .daemonThreadFactory (
199- settings .globalSettings (), "eea_rdf_river( " + riverName ().name () + ") " )
212+ settings .globalSettings (), "ubbRiver[ " + riverName ().name () + "] " )
200213 .newThread (harvester );
201214 harvesterThread .start ();
202215 }
203216
204217 @ Override
205218 public void close () {
206- harvester .log ("Closing UBB RDF river [" + riverName .name () + "]" );
219+ harvester .log ("Closing river [" + riverName .name () + "]" );
207220 harvester .setClose (true );
208221 if (harvesterThread != null && !harvesterThread .isInterrupted ()) {
209222 harvesterThread .interrupt ();
0 commit comments