First NeTEx extractors (StopPlaces) (#4026)
* Backport stop places streaming parser

* Backport NeTEx archive parser

* Backport tests

* Add basic analyzer

* Add integration test on ZIP archive traversal (before refactoring)

* Add documentation

* Add doc

* Fix credo warning

* Update apps/transport/test/netex/netex_archive_parser_test.exs

Co-authored-by: Antoine Augusti <antoine.augusti@transport.data.gouv.fr>

* Update apps/transport/test/netex/stop_places_streaming_parser_test.exs

Co-authored-by: Antoine Augusti <antoine.augusti@transport.data.gouv.fr>

* Fix outdated comment

* Make file even more unique

* Make it clearer that the implementation tolerates frames

---------

Co-authored-by: Antoine Augusti <antoine.augusti@transport.data.gouv.fr>
thbar and AntoineAugusti authored Aug 8, 2024
1 parent c5ca082 commit 63a5e48
Showing 5 changed files with 364 additions and 0 deletions.
83 changes: 83 additions & 0 deletions apps/transport/lib/netex/netex_archive_parser.ex
@@ -0,0 +1,83 @@
defmodule Transport.NeTEx do
@moduledoc """
A first implementation of on-the-fly NeTEx (zip) archive traversal.
The current implementation specializes in extracting `StopPlace`s, but the code
will be generalized for other uses in a later PR.
"""
require Logger

@doc """
Inside a zip archive opened with `Unzip`, parse the file pointed to by
`file_name` and extract its stop places. The file is read in a streaming
fashion to save memory, but the stop places are accumulated in a list
(all in memory at once).
"""
def read_stop_places(%Unzip{} = unzip, file_name) do
extension = Path.extname(file_name)

cond do
# Entry names ending with a slash `/` are directories. Skip them.
# https://github.com/akash-akya/unzip/blob/689a1ca7a134ab2aeb79c8c4f8492d61fa3e09a0/lib/unzip.ex#L69
String.ends_with?(file_name, "/") ->
[]

extension |> String.downcase() == ".zip" ->
raise "Insupported zip inside zip for file #{file_name}"

extension |> String.downcase() != ".xml" ->
raise "Insupported file extension (#{extension}) for file #{file_name}"

true ->
{:ok, state} =
unzip
|> Unzip.file_stream!(file_name)
|> Stream.map(&IO.iodata_to_binary/1)
|> Saxy.parse_stream(Transport.NeTEx.StopPlacesStreamingParser, %{
capture: false,
current_tree: [],
stop_places: [],
callback: fn state ->
state |> update_in([:stop_places], &(&1 ++ [state.current_stop_place]))
end
})

state.stop_places
end
end

@doc """
Open the zip file pointed to by `zip_file_name`, pass an `Unzip` struct to the callback `cb`,
and ensure the underlying file handle is closed afterwards.
"""
def with_zip_file_handle(zip_file_name, cb) do
zip_file = Unzip.LocalFile.open(zip_file_name)

try do
{:ok, unzip} = Unzip.new(zip_file)
cb.(unzip)
after
Unzip.LocalFile.close(zip_file)
end
end

@doc """
A higher-level function, recommended for general use. Given a NeTEx zip archive stored
on disk, return the list of `StopPlace`s per file contained in the archive.
See the tests for actual output. Will be refactored soonish.
"""
def read_all_stop_places(zip_file_name) do
with_zip_file_handle(zip_file_name, fn unzip ->
unzip
|> Unzip.list_entries()
|> Enum.map(fn metadata ->
Logger.debug("Processing #{metadata.file_name}")

{
metadata.file_name,
read_stop_places(unzip, metadata.file_name)
}
end)
end)
end
end
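
A minimal usage sketch (not part of the commit; the archive path and entry name below are hypothetical, the values mirror the test fixture):

# list the `StopPlace`s of every XML entry in the archive
Transport.NeTEx.read_all_stop_places("netex.zip")
#=> [{"arrets.xml", [%{id: "FR:HELLO:POYARTIN:001", name: "Poyartin", latitude: 43.669, longitude: -0.919}]}]

# or target a single entry with the lower-level functions
Transport.NeTEx.with_zip_file_handle("netex.zip", fn unzip ->
  Transport.NeTEx.read_stop_places(unzip, "arrets.xml")
end)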
79 changes: 79 additions & 0 deletions apps/transport/lib/netex/stop_places_streaming_parser.ex
@@ -0,0 +1,79 @@
defmodule Transport.NeTEx.StopPlacesStreamingParser do
@moduledoc """
This module is a `Saxy` streaming XML parser, able to scan very large XML files
element by element, without loading them fully into memory.
It is a first stab at reading NeTEx files to pick `StopPlace`s.
Limitations:
- There are other locations in a NeTEx file where stops can be found, as well as other
  concepts (`Quay`), but this already provides a first basis on which we'll iterate.
- The result is an accumulated list (not a stream), which is not actually a problem given
  the current size of the output, but this could be changed in the future.
- The scanning structure is hard-coded, and we will later want something more flexible
  (à la XPath), maybe via macros or real XPath calls, but this is good enough for now.
How it works:
- Detect `StopPlace` elements
- Extract their `id` attribute
- Underneath, find the `Centroid/Location/[Latitude|Longitude]` nodes
Some `StopPlace`s have no `Centroid` nor `Latitude`/`Longitude`, but are still scanned.
The caller must provide the initial parser state (see `Transport.NeTEx.read_stop_places/2`
for a working setup).
"""

@behaviour Saxy.Handler
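
# NOTE: the assertions below enforce parser invariants (a `StopPlace` capture must
# not start while another one is already in progress).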
import ExUnit.Assertions

def get_attribute!(attributes, attr_name) do
[value] = for {attr, value} <- attributes, attr == attr_name, do: value
value
end

# NOTE: currently parsing as floats, which are limited in terms of precision,
# but more work on precision will be done later.
def parse_float!(binary) do
{value, ""} = Float.parse(binary)
value
end

# A `StopPlace` is declared; start capturing subsequent events
def handle_event(:start_element, {"StopPlace" = element, attributes}, state) do
assert state[:current_stop_place] == nil
assert state[:capture] != true

{:ok,
state
|> Map.put(:current_stop_place, %{id: get_attribute!(attributes, "id")})
|> Map.put(:capture, true)
|> Map.put(:current_tree, [element])}
end

def handle_event(:start_element, {element, _attributes}, state) when state.capture do
{:ok, state |> update_in([:current_tree], &(&1 ++ [element]))}
end

def handle_event(:end_element, "StopPlace" = _node, state) do
state = if state[:callback], do: state[:callback].(state), else: state
{:ok, %{state | current_stop_place: nil, capture: false, current_tree: []}}
end

def handle_event(:end_element, _node, state) do
{:ok, state |> update_in([:current_tree], &(&1 |> List.delete_at(-1)))}
end

def handle_event(:characters, chars, state) when state.current_tree == ["StopPlace", "Name"] do
{:ok, state |> update_in([:current_stop_place], &(&1 |> Map.put(:name, chars)))}
end

def handle_event(:characters, chars, state)
when state.current_tree == ["StopPlace", "Centroid", "Location", "Latitude"] do
{:ok, state |> update_in([:current_stop_place], &(&1 |> Map.put(:latitude, parse_float!(chars))))}
end

def handle_event(:characters, chars, state)
when state.current_tree == ["StopPlace", "Centroid", "Location", "Longitude"] do
{:ok, state |> update_in([:current_stop_place], &(&1 |> Map.put(:longitude, parse_float!(chars))))}
end

def handle_event(_, _, state), do: {:ok, state}
end
50 changes: 50 additions & 0 deletions apps/transport/test/netex/netex_archive_parser_test.exs
@@ -0,0 +1,50 @@
defmodule Transport.NeTEx.ArchiveParserTest do
use ExUnit.Case, async: true

defmodule ZipCreator do
@moduledoc """
A light wrapper around OTP `:zip` features. Does not support streaming here,
but takes care of the string <-> charlist conversions.
"""
@spec create!(String.t(), [{String.t(), binary()}]) :: {:ok, String.t()}
def create!(zip_filename, file_data) do
{:ok, ^zip_filename} =
:zip.create(
zip_filename,
file_data
|> Enum.map(fn {name, content} -> {name |> to_charlist(), content} end)
)
end
end

# not fully correct XML, but close enough for what we want to test
def some_netex_content do
"""
<GeneralFrame>
<members>
<StopPlace id="FR:HELLO:POYARTIN:001">
<Name>Poyartin</Name>
<Centroid>
<Location>
<Latitude>43.6690</Latitude>
<Longitude>-0.9190</Longitude>
</Location>
</Centroid>
</StopPlace>
</members>
</GeneralFrame>
"""
end

test "traverse the archive and return relevant content" do
tmp_file = System.tmp_dir!() |> Path.join("temp-netex-#{Ecto.UUID.generate()}.zip")
ZipCreator.create!(tmp_file, [{"arrets.xml", some_netex_content()}])

# given a NeTEx zip archive containing 1 file, assert we get the expected output
[{"arrets.xml", data}] = Transport.NeTEx.read_all_stop_places(tmp_file)

assert data == [
%{id: "FR:HELLO:POYARTIN:001", latitude: 43.669, longitude: -0.919, name: "Poyartin"}
]
end
end
76 changes: 76 additions & 0 deletions apps/transport/test/netex/stop_places_streaming_parser_test.exs
@@ -0,0 +1,76 @@
defmodule Transport.NeTEx.StopPlacesStreamingParserTest do
use ExUnit.Case, async: true

alias Transport.NeTEx.StopPlacesStreamingParser

def get_stop_places(xml) do
state = %{
current_stop_place: nil,
capture: false,
current_tree: [],
stop_places: [],
callback: fn state ->
state |> update_in([:stop_places], &(&1 ++ [state.current_stop_place]))
end
}

{:ok, final_state} = Saxy.parse_string(xml, StopPlacesStreamingParser, state)
final_state.stop_places
end

test "parses a simple NeTEx StopPlace with Name and Coordinates" do
xml = """
<StopPlace id="FR:HELLO:POYARTIN:001">
<Name>Poyartin</Name>
<Centroid>
<Location>
<Latitude>43.6690</Latitude>
<Longitude>-0.9190</Longitude>
</Location>
</Centroid>
</StopPlace>
"""

assert get_stop_places(xml) == [
%{
id: "FR:HELLO:POYARTIN:001",
name: "Poyartin",
latitude: 43.6690,
longitude: -0.9190
}
]
end

test "parses multiple stop places, including ones with limited data" do
xml = """
<SomeParentNode>
<StopPlace id="FR:HELLO:AUBIERE:001">
<Name>Aubière</Name>
</StopPlace>
<StopPlace id="FR:HELLO:AUBIERE:001:A">
<Name>Aubière (A)</Name>
<ParentSiteRef ref="FR:HELLO:AUBIERE:001"/>
<Centroid>
<Location>
<Latitude>45.7594</Latitude>
<Longitude>3.1130</Longitude>
</Location>
</Centroid>
</StopPlace>
</SomeParentNode>
"""

assert get_stop_places(xml) == [
%{
id: "FR:HELLO:AUBIERE:001",
name: "Aubière"
},
%{
id: "FR:HELLO:AUBIERE:001:A",
name: "Aubière (A)",
latitude: 45.7594,
longitude: 3.1130
}
]
end
end
76 changes: 76 additions & 0 deletions scripts/netex_analyzer.exs
@@ -0,0 +1,76 @@
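# One-off analysis script: fetch every resource declared with the "NeTEx" format,
# cache the downloads under `cache-dir`, then count the `StopPlace`s found in each archive.
# Assumption (not stated in the commit): run with `mix run scripts/netex_analyzer.exs`,
# so that the application and `DB.Repo` are started, with an existing `cache-dir` folder.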
resources =
DB.Resource
|> DB.Repo.all()

# print the number of resources
resources
|> Enum.count()
|> IO.inspect()

df =
resources
|> Enum.map(fn r ->
%{
id: r.id,
url: r.url,
title: r.title,
unverified_format: r.format,
description: r.description
}
end)
|> Enum.filter(&(&1.unverified_format == "NeTEx"))

netex =
df
|> Task.async_stream(
fn r ->
url = r.url
file = Path.join("cache-dir", "resource-#{r.id}.dat")
status_file = file <> ".status.json"

unless File.exists?(status_file) do
IO.puts("Saving #{url}")
url = if String.contains?(url, "|"), do: URI.encode(url), else: url

%{status: status} =
Transport.HTTPClient.get!(url,
decode_body: false,
compressed: false,
into: File.stream!(file)
)

File.write!(status_file, %{status: status} |> Jason.encode!())
end

%{"status" => status} = File.read!(status_file) |> Jason.decode!()

r
|> Map.put(:http_status, status)
|> Map.put(:local_path, file)
end,
max_concurrency: 10,
timeout: 120_000
)
|> Stream.map(fn {:ok, result} -> result end)
|> Stream.reject(&is_nil(&1))
|> Task.async_stream(
fn r ->
IO.puts("Processing file #{r.id}")

try do
count =
Transport.NeTEx.read_all_stop_places(r.local_path)
|> Enum.flat_map(fn {_file, stops} -> stops end)
# some stop places have no latitude in NeTEx
|> Enum.reject(fn p -> is_nil(p[:latitude]) end)
|> Enum.count()

IO.puts("#{count} StopPlaces detected")
rescue
e -> IO.puts("Something bad happened: #{inspect(e)}")
end
end,
max_concurrency: 5,
timeout: 60_000 * 5
)
|> Stream.run()
