Part 2: Trustbit Logistics Hackathon - Implementing Basic Logistics Simulator
In the previous article (Part 1: Trustbit Logistics Hackathon - Behind the Scenes) we talked about organizing our Logistics Hackathon on Sustainability - how it came to be. Let’s get to the technical details now.
The hackathon had two major technical building blocks:
Logistics Simulation Runtime
Development environment for the participants
This article will focus on the simulation runtime. Here is how it plugs into the bigger picture:
As you can see, logistics simulation had a lot of features to handle. There was a simulation core with extensions plugged into it: supply chain, open cargo market, CO2 and sustainability, driver fatigue and travel fleets.
The simulation core had to solve these problems:
Host a map of locations connected by the roads.
Let trucks drive on that map between the locations
Compute passing of time, based on the truck speed (and road speed profile)
Build a route for trucks (e.g. the fastest path)
To demonstrate the essence, let’s implement that core in a standalone Python application. This is going to be a mix of a truck driving simulator AND a route planner.
Problem Definition
Given a fixed map of locations and roads, the simulation will have to find the fastest (not shortest!) path between any two locations: A and B. It will do that by launching truck instances from the location A in all directions at once. At every intersection it will fork and create more instances in all unvisited directions.
Time will move forward. As soon as the first truck instance arrives at location B, we can abort the simulation. The route that this winning truck took is our fastest route.
By the way, if you have seen the Transport Tycoon Katas, you will recognize that problem immediately.
Constraints
To get things started, the map will look like this:
This graph will be represented by a CSV file:
Later on we could replace that file with something larger. A map of main roads in Europe, for example.
Defining the app interface is a viable option to constrain the solution and make it more manageable.
Let’s document the app interface before starting to code:
This app finds and prints the fastest route from location A to B.
The route is printed along with the expected times of arrival.

Usage:
    route.py FROM TO

For example:
    ./route.py Steamdrift Leverstorm
     0.00h DEPART Steamdrift
    14.26h ARRIVE Cogburg
    24.81h ARRIVE Irondale
    31.88h ARRIVE Leverstorm
Simulation Input/Output
We start by importing the necessary modules. Only standard built-in Python libraries are being used:
sys - for working with arguments
PriorityQueue - a queue that returns its items in priority order (lowest value first) rather than in insertion order
defaultdict - just a convenient dictionary that provides a default value if a key doesn’t exist
import sys
from queue import PriorityQueue
from collections import defaultdict
Given that we can already grab two arguments:
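The snippet for this step is a minimal sketch rather than the original code. It assumes the script takes exactly two positional arguments and stores them under the names start and end, which are the names the implementation uses later on. The helper parse_args is hypothetical and exists only to keep the example testable.

```python
import sys


def parse_args(argv):
    """Return (start, end) from an argv-style list, or exit with a usage hint.

    parse_args is a hypothetical helper introduced for this sketch.
    """
    if len(argv) != 3:
        # argv[0] is the script name, so exactly two more items are expected
        sys.exit("Usage: route.py FROM TO")
    _, start, end = argv
    return start, end


# In the real script this would be called with sys.argv;
# a fixed list is used here so the example runs anywhere.
start, end = parse_args(["route.py", "Steamdrift", "Leverstorm"])
print(start, end)  # Steamdrift Leverstorm
```

Exiting with a usage message on a wrong argument count mirrors the interface documented above.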
Data Structures
We can also load a map into memory. It will be represented by a dictionary where:
key is name of a location;
value is a list of roads that connect this location to the other locations.
For each road we will also calculate the expected time to travel, storing it as tuple:
MAP = defaultdict(list)

with open("s02e02_map.csv") as f:
    lines = f.readlines()

for line in lines[1:]:
    loc_a, loc_b, km, speed = line.split(",")
    time_to_travel = float(km) / int(speed)
    MAP[loc_a].append((loc_b, time_to_travel))
    MAP[loc_b].append((loc_a, time_to_travel))
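To see what the resulting dictionary looks like without needing the map file, here is a small sketch that applies the same loading logic to in-memory text. The function load_map, the header names and the three locations are made up for illustration; they are not the hackathon data.

```python
import io
from collections import defaultdict


def load_map(lines):
    """Build {location: [(neighbour, hours), ...]} from CSV lines with a header row."""
    graph = defaultdict(list)
    for line in list(lines)[1:]:  # skip the header row
        line = line.strip()
        if not line:
            continue  # tolerate blank lines, e.g. at the end of the file
        loc_a, loc_b, km, speed = line.split(",")
        hours = float(km) / float(speed)
        # roads are two-way, so register the road at both ends
        graph[loc_a].append((loc_b, hours))
        graph[loc_b].append((loc_a, hours))
    return graph


# Made-up rows, for illustration only.
SAMPLE_CSV = """A,B,km,speed
Alpha,Beta,100,50
Beta,Gamma,90,60
"""

graph = load_map(io.StringIO(SAMPLE_CSV))
print(graph["Beta"])  # [('Alpha', 2.0), ('Gamma', 1.5)]
```

Storing the travel time instead of distance and speed means the routing code never has to know about kilometres at all.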
The routing algorithm will need to track the locations already visited. Let's make it a set:
visited = set()
The next step is to define the most important data structure in our entire simulation - the future timeline. It will be represented by a priority queue that will hold a list of events which are about to happen.
future = PriorityQueue()
Each item in the future timeline will be represented by a tuple of:
time - float;
location where the truck arrives - just a name;
travel milestones of truck; we need to gather this in order to print the fastest path at the end.
At this point we don’t need to store truck identity or cargo, because there is only one truck travelling across these roads.
We are going to represent time as a float - total number of hours that has passed since the beginning of the simulation. It could be anything, even a date-time, but a float is good enough for now.
Truck milestones are a fun concept to represent in code. Each milestone captures a significant event in a logistics world. In our case we just track the fact that a truck visits a specific location (or starts at one), so we need just a timestamp and a location.
We could, for instance, have an accumulating list of past milestones for each truck replica. When a truck “forks” at a location, we could append that location to the milestone history and copy it over to all new truck instances. We would make a copy for each replica, to avoid overwriting or mixing the individual histories.
A slightly more interesting way is to avoid copying and reuse shared travel milestones by using a linked list, where each subsequent step is a tuple that points to the previous one: (time, "location", parent).
For example, when truck starts, its milestones will look like:
(0, "Steamdrift", None)
If that truck arrives in Cogburg at 14.26, its milestones will look like:
(14.26, "Cogburg", (0, "Steamdrift", None) )
If another truck replica arrives in Rustbort at 15.67, its milestones will look like:
(15.67, "Rustbort", (0, "Steamdrift", None) )
And because this is a linked list, the (0, "Steamdrift", None) part will be the same tuple instance shared between both truck instances.
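The sharing can be checked directly. The sketch below builds the two histories from the examples above and adds a hypothetical helper, unroll, that walks a history back to its start.

```python
def unroll(milestone):
    """Walk from the newest milestone back to the start; return oldest-first.

    unroll is a hypothetical helper introduced for this sketch.
    """
    path = []
    while milestone is not None:
        time, location, parent = milestone
        path.append((time, location))
        milestone = parent
    path.reverse()
    return path


start = (0, "Steamdrift", None)
to_cogburg = (14.26, "Cogburg", start)
to_rustbort = (15.67, "Rustbort", start)

# Both histories point at the very same starting tuple; nothing was copied.
print(to_cogburg[2] is to_rustbort[2])  # True
print(unroll(to_cogburg))  # [(0, 'Steamdrift'), (14.26, 'Cogburg')]
```

Because tuples are immutable, sharing the tail between replicas is safe: no replica can alter another one's history.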
We can actually put these tuple instances into the priority queue and they will be sorted by the time of their occurrence, the smallest time coming first. This is because tuples are compared by their first element first. And our first element is the timestamp!
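A quick way to convince yourself of that ordering is to put a few milestones into the queue out of order and read them back. This is only an illustrative sketch that reuses the example values from above.

```python
from queue import PriorityQueue

future = PriorityQueue()
start = (0, "Steamdrift", None)

# Insert deliberately out of chronological order.
future.put((15.67, "Rustbort", start))
future.put((14.26, "Cogburg", start))
future.put(start)

order = []
while not future.empty():
    time, location, _parent = future.get()
    order.append(location)

print(order)  # ['Steamdrift', 'Cogburg', 'Rustbort']
```

If two entries ever share the same timestamp, Python falls back to comparing the next tuple element, the location name, so ties are resolved alphabetically.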
The Algorithm
The algorithm will work this way:
Create the first element of the travel milestones in the form of (0, "starting location", None) and put it into the priority queue.
Repeat forever:
    take the next event from the future priority queue;
    set the clock (current time) to the timestamp from that event. Because of the priority queue ordering, each time the clock value will be greater than or equal to its previous value;
    if location has already been visited by another truck that came earlier - continue with the next item;
    if location matches our destination - we got the fastest route; print the history and exit;
    add location to the visited set, since this truck is the first to arrive here;
    use MAP[location] to look up all possible roads that go out of this location; for each destination:
        ignore the road, if it leads to a location that has already been visited;
        compute the arrival time at a destination by adding the time to travel (available in the map) to the current time;
        schedule a new milestone item - put it into the priority queue with the arrival time as a timestamp; don’t forget to link it to the previous milestone history, since we’ll need it for printing the fastest path.
Implementation
We already have our start and end locations from the CLI arguments. All the data structures are already in place, so let’s write down the algorithm.
First, create the first element of the start history and put it into the priority queue:
future.put((0, start, None))
Now, start the “repeat forever” part. We grab the next scheduled event (represented by the head of the linked list) from the priority queue. We can also destructure it right away, to set the current clock and current location from the milestone information.
while not future.empty():
    trip = future.get()
    (clock, location, parent) = trip
Skip processing and continue, if that location was already visited:
    if location in visited:
        continue
If our current location matches the desired end location - grab all the previous milestones from this travel instance, reverse them, print and exit happily:
    if location == end:
        # we arrived. Let's reverse the trip history to print it
        path = [(clock, location)]
        while parent:
            clock, location, parent = parent
            path.append((clock, location))
        for clock, location in reversed(path):
            print(f"{clock:>5.2f}h {'ARRIVE' if clock else 'DEPART'} {location}")
        break
Otherwise, we still have some work to do. Add location to the visited set:
    visited.add(location)
Then for each possible destination that leads to an unvisited location, schedule a truck arrival into the future:
    for destination, time_to_travel in MAP[location]:
        if destination not in visited:
            arrival_time = clock + time_to_travel
            future.put((arrival_time, destination, trip))
That is all we need to be able to find the fastest route between two locations:
> python3 route.py Steamdrift Leverstorm
 0.00h DEPART Steamdrift
14.26h ARRIVE Cogburg
24.81h ARRIVE Irondale
31.88h ARRIVE Leverstorm
At this point you can try replacing the existing map (csv file) with your own and then routing between these locations.
That concludes the entire implementation. If you want to see the entire algorithm in one place, check out this Python solution for DDD Transport Tycoon Exercise 2.2.
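For experimenting without a CSV file, the steps above can also be wrapped into a single function. This is a sketch under stated assumptions rather than the linked solution: the function name fastest_route, its signature and the three-node TOY_MAP are all hypothetical, and the route is returned as a list instead of being printed.

```python
from queue import PriorityQueue


def fastest_route(graph, start, end):
    """Return [(arrival_hours, location), ...] from start to end, or None.

    graph maps a location to a list of (neighbour, hours_to_travel) tuples.
    """
    visited = set()
    future = PriorityQueue()
    future.put((0, start, None))

    while not future.empty():
        trip = future.get()
        clock, location, parent = trip

        if location in visited:
            continue  # an earlier truck already got here

        if location == end:
            # walk the linked list back to the start, then reverse it
            path = [(clock, location)]
            while parent:
                clock, location, parent = parent
                path.append((clock, location))
            path.reverse()
            return path

        visited.add(location)
        for destination, time_to_travel in graph.get(location, []):
            if destination not in visited:
                future.put((clock + time_to_travel, destination, trip))

    return None  # the queue ran dry: end is unreachable from start


# Made-up map: the direct road A-C is slower than the detour via B.
TOY_MAP = {
    "A": [("B", 1.0), ("C", 5.0)],
    "B": [("A", 1.0), ("C", 2.0)],
    "C": [("A", 5.0), ("B", 2.0)],
}

print(fastest_route(TOY_MAP, "A", "C"))  # [(0, 'A'), (1.0, 'B'), (3.0, 'C')]
```

Returning None for an unreachable destination is a choice made for this sketch; the script above simply leaves the loop without printing anything in that case.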
Summary
We went through the implementation of a basic logistics simulation in Python. It searches for the fastest path between any two locations by simulating a truck traveling in all directions at once.
The implementation is extremely naive, especially compared to the Graph Neural Networks used in Google Maps. However, it provides the mental model for understanding and building more realistic logistics simulations.