Notes on codes, projects and everything

Oh NO! My print output is chopped!

So my cheat with dask worked fine and dandy, until I started inspecting the output (which was meant to be the input for another script). The script itself seemed to work fine, but when I started to parse each line I was hit with some funny syntax errors. After a quick inspection I found that some of the lines were not printed completely.

Then I did some searching to find the reason, and the proper term to describe the problem. Apparently printing to stdout is not atomic (and to some extent not thread-safe?). dask.bag is highly optimized for concurrency, so with my cheat there would be multiple print operations happening at the same time. If one print operation does not get the chance to complete before another one starts, the output appears to be chopped off.
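To see why a single print call is not one indivisible step, here is a small sketch. `RecordingStream` is a made-up helper (not part of dask or the standard library) that just remembers every chunk handed to its `write()` method. I am assuming CPython's usual behaviour here, where the text and the line ending are written separately. This is only one plausible contributor to the chopped lines; buffering between processes can play a part too.

```python
class RecordingStream:
    """Hypothetical helper: records each chunk passed to write()."""

    def __init__(self):
        self.chunks = []

    def write(self, text):
        # print() calls write() once per piece it wants to emit.
        self.chunks.append(text)
        return len(text)


stream = RecordingStream()

# One print() call, but the payload and the newline arrive as separate writes.
print('{"id": 1}', file=stream)

# Number of separate write() calls made by that single print().
write_calls = len(stream.chunks)
```

Since there is more than one write per print, another worker writing to the same stream has a window to slip its own text in between.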

With the problem identified, I started trying the suggestions people post around the internet. In the end, I found only two ways to fix the problem: either use a lock, or put all the printing in a queue. Of course, I could have run `.compute()` to get the full result in memory before starting to print, but that is often not a realistic option.

Queueing the output

Start a new process that consumes a queue and keeps printing whatever goes into it, until a special token is received.

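import json
from multiprocessing import Manager, Process

import dask.bag as db

def do_something(item):
    # Placeholder for the real per-item work; swap in your own function.
    return item

def print_to_stdout(queue):
    # Runs in its own process, so this is the only place that prints.
    while True:
        msg = queue.get()

        if msg == '_DONE':
            break
        else:
            print(json.dumps(msg))

def queue_item(item, queue=None):
    queue.put(item)

if __name__ == '__main__':
    manager = Manager()
    queue = manager.Queue()  # Queue() is called on the Manager instance
    printer = Process(target=print_to_stdout, args=(queue,))
    printer.start()  # without this, join() below would fail

    # Parentheses let the method chain span several lines.
    (db.range(10000, npartitions=10)
       .map(do_something)
       .map(queue_item, queue=queue)
       .compute())

    queue.put('_DONE')
    printer.join()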
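If you want to play with the pattern without installing dask, below is a minimal sketch using only the standard library. Note the swaps: it uses threads instead of processes, `queue.Queue` instead of a manager queue, and an `io.StringIO` buffer standing in for stdout so the result can be inspected. The names `drain`, `DONE` and `out` are mine, purely for illustration.

```python
import io
import json
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

DONE = object()  # sentinel telling the consumer to stop


def drain(q, out):
    # Single consumer: the only code that ever writes to `out`.
    while True:
        msg = q.get()
        if msg is DONE:
            break
        out.write(json.dumps(msg) + "\n")


q = queue.Queue()
out = io.StringIO()  # stand-in for stdout
printer = threading.Thread(target=drain, args=(q, out))
printer.start()

# Many producers push items concurrently; none of them writes output.
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda i: q.put({"id": i}), range(100)))

q.put(DONE)  # all producers are finished, so the sentinel goes in last
printer.join()

lines = out.getvalue().splitlines()
```

Because only one thread ever writes, every line comes out whole, although the order of the lines depends on how the producers were scheduled.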

Lock

I personally found the previous approach a little bit overkill. So with suggestions from the dask team I did this instead. Note that acquiring a lock is an expensive operation, so I am doing it once for a whole partition at a time instead of acquiring it for each line.

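import json
import sys
from multiprocessing import Manager

import dask.bag as db

def print_to_stdout(partition, lock=None):
    # Hold the lock for the whole partition so its lines stay together.
    with lock:
        for item in partition:
            print(json.dumps(item))
        # Flush before releasing, so buffered text is not written later.
        sys.stdout.flush()

if __name__ == '__main__':
    manager = Manager()
    lock = manager.Lock()  # proxy lock that can be shared with workers

    # do_something is the same per-item function as in the queue example.
    (db.range(10000, npartitions=10)
       .map(do_something)
       .map_partitions(print_to_stdout, lock=lock)
       .compute())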
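Here is a stripped-down sketch of the same per-batch locking idea, again without dask. It uses threads and `threading.Lock` rather than worker processes and a manager lock, and writes into an `io.StringIO` buffer instead of stdout so the result can be checked. `write_batch` and the batch sizes are made up for the illustration.

```python
import io
import json
import threading
from concurrent.futures import ThreadPoolExecutor

out = io.StringIO()  # stand-in for stdout
lock = threading.Lock()


def write_batch(batch_id):
    # Acquire once per batch, not once per line, to keep locking cheap.
    with lock:
        for i in range(5):
            out.write(json.dumps({"batch": batch_id, "item": i}) + "\n")


with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(write_batch, range(10)))

records = [json.loads(line) for line in out.getvalue().splitlines()]
```

The batches may come out in any order, but the five lines of each batch always sit next to each other, which is the trade-off of locking per partition: fewer lock acquisitions, at the cost of other workers waiting a bit longer for their turn.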

So the problem is very much fixed now.
