Skip to content

Commit 637c1c7

Browse files
author
hongwei.quhw
committed
修复预热读只合并bodybug
1 parent cf29a80 commit 637c1c7

File tree

7 files changed

+1032321
-0
lines changed

7 files changed

+1032321
-0
lines changed

rdf-file-core/src/main/java/com/alipay/rdf/file/common/ProtocolFileWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ public Summary getSummary() {
147147
public void append(InputStream in) {
148148
RdfFileUtil.assertNotNull(in, "ProtocolFileWriter.append(inputsream == null) ",
149149
RdfErrorEnum.ILLEGAL_ARGUMENT);
150+
ensureOpen();
150151
writer.append(in);
151152
}
152153

rdf-file-core/src/main/java/com/alipay/rdf/file/common/RawFileWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public Summary getSummary() {
8181

8282
@Override
8383
public void append(InputStream in) {
84+
ensureOpen();
8485
writer.append(in);
8586
}
8687

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package com.alipay.rdf.file.preheat;
2+
3+
import com.alipay.rdf.file.interfaces.*;
4+
import com.alipay.rdf.file.interfaces.FileStorage.FilePathFilter;
5+
import com.alipay.rdf.file.model.*;
6+
import com.alipay.rdf.file.storage.OssConfig;
7+
import com.alipay.rdf.file.util.OssTestUtil;
8+
import com.alipay.rdf.file.util.RdfFileUtil;
9+
import com.alipay.rdf.file.util.TemporaryFolderUtil;
10+
import com.alipay.rdf.file.util.TestLog;
11+
import junit.framework.Assert;
12+
import org.junit.After;
13+
import org.junit.Before;
14+
import org.junit.Test;
15+
16+
import java.io.File;
17+
import java.math.BigDecimal;
18+
import java.util.Collections;
19+
import java.util.HashMap;
20+
import java.util.List;
21+
import java.util.Map;
22+
23+
/**
24+
* 只合并body文件, 验证没有头,预热读合并的一个bug
25+
*
26+
* @author hongwei.quhw
27+
* @version $Id: PreheatReadTest.java, v 0.1 2020年7月10日 下午7:06:40 hongwei.quhw Exp $
28+
*/
29+
public class PreheatMergeBodyDeFileTest {
30+
private TemporaryFolderUtil temporaryFolder = new TemporaryFolderUtil();
31+
private static final StorageConfig storageConfig = OssTestUtil.geStorageConfig();
32+
private static String ossPathPrefix = "rdf/rdf-file/open/PreheatMergeBodyDeFileTest";
33+
private static FileStorage fileStorage = FileFactory.createStorage(storageConfig);
34+
private OssConfig ossConfig;
35+
36+
@Before
37+
public void setUp() throws Exception {
38+
FileDefaultConfig defaultConfig = new FileDefaultConfig();
39+
TestLog log = new TestLog() {
40+
public boolean isDebug() {
41+
return false;
42+
}
43+
};
44+
defaultConfig.setCommonLog(log);
45+
temporaryFolder.create();
46+
ossConfig = (OssConfig) storageConfig.getParam(OssConfig.OSS_STORAGE_CONFIG_KEY);
47+
ossConfig.setOssTempRoot(temporaryFolder.getRoot().getAbsolutePath());
48+
System.out.println(temporaryFolder.getRoot().getAbsolutePath());
49+
}
50+
51+
52+
53+
@Test
54+
public void mergeSlice() throws Exception {
55+
String ossPath = RdfFileUtil.combinePath(ossPathPrefix, "mergeSlice");
56+
57+
fileStorage.upload(File.class.getResource("/preheat/de/body/").getPath(), ossPath, false);
58+
59+
String targetFilePath = RdfFileUtil.combinePath(ossPath, "mergebody.txt");
60+
// 目标文件删除
61+
try {
62+
fileStorage.delete(targetFilePath);
63+
} catch (Exception e) {
64+
}
65+
66+
67+
List<String> bodyPath = fileStorage.listAllFiles(ossPath, new FilePathFilter() {
68+
@Override
69+
public boolean accept(String file) {
70+
return file.indexOf("body") > 0;
71+
}
72+
});
73+
Collections.sort(bodyPath);
74+
System.out.println(bodyPath);
75+
76+
FileConfig preheatConfig = new FileConfig(targetFilePath,
77+
"/preheat/template_body.json", storageConfig);
78+
preheatConfig.setFileEncoding("GBK");
79+
preheatConfig.setSummaryEnable(true);
80+
preheatConfig.setType(FileOssToolContants.OSS_PREHEAT_PROTOCOL_READER);
81+
82+
FileMerger fileMerger = FileFactory.createMerger(preheatConfig);
83+
MergerConfig mergerConfig = new MergerConfig();
84+
mergerConfig.setBodyFilePaths(bodyPath);
85+
fileMerger.merge(mergerConfig);
86+
87+
Assert.assertTrue(fileStorage.getFileInfo(targetFilePath).isExists());
88+
89+
90+
91+
FileConfig mergedConfig = new FileConfig(targetFilePath, "/preheat/template_body.json", storageConfig);
92+
mergedConfig.setFileEncoding("GBK");
93+
FileReader mergedReader = FileFactory.createReader(mergedConfig);
94+
95+
for (String path : bodyPath) {
96+
System.out.println(path);
97+
FileConfig config = new FileConfig(path, "/preheat/template_body.json", storageConfig);
98+
config.setFileEncoding("GBK");
99+
config.setFileDataType(FileDataTypeEnum.BODY);
100+
FileReader reader = FileFactory.createReader(config);
101+
Map<String, Object> row = null;
102+
while (null != (row = reader.readRow(HashMap.class))) {
103+
Assert.assertEquals(row, mergedReader.readRow(HashMap.class));
104+
}
105+
106+
reader.close();
107+
}
108+
109+
Assert.assertNull(mergedReader.readRow(HashMap.class));
110+
111+
mergedReader.close();
112+
113+
fileStorage.delete(targetFilePath);
114+
}
115+
116+
@After
117+
public void after() {
118+
temporaryFolder.delete();
119+
}
120+
}

0 commit comments

Comments
 (0)