Skip to content

Commit 750daa9

Browse files
committed
WIP - copy-pasta v9.x tests as a starting point
1 parent 7457694 commit 750daa9

File tree

4 files changed

+595
-1
lines changed

4 files changed

+595
-1
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* License); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an AS IS BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
plugins { id 'org.apache.beam.module' }
20+
applyJavaNature(
21+
publish: false,
22+
archivesBaseName: 'beam-sdks-java-io-elasticsearch-tests-9'
23+
)
24+
provideIntegrationTestingDependencies()
25+
enableJavaPerformanceTesting()
26+
27+
description = "Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: 9.x"
28+
ext.summary = "Tests of ElasticsearchIO on Elasticsearch 9.x"
29+
30+
def elastic_search_version = "9.0.0"
31+
32+
test {
33+
maxParallelForks = 1
34+
}
35+
36+
configurations.testImplementation {
37+
resolutionStrategy {
38+
force "org.elasticsearch.client:elasticsearch-rest-client:$elastic_search_version"
39+
}
40+
}
41+
42+
dependencies {
43+
testImplementation project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common")
44+
testImplementation library.java.testcontainers_elasticsearch
45+
46+
testImplementation project(path: ":sdks:java:core", configuration: "shadow")
47+
testImplementation project(":sdks:java:io:elasticsearch")
48+
testImplementation library.java.slf4j_api
49+
testImplementation library.java.hamcrest
50+
testImplementation library.java.junit
51+
testImplementation "org.elasticsearch.client:elasticsearch-rest-client:$elastic_search_version"
52+
testRuntimeOnly library.java.log4j2_api
53+
testRuntimeOnly library.java.log4j2_core
54+
testRuntimeOnly library.java.slf4j_jdk14
55+
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.elasticsearch;
19+
20+
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
21+
22+
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOITCommon.ElasticsearchPipelineOptions;
23+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
24+
import org.apache.beam.sdk.testing.TestPipeline;
25+
import org.elasticsearch.client.RestClient;
26+
import org.junit.AfterClass;
27+
import org.junit.BeforeClass;
28+
import org.junit.Rule;
29+
import org.junit.Test;
30+
import org.junit.runner.RunWith;
31+
import org.junit.runners.JUnit4;
32+
33+
/**
34+
* A test of {@link ElasticsearchIO} on an independent Elasticsearch v9.x instance.
35+
*
36+
* <p>This test requires a running instance of Elasticsearch, and the test dataset must exist in the
37+
* database. See {@link ElasticsearchIOITCommon} for instructions to achieve this.
38+
*
39+
* <p>You can run this test by doing the following from the beam parent module directory with the
40+
* correct server IP:
41+
*
42+
* <pre>
43+
* ./gradlew integrationTest -p sdks/java/io/elasticsearch-tests/elasticsearch-tests-9
44+
* -DintegrationTestPipelineOptions='[
45+
* "--elasticsearchServer=1.2.3.4",
46+
* "--elasticsearchHttpPort=9200"]'
47+
* --tests org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOIT
48+
* -DintegrationTestRunner=direct
49+
* </pre>
50+
*
51+
* <p>It is likely that you will need to configure <code>thread_pool.write.queue_size: 250</code>
52+
* (or higher) in the backend Elasticsearch server for this test to run.
53+
*/
54+
@RunWith(JUnit4.class)
55+
public class ElasticsearchIOIT {
56+
private static RestClient restClient;
57+
private static ElasticsearchPipelineOptions options;
58+
private static ConnectionConfiguration readConnectionConfiguration;
59+
private static ConnectionConfiguration writeConnectionConfiguration;
60+
private static ConnectionConfiguration updateConnectionConfiguration;
61+
private static ElasticsearchIOTestCommon elasticsearchIOTestCommon;
62+
63+
@Rule public TestPipeline pipeline = TestPipeline.create();
64+
65+
@BeforeClass
66+
public static void beforeClass() throws Exception {
67+
PipelineOptionsFactory.register(ElasticsearchPipelineOptions.class);
68+
options = TestPipeline.testingPipelineOptions().as(ElasticsearchPipelineOptions.class);
69+
readConnectionConfiguration =
70+
ElasticsearchIOITCommon.getConnectionConfiguration(
71+
options, ElasticsearchIOITCommon.IndexMode.READ);
72+
writeConnectionConfiguration =
73+
ElasticsearchIOITCommon.getConnectionConfiguration(
74+
options, ElasticsearchIOITCommon.IndexMode.WRITE);
75+
updateConnectionConfiguration =
76+
ElasticsearchIOITCommon.getConnectionConfiguration(
77+
options, ElasticsearchIOITCommon.IndexMode.WRITE_PARTIAL);
78+
restClient = readConnectionConfiguration.createClient();
79+
elasticsearchIOTestCommon =
80+
new ElasticsearchIOTestCommon(readConnectionConfiguration, restClient, true);
81+
}
82+
83+
@AfterClass
84+
public static void afterClass() throws Exception {
85+
ElasticsearchIOTestUtils.deleteIndex(writeConnectionConfiguration, restClient);
86+
ElasticsearchIOTestUtils.deleteIndex(updateConnectionConfiguration, restClient);
87+
restClient.close();
88+
}
89+
90+
@Test
91+
public void testSplitsVolume() throws Exception {
92+
elasticsearchIOTestCommon.testSplit(10_000);
93+
}
94+
95+
@Test
96+
public void testReadVolume() throws Exception {
97+
elasticsearchIOTestCommon.setPipeline(pipeline);
98+
elasticsearchIOTestCommon.testRead();
99+
}
100+
101+
@Test
102+
public void testReadPITVolume() throws Exception {
103+
elasticsearchIOTestCommon.setPipeline(pipeline);
104+
elasticsearchIOTestCommon.testReadPIT();
105+
}
106+
107+
@Test
108+
public void testWriteVolume() throws Exception {
109+
// cannot share elasticsearchIOTestCommon because tests run in parallel.
110+
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
111+
new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
112+
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
113+
elasticsearchIOTestCommonWrite.testWrite();
114+
}
115+
116+
@Test
117+
public void testWriteVolumeStateful() throws Exception {
118+
// cannot share elasticsearchIOTestCommon because tests run in parallel.
119+
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
120+
new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
121+
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
122+
elasticsearchIOTestCommonWrite.testWriteStateful();
123+
}
124+
125+
@Test
126+
public void testSizesVolume() throws Exception {
127+
elasticsearchIOTestCommon.testSizes();
128+
}
129+
130+
/**
131+
* This test verifies volume loading of Elasticsearch using explicit document IDs and routed to an
132+
* index named the same as the scientist, and type which is based on the modulo 2 of the scientist
133+
* name. The goal of this IT is to help observe and verify that the overhead of adding the
134+
* functions to parse the document and extract the ID is acceptable.
135+
*/
136+
@Test
137+
public void testWriteWithFullAddressingVolume() throws Exception {
138+
// cannot share elasticsearchIOTestCommon because tests run in parallel.
139+
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
140+
new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
141+
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
142+
elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
143+
}
144+
145+
@Test
146+
public void testWriteWithAllowableErrors() throws Exception {
147+
elasticsearchIOTestCommon.testWriteWithAllowedErrors();
148+
}
149+
150+
@Test
151+
public void testWriteWithRouting() throws Exception {
152+
elasticsearchIOTestCommon.setPipeline(pipeline);
153+
elasticsearchIOTestCommon.testWriteWithRouting();
154+
}
155+
156+
@Test
157+
public void testWriteScriptedUpsert() throws Exception {
158+
elasticsearchIOTestCommon.setPipeline(pipeline);
159+
elasticsearchIOTestCommon.testWriteScriptedUpsert();
160+
}
161+
162+
@Test
163+
public void testWriteWithDocVersion() throws Exception {
164+
elasticsearchIOTestCommon.setPipeline(pipeline);
165+
elasticsearchIOTestCommon.testWriteWithDocVersion();
166+
}
167+
168+
/**
169+
* This test verifies volume partial updates of Elasticsearch. The test dataset index is cloned
170+
* and then a new field is added to each document using a partial update. The test then asserts
171+
* the updates were applied.
172+
*/
173+
@Test
174+
public void testWritePartialUpdate() throws Exception {
175+
ElasticsearchIOTestUtils.copyIndex(
176+
restClient,
177+
readConnectionConfiguration.getIndex(),
178+
updateConnectionConfiguration.getIndex());
179+
// cannot share elasticsearchIOTestCommon because tests run in parallel.
180+
ElasticsearchIOTestCommon elasticsearchIOTestCommonUpdate =
181+
new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
182+
elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
183+
elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
184+
}
185+
186+
/**
187+
* This test verifies volume deletes of Elasticsearch. The test dataset index is cloned and then
188+
* around half of the documents are deleted and the other half is partially updated using bulk
189+
* delete request. The test then asserts the documents were deleted successfully.
190+
*/
191+
@Test
192+
public void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
193+
ElasticsearchIOTestUtils.copyIndex(
194+
restClient,
195+
readConnectionConfiguration.getIndex(),
196+
updateConnectionConfiguration.getIndex());
197+
ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
198+
new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
199+
elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
200+
elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithPartialUpdates();
201+
}
202+
203+
/**
204+
* This test verifies volume deletes of Elasticsearch. The test dataset index is cloned and then
205+
* around half of the documents are deleted using bulk delete request. The test then asserts the
206+
* documents were deleted successfully.
207+
*/
208+
@Test
209+
public void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
210+
ElasticsearchIOTestUtils.copyIndex(
211+
restClient,
212+
readConnectionConfiguration.getIndex(),
213+
updateConnectionConfiguration.getIndex());
214+
ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
215+
new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
216+
elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
217+
elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithoutPartialUpdate();
218+
}
219+
}

0 commit comments

Comments
 (0)