Java源码示例:org.apache.rocketmq.store.config.StorePathConfigHelper

示例1
/**
 * Persists the minimum in-sync offset to the offset-in-sync file under the store root directory.
 *
 * <p>Nothing is written when this broker's role is {@code SLAVE} or when
 * {@code getMinOffsetInSync()} returns {@code -1}. A write failure is logged and not rethrown.
 */
public void saveInSyncOffset() {
    if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
        return;
    }

    long minInSyncOffset = getMinOffsetInSync();
    if (minInSyncOffset == -1) {
        return;
    }

    String fileName = StorePathConfigHelper.getOffsetInSyncStorePath(this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    try {
        MixAll.string2File(String.valueOf(minInSyncOffset), fileName);
        // Log success only after the write completed; previously this message was also
        // emitted right after the error log when the write had failed.
        log.info("save slave min offset in sync:{}", minInSyncOffset);
    } catch (IOException e) {
        log.error("save phy offset slave reported [{}] exception", fileName, e);
    }
}
 
示例2
/**
 * Writes {@code offset} to the offset-in-sync file if that file does not exist yet.
 *
 * <p>Returns without writing when this broker's role is {@code SLAVE}, or when the file is
 * already present. A write failure is logged and not rethrown.
 *
 * @param offset the offset value to store
 */
public void initInSyncOffset(long offset) {
    final boolean isSlave =
        this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE;
    if (isSlave) {
        return;
    }

    final String offsetFilePath = StorePathConfigHelper.getOffsetInSyncStorePath(
        this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    if (new File(offsetFilePath).exists()) {
        // An existing file is left untouched.
        log.info("as master before, no need to sync offset");
        return;
    }

    try {
        MixAll.string2File(String.valueOf(offset), offsetFilePath);
    } catch (IOException ioe) {
        log.error("save phy offset slave reported [{}] exception", offsetFilePath, ioe);
    }
}
 
示例3
/**
 * Backs up the mapped files via {@code mappedFileQueue.backupFiles}, then truncates the queue
 * and the logic (consume queue) files at {@code phyOffset}.
 *
 * <p>The backup directory is a sub-directory of the store root named after the current
 * system-clock value; it is created if it does not exist.
 *
 * @param phyOffset the offset handed to the backup, flush/commit position and truncate calls
 */
public void truncate(long phyOffset) {
    final String rootDir = defaultMessageStore.getMessageStoreConfig().getStorePathRootDir();
    final String backupDirName = String.valueOf(defaultMessageStore.getSystemClock().now());
    final String backupDirPath = StorePathConfigHelper.getBackupStoreSubDirPath(rootDir, backupDirName);

    final File backupDir = new File(backupDirPath);
    if (!backupDir.exists()) {
        backupDir.mkdirs();
    }
    this.mappedFileQueue.backupFiles(phyOffset, backupDirPath);

    // Move both positions to the truncation point before the dirty files are cut.
    this.mappedFileQueue.setFlushedWhere(phyOffset);
    this.mappedFileQueue.setCommittedWhere(phyOffset);
    this.mappedFileQueue.truncateDirtyFiles(phyOffset);

    // Drop ConsumeQueue data that is now redundant.
    this.defaultMessageStore.truncateDirtyLogicFiles(phyOffset);
}
 
示例4
/**
 * Reads the saved in-sync offset from the offset-in-sync file and then deletes that file.
 *
 * <p>Any exception (including an unparsable file content) is logged and swallowed; in that
 * case the file is not deleted.
 *
 * @return the parsed offset, or {@code -1} when the file is missing, its content is
 *         {@code null}, or reading/parsing failed
 */
private long getInSyncOffsetSaved() {
    long offset = -1;
    String fileName = StorePathConfigHelper.getOffsetInSyncStorePath(this.messageStoreConfig.getStorePathRootDir());
    try {
        File file = new File(fileName);
        if (file.exists()) {
            String offsetStr = MixAll.file2String(fileName);
            if (offsetStr != null) {
                // parseLong avoids the box/unbox round trip of Long.valueOf; trim() tolerates
                // a trailing newline or stray whitespace in the file.
                offset = Long.parseLong(offsetStr.trim());
            }
            // The file is consumed once read; report a failed delete instead of ignoring it.
            if (!file.delete()) {
                log.warn("delete offset in sync file failed: {}", fileName);
            }
        }
    } catch (Exception ex) {
        log.error("get offset in sync failed", ex);
    }
    return offset;
}
 
示例5
/**
 * Copies every mapped file whose tail offset lies beyond {@code offset} into the backup location.
 *
 * <p>Each selected file is flushed with {@code flush(0)} before it is copied. A failure on one
 * file is logged and does not stop the remaining files from being processed.
 *
 * @param offset     files whose tail offset is less than or equal to this value are skipped
 * @param backupPath path passed to {@code StorePathConfigHelper.getBackupStoreFilePath}
 */
public void backupFiles(long offset, String backupPath) {
    for (MappedFile mappedFile : this.mappedFiles) {
        final long tailOffset = mappedFile.getFileFromOffset() + this.mappedFileSize;
        if (tailOffset <= offset) {
            continue;
        }

        mappedFile.flush(0);
        try {
            final Path from = Paths.get(mappedFile.getFileName());
            final String baseName = new File(mappedFile.getFileName()).getName();
            final Path to = Paths.get(StorePathConfigHelper.getBackupStoreFilePath(backupPath, baseName));
            Files.copy(from, to);
            log.info("backup file:{} to {}", mappedFile.getFileName(), to);
        } catch (Exception ex) {
            log.error("backup file failed", ex);
        }
    }
}
 
示例6
/**
 * Stores the minimum in-sync offset in the offset-in-sync file under the store root directory.
 *
 * <p>Returns without writing when this broker's role is {@code SLAVE} or when
 * {@code getMinOffsetInSync()} yields {@code -1}. An {@link IOException} from the write is
 * logged and not propagated.
 */
public void saveInSyncOffset() {
    if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
        return;
    }

    long minInSyncOffset = getMinOffsetInSync();
    if (minInSyncOffset == -1) {
        return;
    }

    String fileName = StorePathConfigHelper.getOffsetInSyncStorePath(this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    try {
        MixAll.string2File(String.valueOf(minInSyncOffset), fileName);
        // Only claim the offset was saved when the write did not throw; the message used to
        // be logged unconditionally, even directly after the failure was reported.
        log.info("save slave min offset in sync:{}", minInSyncOffset);
    } catch (IOException e) {
        log.error("save phy offset slave reported [{}] exception", fileName, e);
    }
}
 
示例7
/**
 * Creates a backup of the mapped files and then truncates the mapped file queue and the
 * logic (consume queue) files at the given offset.
 *
 * <p>The backup goes to a sub-directory of the store root whose name is the current
 * system-clock value; the directory is created when absent.
 *
 * @param phyOffset offset used for the backup, the flushed/committed positions and both truncations
 */
public void truncate(long phyOffset) {
    final String storeRoot = defaultMessageStore.getMessageStoreConfig().getStorePathRootDir();
    final String timestamp = String.valueOf(defaultMessageStore.getSystemClock().now());
    final String backupDirPath = StorePathConfigHelper.getBackupStoreSubDirPath(storeRoot, timestamp);

    final File target = new File(backupDirPath);
    if (!target.exists()) {
        target.mkdirs();
    }

    // Back up first, then reposition and cut the queue.
    this.mappedFileQueue.backupFiles(phyOffset, backupDirPath);
    this.mappedFileQueue.setFlushedWhere(phyOffset);
    this.mappedFileQueue.setCommittedWhere(phyOffset);
    this.mappedFileQueue.truncateDirtyFiles(phyOffset);

    // Remove redundant ConsumeQueue data.
    this.defaultMessageStore.truncateDirtyLogicFiles(phyOffset);
}
 
示例8
/**
 * Loads the in-sync offset previously written to the offset-in-sync file, then removes the file.
 *
 * <p>Exceptions, including a content that cannot be parsed as a number, are logged and
 * swallowed; the file is left in place in that case.
 *
 * @return the stored offset, or {@code -1} if the file does not exist, its content is
 *         {@code null}, or an error occurred
 */
private long getInSyncOffsetSaved() {
    long offset = -1;
    String fileName = StorePathConfigHelper.getOffsetInSyncStorePath(this.messageStoreConfig.getStorePathRootDir());
    try {
        File file = new File(fileName);
        if (file.exists()) {
            String offsetStr = MixAll.file2String(fileName);
            if (offsetStr != null) {
                // Parse straight to a primitive (no Long boxing) and ignore surrounding
                // whitespace such as a trailing line break.
                offset = Long.parseLong(offsetStr.trim());
            }
            // Surface a failed delete rather than silently dropping the return value.
            if (!file.delete()) {
                log.warn("delete offset in sync file failed: {}", fileName);
            }
        }
    } catch (Exception ex) {
        log.error("get offset in sync failed", ex);
    }
    return offset;
}
 
示例9
/**
 * Puts 1000 messages, writes "max physical offset - 100" to the offset-in-sync file, switches
 * the store to {@code SLAVE} and checks that {@code truncateNotSync()} leaves the max physical
 * offset at that value. The check only runs when the store is a {@code DefaultMessageStore}.
 */
@Test
public void testTruncate() throws Exception {
    final String topic = "truncateTopic";

    for (int sent = 0; sent < 1000; sent++) {
        MessageExtBrokerInner msg = buildMessage();
        msg.setTopic(topic);
        msg.setQueueId(0);
        messageStore.putMessage(msg);
    }

    if (!(messageStore instanceof DefaultMessageStore)) {
        return;
    }

    DefaultMessageStore store = (DefaultMessageStore) messageStore;
    long offsetBeforeTruncate = store.getMaxPhyOffset();
    String offsetFilePath = StorePathConfigHelper.getOffsetInSyncStorePath(
        store.getMessageStoreConfig().getStorePathRootDir());
    MixAll.string2File(String.valueOf(store.getMaxPhyOffset() - 100), offsetFilePath);

    store.getMessageStoreConfig().setBrokerRole(BrokerRole.SLAVE);
    store.truncateNotSync();

    assertThat(store.getMaxPhyOffset()).isEqualTo(offsetBeforeTruncate - 100);
}
 
示例10
/**
 * Returns the consume queue registered for {@code topic}/{@code queueId}, creating and
 * registering it (and the per-topic map) when absent.
 *
 * <p>Both insertions use {@code putIfAbsent}; when another entry is already present the
 * existing one is returned and the newly built instance is discarded.
 *
 * @param topic   topic name used as the outer map key
 * @param queueId queue id used as the inner map key
 * @return the registered consume queue
 */
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentMap<Integer, ConsumeQueue> queuesOfTopic = consumeQueueTable.get(topic);
    if (queuesOfTopic == null) {
        ConcurrentMap<Integer, ConsumeQueue> freshMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentMap<Integer, ConsumeQueue> existingMap = consumeQueueTable.putIfAbsent(topic, freshMap);
        queuesOfTopic = existingMap != null ? existingMap : freshMap;
    }

    ConsumeQueue found = queuesOfTopic.get(queueId);
    if (found != null) {
        return found;
    }

    ConsumeQueue created = new ConsumeQueue(
        topic,
        queueId,
        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
        this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
        this);
    ConsumeQueue winner = queuesOfTopic.putIfAbsent(queueId, created);
    return winner != null ? winner : created;
}
 
示例11
/**
 * Shuts down the storage service.
 *
 * <p>On the first call (while the {@code shutdown} flag is still {@code false}) the flag is set,
 * the scheduled executor and the individual services are shut down in the order below, and the
 * store checkpoint is flushed. The abort file is deleted only when {@code runningFlags} reports
 * the store as writeable; otherwise it is kept and a warning is logged.
 *
 * <p>{@code transientStorePool.destroy()} is invoked on every call, including repeated ones.
 */
public void shutdown() {
    if (!this.shutdown) {
        this.shutdown = true;

        this.scheduledExecutorService.shutdown();

        try {
            // Wait for other in-flight calls to stop (fixed 3 second pause).
            // NOTE(review): the interrupt status is not restored when the sleep is interrupted.
            Thread.sleep(1000 * 3);
        } catch (InterruptedException e) {
            log.error("shutdown Exception, ", e);
        }

        // The schedule message service may be absent.
        if (this.scheduleMessageService != null) {
            this.scheduleMessageService.shutdown();
        }

        this.haService.shutdown();

        this.storeStatsService.shutdown();
        this.indexService.shutdown();
        this.commitLog.shutdown();
        this.reputMessageService.shutdown();
        this.flushConsumeQueueService.shutdown();
        this.allocateMappedFileService.shutdown();
        // Flush the checkpoint before shutting it down.
        this.storeCheckpoint.flush();
        this.storeCheckpoint.shutdown();

        // Remove the abort file only when the store is still writeable.
        if (this.runningFlags.isWriteable()) {
            this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
        } else {
            log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
        }
    }

    // Runs regardless of whether this call performed the shutdown sequence above.
    this.transientStorePool.destroy();
}
 
示例12
/**
 * Creates a consume queue for one topic/queue pair.
 *
 * <p>The backing {@code MappedFileQueue} lives in {@code storePath/topic/queueId}. When the
 * store configuration enables the consume-queue extension, a {@code ConsumeQueueExt} is
 * created as well.
 *
 * @param topic               topic name
 * @param queueId             queue id
 * @param storePath           base directory for consume queue files
 * @param mappedFileSize      size passed to the {@code MappedFileQueue}
 * @param defaultMessageStore owning store, also the source of the extension settings
 */
public ConsumeQueue(
    final String topic,
    final int queueId,
    final String storePath,
    final int mappedFileSize,
    final DefaultMessageStore defaultMessageStore) {
    this.topic = topic;
    this.queueId = queueId;
    this.storePath = storePath;
    this.mappedFileSize = mappedFileSize;
    this.defaultMessageStore = defaultMessageStore;

    final String topicDir = this.storePath + File.separator + topic;
    final String queueDir = topicDir + File.separator + queueId;
    this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);

    this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);

    if (!defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
        return;
    }

    final String extStorePath = StorePathConfigHelper.getStorePathConsumeQueueExt(
        defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    this.consumeQueueExt = new ConsumeQueueExt(
        topic,
        queueId,
        extStorePath,
        defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
        defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt());
}
 
示例13
public IndexService(final DefaultMessageStore store) {
    this.defaultMessageStore = store;
    this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
    this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
    this.storePath =
        StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}
 
示例14
/**
     * Shuts down the store.
     *
     * <p>On the first call (while the {@code shutdown} flag is still {@code false}) the flag is
     * set, the scheduled executor and the individual services are shut down in the order below,
     * and the store checkpoint is flushed. The abort file is deleted only when
     * {@code runningFlags} reports the store as writeable; otherwise it is kept and a warning
     * is logged.
     *
     * <p>{@code transientStorePool.destroy()} is invoked on every call, including repeated ones.
     */
    public void shutdown() {
        if (!this.shutdown) {
            this.shutdown = true;

            this.scheduledExecutorService.shutdown();

            try {
                // Fixed 3 second pause; presumably lets in-flight calls finish — confirm.
                // NOTE(review): the interrupt status is not restored when the sleep is interrupted.
                Thread.sleep(1000 * 3);
            } catch (InterruptedException e) {
                log.error("shutdown Exception, ", e);
            }

            // The schedule message service may be absent.
            if (this.scheduleMessageService != null) {
                this.scheduleMessageService.shutdown();
            }

            this.haService.shutdown();

            this.storeStatsService.shutdown();
            this.indexService.shutdown();
            this.commitLog.shutdown();
            this.reputMessageService.shutdown();
            this.flushConsumeQueueService.shutdown();
            this.allocateMappedFileService.shutdown();
            // Flush the checkpoint before shutting it down.
            this.storeCheckpoint.flush();
            this.storeCheckpoint.shutdown();
            this.transactionStateService.shutdown();

            // Remove the abort file only when the store is still writeable.
            if (this.runningFlags.isWriteable()) {
                this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
            } else {
                log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
            }
        }

        // Runs regardless of whether this call performed the shutdown sequence above.
        this.transientStorePool.destroy();
    }
 
示例15
/**
 * Destroys the logic queues, the commit log and the index service, then deletes the abort
 * file and the store checkpoint file located under the store root directory.
 */
public void destroy() {
    this.destroyLogics();
    this.commitLog.destroy();
    this.indexService.destroy();

    final String abortFilePath =
        StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    this.deleteFile(abortFilePath);

    final String checkpointPath =
        StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir());
    this.deleteFile(checkpointPath);
}
 
示例16
/**
 * Collects runtime statistics of the store.
 *
 * <p>Starts from the map returned by {@code storeStatsService.getRuntimeInfo()} and adds the
 * disk usage ratio of the commit log and consume queue paths, the schedule message service
 * statistics (when that service is present) and the minimum/maximum commit log offsets.
 *
 * @return the statistics map obtained from the stats service, with the extra entries added
 */
@Override
public HashMap<String, String> getRuntimeInfo() {
    final HashMap<String, String> stats = this.storeStatsService.getRuntimeInfo();

    // Commit log disk usage.
    final String commitLogPath = this.getMessageStoreConfig().getStorePathCommitLog();
    final double commitLogRatio = UtilAll.getDiskPartitionSpaceUsedPercent(commitLogPath);
    stats.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(commitLogRatio));

    // Consume queue disk usage.
    final String consumeQueuePath =
        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
    final double consumeQueueRatio = UtilAll.getDiskPartitionSpaceUsedPercent(consumeQueuePath);
    stats.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(consumeQueueRatio));

    if (this.scheduleMessageService != null) {
        this.scheduleMessageService.buildRunningStats(stats);
    }

    stats.put(RunningStats.commitLogMinOffset.name(), String.valueOf(this.getMinPhyOffset()));
    stats.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(this.getMaxPhyOffset()));

    return stats;
}
 
示例17
/**
 * Creates the abort file under the store root directory, ensuring its parent directory
 * first, and logs whether the file was newly created or already existed.
 *
 * @throws IOException if creating the file fails
 */
private void createTempFile() throws IOException {
    final String abortFilePath = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    final File abortFile = new File(abortFilePath);
    MappedFile.ensureDirOK(abortFile.getParent());

    final String outcome = abortFile.createNewFile() ? " create OK" : " already exists";
    log.info(abortFilePath + outcome);
}
 
示例18
/**
 * Loads every consume queue found on disk.
 *
 * <p>The consume queue root directory is expected to contain one directory per topic, each
 * holding entries named after numeric queue ids; entries whose name is not an integer are
 * skipped. Each queue is registered via {@code putConsumeQueue} before it is loaded.
 *
 * @return {@code false} as soon as one queue fails to load, {@code true} otherwise
 */
private boolean loadConsumeQueue() {
    final File consumeQueueRoot =
        new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
    final File[] topicDirs = consumeQueueRoot.listFiles();

    if (topicDirs != null) {
        for (File topicDir : topicDirs) {
            final String topic = topicDir.getName();
            final File[] queueDirs = topicDir.listFiles();
            if (queueDirs == null) {
                continue;
            }

            for (File queueDir : queueDirs) {
                int queueId;
                try {
                    queueId = Integer.parseInt(queueDir.getName());
                } catch (NumberFormatException notAQueueId) {
                    // Not a queue-id entry; ignore it.
                    continue;
                }

                ConsumeQueue queue = new ConsumeQueue(
                    topic,
                    queueId,
                    StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                    this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                    this);
                this.putConsumeQueue(topic, queueId, queue);
                if (!queue.load()) {
                    return false;
                }
            }
        }
    }

    log.info("load logics queue all over, OK");

    return true;
}
 
示例19
/**
 * Looks up the consume queue for {@code topic}/{@code queueId}; when none is registered a new
 * one is created and inserted with {@code putIfAbsent}.
 *
 * <p>If an entry is already present at insertion time, that entry is returned instead of the
 * newly created one. The same applies to the per-topic map.
 *
 * @param topic   topic name used as the outer map key
 * @param queueId queue id used as the inner map key
 * @return the registered consume queue
 */
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentMap<Integer, ConsumeQueue> topicQueues = consumeQueueTable.get(topic);
    if (topicQueues == null) {
        ConcurrentMap<Integer, ConsumeQueue> candidate = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentMap<Integer, ConsumeQueue> previous = consumeQueueTable.putIfAbsent(topic, candidate);
        topicQueues = previous != null ? previous : candidate;
    }

    ConsumeQueue current = topicQueues.get(queueId);
    if (current != null) {
        return current;
    }

    ConsumeQueue candidateQueue = new ConsumeQueue(
        topic,
        queueId,
        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
        this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
        this);
    ConsumeQueue previousQueue = topicQueues.putIfAbsent(queueId, candidateQueue);
    return previousQueue != null ? previousQueue : candidateQueue;
}
 
示例20
/**
 * Builds a one-line summary of the disk partition usage reported by
 * {@code UtilAll.getDiskPartitionSpaceUsedPercent} for the commit log, consume queue and
 * index store paths.
 *
 * @return a string of the form {@code "CL: x CQ: y INDEX: z"}
 */
private String diskUtil() {
    final String commitLogPath = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
    final double commitLogRatio = UtilAll.getDiskPartitionSpaceUsedPercent(commitLogPath);

    final String consumeQueuePath = StorePathConfigHelper.getStorePathConsumeQueue(
        this.brokerController.getMessageStoreConfig().getStorePathRootDir());
    final double consumeQueueRatio = UtilAll.getDiskPartitionSpaceUsedPercent(consumeQueuePath);

    final String indexPath = StorePathConfigHelper.getStorePathIndex(
        this.brokerController.getMessageStoreConfig().getStorePathRootDir());
    final double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(indexPath);

    return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", commitLogRatio, consumeQueueRatio, indexRatio);
}
 
示例21
/**
 * Builds the consume queue of a single topic/queue pair.
 *
 * <p>Files are kept in {@code storePath/topic/queueId} through a {@code MappedFileQueue}. A
 * {@code ConsumeQueueExt} is additionally created when the store configuration enables the
 * consume-queue extension.
 *
 * @param topic               topic name
 * @param queueId             queue id
 * @param storePath           base directory for consume queue files
 * @param mappedFileSize      size passed to the {@code MappedFileQueue}
 * @param defaultMessageStore owning store, also the source of the extension settings
 */
public ConsumeQueue(
    final String topic,
    final int queueId,
    final String storePath,
    final int mappedFileSize,
    final DefaultMessageStore defaultMessageStore) {
    this.topic = topic;
    this.queueId = queueId;
    this.storePath = storePath;
    this.mappedFileSize = mappedFileSize;
    this.defaultMessageStore = defaultMessageStore;

    final String topicDir = this.storePath + File.separator + topic;
    final String queueDir = topicDir + File.separator + queueId;
    this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);

    this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);

    if (!defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
        return;
    }

    final String extStorePath = StorePathConfigHelper.getStorePathConsumeQueueExt(
        defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    this.consumeQueueExt = new ConsumeQueueExt(
        topic,
        queueId,
        extStorePath,
        // Mapped file size of the consume-queue extension (original note: 48M — not verifiable here).
        defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
        defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt());
}
 
示例22
public IndexService(final DefaultMessageStore store) {
    this.defaultMessageStore = store;
    this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
    this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
    this.storePath =
        StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}
 
示例23
/**
 * Creates the message store and wires up its internal services.
 *
 * <p>Besides assigning the collaborators, the constructor initializes the transient store pool
 * when it is enabled in the configuration, starts the allocate-mapped-file service and the
 * index service, registers the two commit log dispatchers, and opens the lock file under the
 * store root directory in {@code "rw"} mode.
 *
 * <p>NOTE(review): services are started before the lock file is opened; if opening it throws,
 * nothing in this constructor stops them again — confirm whether callers handle that.
 *
 * @param messageStoreConfig      store configuration
 * @param brokerStatsManager      broker statistics manager kept by the store
 * @param messageArrivingListener listener kept by the store
 * @param brokerConfig            broker configuration
 * @throws IOException declared by the constructor; the lock file is opened here via {@code RandomAccessFile}
 */
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
    final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
    this.messageArrivingListener = messageArrivingListener;
    this.brokerConfig = brokerConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.brokerStatsManager = brokerStatsManager;
    // Several collaborators below receive a reference to this store while it is still being constructed.
    this.allocateMappedFileService = new AllocateMappedFileService(this);
    this.commitLog = new CommitLog(this);
    this.consumeQueueTable = new ConcurrentHashMap<>(32);

    this.flushConsumeQueueService = new FlushConsumeQueueService();
    this.cleanCommitLogService = new CleanCommitLogService();
    this.cleanConsumeQueueService = new CleanConsumeQueueService();
    this.storeStatsService = new StoreStatsService();
    this.indexService = new IndexService(this);
    this.haService = new HAService(this);

    this.reputMessageService = new ReputMessageService();

    this.scheduleMessageService = new ScheduleMessageService(this);

    this.transientStorePool = new TransientStorePool(messageStoreConfig);

    // The pool is always created but only initialized when enabled in the configuration.
    if (messageStoreConfig.isTransientStorePoolEnable()) {
        this.transientStorePool.init();
    }

    this.allocateMappedFileService.start();

    this.indexService.start();

    // Dispatchers are registered in this order: consume queue first, then index.
    this.dispatcherList = new LinkedList<>();
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

    // Make sure the lock file's parent directory is in place, then open the lock file read/write.
    File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
    MappedFile.ensureDirOK(file.getParent());
    lockFile = new RandomAccessFile(file, "rw");
}
 
示例24
/**
 * Returns the store's runtime statistics.
 *
 * <p>The map from {@code storeStatsService.getRuntimeInfo()} is extended with the disk usage
 * ratio of the commit log path and the consume queue path, the statistics of the schedule
 * message service (if present) and the commit log minimum/maximum offsets.
 *
 * @return the statistics map obtained from the stats service, with the extra entries added
 */
@Override
public HashMap<String, String> getRuntimeInfo() {
    final HashMap<String, String> info = this.storeStatsService.getRuntimeInfo();

    // Disk usage of the commit log path.
    final String physicPath = this.getMessageStoreConfig().getStorePathCommitLog();
    final double physicUsed = UtilAll.getDiskPartitionSpaceUsedPercent(physicPath);
    info.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicUsed));

    // Disk usage of the consume queue path.
    final String logicsPath =
        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
    final double logicsUsed = UtilAll.getDiskPartitionSpaceUsedPercent(logicsPath);
    info.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsUsed));

    if (this.scheduleMessageService != null) {
        this.scheduleMessageService.buildRunningStats(info);
    }

    info.put(RunningStats.commitLogMinOffset.name(), String.valueOf(this.getMinPhyOffset()));
    info.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(this.getMaxPhyOffset()));

    return info;
}
 
示例25
/**
 * Tears down the store's on-disk state: destroys the logic queues, the commit log and the
 * index service, then removes the abort file and the store checkpoint file.
 */
public void destroy() {
    this.destroyLogics();
    this.commitLog.destroy();
    this.indexService.destroy();

    final String abortPath =
        StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    this.deleteFile(abortPath);

    final String checkpointFilePath =
        StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir());
    this.deleteFile(checkpointFilePath);
}
 
示例26
/**
 * Verifies that after {@code shutdown()} the store's {@code shutDownNormal} flag is
 * {@code false} and the abort file is still present.
 */
@Test
public void testDispatchBehindWhenShutdown() {
    messageStore.shutdown();
    assertTrue(!messageStore.shutDownNormal);

    String abortFilePath =
        StorePathConfigHelper.getAbortFile(messageStore.getMessageStoreConfig().getStorePathRootDir());
    File abortFile = new File(abortFilePath);
    assertTrue(abortFile.exists());
}
 
示例27
/**
 * Shuts the store down and checks that it did not record a normal shutdown
 * ({@code shutDownNormal} is {@code false}) and that the abort file was kept.
 */
@Test
public void testDispatchBehindWhenShutdown() {
    messageStore.shutdown();
    assertTrue(!messageStore.shutDownNormal);

    String storeRoot = messageStore.getMessageStoreConfig().getStorePathRootDir();
    File abortMarker = new File(StorePathConfigHelper.getAbortFile(storeRoot));
    assertTrue(abortMarker.exists());
}
 
示例28
/**
 * Formats the disk partition usage reported by
 * {@code UtilAll.getDiskPartitionSpaceUsedPercent} for the commit log, consume queue and
 * index store paths into a single line.
 *
 * @return a string of the form {@code "CL: x CQ: y INDEX: z"}
 */
private String diskUtil() {
    final String commitLogPath = this.deFiBrokerController.getMessageStoreConfig().getStorePathCommitLog();
    final double commitLogRatio = UtilAll.getDiskPartitionSpaceUsedPercent(commitLogPath);

    final String consumeQueuePath = StorePathConfigHelper.getStorePathConsumeQueue(
        this.deFiBrokerController.getMessageStoreConfig().getStorePathRootDir());
    final double consumeQueueRatio = UtilAll.getDiskPartitionSpaceUsedPercent(consumeQueuePath);

    final String indexPath = StorePathConfigHelper.getStorePathIndex(
        this.deFiBrokerController.getMessageStoreConfig().getStorePathRootDir());
    final double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(indexPath);

    return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", commitLogRatio, consumeQueueRatio, indexRatio);
}
 
示例29
/**
 * Loads the store: the schedule message service (when present), the commit log and the
 * consume queues, in that order, stopping at the first step that reports failure.
 *
 * <p>When every step succeeded, the store checkpoint is created, the index service is loaded
 * and recovery runs, both told whether the previous shutdown was normal (no temp/abort file
 * present). Any exception is logged and turns the result into {@code false}; on failure the
 * allocate-mapped-file service is shut down.
 *
 * @return {@code true} if loading succeeded, {@code false} otherwise
 */
public boolean load() {
    boolean ok = true;

    try {
        final boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

        if (null != scheduleMessageService) {
            ok = this.scheduleMessageService.load();
        }

        // load Commit Log
        if (ok) {
            ok = this.commitLog.load();
        }

        // load Consume Queue
        if (ok) {
            ok = this.loadConsumeQueue();
        }

        if (ok) {
            final String checkpointPath =
                StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir());
            this.storeCheckpoint = new StoreCheckpoint(checkpointPath);

            this.indexService.load(lastExitOK);

            this.recover(lastExitOK);

            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        ok = false;
    }

    if (!ok) {
        this.allocateMappedFileService.shutdown();
    }

    return ok;
}
 
示例30
public IndexService(final DefaultMessageStore store) {
    this.defaultMessageStore = store;
    this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
    this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
    this.storePath =
        StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}