IT技术互动交流平台

简单注解实现集群同步锁(spring+redis+注解)

来源:IT165收集  发布日期:2016-08-15 21:56:56

互联网面试的时候,是不是面试官常问一个问题如何保证集群环境下数据操作并发问题,常用的synchronized肯定是无法满足了,或许你可以借助for update对数据加锁。本文的最终解决方式你只要在方法上加一个@P4jSyn注解就能保证集群环境下同synchronized的效果,且锁的key可以任意指定。本注解还支持了锁的超时机制。

本文需要对redis、spring和spring-data-redis有一定的了解。当然你可以借助本文的思路对通过注解对方法返回数据进行缓存,类似com.google.code.simple-spring-memcached的@ReadThroughSingleCache。

第一步: 介绍两个自定义注解P4jSyn、P4jSynKey

P4jSyn:必选项,标记在方法上,表示需要对该方法加集群同步锁;

P4jSynKey:可选项,加在方法参数上,表示以方法某个参数作为锁的key,用来保证更多的坑,P4jSynKey并不是强制要添加的,当没有P4jSynKey标记的情况下只会以P4jSyn的synKey作为锁key。

 

package com.yaoguoyin.redis.lock;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * <b>同步锁:</b><br/>
 * 主要作用是在服务器集群环境下保证方法的synchronize;<br/>
 * 标记在方法上,使该方法的执行具有互斥性,并不保证并发执行方法的先后顺序;<br/>
 * 如果原有“A任务”获取锁后任务执行时间超过最大允许持锁时间,且锁被“B任务”获取到,在“B任务”成功货物锁会并不会终止“A任务”的执行;<br/>
 * <br/>
 * <b>注意:</b><br/>
 * 使用过程中需要注意keepMills、toWait、sleepMills、maxSleepMills等参数的场景使用;<br/>
 * 需要安装redis,并使用spring和spring-data-redis等,借助redis NX等方法实现。
 * 
 * @see com.yaoguoyin.redis.lock.P4jSynKey
 * @see com.yaoguoyin.redis.lock.RedisLockAspect
 * 
 * @author partner4java
 *
 */
@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface P4jSyn {

	/**
	 * 锁的key<br/>
	 * 如果想增加坑的个数添加非固定锁,可以在参数上添加@P4jSynKey注解,但是本参数是必写选项<br/>
	 * redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey<br/>
	 * 
	 */
	String synKey();

	/**
	 * 持锁时间,超时时间,持锁超过此时间自动丢弃锁<br/>
	 * 单位毫秒,默认20秒<br/>
	 * 如果为0表示永远不释放锁,在设置为0的情况下toWait为true是没有意义的<br/>
	 * 但是没有比较强的业务要求下,不建议设置为0
	 */
	long keepMills() default 20 * 1000;

	/**
	 * 当获取锁失败,是继续等待还是放弃<br/>
	 * 默认为继续等待
	 */
	boolean toWait() default true;

	/**
	 * 没有获取到锁的情况下且toWait()为继续等待,睡眠指定毫秒数继续获取锁,也就是轮训获取锁的时间<br/>
	 * 默认为10毫秒
	 * 
	 * @return
	 */
	long sleepMills() default 10;

	/**
	 * 锁获取超时时间:<br/>
	 * 没有获取到锁的情况下且toWait()为true继续等待,最大等待时间,如果超时抛出
	 * {@link java.util.concurrent.TimeoutException.TimeoutException}
	 * ,可捕获此异常做相应业务处理;<br/>
	 * 单位毫秒,默认一分钟,如果设置为0即为没有超时时间,一直获取下去;
	 * 
	 * @return
	 */
	long maxSleepMills() default 60 * 1000;
}
package com.yaoguoyin.redis.lock;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * <b>同步锁 key</b><br/>
 * 加在方法的参数上,指定的参数会作为锁的key的一部分
 * 
 * @author partner4java
 *
 */
@Target({ ElementType.PARAMETER })
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface P4jSynKey {

	/**
	 * key的拼接顺序
	 * 
	 * @return
	 */
	int index() default 0;
}
这里就不再对两个注解进行使用上的解释了,因为注释已经说明的很详细了。
使用示例:

 

 

package com.yaoguoyin.redis.lock;

import org.springframework.stereotype.Component;

@Component
public class SysTest {
	private static int i = 0;

	@P4jSyn(synKey = "12345")
	public void add(@P4jSynKey(index = 1) String key, @P4jSynKey(index = 0) int key1) {
		i++;
		System.out.println("i=-===========" + i);
	}
}

第二步:切面编程
在不影响原有代码的前提下,保证执行同步,目前最直接的方式就是使用切面编程

 

 

package com.yaoguoyin.redis.lock;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.BoundValueOperations;
import org.springframework.data.redis.core.RedisTemplate;

/**
 * 锁的切面编程<br/>
 * 针对添加@RedisLock 注解的方法进行加锁
 * 
 * @see com.yaoguoyin.redis.lock.P4jSyn
 * 
 * @author partner4java
 *
 */
@Aspect
public class RedisLockAspect {
	@Autowired
	@Qualifier("redisTemplate")
	private RedisTemplate<String, Long> redisTemplate;

	@Around("execution(* com.yaoguoyin..*(..)) && @annotation(com.yaoguoyin.redis.lock.P4jSyn)")
	public Object lock(ProceedingJoinPoint pjp) throws Throwable {
		P4jSyn lockInfo = getLockInfo(pjp);
		if (lockInfo == null) {
			throw new IllegalArgumentException("配置参数错误");
		}

		String synKey = getSynKey(pjp, lockInfo.synKey());
		if (synKey == null || "".equals(synKey)) {
			throw new IllegalArgumentException("配置参数synKey错误");
		}

		boolean lock = false;
		Object obj = null;
		try {
			// 超时时间
			long maxSleepMills = System.currentTimeMillis() + lockInfo.maxSleepMills();

			while (!lock) {
				long keepMills = System.currentTimeMillis() + lockInfo.keepMills();
				lock = setIfAbsent(synKey, keepMills);

				// 得到锁,没有人加过相同的锁
				if (lock) {
					obj = pjp.proceed();
				}
				// 锁设置了没有超时时间
				else if (lockInfo.keepMills() <= 0) {
					// 继续等待获取锁
					if (lockInfo.toWait()) {
						// 如果超过最大等待时间抛出异常
						if (lockInfo.maxSleepMills() > 0 && System.currentTimeMillis() > maxSleepMills) {
							throw new TimeoutException("获取锁资源等待超时");
						}
						TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());
					} else {
						break;
					}
				}
				// 已过期,并且getAndSet后旧的时间戳依然是过期的,可以认为获取到了锁
				else if (System.currentTimeMillis() > getLock(synKey) && (System.currentTimeMillis() > getSet(synKey, keepMills))) {
					lock = true;
					obj = pjp.proceed();
				}
				// 没有得到任何锁
				else {
					// 继续等待获取锁
					if (lockInfo.toWait()) {
						// 如果超过最大等待时间抛出异常
						if (lockInfo.maxSleepMills() > 0 && System.currentTimeMillis() > maxSleepMills) {
							throw new TimeoutException("获取锁资源等待超时");
						}
						TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());
					}
					// 放弃等待
					else {
						break;
					}
				}
			}

		} catch (Exception e) {
			e.printStackTrace();
			throw e;
		} finally {
			// 如果获取到了锁,释放锁
			if (lock) {
				releaseLock(synKey);
			}
		}
		return obj;
	}

	/**
	 * 获取包括方法参数上的key<br/>
	 * redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey
	 * 
	 */
	private String getSynKey(ProceedingJoinPoint pjp, String synKey) {
		try {
			synKey = "RedisSyn+" + synKey;
			Object[] args = pjp.getArgs();
			if (args != null && args.length > 0) {
				MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
				Annotation[][] paramAnnotationArrays = methodSignature.getMethod().getParameterAnnotations();

				SortedMap<Integer, String> keys = new TreeMap<Integer, String>();

				for (int ix = 0; ix < paramAnnotationArrays.length; ix++) {
					P4jSynKey p4jSynKey = getAnnotation(P4jSynKey.class, paramAnnotationArrays[ix]);
					if (p4jSynKey != null) {
						Object arg = args[ix];
						if (arg != null) {
							keys.put(p4jSynKey.index(), arg.toString());
						}
					}
				}

				if (keys != null && keys.size() > 0) {
					for (String key : keys.values()) {
						synKey = synKey + key;
					}
				}
			}

			return synKey;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}

	@SuppressWarnings("unchecked")
	private static <T extends Annotation> T getAnnotation(final Class<T> annotationClass, final Annotation[] annotations) {
		if (annotations != null && annotations.length > 0) {
			for (final Annotation annotation : annotations) {
				if (annotationClass.equals(annotation.annotationType())) {
					return (T) annotation;
				}
			}
		}

		return null;
	}

	/**
	 * 获取RedisLock注解信息
	 */
	private P4jSyn getLockInfo(ProceedingJoinPoint pjp) {
		try {
			MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
			Method method = methodSignature.getMethod();
			P4jSyn lockInfo = method.getAnnotation(P4jSyn.class);
			return lockInfo;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}

	public BoundValueOperations<String, Long> getOperations(String key) {
		return redisTemplate.boundValueOps(key);
	}

	/**
	 * Set {@code value} for {@code key}, only if {@code key} does not exist.
	 * <p>
	 * See http://redis.io/commands/setnx
	 * 
	 * @param key
	 *            must not be {@literal null}.
	 * @param value
	 *            must not be {@literal null}.
	 * @return
	 */
	public boolean setIfAbsent(String key, Long value) {
		return getOperations(key).setIfAbsent(value);
	}

	public long getLock(String key) {
		Long time = getOperations(key).get();
		if (time == null) {
			return 0;
		}
		return time;
	}

	public long getSet(String key, Long value) {
		Long time = getOperations(key).getAndSet(value);
		if (time == null) {
			return 0;
		}
		return time;
	}

	public void releaseLock(String key) {
		redisTemplate.delete(key);
	}
}

RedisLockAspect会对添加注解的方法进行特殊处理,具体可看lock方法。

 

大致思路就是:1、首选借助redis本身支持对应的setIfAbsent方法,该方法的特点是如果redis中已有该数据不保存返回false,不存该数据保存返回true;2、如果setIfAbsent返回true标识拿到同步锁,可进行操作,操作后并释放锁;3、如果没有通过setIfAbsent拿到数据,判断是否对锁设置了超时机制,没有设置判断是否需要继续等待;4、判断是否锁已经过期,需要对(System.currentTimeMillis() > getLock(synKey) && (System.currentTimeMillis() > getSet(synKey, keepMills)))进行细细的揣摩一下,getSet可能会改变了其他人拥有锁的超时时间,但是几乎可以忽略;5、没有得到任何锁,判断继续等待还是退出。

第三步:spring的基本配置

 

#*****************jedis连接参数设置*********************#  
  
#redis服务器ip #     
redis.hostName=127.0.0.1
  
#redis服务器端口号#    
redis.port=6379

#redis服务器外部访问密码
redis.password=XXXXXXXXXX
  
#************************jedis池参数设置*******************#    
  
#jedis的最大分配对象#    
jedis.pool.maxActive=1000  

jedis.pool.minIdle=100

#jedis最大保存idel状态对象数 #    
jedis.pool.maxIdle=1000
  
#jedis池没有对象返回时,最大等待时间 #    
jedis.pool.maxWait=5000  
  
#jedis调用borrowObject方法时,是否进行有效检查#    
jedis.pool.testOnBorrow=true  
  
#jedis调用returnObject方法时,是否进行有效检查 #    
jedis.pool.testOnReturn=true  
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:redis="http://www.springframework.org/schema/redis" xmlns:cache="http://www.springframework.org/schema/cache"
	xsi:schemaLocation="http://www.springframework.org/schema/beans 
            http://www.springframework.org/schema/beans/spring-beans-4.2.xsd  
            http://www.springframework.org/schema/context 
            http://www.springframework.org/schema/context/spring-context-4.2.xsd
            http://www.springframework.org/schema/aop 
            http://www.springframework.org/schema/aop/spring-aop-4.1.xsd 
            http://www.springframework.org/schema/redis 
            http://www.springframework.org/schema/redis/spring-redis.xsd 
            http://www.springframework.org/schema/cache 
            http://www.springframework.org/schema/cache/spring-cache.xsd">

	<!-- 开启注解 -->
	<aop:aspectj-autoproxy />

	<bean class="com.yaoguoyin.redis.lock.RedisLockAspect" />

	<!-- 扫描注解包范围 -->
	<context:component-scan base-package="com.yaoguoyin" />

	<!-- 引入redis配置 -->
	<context:property-placeholder location="classpath:config.properties" />

	<!-- 连接池 -->
	<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
		<property name="minIdle" value="${jedis.pool.minIdle}" />
		<property name="maxIdle" value="${jedis.pool.maxIdle}" />
		<property name="maxWaitMillis" value="${jedis.pool.maxWait}" />
	</bean>

	<!-- p:password="${redis.pass}" -->
	<bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" p:host-name="${redis.hostName}" p:port="${redis.port}"
		p:password="${redis.password}" p:pool-config-ref="poolConfig" />

	<!-- 类似于jdbcTemplate -->
	<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate" p:connection-factory-ref="redisConnectionFactory" />

</beans>
 
 
redis的安装本文就不再说明。

测试

 

 

package com.yaoguoyin.redis;

import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:META-INF/spring/redis.xml" })
public class BaseTest extends AbstractJUnit4SpringContextTests {
	
}
package com.yaoguoyin.redis.lock;

import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.yaoguoyin.redis.BaseTest;

public class RedisTest extends BaseTest {
	@Autowired
	private SysTest sysTest;

	@Test
	public void testHello() throws InterruptedException {
		for (int i = 0; i < 100; i++) {
			new Thread(new Runnable() {

				@Override
				public void run() {
					try {
						TimeUnit.SECONDS.sleep(1);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					sysTest.add("xxxxx", 111111);
				}
			}).start();
		}

		TimeUnit.SECONDS.sleep(20);
	}
	
	@Test
	public void testHello2() throws InterruptedException{
		sysTest.add("xxxxx", 111111);
		TimeUnit.SECONDS.sleep(10);
	}
}

你可以对

void com.yaoguoyin.redis.lock.SysTest.add(@P4jSynKey(index=1) String key, @P4jSynKey(index=0) int key1)
去除注解@P4jSyn进行测试对比。

 

demo整体下载地址(示例使用了maven):http://download.csdn.net/detail/partner4java/9602420

ps:本demo的执行性能取决于redis和java交互距离;成千山万单锁并发建议不要使用这种形式,直接通过redis等解决,本demo只解决小并发不想耦合代码的形式。

Tag标签: 注解   集群  
  • 专题推荐

About IT165 - 广告服务 - 隐私声明 - 版权申明 - 免责条款 - 网站地图 - 网友投稿 - 联系方式
本站内容来自于互联网,仅供用于网络技术学习,学习中请遵循相关法律法规