Click to change color scheme

Notes on codes, projects and everything

Latest Post

Protobuf and dask

While JSON is a fine data-interchange format, however it does have some limitations. It is well-known for its simplicity, that even a non-programmer can easily compose a JSON file (but humanity will surprise you IRL). Therefore, it is found almost everywhere, from numerous web APIs, to geospatial data (GeoJSON), and even semantic web (RDF/JSON).

The biggest problem with the format is that it is relatively inefficient to parse. While most major languages supports the parsing of JSON, they are usually not very efficient. This can be a problem when significant amount of time is spent parsing and serializing JSON instead of doing real work with them. Also if storage is a limited resource, data saved in JSON can easily take up a huge lot of space. Lastly, some people might want proper statically typed data, which JSON itself doesn’t handle (unless some kind of schema is used/enforced).

This brings us to protobuf. In short, protobuf is a binary data-interchange format released by Google. While it is great and addresses some of the problems discussed before. It does bring new problems, which has to be addressed manually.

So my current set up is relying heavily on dask.bag to process a huge JSON raw data. So after performing some processing work with the raw data, the result is then turned into a list of protobuf messages. However, while JSON has JSONlines, there’s no equivalent standard format for protobuf. Even the official reference says (emphasis mine)

If you want to write multiple messages to a single file or stream, it is up to you to keep track of where one message ends and the next begins. The Protocol Buffer wire format is not self-delimiting, so protocol buffer parsers cannot determine where a message ends on their own. The easiest way to solve this problem is to write the size of each message before you write the message itself. When you read the messages back in, you read the size, then read the bytes into a separate buffer, then parse from that buffer. (If you want to avoid copying bytes to a separate buffer, check out the CodedInputStream class (in both C++ and Java) which can be told to limit reads to a certain number of bytes.)

Quoted from: Streaming Multiple Messages | Protocol Buffers Techniques

So there’s no way to tell when a record start / ends, there’s even no specific guideline one can follow to put multiple records into 1 file (or stream them over the network). After spending some time researching, filing a bad feature request, and talking to people around me, I ended up with the current implementation. So the chosen strategy to store a list of protobuf is by following the TFRecords format.

For illustration purpose, this is the code for reading JSON, process it, and output as protobuf

from dask import delayed
from dask.delayed import compute
from dask.diagnostics import ProgressBar
import dask
import dask.bag as db
import tensorflow
import ujson

def process_the_data(item):
    from proto import message_pb2

    result = message_pb2.Message()
    result.foo = item.foo
    result.bar = item.bar

    return result

def to_protobuf_list(partition, idx, filename):
    with tensorflow.python_io.TFRecordWriter(filename.replace('*', str(idx))) as _output:
        for item in partition:
            _output.write(item.SerializeToString())

with ProgressBar(), dask.set_options(get=dask.multiprocessing.get):
    partitions = (db.read_text('/path/to/json/*.json')
        .map(ujson.loads)
        .map(process_the_data)
        .to_delayed())

    compute(*tuple(delayed(to_protobuf_list)(partition, idx, '/path/to/tfrecords/*.tfr')
                         for idx, partition
                         in enumerate(partitions)))

In order to read TFRecords

import glob

from dask.diagnostics import ProgressBar
from dask import delayed
import tensorflow
import dask
import dask.bag as db

def message_parse(serialized):
    from proto import message_pb2

    result = address_pb2.Address()
    result.ParseFromString(serialized)

    return result

with ProgressBar(), dask.set_options(get=dask.multiprocessing.get):
    partitions = tuple(delayed(tensorflow.python_io.tf_record_iterator)(path)
                       for path
                       in sorted(glob.iglob('/path/to/tfrecords/*.tfr')))

    print(db.from_delayed(partitions)
        .map(message_parse)
        .map(process_the_data)
        .compute())

Another way of getting this done, is by prefixing a length in front of each record. It could be done with minor modification to the code above. Instead of using tensorflow’s TFRecordsWriter, just open a normal binary file for writing, and write the number of bytes/bits before writing the actual record.

Leave a comment

Other new Posts