Skip to content

Commit 8b59672

Browse files
branch-3.1: [feat](file-foramt) unify file format properties for tvf and outfile apache#50225 apache#50463 apache#50471 (apache#52101)
bp apache#50225 apache#50463 apache#50471 --------- Co-authored-by: Tiewei Fang <[email protected]>
1 parent d6736b9 commit 8b59672

19 files changed

+1642
-351
lines changed

fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java

Lines changed: 86 additions & 206 deletions
Large diffs are not rendered by default.

fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,11 @@ public static TFileCompressType getFileCompressType(String compressType) {
614614
return TFileCompressType.UNKNOWN;
615615
}
616616
final String upperCaseType = compressType.toUpperCase();
617-
return TFileCompressType.valueOf(upperCaseType);
617+
try {
618+
return TFileCompressType.valueOf(upperCaseType);
619+
} catch (IllegalArgumentException e) {
620+
return TFileCompressType.UNKNOWN;
621+
}
618622
}
619623

620624
/**
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.property.fileformat;
19+
20+
import org.apache.doris.nereids.exceptions.AnalysisException;
21+
import org.apache.doris.thrift.TFileAttributes;
22+
import org.apache.doris.thrift.TFileFormatType;
23+
import org.apache.doris.thrift.TFileTextScanRangeParams;
24+
import org.apache.doris.thrift.TResultFileSinkOptions;
25+
26+
import java.util.Map;
27+
28+
public class AvroFileFormatProperties extends FileFormatProperties {
29+
public AvroFileFormatProperties() {
30+
super(TFileFormatType.FORMAT_AVRO);
31+
}
32+
33+
@Override
34+
public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
35+
throws AnalysisException {
36+
}
37+
38+
@Override
39+
public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
40+
}
41+
42+
@Override
43+
public TFileAttributes toTFileAttributes() {
44+
TFileAttributes fileAttributes = new TFileAttributes();
45+
TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
46+
fileAttributes.setTextParams(fileTextScanRangeParams);
47+
return fileAttributes;
48+
}
49+
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.property.fileformat;
19+
20+
import org.apache.doris.analysis.Separator;
21+
import org.apache.doris.catalog.Column;
22+
import org.apache.doris.common.util.Util;
23+
import org.apache.doris.nereids.exceptions.AnalysisException;
24+
import org.apache.doris.qe.ConnectContext;
25+
import org.apache.doris.thrift.TFileAttributes;
26+
import org.apache.doris.thrift.TFileFormatType;
27+
import org.apache.doris.thrift.TFileTextScanRangeParams;
28+
import org.apache.doris.thrift.TResultFileSinkOptions;
29+
import org.apache.doris.thrift.TTextSerdeType;
30+
31+
import com.google.common.base.Strings;
32+
import com.google.common.collect.Lists;
33+
import org.apache.logging.log4j.LogManager;
34+
import org.apache.logging.log4j.Logger;
35+
36+
import java.util.List;
37+
import java.util.Map;
38+
39+
public class CsvFileFormatProperties extends FileFormatProperties {
40+
public static final Logger LOG = LogManager.getLogger(
41+
org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties.class);
42+
43+
public static final String DEFAULT_COLUMN_SEPARATOR = "\t";
44+
public static final String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001";
45+
public static final String DEFAULT_LINE_DELIMITER = "\n";
46+
47+
public static final String PROP_COLUMN_SEPARATOR = "column_separator";
48+
public static final String PROP_LINE_DELIMITER = "line_delimiter";
49+
50+
public static final String PROP_SKIP_LINES = "skip_lines";
51+
public static final String PROP_CSV_SCHEMA = "csv_schema";
52+
public static final String PROP_COMPRESS_TYPE = "compress_type";
53+
public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
54+
55+
public static final String PROP_ENCLOSE = "enclose";
56+
57+
private String headerType = "";
58+
private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE;
59+
private String columnSeparator = DEFAULT_COLUMN_SEPARATOR;
60+
private String lineDelimiter = DEFAULT_LINE_DELIMITER;
61+
private boolean trimDoubleQuotes;
62+
private int skipLines;
63+
private byte enclose;
64+
65+
// used by tvf
66+
// User specified csv columns, it will override columns got from file
67+
private final List<Column> csvSchema = Lists.newArrayList();
68+
69+
String defaultColumnSeparator = DEFAULT_COLUMN_SEPARATOR;
70+
71+
public CsvFileFormatProperties() {
72+
super(TFileFormatType.FORMAT_CSV_PLAIN);
73+
}
74+
75+
public CsvFileFormatProperties(String defaultColumnSeparator, TTextSerdeType textSerdeType) {
76+
super(TFileFormatType.FORMAT_CSV_PLAIN);
77+
this.defaultColumnSeparator = defaultColumnSeparator;
78+
this.textSerdeType = textSerdeType;
79+
}
80+
81+
public CsvFileFormatProperties(String headerType) {
82+
super(TFileFormatType.FORMAT_CSV_PLAIN);
83+
this.headerType = headerType;
84+
}
85+
86+
87+
@Override
88+
public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
89+
throws AnalysisException {
90+
try {
91+
// analyze properties specified by user
92+
columnSeparator = getOrDefault(formatProperties, PROP_COLUMN_SEPARATOR,
93+
defaultColumnSeparator, isRemoveOriginProperty);
94+
if (Strings.isNullOrEmpty(columnSeparator)) {
95+
throw new AnalysisException("column_separator can not be empty.");
96+
}
97+
columnSeparator = Separator.convertSeparator(columnSeparator);
98+
99+
lineDelimiter = getOrDefault(formatProperties, PROP_LINE_DELIMITER,
100+
DEFAULT_LINE_DELIMITER, isRemoveOriginProperty);
101+
if (Strings.isNullOrEmpty(lineDelimiter)) {
102+
throw new AnalysisException("line_delimiter can not be empty.");
103+
}
104+
lineDelimiter = Separator.convertSeparator(lineDelimiter);
105+
106+
String enclosedString = getOrDefault(formatProperties, PROP_ENCLOSE,
107+
"", isRemoveOriginProperty);
108+
if (!Strings.isNullOrEmpty(enclosedString)) {
109+
if (enclosedString.length() > 1) {
110+
throw new AnalysisException("enclose should not be longer than one byte.");
111+
}
112+
enclose = (byte) enclosedString.charAt(0);
113+
if (enclose == 0) {
114+
throw new AnalysisException("enclose should not be byte [0].");
115+
}
116+
}
117+
118+
trimDoubleQuotes = Boolean.valueOf(getOrDefault(formatProperties,
119+
PROP_TRIM_DOUBLE_QUOTES, "", isRemoveOriginProperty))
120+
.booleanValue();
121+
skipLines = Integer.valueOf(getOrDefault(formatProperties,
122+
PROP_SKIP_LINES, "0", isRemoveOriginProperty)).intValue();
123+
if (skipLines < 0) {
124+
throw new AnalysisException("skipLines should not be less than 0.");
125+
}
126+
127+
String compressTypeStr = getOrDefault(formatProperties,
128+
PROP_COMPRESS_TYPE, "UNKNOWN", isRemoveOriginProperty);
129+
compressionType = Util.getFileCompressType(compressTypeStr);
130+
131+
} catch (org.apache.doris.common.AnalysisException e) {
132+
throw new AnalysisException(e.getMessage());
133+
}
134+
}
135+
136+
@Override
137+
public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
138+
sinkOptions.setColumnSeparator(columnSeparator);
139+
sinkOptions.setLineDelimiter(lineDelimiter);
140+
}
141+
142+
// The method `analyzeFileFormatProperties` must have been called once before this method
143+
@Override
144+
public TFileAttributes toTFileAttributes() {
145+
TFileAttributes fileAttributes = new TFileAttributes();
146+
TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
147+
fileTextScanRangeParams.setColumnSeparator(this.columnSeparator);
148+
fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter);
149+
if (this.enclose != 0) {
150+
fileTextScanRangeParams.setEnclose(this.enclose);
151+
}
152+
fileAttributes.setTextParams(fileTextScanRangeParams);
153+
fileAttributes.setHeaderType(headerType);
154+
fileAttributes.setTrimDoubleQuotes(trimDoubleQuotes);
155+
fileAttributes.setSkipLines(skipLines);
156+
fileAttributes.setEnableTextValidateUtf8(
157+
ConnectContext.get().getSessionVariable().enableTextValidateUtf8);
158+
return fileAttributes;
159+
}
160+
161+
public String getHeaderType() {
162+
return headerType;
163+
}
164+
165+
public TTextSerdeType getTextSerdeType() {
166+
return textSerdeType;
167+
}
168+
169+
public String getColumnSeparator() {
170+
return columnSeparator;
171+
}
172+
173+
public String getLineDelimiter() {
174+
return lineDelimiter;
175+
}
176+
177+
public boolean isTrimDoubleQuotes() {
178+
return trimDoubleQuotes;
179+
}
180+
181+
public int getSkipLines() {
182+
return skipLines;
183+
}
184+
185+
public byte getEnclose() {
186+
return enclose;
187+
}
188+
189+
public List<Column> getCsvSchema() {
190+
return csvSchema;
191+
}
192+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.property.fileformat;
19+
20+
import org.apache.doris.nereids.exceptions.AnalysisException;
21+
import org.apache.doris.thrift.TFileAttributes;
22+
import org.apache.doris.thrift.TFileCompressType;
23+
import org.apache.doris.thrift.TFileFormatType;
24+
import org.apache.doris.thrift.TResultFileSinkOptions;
25+
import org.apache.doris.thrift.TTextSerdeType;
26+
27+
import java.util.Map;
28+
29+
public abstract class FileFormatProperties {
30+
public static final String PROP_FORMAT = "format";
31+
public static final String FORMAT_PARQUET = "parquet";
32+
public static final String FORMAT_CSV = "csv";
33+
public static final String FORMAT_CSV_WITH_NAMES = "csv_with_names";
34+
public static final String FORMAT_CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types";
35+
public static final String FORMAT_HIVE_TEXT = "hive_text";
36+
public static final String FORMAT_ORC = "orc";
37+
public static final String FORMAT_JSON = "json";
38+
public static final String FORMAT_AVRO = "avro";
39+
public static final String FORMAT_WAL = "wal";
40+
public static final String FORMAT_ARROW = "arrow";
41+
public static final String PROP_COMPRESS_TYPE = "compress_type";
42+
43+
protected TFileFormatType fileFormatType;
44+
45+
protected TFileCompressType compressionType;
46+
47+
public FileFormatProperties(TFileFormatType fileFormatType) {
48+
this.fileFormatType = fileFormatType;
49+
}
50+
51+
/**
52+
* Analyze user properties
53+
* @param formatProperties properties specified by user
54+
* @param isRemoveOriginProperty if this param is set to true, then this method would remove the origin property
55+
* @throws AnalysisException
56+
*/
57+
public abstract void analyzeFileFormatProperties(
58+
Map<String, String> formatProperties, boolean isRemoveOriginProperty)
59+
throws AnalysisException;
60+
61+
/**
62+
* generate TResultFileSinkOptions according to the properties of specified file format
63+
* You must call method `analyzeFileFormatProperties` once before calling method `toTResultFileSinkOptions`
64+
*/
65+
public abstract void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions);
66+
67+
/**
68+
* generate TFileAttributes according to the properties of specified file format
69+
* You must call method `analyzeFileFormatProperties` once before calling method `toTFileAttributes`
70+
*/
71+
public abstract TFileAttributes toTFileAttributes();
72+
73+
public static FileFormatProperties createFileFormatProperties(String formatString) {
74+
switch (formatString) {
75+
case FORMAT_CSV:
76+
return new CsvFileFormatProperties();
77+
case FORMAT_HIVE_TEXT:
78+
return new CsvFileFormatProperties(CsvFileFormatProperties.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR,
79+
TTextSerdeType.HIVE_TEXT_SERDE);
80+
case FORMAT_CSV_WITH_NAMES:
81+
return new CsvFileFormatProperties(
82+
FORMAT_CSV_WITH_NAMES);
83+
case FORMAT_CSV_WITH_NAMES_AND_TYPES:
84+
return new CsvFileFormatProperties(
85+
FORMAT_CSV_WITH_NAMES_AND_TYPES);
86+
case FORMAT_PARQUET:
87+
return new ParquetFileFormatProperties();
88+
case FORMAT_ORC:
89+
return new OrcFileFormatProperties();
90+
case FORMAT_JSON:
91+
return new JsonFileFormatProperties();
92+
case FORMAT_AVRO:
93+
return new AvroFileFormatProperties();
94+
case FORMAT_WAL:
95+
return new WalFileFormatProperties();
96+
default:
97+
throw new AnalysisException("format:" + formatString + " is not supported.");
98+
}
99+
}
100+
101+
public static FileFormatProperties createFileFormatProperties(Map<String, String> formatProperties)
102+
throws AnalysisException {
103+
String formatString = formatProperties.getOrDefault(PROP_FORMAT, "")
104+
.toLowerCase();
105+
return createFileFormatProperties(formatString);
106+
}
107+
108+
protected String getOrDefault(Map<String, String> props, String key, String defaultValue,
109+
boolean isRemove) {
110+
String value = props.getOrDefault(key, defaultValue);
111+
if (isRemove) {
112+
props.remove(key);
113+
}
114+
return value;
115+
}
116+
117+
public TFileFormatType getFileFormatType() {
118+
return fileFormatType;
119+
}
120+
121+
public TFileCompressType getCompressionType() {
122+
return compressionType;
123+
}
124+
}

0 commit comments

Comments
 (0)