How to parse PBF file format in Elixir?

I would like to read the PBF format and extract data from a small OSM region, e.g. bremen.osm.pbf, and map it to a Postgres db, or even filter the data by tags to get e.g. only cycleway=*. As a first step, though, I don't know how to perform a basic file read and preview what data the file contains. There are many parsers in other languages but none in Elixir. Has anybody done this before in Elixir, and what options are available?

There seem to be no pre-made packages for parsing PBF on Hex.

Working with files is fortunately a joy in Elixir. It's also suitable for parsing large files, since you can easily stream file contents and thus avoid excessive memory consumption. Check out the introduction on the Elixir website:
https://elixir-lang.org/getting-started/io-and-the-file-system.html

I am aware that Elixir would be perfect for handling even the whole planet file, but with my current knowledge I still don't know how to map the binary file in this specific case. I can't decode it with e.g. exprotobuf, because I don't have the *.proto schema definitions, only the raw data.

Is it even possible to parse protobuf data without the .proto declarations? How would you do that in some other language?

I found the schemas in osmformat.proto and fileformat.proto, but I'm not sure how to use them with a file reader.

You can take those and run them through one of the erlang/elixir protocol buffer things to create modules that could then work with those formats, same as you would do with other languages. :slight_smile:

Ok, that wasn’t so bad. I got Blob and BlobHeader, but I'm stuck on decoding in the next step, as in the example below. I think that’s because :zlib.inflate returns an iolist, not a binary, so gpb can’t decode it.

defmodule OsmMix.FileProtocol do
  use Protobuf, from: Path.expand("../lib/protos/fileformat.proto", __DIR__)
end
defmodule OsmMix.OsmProtocol do
  use Protobuf, from: Path.expand("../lib/protos/osmformat.proto", __DIR__)
end

{:ok, encoded} = File.read(Path.expand("../osm_data/bremen.osm.pbf", __DIR__))
blob = OsmMix.FileProtocol.Blob.decode(encoded)
# %OsmMix.FileProtocol.Blob{OBSOLETE_bzip2_data: nil, lzma_data: nil,
# raw: "OSMData", raw_size: 1229949,
# zlib_data: <<120, 156, 196, 189, 249, 115, 36, 217, 121, 32, 230, 193, 141,
#   135, 163, 113, 244, 61, 215, 155, 238, 225, 176, 91, 236, 2, 42, 179, 110,
#  140, 40, 6, 110, 244, 224, 232, 110, 20, 166, 193, 32, 205, 40, 103, 85, 189,
#   ...>>}
blob_header = OsmMix.FileProtocol.BlobHeader.decode(encoded)
# %OsmMix.FileProtocol.BlobHeader{datasize: 614120, indexdata: nil,
# type: "OSMData"}
z = :zlib.open()
:zlib.inflateInit(z)
compressed = blob.zlib_data
uncompressed = :zlib.inflate(z, compressed)
# [[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[...], ...], <<168, ...>>], <<170, 166, ...>>]
:zlib.close(z)

primitive = OsmMix.OsmProtocol.PrimitiveBlock.decode(uncompressed)

** (FunctionClauseError) no function clause matching in :gpb.decode_field/4
            (gpb) src/gpb.erl:228: :gpb.decode_field([[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[...], ...], <<168, ...>>], <<170, 166, ...>>], [{:field, :stringtable, 1, 2, {:msg, OsmMix.OsmProtocol.StringTable}, :required, []}, {:field, :primitivegroup, 2, 3, {:msg, OsmMix.OsmProtocol.PrimitiveGroup}, :repeated, []}, {:field, :granularity, 17, 4, :int32, :optional, [default: 100]}, {:field, :lat_offset, 19, 5, :int64, :optional, [default: 0]}, {:field, :lon_offset, 20, 6, :int64, :optional, [default: 0]}, {:field, :date_granularity, 18, 7, :int32, :optional, [default: 1000]}], [{{:msg, OsmMix.OsmProtocol.HeaderBlock}, [{:field, :bbox, 1, 2, {:msg, OsmMix.OsmProtocol.HeaderBBox}, :optional, []}, {:field, :required_features, 4, 3, :string, :repeated, []}, {:field, :optional_features, 5, 4, :string, :repeated, []}, {:field, :writingprogram, 16, 5, :string, :optional, []}, {:field, :source, 17, 6, :string, :optional, []}, {:field, :osmosis_replication_timestamp, 32, 7, :int64, :optional, []}, {:field, :osmosis_replication_sequence_number, 33, 8, :int64, :optional, []}, {:field, :osmosis_replication_base_url, 34, 9, :string, :optional, []}]}, {{:msg, OsmMix.OsmProtocol.HeaderBBox}, [{:field, :left, 1, 2, :sint64, :required, []}, {:field, :right, 2, 3, :sint64, :required, []}, {:field, :top, 3, 4, :sint64, :required, []}, {:field, :bottom, 4, 5, :sint64, :required, []}]}, {{:msg, OsmMix.OsmProtocol.PrimitiveBlock}, [{:field, :stringtable, 1, 2, {:msg, OsmMix.OsmProtocol.StringTable}, :required, []}, {:field, :primitivegroup, 2, 3, {:msg, OsmMix.OsmProtocol.PrimitiveGroup}, :repeated, []}, {:field, :granularity, 17, 4, :int32, :optional, [default: 100]}, {:field, :lat_offset, 19, 5, :int64, :optional, [default: 0]}, {:field, :lon_offset, 20, 6, :int64, :optional, [default: 0]}, {:field, :date_granularity, 18, 7, :int32, :optional, [default: 1000]}]}, {{:msg, OsmMix.OsmProtocol.PrimitiveGroup}, [{:field, :nodes, 1, 2, {:msg, OsmMix.OsmProtocol.Node}, :repeated, 
[]}, {:field, :dense, 2, 3, {:msg, OsmMix.OsmProtocol.DenseNodes}, :optional, []}, {:field, :ways, 3, 4, {:msg, OsmMix.OsmProtocol.Way}, :repeated, []}, {:field, :relations, 4, 5, {:msg, OsmMix.OsmProtocol.Relation}, :repeated, []}, {:field, :changesets, 5, 6, {:msg, OsmMix.OsmProtocol.ChangeSet}, :repeated, []}]}, {{:msg, OsmMix.OsmProtocol.StringTable}, [{:field, :s, 1, 2, :bytes, :repeated, []}]}, {{:msg, OsmMix.OsmProtocol.Info}, [{:field, :version, 1, 2, :int32, :optional, [default: -1]}, {:field, :timestamp, 2, 3, :int64, :optional, []}, {:field, :changeset, 3, 4, :int64, :optional, []}, {:field, :uid, 4, 5, :int32, :optional, []}, {:field, :user_sid, 5, 6, :uint32, :optional, []}, {:field, :visible, 6, 7, :bool, :optional, []}]}, {{:msg, OsmMix.OsmProtocol.DenseInfo}, [{:field, :version, 1, 2, :int32, :repeated, [:packed]}, {:field, :timestamp, 2, 3, :sint64, :repeated, [:packed]}, {:field, :changeset, 3, 4, :sint64, :repeated, [:packed]}, {:field, :uid, 4, 5, :sint32, :repeated, [:packed]}, {:field, :user_sid, 5, 6, :sint32, :repeated, [:packed]}, {:field, :visible, 6, 7, :bool, :repeated, [:packed]}]}, {{:msg, OsmMix.OsmProtocol.ChangeSet}, [{:field, :id, 1, 2, :int64, :required, []}]}, {{:msg, OsmMix.OsmProtocol.Node}, [{:field, :id, 1, 2, :sint64, :required, []}, {:field, :keys, 2, 3, :uint32, :repeated, [:packed]}, {:field, :vals, 3, 4, :uint32, :repeated, [:packed]}, {:field, :info, 4, 5, {:msg, OsmMix.OsmProtocol.Info}, :optional, []}, {:field, :lat, 8, 6, :sint64, :required, []}, {:field, :lon, 9, 7, :sint64, :required, []}]}, {{:msg, OsmMix.OsmProtocol.DenseNodes}, [{:field, :id, 1, 2, :sint64, :repeated, [:packed]}, {:field, :denseinfo, 5, 3, {:msg, OsmMix.OsmProtocol.DenseInfo}, :optional, []}, {:field, :lat, 8, 4, :sint64, :repeated, [:packed]}, {:field, :lon, 9, 5, :sint64, :repeated, [:packed]}, {:field, :keys_vals, 10, 6, :int32, :repeated, [:packed]}]}, {{:msg, OsmMix.OsmProtocol.Way}, [{:field, :id, 1, 2, :int64, :required, []}, {:field, 
:keys, 2, 3, :uint32, :repeated, [:packed]}, {:field, :vals, 3, 4, :uint32, :repeated, [:packed]}, {:field, :info, 4, 5, {:msg, OsmMix.OsmProtocol.Info}, :optional, []}, {:field, :refs, 8, 6, :sint64, :repeated, [:packed]}]}, {{:enum, OsmMix.OsmProtocol.Relation.MemberType}, [NODE: 0, WAY: 1, RELATION: 2]}, {{:msg, OsmMix.OsmProtocol.Relation}, [{:field, :id, 1, 2, :int64, :required, []}, {:field, :keys, 2, 3, :uint32, :repeated, [:packed]}, {:field, :vals, 3, 4, :uint32, :repeated, [:packed]}, {:field, :info, 4, 5, {:msg, OsmMix.OsmProtocol.Info}, :optional, []}, {:field, :roles_sid, 8, 6, :int32, :repeated, [:packed]}, {:field, :memids, 9, 7, :sint64, :repeated, [:packed]}, {:field, :types, 10, 8, {:enum, OsmMix.OsmProtocol.Relation.MemberType}, :repeated, [:packed]}]}], {OsmMix.OsmProtocol.PrimitiveBlock, {OsmMix.OsmProtocol.StringTable, []}, [], :undefined, :undefined, :undefined, :undefined})

I found a function to convert the iolist to a binary: :zlib.inflate(z, compressed) |> :erlang.iolist_to_binary(), and now I can get any OSM data :slight_smile: Now I wonder whether I can open and decompress a large file in chunks and store each chunk in the db in parallel?
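A minimal sketch of that fix, for anyone who hits the same error. The module name InflateSketch is made up for illustration; the :zlib and IO calls are the standard ones. It also shows :zlib.uncompress/1, which should return a binary directly for zlib-wrapped data, so the manual open/inflate/close sequence may not be needed at all.

```elixir
defmodule InflateSketch do
  # Hypothetical helper, named here only for illustration.
  def inflate_to_binary(compressed) do
    z = :zlib.open()

    try do
      :ok = :zlib.inflateInit(z)
      # :zlib.inflate/2 returns iodata (a possibly nested list of binaries),
      # so flatten it before handing it to a protobuf decoder.
      z |> :zlib.inflate(compressed) |> IO.iodata_to_binary()
    after
      # Always release the zlib port, even if inflating raises.
      :zlib.close(z)
    end
  end
end

original = String.duplicate("cycleway=lane;", 1_000)
compressed = :zlib.compress(original)

# Both calls give back the original binary.
^original = InflateSketch.inflate_to_binary(compressed)
^original = :zlib.uncompress(compressed)
```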

How can I open and read a PBF file with File.stream!? I would like to chunk it by four bytes and decode each chunk as a BlobHeader, but it seems the encoding is wrong, because the chunks look like this:

"\n\tOS"
"MHea"
<<100, 101, 114, 24>>
<<160, 1, 16, 143>>

Why is Elixir replacing e.g. <<10, 9, 79, 83>> with "\n\tOS"?
My current code:

File.stream!(Path.expand("../osm_data/bremen.osm.pbf", __DIR__), [:binary, :raw, read_ahead: 10_000_000], 4)
|> Stream.drop(3)
|> Stream.take(5)
|> Stream.map(fn(encoded) ->
  # <<100, 101, 114, 24>>
  OsmMix.FileProtocol.BlobHeader.decode(encoded) #no case clause matching: :group_end
end)
|> Stream.run()

Actually, I’m trying to translate the following Java code, but I can’t get it working:

FileInputStream fis = new FileInputStream("bremen.osm.pbf");
DataInputStream dis = new DataInputStream(fis);

for (;;) {
  if (dis.available() <= 0) break;
  
  int len = dis.readInt();
  byte[] blobHeader = new byte[len];
  dis.read(blobHeader);
  BlobHeader h = BlobHeader.parseFrom(blobHeader);
  byte[] blob = new byte[h.getDatasize()];
  dis.read(blob);
  Blob b = Blob.parseFrom(blob);

  InputStream blobData;
  if (b.hasZlibData()) {
    blobData = new InflaterInputStream(b.getZlibData().newInput());
  } else {
    blobData = b.getRaw().newInput();
  }
  System.out.println("> " + h.getType());
  if (h.getType().equals("OSMHeader")) {
    HeaderBlock hb = HeaderBlock.parseFrom(blobData);
    System.out.println("hb: " + hb.getSource());
  } else if (h.getType().equals("OSMData")) {
    PrimitiveBlock pb = PrimitiveBlock.parseFrom(blobData);
    System.out.println("pb: " + pb.getGranularity());
  }
}

fis.close();

Because that is what it is. :slight_smile:

The <<...>> form prints the bytes of a binary as base-10 integers, and "..." is the same kind of binary printed as a string. The two chunks that were not printed in string form contain non-printable characters, but they are still the same thing, just a binary. :slight_smile:
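A small sketch to make this visible, using the four chunks from the question. Passing binaries: :as_binaries to inspect/2 forces the <<...>> form even for printable data, and String.printable?/1 reports whether the default inspection would print a binary as a string.

```elixir
chunks = ["\n\tOS", "MHea", <<100, 101, 114, 24>>, <<160, 1, 16, 143>>]

for chunk <- chunks do
  # Same bytes either way; only the printed representation differs.
  raw = inspect(chunk, binaries: :as_binaries)
  IO.puts("#{raw} printable? #{String.printable?(chunk)}")
end
```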

In IEx it’s also helpful to use the i command to get more info

iex(37)> <<10, 9, 79, 83>> == "\n\tOS"
true
iex(38)> i <<10, 9, 79, 83>>
Term
  "\n\tOS"
Data type
  BitString
Byte size
  4
Description
  This is a string: a UTF-8 encoded binary. It's printed surrounded by
  "double quotes" because all UTF-8 encoded codepoints in it are printable.
Raw representation
  <<10, 9, 79, 83>>
Reference modules
  String, :binary
Implemented protocols
  Inspect, Ecto.DataType, Phoenix.Param, Slugify, Poison.Encoder, IEx.Info, Timber.Eventable, Msgpax.Packer, String.Chars, List.Chars, Collectable, Ecto.Queryable, Poison.Decoder, Phoenix.HTML.Safe, Plug.Exception

I figured out how to read the bytes and decompress a PBF file properly. The steps are as follows:

  1. read the first four bytes manually,
  2. interpret them as an integer (big-endian),
  3. read that many bytes and parse them as a BlobHeader,
  4. which in turn tells how many bytes to read and parse as a Blob,
  5. decompress zlib_data from the Blob,
  6. decode the PrimitiveBlock,
  7. and so on.

The issue is that I can’t use Stream or Flow to read the data when I don’t know how large the next piece will be. Do you have an idea how to do this in Elixir without reading manually?

My current implementation:

path = Path.expand("../osm_data/bremen.osm.pbf", __DIR__)
{:ok, file} = :file.open(path, [:read, :binary])

# The first four bytes hold the BlobHeader length as a big-endian integer.
{:ok, size_bytes} = :file.read(file, 4)
header_size = :binary.decode_unsigned(size_bytes, :big)
{:ok, blob_header_portion} = :file.read(file, header_size)
header = OsmMix.FileProtocol.BlobHeader.decode(blob_header_portion)
# header.datasize is the length of the Blob that follows the header.
{:ok, blob_portion} = :file.read(file, header.datasize)
blob = OsmMix.FileProtocol.Blob.decode(blob_portion)

z = :zlib.open()
:zlib.inflateInit(z)
compressed = blob.zlib_data
# :zlib.inflate/2 returns an iolist, so convert it to a binary before decoding.
uncompressed = :zlib.inflate(z, compressed) |> :erlang.iolist_to_binary()
:zlib.close(z)
primitive = OsmMix.OsmProtocol.PrimitiveBlock.decode(uncompressed)

pry(1)> %OsmMix.OsmProtocol.PrimitiveBlock{date_granularity: 1000,
 granularity: 100, lat_offset: 0,
 lon_offset: 0, primitivegroup: [],
 stringtable: %OsmMix.OsmProtocol.StringTable{s: []}}
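One likely reason the PrimitiveBlock above comes back empty: the chunks printed earlier in the thread start with "OSMHea...", so the first blob in the file appears to be of type OSMHeader, and the Java code decodes such blobs as a HeaderBlock rather than a PrimitiveBlock. The Java code also falls back to the raw field when zlib_data is absent. A minimal sketch of both decisions follows. BlobPayload and its function names are hypothetical, and the functions take any map or struct with :raw and :zlib_data fields, so this does not depend on the generated protobuf modules.

```elixir
defmodule BlobPayload do
  # Hypothetical helper, not part of exprotobuf or gpb.

  # Prefer zlib_data when present; :zlib.uncompress/1 returns a binary.
  def extract(%{zlib_data: zlib}) when is_binary(zlib), do: :zlib.uncompress(zlib)
  # Otherwise fall back to the uncompressed raw field.
  def extract(%{raw: raw}) when is_binary(raw), do: raw

  # Map the BlobHeader type string to the kind of message to decode next.
  def block_kind("OSMHeader"), do: :header_block
  def block_kind("OSMData"), do: :primitive_block
  def block_kind(_other), do: :unknown
end
```

With the modules from this thread, :header_block would correspond to OsmMix.OsmProtocol.HeaderBlock.decode/1 and :primitive_block to OsmMix.OsmProtocol.PrimitiveBlock.decode/1.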

The most helpful quote from the PBF file format documentation:

The format is a repeating sequence of:

  • int4: length of the BlobHeader message in network byte order
  • serialized BlobHeader message
  • serialized Blob message (size is given in the header)

As I understand it, :file.open does not load the file into memory, so I only have to read the opened file in a loop: read 4 bytes, parse the header, read the blob, and repeat until the end of the file.
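That loop can be sketched with Stream.resource/3, which yields one {header, blob_bytes} pair per iteration, so Stream or Flow can consume the result even though each read size depends on the previous read. This is only a sketch under assumptions: PbfFrames is a made-up module name, and decode_header is any one-argument function returning a map or struct with a :datasize field. Truncated files are not handled gracefully; a short read of the size prefix simply raises.

```elixir
defmodule PbfFrames do
  # Hypothetical helper module, not part of any library.
  # `decode_header` is a 1-arity function returning something with a
  # :datasize field, e.g. &OsmMix.FileProtocol.BlobHeader.decode/1.
  def stream(path, decode_header) do
    Stream.resource(
      fn -> File.open!(path, [:read, :binary]) end,
      fn file -> next_frame(file, decode_header) end,
      fn file -> File.close(file) end
    )
  end

  defp next_frame(file, decode_header) do
    case :file.read(file, 4) do
      # Four bytes in network byte order give the BlobHeader length.
      {:ok, <<header_size::unsigned-big-integer-size(32)>>} ->
        {:ok, header_bytes} = :file.read(file, header_size)
        header = decode_header.(header_bytes)
        # The header says how many bytes the serialized Blob occupies.
        {:ok, blob_bytes} = :file.read(file, header.datasize)
        {[{header, blob_bytes}], file}

      :eof ->
        {:halt, file}
    end
  end
end
```

With the modules from this thread, usage would look like PbfFrames.stream(path, &OsmMix.FileProtocol.BlobHeader.decode/1), after which each blob_bytes can be decoded and decompressed independently, for example in a Stream.map/2 or Flow stage.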

Hi! I have recently put up a library for parsing and decoding .pbf files; take a look at https://github.com/mpraski/pbf-parser if you’re still interested. I'm currently using it together with Flow, and an example pipeline is included in the readme.
