预初始化工作线程池以重用连接对象(套接字)
问题内容:
我需要在Java中建立一个工人池,其中每个工人都有自己连接的套接字;当工作线程运行时,它使用套接字,但保持打开状态以备后用。我们之所以决定采用这种方法,是因为与临时创建,连接和销毁套接字相关的开销需要太多开销,因此我们需要一种方法,通过该方法工人池可以通过其套接字连接进行预初始化,以准备在确保套接字资源不受其他线程影响的情况下继续工作(套接字不是线程安全的),因此我们需要遵循以下原则:
public class SocketTask implements Runnable {
Socket socket;
public SocketTask(){
//create + connect socket here
}
public void run(){
//use socket here
}
}
在应用程序启动时,我们希望初始化工作进程,并希望以某种方式初始化套接字连接…
MyWorkerPool pool = new MyWorkerPool();
for( int i = 0; i < 100; i++)
pool.addWorker( new WorkerThread());
当应用程序请求工作时,我们会将任务发送到工作池以立即执行…
pool.queueWork( new SocketTask(..));
更新了工作代码
基于Gray和jontejj的有用评论,我使以下代码可以工作…
套接字任务
public class SocketTask implements Runnable {
private String workDetails;
private static final ThreadLocal<Socket> threadLocal =
new ThreadLocal<Socket>(){
@Override
protected Socket initialValue(){
return new Socket();
}
};
public SocketTask(String details){
this.workDetails = details;
}
public void run(){
Socket s = getSocket(); //gets from threadlocal
//send data on socket based on workDetails, etc.
}
public static Socket getSocket(){
return threadLocal.get();
}
}
执行器服务
ExecutorService threadPool =
Executors.newFixedThreadPool(5, Executors.defaultThreadFactory());
int tasks = 15;
for( int i = 1; i <= tasks; i++){
threadPool.execute(new SocketTask("foobar-" + i));
}
我喜欢这种方法有几个原因…
- 套接字是可用于运行任务的本地对象(通过ThreadLocal),从而消除了并发问题。
- 套接字仅创建一次并保持打开状态,在新任务进入队列时可以重用,从而消除了套接字对象创建/销毁的开销。
问题答案:
一种想法是将Socket
s放入BlockingQueue
。然后,每当需要Socket
线程时take()
,线程就可以从队列中移出,并在线程完成处理Socket
后将put()
其返回队列。
public void run() {
Socket socket = socketQueue.take();
try {
// use the socket ...
} finally {
socketQueue.put(socket);
}
}
这具有其他好处:
- 您可以返回使用
ExecutorService
代码。 - 您可以将套接字通信与结果处理分开。
- 您不需要一对一的对应关系来处理线程和套接字。但是套接字通信可能完成了98%的工作,因此可能没有收益。
- 完成并
ExecutorService
完成后,您可以通过使套接字出队并关闭它们来关闭套接字。
这确实增加了另一个的额外开销,BlockingQueue
但是如果您正在进行Socket
通信,则不会注意到它。
我们认为ThreadFactory不能满足我们的需求…
我认为,如果您使用线程局部变量,则可以使这项工作有效。您的线程工厂将创建一个线程,该线程首先打开套接字,将其存储在本地线程中,然后调用Runnable
arg,该arg使用套接字完成所有工作,从ExecutorService
内部队列中取出作业。一旦完成,该arg.run()
方法将完成,您可以从本地线程获取套接字并关闭它。
类似于以下内容。有点混乱,但您应该明白。
ExecutorService threadPool =
Executors.newFixedThreadPool(10,
new ThreadFactory() {
public Thread newThread(final Runnable r) {
Thread thread = new Thread(new Runnable() {
public void run() {
openSocketAndStoreInThreadLocal();
// our tasks would then get the socket from the thread-local
r.run();
getSocketFromThreadLocalAndCloseIt();
}
});
return thread;
}
}));
因此,您的任务将实现Runnable
并如下所示:
public SocketWorker implements Runnable {
private final ThreadLocal<Socket> threadLocal;
public SocketWorker(ThreadLocal<Socket> threadLocal) {
this.threadLocal = threadLocal;
}
public void run() {
Socket socket = threadLocal.get();
// use the socket ...
}
}