How do we find duplicates among multiple, giant datasets?

Implementing adaptive blocking for database deduplication, part 4

In part 1 of this post, I described the problem of database deduplication. We need to compare every record in the database to every other record, and for databases of substantial size, there are simply too many pairs to compare directly. We use blocking techniques to reduce the number of pairs to consider, which gives us the candidate set of pairs. In part 2, I showed how to set up the data so that we can search for the optimum candidate set. In part 3, I showed how to do the search.

Here in part 4, I'll explain how to generate the pairs without creating redundant ones. Finally, in part 5, I'll talk about the implications of this work, and give some pointers on things to watch out for.

The Scheme We're Processing

$(name\_en\_meta\_first \wedge name\_en\_meta\_last \wedge year) \vee \\ (date\_of\_death \wedge governorate \wedge sorted\_name\_en\_lsh) \vee \\ (governorate \wedge name\_en\_first4 \wedge yearmo) \vee \\ (governorate \wedge name\_last5 \wedge year) \vee \\ (governorate \wedge sortedname\_en\_meta\_last \wedge name\_lsh) \vee \\ (sex \wedge date\_of\_death \wedge name\_last5) \vee \\ (governorate \wedge sortedname\_en\_last5 \wedge yearmo) \vee \\ (date\_of\_death \wedge sortedname\_meta\_first \wedge sortedname\_en\_meta\_last) \vee \\ (location \wedge name\_first5 \wedge year) \vee \\ (sex \wedge sortedname\_en \wedge name\_en\_no\_mo\_lsh) \vee \\ (name\_first5 \wedge name\_en\_last4 \wedge yearmo) \vee \\ (governorate \wedge sortedname\_en\_first5 \wedge sortedname\_en\_last5) \vee \\ (date\_of\_death \wedge sortedname\_en\_meta\_first \wedge sortedname\_lsh) \vee \\ (age\_group \wedge name\_en\_meta\_first \wedge yearmo) \vee \\ (location \wedge sortedname\_en\_meta\_first \wedge sortedname\_en\_meta\_last) \vee \\ (location \wedge name\_en\_meta\_first \wedge yearmo) \vee \\ (governorate \wedge sortedname \wedge yearmo) \vee \\ (name \wedge age \wedge age\_group)$

This is a disjunction of conjunctions.
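
To make "disjunction of conjunctions" concrete, here is a minimal sketch using the small two-rule scheme from Figure 1 in part 1 rather than the full scheme above. The function name `is_candidate` and the three records are hypothetical, invented only for illustration: two records form a candidate pair if they agree on every field of at least one rule.

```python
# The two-rule scheme from Figure 1 in part 1: (birth_year AND sex) OR last_name
scheme = [("birth_year", "sex"), ("last_name",)]


def is_candidate(rec_a, rec_b, scheme):
    """True if the records agree on every field of at least one rule."""
    return any(all(rec_a[f] == rec_b[f] for f in rule) for rule in scheme)


# Hypothetical records, for illustration only
r1 = {"birth_year": 1970, "sex": "F", "last_name": "SMITH"}
r2 = {"birth_year": 1970, "sex": "F", "last_name": "SMYTHE"}
r3 = {"birth_year": 1982, "sex": "M", "last_name": "JONES"}

print(is_candidate(r1, r2, scheme))  # True: same birth_year and sex
print(is_candidate(r1, r3, scheme))  # False: no rule matches
```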

Non-Redundant Pairs

Creating pairs from blocks creates a lot of duplicate pairs. Referring back to Figure 1 in part 1, we considered the scheme $(birth\_year \wedge sex) \vee last\_name$. The first rule, $(birth\_year \wedge sex)$, generated a lot of pairs, among them [(1, 2), (1, 9), (2, 9)]. Note that the second rule, $last\_name$, also generates a number of pairs, including these same three. Generating the pairs a second time is redundant, but it's not easy to figure out how to identify the redundant pairs. By definition, they are generated by different rules.

We should expect a lot of redundant pairs. Many of our rules are likely to generate the same pairs, especially with an optimized rule-searching approach. All the rules in our scheme include some aspect of the information from the victims' names with a combination of location and date of death information. While each rule finds slightly different groups of pairs, many (if not most) of the same pairs will keep coming up. It would be more efficient to process those pairs only once.

The Simplest Approach

If we're generating a relatively small number of pairs, we can keep them in memory. Each time we process another block, we check each new pair against the ones we already have, and we only use it if we have not yet generated it. However, this approach fails pretty quickly as the number of pairs grows into the tens of millions.
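
A minimal sketch of this keep-it-all-in-memory idea follows. The function name `unique_pairs` and the toy blocks are assumptions made for illustration, not code from the real project; the first two blocks reuse the ids 1, 2 and 9 from the example above.

```python
from itertools import combinations


def unique_pairs(blocks):
    """Yield each unordered pair of ids once, remembering every pair seen so far."""
    seen = set()
    for ids in blocks:
        # sorting the ids means each pair is always written as (smaller, larger)
        for pair in combinations(sorted(ids), 2):
            if pair not in seen:
                seen.add(pair)
                yield pair


# Hypothetical blocks: the first two stand in for two rules that group the same records
toy_blocks = [{1, 2, 9}, {9, 2, 1}, {3, 4}]
pairs = list(unique_pairs(toy_blocks))
print(pairs)  # [(1, 2), (1, 9), (2, 9), (3, 4)]
```

The `seen` set is the weak point: it has to hold every pair ever emitted, so its memory footprint grows with the size of the candidate set.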

The Dedoop Approach

With a little bit of thinking ahead, we can avoid processing the redundant pairs. This part of the post implements an algorithm given by Kolb et al. in "Dedoop: Efficient Deduplication with Hadoop" (2012) and the Dedoop project more generally. (I'm indebted to the dedupe project for pointing me to Dedoop.)

The Dedoop team created this approach so that lots of computers, working in a MapReduce framework, could help to generate the list of pairs without repeating each other's work. That's great, and will be a big advantage for us when we use MapReduce. For the project here, thanks to the optimization of the rules in the previous step, we now have few enough pairs that we can do it in one pass, and we could use the simpler "keep it all in memory" approach. However, this is still a fast approach, and it's a good foundation for bigger problems we might encounter later.

The Dedoop approach starts by building an index from blocks to all the ids in each block, and it builds a reverse index from each id to all the blocks that contain it. As the algorithm processes each block, it generates pairs of ids. With each pair, it compares the two ids' lists of blocks. If they share a block that would have been generated before the current block, it skips that pair. I'll explain in more detail in the code below.
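
Before the real code, here is a toy-sized sketch of the idea, under stated assumptions: the block keys, the ids, and the names `toy_blocks`, `id2blocks` and `nonredundant_pairs` are all hypothetical, and this is my simplification rather than the Dedoop implementation itself. A pair is kept only in the first block (in sorted key order) that both ids share.

```python
from collections import defaultdict
from itertools import combinations

# Hypothetical toy index: block key -> ids in that block
toy_blocks = {
    "birth_year_sex:1970|F": {1, 2, 9},
    "last_name:SMITH": {1, 2, 5, 9},
}

# Reverse index: id -> sorted list of the block keys that contain it
id2blocks = defaultdict(list)
for key, ids in toy_blocks.items():
    for i in ids:
        id2blocks[i].append(key)
for i in id2blocks:
    id2blocks[i].sort()


def nonredundant_pairs(blocks, id2blocks):
    """Yield each pair only from the first (in sorted order) block its ids share."""
    for key in sorted(blocks):
        for a, b in combinations(sorted(blocks[key]), 2):
            # blocks that come before the current one in each id's own list
            prior_a = set(id2blocks[a][: id2blocks[a].index(key)])
            prior_b = set(id2blocks[b][: id2blocks[b].index(key)])
            if prior_a & prior_b:
                continue  # an earlier shared block already covers this pair
            yield (a, b)


toy_pairs = list(nonredundant_pairs(toy_blocks, id2blocks))
print(toy_pairs)
# [(1, 2), (1, 9), (2, 9), (1, 5), (2, 5), (5, 9)]
```

The second block contributes only the three pairs involving id 5; the pairs among 1, 2 and 9 are skipped there because those ids already share the first block.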

In [8]:
# First open the data
import pandas as pd
import pickle
records = pd.read_pickle("rp.pkl")
c2ri = pickle.load(open("c2ri.pkl", 'rb'))
rule_data = pickle.load(open("rules.pkl", 'rb'))
In [9]:
# this is what we calculated in the previous step. We'll use the first 9 rules
rules_q = ['r6962', 'r3138', 'r3726', 'r3665', 'r3863', 
           'r1786', 'r3777', 'r3434', 'r3994', 'r1911', 
           'r5249', 'r3746', 'r3478', 'r3058', 'r4205', 
           'r4193', 'r3572', 'r695'][0:9]

The next step sets up the most important part of this puzzle: the indexes between the block_ids (for blocks) and the hashids (for records). The amount of computer time that this takes will grow linearly as a function of the number of blocks ($b$) and the number of records ($n$): $O(b+n)$. That means it will grow a lot more slowly than the number of pairs, which grows as $O(n \cdot b)$. This can handle a pretty big problem.
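
As a rough illustration of why the indexes are so much cheaper than the pairs, the sketch below compares the two counts for a handful of block sizes. The sizes and the helper names `count_index_entries` and `count_pairs` are hypothetical. The index stores one entry per record in each block, while a block of $k$ records yields $k(k-1)/2$ pairs.

```python
def count_index_entries(block_sizes):
    """One index entry per (record, block) membership."""
    return sum(block_sizes)


def count_pairs(block_sizes):
    """A block of k records yields k * (k - 1) / 2 unordered pairs."""
    return sum(k * (k - 1) // 2 for k in block_sizes)


# Hypothetical block sizes, for illustration only
sizes = [1, 2, 35, 1000]
print(count_index_entries(sizes))  # 1038
print(count_pairs(sizes))          # 500096
```

A single large block dominates the pair count, which is why a few oversized blocks can make pair generation expensive even when the indexes stay small.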

In [10]:
#
# block2ids[block_id]: set([hashid0, hashid1, ...])
# hash2blockids[hashid]: [block_id0, block_id1, ...]
#
from collections import defaultdict 
import time 
start = time.time()

block2ids = dict()
hash2blockids = defaultdict(list)

for rule_id in rules_q:
    print(rule_id)
    cols = rule_data[rule_id]['cols']
    for name, block in records.groupby(cols):
        block_key = str(name)
        block2ids[block_key] = set(block.hashid)
        for hashid in block2ids[block_key]:
            hash2blockids[hashid].append(block_key)
            
end = time.time()
print("time is = {:2.0f}s".format(end - start))  # time is = 202s
r6962
r3138
r3726
r3665
r3863
r1786
r3777
r3434
r3994
time is = 185s
In [7]:
start = time.time()

# now sort the list of blockids each hashid is keeping
# this means that when we compare the lists of block_ids from 2 hashids,
# we'll get a stable comparison. 
for hashid in hash2blockids:
    hash2blockids[hashid] = sorted(hash2blockids[hashid]) 

# for each blockid, add a sequence position for each hash. This enables us to 
# look at the prior blocks, that is, the blocks processed before the current one
# from the point of view of each of the hashids.
for block_key in block2ids:
    post_hashids = set()
    for hashid in block2ids[block_key]: 
        i = hash2blockids[hashid].index(block_key)
        post_hashids.add((hashid, i))
    block2ids[block_key] = post_hashids

end = time.time()
print("time is = {:2.0f}s".format(end - start))   # time is =  10s
time is = 10s

Let's look in a little more detail at the data structures here.

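from pprint import pprint
# peek at an arbitrary item without removing it
# (popitem() would delete that block from the index)
pprint(next(iter(block2ids.items())))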

("('Idlib', 'AZZAQ', 257)",
 {('00da1e931479287e209dbe77f6077de0e74ae585', 4),
  ('17dc66a95a8b2ebfb2916ad1f170c44f396d5ec3', 2),
  ('18a3159e33e5abfdc166ccb90a0f7ee5e7c369d4', 4),
  ('1b02e0c9f12cd10ebbfb502d8866e07b05b06f8e', 4),
  ('1d445184eb364e60449357d26bfd44e8ded18a30', 5),
<snip>
  ('d48e06e6b314855319ec44d2d0c02869b39cf976', 3)})

We've grabbed an arbitrary (key, value) pair from the block2ids dict. The output shows a block: ('Idlib', 'AZZAQ', 257). This block comes from the rule $(governorate \wedge sortedname\_en\_last5 \wedge name\_en\_no\_mo\_lsh)$, so the values that define the block are a governorate, the last 5 characters of the sorted name in English, and a Locality Sensitive Hashing group on the English name with all the 'Mohammed' strings removed. (There are 35 records in this block, but I'm only showing a few here for brevity.)

Each record is shown with its id, and the position that this block occupies in the list of blocks (that is kept in the other half of the index, which maps hashids to blocks).

The next cell lists the blocks that contain the first id shown above.

hash2blockids['00da1e931479287e209dbe77f6077de0e74ae585']

["('2012-10-16', '', 'RSK')",
 "('2012-10-16', 'Idlib', '')",
 "('APTL', 'HPRN', '2012')",
 "('Idlib', 'ABDU', '2012-10')",
 "('Idlib', 'AZZAQ', 257)",
 "('Idlib', 'RSK', '2012-10')",
 "('Idlib', 'لخليل', '2012')",
 "('M', '2012-10-16', 'لخليل')",
 "('كنصفرة', 'عبد ا', 17)"]

This record comes from the Syrian Center for Statistics and Research (CSR-SY). They publish their list of victims of killings here. I listed our other data partners in part 1.

Remember that in the previous dict, we saw that block2ids stores not only the hashids, but also the position this block occupies in each hashid's list of blocks. The position for this hashid was 4, and the block we're watching can be seen in position 4 (remember that Python counts from 0, so position 4 is the fifth row).

The point of all this indexing is to enable the process of generating and processing pairs to be non-redundant -- without knowing the whole list of candidate pairs. Even if we split up the work of processing blocks into pairs among a number of different computers, if they use these indices, they won't process the same pair twice.
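
Here is a small sketch of that claim, with hypothetical blocks and a hypothetical `shard_pairs` function standing in for a worker. Two "workers" each take a different subset of blocks, in arbitrary order, and consult only the shared indexes; neither sees the other's output, yet their combined output contains every pair exactly once.

```python
from collections import defaultdict
from itertools import combinations

# Hypothetical overlapping blocks: block key -> ids
blocks = {
    "A": {1, 2, 3},
    "B": {2, 3, 4},
    "C": {1, 2, 3, 4},
}

# Reverse index: id -> block keys, appended in sorted key order
id2blocks = defaultdict(list)
for key in sorted(blocks):
    for i in blocks[key]:
        id2blocks[i].append(key)


def shard_pairs(shard_keys):
    """Emit the non-redundant pairs for one worker's share of the blocks."""
    out = []
    for key in shard_keys:
        for a, b in combinations(sorted(blocks[key]), 2):
            prior_a = set(id2blocks[a][: id2blocks[a].index(key)])
            prior_b = set(id2blocks[b][: id2blocks[b].index(key)])
            if not (prior_a & prior_b):
                out.append((a, b))
    return out


worker_1 = shard_pairs(["C"])       # one worker gets block C
worker_2 = shard_pairs(["B", "A"])  # the other gets B and A, in any order
combined = worker_1 + worker_2

print(worker_1)  # [(1, 4)]
print(worker_2)  # [(2, 4), (3, 4), (1, 2), (1, 3), (2, 3)]
```

Block C contains all six possible pairs, but the worker that handles it emits only (1, 4), because every other pair shares an earlier block.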

In [8]:
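# emitting non-redundant pairs
# The idea is that we loop over blockids, writing pairs.
# Before we write a pair, we check whether its two hashids share a block
# that precedes the current one in their sorted lists of blocks; if they do,
# the pair belongs to that earlier block and we skip it here.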
start = time.time()

from itertools import combinations as comb 
from functools import partial 

df_from_recs = partial(pd.DataFrame.from_records, 
                       columns=['hash1', 'hash2'])

emitted = 0 
redundant = 0 
blocks = 0 
candidate_set = list()
blk_size = 50000
candidate_pairs = None

# this loop processes all the blockids, but it could be focused on only 
# a narrow range of blocks to work in parallel with other processes.
for blockid,hashids in block2ids.items():
    blocks += 1 
    # process every pair of the (hashid,position) in this block
    for x0,x1 in comb(hashids, 2):
        # unpack the tuples into hashid,index 
        hashid0, index0 = x0
        hashid1, index1 = x1
        # pull the blocks prior to this one from each hashid's list of blocks
        s0 = set(hash2blockids[hashid0][0:index0])
        s1 = set(hash2blockids[hashid1][0:index1])
        # then check if they intersect
        if s0 & s1: 
            redundant += 1 
        else:
            # at this point, do something useful with the pairs
            # the most likely thing to do is to calculate a vector of 
            # similarity-dissimilarity measures for fields, parts of fields, 
            # or groups of fields. These calculations may be time-consuming. 
            #
            # we require that pairs be stored in lexicographic order, so sort them
            #
            pair = (hashid0,hashid1) if hashid0 < hashid1 else (hashid1,hashid0)
            # these do **not** have to be stored in memory. Heck, if we're just 
            # going to put them in a set, we didn't need to do all this, did we? 
            # But what if we're going to use many different processes to create
            # and use pairs? Maybe write them to disk? Or use them to compute
            # feature vectors? The best use of this approach is probably as 
            # a python generator. 
            candidate_set.append(pair)
     
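    # note: this test fires only when the buffer length is an exact multiple
    # of blk_size (including an empty buffer), so the flushes are irregular
    if len(candidate_set) % blk_size == 0: 
        print("processed {} blocks out of {} in {:0.1f}s".format(
            blocks, len(block2ids), (time.time() - start)))
        
        tmp = df_from_recs(candidate_set, nrows=len(candidate_set))    
        if candidate_pairs is None:
            candidate_pairs = tmp
        else:
            # DataFrame.append was removed in pandas 2.0; pd.concat replaces it
            candidate_pairs = pd.concat([candidate_pairs, tmp])
        del tmp 
        candidate_set = list()

# flush whatever is left in the buffer (pd.concat drops a None entry,
# so this also works if nothing was flushed inside the loop)
candidate_pairs = pd.concat(
    [candidate_pairs, df_from_recs(candidate_set, nrows=len(candidate_set))])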
del candidate_set 

print("emitted={}, redundant={}".format(len(candidate_pairs), redundant))
end = time.time()
# 
print("time is = {:2.0f}s".format(end - start))   
processed 1 blocks out of 838849 in 0.0s
processed 2 blocks out of 838849 in 0.0s
processed 3 blocks out of 838849 in 0.0s
processed 142328 blocks out of 838849 in 18.4s
processed 142329 blocks out of 838849 in 19.7s
processed 142330 blocks out of 838849 in 20.3s
processed 142331 blocks out of 838849 in 20.9s
processed 307952 blocks out of 838849 in 45.7s
processed 307953 blocks out of 838849 in 47.0s
processed 307954 blocks out of 838849 in 48.5s
processed 307955 blocks out of 838849 in 49.9s
processed 307956 blocks out of 838849 in 51.3s
processed 307957 blocks out of 838849 in 52.7s
processed 307958 blocks out of 838849 in 54.1s
processed 307959 blocks out of 838849 in 55.5s
processed 307960 blocks out of 838849 in 56.9s
processed 307961 blocks out of 838849 in 58.3s
processed 755490 blocks out of 838849 in 127.2s
processed 755491 blocks out of 838849 in 131.3s
processed 755492 blocks out of 838849 in 135.3s
processed 755493 blocks out of 838849 in 139.3s
processed 828217 blocks out of 838849 in 157.3s
processed 828218 blocks out of 838849 in 158.8s
emitted=43103212, redundant=6030020
time is = 165s

We end up with about 43 million pairs after removing 6 million redundant pairs. Simply recording the pairs takes less than 3 minutes and takes about 5GB of RAM. These pairs could be written to disk, or processed into feature vectors.

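This logic accumulates candidate pairs in a buffer and flushes them in chunks. The point is to show how you might chunk the work into pieces. The output is interesting because it shows how uneven the blocks are. Note that a progress line is printed only when the buffer length is an exact multiple of blk_size (50,000) at the end of a block, which includes an empty buffer, so the lines arrive in irregular bursts rather than once per 50K pairs.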

The code above doesn't actually do anything with the pairs except store and count them. In my real implementation, I encapsulate this logic in a generator that is called by multiple processes. Each process runs on a different CPU core: 6 processes on my laptop, or 31 processes on a big Amazon server. The generator feeds chunks of pairs into each process, and the processes do the expensive work of calculating the feature vectors, which contain the similarity measures for each pair. Each process writes its chunk of feature vectors to a temporary file. In parallel, a reduce task reads the temporary files and appends the chunks to an output file. Calculating the feature vectors is the compare step shown in Figure 2 in part 1 of this post.
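
A minimal sketch of what such a generator might look like is below. It is not my production code: the names `pair_generator` and `chunked` are hypothetical, the toy indexes only mimic the layout of block2ids and hash2blockids built above, and the hand-off to worker processes is left out.

```python
from itertools import combinations, islice


def pair_generator(block2ids, hash2blockids):
    """Lazily yield non-redundant pairs, each in lexicographic order."""
    for block_key, members in block2ids.items():
        for (h0, i0), (h1, i1) in combinations(members, 2):
            # skip the pair if the two ids share a block before this one
            if set(hash2blockids[h0][:i0]) & set(hash2blockids[h1][:i1]):
                continue
            yield (h0, h1) if h0 < h1 else (h1, h0)


def chunked(iterable, size):
    """Group any iterable into lists of at most `size` items."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# Hypothetical toy indexes in the same layout as the real ones:
# block key -> set of (hashid, position of this block in the hashid's list)
toy_hash2blockids = {"a": ["b1", "b2"], "b": ["b1", "b2"], "c": ["b2"]}
toy_block2ids = {
    "b1": {("a", 0), ("b", 0)},
    "b2": {("a", 1), ("b", 1), ("c", 0)},
}

chunks = list(chunked(pair_generator(toy_block2ids, toy_hash2blockids), 2))
print([len(c) for c in chunks])  # [2, 1]
```

Because the generator is lazy, only one chunk of pairs needs to be in memory at a time; each chunk could be handed to a worker that computes the feature vectors.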

In the next and final section, I'll talk about the implications of this work, and some directions for future research.