Java源码示例:com.alibaba.rocketmq.store.config.FlushDiskType
示例1
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mapedFileQueue =
new MapedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
defaultMessageStore.getAllocateMapedFileService());
this.defaultMessageStore = defaultMessageStore;
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 同步刷盘
this.flushCommitLogService = new GroupCommitService();
}
else {
// 异步刷盘
this.flushCommitLogService = new FlushRealTimeService();
}
this.appendMessageCallback = new DefaultAppendMessageCallback(
defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
示例2
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mapedFileQueue =
new MapedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), defaultMessageStore
.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMapedFileService());
this.defaultMessageStore = defaultMessageStore;
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
}
else { //默认是异步刷盘,走这个分支。
this.flushCommitLogService = new FlushRealTimeService();
}
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
示例3
public void warmMappedFile(FlushDiskType type, int pages) {
long beginTime = System.currentTimeMillis();
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
int flush = 0;
long time = System.currentTimeMillis();
for (int i = 0, j = 0; i < this.fileSize; i += MapedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put(i, (byte) 0);
// force flush when flush disk type is sync
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force();
}
}
// prevent gc
if (j % 1000 == 0) {
log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
time = System.currentTimeMillis();
try {
Thread.sleep(0);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// force flush when prepare load finished
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file worm up done, force to disk, mappedFile={}, costTime={}",
this.getFileName(), System.currentTimeMillis() - beginTime);
mappedByteBuffer.force();
}
log.info("mapped file worm up done. mappedFile={}, costTime={}", this.getFileName(),
System.currentTimeMillis() - beginTime);
this.mlock();
}
示例4
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mapedFileQueue = new MapedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMapedFileService());
this.defaultMessageStore = defaultMessageStore;
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
示例5
/**
* 所谓预热,就是把超过设定大小(默认1G)的文件 ,每间隔4k(内存分页的大小) 写一个byte (使 page dirty) ,
* 脏页累积到一定量(16M)的时候,做刷盘动作 (数据真正的落在本地磁盘)。
* @param type
* @param pages
*/
public void warmMappedFile(FlushDiskType type, int pages) {
long beginTime = System.currentTimeMillis();
//所谓slice, 可以理解为bytebuffer中剩余容量的一个快照 。
//比如原来bytebuffer长度为1024 , 还有512字节容量,则新的bytebuffer的pos就是512, limit就是1024,
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
int flush = 0;
long time = System.currentTimeMillis();
for (int i = 0, j = 0; i < this.fileSize; i += MapedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put(i, (byte) 0); // 把bytebuffer的剩余容量即slice 做一个数据填充。
// force flush when flush disk type is sync
if (type == FlushDiskType.SYNC_FLUSH) { //并且同步刷盘的话,
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) { //写入的位置和flush的位置差值分页数超过了指定的页数。
//这里算一下,默认是16M做一次强制刷盘。
flush = i;
mappedByteBuffer.force();
}
}
// prevent gc
if (j % 1000 == 0) {
log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
time = System.currentTimeMillis();
try {
Thread.sleep(0);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// force flush when prepare load finished
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file worm up done, force to disk, mappedFile={}, costTime={}",
this.getFileName(), System.currentTimeMillis() - beginTime);
mappedByteBuffer.force();
}
log.info("mapped file worm up done. mappedFile={}, costTime={}", this.getFileName(),
System.currentTimeMillis() - beginTime);
}