JSON is a fine data-interchange format, but it does have some limitations. It is well known for its simplicity: even a non-programmer can easily compose a JSON file (but humanity will surprise you IRL). As a result, it is found almost everywhere, from numerous web APIs to geospatial data (GeoJSON) and even the semantic web (RDF/JSON).
The biggest problem with the format is that it is relatively inefficient to parse. Most major languages support parsing JSON, but the parsers are usually not very efficient. This becomes a problem when a significant amount of time is spent parsing and serializing JSON instead of doing real work with the data. Also, if storage is a limited resource, data saved as JSON can easily take up a lot of space. Lastly, some people want properly statically typed data, which JSON itself doesn’t handle (unless some kind of schema is used and enforced).
This brings us to protobuf. In short, protobuf is a binary data-interchange format released by Google. While it is great and addresses some of the problems discussed above, it brings new problems of its own, which have to be addressed manually.
My current setup relies heavily on dask.bag to process a huge amount of raw JSON data. After some processing of the raw data, the result is turned into a list of protobuf messages. However, while JSON has JSON Lines, there’s no equivalent standard format for protobuf. Even the official reference says (emphasis mine):
If you want to write multiple messages to a single file or stream, it is up to you to keep track of where one message ends and the next begins. The Protocol Buffer wire format is not self-delimiting, so protocol buffer parsers cannot determine where a message ends on their own. The easiest way to solve this problem is to write the size of each message before you write the message itself. When you read the messages back in, you read the size, then read the bytes into a separate buffer, then parse from that buffer. (If you want to avoid copying bytes to a separate buffer, check out the CodedInputStream class (in both C++ and Java) which can be told to limit reads to a certain number of bytes.)
Quoted from: Streaming Multiple Messages | Protocol Buffers Techniques
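For contrast with the quote above, here is a minimal sketch of why JSON Lines needs no extra framing: a newline inside a string is escaped by the encoder, so a raw newline can safely act as the record delimiter. The field names `foo` and `bar` follow the examples in this post; the values and the in-memory stream are made up for illustration.

```python
import io
import json

# Hypothetical records; note the newline embedded in the first 'bar' value.
records = [{'foo': 1, 'bar': 'line one\nline two'}, {'foo': 2, 'bar': 'x'}]

stream = io.StringIO()
for record in records:
    # json.dumps escapes the embedded newline, so every record
    # occupies exactly one physical line.
    stream.write(json.dumps(record) + '\n')

stream.seek(0)
# Reading back is just "one line, one record".
restored = [json.loads(line) for line in stream]
```

Serialized protobuf messages are arbitrary bytes, so no byte value can be reserved as a delimiter in the same way.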
So there’s no way to tell where a record starts or ends, and there isn’t even a specific guideline to follow for putting multiple records into one file (or streaming them over the network). After spending some time researching, filing a bad feature request, and talking to people around me, I ended up with the current implementation: the list of protobuf messages is stored in the TFRecords format.
For illustration purposes, this is the code for reading JSON, processing it, and writing the output as protobuf:
```python
from dask import compute, delayed
from dask.diagnostics import ProgressBar
import dask
import dask.bag as db
import dask.multiprocessing
import tensorflow
import ujson

# Written against TensorFlow 1.x and an older dask release. Newer releases
# expose these as tensorflow.io.TFRecordWriter and
# dask.config.set(scheduler='processes').


def process_the_data(item):
    from proto import message_pb2

    result = message_pb2.Message()
    # ujson.loads returns a dict, so fields are read by key.
    result.foo = item['foo']
    result.bar = item['bar']

    return result


def to_protobuf_list(partition, idx, filename):
    # One TFRecord file per partition; '*' in the pattern becomes the index.
    with tensorflow.python_io.TFRecordWriter(filename.replace('*', str(idx))) as _output:
        for item in partition:
            _output.write(item.SerializeToString())


with ProgressBar(), dask.set_options(get=dask.multiprocessing.get):
    partitions = (db.read_text('/path/to/json/*.json')
                  .map(ujson.loads)
                  .map(process_the_data)
                  .to_delayed())

    compute(*tuple(delayed(to_protobuf_list)(partition, idx, '/path/to/tfrecords/*.tfr')
                   for idx, partition in enumerate(partitions)))
```
To read the TFRecords back:
```python
import glob

from dask import delayed
from dask.diagnostics import ProgressBar
import dask
import dask.bag as db
import dask.multiprocessing
import tensorflow


def message_parse(serialized):
    from proto import message_pb2

    # Parse into the same message type that the writer produced.
    result = message_pb2.Message()
    result.ParseFromString(serialized)

    return result


with ProgressBar(), dask.set_options(get=dask.multiprocessing.get):
    # One delayed record iterator per file, each becoming one bag partition.
    partitions = tuple(delayed(tensorflow.python_io.tf_record_iterator)(path)
                       for path in sorted(glob.iglob('/path/to/tfrecords/*.tfr')))

    # process_the_data stands for whatever per-message processing follows;
    # it is not defined in this snippet.
    print(db.from_delayed(partitions)
          .map(message_parse)
          .map(process_the_data)
          .compute())
```
Another way of getting this done is to prefix each record with its length. It could be done with a minor modification to the code above: instead of using tensorflow’s TFRecordWriter, just open a normal binary file for writing, and write the record’s size in bytes before writing the actual record.
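A minimal sketch of that length-prefix approach, using only the standard library. The helper names `write_delimited` and `read_delimited` are hypothetical, and the 4-byte little-endian prefix is an assumption of this sketch rather than any standard; the byte strings stand in for the output of `SerializeToString()`.

```python
import io
import struct

# Assumption: a fixed 4-byte little-endian unsigned length prefix. Any
# encoding works as long as the reader and the writer agree on it.
_LENGTH = struct.Struct('<I')


def write_delimited(stream, payload):
    """Write one record: its size in bytes, then the bytes themselves."""
    stream.write(_LENGTH.pack(len(payload)))
    stream.write(payload)


def read_delimited(stream):
    """Yield each record's bytes until the stream is exhausted."""
    while True:
        header = stream.read(_LENGTH.size)
        if not header:
            return  # clean end of stream
        if len(header) < _LENGTH.size:
            raise EOFError('truncated length prefix')
        (size,) = _LENGTH.unpack(header)
        payload = stream.read(size)
        if len(payload) < size:
            raise EOFError('truncated record')
        yield payload


# Stand-ins for serialized protobuf messages, including an empty one.
records = [b'first', b'', b'third record']

buffer = io.BytesIO()  # a file opened with mode 'wb' works the same way
for record in records:
    write_delimited(buffer, record)

buffer.seek(0)
restored = list(read_delimited(buffer))
```

In the dask code above, `write_delimited` would replace the `_output.write(...)` call, and each payload yielded by `read_delimited` would be passed to `ParseFromString`. Unlike TFRecords, this sketch stores no checksums, so a corrupted file is only detected if it ends mid-record.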