Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public static int write(Map<String, String> map, ByteBuffer buffer) {
if (entry.getKey() == null) {
buffer.putInt(-1);
} else {
bytes = entry.getKey().getBytes();
bytes = entry.getKey().getBytes(TSFileConfig.STRING_CHARSET);
buffer.putInt(bytes.length);
buffer.put(bytes);
length += bytes.length;
Expand All @@ -194,7 +194,7 @@ public static int write(Map<String, String> map, ByteBuffer buffer) {
if (entry.getValue() == null) {
buffer.putInt(-1);
} else {
bytes = entry.getValue().getBytes();
bytes = entry.getValue().getBytes(TSFileConfig.STRING_CHARSET);
buffer.putInt(bytes.length);
buffer.put(bytes);
length += bytes.length;
Expand Down Expand Up @@ -509,7 +509,7 @@ public static int sizeToWrite(String s) {
if (s == null) {
return INT_LEN;
}
return INT_LEN + s.getBytes().length;
return INT_LEN + s.getBytes(TSFileConfig.STRING_CHARSET).length;
}

/** read a byte var from inputStream. */
Expand Down Expand Up @@ -1202,7 +1202,7 @@ public static void writeObject(Object value, DataOutputStream outputStream) {
outputStream.write(NONE.ordinal());
} else {
outputStream.write(STRING.ordinal());
byte[] bytes = value.toString().getBytes();
byte[] bytes = value.toString().getBytes(TSFileConfig.STRING_CHARSET);
outputStream.writeInt(bytes.length);
outputStream.write(bytes);
}
Expand Down Expand Up @@ -1238,7 +1238,7 @@ public static void writeObject(Object value, ByteBuffer byteBuffer) {
byteBuffer.putInt(NONE.ordinal());
} else {
byteBuffer.putInt(STRING.ordinal());
byte[] bytes = value.toString().getBytes();
byte[] bytes = value.toString().getBytes(TSFileConfig.STRING_CHARSET);
byteBuffer.putInt(bytes.length);
byteBuffer.put(bytes);
}
Expand Down Expand Up @@ -1271,7 +1271,7 @@ public static Object readObject(ByteBuffer buffer) {
length = buffer.getInt();
bytes = new byte[length];
buffer.get(bytes);
return new String(bytes);
return new String(bytes, TSFileConfig.STRING_CHARSET);
}
}

Expand Down
113 changes: 112 additions & 1 deletion java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
Original file line number Diff line number Diff line change
Expand Up @@ -748,13 +748,26 @@ private Object createValueColumnOfDataType(TSDataType dataType, int capacity) {

/**
 * Serialize {@link Tablet}.
 *
 * <p>The tablet is written through {@link #serialize(DataOutputStream)} into an in-memory
 * {@code PublicBAOS}; the size returned by {@link #serializedSize()} is passed to the stream's
 * constructor.
 *
 * @return a {@link ByteBuffer} wrapping the stream's buffer from offset 0 for {@code size()} bytes
 * @throws IOException if writing to the stream fails
 */
public ByteBuffer serialize() throws IOException {
try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
final int serializedSize = serializedSize();
try (PublicBAOS byteArrayOutputStream = new PublicBAOS(serializedSize);
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
serialize(outputStream);
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
}

/**
 * Return the exact serialized byte size of this tablet.
 *
 * <p>The total is accumulated in this order: the insert target name, one {@code int} (the row
 * count that {@link #serialize(DataOutputStream)} writes right after the name), then the
 * measurement-schema, timestamp, bitmap and value sections.
 *
 * @return the summed byte count of all sections
 * @throws ArithmeticException if the running total does not fit in an {@code int}
 */
public int serializedSize() {
  final int nameBytes = ReadWriteIOUtils.sizeToWrite(insertTargetName);
  final int throughRowCount = Math.addExact(nameBytes, Integer.BYTES);
  final int throughSchemas =
      Math.addExact(throughRowCount, serializedSizeOfMeasurementSchemas());
  final int throughTimes = Math.addExact(throughSchemas, serializedSizeOfTimes());
  final int throughBitMaps = Math.addExact(throughTimes, serializedSizeOfBitMaps());
  return Math.addExact(throughBitMaps, serializedSizeOfValues());
}

public void serialize(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(insertTargetName, stream);
ReadWriteIOUtils.write(rowSize, stream);
Expand All @@ -764,6 +777,104 @@ public void serialize(DataOutputStream stream) throws IOException {
writeValues(stream);
}

/**
 * Byte size of the measurement-schema section.
 *
 * <p>One byte is always counted. When {@code schemas} is non-null, an {@code int} is added for
 * the list, then one byte per element; each non-null element additionally contributes its own
 * {@code serializedSize()} plus one more byte.
 *
 * @return the section size in bytes
 * @throws ArithmeticException if the total overflows an {@code int}
 */
private int serializedSizeOfMeasurementSchemas() {
  if (schemas == null) {
    return Byte.BYTES;
  }
  int total = Math.addExact(Byte.BYTES, Integer.BYTES);
  for (final IMeasurementSchema schema : schemas) {
    // Counted for every element, null or not.
    total = Math.addExact(total, Byte.BYTES);
    if (schema == null) {
      continue;
    }
    total = Math.addExact(total, schema.serializedSize());
    // NOTE(review): trailing per-schema byte; presumably matches a one-byte field written after
    // each schema by writeMeasurementSchemas - confirm against that method.
    total = Math.addExact(total, Byte.BYTES);
  }
  return total;
}

/**
 * Byte size of the timestamp section: one byte, plus {@code Long.BYTES} per row when {@code
 * timestamps} is non-null.
 *
 * @return the section size in bytes
 * @throws ArithmeticException if the product or sum overflows an {@code int}
 */
private int serializedSizeOfTimes() {
  if (timestamps == null) {
    return Byte.BYTES;
  }
  final int timestampBytes = Math.multiplyExact(Long.BYTES, rowSize);
  return Math.addExact(Byte.BYTES, timestampBytes);
}

private int serializedSizeOfBitMaps() {
int size = Byte.BYTES;
if (bitMaps != null) {
final int columnCount = schemas == null ? 0 : schemas.size();
for (int i = 0; i < columnCount; i++) {
if (bitMaps[i] == null || bitMaps[i].isAllUnmarked(rowSize)) {
size = Math.addExact(size, Byte.BYTES);
} else {
size = Math.addExact(size, Byte.BYTES);
size = Math.addExact(size, Integer.BYTES);
size = Math.addExact(size, Integer.BYTES);
size = Math.addExact(size, BitMap.getSizeOfBytes(rowSize));
Comment on lines 813 to +815
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why two integers?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two Integer.BYTES entries represent two different fields in the serialized format.

In writeBitMaps(), a non-empty bitmap is serialized as:

  1. hasBitMap flag: 1 byte
  2. rowSize: 4 bytes
  3. Binary length prefix: 4 bytes
  4. bitmap bytes: BitMap.getSizeOfBytes(rowSize)

The previous code used:

ReadWriteIOUtils.sizeToWrite(new Binary(bitMaps[i].getTruncatedByteArray(rowSize)))

That includes the Binary length prefix plus the actual bitmap bytes.

So the new code:

size = Math.addExact(size, Integer.BYTES); // rowSize
size = Math.addExact(size, Integer.BYTES); // Binary length prefix
size = Math.addExact(size, BitMap.getSizeOfBytes(rowSize)); // bitmap bytes

is equivalent to the old calculation. The two integers are not duplicates: one is the bitmap logical size (rowSize), and the other is the length prefix
written by ReadWriteIOUtils.write(Binary, stream).

}
}
}
return size;
}

/**
 * Byte size of the value section: one byte, plus the size of every column when {@code values} is
 * non-null. The number of columns is taken from {@code schemas} (zero when {@code schemas} is
 * null).
 *
 * @return the section size in bytes
 * @throws ArithmeticException if the total overflows an {@code int}
 */
private int serializedSizeOfValues() {
  if (values == null) {
    return Byte.BYTES;
  }
  final int columns = (schemas != null) ? schemas.size() : 0;
  int total = Byte.BYTES;
  for (int col = 0; col < columns; col++) {
    final TSDataType columnType = schemas.get(col).getType();
    final int columnBytes = serializedSizeOfColumn(columnType, values[col]);
    total = Math.addExact(total, columnBytes);
  }
  return total;
}

/**
 * Byte size of a single value column.
 *
 * <p>Every column costs one byte. A non-null column adds its payload: a fixed width per row for
 * the numeric and boolean types, or the summed per-row sizes for the binary-backed types.
 *
 * @param dataType the column's data type; selects the per-row width
 * @param column the column's value array, or {@code null}
 * @return the column size in bytes
 * @throws UnSupportedDataTypeException if {@code dataType} is not one of the handled types
 * @throws ArithmeticException if the size overflows an {@code int}
 */
private int serializedSizeOfColumn(final TSDataType dataType, final Object column) {
  if (column == null) {
    return Byte.BYTES;
  }
  final int payloadBytes;
  switch (dataType) {
    case INT32:
    case DATE:
      payloadBytes = Math.multiplyExact(Integer.BYTES, rowSize);
      break;
    case INT64:
    case TIMESTAMP:
      payloadBytes = Math.multiplyExact(Long.BYTES, rowSize);
      break;
    case FLOAT:
      payloadBytes = Math.multiplyExact(Float.BYTES, rowSize);
      break;
    case DOUBLE:
      payloadBytes = Math.multiplyExact(Double.BYTES, rowSize);
      break;
    case BOOLEAN:
      // One byte per row.
      payloadBytes = rowSize;
      break;
    case TEXT:
    case STRING:
    case BLOB:
    case OBJECT:
      payloadBytes = serializedSizeOfBinaryValues((Binary[]) column);
      break;
    default:
      throw new UnSupportedDataTypeException(
          Messages.format("error.write.type_not_supported", dataType));
  }
  return Math.addExact(Byte.BYTES, payloadBytes);
}

/**
 * Sums the serialized size of the first {@code rowSize} entries of a {@link Binary} column.
 *
 * <p>Each row costs one byte; a non-null entry additionally costs {@code
 * ReadWriteIOUtils.sizeToWrite(entry)}.
 *
 * @param binaryValues the column values; entries may be {@code null}
 * @param rowSize the number of leading entries to account for
 * @return the summed size in bytes
 * @throws ArithmeticException if the total overflows an {@code int}
 */
private static int serializedSizeOfBinaryValues(final Binary[] binaryValues, final int rowSize) {
  int total = 0;
  for (int row = 0; row < rowSize; row++) {
    // Counted for every row, whether or not a value is present.
    total = Math.addExact(total, Byte.BYTES);
    final Binary value = binaryValues[row];
    if (value == null) {
      continue;
    }
    total = Math.addExact(total, ReadWriteIOUtils.sizeToWrite(value));
  }
  return total;
}

/**
 * Instance overload that sizes a {@link Binary} column using this tablet's {@code rowSize}.
 *
 * @param binaryValues the column values; entries may be {@code null}
 * @return the summed size in bytes of the first {@code rowSize} entries
 */
private int serializedSizeOfBinaryValues(final Binary[] binaryValues) {
  final int rows = this.rowSize;
  return serializedSizeOfBinaryValues(binaryValues, rows);
}

/** Serialize {@link MeasurementSchema}s */
private void writeMeasurementSchemas(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(BytesUtils.boolToByte(schemas != null), stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,15 @@ public int serializeTo(OutputStream outputStream) throws IOException {
@Override
public int serializedSize() {
int byteLen = 0;
byteLen += ReadWriteIOUtils.sizeToWrite(measurementName);
byteLen += 3 * Byte.BYTES;
byteLen = Math.addExact(byteLen, ReadWriteIOUtils.sizeToWrite(measurementName));
byteLen = Math.addExact(byteLen, 3 * Byte.BYTES);
if (props == null) {
byteLen += Integer.BYTES;
byteLen = Math.addExact(byteLen, Integer.BYTES);
} else {
byteLen += Integer.BYTES;
byteLen = Math.addExact(byteLen, Integer.BYTES);
for (Map.Entry<String, String> entry : props.entrySet()) {
byteLen += ReadWriteIOUtils.sizeToWrite(entry.getKey());
byteLen += ReadWriteIOUtils.sizeToWrite(entry.getValue());
byteLen = Math.addExact(byteLen, ReadWriteIOUtils.sizeToWrite(entry.getKey()));
byteLen = Math.addExact(byteLen, ReadWriteIOUtils.sizeToWrite(entry.getValue()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ public void mapSerdeTest() {
Assert.assertNotNull(result);
Assert.assertEquals(map, result);

ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
ReadWriteIOUtils.write(map, buffer);
buffer.flip();
result = ReadWriteIOUtils.readMap(buffer);
Assert.assertNotNull(result);
Assert.assertEquals(map, result);

// 7. null
map = null;
byteArrayOutputStream = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
Expand Down
Loading
Loading