Java源码示例:org.apache.rocketmq.store.config.StorePathConfigHelper
示例1
/**
 * Writes the value returned by {@code getMinOffsetInSync()} to the in-sync offset file
 * located under the store root directory.
 * <p>
 * Nothing is written when the broker role is {@code SLAVE} or when the offset is {@code -1}.
 * An {@link IOException} raised by the write is logged and not propagated.
 */
public void saveInSyncOffset() {
    if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
        return;
    }
    long minInSyncOffset = getMinOffsetInSync();
    if (minInSyncOffset == -1) {
        return;
    }
    String fileName = StorePathConfigHelper.getOffsetInSyncStorePath(this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    try {
        MixAll.string2File(String.valueOf(minInSyncOffset), fileName);
        // Report success only after the write completed; previously this line also ran
        // after a failed write, producing a misleading "saved" entry next to the error.
        log.info("save slave min offset in sync:{}", minInSyncOffset);
    } catch (IOException e) {
        log.error("save phy offset slave reported [{}] exception", fileName, e);
    }
}
示例2
/**
 * Writes {@code offset} to the in-sync offset file unless that file already exists.
 * <p>
 * Returns immediately when the broker role is {@code SLAVE}. A failed write is logged
 * and not propagated.
 *
 * @param offset value to store in the in-sync offset file
 */
public void initInSyncOffset(long offset) {
    final boolean runningAsSlave =
        this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE;
    if (runningAsSlave) {
        return;
    }

    final String offsetFilePath = StorePathConfigHelper.getOffsetInSyncStorePath(
        this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    if (new File(offsetFilePath).exists()) {
        // An existing file is left untouched.
        log.info("as master before, no need to sync offset");
        return;
    }

    try {
        MixAll.string2File(String.valueOf(offset), offsetFilePath);
    } catch (IOException e) {
        log.error("save phy offset slave reported [{}] exception", offsetFilePath, e);
    }
}
示例3
/**
 * Backs up the mapped files affected by {@code phyOffset} into a fresh directory named after
 * the current system-clock value, then resets the flush/commit positions and truncates both
 * the mapped file queue and the logic files at that offset.
 *
 * @param phyOffset physical offset passed to the backup, position reset and truncation calls
 */
public void truncate(long phyOffset) {
    final String storeRoot = defaultMessageStore.getMessageStoreConfig().getStorePathRootDir();
    final String timestamp = String.valueOf(defaultMessageStore.getSystemClock().now());
    final String backupTarget = StorePathConfigHelper.getBackupStoreSubDirPath(storeRoot, timestamp);

    final File backupDir = new File(backupTarget);
    if (!backupDir.exists()) {
        backupDir.mkdirs();
    }

    // Backup must happen before any position is moved or data is cut.
    this.mappedFileQueue.backupFiles(phyOffset, backupTarget);

    this.mappedFileQueue.setFlushedWhere(phyOffset);
    this.mappedFileQueue.setCommittedWhere(phyOffset);
    this.mappedFileQueue.truncateDirtyFiles(phyOffset);

    // Remove the ConsumeQueue data that is now redundant.
    this.defaultMessageStore.truncateDirtyLogicFiles(phyOffset);
}
示例4
/**
 * Reads the offset stored in the in-sync offset file and then deletes that file.
 *
 * @return the parsed offset, or {@code -1} when the file does not exist, its content is
 *         {@code null}, or reading/parsing fails (the failure is logged)
 */
private long getInSyncOffsetSaved() {
    long offset = -1;
    String fileName = StorePathConfigHelper.getOffsetInSyncStorePath(this.messageStoreConfig.getStorePathRootDir());
    try {
        File file = new File(fileName);
        if (file.exists()) {
            String offsetStr = MixAll.file2String(fileName);
            if (offsetStr != null) {
                // parseLong yields a primitive directly (no boxing); trim() tolerates
                // surrounding whitespace such as a trailing newline in the file.
                offset = Long.parseLong(offsetStr.trim());
            }
            // The file is consumed once read; surface a failed delete instead of ignoring it,
            // because a leftover file would be picked up again on the next call.
            if (!file.delete()) {
                log.warn("delete offset in sync file [{}] failed", fileName);
            }
        }
    } catch (Exception ex) {
        log.error("get offset in sync failed", ex);
    }
    return offset;
}
示例5
/**
 * Copies every mapped file whose tail offset lies beyond {@code offset} into {@code backupPath}.
 * <p>
 * Each selected file is flushed before it is copied. A failure while copying one file is
 * logged and does not stop the remaining files from being processed.
 *
 * @param offset     files whose tail offset is not greater than this value are skipped
 * @param backupPath directory path handed to {@code StorePathConfigHelper.getBackupStoreFilePath}
 */
public void backupFiles(long offset, String backupPath) {
    for (MappedFile mappedFile : this.mappedFiles) {
        final long tailOffset = mappedFile.getFileFromOffset() + this.mappedFileSize;
        if (tailOffset <= offset) {
            continue;
        }

        mappedFile.flush(0);
        try {
            final Path from = Paths.get(mappedFile.getFileName());
            final String simpleName = new File(mappedFile.getFileName()).getName();
            final Path to = Paths.get(StorePathConfigHelper.getBackupStoreFilePath(backupPath, simpleName));
            Files.copy(from, to);
            log.info("backup file:{} to {}", mappedFile.getFileName(), to);
        } catch (Exception ex) {
            log.error("backup file failed", ex);
        }
    }
}
示例6
/**
 * Writes the value returned by {@code getMinOffsetInSync()} to the in-sync offset file
 * located under the store root directory.
 * <p>
 * Nothing is written when the broker role is {@code SLAVE} or when the offset is {@code -1}.
 * An {@link IOException} raised by the write is logged and not propagated.
 */
public void saveInSyncOffset() {
    if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
        return;
    }
    long minInSyncOffset = getMinOffsetInSync();
    if (minInSyncOffset == -1) {
        return;
    }
    String fileName = StorePathConfigHelper.getOffsetInSyncStorePath(this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    try {
        MixAll.string2File(String.valueOf(minInSyncOffset), fileName);
        // Report success only after the write completed; previously this line also ran
        // after a failed write, producing a misleading "saved" entry next to the error.
        log.info("save slave min offset in sync:{}", minInSyncOffset);
    } catch (IOException e) {
        log.error("save phy offset slave reported [{}] exception", fileName, e);
    }
}
示例7
/**
 * Backs up the mapped files affected by {@code phyOffset} into a fresh directory named after
 * the current system-clock value, then resets the flush/commit positions and truncates both
 * the mapped file queue and the logic files at that offset.
 *
 * @param phyOffset physical offset passed to the backup, position reset and truncation calls
 */
public void truncate(long phyOffset) {
    final String storeRoot = defaultMessageStore.getMessageStoreConfig().getStorePathRootDir();
    final String timestamp = String.valueOf(defaultMessageStore.getSystemClock().now());
    final String backupTarget = StorePathConfigHelper.getBackupStoreSubDirPath(storeRoot, timestamp);

    final File backupDir = new File(backupTarget);
    if (!backupDir.exists()) {
        backupDir.mkdirs();
    }

    // Backup must happen before any position is moved or data is cut.
    this.mappedFileQueue.backupFiles(phyOffset, backupTarget);

    this.mappedFileQueue.setFlushedWhere(phyOffset);
    this.mappedFileQueue.setCommittedWhere(phyOffset);
    this.mappedFileQueue.truncateDirtyFiles(phyOffset);

    // Remove the ConsumeQueue data that is now redundant.
    this.defaultMessageStore.truncateDirtyLogicFiles(phyOffset);
}
示例8
/**
 * Reads the offset stored in the in-sync offset file and then deletes that file.
 *
 * @return the parsed offset, or {@code -1} when the file does not exist, its content is
 *         {@code null}, or reading/parsing fails (the failure is logged)
 */
private long getInSyncOffsetSaved() {
    long offset = -1;
    String fileName = StorePathConfigHelper.getOffsetInSyncStorePath(this.messageStoreConfig.getStorePathRootDir());
    try {
        File file = new File(fileName);
        if (file.exists()) {
            String offsetStr = MixAll.file2String(fileName);
            if (offsetStr != null) {
                // parseLong yields a primitive directly (no boxing); trim() tolerates
                // surrounding whitespace such as a trailing newline in the file.
                offset = Long.parseLong(offsetStr.trim());
            }
            // The file is consumed once read; surface a failed delete instead of ignoring it,
            // because a leftover file would be picked up again on the next call.
            if (!file.delete()) {
                log.warn("delete offset in sync file [{}] failed", fileName);
            }
        }
    } catch (Exception ex) {
        log.error("get offset in sync failed", ex);
    }
    return offset;
}
示例9
/**
 * Stores 1000 messages, records an in-sync offset 100 bytes below the current maximum
 * physical offset, switches the store to the SLAVE role and verifies that
 * {@code truncateNotSync()} cuts the commit log back to that recorded offset.
 * The checks only run when the store under test is a {@code DefaultMessageStore}.
 */
@Test
public void testTruncate() throws Exception {
    final String topic = "truncateTopic";
    for (int sent = 0; sent < 1000; sent++) {
        final MessageExtBrokerInner message = buildMessage();
        message.setTopic(topic);
        message.setQueueId(0);
        messageStore.putMessage(message);
    }

    if (!(messageStore instanceof DefaultMessageStore)) {
        return;
    }

    final DefaultMessageStore store = (DefaultMessageStore) messageStore;
    final long offsetBeforeTruncate = store.getMaxPhyOffset();
    final String syncOffsetFile =
        StorePathConfigHelper.getOffsetInSyncStorePath(store.getMessageStoreConfig().getStorePathRootDir());
    MixAll.string2File(String.valueOf(store.getMaxPhyOffset() - 100), syncOffsetFile);

    store.getMessageStoreConfig().setBrokerRole(BrokerRole.SLAVE);
    store.truncateNotSync();

    assertThat(store.getMaxPhyOffset()).isEqualTo(offsetBeforeTruncate - 100);
}
示例10
/**
 * Returns the consume queue registered for {@code topic}/{@code queueId}, creating and
 * registering the per-topic map and the queue itself when they are missing.
 * <p>
 * Registration uses {@code putIfAbsent}; when another entry is already present, that
 * existing entry is the one returned.
 *
 * @param topic   topic name used as the key of the outer table
 * @param queueId queue id used as the key of the per-topic map
 * @return the registered consume queue for the pair
 */
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentMap<Integer, ConsumeQueue> queuesOfTopic = consumeQueueTable.get(topic);
    if (queuesOfTopic == null) {
        final ConcurrentMap<Integer, ConsumeQueue> candidateMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        final ConcurrentMap<Integer, ConsumeQueue> presentMap = consumeQueueTable.putIfAbsent(topic, candidateMap);
        queuesOfTopic = (presentMap != null) ? presentMap : candidateMap;
    }

    final ConsumeQueue registered = queuesOfTopic.get(queueId);
    if (registered != null) {
        return registered;
    }

    final ConsumeQueue candidate = new ConsumeQueue(
        topic,
        queueId,
        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
        this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
        this);
    final ConsumeQueue present = queuesOfTopic.putIfAbsent(queueId, candidate);
    return (present != null) ? present : candidate;
}
示例11
/**
 * Shuts down the storage service.
 * <p>
 * The teardown inside the {@code if} runs only while the {@code shutdown} flag is still false;
 * the flag is set before any service is stopped. {@code transientStorePool.destroy()} sits
 * outside that guard and therefore runs on every call.
 */
public void shutdown() {
if (!this.shutdown) {
this.shutdown = true;
this.scheduledExecutorService.shutdown();
try {
// Wait for other calls to stop.
Thread.sleep(1000 * 3);
} catch (InterruptedException e) {
// NOTE(review): the interrupt status is not restored here — confirm this is intentional.
log.error("shutdown Exception, ", e);
}
// The schedule message service is optional and may be absent.
if (this.scheduleMessageService != null) {
this.scheduleMessageService.shutdown();
}
this.haService.shutdown();
this.storeStatsService.shutdown();
this.indexService.shutdown();
this.commitLog.shutdown();
this.reputMessageService.shutdown();
this.flushConsumeQueueService.shutdown();
this.allocateMappedFileService.shutdown();
// Flush the checkpoint before shutting it down.
this.storeCheckpoint.flush();
this.storeCheckpoint.shutdown();
// The abort file is removed only when the running flags report the store as writeable;
// otherwise it is kept and a warning is logged.
if (this.runningFlags.isWriteable()) {
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
} else {
log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
}
}
// Runs on every call, including repeated ones.
this.transientStorePool.destroy();
}
示例12
/**
 * Builds a consume queue whose files live under {@code storePath/topic/queueId}.
 * <p>
 * When the store configuration enables the consume-queue extension, a
 * {@code ConsumeQueueExt} for the same topic and queue id is created as well.
 *
 * @param topic               topic name; also a path segment of the queue directory
 * @param queueId             queue id; also a path segment of the queue directory
 * @param storePath           base directory of the queue directory
 * @param mappedFileSize      size handed to the {@code MappedFileQueue}
 * @param defaultMessageStore owning store; its configuration is read here
 */
public ConsumeQueue(
    final String topic,
    final int queueId,
    final String storePath,
    final int mappedFileSize,
    final DefaultMessageStore defaultMessageStore) {
    this.topic = topic;
    this.queueId = queueId;
    this.storePath = storePath;
    this.mappedFileSize = mappedFileSize;
    this.defaultMessageStore = defaultMessageStore;

    final String directory = this.storePath + File.separator + topic + File.separator + queueId;
    this.mappedFileQueue = new MappedFileQueue(directory, mappedFileSize, null);

    // Buffer sized for a single store unit.
    this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);

    if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
        this.consumeQueueExt = new ConsumeQueueExt(
            topic,
            queueId,
            StorePathConfigHelper.getStorePathConsumeQueueExt(
                defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
            defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
            defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt());
    }
}
示例13
public IndexService(final DefaultMessageStore store) {
this.defaultMessageStore = store;
this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
this.storePath =
StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}
示例14
/**
 * Shuts down the store and its services.
 * <p>
 * The teardown inside the {@code if} runs only while the {@code shutdown} flag is still false;
 * the flag is set before any service is stopped. {@code transientStorePool.destroy()} sits
 * outside that guard and therefore runs on every call.
 */
public void shutdown() {
if (!this.shutdown) {
this.shutdown = true;
this.scheduledExecutorService.shutdown();
try {
// Fixed three-second pause before the services are stopped.
// NOTE(review): presumably lets in-flight calls finish — confirm.
Thread.sleep(1000 * 3);
} catch (InterruptedException e) {
// NOTE(review): the interrupt status is not restored here — confirm this is intentional.
log.error("shutdown Exception, ", e);
}
// The schedule message service is optional and may be absent.
if (this.scheduleMessageService != null) {
this.scheduleMessageService.shutdown();
}
this.haService.shutdown();
this.storeStatsService.shutdown();
this.indexService.shutdown();
this.commitLog.shutdown();
this.reputMessageService.shutdown();
this.flushConsumeQueueService.shutdown();
this.allocateMappedFileService.shutdown();
// Flush the checkpoint before shutting it down.
this.storeCheckpoint.flush();
this.storeCheckpoint.shutdown();
this.transactionStateService.shutdown();
// The abort file is removed only when the running flags report the store as writeable;
// otherwise it is kept and a warning is logged.
if (this.runningFlags.isWriteable()) {
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
} else {
log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
}
}
// Runs on every call, including repeated ones.
this.transientStorePool.destroy();
}
示例15
/**
 * Destroys the logic files, the commit log and the index service, then deletes the abort
 * file and the store checkpoint file found under the store root directory.
 */
public void destroy() {
    this.destroyLogics();
    this.commitLog.destroy();
    this.indexService.destroy();

    final String abortFilePath =
        StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    this.deleteFile(abortFilePath);

    final String checkpointFilePath =
        StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir());
    this.deleteFile(checkpointFilePath);
}
示例16
/**
 * Collects runtime statistics: the map supplied by the store stats service, extended with
 * the disk usage ratios of the commit log and consume queue locations, the schedule message
 * service statistics (when that service exists) and the commit log min/max offsets.
 *
 * @return the map obtained from the store stats service, with the extra entries added
 */
@Override
public HashMap<String, String> getRuntimeInfo() {
    final HashMap<String, String> runtimeInfo = this.storeStatsService.getRuntimeInfo();

    // Commit log disk usage.
    final String commitLogPath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
    final double commitLogUsage = UtilAll.getDiskPartitionSpaceUsedPercent(commitLogPath);
    runtimeInfo.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(commitLogUsage));

    // Consume queue disk usage.
    final String consumeQueuePath =
        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
    final double consumeQueueUsage = UtilAll.getDiskPartitionSpaceUsedPercent(consumeQueuePath);
    runtimeInfo.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(consumeQueueUsage));

    if (this.scheduleMessageService != null) {
        this.scheduleMessageService.buildRunningStats(runtimeInfo);
    }

    runtimeInfo.put(RunningStats.commitLogMinOffset.name(),
        String.valueOf(DefaultMessageStore.this.getMinPhyOffset()));
    runtimeInfo.put(RunningStats.commitLogMaxOffset.name(),
        String.valueOf(DefaultMessageStore.this.getMaxPhyOffset()));
    return runtimeInfo;
}
示例17
/**
 * Creates the file at the path returned by {@code StorePathConfigHelper.getAbortFile},
 * after making sure its parent directory is in place, and logs whether the file was newly
 * created or already existed.
 *
 * @throws IOException if creating the file fails with an I/O error
 */
private void createTempFile() throws IOException {
    final String abortFilePath =
        StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    final File abortFile = new File(abortFilePath);
    MappedFile.ensureDirOK(abortFile.getParent());

    final String outcome = abortFile.createNewFile() ? " create OK" : " already exists";
    log.info(abortFilePath + outcome);
}
示例18
/**
 * Walks the consume queue directory ({@code <root>/<topic>/<queueId>}), builds a
 * {@code ConsumeQueue} for every sub-directory whose name parses as an integer queue id,
 * registers it and loads it.
 *
 * @return {@code false} as soon as one queue fails to load; {@code true} otherwise,
 *         including when the directory listing is unavailable
 */
private boolean loadConsumeQueue() {
    final File consumeQueueRoot =
        new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
    final File[] topicDirs = consumeQueueRoot.listFiles();

    if (topicDirs != null) {
        for (File topicDir : topicDirs) {
            final String topic = topicDir.getName();
            final File[] queueDirs = topicDir.listFiles();
            if (queueDirs == null) {
                continue;
            }

            for (File queueDir : queueDirs) {
                int queueId;
                try {
                    queueId = Integer.parseInt(queueDir.getName());
                } catch (NumberFormatException e) {
                    // Entries whose name is not a number are not queue directories; skip them.
                    continue;
                }

                final ConsumeQueue consumeQueue = new ConsumeQueue(
                    topic,
                    queueId,
                    StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                    this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                    this);
                this.putConsumeQueue(topic, queueId, consumeQueue);
                if (!consumeQueue.load()) {
                    return false;
                }
            }
        }
    }

    log.info("load logics queue all over, OK");
    return true;
}
示例19
/**
 * Returns the consume queue registered for {@code topic}/{@code queueId}, creating and
 * registering the per-topic map and the queue itself when they are missing.
 * <p>
 * Registration uses {@code putIfAbsent}; when another entry is already present, that
 * existing entry is the one returned.
 *
 * @param topic   topic name used as the key of the outer table
 * @param queueId queue id used as the key of the per-topic map
 * @return the registered consume queue for the pair
 */
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentMap<Integer, ConsumeQueue> queuesOfTopic = consumeQueueTable.get(topic);
    if (queuesOfTopic == null) {
        final ConcurrentMap<Integer, ConsumeQueue> candidateMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        final ConcurrentMap<Integer, ConsumeQueue> presentMap = consumeQueueTable.putIfAbsent(topic, candidateMap);
        queuesOfTopic = (presentMap != null) ? presentMap : candidateMap;
    }

    final ConsumeQueue registered = queuesOfTopic.get(queueId);
    if (registered != null) {
        return registered;
    }

    final ConsumeQueue candidate = new ConsumeQueue(
        topic,
        queueId,
        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
        this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
        this);
    final ConsumeQueue present = queuesOfTopic.putIfAbsent(queueId, candidate);
    return (present != null) ? present : candidate;
}
示例20
/**
 * Formats the disk usage ratios of the commit log, consume queue and index locations.
 *
 * @return a string of the form {@code "CL: x.xx CQ: x.xx INDEX: x.xx"}
 */
private String diskUtil() {
    final double commitLogUsage = UtilAll.getDiskPartitionSpaceUsedPercent(
        this.brokerController.getMessageStoreConfig().getStorePathCommitLog());

    final double consumeQueueUsage = UtilAll.getDiskPartitionSpaceUsedPercent(
        StorePathConfigHelper.getStorePathConsumeQueue(
            this.brokerController.getMessageStoreConfig().getStorePathRootDir()));

    final double indexUsage = UtilAll.getDiskPartitionSpaceUsedPercent(
        StorePathConfigHelper.getStorePathIndex(
            this.brokerController.getMessageStoreConfig().getStorePathRootDir()));

    return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", commitLogUsage, consumeQueueUsage, indexUsage);
}
示例21
/**
 * Builds a consume queue whose files live under {@code storePath/topic/queueId}.
 * <p>
 * When the store configuration enables the consume-queue extension, a
 * {@code ConsumeQueueExt} for the same topic and queue id is created as well.
 *
 * @param topic               topic name; also a path segment of the queue directory
 * @param queueId             queue id; also a path segment of the queue directory
 * @param storePath           base directory of the queue directory
 * @param mappedFileSize      size handed to the {@code MappedFileQueue}
 * @param defaultMessageStore owning store; its configuration is read here
 */
public ConsumeQueue(
    final String topic,
    final int queueId,
    final String storePath,
    final int mappedFileSize,
    final DefaultMessageStore defaultMessageStore) {
    this.topic = topic;
    this.queueId = queueId;
    this.storePath = storePath;
    this.mappedFileSize = mappedFileSize;
    this.defaultMessageStore = defaultMessageStore;

    final String directory = this.storePath + File.separator + topic + File.separator + queueId;
    this.mappedFileQueue = new MappedFileQueue(directory, mappedFileSize, null);

    // Buffer sized for a single store unit.
    this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);

    if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
        this.consumeQueueExt = new ConsumeQueueExt(
            topic,
            queueId,
            StorePathConfigHelper.getStorePathConsumeQueueExt(
                defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
            // Mapped-file size of the consume-queue extension (the original note says 48M —
            // the value itself comes from configuration; verify there).
            defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
            defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt());
    }
}
示例22
public IndexService(final DefaultMessageStore store) {
this.defaultMessageStore = store;
this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
this.storePath =
StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}
示例23
/**
 * Wires up the message store: keeps the supplied configuration and collaborators, creates
 * the internal services, starts the mapped-file allocation and index services, registers the
 * commit log dispatchers and opens the store lock file.
 * <p>
 * Several collaborators receive {@code this} while construction is still in progress, so the
 * order of the statements below matters.
 *
 * @param messageStoreConfig      store configuration
 * @param brokerStatsManager      broker statistics manager kept for later use
 * @param messageArrivingListener listener kept for later use
 * @param brokerConfig            broker configuration kept for later use
 * @throws IOException declared by the constructor; opening the lock file at the end is the
 *                     visible I/O operation
 */
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
this.brokerConfig = brokerConfig;
this.messageStoreConfig = messageStoreConfig;
this.brokerStatsManager = brokerStatsManager;
// Services below are handed a reference to this (still partially constructed) store.
this.allocateMappedFileService = new AllocateMappedFileService(this);
this.commitLog = new CommitLog(this);
this.consumeQueueTable = new ConcurrentHashMap<>(32);
this.flushConsumeQueueService = new FlushConsumeQueueService();
this.cleanCommitLogService = new CleanCommitLogService();
this.cleanConsumeQueueService = new CleanConsumeQueueService();
this.storeStatsService = new StoreStatsService();
this.indexService = new IndexService(this);
this.haService = new HAService(this);
this.reputMessageService = new ReputMessageService();
this.scheduleMessageService = new ScheduleMessageService(this);
this.transientStorePool = new TransientStorePool(messageStoreConfig);
// The transient store pool is only initialised when the configuration enables it.
if (messageStoreConfig.isTransientStorePoolEnable()) {
this.transientStorePool.init();
}
// These two services are started from within the constructor.
this.allocateMappedFileService.start();
this.indexService.start();
// Dispatchers are appended in this order: consume queue builder first, then index builder.
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
// Make sure the lock file's parent directory is in place, then open the file read/write.
File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
MappedFile.ensureDirOK(file.getParent());
lockFile = new RandomAccessFile(file, "rw");
}
示例24
/**
 * Collects runtime statistics: the map supplied by the store stats service, extended with
 * the disk usage ratios of the commit log and consume queue locations, the schedule message
 * service statistics (when that service exists) and the commit log min/max offsets.
 *
 * @return the map obtained from the store stats service, with the extra entries added
 */
@Override
public HashMap<String, String> getRuntimeInfo() {
    final HashMap<String, String> runtimeInfo = this.storeStatsService.getRuntimeInfo();

    // Commit log disk usage.
    final String commitLogPath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
    final double commitLogUsage = UtilAll.getDiskPartitionSpaceUsedPercent(commitLogPath);
    runtimeInfo.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(commitLogUsage));

    // Consume queue disk usage.
    final String consumeQueuePath =
        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
    final double consumeQueueUsage = UtilAll.getDiskPartitionSpaceUsedPercent(consumeQueuePath);
    runtimeInfo.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(consumeQueueUsage));

    if (this.scheduleMessageService != null) {
        this.scheduleMessageService.buildRunningStats(runtimeInfo);
    }

    runtimeInfo.put(RunningStats.commitLogMinOffset.name(),
        String.valueOf(DefaultMessageStore.this.getMinPhyOffset()));
    runtimeInfo.put(RunningStats.commitLogMaxOffset.name(),
        String.valueOf(DefaultMessageStore.this.getMaxPhyOffset()));
    return runtimeInfo;
}
示例25
/**
 * Destroys the logic files, the commit log and the index service, then deletes the abort
 * file and the store checkpoint file found under the store root directory.
 */
public void destroy() {
    this.destroyLogics();
    this.commitLog.destroy();
    this.indexService.destroy();

    final String abortFilePath =
        StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    this.deleteFile(abortFilePath);

    final String checkpointFilePath =
        StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir());
    this.deleteFile(checkpointFilePath);
}
示例26
/**
 * After shutting the store down, expects {@code shutDownNormal} to be false and the abort
 * file to still be present on disk.
 */
@Test
public void testDispatchBehindWhenShutdown() {
    messageStore.shutdown();
    assertTrue(!messageStore.shutDownNormal);

    final String abortFilePath =
        StorePathConfigHelper.getAbortFile(messageStore.getMessageStoreConfig().getStorePathRootDir());
    final File abortFile = new File(abortFilePath);
    assertTrue(abortFile.exists());
}
示例27
/**
 * After shutting the store down, expects {@code shutDownNormal} to be false and the abort
 * file to still be present on disk.
 */
@Test
public void testDispatchBehindWhenShutdown() {
    messageStore.shutdown();
    assertTrue(!messageStore.shutDownNormal);

    final String abortFilePath =
        StorePathConfigHelper.getAbortFile(messageStore.getMessageStoreConfig().getStorePathRootDir());
    final File abortFile = new File(abortFilePath);
    assertTrue(abortFile.exists());
}
示例28
/**
 * Formats the disk usage ratios of the commit log, consume queue and index locations.
 *
 * @return a string of the form {@code "CL: x.xx CQ: x.xx INDEX: x.xx"}
 */
private String diskUtil() {
    final double commitLogUsage = UtilAll.getDiskPartitionSpaceUsedPercent(
        this.deFiBrokerController.getMessageStoreConfig().getStorePathCommitLog());

    final double consumeQueueUsage = UtilAll.getDiskPartitionSpaceUsedPercent(
        StorePathConfigHelper.getStorePathConsumeQueue(
            this.deFiBrokerController.getMessageStoreConfig().getStorePathRootDir()));

    final double indexUsage = UtilAll.getDiskPartitionSpaceUsedPercent(
        StorePathConfigHelper.getStorePathIndex(
            this.deFiBrokerController.getMessageStoreConfig().getStorePathRootDir()));

    return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", commitLogUsage, consumeQueueUsage, indexUsage);
}
示例29
/**
 * Loads the persisted store state: the schedule message service (when present), the commit
 * log and the consume queues, in that order, stopping at the first step that reports failure.
 * When all of them succeed, the store checkpoint is opened, the index service is loaded and
 * recovery is run.
 * <p>
 * Exceptions are caught and logged rather than propagated. On any failure the mapped-file
 * allocation service is shut down.
 *
 * @return {@code true} when every step succeeded; {@code false} when a step reported failure
 *         or an exception was thrown
 */
public boolean load() {
    boolean loaded = false;
    try {
        // The presence of the temp file means the previous shutdown did not complete normally.
        final boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

        // Short-circuit evaluation keeps the original order and stops at the first failure.
        loaded = (null == scheduleMessageService || this.scheduleMessageService.load())
            && this.commitLog.load()
            && this.loadConsumeQueue();

        if (loaded) {
            this.storeCheckpoint = new StoreCheckpoint(
                StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            this.indexService.load(lastExitOK);
            this.recover(lastExitOK);
            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        loaded = false;
    }

    if (!loaded) {
        this.allocateMappedFileService.shutdown();
    }
    return loaded;
}
示例30
public IndexService(final DefaultMessageStore store) {
this.defaultMessageStore = store;
this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
this.storePath =
StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}