1水平分库
最近在做一个IM系统,之前的旧系统没有考虑到用户量会增长得这么庞大,导致现在数据库性能瓶颈非常严重,迫切需要分库,用于减少每个库的用户数量,进而分摊负载,最终达到数据库横向扩展的目的。
数据库水平分库是以用户Id为分库的依据,同一个用户的所有数据都在同一个库上,每个库有着相同的表结构。为了实现开发人员来说对数据库的透明访问,分库框架需要解决二个问题:
1、 方法参数中有用户id的数据的新增,查询及修改
2、 方法参数中无用户id的数据的查询
2用户id
把用户名和密码所在的表定义为用户表,用户id即是用户表中的惟一性标识的整形值,如果用户的用户名只有一种方式,那么id可以是用户名的hash值,此时用户表也是分库的;如果用户的用户名有多种方式,比如允许用户使用email登陆,也允许用户使用手机号码登陆,那么用户id应该是用户表中的递增字段值,此时用户表应该是不分库的,这时可以把用户表独立为另一个库,称之为认证库。我们的项目应用是属于后者。
1 3 解决方案
3.1 说明
简单服务即为DAO,每个domain都对应一个简单服务,简单服务之间不允许互相依赖;复杂服务可以依赖多个简单服务,但不能直接访问数据库,复杂服务对数据库的操作必须通过单简单服务。
使用hibernate作为访问数据库的中间层,结合Spring的Aop拦截方法,简单服务代理与简单服务实现相同的接口,一个简单服务对应二个实例,一个引用动态获取数据库连接的sessionFactory,另一个引用Hibernate Shards的sessionFactory
3.2 方法参数中有用户Id
Spring Aop拦截简单服务代理的所有方法,如果方法的第一个参数为userid,则将userid
放到当前线程中,并选择引用动态获取数据库连接的sessionFactory的简单服务实例,在获取数据库连接时根据当前线程的userid选择相应连接,流程如下:
3.3 方法参数中无用户Id
Spring Aop拦截简单服务代理的所有方法,如果方法的第一个参数为非userid,选择引用Hibernate Shards的sessionFactory的简单服务实例,遍历所有数据库,并返回汇总后的数据。这种情况下只允许读,不允许写。流程如下:
1 4实现
4.1 简单服务代理
对每个简单服务用jdk动态代理生成一个代理对像,复杂服务依赖代理对像。
4.2 实例化
在简单服务类上标注@DetachDbService,则会产生三个实例(框架实现):
1. 简单服务代理实例
2. 引用动态获取数据库连接的sessionFactory的简单服务实例
3. 引用Hibernate Shards的sessionFactory简单服务实例
4.3 方法参数
如果是到某个库获取数据,则第一个参数必须为Long或者UseridAble类型,用于获取userid
4.4 userid与数据库关系
可选方案
|
优点
|
缺点
|
按号段分
|
可部分迁移
|
数据分布不均
|
取模
|
数据分布均匀
|
迁移数据量是1/(n+1),不能按服务器性能分配
|
在认证库中保存数据库配置
|
灵活,可部分迁移
|
查询前需要先从数据库或缓存中获得此配置
|
总的来说,取模是最优方案,但是考虑到服务器性能可能不一致,而又需要充分利用服务器资源,所以需要在取模的同时加上权重。比如现在有二台数据库,权重为1:2,那么用户id先对3取模,0的为第一台服务器,1,2的为第二台服务器。
4.5精确分页
由于hibernate shards不能到某个库或者其中的几个库中去查询,并且它的分页是先到所有的库中将所有符合条件的数据取回到内存中再进行分页,所以不可能使用它的分页。
先用hibernate shards到各个库上查出符合条件的数目及数据库标识(标识为查询表中最小用户id),返回结果后对标识进行排序(这样确保同样的查询条件在翻页的时候能够以同样的顺序查询数据库,以达到精确查询的目的)。根据这个结果计算出每个数据库取值的段,然后用动态数据库连接按之前排好的顺序遍历数据库进行查找,段为0的直接跳过,找满结果则返回。
比如现在有3个库,要查询所在地为深圳的用户,通过hibernate shards查得数据如下:
|
深圳地区用户总数
|
深圳特区用户最小id
|
DB1
|
7
|
2
|
DB2
|
5
|
1
|
DB3
|
30
|
3
|
这时按用户最小id排序结果是DB2,DB1,DB3
假设每页10条记录,
第一页的数据是从DB2中取5条,DB1中取前5条,不需要到DB3去取
第二页的数据是从DB1中取后2条,在DB3中取前8条,不需要到DB1中去取
第三页数据是从DB3中取第9到第18条,不需要到DB1和DB2中去取
… …
缺点:不能精确排序
5关键代码
Java代码
- package com.konceptusa.infinet.annotation;
-
- import java.lang.annotation.Documented;
- import java.lang.annotation.ElementType;
- import java.lang.annotation.Retention;
- import java.lang.annotation.RetentionPolicy;
- import java.lang.annotation.Target;
-
- import org.springframework.beans.factory.annotation.Autowire;
-
- /**
- * 简单服务类实例化标注
- * @author Jwin
- *
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target( { ElementType.TYPE })
- @Documented
- public @interface DetachDbService
- {
- boolean lazy() default false;
- Autowire autoWire() default Autowire.BY_NAME;
- String init() default "";
- String destroy() default "";
- }
package com.konceptusa.infinet.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.beans.factory.annotation.Autowire;
/**
* 简单服务类实例化标注
* @author Jwin
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target( { ElementType.TYPE })
@Documented
public @interface DetachDbService
{
boolean lazy() default false;
Autowire autoWire() default Autowire.BY_NAME;
String init() default "";
String destroy() default "";
}
Java代码
- package com.konceptusa.infinet.annotation.handler;
-
- import java.util.ArrayList;
- import java.util.List;
-
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.aop.framework.ProxyFactoryBean;
- import org.springframework.beans.MutablePropertyValues;
- import org.springframework.beans.factory.config.RuntimeBeanReference;
- import org.springframework.beans.factory.support.RootBeanDefinition;
-
- import com.konceptusa.framework.annotation.IllegalConfigException;
- import com.konceptusa.framework.annotation.spring.support.AbstractSpringListAnnotationHandler;
- import com.konceptusa.framework.annotation.spring.support.SpringAnnotationUtils;
- import com.konceptusa.infinet.annotation.DetachDbService;
-
- /**
- * 向spring中注册简单服务代理实例,引用动态数据库连结的简单服务实例,引用hibernate shards的简单服务实例
- * @author Jwin
- *
- */
- public class DetachDbServiceAnnotationHandler extends AbstractSpringListAnnotationHandler<DetachDbService>
- {
- private final static String SESSIONFACTORYNAME = "sessionFactory";
- public final static String DYNAMIC_POSTFIX = "Dynamic";
- public final static String SHARDS_POSTFIX = "Shards";
- private final static String DETACHDBINTERCEPTOR = "detachDBInterceptor";
-
- private final static Log LOG = LogFactory.getLog(DetachDbServiceAnnotationHandler.class);
-
- public Class annotation()
- {
- return DetachDbService.class;
- }
-
- @Override
- protected void handle(DetachDbService s, Class target)
- {
- String name = target.getSimpleName();
- if (!name.endsWith("ServiceImpl"))
- {
- throw new IllegalConfigException(target.getName()
- + " is not a service bean.service bean 's class name must be end with 'ServiceImpl'");
- }
- name = getBeanName(name);
- String dynamicName = name + DYNAMIC_POSTFIX;
- String dynamicSessionFactory = SESSIONFACTORYNAME + DYNAMIC_POSTFIX;
- //生成动态获取数据库连接的简单服务实例
- createBean(s, target, dynamicName, dynamicSessionFactory);
- String shardsName = name + SHARDS_POSTFIX;
- String shardsFactory = SESSIONFACTORYNAME + SHARDS_POSTFIX;
- //生成查询所有数据库的简单服务实例
- createBean(s, target, shardsName, shardsFactory);
- //生成简单服务代理类
- RootBeanDefinition definition = createBeanDefinition(s, ProxyFactoryBean.class, name);
- MutablePropertyValues mpv = new MutablePropertyValues();
- mpv.addPropertyValue("target", new RuntimeBeanReference(shardsName));
- List<String> interceptorNamesList = new ArrayList<String>();
- interceptorNamesList.add(DETACHDBINTERCEPTOR);
- mpv.addPropertyValue("interceptorNames", interceptorNamesList);
- definition.setPropertyValues(mpv);
- registerBeanDefinition(name, definition);
- }
-
- private void createBean(DetachDbService s, Class target, String name, String sessionFactory)
- {
- RootBeanDefinition beanDefinition = createBeanDefinition(s, target, name);
- MutablePropertyValues mpv = new MutablePropertyValues();
- mpv.addPropertyValue(SESSIONFACTORYNAME, new RuntimeBeanReference(sessionFactory));
- beanDefinition.setPropertyValues(mpv);
- registerBeanDefinition(name, beanDefinition);
- }
-
- private RootBeanDefinition createBeanDefinition(DetachDbService s, Class target, String name)
- {
- RootBeanDefinition definition = new RootBeanDefinition();
- definition.setAbstract(false);
- definition.setBeanClass(target);
- definition.setSingleton(true);
- definition.setLazyInit(s.lazy());
- definition.setAutowireCandidate(true);
- definition.setAutowireMode(s.autoWire().value());
-
- if (!"".equals(s.init()))
- {
- definition.setInitMethodName(s.init().trim());
- }
- if (!"".equals(s.destroy()))
- {
- definition.setDestroyMethodName(s.destroy().trim());
- }
-
- if (LOG.isDebugEnabled())
- {
- LOG.debug("Reader Bean Definition[" + definition + "] with name[" + name + "]");
- }
- SpringAnnotationUtils.readProperties(target, definition);
- return definition;
- }
-
- private String getBeanName(String name)
- {
- name = name.substring(0, name.length() - "Impl".length());
- name = name.substring(0, 1).toLowerCase() + name.substring(1, name.length());
- return name;
- }
-
- }
package com.konceptusa.infinet.annotation.handler;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.framework.ProxyFactoryBean;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.RootBeanDefinition;
import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.framework.annotation.spring.support.AbstractSpringListAnnotationHandler;
import com.konceptusa.framework.annotation.spring.support.SpringAnnotationUtils;
import com.konceptusa.infinet.annotation.DetachDbService;
/**
* 向spring中注册简单服务代理实例,引用动态数据库连结的简单服务实例,引用hibernate shards的简单服务实例
* @author Jwin
*
*/
public class DetachDbServiceAnnotationHandler extends AbstractSpringListAnnotationHandler<DetachDbService>
{
private final static String SESSIONFACTORYNAME = "sessionFactory";
public final static String DYNAMIC_POSTFIX = "Dynamic";
public final static String SHARDS_POSTFIX = "Shards";
private final static String DETACHDBINTERCEPTOR = "detachDBInterceptor";
private final static Log LOG = LogFactory.getLog(DetachDbServiceAnnotationHandler.class);
public Class annotation()
{
return DetachDbService.class;
}
@Override
protected void handle(DetachDbService s, Class target)
{
String name = target.getSimpleName();
if (!name.endsWith("ServiceImpl"))
{
throw new IllegalConfigException(target.getName()
+ " is not a service bean.service bean 's class name must be end with 'ServiceImpl'");
}
name = getBeanName(name);
String dynamicName = name + DYNAMIC_POSTFIX;
String dynamicSessionFactory = SESSIONFACTORYNAME + DYNAMIC_POSTFIX;
//生成动态获取数据库连接的简单服务实例
createBean(s, target, dynamicName, dynamicSessionFactory);
String shardsName = name + SHARDS_POSTFIX;
String shardsFactory = SESSIONFACTORYNAME + SHARDS_POSTFIX;
//生成查询所有数据库的简单服务实例
createBean(s, target, shardsName, shardsFactory);
//生成简单服务代理类
RootBeanDefinition definition = createBeanDefinition(s, ProxyFactoryBean.class, name);
MutablePropertyValues mpv = new MutablePropertyValues();
mpv.addPropertyValue("target", new RuntimeBeanReference(shardsName));
List<String> interceptorNamesList = new ArrayList<String>();
interceptorNamesList.add(DETACHDBINTERCEPTOR);
mpv.addPropertyValue("interceptorNames", interceptorNamesList);
definition.setPropertyValues(mpv);
registerBeanDefinition(name, definition);
}
private void createBean(DetachDbService s, Class target, String name, String sessionFactory)
{
RootBeanDefinition beanDefinition = createBeanDefinition(s, target, name);
MutablePropertyValues mpv = new MutablePropertyValues();
mpv.addPropertyValue(SESSIONFACTORYNAME, new RuntimeBeanReference(sessionFactory));
beanDefinition.setPropertyValues(mpv);
registerBeanDefinition(name, beanDefinition);
}
private RootBeanDefinition createBeanDefinition(DetachDbService s, Class target, String name)
{
RootBeanDefinition definition = new RootBeanDefinition();
definition.setAbstract(false);
definition.setBeanClass(target);
definition.setSingleton(true);
definition.setLazyInit(s.lazy());
definition.setAutowireCandidate(true);
definition.setAutowireMode(s.autoWire().value());
if (!"".equals(s.init()))
{
definition.setInitMethodName(s.init().trim());
}
if (!"".equals(s.destroy()))
{
definition.setDestroyMethodName(s.destroy().trim());
}
if (LOG.isDebugEnabled())
{
LOG.debug("Reader Bean Definition[" + definition + "] with name[" + name + "]");
}
SpringAnnotationUtils.readProperties(target, definition);
return definition;
}
private String getBeanName(String name)
{
name = name.substring(0, name.length() - "Impl".length());
name = name.substring(0, 1).toLowerCase() + name.substring(1, name.length());
return name;
}
}
Java代码
- package com.konceptusa.infinet.detach.aop;
-
- import org.aopalliance.intercept.MethodInterceptor;
- import org.aopalliance.intercept.MethodInvocation;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.util.MethodInvoker;
-
- import com.konceptusa.framework.annotation.IllegalConfigException;
- import com.konceptusa.framework.core.support.ObjectFactory;
- import com.konceptusa.infinet.annotation.handler.DetachDbServiceAnnotationHandler;
- import com.konceptusa.infinet.detach.UseridAble;
- import com.konceptusa.infinet.detach.datasource.DataSourceIdContextHolder;
- import com.konceptusa.infinet.detach.datasource.UseridContextHolder;
-
- /**
- * 分库简单服务代理
- * @author Jwin
- *
- */
- public class DetachDBInterceptor implements MethodInterceptor
- {
- private final static Log LOG = LogFactory.getLog(DetachDBInterceptor.class);
- public Object invoke(MethodInvocation invoke) throws Throwable
- {
- int len = invoke.getArguments().length;
- Long id = null;
- if(len >= 1)
- {
- Object arg = invoke.getArguments()[0];
- if(arg instanceof UseridAble)
- {
- UseridAble useridAble = (UseridAble) arg;
- id = useridAble.getUserid();
- }
- else if(arg instanceof Long)
- {
- id = (Long) arg;
- }
- }
- if(id != null)
- {
- UseridContextHolder.setUserid(id);
- try
- {
- return invoke(invoke, id);
- }finally
- {
- UseridContextHolder.removeUserid();
- }
- }
- else
- {
- return invoke(invoke, id);
- }
- }
- private Object invoke(MethodInvocation invoke, Long id) throws Throwable
- {
- String str = invoke.getThis().toString();
- int start = str.lastIndexOf(".");
- int end = str.lastIndexOf("@");
- String className = str.substring(start + 1, end);
- String postFix = DetachDbServiceAnnotationHandler.DYNAMIC_POSTFIX;
- if(id == null && DataSourceIdContextHolder.getDataSourceId() == null)
- {
- postFix = DetachDbServiceAnnotationHandler.SHARDS_POSTFIX;
- }
- String serviceName = className.substring(0,1).toLowerCase() + className.substring(1,className.length() - "Impl".length()) + postFix;
- if(LOG.isDebugEnabled())
- LOG.debug("select service " + serviceName + " for userid = " + id);
- Object service = ObjectFactory.getManagedObject(serviceName);
- if(service == null)
- {
- throw new IllegalConfigException("service name " + serviceName + " is not defined in spring context");
- }
- MethodInvoker invoker = new MethodInvoker();
- invoker.setArguments(invoke.getArguments());
- invoker.setTargetObject(service);
- invoker.setTargetMethod(invoke.getMethod().getName());
- invoker.prepare();
- return invoker.invoke();
- }
-
- }
package com.konceptusa.infinet.detach.aop;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.MethodInvoker;
import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.framework.core.support.ObjectFactory;
import com.konceptusa.infinet.annotation.handler.DetachDbServiceAnnotationHandler;
import com.konceptusa.infinet.detach.UseridAble;
import com.konceptusa.infinet.detach.datasource.DataSourceIdContextHolder;
import com.konceptusa.infinet.detach.datasource.UseridContextHolder;
/**
* 分库简单服务代理
* @author Jwin
*
*/
public class DetachDBInterceptor implements MethodInterceptor
{
private final static Log LOG = LogFactory.getLog(DetachDBInterceptor.class);
public Object invoke(MethodInvocation invoke) throws Throwable
{
int len = invoke.getArguments().length;
Long id = null;
if(len >= 1)
{
Object arg = invoke.getArguments()[0];
if(arg instanceof UseridAble)
{
UseridAble useridAble = (UseridAble) arg;
id = useridAble.getUserid();
}
else if(arg instanceof Long)
{
id = (Long) arg;
}
}
if(id != null)
{
UseridContextHolder.setUserid(id);
try
{
return invoke(invoke, id);
}finally
{
UseridContextHolder.removeUserid();
}
}
else
{
return invoke(invoke, id);
}
}
private Object invoke(MethodInvocation invoke, Long id) throws Throwable
{
String str = invoke.getThis().toString();
int start = str.lastIndexOf(".");
int end = str.lastIndexOf("@");
String className = str.substring(start + 1, end);
String postFix = DetachDbServiceAnnotationHandler.DYNAMIC_POSTFIX;
if(id == null && DataSourceIdContextHolder.getDataSourceId() == null)
{
postFix = DetachDbServiceAnnotationHandler.SHARDS_POSTFIX;
}
String serviceName = className.substring(0,1).toLowerCase() + className.substring(1,className.length() - "Impl".length()) + postFix;
if(LOG.isDebugEnabled())
LOG.debug("select service " + serviceName + " for userid = " + id);
Object service = ObjectFactory.getManagedObject(serviceName);
if(service == null)
{
throw new IllegalConfigException("service name " + serviceName + " is not defined in spring context");
}
MethodInvoker invoker = new MethodInvoker();
invoker.setArguments(invoke.getArguments());
invoker.setTargetObject(service);
invoker.setTargetMethod(invoke.getMethod().getName());
invoker.prepare();
return invoker.invoke();
}
}
Java代码
- package com.konceptusa.infinet.detach.datasource;
-
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
-
- import javax.sql.DataSource;
-
- import org.apache.commons.lang.StringUtils;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
- import org.springframework.util.Assert;
-
- import com.konceptusa.framework.annotation.IllegalConfigException;
- import com.konceptusa.infinet.detach.config.MultiHibernateProperties;
- import com.konceptusa.infinet.detach.service.ISelectDBService;
-
- /**
- * 动态获取数据库连接基类
- * @author Jwin
- *
- */
- public abstract class AbstractDynamicDataSource extends AbstractRoutingDataSource
- {
- private final static Log LOG = LogFactory.getLog(AbstractDynamicDataSource.class);
- public final static int defaultDataSourceId = -1;
- protected MultiHibernateProperties multiHibernateProperties;
- protected ISelectDBService selectDBService;
- private String newWeights;
- private String oldWeights;
- private Map<Integer, DataSource> dataSourceMap = new HashMap<Integer, DataSource>();
- public void setSelectDBService(ISelectDBService selectDBService)
- {
- this.selectDBService = selectDBService;
- }
- public void setMultiHibernateProperties(MultiHibernateProperties multiHibernateProperties)
- {
- this.multiHibernateProperties = multiHibernateProperties;
- }
- @Override
- protected Object determineCurrentLookupKey()
- {
- Long id = UseridContextHolder.getUserid();
- return selectDBService.selectDb(id);
- }
-
- @Override
- public void afterPropertiesSet()
- {
- LOG.info("init dynamic datasource start");
- Assert.notNull(multiHibernateProperties);
- Assert.notNull(selectDBService);
- List<Properties> properties = multiHibernateProperties.getShardProperties();
- Assert.notEmpty(properties);
- int dataSourceCount = 0;
- for(Properties p : properties)
- {
- dataSourceCount++;
- createDataSource(dataSourceMap, p);
- }
- createDefaultDataSource(dataSourceMap);
- selectDBService.setDefaultDataSourceId(defaultDataSourceId);
- selectDBService.setDataSourceCount(dataSourceCount);
- setTargetDataSources(dataSourceMap);
- setDefaultTargetDataSource(dataSourceMap.get(defaultDataSourceId));
- initWeight(dataSourceCount);
- super.afterPropertiesSet();
- LOG.info("init dynamic datasource success");
- }
- public void initWeight(int dataSourceCount)
- {
- Map<Integer, Integer> oldWeightMap = new HashMap<Integer, Integer>();
- Map<Integer, Integer> newWeightMap = new HashMap<Integer, Integer>();
- int totalOldWeight = 0;
- int totalNewWeight = 0;
- if(newWeights != null)
- {
- if(LOG.isInfoEnabled())
- LOG.info("newWeights " + newWeights);
- String[] weights = StringUtils.split(newWeights,";");
- if(weights.length > dataSourceCount)
- {
- throw new IllegalConfigException("newWeights's length ["+weights.length+"] can't be more than dataSourceCount["+dataSourceCount+"]");
- }
- for(int i=0;i<weights.length;i++)
- {
- int w = Integer.parseInt(weights[i]);
- for(int j=0;j<w;j++)
- {
- newWeightMap.put(totalNewWeight + j, i);
- }
- totalNewWeight += w;
- }
- }
- else
- {
- totalNewWeight = dataSourceCount;
- for(int i=0;i<dataSourceCount;i++)
- {
- newWeightMap.put(i, i);
- }
- }
- if(oldWeights != null)
- {
- if(LOG.isInfoEnabled())
- LOG.info("oldWeights " + oldWeights);
- String[] weights = StringUtils.split(oldWeights,";");
- if(weights.length > dataSourceCount)
- {
- throw new IllegalConfigException("oldWeights's length ["+weights.length+"] can't be more than dataSourceCount["+dataSourceCount+"]");
- }
- for(int i=0;i<weights.length;i++)
- {
- int w = Integer.parseInt(weights[i]);
- for(int j=0;j<w;j++)
- {
- oldWeightMap.put(totalOldWeight + j, i);
- }
- totalOldWeight += w;
- }
- }
- else
- {
- totalOldWeight = dataSourceCount;
- for(int i=0;i<dataSourceCount;i++)
- {
- oldWeightMap.put(i, i);
- }
- }
- if(LOG.isInfoEnabled())
- LOG.info("totalNewWeight " + totalNewWeight + " totalOldWeight " + totalOldWeight);
- selectDBService.setTotalNewWeight(totalNewWeight);
- selectDBService.setNewWeightIdMap(newWeightMap);
- selectDBService.setTotalOldWeight(totalOldWeight);
- selectDBService.setOldWeightIdMap(oldWeightMap);
- }
- protected abstract void createDataSource(Map<Integer, DataSource> dataSourceMap, Properties p);
- protected abstract void createDefaultDataSource(Map<Integer, DataSource> dataSourceMap);
- public void setNewWeights(String newWeights)
- {
- this.newWeights = newWeights;
- }
- public void setOldWeights(String oldWeights)
- {
- this.oldWeights = oldWeights;
- }
- public Map<Integer, DataSource> getDataSourceMap()
- {
- return dataSourceMap;
- }
-
- }
package com.konceptusa.infinet.detach.datasource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.sql.DataSource;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.util.Assert;
import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.infinet.detach.config.MultiHibernateProperties;
import com.konceptusa.infinet.detach.service.ISelectDBService;
/**
* 动态获取数据库连接基类
* @author Jwin
*
*/
public abstract class AbstractDynamicDataSource extends AbstractRoutingDataSource
{
private final static Log LOG = LogFactory.getLog(AbstractDynamicDataSource.class);
public final static int defaultDataSourceId = -1;
protected MultiHibernateProperties multiHibernateProperties;
protected ISelectDBService selectDBService;
private String newWeights;
private String oldWeights;
private Map<Integer, DataSource> dataSourceMap = new HashMap<Integer, DataSource>();
public void setSelectDBService(ISelectDBService selectDBService)
{
this.selectDBService = selectDBService;
}
public void setMultiHibernateProperties(MultiHibernateProperties multiHibernateProperties)
{
this.multiHibernateProperties = multiHibernateProperties;
}
@Override
protected Object determineCurrentLookupKey()
{
Long id = UseridContextHolder.getUserid();
return selectDBService.selectDb(id);
}
@Override
public void afterPropertiesSet()
{
LOG.info("init dynamic datasource start");
Assert.notNull(multiHibernateProperties);
Assert.notNull(selectDBService);
List<Properties> properties = multiHibernateProperties.getShardProperties();
Assert.notEmpty(properties);
int dataSourceCount = 0;
for(Properties p : properties)
{
dataSourceCount++;
createDataSource(dataSourceMap, p);
}
createDefaultDataSource(dataSourceMap);
selectDBService.setDefaultDataSourceId(defaultDataSourceId);
selectDBService.setDataSourceCount(dataSourceCount);
setTargetDataSources(dataSourceMap);
setDefaultTargetDataSource(dataSourceMap.get(defaultDataSourceId));
initWeight(dataSourceCount);
super.afterPropertiesSet();
LOG.info("init dynamic datasource success");
}
public void initWeight(int dataSourceCount)
{
Map<Integer, Integer> oldWeightMap = new HashMap<Integer, Integer>();
Map<Integer, Integer> newWeightMap = new HashMap<Integer, Integer>();
int totalOldWeight = 0;
int totalNewWeight = 0;
if(newWeights != null)
{
if(LOG.isInfoEnabled())
LOG.info("newWeights " + newWeights);
String[] weights = StringUtils.split(newWeights,";");
if(weights.length > dataSourceCount)
{
throw new IllegalConfigException("newWeights's length ["+weights.length+"] can't be more than dataSourceCount["+dataSourceCount+"]");
}
for(int i=0;i<weights.length;i++)
{
int w = Integer.parseInt(weights[i]);
for(int j=0;j<w;j++)
{
newWeightMap.put(totalNewWeight + j, i);
}
totalNewWeight += w;
}
}
else
{
totalNewWeight = dataSourceCount;
for(int i=0;i<dataSourceCount;i++)
{
newWeightMap.put(i, i);
}
}
if(oldWeights != null)
{
if(LOG.isInfoEnabled())
LOG.info("oldWeights " + oldWeights);
String[] weights = StringUtils.split(oldWeights,";");
if(weights.length > dataSourceCount)
{
throw new IllegalConfigException("oldWeights's length ["+weights.length+"] can't be more than dataSourceCount["+dataSourceCount+"]");
}
for(int i=0;i<weights.length;i++)
{
int w = Integer.parseInt(weights[i]);
for(int j=0;j<w;j++)
{
oldWeightMap.put(totalOldWeight + j, i);
}
totalOldWeight += w;
}
}
else
{
totalOldWeight = dataSourceCount;
for(int i=0;i<dataSourceCount;i++)
{
oldWeightMap.put(i, i);
}
}
if(LOG.isInfoEnabled())
LOG.info("totalNewWeight " + totalNewWeight + " totalOldWeight " + totalOldWeight);
selectDBService.setTotalNewWeight(totalNewWeight);
selectDBService.setNewWeightIdMap(newWeightMap);
selectDBService.setTotalOldWeight(totalOldWeight);
selectDBService.setOldWeightIdMap(oldWeightMap);
}
protected abstract void createDataSource(Map<Integer, DataSource> dataSourceMap, Properties p);
protected abstract void createDefaultDataSource(Map<Integer, DataSource> dataSourceMap);
public void setNewWeights(String newWeights)
{
this.newWeights = newWeights;
}
public void setOldWeights(String oldWeights)
{
this.oldWeights = oldWeights;
}
public Map<Integer, DataSource> getDataSourceMap()
{
return dataSourceMap;
}
}
package com.konceptusa.infinet.detach.datasource;
import java.beans.PropertyVetoException;
import java.util.Map;
import java.util.Properties;
import javax.sql.DataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.infinet.detach.config.MultiHibernateProperties;
import com.mchange.v2.c3p0.ComboPooledDataSource;
/**
* 基于c3p0连接池的动态获取连接类
* @author Jwin
*
*/
public class DynamicC3p0DataSource extends AbstractDynamicDataSource
{
private final static Log LOG = LogFactory.getLog(DynamicC3p0DataSource.class);
private int initialSize = 1;
private int maxActive = 1;
private int minActive = 1;
private int maxIdleTime = 30;
private String automaticTestTable = "Test";
private int acquireIncrement = 3;
private int maxStatements = 100;
private int maxStatementsPerConnection = 3;
private int numHelperThreads = 3;
private int idleConnectionTestPeriod = 30;
protected void createDefaultDataSource(Map<Integer, DataSource> dataSourceMap)
{
ComboPooledDataSource dataSource = new ComboPooledDataSource();
dataSource.setUser("sa");
dataSource.setPassword("");
dataSource.setJdbcUrl("jdbc:hsqldb:mem:" + getClass().getSimpleName().toLowerCase());
try
{
dataSource.setDriverClass("org.hsqldb.jdbcDriver");
} catch (PropertyVetoException e)
{
throw new IllegalConfigException(e);
}
dataSource.setInitialPoolSize(initialSize);
dataSource.setMaxPoolSize(maxActive);
dataSource.setMinPoolSize(minActive);
dataSource.setMaxIdleTime(maxIdleTime);
dataSource.setAcquireIncrement(acquireIncrement);
dataSource.setNumHelperThreads(numHelperThreads);
dataSource.setAutomaticTestTable(automaticTestTable);
dataSource.setMaxStatements(maxStatements);
dataSource.setMaxStatementsPerConnection(maxStatementsPerConnection);
dataSource.setIdleConnectionTestPeriod(idleConnectionTestPeriod);
dataSourceMap.put(defaultDataSourceId, dataSource);
}
@Override
protected void createDataSource(Map<Integer, DataSource> dataSourceMap, Properties p)
{
ComboPooledDataSource dataSource = new ComboPooledDataSource();
dataSource.setJdbcUrl(p.getProperty(MultiHibernateProperties.connectionUrlKey));
LOG.info("init datasource url " + dataSource.getJdbcUrl());
dataSource.setUser(p.getProperty(MultiHibernateProperties.connectionUsernameKey));
dataSource.setPassword(p.getProperty(MultiHibernateProperties.connectionPasswordKey));
try
{
dataSource.setDriverClass(p.getProperty(MultiHibernateProperties.connectionDriverClassKey));
} catch (PropertyVetoException e)
{
throw new IllegalConfigException(e);
}
dataSource.setInitialPoolSize(initialSize);
dataSource.setMaxPoolSize(maxActive);
dataSource.setMinPoolSize(minActive);
dataSource.setMaxIdleTime(maxIdleTime);
dataSource.setAcquireIncrement(acquireIncrement);
dataSource.setNumHelperThreads(numHelperThreads);
dataSource.setAutomaticTestTable(automaticTestTable);
dataSource.setMaxStatements(maxStatements);
dataSource.setMaxStatementsPerConnection(maxStatementsPerConnection);
dataSource.setIdleConnectionTestPeriod(idleConnectionTestPeriod);
String id = p.getProperty(MultiHibernateProperties.shardIdKey);
dataSourceMap.put(Integer.parseInt(id), dataSource);
}
public void setInitialSize(int initialSize)
{
this.initialSize = initialSize;
}
public void setMaxActive(int maxActive)
{
this.maxActive = maxActive;
}
public void setMaxIdleTime(int maxIdle)
{
this.maxIdleTime = maxIdle;
}
public void setAcquireIncrement(int acquireIncrement)
{
this.acquireIncrement = acquireIncrement;
}
public void setMaxStatements(int maxStatements)
{
this.maxStatements = maxStatements;
}
public void setMaxStatementsPerConnection(int maxStatementsPerConnection)
{
this.maxStatementsPerConnection = maxStatementsPerConnection;
}
public void setNumHelperThreads(int numHelperThreads)
{
this.numHelperThreads = numHelperThreads;
}
public void setAutomaticTestTable(String automaticTestTable)
{
this.automaticTestTable = automaticTestTable;
}
public void setMinActive(int minActive)
{
this.minActive = minActive;
}
public void setIdleConnectionTestPeriod(int idleConnectionTestPeriod)
{
this.idleConnectionTestPeriod = idleConnectionTestPeriod;
}
}
Java代码
- package com.konceptusa.infinet.imsupport.detach;
-
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.Collections;
- import java.util.List;
-
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.transaction.annotation.Propagation;
- import org.springframework.transaction.annotation.Transactional;
-
- import com.konceptusa.framework.annotation.IllegalConfigException;
- import com.konceptusa.framework.core.dao.HibernateQueryListCallback;
- import com.konceptusa.framework.core.dao.hql.Hql;
- import com.konceptusa.framework.core.service.BaseServiceSupport;
- import com.konceptusa.framework.core.service.Page;
- import com.konceptusa.framework.core.support.ObjectFactory;
- import com.konceptusa.infinet.annotation.handler.DetachDbServiceAnnotationHandler;
- import com.konceptusa.infinet.detach.CountId;
- import com.konceptusa.infinet.detach.CountIdComparetor;
- import com.konceptusa.infinet.detach.MagrateAble;
- import com.konceptusa.infinet.detach.QueryListAble;
- import com.konceptusa.infinet.detach.datasource.UseridContextHolder;
-
- /**
- * 多个数据库综合查询,简单服务类父类
- * @author Jwin
- *
- * @param <T>
- */
- @Transactional(readOnly=true, rollbackFor = Exception.class)
- public abstract class BaseServiceSupportForMulti<T> extends BaseServiceSupport<T> implements QueryListAble<T>,MagrateAble<T>
- {
- private final static Log LOG = LogFactory.getLog(BaseServiceSupportForMulti.class);
- @Override
- protected int findCountByHql(Hql hql)
- {
- List<Long> countList = (List<Long>) getHibernateTemplate().execute(
- new HibernateQueryListCallback(new Hql("select count(*) "
- + hql.getHql(), hql.getCache(), hql.getParameters())));
- Long counts = 0L;
- for(Long count : countList)
- {
- counts += count;
- }
- return counts.intValue();
- }
- @Transactional(readOnly=true, rollbackFor = Exception.class,propagation=Propagation.NOT_SUPPORTED)
- public List<T> queryList(Hql hql, int from, int offset)
- {
- return queryListByHql(hql, from, offset);
- }
-
- public List<CountId> queryCount(Hql hql)
- {
- List<Object[]> list = queryListByHql(hql);
- List<CountId> countList = new ArrayList<CountId>(list.size());
- for(Object[] l : list)
- {
- if(l[1] != null)
- {
- CountId count = new CountId((Long) l[1],(Long)l[0]);
- countList.add(count);
- }
- }
- Collections.sort(countList, new CountIdComparetor());
- return countList;
- }
- protected String getBeanName(String name)
- {
- name = name.substring(0, name.length() - "Impl".length());
- name = name.substring(0, 1).toLowerCase() + name.substring(1, name.length());
- return name;
- }
- protected Page queryPageByHql(Hql hql,String useridName, int start, int offset)
- {
- Hql countHql = new Hql("select count(*),min(" + useridName + ") "
- + hql.getHql(), hql.getCache(), hql.getParameters());
- return queryPageByHql(countHql, hql, start, offset);
- }
- //先查出各个数据库的总数及标识,然后对标识进行排序,最后根据这个结果遍历数据库进行分页查找,找满结果则返回。
- private Page queryPageByHql(Hql countHql,Hql listHql,int start, int offset)
- {
- QueryListAble<T> serviceShards = getShardsService();
- QueryListAble<T> serviceDynamic = getDynamicService();
- List<CountId> countList = serviceShards.queryCount(countHql);
- //相对于当前之前所有数据库的总数偏移
- int totalCount = 0;
- //相对于所有数据库的结束偏移
- int end = start + offset;
- //相对于当前数据库的开始偏移量
- int startRelative = -1;
- List<T> queryList = new ArrayList<T>(offset);
- for(CountId count : countList)
- {
- totalCount += count.getCount();
- //之前所有库总数小于开始偏移量,继续下一个数据库
- if(totalCount < start)
- {
- continue;
- }
- //之前所有库总数第一次大于开始偏移量
- if(startRelative == -1)
- {
- startRelative = count.getCount().intValue() - (totalCount - start);
- }
- else
- {
- startRelative = 0;
- }
- int relativeCount = totalCount - end;
- if(relativeCount >= 0)
- {
- UseridContextHolder.setUserid(count.getId());
- try
- {
- //计算相对于当前库的偏移
- int offsetRelative = count.getCount().intValue() - relativeCount - startRelative;
- LOG.debug("query from " + startRelative + " offset " + offsetRelative + " for min(userid)=" + count.getId());
- queryList.addAll(serviceDynamic.queryList(listHql, startRelative, offsetRelative));
- }finally
- {
- UseridContextHolder.removeUserid();
- }
- break;
- }
- UseridContextHolder.setUserid(count.getId());
- try
- {
- //计算相对于当前库的偏移
- int offsetRelative = totalCount - startRelative;
- LOG.debug("query from " + startRelative + " offset " + offsetRelative + " for min(userid)=" + count.getId());
- queryList.addAll(serviceDynamic.queryList(listHql, startRelative, offsetRelative));
- } finally
- {
- UseridContextHolder.removeUserid();
- }
- }
- totalCount = 0;
- for(CountId count : countList)
- {
- totalCount += count.getCount();
- }
- return new Page<T>(totalCount, queryList);
- }
- protected Page queryPageByHql(String hqlstr,String useridName, int start, int offset,Object ... values)
- {
- Hql listHql = Hql.createIndexHql(