Java源码示例:org.apache.mina.filter.executor.OrderedThreadPoolExecutor
示例1
public void startListner(IoHandler iohandler,int listenPort) throws Exception{
acceptor = new NioSocketAcceptor();
acceptor.setBacklog(100);
acceptor.setReuseAddress(true);
acceptor.setHandler(iohandler);
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
IoFilter protocol = new ProtocolCodecFilter(new GameProtocolcodecFactory());
chain.addLast("codec", protocol);
threadpool = new OrderedThreadPoolExecutor(500);
threadpool.setThreadFactory(new ServerThreadFactory("OrderedThreadPool"));
chain.addLast("threadPool", new ExecutorFilter(threadpool));
int recsize = 5120;
int sendsize = 40480;
int timeout = 10;
SocketSessionConfig sc = acceptor.getSessionConfig();
sc.setReuseAddress(true);// 设置每一个非主监听连接的端口可以重用
sc.setReceiveBufferSize(recsize);// 设置输入缓冲区的大小
sc.setSendBufferSize(sendsize);// 设置输出缓冲区的大小
sc.setTcpNoDelay(true);// flush函数的调用 设置为非延迟发送,为true则不组装成大包发送,收到东西马上发出
sc.setSoLinger(0);
sc.setIdleTime(IdleStatus.READER_IDLE, timeout);
acceptor.bind(new InetSocketAddress(listenPort));
}
示例2
@Override
public void run() {
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
chain.addLast("codec", new HttpServerCodecImpl());
// // 线程队列池
OrderedThreadPoolExecutor threadpool = new OrderedThreadPoolExecutor(minaServerConfig.getOrderedThreadPoolExecutorSize());
chain.addLast("threadPool", new ExecutorFilter(threadpool));
acceptor.setReuseAddress(minaServerConfig.isReuseAddress()); // 允许地址重用
SocketSessionConfig sc = acceptor.getSessionConfig();
sc.setReuseAddress(minaServerConfig.isReuseAddress());
sc.setReceiveBufferSize(minaServerConfig.getMaxReadSize());
sc.setSendBufferSize(minaServerConfig.getSendBufferSize());
sc.setTcpNoDelay(minaServerConfig.isTcpNoDelay());
sc.setSoLinger(minaServerConfig.getSoLinger());
sc.setIdleTime(IdleStatus.READER_IDLE, minaServerConfig.getReaderIdleTime());
sc.setIdleTime(IdleStatus.WRITER_IDLE, minaServerConfig.getWriterIdleTime());
acceptor.setHandler(ioHandler);
try {
acceptor.bind(new InetSocketAddress(minaServerConfig.getHttpPort()));
LOG.warn("已开始监听HTTP端口:{}", minaServerConfig.getHttpPort());
} catch (IOException e) {
SysUtil.exit(getClass(), e, "监听HTTP端口:{}已被占用", minaServerConfig.getHttpPort());
}
}
示例3
/** {@inheritDoc} */
@Override
public void run() {
synchronized (this) {
if (!isRunning) {
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
if (factory == null) {
factory = new DefaultProtocolCodecFactory();
}
factory.getDecoder().setMaxReadSize(minaServerConfig.getMaxReadSize());
factory.getEncoder().setMaxScheduledWriteMessages(minaServerConfig.getMaxScheduledWriteMessages());
chain.addLast("codec", new ProtocolCodecFilter(factory));
threadpool = new OrderedThreadPoolExecutor(minaServerConfig.getOrderedThreadPoolExecutorSize());
chain.addLast("threadPool", new ExecutorFilter(threadpool));
if(filters != null){
filters.forEach((key, filter)->chain.addLast(key, filter));
}
DatagramSessionConfig dc = acceptor.getSessionConfig();
dc.setReuseAddress(minaServerConfig.isReuseAddress());
dc.setReceiveBufferSize(minaServerConfig.getReceiveBufferSize());
dc.setSendBufferSize(minaServerConfig.getSendBufferSize());
dc.setIdleTime(IdleStatus.READER_IDLE, minaServerConfig.getReaderIdleTime());
dc.setIdleTime(IdleStatus.WRITER_IDLE, minaServerConfig.getWriterIdleTime());
dc.setBroadcast(true);
dc.setCloseOnPortUnreachable(true);
acceptor.setHandler(ioHandler);
try {
acceptor.bind(new InetSocketAddress(minaServerConfig.getPort()));
LOGGER.warn("已开始监听UDP端口:{}", minaServerConfig.getPort());
} catch (IOException e) {
LOGGER.warn("监听UDP端口:{}已被占用", minaServerConfig.getPort());
LOGGER.error("UDP, 服务异常", e);
}
}
}
}
示例4
/** {@inheritDoc} */
@Override
public void run() {
synchronized (this) {
if (!isRunning) {
isRunning = true;
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
if (factory == null) {
factory = new DefaultProtocolCodecFactory();
}
if (factory instanceof DefaultProtocolCodecFactory) {
ProtocolCodecFactoryImpl defaultFactory = (ProtocolCodecFactoryImpl) factory;
defaultFactory.getDecoder().setMaxReadSize(minaServerConfig.getMaxReadSize());
defaultFactory.getEncoder()
.setMaxScheduledWriteMessages(minaServerConfig.getMaxScheduledWriteMessages());
}
chain.addLast("codec", new ProtocolCodecFilter(factory));
threadpool = new OrderedThreadPoolExecutor(minaServerConfig.getOrderedThreadPoolExecutorSize());
chain.addLast("threadPool", new ExecutorFilter(threadpool));
if (filters != null) {
filters.forEach((key, filter) -> {
if ("ssl".equalsIgnoreCase(key) || "tls".equalsIgnoreCase(key)) { // ssl过滤器必须添加到首部
chain.addFirst(key, filter);
} else {
chain.addLast(key, filter);
}
});
}
acceptor.setReuseAddress(minaServerConfig.isReuseAddress()); // 允许地址重用
SocketSessionConfig sc = acceptor.getSessionConfig();
sc.setReuseAddress(minaServerConfig.isReuseAddress());
sc.setReceiveBufferSize(minaServerConfig.getReceiveBufferSize());
sc.setSendBufferSize(minaServerConfig.getSendBufferSize());
sc.setTcpNoDelay(minaServerConfig.isTcpNoDelay());
sc.setSoLinger(minaServerConfig.getSoLinger());
sc.setIdleTime(IdleStatus.READER_IDLE, minaServerConfig.getReaderIdleTime());
sc.setIdleTime(IdleStatus.WRITER_IDLE, minaServerConfig.getWriterIdleTime());
acceptor.setHandler(ioHandler);
try {
acceptor.bind(new InetSocketAddress(minaServerConfig.getPort()));
log.warn("已开始监听TCP端口:{}", minaServerConfig.getPort());
} catch (IOException e) {
log.warn("监听TCP端口:{}已被占用", minaServerConfig.getPort());
log.error("TCP 服务异常", e);
}
}
}
}
示例5
@Override
public void run()
{
while ( !stop )
{
// wait polling time
try
{
Thread.sleep( pollingInterval );
}
catch ( InterruptedException e )
{
Log.trace("Sleep interrupted");
}
long tmpMsgWritten = 0L;
long tmpMsgRead = 0L;
long tmpBytesWritten = 0L;
long tmpBytesRead = 0L;
long tmpScheduledWrites = 0L;
long tmpQueuevedEvents = 0L;
for (IoSession session : polledSessions)
{
// upadating individual session statistics
IoSessionStat sessStat = ( IoSessionStat ) session.getAttribute( KEY );
long currentTimestamp = System.currentTimeMillis();
// Calculate delta
float pollDelta = (currentTimestamp - sessStat.lastPollingTime) / 1000f;
// Store last polling time of this session
sessStat.lastPollingTime = currentTimestamp;
long readBytes = session.getReadBytes();
long writtenBytes = session.getWrittenBytes();
long readMessages = session.getReadMessages();
long writtenMessages = session.getWrittenMessages();
sessStat.byteReadThroughput = (readBytes - sessStat.lastByteRead) / pollDelta;
sessStat.byteWrittenThroughput = (writtenBytes - sessStat.lastByteWrite) / pollDelta;
sessStat.messageReadThroughput = (readMessages - sessStat.lastMessageRead) / pollDelta;
sessStat.messageWrittenThroughput = (writtenMessages - sessStat.lastMessageWrite) / pollDelta;
tmpMsgWritten += (writtenMessages - sessStat.lastMessageWrite);
tmpMsgRead += (readMessages - sessStat.lastMessageRead);
tmpBytesWritten += (writtenBytes - sessStat.lastByteWrite);
tmpBytesRead += (readBytes - sessStat.lastByteRead);
tmpScheduledWrites += session.getScheduledWriteMessages();
ExecutorFilter executorFilter =
(ExecutorFilter) session.getFilterChain().get(EXECUTOR_FILTER_NAME);
if (executorFilter != null) {
Executor executor = executorFilter.getExecutor();
if (executor instanceof OrderedThreadPoolExecutor) {
tmpQueuevedEvents += ((OrderedThreadPoolExecutor) executor).getActiveCount();
}
}
sessStat.lastByteRead = readBytes;
sessStat.lastByteWrite = writtenBytes;
sessStat.lastMessageRead = readMessages;
sessStat.lastMessageWrite = writtenMessages;
}
totalMsgWritten.addAndGet(tmpMsgWritten);
totalMsgRead.addAndGet(tmpMsgRead);
totalBytesWritten.addAndGet(tmpBytesWritten);
totalBytesRead.addAndGet(tmpBytesRead);
totalScheduledWrites.set(tmpScheduledWrites);
totalQueuedEvents.set(tmpQueuevedEvents);
}
}