
Commit 75437c9

polyzos authored and luoyuxia committed
[kv] Supports compacted row as change log
1 parent 61c4ece commit 75437c9

File tree

15 files changed (+1203, -195 lines)

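In short: this commit lets the changelog of a primary-key table be written in the compacted row format (previously only ARROW was accepted when the kv format was COMPACTED). The client-side flow exercised by the new tests looks roughly as follows; this is a minimal sketch pieced together from the test code in this commit, where the table path is illustrative, `conn` is an open client Connection, and `row(...)`, `createTable(...)`, `createLogScanner(...)` and `subscribeFromBeginning(...)` are the ITCase helpers reused below.

    Schema schema =
            Schema.newBuilder()
                    .column("a", DataTypes.INT())
                    .column("b", DataTypes.INT())
                    .primaryKey("a")
                    .build();
    TableDescriptor descriptor =
            TableDescriptor.builder()
                    .schema(schema)
                    .kvFormat(KvFormat.COMPACTED)   // kv store keeps compacted rows
                    .logFormat(LogFormat.COMPACTED) // the changelog may now use compacted rows too
                    .build();
    TablePath tablePath = TablePath.of("my_db", "my_pk_table"); // illustrative path
    createTable(tablePath, descriptor, false);                  // ITCase helper

    try (Table table = conn.getTable(tablePath)) {
        UpsertWriter writer = table.newUpsert().createWriter();
        writer.upsert(row(1, 10)); // changelog: +I(1, 10)
        writer.upsert(row(1, 20)); // changelog: -U(1, 10), +U(1, 20)
        writer.delete(row(1, 20)); // changelog: -D(1, 20)
        writer.flush();

        LogScanner scanner = createLogScanner(table);
        subscribeFromBeginning(scanner, table);
        for (ScanRecord record : scanner.poll(Duration.ofSeconds(1))) {
            System.out.println(record.getChangeType() + " " + record.getRow().getInt(1));
        }
        scanner.close();
    }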

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 1 addition & 1 deletion

@@ -659,7 +659,7 @@ void testCreateTableWithInvalidProperty() {
                 .cause()
                 .isInstanceOf(InvalidConfigException.class)
                 .hasMessageContaining(
-                        "Currently, Primary Key Table only supports ARROW log format if kv format is COMPACTED.");
+                        "Currently, Primary Key Table supports ARROW or COMPACTED log format when kv format is COMPACTED.");
     }
 
     @Test
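The relaxed check now accepts both ARROW and COMPACTED changelogs when the kv format is COMPACTED; other log formats remain invalid. A hedged sketch of a rejected combination (the descriptor the test actually builds is not visible in this hunk, so INDEXED is an assumed example of an invalid choice):

    // Assumed invalid combination given the new message: COMPACTED kv format with an INDEXED changelog.
    TableDescriptor rejected =
            TableDescriptor.builder()
                    .schema(pkSchema) // placeholder: any schema with a primary key
                    .kvFormat(KvFormat.COMPACTED)
                    .logFormat(LogFormat.INDEXED)
                    .build();
    // Creating a table from this descriptor is expected to fail with an InvalidConfigException
    // whose message is the one asserted above.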

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 234 additions & 0 deletions

@@ -730,6 +730,11 @@ void testPutAndPoll(String kvFormat) throws Exception {
         verifyAppendOrPut(false, "ARROW", kvFormat);
     }
 
+    @Test
+    void testPutAndPollCompacted() throws Exception {
+        verifyAppendOrPut(false, "COMPACTED", "COMPACTED");
+    }
+
     void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvFormat)
             throws Exception {
         Schema schema =

@@ -1384,4 +1389,233 @@ void testFileSystemRecognizeConnectionConf() throws Exception {
                         Collections.singletonMap("client.fs.test.key", "fs_test_value"));
         }
     }
+
+    // ---------------------- PK with COMPACTED log tests ----------------------
+    @Test
+    void testPkUpsertAndPollWithCompactedLog() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.STRING())
+                        .column("d", DataTypes.BIGINT())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .kvFormat(KvFormat.COMPACTED)
+                        .logFormat(LogFormat.COMPACTED)
+                        .build();
+        TablePath tablePath = TablePath.of("test_db_1", "test_pk_compacted_upsert_poll");
+        createTable(tablePath, tableDescriptor, false);
+
+        int expectedSize = 30;
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            for (int i = 0; i < expectedSize; i++) {
+                String value = i % 2 == 0 ? "hello, friend" + i : null;
+                GenericRow r = row(i, 100, value, i * 10L);
+                upsertWriter.upsert(r);
+                if (i % 10 == 0) {
+                    upsertWriter.flush();
+                }
+            }
+            upsertWriter.flush();
+
+            LogScanner logScanner = createLogScanner(table);
+            subscribeFromBeginning(logScanner, table);
+            int count = 0;
+            while (count < expectedSize) {
+                ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+                for (ScanRecord scanRecord : scanRecords) {
+                    assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
+                    InternalRow rr = scanRecord.getRow();
+                    assertThat(rr.getFieldCount()).isEqualTo(4);
+                    assertThat(rr.getInt(0)).isEqualTo(count);
+                    assertThat(rr.getInt(1)).isEqualTo(100);
+                    if (count % 2 == 0) {
+                        assertThat(rr.getString(2).toString()).isEqualTo("hello, friend" + count);
+                    } else {
+                        assertThat(rr.isNullAt(2)).isTrue();
+                    }
+                    assertThat(rr.getLong(3)).isEqualTo(count * 10L);
+                    count++;
+                }
+            }
+            assertThat(count).isEqualTo(expectedSize);
+            logScanner.close();
+        }
+    }
+
+    @Test
+    void testPkUpdateAndDeleteWithCompactedLog() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .kvFormat(KvFormat.COMPACTED)
+                        .logFormat(LogFormat.COMPACTED)
+                        .build();
+        TablePath tablePath = TablePath.of("test_db_1", "test_pk_compacted_update_delete");
+        createTable(tablePath, tableDescriptor, false);
+
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            // initial insert
+            upsertWriter.upsert(row(1, 10));
+            upsertWriter.flush();
+            // update same key
+            upsertWriter.upsert(row(1, 20));
+            upsertWriter.flush();
+            // delete the key
+            upsertWriter.delete(row(1, 20));
+            upsertWriter.flush();
+
+            LogScanner scanner = createLogScanner(table);
+            subscribeFromBeginning(scanner, table);
+            // Expect: +I(1,10), -U(1,10), +U(1,20), -D(1,20)
+            ChangeType[] expected = {
+                ChangeType.INSERT,
+                ChangeType.UPDATE_BEFORE,
+                ChangeType.UPDATE_AFTER,
+                ChangeType.DELETE
+            };
+            int seen = 0;
+            while (seen < expected.length) {
+                ScanRecords recs = scanner.poll(Duration.ofSeconds(1));
+                for (ScanRecord r : recs) {
+                    assertThat(r.getChangeType()).isEqualTo(expected[seen]);
+                    InternalRow row = r.getRow();
+                    assertThat(row.getInt(0)).isEqualTo(1);
+                    if (expected[seen] == ChangeType.INSERT
+                            || expected[seen] == ChangeType.UPDATE_BEFORE
+                            || expected[seen] == ChangeType.UPDATE_AFTER) {
+                        // value field present
+                        if (expected[seen] == ChangeType.UPDATE_AFTER) {
+                            assertThat(row.getInt(1)).isEqualTo(20);
+                        } else {
+                            assertThat(row.getInt(1)).isEqualTo(10);
+                        }
+                    }
+                    seen++;
+                }
+            }
+            assertThat(seen).isEqualTo(expected.length);
+            scanner.close();
+        }
+    }
+
+    @Test
+    void testPkCompactedProject() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.STRING())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor td =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .kvFormat(KvFormat.COMPACTED)
+                        .logFormat(LogFormat.COMPACTED)
+                        .build();
+        TablePath path = TablePath.of("test_db_1", "test_pk_compacted_project");
+        createTable(path, td, false);
+
+        try (Table table = conn.getTable(path)) {
+            UpsertWriter upsert = table.newUpsert().createWriter();
+            for (int i = 0; i < 10; i++) {
+                String v = i % 2 == 0 ? "v" + i : null;
+                upsert.upsert(row(i, 100 + i, v));
+            }
+            upsert.flush();
+
+            // Creating a projected log scanner for COMPACTED should fail
+            assertThatThrownBy(() -> createLogScanner(table, new int[] {0, 2}))
+                    .isInstanceOf(IllegalArgumentException.class)
+                    .hasMessageContaining("Projection is not supported for COMPACTED log format");
+        }
+    }
+
+    @Test
+    void testPkCompactedPollFromLatestNoRecords() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor td =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .kvFormat(KvFormat.COMPACTED)
+                        .logFormat(LogFormat.COMPACTED)
+                        .build();
+        TablePath path = TablePath.of("test_db_1", "test_pk_compacted_latest");
+        createTable(path, td, false);
+
+        try (Table table = conn.getTable(path)) {
+            LogScanner scanner = createLogScanner(table);
+            subscribeFromLatestOffset(path, null, null, table, scanner, admin);
+            // Now write a few rows and ensure only these are seen
+            UpsertWriter upsert = table.newUpsert().createWriter();
+            for (int i = 0; i < 5; i++) {
+                upsert.upsert(row(i, i));
+            }
+            upsert.flush();
+
+            int seen = 0;
+            while (seen < 5) {
+                ScanRecords recs = scanner.poll(Duration.ofSeconds(1));
+                for (ScanRecord r : recs) {
+                    assertThat(r.getChangeType()).isEqualTo(ChangeType.INSERT);
+                    assertThat(r.getRow().getInt(0)).isBetween(0, 4);
+                    seen++;
+                }
+            }
+            scanner.close();
+        }
+    }
+
+    @Test
+    void testPkDeleteNonExistentEmitsNoRecords() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor td =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .kvFormat(KvFormat.COMPACTED)
+                        .logFormat(LogFormat.COMPACTED)
+                        .build();
+        TablePath path = TablePath.of("test_db_1", "test_pk_compacted_delete_missing");
+        createTable(path, td, false);
+
+        try (Table table = conn.getTable(path)) {
+            UpsertWriter upsert = table.newUpsert().createWriter();
+            // delete non-existent key
+            upsert.delete(row(42, 0));
+            upsert.flush();
+
+            LogScanner scanner = createLogScanner(table);
+            subscribeFromBeginning(scanner, table);
+            int total = 0;
+            // poll a few times to ensure no accidental records
+            for (int i = 0; i < 3; i++) {
+                total += scanner.poll(Duration.ofSeconds(1)).count();
+            }
+            assertThat(total).isEqualTo(0);
+            scanner.close();
+        }
+    }
 }

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 1 addition & 1 deletion

@@ -1230,7 +1230,7 @@ public class ConfigOptions {
                     .defaultValue(LogFormat.ARROW)
                     .withDescription(
                             "The format of the log records in log store. The default value is `arrow`. "
-                                    + "The supported formats are `arrow` and `indexed`.");
+                                    + "The supported formats are `arrow`, `indexed` and `compacted`.");
 
     public static final ConfigOption<ArrowCompressionType> TABLE_LOG_ARROW_COMPRESSION_TYPE =
             key("table.log.arrow.compression.type")

fluss-common/src/main/java/org/apache/fluss/metadata/LogFormat.java

Lines changed: 20 additions & 4 deletions

@@ -18,10 +18,15 @@
 package org.apache.fluss.metadata;
 
 import org.apache.fluss.record.MemoryLogRecordsArrowBuilder;
+import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder;
 import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder;
+import org.apache.fluss.row.compacted.CompactedRow;
 import org.apache.fluss.row.indexed.IndexedRow;
 
-/** The format of the log records in log store. The supported formats are 'arrow' and 'indexed'. */
+/**
+ * The format of the log records in log store. The supported formats are 'arrow', 'indexed' and
+ * 'compacted'.
+ */
 public enum LogFormat {
 
     /**
@@ -41,18 +46,29 @@ public enum LogFormat {
      *
      * @see MemoryLogRecordsIndexedBuilder
      */
-    INDEXED;
+    INDEXED,
+
+    /**
+     * The log record batches are stored in {@link CompactedRow} format which is a compact
+     * row-oriented format optimized for primary key tables to reduce storage while trading CPU for
+     * reads.
+     *
+     * @see MemoryLogRecordsCompactedBuilder
+     */
+    COMPACTED;
 
     /**
-     * Creates a {@link LogFormat} from the given string. The string must be either 'arrow' or
-     * 'indexed'.
+     * Creates a {@link LogFormat} from the given string. The string must be either 'arrow',
+     * 'indexed' or 'compacted'.
      */
     public static LogFormat fromString(String format) {
         switch (format.toUpperCase()) {
             case "ARROW":
                 return ARROW;
             case "INDEXED":
                 return INDEXED;
+            case "COMPACTED":
+                return COMPACTED;
             default:
                 throw new IllegalArgumentException("Unsupported log format: " + format);
         }
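A short usage note on the extended fromString shown above: the lookup is case-insensitive (the input is upper-cased before the switch), and unknown names still throw:

    LogFormat a = LogFormat.fromString("compacted"); // -> LogFormat.COMPACTED
    LogFormat b = LogFormat.fromString("ARROW");     // -> LogFormat.ARROW
    // LogFormat.fromString("parquet") throws IllegalArgumentException("Unsupported log format: parquet")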
