作者:京东物流 高圆庆
通常一个对象创建、销毁非常耗时的时候,我们不会频繁的创建和销毁它,而是考虑复用。复用对象的一种做法就是对象池,将创建好的对象放入池中维护起来,下次再用的时候直接拿池中已经创建好的对象继续用,这就是池化的思想。在java中,有很多池管理的概念,典型的如线程池,数据库连接池,socket连接池。本文章讲介绍apache提供的通用对象池框架GenericObjectPool,以及基于GenericObjectPool实现的sftp连接池在国际物流调度履约系统中的应用。
Apache Commons Pool是一个对象池的框架,他提供了一整套用于实现对象池化的API。它提供了三种对象池:GenericKeyedObjectPool,SoftReferenceObjectPool和GenericObjectPool,其中GenericObjectPool是我们最常用的对象池,内部实现也最复杂。GenericObjectPool的UML图如下所示:
从图中可以看出,GenericObjectPool实现了ObjectPool接口,而ObjectPool就是对象池的核心接口,它定义了一个对象池应该实现的行为。
对象的创建需要通过对象工厂来创建,对象工厂需要实现BasePooledObjectFactory接口。ObjectPool接口中往池中添加一个对象,就需要使用对象工厂来创建一个对象。该接口说明如下:
public interface PooledObjectFactory<T> {
/**
* 创建一个可由池提供服务的实例,并将其封装在由池管理的PooledObject中。
*/
PooledObject<T> makeObject() throws Exception;
/**
* 销毁池不再需要的实例
*/
void destroyObject(PooledObject<T> p) throws Exception;
/**
* 确保实例可以安全地由池返回
*/
boolean validateObject(PooledObject<T> p);
/**
* 重新初始化池返回的实例
*/
void activateObject(PooledObject<T> p) throws Exception;
/**
* 取消初始化要返回到空闲对象池的实例
*/
void passivateObject(PooledObject<T> p) throws Exception;
}
GenericObjectPoolConfig是封装GenericObject池配置的简单“结构”,此类不是线程安全的;它仅用于提供创建池时使用的属性。大多数情况,可以使用GenericObjectPoolConfig提供的默认参数就可以满足日常的需求,GenericObjectPoolConfig是一个抽象类,实际应用中需要新建配置类,然后继承它。
上面三步就是最简单的流程,由于取和还的流程步骤都在borrowObject和returnObject方法中固定的,所以我们只要重写Factory工厂类的makeObject()和validateObject以及destroyObject方法即可实现最简单的池的管理控制,通过构造方法传入该Factory工厂类对象则可以创建最简单的对象池管理类。这算是比较好的解耦设计模式,借和还的流程如下图所示:
redis的java客户端jedis就是基于Apache Commons Pool对象池的框架来实现的。
对象工厂类只需实现activateObject、destroyObject、makeObject、validateObject方法即可,源码如下:
class JedisFactory implements PooledObjectFactory<Jedis> {
private final String host;
private final int port;
private final int timeout;
private final int newTimeout;
private final String password;
private final int database;
private final String clientName;
public JedisFactory(String host, int port, int timeout, String password, int database) {
this(host, port, timeout, password, database, (String)null);
}
public JedisFactory(String host, int port, int timeout, String password, int database, String clientName) {
this(host, port, timeout, timeout, password, database, clientName);
}
public JedisFactory(String host, int port, int timeout, int newTimeout, String password, int database, String clientName) {
this.host = host;
this.port = port;
this.timeout = timeout;
this.newTimeout = newTimeout;
this.password = password;
this.database = database;
this.clientName = clientName;
}
public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();
if (jedis.getDB() != (long)this.database) {
jedis.select(this.database);
}
}
public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();
if (jedis.isConnected()) {
try {
try {
jedis.quit();
} catch (Exception var4) {
}
jedis.disconnect();
} catch (Exception var5) {
}
}
}
public PooledObject<Jedis> makeObject() throws Exception {
Jedis jedis = new Jedis(this.host, this.port, this.timeout, this.newTimeout);
jedis.connect();
if (null != this.password) {
jedis.auth(this.password);
}
if (this.database != 0) {
jedis.select(this.database);
}
if (this.clientName != null) {
jedis.clientSetname(this.clientName);
}
return new DefaultPooledObject(jedis);
}
public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {
}
public boolean validateObject(PooledObject<Jedis> pooledJedis) {
BinaryJedis jedis = (BinaryJedis)pooledJedis.getObject();
try {
return jedis.isConnected() && jedis.ping().equals("PONG");
} catch (Exception var4) {
return false;
}
}
}
public class JedisPoolConfig extends GenericObjectPoolConfig {
public JedisPoolConfig() {
this.setTestWhileIdle(true);
this.setMinEvictableIdleTimeMillis(60000L);
this.setTimeBetweenEvictionRunsMillis(30000L);
this.setNumTestsPerEvictionRun(-1);
}
}
在国际物流履约系统中,我们和客户交互文件经常使用sftp服务器,因为创建sftp服务器的连接比较耗时,所以基于Apache Commons Pool对象池的框架来实现的我们自己的sftp链接池。
SftpPool比较简单,直接继承GenericObjectPool。
public class SftpPool extends GenericObjectPool<Sftp> {
public SftpPool(SftpFactory factory, SftpPoolConfig config, SftpAbandonedConfig abandonedConfig) {
super(factory, config, abandonedConfig);
}
}
这是基于Apache Commons Pool框架实现自定义对象池的核心类,代码如下:
public class SftpFactory extends BasePooledObjectFactory<Sftp> {
private static final String CHANNEL_TYPE = "sftp";
private static Properties sshConfig = new Properties();
private String host;
private int port;
private String username;
private String password;
static {
sshConfig.put("StrictHostKeyChecking", "no");
}
@Override
public Sftp create() {
try {
JSch jsch = new JSch();
Session sshSession = jsch.getSession(username, host, port);
sshSession.setPassword(password);
sshSession.setConfig(sshConfig);
sshSession.connect();
ChannelSftp channel = (ChannelSftp) sshSession.openChannel(CHANNEL_TYPE);
channel.connect();
log.info("sftpFactory创建sftp");
return new Sftp(channel);
} catch (JSchException e) {
log.error("连接sftp失败:", e);
throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);
}
}
/**
* @param sftp 被包装的对象
* @return 对象包装器
*/
@Override
public PooledObject<Sftp> wrap(Sftp sftp) {
return new DefaultPooledObject<>(sftp);
}
/**
* 销毁对象
* @param p 对象包装器
*/
@Override
public void destroyObject(PooledObject<Sftp> p) {
log.info("开始销毁channelSftp");
if (p!=null) {
Sftp sftp = p.getObject();
if (sftp!=null) {
ChannelSftp channelSftp = sftp.getChannelSftp();
if (channelSftp!=null) {
channelSftp.disconnect();
log.info("销毁channelSftp成功");
}
}
}
}
/**
* 检查连接是否可用
*
* @param p 对象包装器
* @return {@code true} 可用,{@code false} 不可用
*/
@Override
public boolean validateObject(PooledObject<Sftp> p) {
if (p!=null) {
Sftp sftp = p.getObject();
if (sftp!=null) {
try {
sftp.getChannelSftp().cd("./");
log.info("验证连接是否可用,结果为true");
return true;
} catch (SftpException e) {
log.info("验证连接是否可用,结果为false",e);
return false;
}
}
}
log.info("验证连接是否可用,结果为false");
return false;
}
public static class Builder {
private String host;
private int port;
private String username;
private String password;
public SftpFactory build() {
return new SftpFactory(host, port, username, password);
}
public Builder host(String host) {
this.host = host;
return this;
}
public Builder port(int port) {
this.port = port;
return this;
}
public Builder username(String username) {
this.username = username;
return this;
}
public Builder password(String password) {
this.password = password;
return this;
}
}
}
配置类继承了GenericObjectPoolConfig,可继承该类的默认属性,也可自定义配置参数。
public class SftpPoolConfig extends GenericObjectPoolConfig<Sftp> {
public static class Builder {
private int maxTotal;
private int maxIdle;
private int minIdle;
private boolean lifo;
private boolean fairness;
private long maxWaitMillis;
private long minEvictableIdleTimeMillis;
private long evictorShutdownTimeoutMillis;
private long softMinEvictableIdleTimeMillis;
private int numTestsPerEvictionRun;
private EvictionPolicy<Sftp> evictionPolicy; // 仅2.6.0版本commons-pool2需要设置
private String evictionPolicyClassName;
private boolean testOnCreate;
private boolean testOnBorrow;
private boolean testOnReturn;
private boolean testWhileIdle;
private long timeBetweenEvictionRunsMillis;
private boolean blockWhenExhausted;
private boolean jmxEnabled;
private String jmxNamePrefix;
private String jmxNameBase;
public SftpPoolConfig build() {
SftpPoolConfig config = new SftpPoolConfig();
config.setMaxTotal(maxTotal);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setLifo(lifo);
config.setFairness(fairness);
config.setMaxWaitMillis(maxWaitMillis);
config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
config.setEvictorShutdownTimeoutMillis(evictorShutdownTimeoutMillis);
config.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);
config.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
config.setEvictionPolicy(evictionPolicy);
config.setEvictionPolicyClassName(evictionPolicyClassName);
config.setTestOnCreate(testOnCreate);
config.setTestOnBorrow(testOnBorrow);
config.setTestOnReturn(testOnReturn);
config.setTestWhileIdle(testWhileIdle);
config.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
config.setBlockWhenExhausted(blockWhenExhausted);
config.setJmxEnabled(jmxEnabled);
config.setJmxNamePrefix(jmxNamePrefix);
config.setJmxNameBase(jmxNameBase);
return config;
}
}
读取配置文件,创建SftpFactory、SftpPoolConfig、SftpPool,代码如下:
@Configuration
@ConditionalOnClass(SftpPool.class)
@EnableConfigurationProperties(SftpClientProperties.class)
public class SftpClientAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public ISftpClient sftpClient(SftpClientProperties sftpClientProperties) {
if (sftpClientProperties.isMultiple()) {
MultipleSftpClient multipleSftpClient = new MultipleSftpClient();
sftpClientProperties.getClients().forEach((name, properties) -> {
SftpFactory sftpFactory = createSftpFactory(properties);
SftpPoolConfig sftpPoolConfig = createSftpPoolConfig(properties);
SftpAbandonedConfig sftpAbandonedConfig = createSftpAbandonedConfig(properties);
SftpPool sftpPool = new SftpPool(sftpFactory, sftpPoolConfig, sftpAbandonedConfig);
ISftpClient sftpClient = new SftpClient(sftpPool);
multipleSftpClient.put(name, sftpClient);
});
return multipleSftpClient;
}
SftpFactory sftpFactory = createSftpFactory(sftpClientProperties);
SftpPoolConfig sftpPoolConfig = createSftpPoolConfig(sftpClientProperties);
SftpAbandonedConfig sftpAbandonedConfig = createSftpAbandonedConfig(sftpClientProperties);
SftpPool sftpPool = new SftpPool(sftpFactory, sftpPoolConfig, sftpAbandonedConfig);
return new SftpClient(sftpPool);
}
public SftpFactory createSftpFactory(SftpClientProperties properties) {
return new SftpFactory.Builder()
.host(properties.getHost())
.port(properties.getPort())
.username(properties.getUsername())
.password(properties.getPassword())
.build();
}
public SftpPoolConfig createSftpPoolConfig(SftpClientProperties properties) {
SftpClientProperties.Pool pool = properties.getPool();
return new SftpPoolConfig.Builder()
.maxTotal(pool.getMaxTotal())
.maxIdle(pool.getMaxIdle())
.minIdle(pool.getMinIdle())
.lifo(pool.isLifo())
.fairness(pool.isFairness())
.maxWaitMillis(pool.getMaxWaitMillis())
.minEvictableIdleTimeMillis(pool.getMinEvictableIdleTimeMillis())
.evictorShutdownTimeoutMillis(pool.getEvictorShutdownTimeoutMillis())
.softMinEvictableIdleTimeMillis(pool.getSoftMinEvictableIdleTimeMillis())
.numTestsPerEvictionRun(pool.getNumTestsPerEvictionRun())
.evictionPolicy(null)
.evictionPolicyClassName(DefaultEvictionPolicy.class.getName())
.testOnCreate(pool.isTestOnCreate())
.testOnBorrow(pool.isTestOnBorrow())
.testOnReturn(pool.isTestOnReturn())
.testWhileIdle(pool.isTestWhileIdle())
.timeBetweenEvictionRunsMillis(pool.getTimeBetweenEvictionRunsMillis())
.blockWhenExhausted(pool.isBlockWhenExhausted())
.jmxEnabled(pool.isJmxEnabled())
.jmxNamePrefix(pool.getJmxNamePrefix())
.jmxNameBase(pool.getJmxNameBase())
.build();
}
public SftpAbandonedConfig createSftpAbandonedConfig(SftpClientProperties properties) {
SftpClientProperties.Abandoned abandoned = properties.getAbandoned();
return new SftpAbandonedConfig.Builder()
.removeAbandonedOnBorrow(abandoned.isRemoveAbandonedOnBorrow())
.removeAbandonedOnMaintenance(abandoned.isRemoveAbandonedOnMaintenance())
.removeAbandonedTimeout(abandoned.getRemoveAbandonedTimeout())
.logAbandoned(abandoned.isLogAbandoned())
.requireFullStackTrace(abandoned.isRequireFullStackTrace())
.logWriter(new PrintWriter(System.out))
.useUsageTracking(abandoned.isUseUsageTracking())
.build();
}
}
SftpClient是实际工作的类,从SftpClient 中可获取到一个sftp链接,使用完成后,归还给sftpPool。SftpClient代码如下:
public class SftpClient implements ISftpClient {
private SftpPool sftpPool;
/**
* 从sftp连接池获取连接并执行操作
*
* @param handler sftp操作
*/
@Override
public void open(ISftpClient.Handler handler) {
Sftp sftp = null;
try {
sftp = sftpPool.borrowObject();
ISftpClient.Handler policyHandler = new DelegateHandler(handler);
policyHandler.doHandle(sftp);
} catch (Exception e) {
log.error("sftp异常:", e);
throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);
} finally {
if (sftp != null) {
sftpPool.returnObject(sftp);
}
}
}
@AllArgsConstructor
static class DelegateHandler implements ISftpClient.Handler {
private ISftpClient.Handler target;
@Override
public void doHandle(Sftp sftp) {
try {
target.doHandle(sftp);
} catch (Exception e) {
log.error("sftp异常:", e);
throw new BizException(ResultCodeEnum.SFTP_EXCEPTION);
}
}
}
}
通过sftp上传文件到XX服务器
//通过SFTP上传到XX
((MultipleSftpClient) sftpClient).choose("XX");
sftpClient.open(sftp -> {
boolean exist = sftp.isExist(inventoryPath);
if(!exist){
sftp.mkdirs(inventoryPath);
}
// 执行sftp操作
InputStream is = new FileInputStream(oneColumnCSVFile);
sftp.upload(inventoryPath, titleName, is);
log.info("inventory upload over");
});
通过本文的介绍可以知道,Apache Commons Pool定义了一个对象池的行为,提供了可扩展的配置类和对象工厂,封装了对象创建、从池中获取对象、归还对象的核心流程。还介绍了开源框架Jedis是如何基于GenericObjectPool来实现的连接池。最后介绍了国际物流履约系统中是如何基于GenericObjectPool来管理Sftp连接的。
掌握了GenericObjectPool的核心原理,我们就可以通过实现几个关键的接口,创建一个对象池管理工具,在项目中避免了对象的频繁创建和销毁,从而显著提升程序的性能。