Skip to content

Commit b5323d3

Browse files
authored
fix(java): remove client side message checksum generation (#2342)
1 parent 7902efc commit b5323d3

File tree

2 files changed

+21
-35
lines changed
  • foreign/java
    • external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/sink
    • java-sdk/src/main/java/org/apache/iggy/message

2 files changed

+21
-35
lines changed

foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/sink/IggySinkWriter.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121

2222
import org.apache.flink.api.connector.sink2.SinkWriter;
2323
import org.apache.iggy.client.blocking.http.IggyHttpClient;
24+
import org.apache.iggy.connector.error.ConnectorException;
25+
import org.apache.iggy.connector.serialization.SerializationSchema;
2426
import org.apache.iggy.identifier.StreamId;
2527
import org.apache.iggy.identifier.TopicId;
2628
import org.apache.iggy.message.Message;
2729
import org.apache.iggy.message.MessageHeader;
2830
import org.apache.iggy.message.MessageId;
2931
import org.apache.iggy.message.Partitioning;
30-
import org.apache.iggy.connector.error.ConnectorException;
31-
import org.apache.iggy.connector.serialization.SerializationSchema;
3232
import org.slf4j.Logger;
3333
import org.slf4j.LoggerFactory;
3434

@@ -38,8 +38,6 @@
3838
import java.util.ArrayList;
3939
import java.util.List;
4040
import java.util.Optional;
41-
import java.util.zip.CRC32;
42-
import java.util.zip.Checksum;
4341

4442
/**
4543
* Sink writer implementation for writing records to Iggy.
@@ -73,12 +71,12 @@ public enum PartitioningStrategy {
7371
/**
7472
* Creates a new sink writer.
7573
*
76-
* @param httpClient the HTTP Iggy client
77-
* @param streamId the stream identifier
78-
* @param topicId the topic identifier
79-
* @param serializer the serialization schema
80-
* @param batchSize maximum number of records to buffer before flushing
81-
* @param flushInterval maximum time to wait before flushing
74+
* @param httpClient the HTTP Iggy client
75+
* @param streamId the stream identifier
76+
* @param topicId the topic identifier
77+
* @param serializer the serialization schema
78+
* @param batchSize maximum number of records to buffer before flushing
79+
* @param flushInterval maximum time to wait before flushing
8280
* @param partitioningStrategy the partitioning strategy
8381
*/
8482
public IggySinkWriter(
@@ -145,7 +143,7 @@ public void flush(boolean endOfInput) throws IOException {
145143
}
146144

147145
LOGGER.debug("IggySinkWriter.flush() - flushing {} messages to stream={}, topic={}",
148-
buffer.size(), streamId, topicId);
146+
buffer.size(), streamId, topicId);
149147

150148
try {
151149
// Serialize all buffered records
@@ -285,12 +283,8 @@ public int getBufferSize() {
285283
* @return a Message instance
286284
*/
287285
private Message createMessage(byte[] payload) {
288-
Checksum crc32 = new CRC32();
289-
crc32.update(payload, 0, payload.length);
290-
BigInteger checksum = BigInteger.valueOf(crc32.getValue());
291-
292286
MessageHeader header = new MessageHeader(
293-
checksum,
287+
BigInteger.ZERO,
294288
MessageId.serverGenerated(),
295289
BigInteger.ZERO, // offset
296290
BigInteger.ZERO, // timestamp

foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,31 +22,23 @@
2222
import java.math.BigInteger;
2323
import java.util.Map;
2424
import java.util.Optional;
25-
import java.util.zip.CRC32;
26-
import java.util.zip.Checksum;
2725

2826
public record Message(
2927
MessageHeader header,
3028
byte[] payload,
3129
Optional<Map<String, HeaderValue>> userHeaders
3230
) {
3331

34-
public static Message of(String payload) {
35-
final byte[] payloadBytes = payload.getBytes();
36-
final MessageHeader msgHeader = new MessageHeader(getChecksum(payloadBytes), MessageId.serverGenerated(),
37-
BigInteger.ZERO, BigInteger.ZERO, BigInteger.ZERO, 0L,
38-
(long)payloadBytes.length);
39-
return new Message(msgHeader, payloadBytes, Optional.empty());
40-
}
32+
public static Message of(String payload) {
33+
final byte[] payloadBytes = payload.getBytes();
34+
final MessageHeader msgHeader = new MessageHeader(BigInteger.ZERO, MessageId.serverGenerated(),
35+
BigInteger.ZERO, BigInteger.ZERO, BigInteger.ZERO, 0L,
36+
(long) payloadBytes.length);
37+
return new Message(msgHeader, payloadBytes, Optional.empty());
38+
}
4139

42-
private static BigInteger getChecksum(byte[] payload) {
43-
final Checksum crc32 = new CRC32();
44-
crc32.update(payload, 0, payload.length);
45-
return BigInteger.valueOf(crc32.getValue());
46-
}
47-
48-
public int getSize() {
49-
// userHeaders is empty for now.
50-
return MessageHeader.SIZE + payload.length;
51-
}
40+
public int getSize() {
41+
// userHeaders is empty for now.
42+
return MessageHeader.SIZE + payload.length;
43+
}
5244
}

0 commit comments

Comments
 (0)