Skip to content

Commit 12774c6

Browse files
authored
[ISSUE #9779] fix DirectBuffer will cause error at 9+ JDK version. (#9801)
* fix(DirectBuffer): issue#9779,fix the promblem of 'DirectBuffer will cause error at 9+ JDK version'. Change-Id: I4657ecc401046a3b0d29b466ee68845f45d34105 * fix(DirectBuffer): issue#9779,fix the promblem of 'DirectBuffer will cause error at 9+ JDK version'. Change-Id: Iff0880f694cccfa86b812b81260dfe09e4763fa9
1 parent 736f027 commit 12774c6

File tree

3 files changed

+16
-12
lines changed

3 files changed

+16
-12
lines changed

store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import java.util.concurrent.TimeoutException;
3737
import java.util.function.Supplier;
3838
import java.util.stream.Collectors;
39+
40+
import io.netty.util.internal.PlatformDependent;
3941
import org.apache.rocketmq.common.MixAll;
4042
import org.apache.rocketmq.common.ServiceThread;
4143
import org.apache.rocketmq.common.SystemClock;
@@ -69,7 +71,6 @@
6971
import org.apache.rocketmq.store.queue.ReferredIterator;
7072
import org.apache.rocketmq.store.util.LibC;
7173
import org.rocksdb.RocksDBException;
72-
import sun.nio.ch.DirectBuffer;
7374

7475
/**
7576
* Store all metadata downtime for recovery, data protection reliability
@@ -2433,7 +2434,7 @@ private byte[] sampling(byte[] pageCacheTable, int sampleStep) {
24332434

24342435
private byte[] checkFileInPageCache(MappedFile mappedFile) {
24352436
long fileSize = mappedFile.getFileSize();
2436-
final long address = ((DirectBuffer) mappedFile.getMappedByteBuffer()).address();
2437+
final long address = PlatformDependent.directBufferAddress(mappedFile.getMappedByteBuffer());
24372438
int pageNums = (int) (fileSize + this.pageSize - 1) / this.pageSize;
24382439
byte[] pageCacheRst = new byte[pageNums];
24392440
int mincore = LibC.INSTANCE.mincore(new Pointer(address), new NativeLong(fileSize), pageCacheRst);
@@ -2509,7 +2510,7 @@ private int setFileReadMode(MappedFile mappedFile, int mode) {
25092510
log.error("setFileReadMode mappedFile is null");
25102511
return -1;
25112512
}
2512-
final long address = ((DirectBuffer) mappedFile.getMappedByteBuffer()).address();
2513+
final long address = PlatformDependent.directBufferAddress(mappedFile.getMappedByteBuffer());
25132514
int madvise = LibC.INSTANCE.madvise(new Pointer(address), new NativeLong(mappedFile.getFileSize()), mode);
25142515
if (madvise != 0) {
25152516
log.error("setFileReadMode error fileName: {}, madvise: {}, mode:{}", mappedFile.getFileName(), madvise, mode);

store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,17 @@
1616
*/
1717
package org.apache.rocketmq.store;
1818

19-
import com.sun.jna.NativeLong;
20-
import com.sun.jna.Pointer;
2119
import java.nio.ByteBuffer;
2220
import java.util.Deque;
2321
import java.util.concurrent.ConcurrentLinkedDeque;
22+
23+
import com.sun.jna.NativeLong;
24+
import com.sun.jna.Pointer;
25+
import io.netty.util.internal.PlatformDependent;
2426
import org.apache.rocketmq.common.constant.LoggerName;
2527
import org.apache.rocketmq.logging.org.slf4j.Logger;
2628
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
2729
import org.apache.rocketmq.store.util.LibC;
28-
import sun.nio.ch.DirectBuffer;
2930

3031
public class TransientStorePool {
3132
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -48,7 +49,7 @@ public void init() {
4849
for (int i = 0; i < poolSize; i++) {
4950
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
5051

51-
final long address = ((DirectBuffer) byteBuffer).address();
52+
final long address = PlatformDependent.directBufferAddress(byteBuffer);
5253
Pointer pointer = new Pointer(address);
5354
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
5455

@@ -58,7 +59,7 @@ public void init() {
5859

5960
public void destroy() {
6061
for (ByteBuffer byteBuffer : availableBuffers) {
61-
final long address = ((DirectBuffer) byteBuffer).address();
62+
final long address = PlatformDependent.directBufferAddress(byteBuffer);
6263
Pointer pointer = new Pointer(address);
6364
LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
6465
}

store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3838
import java.util.concurrent.atomic.AtomicLong;
3939
import java.util.function.Consumer;
40+
41+
import io.netty.util.internal.PlatformDependent;
4042
import org.apache.commons.lang3.SystemUtils;
4143
import org.apache.rocketmq.common.UtilAll;
4244
import org.apache.rocketmq.common.constant.LoggerName;
@@ -57,7 +59,7 @@
5759
import org.apache.rocketmq.store.config.FlushDiskType;
5860
import org.apache.rocketmq.store.util.LibC;
5961
import sun.misc.Unsafe;
60-
import sun.nio.ch.DirectBuffer;
62+
6163

6264
public class DefaultMappedFile extends AbstractMappedFile {
6365
public static final int OS_PAGE_SIZE = 1024 * 4;
@@ -914,7 +916,7 @@ public void setFirstCreateInQueue(boolean firstCreateInQueue) {
914916
@Override
915917
public void mlock() {
916918
final long beginTime = System.currentTimeMillis();
917-
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
919+
final long address = PlatformDependent.directBufferAddress(this.mappedByteBuffer);
918920
Pointer pointer = new Pointer(address);
919921
{
920922
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
@@ -930,7 +932,7 @@ public void mlock() {
930932
@Override
931933
public void munlock() {
932934
final long beginTime = System.currentTimeMillis();
933-
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
935+
final long address = PlatformDependent.directBufferAddress(this.mappedByteBuffer);
934936
Pointer pointer = new Pointer(address);
935937
int ret = LibC.INSTANCE.munlock(pointer, new NativeLong(this.fileSize));
936938
log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
@@ -1049,7 +1051,7 @@ public boolean isLoaded(long position, int size) {
10491051
return true;
10501052
}
10511053
try {
1052-
long addr = ((DirectBuffer) mappedByteBuffer).address() + position;
1054+
long addr = PlatformDependent.directBufferAddress(mappedByteBuffer) + position;
10531055
return (boolean) IS_LOADED_METHOD.invoke(mappedByteBuffer, mappingAddr(addr), size, pageCount(size));
10541056
} catch (Exception e) {
10551057
log.info("invoke isLoaded0 of file {} error:", file.getAbsolutePath(), e);

0 commit comments

Comments
 (0)