Preface
This article examines how spring-data-redis validates connections in its connection pools.
lettuce
LettucePoolingConnectionProvider
spring-data-redis/2.0.10.RELEASE/spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java
```java
class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean {

    private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class);
    private final LettuceConnectionProvider connectionProvider;
    private final GenericObjectPoolConfig poolConfig;
    // Tracks which pool each borrowed connection came from
    private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap<>(32);
    // One pool per connection type
    private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap<>(32);

    LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider,
            LettucePoolingClientConfiguration clientConfiguration) {
        Assert.notNull(connectionProvider, "ConnectionProvider must not be null!");
        Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!");
        this.connectionProvider = connectionProvider;
        this.poolConfig = clientConfiguration.getPoolConfig();
    }

    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
        GenericObjectPool<StatefulConnection<?, ?>> pool = this.pools.computeIfAbsent(connectionType, (poolType) -> {
            return ConnectionPoolSupport.createGenericObjectPool(() -> {
                return this.connectionProvider.getConnection(connectionType);
            }, this.poolConfig, false);
        });

        try {
            StatefulConnection<?, ?> connection = pool.borrowObject();
            this.poolRef.put(connection, pool);
            return connectionType.cast(connection);
        } catch (Exception var4) {
            throw new PoolException("Could not get a resource from the pool", var4);
        }
    }

    public AbstractRedisClient getRedisClient() {
        if (this.connectionProvider instanceof RedisClientProvider) {
            return ((RedisClientProvider) this.connectionProvider).getRedisClient();
        } else {
            throw new IllegalStateException(String.format(
                    "Underlying connection provider %s does not implement RedisClientProvider!",
                    this.connectionProvider.getClass().getName()));
        }
    }

    public void release(StatefulConnection<?, ?> connection) {
        GenericObjectPool<StatefulConnection<?, ?>> pool = this.poolRef.remove(connection);
        if (pool == null) {
            throw new PoolException("Returned connection " + connection
                    + " was either previously returned or does not belong to this connection provider");
        } else {
            pool.returnObject(connection);
        }
    }

    public void destroy() throws Exception {
        if (!this.poolRef.isEmpty()) {
            log.warn("LettucePoolingConnectionProvider contains unreleased connections");
            this.poolRef.forEach((connection, pool) -> {
                pool.returnObject(connection);
            });
            this.poolRef.clear();
        }

        this.pools.forEach((type, pool) -> {
            pool.close();
        });
        this.pools.clear();
    }
}
```
- Here ConnectionPoolSupport.createGenericObjectPool is called to create the connection pool.
ConnectionPoolSupport.createGenericObjectPool
lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java
```java
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
        Supplier<T> connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) {

    LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
    LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");

    AtomicReference<ObjectPool<T>> poolRef = new AtomicReference<>();

    GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) {

        @Override
        public T borrowObject() throws Exception {
            return wrapConnections ? wrapConnection(super.borrowObject(), this) : super.borrowObject();
        }

        @Override
        public void returnObject(T obj) {
            // Unwrap proxied connections before handing them back to the pool
            if (wrapConnections && obj instanceof HasTargetConnection) {
                super.returnObject((T) ((HasTargetConnection) obj).getTargetConnection());
                return;
            }
            super.returnObject(obj);
        }
    };

    poolRef.set(pool);
    return pool;
}
```
- The pool is backed by RedisPooledObjectFactory.
ConnectionPoolSupport.RedisPooledObjectFactory
lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java
```java
private static class RedisPooledObjectFactory<T extends StatefulConnection<?, ?>> extends BasePooledObjectFactory<T> {

    private final Supplier<T> connectionSupplier;

    RedisPooledObjectFactory(Supplier<T> connectionSupplier) {
        this.connectionSupplier = connectionSupplier;
    }

    @Override
    public T create() throws Exception {
        return connectionSupplier.get();
    }

    @Override
    public void destroyObject(PooledObject<T> p) throws Exception {
        p.getObject().close();
    }

    @Override
    public PooledObject<T> wrap(T obj) {
        return new DefaultPooledObject<>(obj);
    }

    @Override
    public boolean validateObject(PooledObject<T> p) {
        // Validation relies solely on the connection's open flag
        return p.getObject().isOpen();
    }
}
```
- It extends BasePooledObjectFactory and overrides validateObject, among other methods; validation here is decided by isOpen.
RedisChannelHandler.isOpen
lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/RedisChannelHandler.java
```java
public abstract class RedisChannelHandler<K, V> implements Closeable, ConnectionFacade {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class);

    private Duration timeout;
    private CloseEvents closeEvents = new CloseEvents();

    private final RedisChannelWriter channelWriter;
    private final boolean debugEnabled = logger.isDebugEnabled();

    private volatile boolean closed;
    private volatile boolean active = true;
    private volatile ClientOptions clientOptions;

    //......

    /**
     * Notification when the connection becomes active (connected).
     */
    public void activated() {
        active = true;
        closed = false;
    }

    /**
     * Notification when the connection becomes inactive (disconnected).
     */
    public void deactivated() {
        active = false;
    }

    /**
     *
     * @return true if the connection is active and not closed.
     */
    public boolean isOpen() {
        return active;
    }

    @Override
    public synchronized void close() {

        if (debugEnabled) {
            logger.debug("close()");
        }

        if (closed) {
            logger.warn("Connection is already closed");
            return;
        }

        if (!closed) {
            active = false;
            closed = true;
            channelWriter.close();
            closeEvents.fireEventClosed(this);
            closeEvents = new CloseEvents();
        }
    }
}
```
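- isOpen is decided by the active field. active becomes false on deactivated or close, and is true at initialization and after activated.
- Consequently, a timeout caused by something like docker pause cannot be detected through the active flag, because no disconnect event is delivered.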
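The difference between a flag check and a round-trip check can be illustrated with a small, self-contained simulation. This is only a sketch: FakeConnection, pausePeer and ValidationStrategies are hypothetical names invented for illustration and are not part of lettuce; the paused peer simply stands in for a server frozen by docker pause.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a Redis connection; not a lettuce class.
class FakeConnection {
    private volatile boolean active = true;   // mirrors the active flag discussed above
    private volatile boolean peerPaused;      // simulates a frozen server (e.g. docker pause)

    void deactivated() { active = false; }    // what a real disconnect would trigger
    void pausePeer() { peerPaused = true; }   // the server stops answering, but no event arrives

    boolean isOpen() { return active; }

    String ping() throws TimeoutException {
        if (!active) {
            throw new IllegalStateException("connection closed");
        }
        if (peerPaused) {
            throw new TimeoutException("no reply from server");
        }
        return "PONG";
    }
}

class ValidationStrategies {
    // Flag-only check, analogous to validating with isOpen().
    static boolean validateByFlag(FakeConnection c) {
        return c.isOpen();
    }

    // Flag plus round trip, analogous to validating with a PING.
    static boolean validateByPing(FakeConnection c) {
        try {
            return c.isOpen() && "PONG".equals(c.ping());
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        FakeConnection c = new FakeConnection();
        c.pausePeer();
        System.out.println("flag check: " + validateByFlag(c));
        System.out.println("ping check: " + validateByPing(c));
    }
}
```

With a paused peer the flag check still reports the connection as valid, while the ping-based check fails, which is the gap described above.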
LettuceConnectionFactory.SharedConnection.validateConnection
spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java
```java
/**
 * Validate the connection. Invalid connections will be closed and the connection state will be reset.
 */
void validateConnection() {

    synchronized (this.connectionMonitor) {

        boolean valid = false;

        if (connection != null && connection.isOpen()) {
            try {

                if (connection instanceof StatefulRedisConnection) {
                    ((StatefulRedisConnection) connection).sync().ping();
                }

                if (connection instanceof StatefulRedisClusterConnection) {
                    ((StatefulRedisConnection) connection).sync().ping();
                }
                valid = true;
            } catch (Exception e) {
                log.debug("Validation failed", e);
            }
        }

        if (!valid) {

            if (connection != null) {
                connectionProvider.release(connection);
            }

            log.warn("Validation of shared connection failed. Creating a new connection.");

            resetConnection();
            this.connection = getNativeConnection();
        }
    }
}
```
- This is the path used to obtain connections when LettuceConnectionFactory's shareNativeConnection is enabled, which is the default.
- If LettuceConnectionFactory's validateConnection is true (it defaults to false), validateConnection is executed on every get.
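The validate-on-every-get behaviour of a shared connection can be sketched in isolation as follows. StubConnection and SharedConnectionHolder are hypothetical types made up for this illustration; they only mimic the shape of the logic above (check under a monitor, discard the shared connection if it fails, create a fresh one) and are not Spring Data Redis classes.

```java
import java.util.function.Supplier;

// Hypothetical minimal connection type, for illustration only.
class StubConnection {
    private boolean open = true;
    private boolean responsive = true;

    boolean isOpen() { return open; }
    void close() { open = false; }
    void freeze() { responsive = false; }   // simulates an unresponsive server

    void ping() {
        if (!responsive) {
            throw new IllegalStateException("ping timed out");
        }
    }
}

// Shares one connection among callers and optionally validates it on every get.
class SharedConnectionHolder {
    private final Object monitor = new Object();
    private final Supplier<StubConnection> supplier;
    private final boolean validateOnGet;
    private StubConnection connection;

    SharedConnectionHolder(Supplier<StubConnection> supplier, boolean validateOnGet) {
        this.supplier = supplier;
        this.validateOnGet = validateOnGet;
    }

    StubConnection getConnection() {
        synchronized (monitor) {
            if (connection == null) {
                connection = supplier.get();
            } else if (validateOnGet && !isValid(connection)) {
                // Discard the broken shared connection and replace it
                connection.close();
                connection = supplier.get();
            }
            return connection;
        }
    }

    private static boolean isValid(StubConnection c) {
        try {
            if (!c.isOpen()) {
                return false;
            }
            c.ping();
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }
}
```

Without validation the holder keeps returning the same frozen connection; with validation enabled, the next get replaces it, at the cost of one extra round trip per call.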
DefaultLettucePool.LettuceFactory
spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/DefaultLettucePool.java
```java
private static class LettuceFactory extends BasePooledObjectFactory<StatefulConnection<byte[], byte[]>> {

    private final RedisClient client;
    private int dbIndex;

    public LettuceFactory(RedisClient client, int dbIndex) {
        this.client = client;
        this.dbIndex = dbIndex;
    }

    public void activateObject(PooledObject<StatefulConnection<byte[], byte[]>> pooledObject) throws Exception {
        if (pooledObject.getObject() instanceof StatefulRedisConnection) {
            ((StatefulRedisConnection) pooledObject.getObject()).sync().select(this.dbIndex);
        }
    }

    public void destroyObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) throws Exception {
        try {
            ((StatefulConnection) obj.getObject()).close();
        } catch (Exception var3) {
            ;
        }
    }

    public boolean validateObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) {
        try {
            // Validation performs a real round trip to the server
            if (obj.getObject() instanceof StatefulRedisConnection) {
                ((StatefulRedisConnection) obj.getObject()).sync().ping();
            }
            return true;
        } catch (Exception var3) {
            return false;
        }
    }

    public StatefulConnection<byte[], byte[]> create() throws Exception {
        return this.client.connect(LettuceConnection.CODEC);
    }

    public PooledObject<StatefulConnection<byte[], byte[]>> wrap(StatefulConnection<byte[], byte[]> obj) {
        return new DefaultPooledObject(obj);
    }
}
```
- The deprecated DefaultLettucePool contains a LettuceFactory whose validateObject checks the connection with ping, which makes it more accurate.
jedis
JedisConnectionFactory
spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java
```java
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {

    //......

    private Pool<Jedis> createPool() {

        if (isRedisSentinelAware()) {
            return createRedisSentinelPool(this.sentinelConfig);
        }
        return createRedisPool();
    }

    /**
     * Creates {@link JedisSentinelPool}.
     *
     * @param config the actual {@link RedisSentinelConfiguration}. Never {@literal null}.
     * @return the {@link Pool} to use. Never {@literal null}.
     * @since 1.4
     */
    protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config) {

        GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
        return new JedisSentinelPool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
                poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName());
    }

    /**
     * Creates {@link JedisPool}.
     *
     * @return the {@link Pool} to use. Never {@literal null}.
     * @since 1.4
     */
    protected Pool<Jedis> createRedisPool() {

        return new JedisPool(getPoolConfig(), getHostName(), getPort(), getConnectTimeout(), getReadTimeout(),
                getPassword(), getDatabase(), getClientName(), isUseSsl(),
                clientConfiguration.getSslSocketFactory().orElse(null), //
                clientConfiguration.getSslParameters().orElse(null), //
                clientConfiguration.getHostnameVerifier().orElse(null));
    }

    //......
}
```
- Both JedisPool and JedisSentinelPool use JedisFactory internally.
JedisFactory.validateObject
jedis-2.9.0-sources.jar!/redis/clients/jedis/JedisFactory.java
```java
class JedisFactory implements PooledObjectFactory<Jedis> {

    private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>();
    private final int connectionTimeout;
    private final int soTimeout;
    private final String password;
    private final int database;
    private final String clientName;
    private final boolean ssl;
    private final SSLSocketFactory sslSocketFactory;
    private SSLParameters sslParameters;
    private HostnameVerifier hostnameVerifier;

    //......

    @Override
    public boolean validateObject(PooledObject<Jedis> pooledJedis) {
        final BinaryJedis jedis = pooledJedis.getObject();
        try {
            HostAndPort hostAndPort = this.hostAndPort.get();

            String connectionHost = jedis.getClient().getHost();
            int connectionPort = jedis.getClient().getPort();

            // Same endpoint, socket connected, and the server answers PING
            return hostAndPort.getHost().equals(connectionHost)
                    && hostAndPort.getPort() == connectionPort
                    && jedis.isConnected()
                    && jedis.ping().equals("PONG");
        } catch (final Exception e) {
            return false;
        }
    }
}
```
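- JedisFactory implements the PooledObjectFactory interface; its validateObject method checks not only isConnected but also the result of ping.
- ping throws an exception as soon as it times out, so validation fails. The timeout caused by docker pause is therefore detected, and the connection is removed from the pool.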
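How a failed validation leads to eviction can be shown with a deliberately tiny pool. TinyPool is a hypothetical, single-threaded sketch written for this article and is far simpler than commons-pool2's GenericObjectPool; note also that in commons-pool2 the factory's validateObject is only consulted when a setting such as testOnBorrow or testWhileIdle is enabled in the pool configuration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical, single-threaded pool sketch that validates objects on borrow.
class TinyPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private final Predicate<T> validator;
    private int evicted;

    TinyPool(Supplier<T> factory, Predicate<T> validator) {
        this.factory = factory;
        this.validator = validator;
    }

    T borrow() {
        T candidate;
        while ((candidate = idle.pollFirst()) != null) {
            if (validator.test(candidate)) {
                return candidate;
            }
            evicted++;   // drop the broken object instead of handing it out
        }
        // No usable idle object left: create a fresh one
        return factory.get();
    }

    void giveBack(T obj) {
        idle.addFirst(obj);
    }

    int evictedCount() { return evicted; }

    int idleCount() { return idle.size(); }
}
```

If the validator performs a real round trip, as JedisFactory does with ping, a connection to a paused server is dropped on the next borrow and the caller receives a newly created one.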
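Summary
- spring-data-redis 2.0 and later deprecate the old LettucePool in favor of LettucePoolingClientConfiguration.
- This introduces a problem: the old pool validated connections with ping, whereas the new pool relies on the active field, which cannot detect a docker pause.
- For lettuce, shareNativeConnection defaults to true and validateConnection defaults to false. After the first connection is borrowed from the pool, the underlying connection is reused indefinitely and never returned. To borrow from the pool and return the connection on every access, set shareNativeConnection to false.
- The jedis pool implementation checks both isConnected and ping in validateObject, so it detects the timeout caused by docker pause and removes the connection from the pool.
- For lettuce to detect the docker pause failure, there are two options. One is to fix validateObject in ConnectionPoolSupport's RedisPooledObjectFactory so that it pings in addition to checking isOpen. The other is to disable the connection pool and set LettuceConnectionFactory's validateConnection to true.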