Java源码示例:org.apache.mina.filter.executor.OrderedThreadPoolExecutor

示例1
public  void startListner(IoHandler iohandler,int listenPort) throws Exception{
	acceptor = new NioSocketAcceptor();
	acceptor.setBacklog(100);
	acceptor.setReuseAddress(true);
	acceptor.setHandler(iohandler);
	
       DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
       IoFilter protocol = new ProtocolCodecFilter(new GameProtocolcodecFactory());
       chain.addLast("codec", protocol);
	threadpool = new OrderedThreadPoolExecutor(500);
	threadpool.setThreadFactory(new ServerThreadFactory("OrderedThreadPool"));
	chain.addLast("threadPool", new ExecutorFilter(threadpool));
	
	int recsize = 5120;
	int sendsize = 40480;                                                                                         
	int timeout = 10;
	SocketSessionConfig sc = acceptor.getSessionConfig();
	sc.setReuseAddress(true);// 设置每一个非主监听连接的端口可以重用
	sc.setReceiveBufferSize(recsize);// 设置输入缓冲区的大小
	sc.setSendBufferSize(sendsize);// 设置输出缓冲区的大小
	sc.setTcpNoDelay(true);// flush函数的调用 设置为非延迟发送,为true则不组装成大包发送,收到东西马上发出   
	sc.setSoLinger(0);
	sc.setIdleTime(IdleStatus.READER_IDLE, timeout);
	acceptor.bind(new InetSocketAddress(listenPort));
}
 
示例2
@Override
public void run() {
	DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
	chain.addLast("codec", new HttpServerCodecImpl());

	// // 线程队列池
	OrderedThreadPoolExecutor threadpool = new OrderedThreadPoolExecutor(minaServerConfig.getOrderedThreadPoolExecutorSize());
	chain.addLast("threadPool", new ExecutorFilter(threadpool));

	acceptor.setReuseAddress(minaServerConfig.isReuseAddress()); // 允许地址重用

	SocketSessionConfig sc = acceptor.getSessionConfig();
	sc.setReuseAddress(minaServerConfig.isReuseAddress());
	sc.setReceiveBufferSize(minaServerConfig.getMaxReadSize());
	sc.setSendBufferSize(minaServerConfig.getSendBufferSize());
	sc.setTcpNoDelay(minaServerConfig.isTcpNoDelay());
	sc.setSoLinger(minaServerConfig.getSoLinger());
	sc.setIdleTime(IdleStatus.READER_IDLE, minaServerConfig.getReaderIdleTime());
	sc.setIdleTime(IdleStatus.WRITER_IDLE, minaServerConfig.getWriterIdleTime());

	acceptor.setHandler(ioHandler);

	try {
		acceptor.bind(new InetSocketAddress(minaServerConfig.getHttpPort()));
		LOG.warn("已开始监听HTTP端口:{}", minaServerConfig.getHttpPort());
	} catch (IOException e) {
		SysUtil.exit(getClass(), e, "监听HTTP端口:{}已被占用", minaServerConfig.getHttpPort());
	}
}
 
示例3
/** {@inheritDoc} */
@Override
public void run() {
	synchronized (this) {
		if (!isRunning) {
			DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
			if (factory == null) {
				factory = new DefaultProtocolCodecFactory();
			}
			factory.getDecoder().setMaxReadSize(minaServerConfig.getMaxReadSize());
			factory.getEncoder().setMaxScheduledWriteMessages(minaServerConfig.getMaxScheduledWriteMessages());
			chain.addLast("codec", new ProtocolCodecFilter(factory));
			threadpool = new OrderedThreadPoolExecutor(minaServerConfig.getOrderedThreadPoolExecutorSize());
			chain.addLast("threadPool", new ExecutorFilter(threadpool));
			if(filters != null){
                   filters.forEach((key, filter)->chain.addLast(key, filter));
			}

			DatagramSessionConfig dc = acceptor.getSessionConfig();
			dc.setReuseAddress(minaServerConfig.isReuseAddress());
			dc.setReceiveBufferSize(minaServerConfig.getReceiveBufferSize());
			dc.setSendBufferSize(minaServerConfig.getSendBufferSize());
			dc.setIdleTime(IdleStatus.READER_IDLE, minaServerConfig.getReaderIdleTime());
			dc.setIdleTime(IdleStatus.WRITER_IDLE, minaServerConfig.getWriterIdleTime());
			dc.setBroadcast(true);
			dc.setCloseOnPortUnreachable(true);

			acceptor.setHandler(ioHandler);
			try {
				acceptor.bind(new InetSocketAddress(minaServerConfig.getPort()));
				LOGGER.warn("已开始监听UDP端口:{}", minaServerConfig.getPort());
			} catch (IOException e) {
				LOGGER.warn("监听UDP端口:{}已被占用", minaServerConfig.getPort());
				LOGGER.error("UDP, 服务异常", e);
			}
		}
	}
}
 
示例4
/** {@inheritDoc} */
@Override
public void run() {
    synchronized (this) {
        if (!isRunning) {
            isRunning = true;
            DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
            if (factory == null) {
                factory = new DefaultProtocolCodecFactory();
            }

            if (factory instanceof DefaultProtocolCodecFactory) {
                ProtocolCodecFactoryImpl defaultFactory = (ProtocolCodecFactoryImpl) factory;
                defaultFactory.getDecoder().setMaxReadSize(minaServerConfig.getMaxReadSize());
                defaultFactory.getEncoder()
                              .setMaxScheduledWriteMessages(minaServerConfig.getMaxScheduledWriteMessages());
            }

            chain.addLast("codec", new ProtocolCodecFilter(factory));
            threadpool = new OrderedThreadPoolExecutor(minaServerConfig.getOrderedThreadPoolExecutorSize());
            chain.addLast("threadPool", new ExecutorFilter(threadpool));
            if (filters != null) {
                filters.forEach((key, filter) -> {
                    if ("ssl".equalsIgnoreCase(key) || "tls".equalsIgnoreCase(key)) { // ssl过滤器必须添加到首部
                        chain.addFirst(key, filter);
                    } else {
                        chain.addLast(key, filter);
                    }
                });
            }

            acceptor.setReuseAddress(minaServerConfig.isReuseAddress()); // 允许地址重用

            SocketSessionConfig sc = acceptor.getSessionConfig();
            sc.setReuseAddress(minaServerConfig.isReuseAddress());
            sc.setReceiveBufferSize(minaServerConfig.getReceiveBufferSize());
            sc.setSendBufferSize(minaServerConfig.getSendBufferSize());
            sc.setTcpNoDelay(minaServerConfig.isTcpNoDelay());
            sc.setSoLinger(minaServerConfig.getSoLinger());
            sc.setIdleTime(IdleStatus.READER_IDLE, minaServerConfig.getReaderIdleTime());
            sc.setIdleTime(IdleStatus.WRITER_IDLE, minaServerConfig.getWriterIdleTime());

            acceptor.setHandler(ioHandler);

            try {
                acceptor.bind(new InetSocketAddress(minaServerConfig.getPort()));
                log.warn("已开始监听TCP端口:{}", minaServerConfig.getPort());
            } catch (IOException e) {
                log.warn("监听TCP端口:{}已被占用", minaServerConfig.getPort());
                log.error("TCP 服务异常", e);
            }
        }
    }
}
 
示例5
@Override 
public void run()
{
    while ( !stop )
    {
        // wait polling time
        try
        {
            Thread.sleep( pollingInterval );
        }
        catch ( InterruptedException e )
        {
            Log.trace("Sleep interrupted");
        }

        long tmpMsgWritten = 0L;
        long tmpMsgRead = 0L;
        long tmpBytesWritten = 0L;
        long tmpBytesRead = 0L;
        long tmpScheduledWrites = 0L;
        long tmpQueuevedEvents = 0L;

        for (IoSession session : polledSessions)
        {
            // upadating individual session statistics
            IoSessionStat sessStat = ( IoSessionStat ) session.getAttribute( KEY );

            long currentTimestamp = System.currentTimeMillis();
            // Calculate delta
            float pollDelta = (currentTimestamp - sessStat.lastPollingTime) / 1000f;
            // Store last polling time of this session
            sessStat.lastPollingTime = currentTimestamp;

            long readBytes = session.getReadBytes();
            long writtenBytes = session.getWrittenBytes();
            long readMessages = session.getReadMessages();
            long writtenMessages = session.getWrittenMessages();
            sessStat.byteReadThroughput = (readBytes - sessStat.lastByteRead) / pollDelta;
            sessStat.byteWrittenThroughput = (writtenBytes - sessStat.lastByteWrite) / pollDelta;
            sessStat.messageReadThroughput = (readMessages - sessStat.lastMessageRead) / pollDelta;
            sessStat.messageWrittenThroughput = (writtenMessages - sessStat.lastMessageWrite) / pollDelta;

            tmpMsgWritten += (writtenMessages - sessStat.lastMessageWrite);
            tmpMsgRead += (readMessages - sessStat.lastMessageRead);
            tmpBytesWritten += (writtenBytes - sessStat.lastByteWrite);
            tmpBytesRead += (readBytes - sessStat.lastByteRead);
            tmpScheduledWrites += session.getScheduledWriteMessages();

            ExecutorFilter executorFilter =
                    (ExecutorFilter) session.getFilterChain().get(EXECUTOR_FILTER_NAME);
            if (executorFilter != null) {
                Executor executor =  executorFilter.getExecutor();
                if (executor instanceof OrderedThreadPoolExecutor) {
                    tmpQueuevedEvents += ((OrderedThreadPoolExecutor) executor).getActiveCount();
                }
            }

            sessStat.lastByteRead = readBytes;
            sessStat.lastByteWrite = writtenBytes;
            sessStat.lastMessageRead = readMessages;
            sessStat.lastMessageWrite = writtenMessages;

        }

        totalMsgWritten.addAndGet(tmpMsgWritten);
        totalMsgRead.addAndGet(tmpMsgRead);
        totalBytesWritten.addAndGet(tmpBytesWritten);
        totalBytesRead.addAndGet(tmpBytesRead);
        totalScheduledWrites.set(tmpScheduledWrites);
        totalQueuedEvents.set(tmpQueuevedEvents);
    }
}