Data Ingestion — Build Your Own “Map Reduce”?
Map-reduce, multi-process, Python profiling [using Py-Spy]
Why map-reduce?
Let's say you work at Facebook; you have lots of data and probably need lots of map-reduce tasks. You will use mrjob/PySpark/Spark/Hadoop. You get the point: you need one framework to rule them all. You need a system: where will temp files be stored, how it talks to the cloud APIs, data security, multi-tenancy, etc. You need standards: between the developers themselves, between developers and devops, and so on.
Let's say, on the other hand, you're a solopreneur or a small startup with a team of 3–4 developers at most. You need things to work, and work fast. You don't have tens of thousands of map-reduce jobs, probably just 1 or 2. You won't be using Hadoop, that's for sure. You might be using:
Different approaches
LINQ
Not really map-reduce per se; more like "SQL without an SQL engine". However, this adds the complexities of .NET to your environment; e.g. you have to read release notes and figure out whether you can run it on all your OSes (production, staging, developers' machines). You also need to learn C#: loading from files, different encodings, saving, iterators, etc. If you're not proficient with C#, this could be a one-time investment that won't be worth it.
mrjob
Pros: a Python-native library; easy to debug (using the inline runner); can run locally, e.g. multi-process on your own machine, or on Hadoop or Dataproc (it seems that "Dataproc is a managed Spark and Hadoop service…"). The downside: lots of moving parts and different configuration options.
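To make that concrete, here is a minimal, hypothetical mrjob job (the class and file names are made up) that counts rows per key, keying on the first CSV column:

# mr_count_by_id.py - hypothetical minimal mrjob job
from mrjob.job import MRJob

class MRCountById(MRJob):
    def mapper(self, _, line):
        # first CSV column is the key
        yield line.split(',', 1)[0], 1

    def reducer(self, key, counts):
        yield key, sum(counts)

if __name__ == '__main__':
    MRCountById.run()

The same file runs in-process for debugging (-r inline), with several local processes (-r local), or on Hadoop/Dataproc (-r hadoop, -r dataproc), e.g. python3 mr_count_by_id.py input.csv; that flexibility is also where the many configuration options come from.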
Custom made map-reduce
Let's go to the UCI Machine Learning website (2015 is on the phone...), choose some dataset, and test.
Some notes: we don't need SHA-256, and not even base64; nothing bad will happen if the keys don't distribute perfectly evenly. We could take MMH3 (MurmurHash3); googling "python murmurhash" gives 2 interesting results, and since both wrap the same C++ code, let's take the one with the most stars. Other options would be to simply do (% NUM_SHARDS) on the key, or even a right shift (however, that requires the shard count to be a power of 2).
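A minimal sketch of these options, assuming string keys (the bit-mask variant is the same power-of-two trick as the right shift mentioned above):

# Sketch: three ways to map a key to a shard index.
import mmh3

def shard_by_mmh3(key, num_shards=10):
    # unsigned MurmurHash3, then modulo: works for any shard count
    return mmh3.hash(key, signed=False) % num_shards

def shard_by_modulo(key, num_shards=10):
    # if the key is already numeric, skip hashing entirely
    return int(key) % num_shards

def shard_by_mask(key, num_shards=8):
    # cheaper bit trick, only valid when num_shards is a power of two
    assert num_shards & (num_shards - 1) == 0
    return mmh3.hash(key, signed=False) & (num_shards - 1)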
mini setup script (setup.sh):
# get data
wget http://archive.ics.uci.edu/ml/machine-learning-databases/00516/os_scan/OS%20Scan_dataset.csv.gz
gunzip 'OS Scan_dataset.csv.gz'
# NOTE: the test scripts below read all_accelerometer_data_pids_13.csv;
# point them at whichever CSV you actually downloaded.
wc -l all_accelerometer_data_pids_13.csv
# create venv
venv_path=/tmp/venv_mr/
python3 -m venv ${venv_path}
source ${venv_path}/bin/activate
python3 -m pip install mmh3
python3 -m pip install tqdm
python3 -m pip install py-spy
and 2 Python test scripts, test.py and test_imap.py:
#!/usr/bin/env python3
# test.py: single-process sharding
import mmh3
import tqdm

NUMBER_OF_SHARDS = 10

# one output file per shard
id_to_file = {}
for i in range(NUMBER_OF_SHARDS):
    id_to_file[i] = open('/tmp/%s.shard.txt' % (i), 'w')

def get_shard_index(id_, num_shards):
    return mmh3.hash(id_, signed=False) % num_shards

with open(r'all_accelerometer_data_pids_13.csv', 'r') as fin:
    for line in tqdm.tqdm(fin, mininterval=1):
        id_ = line.split(',', 1)[0]
        # 'line' already ends with a newline, so write it as-is
        id_to_file[get_shard_index(id_, NUMBER_OF_SHARDS)].write(line)

for i in range(NUMBER_OF_SHARDS):
    id_to_file[i].close()
#!/usr/bin/env python3
# test_imap.py: offload the hash to a multiprocessing.Pool via imap
import multiprocessing
import mmh3
import tqdm

NUMBER_OF_PROCESSES = multiprocessing.cpu_count() - 1
NUMBER_OF_SHARDS = 10

# one output file per shard
id_to_file = {}
for i in range(NUMBER_OF_SHARDS):
    id_to_file[i] = open('/tmp/%s.shard.txt' % (i), 'w')

def _get_shard_index(id_, num_shards):
    return mmh3.hash(id_, signed=False) % num_shards

def get_id_and_line(line):
    id_ = line.split(',', 1)[0]
    shard_id = _get_shard_index(id_, NUMBER_OF_SHARDS)
    return shard_id, line

pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
with open(r'all_accelerometer_data_pids_13.csv', 'r') as fin:
    # workers compute the shard index; the parent process does the writing
    iter_data = pool.imap(get_id_and_line, fin)
    for shard_id, line in tqdm.tqdm(iter_data, mininterval=1):
        id_to_file[shard_id].write(line)
pool.close()
pool.join()

for i in range(NUMBER_OF_SHARDS):
    id_to_file[i].close()
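Both scripts leave NUMBER_OF_SHARDS files under /tmp; a quick sanity-check sketch (not part of the original scripts) to eyeball how evenly the keys landed:

#!/usr/bin/env python3
# Count lines per shard file to eyeball the key distribution.
NUMBER_OF_SHARDS = 10

for i in range(NUMBER_OF_SHARDS):
    path = '/tmp/%s.shard.txt' % (i)
    with open(path, 'r') as f:
        print(path, sum(1 for _ in f))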
Results: imap runs much slower; we can look at the it/s reported by tqdm to see that:
# test.py sample after 4 seconds:
2801493it [00:04, 566075.99it/s]
# test_imap.py sample after 4 seconds:
73439it [00:04, 18754.44it/s]
We can see the non-imap version is about 30x faster!
Q&A
Q: Why a setup.sh and not a requirements.txt file? A: This is not production code; it's aimed at quick reproducibility, not at pinning the exact same library versions (e.g. for security, etc.).
Q: Why MMH3 and not SHA-256? A: This is not a security product, and we don't need a cryptographic hash; we just need a nice distribution of keys, and we want it to be fast.
Q: Why is imap slower than the single-process version? A: Probably because the imap version carries a lot of IPC overhead; whatever we gain by offloading the (allegedly) "heavy lifting" hash calculation to worker processes is erased by the IPC cost. To check where the time actually goes, use py-spy for Python profiling.
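For example, py-spy (installed by setup.sh) can attach to the running script (py-spy top --pid <PID>) or record a flame graph (py-spy record -o profile.svg -- python3 test_imap.py). Separately, one hedged mitigation worth testing, not measured here: multiprocessing.Pool.imap accepts a chunksize argument, so each IPC round-trip carries a batch of lines rather than a single one. A minimal sketch of that variant (the chunk size is an arbitrary starting point):

#!/usr/bin/env python3
# Hypothetical variant of test_imap.py: batch lines across the IPC boundary.
import multiprocessing
import mmh3
import tqdm

NUMBER_OF_SHARDS = 10

def get_id_and_line(line):
    id_ = line.split(',', 1)[0]
    return mmh3.hash(id_, signed=False) % NUMBER_OF_SHARDS, line

if __name__ == '__main__':
    id_to_file = {i: open('/tmp/%s.shard.txt' % (i), 'w') for i in range(NUMBER_OF_SHARDS)}
    with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 1) as pool:
        with open('all_accelerometer_data_pids_13.csv', 'r') as fin:
            # chunksize=10000 is a guess; it amortizes pickling/IPC over many lines
            iter_data = pool.imap(get_id_and_line, fin, chunksize=10000)
            for shard_id, line in tqdm.tqdm(iter_data, mininterval=1):
                id_to_file[shard_id].write(line)
    for f in id_to_file.values():
        f.close()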
Q: Why? A: Using a process pool might be worth it if the task were more CPU-bound; here, the task is more IO-bound and the overhead of the MMH3 hash doesn't justify the extra processes.
Q: Conclusion? A: It depends. It also depends on the size of the file and on the post-processing done on each shard. Conclusion: test mrjob as well; it might handle IPC better.
Originally published at https://dev.to on December 24, 2021.