So my cheat with dask worked fine and dandy, until I started inspecting the output (which was to be used as input for another script). While the script seemed to run fine, when I started parsing the output line by line I was hit with some odd syntax errors. A quick inspection showed that some of the lines were not printed completely.
Then I started searching for the cause, and for the proper term to describe the problem. It turns out that printing to stdout is not atomic (and, to some extent, not thread-safe). Since dask.bag is highly optimized for concurrency, my cheat meant multiple print operations could be running at the same time. If one print operation does not get the chance to complete before another starts, the output appears chopped off.
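To make the failure mode concrete, here is a deterministic simulation of what happens when two print operations interleave. The `interleave` helper and the 4-character chunk size are illustrative assumptions; in reality the OS decides where a large write to a pipe gets split.

```python
import json

def interleave(chunks_a, chunks_b):
    # Simulate a scheduler alternating between two writers that each
    # emit their line in several small, non-atomic writes.
    out = []
    for a, b in zip(chunks_a, chunks_b):
        out.append(a)
        out.append(b)
    return ''.join(out)

line_a = json.dumps({"id": 1}) + '\n'
line_b = json.dumps({"id": 2}) + '\n'

# Split each write into 4-character chunks, standing in for the way a
# single logical print can reach the pipe in multiple pieces.
chunks_a = [line_a[i:i + 4] for i in range(0, len(line_a), 4)]
chunks_b = [line_b[i:i + 4] for i in range(0, len(line_b), 4)]

garbled = interleave(chunks_a, chunks_b)
for line in garbled.splitlines():
    try:
        json.loads(line)
    except json.JSONDecodeError:
        print('not valid JSON:', line)
```

Every resulting line fails to parse, which is exactly the kind of syntax error the downstream script was choking on.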
With the problem identified, I started trying suggestions people had posted around the internet. In the end, there are only two ways to fix it: either use a lock, or push all the printing through a queue. Of course, I could have run `.compute()` and collected the full result in memory before printing, but that is often not a realistic option.
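For completeness, the memory-hungry alternative looks like this: materialize everything with `.compute()`, then print sequentially from the main process so no writes can interleave. The `do_something` body here is a hypothetical stand-in for the real per-item work.

```python
import json

import dask.bag as db

def do_something(x):
    # Hypothetical stand-in for the real per-item transformation.
    return {"value": x}

# Materialize the whole bag in memory, then print from a single
# process -- safe, but impractical for large outputs.
results = (db.range(10000, npartitions=10)
             .map(do_something)
             .compute())

for item in results:
    print(json.dumps(item))
```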
Queueing the output
Start by spawning a new process that consumes a queue, printing whatever goes into it until a special token is received.
```python
import json
from multiprocessing import Manager, Process

import dask.bag as db

def print_to_stdout(queue):
    # Keep printing until the sentinel token arrives.
    while True:
        msg = queue.get()
        if msg == '_DONE':
            break
        print(json.dumps(msg))

def queue_item(item, queue=None):
    queue.put(item)

_manager = Manager()
queue = _manager.Queue()
printer = Process(target=print_to_stdout, args=(queue,))
printer.start()

(db.range(10000, npartitions=10)
   .map(do_something)  # do_something is the per-item work defined elsewhere
   .map(queue_item, queue=queue)
   .compute())

queue.put('_DONE')  # tell the printer process to stop
printer.join()
```
Using a lock

I personally found the previous approach a little bit overkill. So, with suggestions from the dask team, I did this instead. Note that acquiring a lock is an expensive operation, so I acquire it once per partition instead of once per line.
```python
import json
from multiprocessing import Manager

import dask.bag as db

def print_to_stdout(partition, lock=None):
    # Acquire the lock once per partition instead of once per line.
    with lock:
        for item in partition:
            print(json.dumps(item))
    return []  # return an empty partition so the bag stays iterable

_manager = Manager()
lock = _manager.Lock()

(db.range(10000, npartitions=10)
   .map(do_something)  # do_something is the per-item work defined elsewhere
   .map_partitions(print_to_stdout, lock=lock)
   .compute())
```
So the problem is very much fixed now.