Data Ingestion — Build Your Own “Map Reduce”?

Map-reduce, multi-process, Python profiling [using Py-Spy]

Why map reduce

Let’s say you work on Facebook; you have lots of data and probably needs lots of map-reduce tasks. You will use mrjob/PySpark/spark/hadoop. You got the point — you need 1 framework to rule them all. You need a system: where will temp file be stored, API with cloud, data security, multi-tenant etc.. You need standards — standards between developers to themselves, between developers to devops etc. ;

Let’s say, for the other hand, your a solopreneur/small startup. Max 3–4 developers team. You need things to work, and work fast. Don’t have 10ks of map-reduce jobs, but probably 1 or 2. You won’t be using Hadoop, that’s for sure. Might be using:

Different approaches

Linq

not really map reduce per se, more like “sql w/out sql engine” However, this adds complexities of .net to your environment; e.g. read release notes and understand if you can run it on your different OSes (production, staging, developers machines). Also — need to learn C#; loading from files, different encodings, saving, iterators etc.. If you’re not proficient with C#, this could be one-time investment which will not worth it.

Pros: Python native lib; able to debug easily (using inline) run locally e.g. multi process on local machine, use hadoop, dataproc (seems that”Dataproc is a managed Spark and Hadoop service…” ) etc. However, lots of moving parts and different configuration options.

Custom made map-reduce

Let’s go to UCI Machine Learning website (2015 is on the phone..) Choose some dataset, and test

Some notes: We don’t need SHA-256 and not evey base64; nothing will happen if keys will not distribute very equally. we could take MMH3; googling “python murmurhash” gives 2 interesting results; and since both use the same cpp code, let’s take the one with most stars Other options would be to simply do (% NUM_SHARDS) or even shift right (however must have shards count == power of 2).

mini setup script:

# get data
wget http://archive.ics.uci.edu/ml/machine-learning-databases/00516/os_scan/OS%20Scan_dataset.csv.gz
gunzip 'OS Scan_dataset.csv.gz'
wc -l all_accelerometer_data_pids_13.csv

# create venv
venv_path=/tmp/venv_mr/
python3 -m venv ${venv_path}
source ${venv_path}/bin/activate

python3 -m pip install mmh3
python3 -m pip install tqdm
python3 -m pip install py-spy

and 2 python test scripts:

#!/usr/bin/env python3
import mmh3
import tqdm

NUMBER_OF_SHARDS = 10

id_to_file = {}
for i in range(NUMBER_OF_SHARDS):
  id_to_file[i] = open('/tmp/%s.shard.txt' % (i), 'w')

def get_shard_index(id_, num_shards):
  return mmh3.hash(id_, signed=False) % num_shards

with open(r'all_accelerometer_data_pids_13.csv', 'r') as fin:
  for line in tqdm.tqdm(fin, mininterval=1):
    id_ = line.split(',', 1)[0]
    id_to_file[get_shard_index(id_, NUMBER_OF_SHARDS)].write("%s\n" % (line))

for i in range(NUMBER_OF_SHARDS):
  id_to_file[i].close()


#!/usr/bin/env python3
import multiprocessing
import mmh3
import tqdm

NUMBER_OF_PROCESSES = multiprocessing.cpu_count()-1
NUMBER_OF_SHARDS = 10

id_to_file = {}
for i in range(NUMBER_OF_SHARDS):
  id_to_file[i] = open('/tmp/%s.shard.txt' % (i), 'w')

def _get_shard_index(id_, num_shards):
  return mmh3.hash(id_, signed=False) % num_shards

def get_id_and_line(line):
    id_ = line.split(',', 1)[0]
    shard_id = _get_shard_index(id_, NUMBER_OF_SHARDS)
    return shard_id, line

pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
with open(r'all_accelerometer_data_pids_13.csv', 'r') as fin:
  iter_data = pool.imap(get_id_and_line, fin)
  for shard_id, line in tqdm.tqdm(iter_data, mininterval=1):
    id_to_file[shard_id].write("%s\n" % (line))

for i in range(NUMBER_OF_SHARDS):
  id_to_file[i].close()

Results: imap runs much slower; we can look at it/sec from tqdm to see that:

# test.py sample after 4 seconds:
2801493it [00:04, 566075.99it/s]

# test_imap.py sample after 4 seconds:
73439it [00:04, 18754.44it/s]# test.py sample after 4 seconds: 2801493it [00:04, 566075.99it/s] # test_imap.py sample after 4 seconds: 73439it [00:04, 18754.44it/s]

We could see the non-imap version is 30x faster!

Q&A

Q: Why setup.sh and not requirements.txt file? A: this is not production code; it’s aimed for quick reproducibility, not for having exact same lib (e.g. security etc.)

Q: Why MMH3 and not sha256? A: This is not a security product, we don’t need cryptographic hash; we just need a nice distribution of keys , and we want this to be fast.

Q: Why is imap slower than single process? A: Might be because the imap version has lots of overhead because of IPC ; The trade-off between offloading the (alleged) “heavy lifting” calculation of hash to external process is being erased by the IPC.

use py-spy for Python profiling

Q: Why? A: Using process pool might worth it if task is more CPU bound; here, task is more io bound the overhead of MMH hash doesn’t justify it.

Q: Conclusion? A: it depends Also — depends on size of file. Also — depends on post-processing each shard. conclusion — test mrjob as well; it might have a better IPC.

Originally published at https://dev.to on December 24, 2021.