博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Redis 分布式锁
阅读量:6091 次
发布时间:2019-06-20

本文共 5434 字,大约阅读时间需要 18 分钟。

1 #include 
2 #include
3 #include
4 // -------------------- 5 #include
6 #include
7 // -------------------- 8 #include
9 // -------------------- 10 #include "thread_helper.h" 11 12 #define ATOMIC_UNLOCK 0 13 #define ATOMIC_LOCK 1 14 15 int lock = 0; 16 int num = 0; 17 int total_num = 50; 18 ThreadSchema* ts; 19 20 bool acquire_lock(int sn, redisContext* c) { 21 redisReply* r = NULL; 22 int res = 0; 23 24 if(NULL == c) 25 return false; 26 27 while(true) { 28 r = (redisReply*)redisCommand(c,"SETNX tom_lock 1"); 29 res = r->integer; 30 freeReplyObject(r); 31 32 if (res == 1) { 33 r = (redisReply*)redisCommand(c,"EXPIRE tom_lock 60"); 34 freeReplyObject(r); 35 return true; 36 } else { 37 38 r = (redisReply*)redisCommand(c,"TTL tom_lock"); 39 res = r->integer; 40 //printf("acquire_lock %d %d\n", r->type, r->integer); 41 freeReplyObject(r); 42 43 if(res == -1){ 44 r = (redisReply*)redisCommand(c,"EXPIRE tom_lock 1"); 45 freeReplyObject(r); 46 } 47 } 48 49 usleep(1000); 50 } 51 52 return false; 53 } 54 55 bool release_lock(int sn, redisContext* c) { 56 redisReply* r = NULL; 57 58 while(true) { 59 r = (redisReply*)redisCommand(c,"WATCH tom_lock"); 60 freeReplyObject(r); 61 62 r = (redisReply*)redisCommand(c,"GET tom_lock"); 63 64 //printf("release_lock %lld\n", r->integer); 65 66 if (((REDIS_REPLY_INTEGER == r->type) && (1 == r->integer)) || 67 ((REDIS_REPLY_STRING == r->type) && (strcmp("1", r->str) == 0)) 68 ) { 69 70 //printf("release_lock state 1\n"); 71 72 freeReplyObject(r); 73 74 r = (redisReply*)redisCommand(c,"MULTI"); 75 freeReplyObject(r); 76 77 r = (redisReply*)redisCommand(c,"DEL tom_lock"); 78 freeReplyObject(r); 79 80 r = (redisReply*)redisCommand(c,"EXEC"); 81 82 return true; 83 } else if(REDIS_REPLY_NIL == r->type) { 84 printf("release_lock state 2\n"); 85 86 freeReplyObject(r); 87 88 r = (redisReply*)redisCommand(c,"UNWATCH"); 89 freeReplyObject(r); 90 91 return true; 92 } else { 93 printf("release_lock sn:%d, tate 3\n", sn); 94 95 freeReplyObject(r); 96 97 r = (redisReply*)redisCommand(c,"UNWATCH"); 98 freeReplyObject(r); 99 }100 101 usleep(1000);102 }103 104 return false;105 }106 107 void* test_thread(void* arg) {108 int sn = *((int*)arg);109 redisContext* c = ts->rctx[sn];110 redisReply* r = NULL;111 int tom = 0;112 113 while(true) {114 115 //printf("En Lock sn:%d\n", sn);116 fflush(stdout);117 118 if(!acquire_lock(sn, c)) {119 printf("Co Lock\n");120 fflush(stdout);121 sched_yield();122 continue;123 }124 125 // printf("Lv Lock sn:%d\n", sn);126 // fflush(stdout);127 128 r = (redisReply*)redisCommand(c,"GET tom");129 tom = atoi(r->str);130 freeReplyObject(r);131 132 if(tom >= 50) {133 release_lock(sn, c);134 break;135 }136 137 tom += 1;138 printf("sn: %d, tom: %d\n", sn, tom);139 fflush(stdout);140 141 r = (redisReply*)redisCommand(c,"SET tom %d", tom);142 freeReplyObject(r);143 144 // printf("En UnLock sn:%d\n", sn);145 // fflush(stdout);146 147 release_lock(sn, c);148 149 //usleep(1);150 // printf("Lv UnLock sn:%d\n", sn);151 // fflush(stdout);152 }153 154 printf("Lv Test:%d\n", sn);155 fflush(stdout);156 157 return NULL;158 }159 160 int main(int argc, char* argv[])161 {162 ts = new ThreadSchema(20);163 164 const char *hostname = "127.0.0.1";165 int port = 6379;166 struct timeval timeout = { 1, 500000 }; // 1.5 seconds167 168 for(int i=0; i
num; ++i) {169 ts->rctx[i] = redisConnectWithTimeout(hostname, port, timeout);170 if (ts->rctx[i] == NULL || ts->rctx[i]->err) {171 if (ts->rctx[i]) {172 printf("Connection error: %s\n", ts->rctx[i]->errstr);173 } else {174 printf("Connection error: can't allocate redis context\n");175 }176 exit(1);177 }178 }179 180 redisReply* r = (redisReply*)redisCommand(ts->rctx[0],"DEL tom_lock");181 freeReplyObject(r);182 183 r = (redisReply*)redisCommand(ts->rctx[0],"SET tom 0");184 freeReplyObject(r);185 186 for(int i=0; i
num; ++i)187 {188 pthread_create(&ts->pid[i], NULL, test_thread, &ts->sn[i]);189 }190 191 /* Disconnects and frees the context */192 193 while(num < total_num)194 {195 sched_yield();196 }197 198 return 0;199 }

 

#ifndef THREAD_HELPER_H#define THREAD_HELPER_H#include 
#include
class ThreadSchema{public: ThreadSchema(int n); ~ThreadSchema(); int num; int* sn; redisContext** rctx; pthread_t* pid;};#endif

 

1 #include "thread_helper.h" 2 #include 
3 #include
4 5 ThreadSchema::ThreadSchema(int n) 6 { 7 num = n; 8 9 sn = (int*)malloc(sizeof(int) * num);10 if (NULL != sn) {11 for (int i=0; i < num; ++i) {12 sn[i] = i;13 }14 }15 16 rctx = (redisContext**)malloc(sizeof(redisContext*) * num);17 if (NULL != rctx) {18 memset(rctx, 0, sizeof(redisContext*) * num);19 }20 21 pid = (pthread_t*)malloc(sizeof(pthread_t) * num);22 if(NULL != pid) {23 memset(pid, 0, sizeof(pthread_t*) * num);24 }25 }26 27 ThreadSchema::~ThreadSchema() {28 29 for (int i=0; i < num; ++i) {30 if(NULL != rctx[i]) {31 redisFree(rctx[i]);32 }33 }34 35 if(NULL != rctx) {36 free(rctx);37 rctx = NULL;38 }39 40 if(NULL != pid) {41 free(pid);42 pid = NULL;43 }44 }

 

转载于:https://www.cnblogs.com/tomren/p/6432294.html

你可能感兴趣的文章
Struts2与Struts1区别
查看>>
网站内容禁止复制解决办法
查看>>
Qt多线程
查看>>
我的友情链接
查看>>
想说一点东西。。。。
查看>>
css知多少(8)——float上篇
查看>>
NLB网路负载均衡管理器详解
查看>>
水平添加滚动条
查看>>
PHP中”单例模式“实例讲解
查看>>
VS2008查看dll导出函数
查看>>
VM EBS R12迁移,启动APTier . AutoConfig错误
查看>>
atitit.细节决定成败的适合情形与缺点
查看>>
Mysql利用binlog恢复数据
查看>>
我的友情链接
查看>>
用yum安装mariadb
查看>>
一点IT"边缘化"的人的思考
查看>>
WPF 降低.net framework到4.0
查看>>
搭建一个通用的脚手架
查看>>
开年巨制!千人千面回放技术让你“看到”Flutter用户侧问题
查看>>
开源磁盘加密软件VeraCrypt教程
查看>>