博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊spring-data-redis的连接池的校验
阅读量:5872 次
发布时间:2019-06-19

本文共 13591 字,大约阅读时间需要 45 分钟。

本文主要研究一下spring-data-redis的连接池的校验

lettuce

LettucePoolingConnectionProvider

spring-data-redis/2.0.10.RELEASE/spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java

class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean {    private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class);    private final LettuceConnectionProvider connectionProvider;    private final GenericObjectPoolConfig poolConfig;    private final Map
, GenericObjectPool
>> poolRef = new ConcurrentHashMap(32); private final Map
, GenericObjectPool
>> pools = new ConcurrentHashMap(32); LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider, LettucePoolingClientConfiguration clientConfiguration) { Assert.notNull(connectionProvider, "ConnectionProvider must not be null!"); Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!"); this.connectionProvider = connectionProvider; this.poolConfig = clientConfiguration.getPoolConfig(); } public
> T getConnection(Class
connectionType) { GenericObjectPool pool = (GenericObjectPool)this.pools.computeIfAbsent(connectionType, (poolType) -> { return ConnectionPoolSupport.createGenericObjectPool(() -> { return this.connectionProvider.getConnection(connectionType); }, this.poolConfig, false); }); try { StatefulConnection
connection = (StatefulConnection)pool.borrowObject(); this.poolRef.put(connection, pool); return (StatefulConnection)connectionType.cast(connection); } catch (Exception var4) { throw new PoolException("Could not get a resource from the pool", var4); } } public AbstractRedisClient getRedisClient() { if (this.connectionProvider instanceof RedisClientProvider) { return ((RedisClientProvider)this.connectionProvider).getRedisClient(); } else { throw new IllegalStateException(String.format("Underlying connection provider %s does not implement RedisClientProvider!", this.connectionProvider.getClass().getName())); } } public void release(StatefulConnection
connection) { GenericObjectPool
> pool = (GenericObjectPool)this.poolRef.remove(connection); if (pool == null) { throw new PoolException("Returned connection " + connection + " was either previously returned or does not belong to this connection provider"); } else { pool.returnObject(connection); } } public void destroy() throws Exception { if (!this.poolRef.isEmpty()) { log.warn("LettucePoolingConnectionProvider contains unreleased connections"); this.poolRef.forEach((connection, pool) -> { pool.returnObject(connection); }); this.poolRef.clear(); } this.pools.forEach((type, pool) -> { pool.close(); }); this.pools.clear(); }}
  • 这里调用ConnectionPoolSupport.createGenericObjectPool来创建连接池

ConnectionPoolSupport.createGenericObjectPool

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java

public static 
> GenericObjectPool
createGenericObjectPool( Supplier
connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) { LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null"); LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null"); AtomicReference
> poolRef = new AtomicReference<>(); GenericObjectPool
pool = new GenericObjectPool
(new RedisPooledObjectFactory
(connectionSupplier), config) { @Override public T borrowObject() throws Exception { return wrapConnections ? wrapConnection(super.borrowObject(), this) : super.borrowObject(); } @Override public void returnObject(T obj) { if (wrapConnections && obj instanceof HasTargetConnection) { super.returnObject((T) ((HasTargetConnection) obj).getTargetConnection()); return; } super.returnObject(obj); } }; poolRef.set(pool); return pool; }
  • 这里使用了RedisPooledObjectFactory

ConnectionPoolSupport.RedisPooledObjectFactory

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java

private static class RedisPooledObjectFactory
> extends BasePooledObjectFactory
{ private final Supplier
connectionSupplier; RedisPooledObjectFactory(Supplier
connectionSupplier) { this.connectionSupplier = connectionSupplier; } @Override public T create() throws Exception { return connectionSupplier.get(); } @Override public void destroyObject(PooledObject
p) throws Exception { p.getObject().close(); } @Override public PooledObject
wrap(T obj) { return new DefaultPooledObject<>(obj); } @Override public boolean validateObject(PooledObject
p) { return p.getObject().isOpen(); } }
  • 这里继承了BasePooledObjectFactory,重写了validate等方法,这里validate是通过isOpen来判断

RedisChannelHandler.isOpen

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/RedisChannelHandler.java

public abstract class RedisChannelHandler
implements Closeable, ConnectionFacade { private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class); private Duration timeout; private CloseEvents closeEvents = new CloseEvents(); private final RedisChannelWriter channelWriter; private final boolean debugEnabled = logger.isDebugEnabled(); private volatile boolean closed; private volatile boolean active = true; private volatile ClientOptions clientOptions; //...... /** * Notification when the connection becomes active (connected). */ public void activated() { active = true; closed = false; } /** * Notification when the connection becomes inactive (disconnected). */ public void deactivated() { active = false; } /** * * @return true if the connection is active and not closed. */ public boolean isOpen() { return active; } @Override public synchronized void close() { if (debugEnabled) { logger.debug("close()"); } if (closed) { logger.warn("Connection is already closed"); return; } if (!closed) { active = false; closed = true; channelWriter.close(); closeEvents.fireEventClosed(this); closeEvents = new CloseEvents(); } }}
  • isOpen是通过active字段来判断的,而active在deactivated或者close的时候变为false,初始化以及在activated的时候变为true
  • 可以看到对于docker pause这种造成的timeout,active这种方式检测不出来

LettuceConnectionFactory.SharedConnection.validateConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

/**         * Validate the connection. Invalid connections will be closed and the connection state will be reset.         */        void validateConnection() {            synchronized (this.connectionMonitor) {                boolean valid = false;                if (connection != null && connection.isOpen()) {                    try {                        if (connection instanceof StatefulRedisConnection) {                            ((StatefulRedisConnection) connection).sync().ping();                        }                        if (connection instanceof StatefulRedisClusterConnection) {                            ((StatefulRedisConnection) connection).sync().ping();                        }                        valid = true;                    } catch (Exception e) {                        log.debug("Validation failed", e);                    }                }                if (!valid) {                    if (connection != null) {                        connectionProvider.release(connection);                    }                    log.warn("Validation of shared connection failed. Creating a new connection.");                    resetConnection();                    this.connection = getNativeConnection();                }            }        }
  • 这个是默认开启LettuceConnectionFactory的shareNativeConnection走的获取连接的方法
  • 如果LettuceConnectionFactory的validateConnection为true的话(默认为false),则会自己在每次get的时候执行一下validateConnection

DefaultLettucePool.LettuceFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/DefaultLettucePool.java

private static class LettuceFactory extends BasePooledObjectFactory
> { private final RedisClient client; private int dbIndex; public LettuceFactory(RedisClient client, int dbIndex) { this.client = client; this.dbIndex = dbIndex; } public void activateObject(PooledObject
> pooledObject) throws Exception { if (pooledObject.getObject() instanceof StatefulRedisConnection) { ((StatefulRedisConnection)pooledObject.getObject()).sync().select(this.dbIndex); } } public void destroyObject(PooledObject
> obj) throws Exception { try { ((StatefulConnection)obj.getObject()).close(); } catch (Exception var3) { ; } } public boolean validateObject(PooledObject
> obj) { try { if (obj.getObject() instanceof StatefulRedisConnection) { ((StatefulRedisConnection)obj.getObject()).sync().ping(); } return true; } catch (Exception var3) { return false; } } public StatefulConnection
create() throws Exception { return this.client.connect(LettuceConnection.CODEC); } public PooledObject
> wrap(StatefulConnection
obj) { return new DefaultPooledObject(obj); } }
  • 被废弃的DefaultLettucePool里头有个LettuceFactory,其validate是通过ping来判断的,因而更为准确

jedis

JedisConnectionFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java

public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {    //......    private Pool
createPool() { if (isRedisSentinelAware()) { return createRedisSentinelPool(this.sentinelConfig); } return createRedisPool(); } /** * Creates {@link JedisSentinelPool}. * * @param config the actual {@link RedisSentinelConfiguration}. Never {@literal null}. * @return the {@link Pool} to use. Never {@literal null}. * @since 1.4 */ protected Pool
createRedisSentinelPool(RedisSentinelConfiguration config) { GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig(); return new JedisSentinelPool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()), poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName()); } /** * Creates {@link JedisPool}. * * @return the {@link Pool} to use. Never {@literal null}. * @since 1.4 */ protected Pool
createRedisPool() { return new JedisPool(getPoolConfig(), getHostName(), getPort(), getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName(), isUseSsl(), clientConfiguration.getSslSocketFactory().orElse(null), // clientConfiguration.getSslParameters().orElse(null), // clientConfiguration.getHostnameVerifier().orElse(null)); } //......}
  • 不管是JedisPool还是JedisSentinelPool,里头使用的是JedisFactory

JedisFactory.validateObject

jedis-2.9.0-sources.jar!/redis/clients/jedis/JedisFactory.java

class JedisFactory implements PooledObjectFactory
{ private final AtomicReference
hostAndPort = new AtomicReference
(); private final int connectionTimeout; private final int soTimeout; private final String password; private final int database; private final String clientName; private final boolean ssl; private final SSLSocketFactory sslSocketFactory; private SSLParameters sslParameters; private HostnameVerifier hostnameVerifier; //...... @Override public boolean validateObject(PooledObject
pooledJedis) { final BinaryJedis jedis = pooledJedis.getObject(); try { HostAndPort hostAndPort = this.hostAndPort.get(); String connectionHost = jedis.getClient().getHost(); int connectionPort = jedis.getClient().getPort(); return hostAndPort.getHost().equals(connectionHost) && hostAndPort.getPort() == connectionPort && jedis.isConnected() && jedis.ping().equals("PONG"); } catch (final Exception e) { return false; } }}
  • JedisFactory实现了PooledObjectFactory接口,其validateObject方法不仅校验isConnected,而且也校验了ping方法
  • ping方法只要超时就会抛出异常,从而校验失败,因而可以感知到docker pause带来的timeout,从而将连接从连接池剔除

小结

  • spring-date-redis的2.0及以上版本废弃了原来的LettucePool,改为使用LettucePoolingClientConfiguration
  • 这里有一个问题,就是旧版连接池校验是采用ping的方式,而新版连接池校验则是使用active字段来标识,对于docker pause识别不出来
  • 对于lettuce其shareNativeConnection参数默认为true,且validateConnection为false,第一次从连接池borrow到连接之后,就一直复用底层的连接,也没有归还。如果要每次获取连接都走连接池获取然后归还,需要设置shareNativeConnection为false
  • jedis的连接池实现,其validateObject方法不仅校验isConnected,而且也校验了ping方法,因而能够感知到docker pause带来的timeout,从而将连接从连接池剔除
  • 对于lettuce来说,如果要识别docker pause的异常,有两个方案,一个是修复ConnectionPoolSupport中RedisPooledObjectFactory的validateObject方法,不仅判断isOpen,还需要ping一下;另外一个是不开启连接池,并且将LettuceConnectionFactory的validateConnection参数设置为true

doc

转载地址:http://sahnx.baihongyu.com/

你可能感兴趣的文章
oracle数组的使用
查看>>
POJ3342 Party at Hali-Bula(树型DP求最大独立集+唯一解判断)
查看>>
matplotlib ----- 多子图, subplots
查看>>
无线路由器连接电信光纤宽带光猫设置方法(转)
查看>>
git的CentOS服务端安装和windows客户端的使用
查看>>
会议06
查看>>
快速提高看盘能力的十大方法
查看>>
线性代数与矩阵论 习题 1.2.1
查看>>
度量空间的一个例子:离散度量空间
查看>>
从有理数到实数(序)
查看>>
自适应滤波:最小二乘法
查看>>
linux下部署jdk+Tomcat
查看>>
ssl介绍以及双向认证和单向认证原理 (转)
查看>>
pyqt,Qt Designer 界面布局子窗口可移动
查看>>
Computer Vision Algorithm Implementations
查看>>
P1144 最短路计数
查看>>
Vue.js - Day4
查看>>
代码测试工具?
查看>>
20155229 2016-2007-2 《Java程序设计》第一周学习总结
查看>>
[转自360kr]如何才能进入Facebook工作?公司内部工程师告诉你
查看>>