Tomcat集群中的Spring Websocket
问题内容:
在当前应用程序中,我们在STOMP上使用Spring
Websockets。我们正在寻求水平扩展。在如何处理多个tomcat实例上的websocket流量以及如何在多个节点上维护会话信息方面,是否有最佳实践?是否有一个可供参考的工作示例?
问题答案:
您的需求可以分为2个子任务:
-
维护多个节点之间的会话信息:您可以尝试由Redis支持的Spring Sessions集群(请参阅:HttpSession with Redis)。这非常简单,并且已经支持Spring Websocket(请参阅:Spring Session&WebSockets)。
-
处理多个tomcat实例上的websocket通信:有几种方法可以做到这一点。
- 第一种方法:使用功能齐全的消息代理(例如 ActiveMQ),并尝试自 4.2.0 RC1 起提供的新功能“支持多个 WebSocket 服务器”。
- 第二种方法:使用功能齐全的消息代理,并实现一个分布式的 UserSessionRegistry(例如基于 Redis)。默认实现 DefaultUserSessionRegistry 使用内存存储,因此无法在多个节点之间共享会话信息。
更新:我使用 Redis 编写了一个简单的实现(见下文的 RedisUserSessionRegistry),如果您有兴趣,可以试一试。
要配置功能齐全的代理(代理中继),您可以尝试:
/**
 * WebSocket message-broker configuration that relays STOMP traffic to an external broker
 * and exposes a Redis-backed {@link UserSessionRegistry} bean.
 *
 * <p>This is an abridged snippet: the {@code ...} lines stand for members that were omitted.
 */
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
...
// Connection factory handed to the RedisUserSessionRegistry created below.
@Autowired
private RedisConnectionFactory redisConnectionFactory;
/**
 * Enables the STOMP broker relay for the "/topic" and "/queue" destination prefixes and
 * registers "/app" as the application destination prefix.
 *
 * @param config the registry on which the relay and prefixes are configured
 */
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("localhost") // broker host
.setRelayPort(61613) // broker port
;
config.setApplicationDestinationPrefixes("/app");
}
/**
 * Provides the registry bean, built from the injected Redis connection factory.
 *
 * @return a new {@code RedisUserSessionRegistry}
 */
@Bean
public UserSessionRegistry userSessionRegistry() {
return new RedisUserSessionRegistry(redisConnectionFactory);
}
...
}
和
import java.util.Collections;
import java.util.Set;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundHashOperations;
import org.springframework.data.redis.core.BoundSetOperations;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.messaging.simp.user.UserSessionRegistry;
import org.springframework.util.Assert;
/**
 * An implementation of {@link UserSessionRegistry} backed by Redis.
 *
 * <p>Each user's session ids are kept in one Redis Set whose key is
 * {@link #BOUNDED_HASH_KEY_PREFIX} followed by the user name.
 *
 * <p>NOTE(review): entries are removed only through {@link #unregisterSessionId}; this class
 * sets no expiry on the keys, so ids registered by a node that stops without unregistering
 * presumably remain in Redis. Confirm whether a cleanup strategy is needed.
 *
 * @author thanh
 */
public class RedisUserSessionRegistry implements UserSessionRegistry {

    /**
     * The prefix for each key of the Redis Set representing a user's sessions. The suffix is
     * the unique user id. The constant keeps its historical "HASH" name for compatibility;
     * the underlying Redis structure is a Set.
     */
    static final String BOUNDED_HASH_KEY_PREFIX = "spring:websockets:users:";

    private final RedisOperations<String, String> sessionRedisOperations;

    /**
     * Creates a registry that talks to Redis through a String-serializing template built
     * from the given connection factory.
     *
     * @param redisConnectionFactory the connection factory to use, must not be {@code null}
     * @throws IllegalArgumentException if {@code redisConnectionFactory} is {@code null}
     */
    public RedisUserSessionRegistry(RedisConnectionFactory redisConnectionFactory) {
        this(createDefaultTemplate(redisConnectionFactory));
    }

    /**
     * Creates a registry on top of caller-supplied Redis operations.
     *
     * @param sessionRedisOperations the operations used for all reads and writes,
     *        must not be {@code null}
     * @throws IllegalArgumentException if {@code sessionRedisOperations} is {@code null}
     */
    public RedisUserSessionRegistry(RedisOperations<String, String> sessionRedisOperations) {
        Assert.notNull(sessionRedisOperations, "sessionRedisOperations cannot be null");
        this.sessionRedisOperations = sessionRedisOperations;
    }

    /**
     * Returns the members of the user's Redis Set.
     *
     * @param user the user name
     * @return the session ids stored for the user; an empty set (never {@code null}) when the
     *         Redis call yields {@code null}
     */
    @Override
    public Set<String> getSessionIds(String user) {
        Set<String> entries = getSessionBoundSetOperations(user).members();
        if (entries == null) {
            return Collections.emptySet();
        }
        return entries;
    }

    /**
     * Adds the session id to the user's Redis Set.
     *
     * @param user the user name
     * @param sessionId the session id to add
     */
    @Override
    public void registerSessionId(String user, String sessionId) {
        getSessionBoundSetOperations(user).add(sessionId);
    }

    /**
     * Removes the session id from the user's Redis Set.
     *
     * @param user the user name
     * @param sessionId the session id to remove
     */
    @Override
    public void unregisterSessionId(String user, String sessionId) {
        getSessionBoundSetOperations(user).remove(sessionId);
    }

    /**
     * Gets the {@link BoundSetOperations} bound to the Set key of the given user.
     */
    private BoundSetOperations<String, String> getSessionBoundSetOperations(String username) {
        return this.sessionRedisOperations.boundSetOps(getKey(username));
    }

    /**
     * Gets the Redis key for this user by prefixing the user name with
     * {@link #BOUNDED_HASH_KEY_PREFIX}.
     *
     * @param username the user name
     * @return the prefixed key
     */
    static String getKey(String username) {
        return BOUNDED_HASH_KEY_PREFIX + username;
    }

    /**
     * Builds the default template used by the connection-factory constructor.
     *
     * <p>{@link StringRedisTemplate} is a {@code RedisTemplate<String, String>}, so it can be
     * returned with full type information and no raw types. Its connection-factory
     * constructor already installs String serializers and initializes the template, so no
     * further setup is done here.
     *
     * @param connectionFactory the connection factory, must not be {@code null}
     * @return an initialized String-to-String template
     * @throws IllegalArgumentException if {@code connectionFactory} is {@code null}
     */
    private static RedisOperations<String, String> createDefaultTemplate(
            RedisConnectionFactory connectionFactory) {
        Assert.notNull(connectionFactory, "connectionFactory cannot be null");
        return new StringRedisTemplate(connectionFactory);
    }
}