diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/object/IoTDBObjectQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/object/IoTDBObjectQueryIT.java index 4fad2d2696b4..65c0f09e4138 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/object/IoTDBObjectQueryIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/object/IoTDBObjectQueryIT.java @@ -33,21 +33,19 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.Field; import org.apache.tsfile.read.common.RowRecord; -import org.apache.tsfile.utils.Binary; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import java.sql.Blob; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData; -import static org.junit.Assert.assertArrayEquals; +import static org.apache.iotdb.jdbc.IoTDBJDBCResultSet.OBJECT_ERR_MSG; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -104,10 +102,20 @@ public void jdbcTest() { int cnt = 0; while (resultSet.next()) { cnt++; - Blob blob = resultSet.getBlob(3); - byte[] bytes = resultSet.getBytes("o1"); - assertArrayEquals(blob.getBytes(1, (int) blob.length()), bytes); - assertTrue(new String(bytes).endsWith(String.format("%d.bin", cnt))); + try { + resultSet.getBlob(3); + fail(); + } catch (SQLException e) { + assertEquals(OBJECT_ERR_MSG, e.getMessage()); + } + + try { + resultSet.getBytes("o1"); + fail(); + } catch (SQLException e) { + assertEquals(OBJECT_ERR_MSG, e.getMessage()); + } + String s = resultSet.getString(3); assertEquals("(Object) 5 B", s); } @@ -150,13 +158,15 @@ public void sessionTest() { String s = field.getStringValue(); assertEquals("(Object) 5 B", s); Object blob = field.getObjectValue(TSDataType.OBJECT); - assertTrue(blob instanceof Binary); - assertTrue( - new String(((Binary) blob).getValues()).endsWith(String.format("%d.bin", cnt))); - - Binary binary = field.getBinaryV(); - assertArrayEquals(binary.getValues(), ((Binary) blob).getValues()); - assertTrue(new String(binary.getValues()).endsWith(String.format("%d.bin", cnt))); + assertTrue(blob instanceof String); + assertEquals("(Object) 5 B", blob); + + try { + field.getBinaryV(); + fail(); + } catch (UnsupportedOperationException e) { + assertEquals("OBJECT Type only support getStringValue", e.getMessage()); + } } assertEquals(4, cnt); } @@ -174,8 +184,12 @@ public void sessionTest() { assertEquals("(Object) 5 B", o); String s = iterator.getString("o1"); assertEquals("(Object) 5 B", s); - Binary blob = iterator.getBlob(3); - assertTrue(new String(blob.getValues()).endsWith(String.format("%d.bin", cnt))); + try { + iterator.getBlob(3); + fail(); + } catch (StatementExecutionException e) { + assertEquals("OBJECT Type only support getString", e.getMessage()); + } } assertEquals(4, cnt); } diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java index 4267f41dc8d8..538b8c296c12 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java @@ -23,7 +23,9 @@ import org.apache.tsfile.utils.Binary; +import java.io.File; import java.time.LocalDate; +import java.util.Optional; public interface Record { /** @@ -83,7 +85,7 @@ public interface Record { * Returns the Binary value at the specified column in this row. * *

Users need to ensure that the data type of the specified column is {@code TSDataType.TEXT}, - * {@code TSDataType.STRING} or {@code TSDataType.BLOB} or {@code TSDataType.OBJECT}. + * {@code TSDataType.STRING} or {@code TSDataType.BLOB}. * * @param columnIndex index of the specified column * @return the Binary value at the specified column in this row @@ -113,6 +115,15 @@ public interface Record { Object getObject(int columnIndex); + /** + * Returns the OBJECT value's real file path in current node at the specified column in this row. + * + * @param columnIndex index of the specified column + * @return Optional.empty() if current node doesn't have the real file storing the object content, + * otherwise the File referring to the OBJECT value's real file path + */ + Optional getObjectFile(int columnIndex); + /** * Returns the Binary representation of an object stored at the specified column in this row. * diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java index bb113680d8e5..9a1fcd291486 100644 --- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java +++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java @@ -327,10 +327,26 @@ public LocalDate getDate(String columnName) throws StatementExecutionException { } public Binary getBlob(int columnIndex) throws StatementExecutionException { + final TSDataType dataType = ioTDBRpcDataSet.getDataType(columnIndex); + if (dataType == null) { + return null; + } + + if (dataType.equals(TSDataType.OBJECT)) { + throw new StatementExecutionException("OBJECT Type only support getString"); + } return ioTDBRpcDataSet.getBinary(columnIndex); } public Binary getBlob(String columnName) throws StatementExecutionException { + final TSDataType dataType = ioTDBRpcDataSet.getDataType(columnName); + if (dataType == null) { + return null; + } + + if (dataType.equals(TSDataType.OBJECT)) { + throw new StatementExecutionException("OBJECT Type only support getString"); + } return ioTDBRpcDataSet.getBinary(columnName); } diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java index ff00e3b35dfe..f9c3fe481ac7 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java @@ -70,6 +70,8 @@ public class IoTDBJDBCResultSet implements ResultSet { + public static final String OBJECT_ERR_MSG = "OBJECT Type only support getString"; + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBJDBCResultSet.class); protected IoTDBStatement statement; @@ -301,6 +303,15 @@ public InputStream getBinaryStream(String arg0) throws SQLException { @Override public Blob getBlob(int arg0) throws SQLException { try { + final TSDataType dataType = ioTDBRpcDataSet.getDataType(arg0); + if (dataType == null) { + return null; + } + + if (dataType.equals(TSDataType.OBJECT)) { + throw new SQLException(OBJECT_ERR_MSG); + } + Binary binary = ioTDBRpcDataSet.getBinary(arg0); if (ObjectUtils.isNotEmpty(binary)) { return new SerialBlob(binary.getValues()); @@ -314,6 +325,15 @@ public Blob getBlob(int arg0) throws SQLException { @Override public Blob getBlob(String arg0) throws SQLException { try { + final TSDataType dataType = ioTDBRpcDataSet.getDataType(arg0); + if (dataType == null) { + return null; + } + + if (dataType.equals(TSDataType.OBJECT)) { + throw new SQLException(OBJECT_ERR_MSG); + } + Binary binary = ioTDBRpcDataSet.getBinary(arg0); if (ObjectUtils.isNotEmpty(binary)) { return new SerialBlob(binary.getValues()); @@ -360,7 +380,9 @@ public byte[] getBytes(int columnIndex) throws SQLException { return null; } - if (dataType.equals(TSDataType.BLOB) || dataType.equals(TSDataType.OBJECT)) { + if (dataType.equals(TSDataType.OBJECT)) { + throw new SQLException(OBJECT_ERR_MSG); + } else if (dataType.equals(TSDataType.BLOB)) { Binary binary = ioTDBRpcDataSet.getBinary(columnIndex); return binary == null ? null : binary.getValues(); } else { @@ -379,8 +401,9 @@ public byte[] getBytes(String columnName) throws SQLException { if (dataType == null) { return null; } - - if (dataType.equals(TSDataType.BLOB) || dataType.equals(TSDataType.OBJECT)) { + if (dataType.equals(TSDataType.OBJECT)) { + throw new SQLException(OBJECT_ERR_MSG); + } else if (dataType.equals(TSDataType.BLOB)) { Binary binary = ioTDBRpcDataSet.getBinary(columnName); return binary == null ? null : binary.getValues(); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java index 599efc4f240e..a34a0eb650d2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java @@ -29,14 +29,17 @@ import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.DateUtils; +import java.io.File; import java.time.LocalDate; import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator.OBJECT_ERR_MSG; import static org.apache.iotdb.udf.api.type.Type.OBJECT; /** Parts of partition. */ @@ -170,6 +173,14 @@ public boolean getBoolean(int columnIndex) { @Override public Binary getBinary(int columnIndex) { + Type type = dataTypes.get(columnIndex); + if (type == OBJECT) { + throw new UnsupportedOperationException(OBJECT_ERR_MSG); + } + return getBinarySafely(columnIndex); + } + + public Binary getBinarySafely(int columnIndex) { return originalColumns[columnIndex].getBinary(offset); } @@ -193,15 +204,24 @@ public LocalDate getLocalDate(int columnIndex) { @Override public Object getObject(int columnIndex) { + Type type = dataTypes.get(columnIndex); + if (type == OBJECT) { + throw new UnsupportedOperationException(OBJECT_ERR_MSG); + } return originalColumns[columnIndex].getObject(offset); } @Override - public Binary readObject(int columnIndex, long offset, int length) { + public Optional getObjectFile(int columnIndex) { if (getDataType(columnIndex) != Type.OBJECT) { throw new UnsupportedOperationException("current column is not object column"); } - Binary binary = getBinary(columnIndex); + return ObjectTypeUtils.getObjectPathFromBinary(getBinarySafely(columnIndex)); + } + + @Override + public Binary readObject(int columnIndex, long offset, int length) { + Binary binary = getBinarySafely(columnIndex); return new Binary(ObjectTypeUtils.readObjectContent(binary, offset, length, true).array()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java index 9151ef31e0de..ae66a11729d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java @@ -31,14 +31,19 @@ import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.DateUtils; +import java.io.File; import java.time.LocalDate; import java.util.Iterator; import java.util.List; +import java.util.Optional; import static org.apache.tsfile.read.common.type.BlobType.BLOB; public class RecordIterator implements Iterator { + public static final String OBJECT_ERR_MSG = + "OBJECT Type only support getString, getObjectFile, objectLength and readObject"; + private final List childrenColumns; private final List dataTypes; private final int positionCount; @@ -113,6 +118,14 @@ public boolean getBoolean(int columnIndex) { @Override public Binary getBinary(int columnIndex) { + org.apache.tsfile.read.common.type.Type type = dataTypes.get(columnIndex); + if (type == ObjectType.OBJECT) { + throw new UnsupportedOperationException(OBJECT_ERR_MSG); + } + return getBinarySafely(columnIndex); + } + + private Binary getBinarySafely(int columnIndex) { return childrenColumns.get(columnIndex).getBinary(index); } @@ -136,15 +149,24 @@ public LocalDate getLocalDate(int columnIndex) { @Override public Object getObject(int columnIndex) { + org.apache.tsfile.read.common.type.Type type = dataTypes.get(columnIndex); + if (type == ObjectType.OBJECT) { + throw new UnsupportedOperationException(OBJECT_ERR_MSG); + } return childrenColumns.get(columnIndex).getObject(index); } @Override - public Binary readObject(int columnIndex, long offset, int length) { + public Optional getObjectFile(int columnIndex) { if (getDataType(columnIndex) != Type.OBJECT) { throw new UnsupportedOperationException("current column is not object column"); } - Binary binary = getBinary(columnIndex); + return ObjectTypeUtils.getObjectPathFromBinary(getBinarySafely(columnIndex)); + } + + @Override + public Binary readObject(int columnIndex, long offset, int length) { + Binary binary = getBinarySafely(columnIndex); return new Binary(ObjectTypeUtils.readObjectContent(binary, offset, length, true).array()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java index 225137b328ed..9cef48070285 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.read.TableDeviceSourceNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; @@ -68,7 +69,7 @@ public PlanNode visitPlan(PlanNode node, TableDistributedPlanGenerator.PlanConte SAME_WITH_ALL_CHILDREN, context .nodeDistributionMap - .get(node.getChildren().get(0).getPlanNodeId()) + .get(newNode.getChildren().get(0).getPlanNodeId()) .getRegion())); return newNode; } @@ -129,6 +130,9 @@ public PlanNode visitExplainAnalyze( ExchangeNode exchangeNode = new ExchangeNode(queryContext.getQueryId().genPlanNodeId()); exchangeNode.setChild(child); + context.nodeDistributionMap.put( + exchangeNode.getPlanNodeId(), + new NodeDistribution(DIFFERENT_FROM_ALL_CHILDREN, DataPartition.NOT_ASSIGNED)); exchangeNode.setOutputSymbols(child.getOutputSymbols()); newNode.setChild(exchangeNode); @@ -139,6 +143,41 @@ public PlanNode visitExplainAnalyze( return newNode; } + @Override + public PlanNode visitCollect( + CollectNode node, TableDistributedPlanGenerator.PlanContext context) { + PlanNode newNode = node.clone(); + if (node.getChildren().size() == 1) { + newNode.addChild(node.getChildren().get(0).accept(this, context)); + context.nodeDistributionMap.put( + node.getPlanNodeId(), + new NodeDistribution( + SAME_WITH_ALL_CHILDREN, + context + .nodeDistributionMap + .get(newNode.getChildren().get(0).getPlanNodeId()) + .getRegion())); + return newNode; + } + + for (PlanNode child : node.getChildren()) { + PlanNode rewriteNode = child.accept(this, context); + ExchangeNode exchangeNode = new ExchangeNode(queryContext.getQueryId().genPlanNodeId()); + exchangeNode.addChild(rewriteNode); + exchangeNode.setOutputSymbols(rewriteNode.getOutputSymbols()); + newNode.addChild(exchangeNode); + context.hasExchangeNode = true; + context.nodeDistributionMap.put( + exchangeNode.getPlanNodeId(), + new NodeDistribution(DIFFERENT_FROM_ALL_CHILDREN, DataPartition.NOT_ASSIGNED)); + } + context.nodeDistributionMap.put( + newNode.getPlanNodeId(), + new NodeDistribution(DIFFERENT_FROM_ALL_CHILDREN, DataPartition.NOT_ASSIGNED)); + + return newNode; + } + @Override public PlanNode visitTableFunctionProcessor( TableFunctionProcessorNode node, TableDistributedPlanGenerator.PlanContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java index 44ba3235911a..7dd142b77ce6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java @@ -179,15 +179,11 @@ public static long getObjectLength(Binary binary) { return wrap.getLong(); } - public static File getObjectPathFromBinary(Binary binary) { + public static Optional getObjectPathFromBinary(Binary binary) { byte[] bytes = binary.getValues(); String relativeObjectFilePath = new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET); - Optional file = TIER_MANAGER.getAbsoluteObjectFilePath(relativeObjectFilePath); - if (!file.isPresent()) { - throw new ObjectFileNotExist(relativeObjectFilePath); - } - return file.get(); + return TIER_MANAGER.getAbsoluteObjectFilePath(relativeObjectFilePath); } public static Optional getNullableObjectPathFromBinary( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java index e0a6ca88825e..c5e745aea87e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/RecordObjectTypeTest.java @@ -23,7 +23,6 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; -import org.apache.iotdb.db.utils.ObjectTypeUtils; import org.apache.iotdb.udf.api.relational.access.Record; import org.apache.iotdb.udf.api.type.Type; @@ -34,7 +33,6 @@ import org.apache.tsfile.read.common.type.ObjectType; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BytesUtils; -import org.apache.tsfile.utils.Pair; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -47,6 +45,12 @@ import java.nio.file.Path; import java.util.Collections; import java.util.Iterator; +import java.util.Optional; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator.OBJECT_ERR_MSG; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class RecordObjectTypeTest { @@ -98,23 +102,32 @@ private void testRecordIterator(Iterator recordIterator) { Record record = recordIterator.next(); Binary result = record.readObject(0); - Assert.assertEquals(100, result.getLength()); + assertEquals(100, result.getLength()); for (int j = 0; j < 100; j++) { - Assert.assertEquals(j, result.getValues()[j]); + assertEquals(j, result.getValues()[j]); } result = record.readObject(0, 10, 2); Assert.assertArrayEquals(new byte[] {(byte) 10, (byte) 11}, result.getValues()); - Object object = record.getObject(0); - Assert.assertTrue(object instanceof Binary); - Pair pair = ObjectTypeUtils.parseObjectBinary((Binary) object); - Assert.assertEquals(Long.valueOf(100L), pair.getLeft()); - Assert.assertTrue(pair.getRight().startsWith("test_") && pair.getRight().endsWith(".bin")); + try { + record.getObject(0); + fail("Should throw exception"); + } catch (UnsupportedOperationException e) { + assertEquals(OBJECT_ERR_MSG, e.getMessage()); + } + + try { + record.getBinary(0); + fail("Should throw exception"); + } catch (UnsupportedOperationException e) { + assertEquals(OBJECT_ERR_MSG, e.getMessage()); + } - Assert.assertArrayEquals(((Binary) object).getValues(), record.getBinary(0).getValues()); + Optional objectFile = record.getObjectFile(0); + assertTrue(objectFile.isPresent()); - Assert.assertEquals("(Object) 100 B", record.getString(0)); + assertEquals("(Object) 100 B", record.getString(0)); Assert.assertFalse(recordIterator.hasNext()); } diff --git a/pom.xml b/pom.xml index 6841b7169782..d9df3de1bf7b 100644 --- a/pom.xml +++ b/pom.xml @@ -173,7 +173,7 @@ 0.14.1 1.9 1.5.6-3 - 2.2.0-251208-SNAPSHOT + 2.2.0-251209-SNAPSHOT