Skip to content

Commit 2f12d00

Browse files
authored
Merge pull request #73 from alipay/20210527-readall
20210527 readall
2 parents fa47c76 + 904a357 commit 2f12d00

File tree

100 files changed

+5024
-476
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

100 files changed

+5024
-476
lines changed

pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
<url>https://github.com/alipay/rdf-file</url>
1212

1313
<properties>
14-
<rdf.file.core.version>2.2.8.1</rdf.file.core.version>
15-
<rdf.file.oss.version>2.2.8.1</rdf.file.oss.version>
16-
<rdf.file.sftp.version>2.2.8.1</rdf.file.sftp.version>
14+
<rdf.file.core.version>2.2.9</rdf.file.core.version>
15+
<rdf.file.oss.version>2.2.9</rdf.file.oss.version>
16+
<rdf.file.sftp.version>2.2.9</rdf.file.sftp.version>
1717
</properties>
1818

1919
<modules>
@@ -111,4 +111,4 @@
111111
<developerConnection>scm:git:ssh://github.com:alipay/rdf-file.git</developerConnection>
112112
<url>https://github.com/alipay/rdf-file/tree/master</url>
113113
</scm>
114-
</project>
114+
</project>

rdf-file-core/src/main/java/com/alipay/rdf/file/codec/AbstractColumnInfoCodec.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,15 @@
1111
import com.alipay.rdf.file.meta.FileMeta;
1212
import com.alipay.rdf.file.model.FileConfig;
1313
import com.alipay.rdf.file.model.FileDataTypeEnum;
14+
import com.alipay.rdf.file.util.RdfFileUtil;
1415

1516
import java.util.List;
1617

1718
/**
1819
* Copyright (C) 2013-2018 Ant Financial Services Group
1920
*
21+
* 主要用于对字段信息用于头尾输出,针对nosql特性文件不能使用
22+
*
2023
* @author quhongwei
2124
* @version : AbstractColumnInfoCodec.java, v 0.1 2019年11月29日 12:55 quhongwei Exp $
2225
*/
@@ -51,5 +54,4 @@ protected static List<FileColumnMeta> getColumnMetas(FileConfig config, FileData
5154
RdfErrorEnum.UNSUPPORTED_OPERATION);
5255
}
5356
}
54-
55-
}
57+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.alipay.rdf.file.codec;
2+
3+
import com.alipay.rdf.file.spi.RdfFileRowCodecSpi;
4+
5+
/**
6+
* @Author: hongwei.quhw 2021/7/30 3:19 下午
7+
*/
8+
public abstract class AbstractRowCodec implements RdfFileRowCodecSpi {
9+
@Override
10+
public String postSerialize(String line, RowCodecContext ctx) {
11+
return line;
12+
}
13+
14+
@Override
15+
public String preDeserialize(String line, RowCodecContext ctx) {
16+
return line;
17+
}
18+
}

rdf-file-core/src/main/java/com/alipay/rdf/file/codec/BodyCodec.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,5 @@
11
package com.alipay.rdf.file.codec;
22

3-
import java.util.List;
4-
import java.util.Map;
5-
6-
import com.alipay.rdf.file.common.ProtocolFileReader;
7-
import com.alipay.rdf.file.common.ProtocolFileWriter;
83
import com.alipay.rdf.file.exception.RdfErrorEnum;
94
import com.alipay.rdf.file.exception.RdfFileException;
105
import com.alipay.rdf.file.interfaces.FileReader;
@@ -20,9 +15,12 @@
2015
import com.alipay.rdf.file.protocol.RowDefinition;
2116
import com.alipay.rdf.file.spi.RdfFileProcessorSpi;
2217

18+
import java.util.List;
19+
import java.util.Map;
20+
2321
/**
2422
* Copyright (C) 2013-2018 Ant Financial Services Group
25-
*
23+
*
2624
* @author hongwei.quhw
2725
* @version $Id: BodyCodec.java, v 0.1 2017年4月10日 下午7:27:45 hongwei.quhw Exp $
2826
*/

rdf-file-core/src/main/java/com/alipay/rdf/file/codec/ColumnInfoHorizontalCodec.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
/**
1919
* Copyright (C) 2013-2018 Ant Financial Services Group
20-
*
20+
* <p>
2121
* 字段信息水平编码解码
2222
*
2323
* @author hongwei.quhw
@@ -53,6 +53,11 @@ public static <T> T deserialize(FileDataTypeEnum layoutType, FileDataTypeEnum da
5353
String line = reader.readLine();
5454
RdfFileUtil.assertNotBlank(line, "文件=" + config.getFilePath() + ", " + layoutType.name() + " 内容缺失");
5555

56+
// 非关系的编码模式不校验
57+
if (!RdfFileUtil.isRelationCodecMode(config)) {
58+
return null;
59+
}
60+
5661
FileMeta fileMeta = TemplateLoader.load(config);
5762
String[] columns = ProtocolLoader.loadProtocol(fileMeta.getProtocol()).getRowSplit().split(
5863
new SplitContext(line, config, FileDataTypeEnum.BODY));
@@ -62,18 +67,21 @@ public static <T> T deserialize(FileDataTypeEnum layoutType, FileDataTypeEnum da
6267
splitLength = fileMeta.isEndWithSplit(layoutType) ? splitLength + 1 : splitLength;
6368

6469
if (splitLength != columns.length) {
65-
throw new RdfFileException("文件=" + config.getFilePath() + ", " + layoutType.name() + " line=" + line,
66-
RdfErrorEnum.DESERIALIZE_ERROR);
70+
if (RdfFileUtil.isRelationReadRowCompatibility(config)) {
71+
splitLength = Math.min(splitLength, columns.length);
72+
} else {
73+
throw new RdfFileException("rdf-file#ColumnInfoHorizontalCodec.deserialize 文件=" + config.getFilePath() + ", " + layoutType.name() + " line=" + line, RdfErrorEnum.DESERIALIZE_ERROR);
74+
}
6775
}
6876

6977
int statIndex = fileMeta.isStartWithSplit(layoutType) ? 1 : 0;
70-
int endIndex = fileMeta.isEndWithSplit(layoutType) ? columns.length - 1 : columns.length;
78+
int endIndex = fileMeta.isEndWithSplit(layoutType) ? splitLength - 1 : splitLength;
7179

7280
for (int i = statIndex; i < endIndex; i++) {
7381
FileColumnMeta colMeta = colMetas.get(i - statIndex);
7482
if (!getValue(colMeta, method).equals(columns[i])) {
7583
throw new RdfFileException(
76-
"文件" + layoutType.name() + "字段校验:文件模板定义的第" + i + "个column为[" + colMetas.get(i).getDesc() + "], 实际文件中为["
84+
"rdf-file#ColumnInfoHorizontalCodec.deserialize 文件" + layoutType.name() + "字段校验:文件模板定义的第" + i + "个column为[" + colMetas.get(i).getDesc() + "], 实际文件中为["
7785
+ columns[i] + "]", RdfErrorEnum.DESERIALIZE_ERROR);
7886
}
7987
}

rdf-file-core/src/main/java/com/alipay/rdf/file/codec/ColumnInfoVerticalCodec.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
*
1515
* 字段信息纵向编码解码
1616
*
17+
* 不涉及字段分隔符
18+
*
1719
* @author hongwei.quhw
1820
* @version $Id: ColumnInfoVerticalCodec.java, v 0.1 2017-1-3 下午5:50:00 hongwei.quhw Exp $
1921
*/
@@ -31,13 +33,13 @@ public static void serialize(FileDataTypeEnum layoutType, FileDataTypeEnum dataT
3133
public static <T> T deserialize(FileDataTypeEnum layoutType, FileDataTypeEnum dataType, FileConfig fileConfig,
3234
FileReader reader, String method) {
3335
for (FileColumnMeta colMeta : getColumnMetas(fileConfig, dataType)) {
34-
String columName = RdfFileUtil.assertTrimNotBlank(reader.readLine());
35-
RdfFileUtil.assertNotBlank(columName, "文件=" + fileConfig.getFilePath() + ", " + layoutType.name() + " 内容缺失");
36+
String columnName = RdfFileUtil.assertTrimNotBlank(reader.readLine());
37+
RdfFileUtil.assertNotBlank(columnName, "文件=" + fileConfig.getFilePath() + ", " + layoutType.name() + " 内容缺失");
3638

3739
String tempalteValue = getValue(colMeta, method);
38-
if (!tempalteValue.equals(columName)) {
40+
if (!tempalteValue.equals(columnName)) {
3941
throw new RdfFileException(
40-
"rdf-file#" + layoutType.name() + "模板中定义的column为" + tempalteValue + ", 文件中读取的column为" + columName + " 不一致",
42+
"rdf-file#" + layoutType.name() + "模板中定义的column为[" + tempalteValue + "], 文件中读取的column为[" + columnName + "]不一致",
4143
RdfErrorEnum.VALIDATE_ERROR);
4244

4345
}

rdf-file-core/src/main/java/com/alipay/rdf/file/codec/RowColumnHorizontalCodec.java

Lines changed: 43 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
package com.alipay.rdf.file.codec;
22

3-
import java.util.List;
4-
import java.util.Map;
5-
63
import com.alipay.rdf.file.condition.RowConditionExecutor;
7-
import com.alipay.rdf.file.exception.RdfErrorEnum;
8-
import com.alipay.rdf.file.exception.RdfFileException;
4+
import com.alipay.rdf.file.loader.ExtensionLoader;
95
import com.alipay.rdf.file.loader.ProtocolLoader;
106
import com.alipay.rdf.file.loader.TemplateLoader;
117
import com.alipay.rdf.file.meta.FileColumnMeta;
@@ -16,21 +12,24 @@
1612
import com.alipay.rdf.file.processor.ProcessExecutor.BizData;
1713
import com.alipay.rdf.file.processor.ProcessorTypeEnum;
1814
import com.alipay.rdf.file.protocol.RowDefinition;
19-
import com.alipay.rdf.file.spi.RdfFileFunctionSpi.CodecType;
20-
import com.alipay.rdf.file.spi.RdfFileFunctionSpi.FuncContext;
2115
import com.alipay.rdf.file.spi.RdfFileProcessorSpi;
16+
import com.alipay.rdf.file.spi.RdfFileRowCodecSpi;
17+
import com.alipay.rdf.file.spi.RdfFileRowCodecSpi.RowCodecContext;
2218
import com.alipay.rdf.file.spi.RdfFileRowSplitSpi.SplitContext;
2319
import com.alipay.rdf.file.util.BeanMapWrapper;
2420
import com.alipay.rdf.file.util.RdfFileConstants;
2521
import com.alipay.rdf.file.util.RdfFileUtil;
2622

23+
import java.util.List;
24+
import java.util.Map;
25+
2726
/**
2827
* Copyright (C) 2013-2018 Ant Financial Services Group
29-
*
28+
*
3029
* 针对字段水平codec
31-
*
30+
*
3231
* 抽离的目的在于方便,对单行数据进行编码解码, 特别是在测试或者排查问题的时候
33-
*
32+
*
3433
* @author hongwei.quhw
3534
* @version $Id: RowColumnHorizontalCodec.java, v 0.1 2017年8月1日 下午8:49:41 hongwei.quhw Exp $
3635
*/
@@ -41,113 +40,54 @@ public static String serialize(BeanMapWrapper bmw, FileConfig fileConfig, RowDef
4140
Map<ProcessorTypeEnum, List<RdfFileProcessorSpi>> processors,
4241
FileDataTypeEnum rowType) {
4342
ProcessExecutor.execute(ProcessorTypeEnum.BEFORE_SERIALIZE_ROW, processors, fileConfig,
44-
new BizData(RdfFileConstants.DATA, bmw),
45-
new BizData(RdfFileConstants.ROW_TYPE, rowType));
46-
47-
FileMeta fileMeta = TemplateLoader.load(fileConfig);
48-
List<FileColumnMeta> columnMetas = RowConditionExecutor.serializeRow(fileConfig, bmw,
49-
rowType);
50-
StringBuffer line = new StringBuffer();
51-
String split = RdfFileUtil.getRowSplit(fileConfig);
52-
53-
if (null != split && fileMeta.isStartWithSplit(rowType)) {
54-
line.append(split);
55-
}
56-
57-
for (int i = 0; i < columnMetas.size(); i++) {
58-
FileColumnMeta columnMeta = columnMetas.get(i);
59-
FuncContext ctx = new FuncContext();
60-
try {
61-
ctx.codecType = CodecType.SERIALIZE;
62-
ctx.field = bmw.getProperty(columnMeta.getName());
63-
ctx.columnMeta = columnMeta;
64-
ctx.fileConfig = fileConfig;
65-
line.append(rd.getOutput().execute(ctx));
66-
67-
if (null != split
68-
&& (i < columnMetas.size() - 1 || fileMeta.isEndWithSplit(rowType))) {
69-
line.append(split);
70-
}
71-
} catch (RdfFileException e) {
72-
throw new RdfFileException(
73-
"rdf-file#RowColumnHorizontalCodec.serialize serialize row=" + bmw.getBean()
74-
+ ", fileConfig=" + fileConfig + ", 将数据序列到文件出错. 错误列信息: columnMeta=" + columnMeta + ", field=" + ctx.field + ", errorMsg="
75-
+ e.getMessage(),
76-
e, e.getErrorEnum());
77-
} catch (Throwable e) {
78-
throw new RdfFileException(
79-
"rdf-file#RowColumnHorizontalCodec.serialize row=" + bmw.getBean()
80-
+ ", fileConfig=" + fileConfig + ", 将数据序列到文件出错. 错误列信息: columnMeta=" + columnMeta + ", field=" + ctx.field,
81-
e, RdfErrorEnum.SERIALIZE_ERROR);
82-
}
83-
}
84-
85-
String data = line.toString();
43+
new BizData(RdfFileConstants.DATA, bmw),
44+
new BizData(RdfFileConstants.ROW_TYPE, rowType));
45+
// 条件模板处理
46+
List<FileColumnMeta> columnMetas = RowConditionExecutor.serializeRow(fileConfig, bmw, rowType);
47+
48+
RowCodecContext ctx = new RowCodecContext(bmw, fileConfig, columnMetas, rd);
49+
RdfFileRowCodecSpi rowCodec = ExtensionLoader.getExtensionLoader(RdfFileRowCodecSpi.class).getExtension(RdfFileUtil.getRowCodecMode(fileConfig));
50+
// 字段编码
51+
String line = rowCodec.serialize(ctx);
52+
// 行编码后置处理
53+
line = rowCodec.postSerialize(line, ctx);
54+
// 行整体格式化处理
55+
line = RowFormatCodec.serialize(fileConfig, line, rowType);
8656

8757
ProcessExecutor.execute(ProcessorTypeEnum.AFTER_SERIALIZE_ROW, processors, fileConfig,
88-
new BizData(RdfFileConstants.DATA, data),
89-
new BizData(RdfFileConstants.ROW_TYPE, rowType));
58+
new BizData(RdfFileConstants.DATA, line),
59+
new BizData(RdfFileConstants.ROW_TYPE, rowType));
9060

91-
return data;
61+
return line;
9262
}
9363

9464
public static <T> T deserialize(BeanMapWrapper bmw, FileConfig fileConfig, String line,
9565
RowDefinition rd,
9666
Map<ProcessorTypeEnum, List<RdfFileProcessorSpi>> processors,
9767
FileDataTypeEnum rowType) {
9868
ProcessExecutor.execute(ProcessorTypeEnum.BEFORE_DESERIALIZE_ROW, processors, fileConfig,
99-
new BizData(RdfFileConstants.DATA, line),
100-
new BizData(RdfFileConstants.ROW_TYPE, rowType));
69+
new BizData(RdfFileConstants.DATA, line),
70+
new BizData(RdfFileConstants.ROW_TYPE, rowType));
71+
// 行格式化处理
72+
line = RowFormatCodec.deserialize(fileConfig, line, rowType);
10173

10274
FileMeta fileMeta = TemplateLoader.load(fileConfig);
103-
104-
boolean startWithSplit = fileMeta.isStartWithSplit(rowType);
105-
boolean endWithSplit = fileMeta.isEndWithSplit(rowType);
106-
107-
String[] column = ProtocolLoader.loadProtocol(fileMeta.getProtocol()).getRowSplit()
108-
.split(new SplitContext(line, fileConfig, rowType));
109-
110-
List<FileColumnMeta> columnMetas = RowConditionExecutor.deserializeRow(fileConfig, column,
111-
rowType, line);
112-
113-
int splitLength = startWithSplit ? columnMetas.size() + 1 : columnMetas.size();
114-
splitLength = endWithSplit ? splitLength + 1 : splitLength;
115-
116-
if (column.length != splitLength) {
117-
throw new RdfFileException("rdf-file#RowColumnHorizontalCodec.deserialize fileConfig="
118-
+ fileConfig + ", line=[" + line + "],模板定义列数=" + splitLength
119-
+ ", 实际列数=" + column.length,
120-
RdfErrorEnum.DESERIALIZE_ERROR);
121-
}
122-
123-
int statIndex = fileMeta.isStartWithSplit(rowType) ? 1 : 0;
124-
int endIndex = fileMeta.isEndWithSplit(rowType) ? column.length - 1 : column.length;
125-
126-
for (int i = statIndex; i < endIndex; i++) {
127-
FileColumnMeta columnMeta = columnMetas.get(i - statIndex);
128-
FuncContext ctx = new FuncContext();
129-
try {
130-
ctx.codecType = CodecType.DESERIALIZE;
131-
ctx.field = column[i];
132-
ctx.columnMeta = columnMeta;
133-
ctx.fileConfig = fileConfig;
134-
bmw.setProperty(columnMeta.getName(), rd.getOutput().execute(ctx));
135-
} catch (RdfFileException e) {
136-
throw new RdfFileException(
137-
"rdf-file#RowColumnHorizontalCodec.deserialize line=" + line + ", fileConfig="
138-
+ fileConfig + ", 将数据反序列到对象出错. 错误列信息:field=" + ctx.field + ", columnMeta=" + columnMeta + ", errorMsg="
139-
+ e.getMessage(),
140-
e, e.getErrorEnum());
141-
} catch (Throwable e) {
142-
throw new RdfFileException("rdf-file#RowColumnHorizontalCodec.deserialize line="
143-
+ line + ", fileConfig=" + fileConfig + ", 将数据反序列到对象出错. 错误列信息:field=" + ctx.field + ", columnMeta=" + columnMeta,
144-
e, RdfErrorEnum.DESERIALIZE_ERROR);
145-
}
146-
}
75+
RowCodecContext ctx = new RowCodecContext(bmw, fileConfig, null, rd, null);
76+
RdfFileRowCodecSpi rowCodec = ExtensionLoader.getExtensionLoader(RdfFileRowCodecSpi.class).getExtension(RdfFileUtil.getRowCodecMode(fileConfig));
77+
// 行前置处理
78+
line =rowCodec.preDeserialize(line, ctx);
79+
80+
String[] columnValues = ProtocolLoader.loadProtocol(fileMeta.getProtocol()).getRowSplit().split(new SplitContext(line, fileConfig, rowType));
81+
// 条件模板处理
82+
List<FileColumnMeta> columnMetas = RowConditionExecutor.deserializeRow(fileConfig, columnValues, rowType, line);
83+
ctx.columnMetas = columnMetas;
84+
ctx.columnValues = columnValues;
85+
// 行字段解析
86+
rowCodec.deserialize(line, ctx);
14787

14888
ProcessExecutor.execute(ProcessorTypeEnum.AFTER_DESERIALIZE_ROW, processors, fileConfig,
149-
new BizData(RdfFileConstants.DATA, bmw),
150-
new BizData(RdfFileConstants.ROW_TYPE, rowType));
89+
new BizData(RdfFileConstants.DATA, bmw),
90+
new BizData(RdfFileConstants.ROW_TYPE, rowType));
15191

15292
return (T) bmw.getBean();
15393

0 commit comments

Comments
 (0)