1.介绍
Hash是以K->V形式存储,而Set则是K存储,空间节省了很多
Redis中Set是String类型的无序集合;集合成员是唯一的。
这就意味着集合中不能出现重复的数据。可根据应用场景需要选用该数据类型。(比如:好友/关注/粉丝/感兴趣的人/黑白名单)
2.源码解析
Redis使用Dict和IntSet保存Set数据
// 1. inset 数据结构,在set数据量小且都是整型数据时使用 typedef struct intset { 编码范围,由具体存储值决定 uint32_t encoding; 数组长度 uint32_t length; 具体存储元素的容器 int8_t contents[]; } intset;
2. dict 相关数据结构,即是 hash 的实现相关的数据结构 /* This is our hash table structure. Every dictionary has two of this as we * implement incremental rehashing,for the old to the new table. */ typedef dictht { dictEntry **table; unsigned long size; unsigned sizemask; unsigned used; } dictht; typedef dict { dictType *type; void *privdata; dictht ht[2]; long rehashidx; /* rehashing not in progress if rehashidx == -1 unsigned long iterators; number of iterators currently running } dict; If safe is set to 1 this is a safe iterator,that means,you can call * dictAdd,dictFind,and other functions against the dictionary even while * iterating. Otherwise it is a non safe iterator,and only dictNext() * should be called while iterating. dictIterator { dict *d; index; int table,safe; dictEntry *entry,*nextEntry; unsafe iterator fingerprint for misuse detection. */ long fingerprint; } dictIterator; typedef dictEntry { key; union { val; uint64_t u64; int64_t s64; double d; } v; struct dictEntry *next; } dictEntry; typedef dictType { unsigned int (*hashFunction)(const key); void *(*keyDup)(void *privdata,void *(*valDup)(obj); int (*keyCompare)(void *key1,1)">key2); void (*keyDestructor)(void (*valDestructor)(obj); } dictType;
3.SADD
加一个或多个指定的member元素到集合的 key中.指定的一个或者多个元素member 如果已经在集合key中存在则忽略.
如果集合key 不存在,则新建集合key,并添加member元素到集合key中.
如果key 的类型不是集合则返回错误.
时间复杂度:O(N)
127.0.0.1:6379> sadd myset "Hello" (integer) 1 0 6379> smembers myset 1) " 6379>
用法: SADD key member1 [member2] t_set.c,添加member void saddCommand(client *c) { robj *set; int j,added = 0 先从当前db中查找set实例 set = lookupKeyWrite(c->db,c->argv[1]); if (set == NULL) { 1. 新建set实例并添加到当前db中 set = setTypeCreate(c->argv[2]->ptr); dbAdd(c->db,1)">1],1)">); } else { set->type != OBJ_SET) { addReply(c,shared.wrongtypeerr); return; } } 对于n个member,一个个地添加即可 for (j = 2; j < c->argc; j++) { 2. 只有添加成功,added 才会加1 if (setTypeAdd(set,c->argv[j]->ptr)) added++; } 命令传播 if (added) { signalModifiedKey(c->db,1)">]); notifyKeyspaceEvent(NOTIFY_SET,sadd",c->db->id); } server.dirty += added; 响应添加成功的数量 addReplyLongLong(c,added); } 1. 创建新的set集合实例(需根据首次的参数类型判定) Factory method to return a set that *can* hold "value". When the object has * an integer-encodable value,an intset will be returned. Otherwise a regular * hash table. robj *setTypeCreate(sds value) { 如果传入的value是整型,则创建 intset 类型的set 否则使用dict类型的set 一般地,第一个数据为整型,后续数据也应该为整型,所以这个数据结构相对稳定 而hash的容器创建时,只使用了一 ziplist 创建,这是不一样的实现 if (isSdsRepresentableAsLongLong(value,NULL) == C_OK) createIntsetObject(); createSetObject(); } 1.1. 创建 intset 型的set object.c robj *createIntsetObject(void) { intset *is = intsetNew(); robj *o = createObject(OBJ_SET,1)">is); o->encoding = OBJ_ENCODING_INTSET; o; } intset.c,new一个空的intset对象 Create an empty intset. intset *intsetNew(is = zmalloc(sizeof(intset)); is->encoding = intrev32ifbe(INTSET_ENC_INT16); is->length = return ; } 1.2. 创建dict 型的set robj *createSetObject() { dict *d = dictCreate(&setDictType,NULL); robj *o = createObject(OBJ_SET,d); o->encoding = OBJ_ENCODING_HT; dict.c Create a new hash table dict *dictCreate(dictType *type,privDataPtr) { dict *d = zmalloc(sizeof(*d)); _dictInit(d,type,privDataPtr); d; } Initialize the hash table */ int _dictInit(dict *d,dictType *privDataPtr) { _dictReset(&d->ht[]); _dictReset(&d->ht[]); d->type = type; d->privdata = privDataPtr; d->rehashidx = -; d->iterators = DICT_OK; } 2. 添加member到set集合中 添加元素 Add the specified value into a set. * * If the value was already member of the set,nothing is done and 0 is * returned,otherwise the new element is added and 1 is returned. int setTypeAdd(robj *subject,sds value) { llval; 2.1. HT编码和INTSET编码分别处理就好 if (subject->encoding == OBJ_ENCODING_HT) { dict *ht = subject->ptr; 以 value 为 key,添加实例到ht中 实现过程也很简单,大概就是如果存在则返回NULL(即无需添加),辅助rehash,分配内存创建dictEntry实例,稍后简单看看 dictEntry *de = dictAddRaw(ht,value); (de) { 重新设置key为 sdsdup(value),value为NULL dictSetKey(ht,de,sdsdup(value)); dictSetVal(ht,NULL); return ; } } 2.2. intset 编码的member添加 else OBJ_ENCODING_INTSET) { 尝试解析value为 long 型,值写入 llval 中 C_OK) { uint8_t success = ; 情况1. 可添加到intset中 subject->ptr = intsetAdd(subject->ptr,llval,&success); (success) { Convert to regular set when the intset contains * too many entries. */ 默认: 512,intset大于之后,则转换为ht hash表模式存储 if (intsetLen(subject->ptr) > server.set_max_intset_entries) 2.3. 转换intset编码为 ht 编码 setTypeConvert(subject,OBJ_ENCODING_HT); ; } } { 情况2. member 是字符串型,先将set容器转换为 ht 编码,再重新执行dict的添加模式 Failed to get integer from object,convert to regular set. setTypeConvert(subject,OBJ_ENCODING_HT); The set *was* an intset and this value is not integer * encodable,so dictAdd should always work. serverAssert(dictAdd(subject->ptr,sdsdup(value),1)"> DICT_OK); ; } } { serverPanic(Unknown set encoding); } ; } 2.1. 添加member到dict中(略解,在hash数据结构解析中已介绍) dict.c,添加某key到 d 字典中 Low level add. This function adds the entry but instead of setting * a value returns the dictEntry structure to the user,that will make * sure to fill the value field as he wishes. * * This function is also directly exposed to the user API to be called * mainly in order to store non-pointers inside the hash value,example: * * entry = dictAddRaw(dict,mykey); * if (entry != NULL) dictSetSignedIntegerVal(entry,1000); * * Return values: * * If key already exists NULL is returned. * If key was added,the hash entry is returned to be manipulated by the caller. dictEntry *dictAddRaw(dict *d,1)">key) { index; dictEntry *entry; dictht *ht; (dictIsRehashing(d)) _dictRehashStep(d); Get the index of the new element,or -1 if * the element already exists. 获取需要添加的key的存放位置下标(slot),如果该key已存在,则返回-1(无可用slot) if ((index = _dictKeyIndex(d,key)) == -) NULL; Allocate the memory and store the new entry. * Insert the element in top,with the assumption that in a database * system it is more likely that recently added entries are accessed * more frequently. ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[]; entry = zmalloc(entry)); entry->next = ht->table[index]; ht->table[index] = entry; ht->used++; Set the hash entry fields. dictSetKey(d,entry,key); entry; } 2.2. 添加整型数据到 intset中 添加value Insert an integer in the intset intset *intsetAdd(intset *is,int64_t value,uint8_t *success) { 获取value的所属范围 uint8_t valenc = _intsetValueEncoding(value); uint32_t pos; if (success) *success = Upgrade encoding if necessary. If we need to upgrade,we know that * this value should be either appended (if > 0) or prepended (if < 0),* because it lies outside the range of existing values. 默认 is->encoding 为 INTSET_ENC_INT16 (16位长) 2.2.1. 即超过当前预设的位长,则需要增大预设,然后添加 此时的value可以确定: 要么是最大,要么是最小 (所以我们可以推断,此intset应该是有序的) if (valenc > intrev32ifbe(is->encoding)) { This always succeeds,so we don't need to curry *success. */ return intsetUpgradeAndAdd(,value); } Abort if the value is already present in the set. * This call will populate "pos" with the right position to insert * the value when it cannot be found. 2.2.2. 在当前环境下添加value 找到value则说明元素已存在,不可再添加 pos 保存比value小的第1个元素的位置 if (intsetSearch(pos)) { ; } is = intsetResize(is->length)+); 在pos不是末尾位置时,需要留出空位,依次移动后面的元素 if (pos < intrev32ifbe(is->length)) intsetMoveTail( 针对编码位不变更的情况下设置pos位置的值 _intsetSet(is->length = intrev32ifbe(intrev32ifbe(); 判断 value 的位长 INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64 2 < 4 < 8 Return the required encoding for the provided value. static uint8_t _intsetValueEncoding(int64_t v) { if (v < INT32_MIN || v > INT32_MAX) INTSET_ENC_INT64; if (v < INT16_MIN || v > INT16_MAX) INTSET_ENC_INT32; else INTSET_ENC_INT16; } 2.2.1. 升级预设位长,并添加value intset.c Upgrades the intset to a larger encoding and inserts the given integer. static intset *intsetUpgradeAndAdd(intset *encoding); uint8_t newenc = _intsetValueEncoding(value); int length = intrev32ifbe(length); int prepend = value < 0 ? 1 : First set new encoding and resize intrev32ifbe(newenc); 每次必进行扩容 ); Upgrade back-to-front so we don't overwrite values. * Note that the "prepend" variable is used to make sure we have an empty * space at either the beginning or the end of the intset. 因编码发生变化,元素的位置已经不能一一对应,需要按照原来的编码依次转移过来 从后往前依次赋值,所以,内存位置上不存在覆盖问题(后面内存位置一定是空的),直接依次赋值即可(高效复制) while(length--) _intsetSet( Set the value at the beginning or the end. 对新增加的元素,负数添加到第0位,否则添加到最后一个元素后一位 (prepend) _intsetSet( _intsetSet(length),1)"> Resize the intset static intset *intsetResize(intset *encoding); malloc is = zrealloc(sizeof(intset)+size); 获取pos位置的值 Return the value at pos,given an encoding. static int64_t _intsetGetEncoded(intset * pos,uint8_t enc) { int64_t v64; int32_t v32; int16_t v16; if (enc == INTSET_ENC_INT64) { memcpy(&v64,((int64_t*)is->contents)+pos,1)">(v64)); memrev64ifbe(&v64); v64; } INTSET_ENC_INT32) { memcpy(&v32,((int32_t*)(v32)); memrev32ifbe(&v32); v32; } { memcpy(&v16,((int16_t*)(v16)); memrev16ifbe(&v16); v16; } } 只是这里数据类型是不确定的,所以使用指针进行赋值 Set the value at pos,using the configured encoding. static void _intsetSet(intset *if (encoding == INTSET_ENC_INT64) { ((int64_t*)is->contents)[pos] = value; memrev64ifbe(((int64_t*)is->contents)+pos); } INTSET_ENC_INT32) { ((int32_t*) value; memrev32ifbe(((int32_t*) { ((int16_t*) value; memrev16ifbe(((int16_t*)pos); } } 2.2.2. 在编码类型未变更的情况,需要查找可以存放value的位置(为了确认该value是否已存在,以及小于value的第一个位置赋值) Search for the position of "value". Return 1 when the value was found and * sets "pos" to the position of the value within the intset. Return 0 when * the value is not present in the intset and sets "pos" to the position * where "value" can be inserted. static uint8_t intsetSearch(intset *pos) { int min = 0,max = intrev32ifbe(is->length)-1,mid = -; int64_t cur = - The value can never be found when the set is empty if (intrev32ifbe(is->length) == if (pos) *pos = ; ; } Check for the case where we know we cannot find the value,* but do know the insert position. 因 intset 是有序数组,即可以判定是否超出范围,如果超出则元素必定不存在 if (value > _intsetGet()) { if (pos) *pos = intrev32ifbe(length); ; } if (value < _intsetGet( 使用二分查找 while(max >= min) { mid = ((unsigned int)min + (unsigned int)max) >> ; cur = _intsetGet(if (value > cur) { min = mid+if (value < cur) { max = mid- 找到了 break; } } if (value == cur) { if (pos) *pos = mid; 在没有找到的情况下,min就是第一个比 value 小的元素 min; ; } } intset移动(内存移动) void intsetMoveTail(intset *fromvoid *src,1)">dst; uint32_t bytes = intrev32ifbe(is->length)-; uint32_t encoding = intrev32ifbe(encoding); INTSET_ENC_INT64) { src = (int64_t*)is->contents+; dst = (int64_t*)to; bytes *= (int64_t); } INTSET_ENC_INT32) { src = (int32_t*); dst = (int32_t*)(int32_t); } { src = (int16_t*); dst = (int16_t*)(int16_t); } memmove(dst,src,bytes); } 2.3. 转换intset编码为 ht 编码 (如果遇到string型的value或者intset数量大于阀值(默认:512)时) Convert the set to specified encoding. The resulting dict (when converting * to a hash table) is presized to hold the number of elements in the original * set. void setTypeConvert(robj *setobj,1)"> enc) { setTypeIterator *si; 要求外部必须保证 set类型且 intset 编码 serverAssertWithInfo(NULL,setobj,setobj->type == OBJ_SET && setobj->encoding == OBJ_ENCODING_INTSET); OBJ_ENCODING_HT) { int64_t intele; 直接创建一个 dict 来容纳数据 dict *d = dictCreate(& Presize the dict to avoid rehashing 直接一次性扩容成需要的大小 dictExpand(d,intsetLen(setobj->ptr)); To add the elements we extract integers and create redis objects setTypeIterator 迭代器是转换的关键 si = setTypeInitIterator(setobj); while (setTypeNext(si,&element,&intele) != -) { element:ht编码时的key,intele: intset编码时的value element = sdsfromlonglong(intele); 因set特性保证是无重复元素,所以添加dict时,必然应成功 此处应无 rehash,而是直接计算 hashCode,放置元素,时间复杂度 O(1) serverAssert(dictAdd(d,element,1)"> DICT_OK); } 释放迭代器 setTypeReleaseIterator(si); setobj->encoding = OBJ_ENCODING_HT; zfree(setobj->ptr); setobj->ptr = d; } Unsupported set conversion); } } 获取set集合的迭代器 setTypeIterator *setTypeInitIterator(robj *subject) { setTypeIterator *si = zmalloc((setTypeIterator)); 设置迭代器公用信息 si->subject = subject; si->encoding = subject->encoding; hash表则需要再迭代 dict if (si->encoding == OBJ_ENCODING_HT) { si->di = dictGetIterator(subject->ptr); } intset 比较简单,直接设置下标即可 OBJ_ENCODING_INTSET) { si->ii = si; } d) { dictIterator *iter = zmalloc(iter)); iter->d = d; iter->table = ; iter->index = -; iter->safe = ; iter->entry = NULL; iter->nextEntry = NULL; iter; } Move to the next entry in the set. Returns the object at the current * position. * * Since set elements can be internally be stored as SDS strings or * simple arrays of integers,setTypeNext returns the encoding of the * set object you are iterating,and will populate the appropriate pointer * (sdsele) or (llele) accordingly. * * Note that both the sdsele and llele pointers should be passed and cannot * be NULL since the function will try to defensively populate the non * used field with values which are easy to trap if misused. * * When there are no longer elements -1 is returned. int setTypeNext(setTypeIterator *si,sds *sdsele,int64_t *llele) { hash表返回key OBJ_ENCODING_HT) { dictEntry *de = dictNext(si->di); if (de == NULL) return -; *sdsele = dictGetKey(de); *llele = -123456789; Not needed. Defensive. } intset 直接获取下标对应的元素即可 if (!intsetGet(si->subject->ptr,si->ii++; *sdsele = NULL; } Wrong set encoding in setTypeNextreturn si->encoding; } case1: intset直接叠加下标即可 Sets the value to the value at the given position. When this position is * out of range the function returns 0,when in range it returns 1. uint8_t intsetGet(intset *value) { length)) { *value = _intsetGet(static int64_t _intsetGet(intset * pos) { return _intsetGetEncoded(encoding)); } (附带)case2: dict的迭代 iter) { 一直迭代查找 while ( iter->entry 为NULL,有两种可能: 1. 初始化时; 2. 上一元素为迭代完成(hash冲突) if (iter->entry == NULL) { dictht *ht = &iter->d->ht[iter->table]; if (iter->index == -1 && iter->table == ) { if (iter->safe) iter->d->iterators++; iter->fingerprint = dictFingerprint(iter->d); } 直接使用下标进行迭代,如果中间有空闲位置该如何处理?? 看起来redis是使用了全量迭代元素的处理办法,即有可能有许多空迭代过程 一般地,也是进行两层迭代,jdk的hashmap迭代实现为直接找到下一次非空的元素为止 iter->index++ 直到迭代完成所有元素,否则会直到找到一个元素为止 if (iter->index >= (long) ht->size) { if (dictIsRehashing(iter->d) && iter->table == ) { iter->table++; iter->index = ; ht = &iter->d->ht[]; } { ; } } iter->entry = ht->table[iter->index]; } entry不为空,就一定有nextEntry?? iter->entry = iter->nextEntry; } 如果当前entry为空,则继续迭代下一个 index entry) { We need to save the 'next' here,the iterator user * may delete the entry we are returning. iter->nextEntry = iter->entry->next; return iter->entry; } } NULL; }
4.SISMEMBER
返回成员 member 是否是存储的集合 key的成员.
如果member元素是集合key的成员,则返回1
如果member元素不是key的成员,或者集合key不存在,则返回0
时间复杂度:O(1)
6379> sismember myset World6379>
用法: SISMEMBER key member void sismemberCommand(client *if ((set = lookupKeyReadOrReply(c,shared.czero)) == NULL || checkType(c,OBJ_SET)) 主要方法 setTypeIsMember if (setTypeIsMember(ptr)) 回复1 addReply(c,shared.cone); 回复0t_set.c int setTypeIsMember(robj * OBJ_ENCODING_HT) { hash 表的查找方式,hashCode 计算,链表查找,就这么简单 return dictFind((dict*)subject->ptr,value) != NULL; } 如果当前的set集合是 intset 编码的,则只有查找值也是整型的情况下才可能查找到元素 C_OK) { intset 查找,而且 intset 是有序的,所以直接使用二分查找即可 return intsetFind((intset*)subject->ptr,llval); } } ; }
Determine whether a value belongs to this set uint8_t intsetFind(intset * 最大范围检查,加二分查找 intsetSearch 前面已介绍 return valenc <= intrev32ifbe(is->encoding) && intsetSearch(sinter:返回指定所有的集合的成员的交集,例如(共同好友)
sdiff:
返回一个集合与给定集合的差集的元素
sunion
返回给定的多个集合的并集中的所有成员
6379> sadd myset1 1 2 3 4 55 6379> sadd myset2 5 6 7 sinter myset1 myset2 32) 43) 5 sdiff myset1 myset2 12 sunion myset1 myset2 4) 5) 6) 67) 76379>sinter源码解析
用法: SINTER key1 [key2] void sinterCommand(client *c) { 第三个参数是用来存储 交集结果的,两段代码已做复用,说明存储过程还是比较简单的 sinterGenericCommand(c,c->argv+void sinterGenericCommand(client *c,robj **setkeys,unsigned long setnum,robj *dstkey) { robj **sets = zmalloc(sizeof(robj*)*setnum); setTypeIterator *si; robj *dstset = NULL; sds elesds; int64_t intobj; void *replylen = NULL; unsigned long j,cardinality = encoding; 0; j < setnum; j++ 依次查找每个key的set实例 robj *setobj = dstkey ? lookupKeyWrite(c->db,setkeys[j]) : lookupKeyRead(c-> 只要有一个set为空,则交集必定为为,无需再找 if (!setobj) { zfree(sets); (dstkey) { 没有交集,直接将dstKey 删除,注意此逻辑?? if (dbDelete(c->++; } addReply(c,shared.czero); } { addReply(c,shared.emptymultibulk); } ; } (checkType(c,OBJ_SET)) { zfree(sets); ; } sets[j] = setobj; } Sort sets from the smallest to largest,this will improve our * algorithm's performance 快速排序算法,将 sets 按照元素长度做排序,使最少元素的set排在最前面 qsort(sets,setnum,1)">sizeof(robj*),qsortCompareSetsByCardinality); The first thing we should output is the total number of elements... * since this is a multi-bulk write,but at this stage we don't know * the intersection set size,so we use a trick,append an empty object * to the output list and save the pointer to later modify it with the * right length dstkey) { replylen = addDeferredMultiBulkLength(c); } If we have a target key where to store the resulting set * create this key with an empty set inside dstset = createIntsetObject(); } Iterate all the elements of the first (smallest) set,and test * the element against all the other sets,if at least one set does * not include the element it is discarded 看来redis也是直接通过迭代的方式来完成交集功能 迭代最少的set集合,依次查找后续的set集合,当遇到一个不存在的set时,上值被排除,否则是交集 si = setTypeInitIterator(sets[while((encoding = setTypeNext(si,&elesds,&intobj)) != -1; j < setnum; j++if (sets[j] == sets[0]) continue 以下是查找过程 分 hash表查找 和 intset 编码查找 OBJ_ENCODING_INTSET) { intset with intset is simple... and fast 两个集合都是 intset 编码,直接二分查找即可 if (sets[j]->encoding == OBJ_ENCODING_INTSET && !intsetFind((intset*)sets[j]-> in order to compare an integer with an object we * have to use the generic function,creating an object * for this } if (sets[j]->encoding == OBJ_ENCODING_HT) { 编码不一致,但元素可能相同 setTypeIsMember 复用前面的代码,直接查找即可 elesds = sdsfromlonglong(intobj); setTypeIsMember(sets[j],elesds)) { sdsfree(elesds); ; } sdsfree(elesds); } } OBJ_ENCODING_HT) { ; } } } Only take action when all sets contain the member 当迭代完所有集合,说明每个set中都存在该值,是交集(注意分析最后一个迭代) if (j == setnum) { 不存储交集的情况下,直接响应元素值即可 dstkey) { OBJ_ENCODING_HT) addReplyBulkCBuffer(c,elesds,sdslen(elesds)); addReplyBulkLongLong(c,intobj); cardinality++; } 要存储交集数据,将值存储到 dstset 中 { OBJ_ENCODING_INTSET) { elesds = sdsfromlonglong(intobj); setTypeAdd(dstset,elesds); sdsfree(elesds); } { setTypeAdd(dstset,elesds); } } } } setTypeReleaseIterator(si); (dstkey) { Store the resulting set into the target,if the intersection * is not an empty set. 存储集合之前会先把原来的数据删除,如果进行多次交集运算,dstKey 就相当于临时表咯 int deleted = dbDelete(c->if (setTypeSize(dstset) > ) { dbAdd(c->sinterstoreid); } { decrRefCount(dstset); addReply(c,shared.czero); (deleted) notifyKeyspaceEvent(NOTIFY_GENERIC,1)">delid); } signalModifiedKey(c-> { setDeferredMultiBulkLength(c,replylen,cardinality); } zfree(sets); } compare 方法 int qsortCompareSetsByCardinality(void *s1,1)">s2) { return setTypeSize(*(robj**)s1)-setTypeSize(*(robj**)s2); } 快排样例 sort.lua -- extracted from Programming Pearls,page 110 function qsort(x,l,u,f) if l<u then local m=math.random(u-(l-1))+l-1 -- choose a random pivot in range l..u x[l],x[m]=x[m],x[l] -- swap pivot to first position local t=x[l] -- pivot value m=l local i=l+1 while i<=u do -- invariant: x[l+1..m] < t <= x[m+1..i-] f(x[i],t) then m=m+ x[m],x[i]=x[i],x[m] -- swap x[i] and x[m] end i=i+ end x[l],1)"> swap pivot to a valid place -- x[l+1..m-1] < x[m] <= x[m+..u] qsort(x,m-sdiff和sunion源码解析
void sunionCommand(client *c) { sunionDiffGenericCommand(c,c->argv+void sunionstoreCommand(client *2,1)">],1)">void sdiffCommand(client *void sdiffstoreCommand(client *用法: SDIFFSTORE destination key1 [key2] 看起来sdiff 与 sunion 共用了一段代码,为啥呢? 想想 sql 中的 full join c->argv[1] 是 dstKey sunionDiffGenericCommand(c,SET_OP_DIFF); }void sunionDiffGenericCommand(client *c,robj **setkeys,1)"> setnum,robj *dstkey,1)"> op) { robj **sets = zmalloc( NULL; sds ele; int diff_algo = 同样的套路,先查找各key的实例 不同的是,这里的key允许不存在,但不允许类型不一致 ) { robj *setobj = dstkey ?setobj) { sets[j] = NULL; setobj; } Select what DIFF algorithm to use. * * Algorithm 1 is O(N*M) where N is the size of the element first set * and M the total number of sets. * * Algorithm 2 is O(N) where N is the total number of elements in all * the sets. * * We compute what is the best bet with the current input here. 针对差集运算,做算法优化 if (op == SET_OP_DIFF && sets[]) { long algo_one_work = ; if (sets[j] == NULL) ; algo_one_work += setTypeSize(sets[]); algo_two_work += setTypeSize(sets[j]); } Algorithm 1 has better constant times and performs less operations * if there are elements in common. Give it some advantage. algo_one_work /= ; diff_algo = (algo_one_work <= algo_two_work) ? if (diff_algo == 1 && setnum > With algorithm 1 it is better to order the sets to subtract * by decreasing size,so that we are more likely to find * duplicated elements ASAP. qsort(sets+ We need a temp set object to store our union. If the dstkey * is not NULL (that is,we are inside an SUNIONSTORE operation) then * this set object will be the resulting object to set into the target key dstset = createIntsetObject(); if (op == SET_OP_UNION) { Union is trivial,just add every element of every set to the * temporary set. if (!sets[j]) continue; non existing keys are like empty sets */ 依次添加即可,对于 sunion 来说,有序是无意义的 si = setTypeInitIterator(sets[j]); while((ele = setTypeNextObject(si)) != NULL) { if (setTypeAdd(dstset,ele)) cardinality++; sdsfree(ele); } setTypeReleaseIterator(si); } } 使用算法1,依次迭代最大元素 0] && diff_algo == DIFF Algorithm 1: * * We perform the diff by iterating all the elements of the first set,* and only adding it to the target set if the element does not exist * into all the other sets. * * This way we perform at max N*M operations,where N is the size of * the first set,and M the number of sets. si = setTypeInitIterator(sets[]); NULL) { no key is an empty set. break; same set! 只要有一个相同,就不算是差集?? if (setTypeIsMember(sets[j],ele)) ; } 这里的差集是所有set的值都不相同或者为空??? 尴尬了 setnum) { There is no other set with this element. Add it. setTypeAdd(dstset,ele); cardinality++; } sdsfree(ele); } setTypeReleaseIterator(si); } 使用算法2,直接以第一个元素为基础,后续set做remove,最后剩下的就是差集 DIFF Algorithm 2: * * Add all the elements of the first set to the auxiliary set. * Then remove all the elements of all the next sets from it. * * This is O(N) where N is the sum of all the elements in every * set. si =if (j == ) { ; } if (setTypeRemove(dstset,ele)) cardinality--; } sdsfree(ele); } setTypeReleaseIterator(si); Exit if result set is empty as any additional removal * of elements will have no effect. if (cardinality == 0) Output the content of the resulting set,if not in STORE mode dstkey) { addReplyMultiBulkLen(c,cardinality); si = setTypeInitIterator(dstset); 响应差集列表 NULL) { addReplyBulkCBuffer(c,ele,sdslen(ele)); sdsfree(ele); } setTypeReleaseIterator(si); decrRefCount(dstset); } If we have a target key where to store the resulting set * create this key with the result set inside 存储差集列表,响应差集个数 dbAdd(c->sunionstore" : sdiffstore; } zfree(sets); } This is used by SDIFF and in this case we can receive NULL that should * be handled as empty sets. int qsortCompareSetsByRevCardinality(s2) { robj *o1 = *(robj**)s1,*o2 = *(robj**)s2; return (o2 ? setTypeSize(o2) : 0) - (o1 ? setTypeSize(o1) : ); }6.SPOP
从存储在
key
的集合中移除并返回一个或多个随机元素。此操作与
SRANDMEMBER
类似,它从一个集合中返回一个或多个随机元素,但不删除元素。spop myset1 smembers myset1 6379>
源码解析
用法: SPOP key [count] void spopCommand(client *aux; sds sdsele; int64_t llele; if (c->argc == 3 弹出指定数量的元素,略 spopWithCountCommand(c); if (c->argc > ) { addReply(c,shared.Syntaxerr); ; } Make sure a key with the name inputted exists,and that it's type is * indeed a set set = lookupKeyWriteOrReply(c,shared.nullbulk)) == NULL || Get a random element from the set 1. 随机获取一个元素,这是 spop 的定义 encoding = setTypeRandomElement(llele); Remove the element from the set 2. 删除元素 OBJ_ENCODING_INTSET) { ele = createStringObjectFromLongLong(llele); set->ptr = intsetRemove(set-> { ele = createStringObject(sdsele,sdslen(sdsele)); setTypeRemove(ptr); } notifyKeyspaceEvent(NOTIFY_SET,1)">spopid); Replicate/AOF this command as an SREM operation aux = createStringObject(SREM4); rewriteClientCommandVector(c,1)">3,aux,ele); decrRefCount(aux); Add the element to the reply addReplyBulk(c,ele); decrRefCount(ele); Delete the set if it's empty if (setTypeSize(set) == ) { dbDelete(c->db,1)">]); notifyKeyspaceEvent(NOTIFY_GENERIC,1)">id); } Set has been modified signalModifiedKey(c->db,1)">]); server.dirty++ 没啥好说的,就看下是如何随机的就好了 随机获取一个元素,赋值给 sdsele|llele Return random element from a non empty set. * The returned element can be a int64_t value if the set is encoded * as an "intset" blob of integers,or an SDS string if the set * is a regular set. * * The caller provides both pointers to be populated with the right * object. The return value of the function is the object->encoding * field of the object and is used by the caller to check if the * int64_t pointer or the redis object pointer was populated. * * Note that both the sdsele and llele pointers should be passed and cannot * be NULL since the function will try to defensively populate the non * used field with values which are easy to trap if misused. int setTypeRandomElement(robj *setobj,1)">if (setobj->encoding == 1.1. dict 型的随机 dictEntry *de = dictGetRandomKey(setobj->ptr); *sdsele = 1.2. intset 型的随机 *llele = intsetRandom(setobj->ptr); *sdsele = NULL; return setobj-> 1.1. dict 型的随机 Return a random entry from the hash table. Useful to * implement randomized algorithms dictEntry *dictGetRandomKey(dict *d) { dictEntry *he,1)">orighe; unsigned h; listlen,listele; if (dictSize(d) == (dictIsRehashing(d)) _dictRehashStep(d); 基本原理就是一直接随机获取下标,直到有值 (dictIsRehashing(d)) { do We are sure there are no elements in indexes from 0 * to rehashidx-1 获取随机下标,须保证在 两个hash表的范围内 h = d->rehashidx + (random() % (d->ht[0].size + d->ht[1].size - d->rehashidx)); he = (h >= d->ht[0].size) ? d->ht[1].table[h - d->ht[].size] : d->ht[].table[h]; } while(he == NULL); } { h = random() & d->ht[].sizemask; he = d->ht[ NULL); } Now we found a non empty bucket,but it is a linked * list and we need to get a random element from the list. * The only sane way to do so is counting the elements and * select a random index. listlen = ; orighe = he; 对于hash冲突情况,再随机一次 while(he) { he = he->next; listlen++; } listele = random() % listlen; he = orighe; while(listele--) he = he->next; he; } 1.2. intset 型的随机 Return random member int64_t intsetRandom(intset *) { 这个随机就简单了,直接获取随机下标,因为intset可以保证自身元素的完整性 return _intsetGet(length)); }