#!/usr/bin/env python
# coding: utf-8
"""
Node layout routines.
"""
import warnings
import itertools
import numpy as np
from functools import wraps
from itertools import combinations, product
from scipy.spatial import Voronoi
from scipy.spatial.distance import cdist, pdist, squareform
from scipy.optimize import minimize, NonlinearConstraint
from rpack import pack
from grandalf.graphs import Vertex, Edge, Graph
from grandalf.layouts import SugiyamaLayout, DummyVertex
from ._utils import (
_edge_list_to_adjacency_matrix,
_edge_list_to_adjacency_list,
_get_subgraph,
_get_unique_nodes,
_get_n_points_on_a_circle,
_invert_dict,
_get_connected_components,
_convert_polar_to_cartesian_coordinates,
_get_angle,
)
DEBUG = False
def _handle_multiple_components(layout_function):
    """Decorator adding multi-component support to a layout function.

    Most layout algorithms assume the graph is a single connected
    component and fail otherwise. The wrapped function detects multiple
    components, applies the layout to each component individually, and
    then arranges the per-component layouts on the canvas.
    """
    @wraps(layout_function)
    def wrapped_layout_function(edges, nodes=None, *args, **kwargs):
        # Identify the connected components of the (undirected) graph.
        adjacency_list = _edge_list_to_adjacency_list(edges, directed=False)
        components = _get_connected_components(adjacency_list)

        # Nodes without any edges form singleton components of their own.
        if nodes:
            for node in set(nodes) - set(_get_unique_nodes(edges)):
                components.append([node])

        if len(components) <= 1:
            return layout_function(edges, *args, **kwargs)

        # Linear layouts are arranged side-by-side; all others are packed.
        if layout_function.__name__ == 'get_linear_layout':
            arrangement = 'side-by-side'
        else:
            arrangement = 'packed'
        return get_layout_for_multiple_components(
            edges, components, layout_function, mode=arrangement, *args, **kwargs)

    return wrapped_layout_function
def get_layout_for_multiple_components(edges, components, layout_function,
                                       origin, scale, mode='packed', *args, **kwargs):
    """Lay out a graph with several connected components.

    Determines a bounding box for each component, then computes the
    layout of each individual component constrained to its box.

    Parameters
    ----------
    edges : list
        The edges of the graph, with each edge being represented by a (source node ID, target node ID) tuple.
    components : list of sets
        The connected components of the graph.
    layout_function : function
        Handle to the function computing the relative positions of each node within a component.
        The args and kwargs are passed through to this function.
    origin : tuple
        The (float x, float y) coordinates corresponding to the lower left hand corner of the bounding box specifying the extent of the canvas.
    scale : tuple
        The (float x, float y) dimensions representing the width and height of the bounding box specifying the extent of the canvas.
    mode : str, default 'packed'
        The way in which individual components are arranged w.r.t. each other.
        One of:

        - 'packed' : Use rectangle packing to tightly arrange components.
        - 'side-by-side' : Components are arranged next to each other.
    *args, **kwargs
        Passed through to layout_function.

    Returns
    -------
    node_positions : dict
        Dictionary mapping each node ID to (float x, float y) tuple, the node position.

    """
    if mode == 'packed':
        bboxes = _get_packed_component_bboxes(components, origin, scale)
    elif mode == 'side-by-side':
        bboxes = _get_side_by_side_component_bboxes(components, origin, scale)
    else:
        raise ValueError("Variable 'mode' is one of:"
                         "\n - 'packed'"
                         "\n - 'side-by-side'"
                         f"\nCurrent value: '{mode}'")

    node_positions = dict()
    for component, bbox in zip(components, bboxes):
        if len(component) > 1:
            subgraph = _get_subgraph(edges, component)
            node_positions.update(
                layout_function(subgraph, origin=bbox[:2], scale=bbox[2:], *args, **kwargs))
        else:
            # A single node is simply placed at the centre of its bounding box.
            centre = np.array([bbox[0] + 0.5 * bbox[2], bbox[1] + 0.5 * bbox[3]])
            node_positions[component.pop()] = centre

    return node_positions
def _get_packed_component_bboxes(components, origin, scale, power=0.8, pad_by=0.1):
    """Partition the canvas given by origin and scale into bounding boxes, one for each component.

    Use rectangle packing to tightly arrange bounding boxes.

    Parameters
    ----------
    components : list of sets
        The connected components of the graph.
    origin : tuple
        The (float x, float y) coordinates corresponding to the lower left hand corner of the bounding box specifying the extent of the canvas.
    scale : tuple
        The (float x, float y) dimensions representing the width and height of the bounding box specifying the extent of the canvas.
    power : float, default 0.8
        The dimensions each bounding box are given by |V|^power by |V|^power,
        where |V| are the total number of nodes.
    pad_by : float, default 0.1
        Padding between bounding boxes, as a fraction of the largest box dimensions.

    Returns
    -------
    bboxes : list of tuples
        The (left, bottom, width, height) bounding boxes for each component.

    """
    relative_dimensions = [_get_bbox_dimensions(len(component), power=power) for component in components]

    # Add a padding between boxes, such that nodes cannot end up touching in the final layout.
    # We choose a padding proportional to the dimensions of the largest box.
    maximum_dimensions = np.max(relative_dimensions, axis=0)
    pad_x, pad_y = pad_by * maximum_dimensions
    padded_dimensions = [(width + pad_x, height + pad_y) for (width, height) in relative_dimensions]

    # rpack only works on integers, hence multiply by some large scalar to retain some precision;
    # NB: for some strange reason, rpack's running time is sensitive to the size of the boxes, so don't make the scalar too large
    # TODO find alternative to rpack
    scalar = 20
    integer_dimensions = [(int(scalar*width), int(scalar*height)) for width, height in padded_dimensions]
    origins = pack(integer_dimensions) # NB: rpack claims to return upper-left corners, when it actually returns lower-left corners

    # NB: the packed positions correspond to the *padded* boxes, while the returned
    # widths/heights are the unpadded dimensions, so the padding persists as empty
    # space between adjacent boxes.
    bboxes = [(x, y, scalar*width, scalar*height) for (x, y), (width, height) in zip(origins, relative_dimensions)]

    # rescale boxes to canvas, effectively reversing the upscaling
    bboxes = _rescale_bboxes_to_canvas(bboxes, origin, scale)

    if DEBUG:
        import matplotlib.pyplot as plt
        from matplotlib.patches import Rectangle
        fig, ax = plt.subplots(1,1)
        for bbox in bboxes:
            ax.add_artist(Rectangle(bbox[:2], bbox[2], bbox[3], color=np.random.rand(3), zorder=-1))
        # plt.show()

    return bboxes
def _get_bbox_dimensions(n, power=0.5):
"""Given n nodes, compute appropriately scaled square bbox dimensions."""
# TODO: factor in the dimensions of the canvas
# such that the rescaled boxes are approximately square
return (n**power, n**power)
def _rescale_bboxes_to_canvas(bboxes, origin, scale):
"""Convert relative bbox dimensions to absolute bbox dimensions given the dimensions of the available canvas."""
lower_left_hand_corners = [(x, y) for (x, y, _, _) in bboxes]
upper_right_hand_corners = [(x+w, y+h) for (x, y, w, h) in bboxes]
minimum = np.min(lower_left_hand_corners, axis=0)
maximum = np.max(upper_right_hand_corners, axis=0)
total_width, total_height = maximum - minimum
# shift to (0, 0)
min_x, min_y = minimum
lower_left_hand_corners = [(x - min_x, y-min_y) for (x, y) in lower_left_hand_corners]
# rescale
scale_x = scale[0] / total_width
scale_y = scale[1] / total_height
lower_left_hand_corners = [(x*scale_x, y*scale_y) for x, y in lower_left_hand_corners]
dimensions = [(w * scale_x, h * scale_y) for (_, _, w, h) in bboxes]
rescaled_bboxes = [(x, y, w, h) for (x, y), (w, h) in zip(lower_left_hand_corners, dimensions)]
# shift by origin
x0, y0 = origin
shifted_bboxes = [(x + x0, y + y0, w, h) for (x, y, w, h) in rescaled_bboxes]
return shifted_bboxes
def _get_side_by_side_component_bboxes(components, origin, scale, pad_by=0.05):
    """Partition the canvas given by origin and scale into bounding boxes, one for each component.

    Position bounding boxes next to each other, left to right.

    Parameters
    ----------
    components : list of sets
        The connected components of the graph.
    origin : tuple
        The (float x, float y) coordinates corresponding to the lower left hand corner of the bounding box specifying the extent of the canvas.
    scale : tuple
        The (float x, float y) dimensions representing the width and height of the bounding box specifying the extent of the canvas.
    pad_by : float, default 0.05
        Padding between bounding boxes, as a fraction of the largest box dimensions.

    Returns
    -------
    bboxes : list of tuples
        The (left, bottom, width, height) bounding boxes for each component.

    """
    # Box width is proportional to the component size; all boxes have unit height.
    relative_dimensions = [(len(component), 1) for component in components]

    # Pad proportionally to the largest box, so that nodes of adjacent
    # components cannot end up touching in the final layout.
    pad_x, pad_y = pad_by * np.max(relative_dimensions, axis=0)

    # Advance a cursor by the padded width of each box to place them side by side.
    box_origins = []
    cursor = 0
    for width, _ in relative_dimensions:
        box_origins.append((cursor, 0))
        cursor += width + pad_x

    bboxes = [(x, y, width, height) for (x, y), (width, height) in zip(box_origins, relative_dimensions)]

    # rescale boxes to canvas, effectively reversing the upscaling
    bboxes = _rescale_bboxes_to_canvas(bboxes, origin, scale)

    if DEBUG:
        import matplotlib.pyplot as plt
        from matplotlib.patches import Rectangle
        fig, ax = plt.subplots(1,1)
        for bbox in bboxes:
            ax.add_artist(Rectangle(bbox[:2], bbox[2], bbox[3], color=np.random.rand(3)))
        plt.show()

    return bboxes
# [docs]  (Sphinx source-link artifact)
@_handle_multiple_components
def get_fruchterman_reingold_layout(edges,
                                    edge_weights        = None,
                                    k                   = None,
                                    origin              = (0, 0),
                                    scale               = (1, 1),
                                    pad_by              = 0.05,
                                    initial_temperature = 1.,
                                    total_iterations    = 50,
                                    node_size           = 0,
                                    node_positions      = None,
                                    fixed_nodes         = None,
                                    *args, **kwargs):
    """'Spring' or Fruchterman-Reingold node layout.

    Uses the Fruchterman-Reingold algorithm [Fruchterman1991]_ to compute node positions.
    This algorithm simulates the graph as a physical system, in which nodes repell each other.
    For connected nodes, this repulsion is counteracted by an attractive force exerted by the edges, which are simulated as springs.
    The resulting layout is hence often referred to as a 'spring' layout.

    Parameters
    ----------
    edges : list
        The edges of the graph, with each edge being represented by a (source node ID, target node ID) tuple.
    edge_weights : dict
        Mapping of edges to edge weights.
    k : float or None, default None
        Expected mean edge length. If None, initialized to the sqrt(area / total nodes).
    origin : tuple, default (0, 0)
        The (float x, float y) coordinates corresponding to the lower left hand corner of the bounding box specifying the extent of the canvas.
    scale : tuple, default (1, 1)
        The (float x, float y) dimensions representing the width and height of the bounding box specifying the extent of the canvas.
    pad_by : float, default 0.05
        Padding around node positions to reduce clipping of the node artists with the frame,
        and to create space for routing curved edges including self-loops around nodes.
        This results in the following bounding box:

        :code:`xmin = origin[0] + pad_by * scale[0]`
        :code:`xmax = origin[0] + scale[0] - pad_by * scale[0]`
        :code:`ymin = origin[1] + pad_by * scale[1]`
        :code:`ymax = origin[1] + scale[1] - pad_by * scale[1]`
    initial_temperature: float, default 1.
        Temperature controls the maximum node displacement on each iteration.
        Temperature is decreased on each iteration to eventually force the algorithm into a particular solution.
        The size of the initial temperature determines how quickly that happens.
        Values should be much smaller than the values of `scale`.
    total_iterations : int, default 50
        Number of iterations.
    node_size : scalar or dict, default 0.
        Size (radius) of nodes.
        Providing the correct node size minimises the overlap of nodes in the graph,
        which can otherwise occur if there are many nodes, or if the nodes differ considerably in size.
    node_positions : dict or None, default None
        Mapping of nodes to their (initial) x,y positions. If None are given,
        nodes are initially placed randomly within the bounding box defined by `origin` and `scale`.
        If the graph has multiple components, explicit initial positions may result in a ValueError,
        if the initial positions fall outside of the area allocated to that specific component.
    fixed_nodes : list or None, default None
        Nodes to keep fixed at their initial positions.

    Returns
    -------
    node_positions : dict
        Dictionary mapping each node ID to (float x, float y) tuple, the node position.

    References
    ----------
    .. [Fruchterman1991] Fruchterman, TMJ and Reingold, EM (1991) ‘Graph drawing by force‐directed placement’,
       Software: Practice and Experience

    """
    assert len(edges) > 0, "The list of edges has to be non-empty."

    # This is just a wrapper around `_fruchterman_reingold`, which implements (the loop body of) the algorithm proper.
    # This wrapper handles the initialization of variables to their defaults (if not explicitely provided),
    # and checks inputs for self-consistency.

    origin = np.array(origin)
    scale = np.array(scale)
    assert len(origin) == len(scale), \
        "Arguments `origin` (d={}) and `scale` (d={}) need to have the same number of dimensions!".format(len(origin), len(scale))
    dimensionality = len(origin)

    if fixed_nodes is None:
        fixed_nodes = []

    connected_nodes = _get_unique_nodes(edges)

    if node_positions is None: # assign random starting positions to all nodes
        node_positions_as_array = np.random.rand(len(connected_nodes), dimensionality) * scale + origin
        unique_nodes = connected_nodes

    else:
        # 1) check input dimensionality
        dimensionality_node_positions = np.array(list(node_positions.values())).shape[1]
        assert dimensionality_node_positions == dimensionality, \
            "The dimensionality of values of `node_positions` (d={}) must match the dimensionality of `origin`/ `scale` (d={})!".format(dimensionality_node_positions, dimensionality)

        is_valid = _is_within_bbox(list(node_positions.values()), origin=origin, scale=scale)
        if not np.all(is_valid):
            error_message = "Some given node positions are not within the data range specified by `origin` and `scale`!"
            error_message += "\n\tOrigin : {}, {}".format(*origin)
            error_message += "\n\tScale : {}, {}".format(*scale)
            error_message += "\nThe following nodes do not fall within this range:"
            for ii, (node, position) in enumerate(node_positions.items()):
                if not is_valid[ii]:
                    error_message += "\n\t{} : {}".format(node, position)
            error_message += "\nThis error can occur if the graph contains multiple components but some or all node positions are initialised explicitly (i.e. node_positions != None)."
            raise ValueError(error_message)

        # 2) handle discrepancies in nodes listed in node_positions and nodes extracted from edges
        if set(node_positions.keys()) == set(connected_nodes):
            # all starting positions are given;
            # no superfluous nodes in node_positions;
            # nothing left to do
            unique_nodes = connected_nodes
        else:
            # some node positions are provided, but not all
            for node in connected_nodes:
                if not (node in node_positions):
                    warnings.warn("Position of node {} not provided. Initializing to random position within frame.".format(node))
                    # BUG FIX: match the dimensionality of `origin`/`scale`
                    # instead of the previously hard-coded 2.
                    node_positions[node] = np.random.rand(dimensionality) * scale + origin

            # Nodes present in node_positions but absent from the edge list are
            # kept fixed at their given positions.
            unconnected_nodes = []
            for node in node_positions:
                if not (node in connected_nodes):
                    unconnected_nodes.append(node)
                    fixed_nodes.append(node)
                    # warnings.warn("Node {} appears to be unconnected. The current node position will be kept.".format(node))

            unique_nodes = connected_nodes + unconnected_nodes

        node_positions_as_array = np.array([node_positions[node] for node in unique_nodes])

    total_nodes = len(unique_nodes)

    # Normalise node_size to an array, one radius per node (default 0 for missing nodes).
    if isinstance(node_size, (int, float)):
        node_size = node_size * np.ones((total_nodes))
    elif isinstance(node_size, dict):
        node_size = np.array([node_size[node] if node in node_size else 0. for node in unique_nodes])

    adjacency = _edge_list_to_adjacency_matrix(
        edges, edge_weights=edge_weights, unique_nodes=unique_nodes)

    # Forces in FR are symmetric.
    # Hence we need to ensure that the adjacency matrix is also symmetric.
    adjacency = adjacency + adjacency.transpose()

    if fixed_nodes:
        is_mobile = np.array([False if node in fixed_nodes else True for node in unique_nodes], dtype=bool)

        mobile_positions = node_positions_as_array[is_mobile]
        fixed_positions = node_positions_as_array[~is_mobile]

        mobile_node_sizes = node_size[is_mobile]
        fixed_node_sizes = node_size[~is_mobile]

        # Reorder the adjacency matrix such that mobile nodes come first;
        # only columns corresponding to mobile nodes are needed downstream.
        total_mobile = np.sum(is_mobile)
        reordered = np.zeros((adjacency.shape[0], total_mobile))
        reordered[:total_mobile, :total_mobile] = adjacency[is_mobile][:, is_mobile]
        reordered[total_mobile:, :total_mobile] = adjacency[~is_mobile][:, is_mobile]
        adjacency = reordered
    else:
        is_mobile = np.ones((total_nodes), dtype=bool)

        mobile_positions = node_positions_as_array
        # BUG FIX: use `dimensionality` instead of the previously hard-coded 2.
        fixed_positions = np.zeros((0, dimensionality))

        mobile_node_sizes = node_size
        fixed_node_sizes = np.array([])

    if k is None:
        # Base the expected edge length on the area of the inscribed circle.
        area = np.pi * (min(scale) / 2) ** 2
        k = np.sqrt(area / float(total_nodes))

    temperatures = _get_temperature_decay(initial_temperature, total_iterations)

    # --------------------------------------------------------------------------------
    # main loop

    for ii, temperature in enumerate(temperatures):
        candidate_positions = _fruchterman_reingold(mobile_positions, fixed_positions,
                                                    mobile_node_sizes, fixed_node_sizes,
                                                    adjacency, temperature, k)
        # Reject candidate positions that fall outside the bounding box.
        is_valid = _is_within_bbox(candidate_positions, origin=origin, scale=scale)
        mobile_positions[is_valid] = candidate_positions[is_valid]

    # --------------------------------------------------------------------------------
    # format output

    node_positions_as_array[is_mobile] = mobile_positions
    if np.all(is_mobile):
        # Only rescale when no node is pinned, as rescaling would move fixed nodes.
        node_positions_as_array = _fit_to_frame(node_positions_as_array, origin, scale, pad_by)

    node_positions = dict(zip(unique_nodes, node_positions_as_array))

    return node_positions
def _is_within_bbox(points, origin, scale):
"""Check if each of the given points is within the bounding box given by origin and scale."""
minima = np.array(origin)
maxima = minima + np.array(scale)
return np.all(np.logical_and(points >= minima, points <= maxima), axis=1)
def _get_temperature_decay(initial_temperature, total_iterations, mode='quadratic', eps=1e-9):
"""Compute all temperature values for a given initial temperature and decay model."""
x = np.linspace(0., 1., total_iterations)
if mode == 'quadratic':
y = (x - 1.)**2 + eps
elif mode == 'linear':
y = (1. - x) + eps
else:
raise ValueError("Argument `mode` one of: 'linear', 'quadratic'.")
return initial_temperature * y
def _fruchterman_reingold(mobile_positions, fixed_positions,
                          mobile_node_radii, fixed_node_radii,
                          adjacency, temperature, k):
    """Inner loop of Fruchterman-Reingold layout algorithm.

    Performs a single iteration: computes pairwise repulsion between all
    node pairs and attraction along edges, then displaces the mobile
    nodes accordingly, capping each displacement at `temperature`.
    Fixed nodes exert forces but are not moved.

    Parameters
    ----------
    mobile_positions, fixed_positions : ndarray
        Positions of the mobile and pinned nodes, respectively.
    mobile_node_radii, fixed_node_radii : ndarray
        Node radii; subtracted from pairwise distances to reduce node overlap.
    adjacency : ndarray
        (Weighted) adjacency matrix with rows ordered mobile-first and one
        column per mobile node (see the reordering in the caller).
    temperature : float
        Maximum node displacement for this iteration.
    k : float
        Expected mean edge length.

    Returns
    -------
    ndarray
        Updated positions of the mobile nodes.
    """
    combined_positions = np.concatenate([mobile_positions, fixed_positions], axis=0)
    combined_node_radii = np.concatenate([mobile_node_radii, fixed_node_radii])

    # Pairwise difference vectors: delta[i, j] points from node i towards mobile node j.
    delta = mobile_positions[np.newaxis, :, :] - combined_positions[:, np.newaxis, :]
    distance = np.linalg.norm(delta, axis=-1)

    # alternatively: (hack adapted from igraph)
    if np.sum(distance==0) - np.trace(distance==0) > 0: # i.e. if off-diagonal entries in distance are zero
        warnings.warn("Some nodes have the same position; repulsion between the nodes is undefined.")
        # Jitter coincident nodes slightly so that a repulsion direction is defined.
        rand_delta = np.random.rand(*delta.shape) * 1e-9
        is_zero = distance <= 0
        delta[is_zero] = rand_delta[is_zero]
        distance = np.linalg.norm(delta, axis=-1)

    # subtract node radii from distances to prevent nodes from overlapping
    distance -= mobile_node_radii[np.newaxis, :] + combined_node_radii[:, np.newaxis]

    # prevent distances from becoming less than zero due to overlap of nodes
    distance[distance <= 0.] = 1e-6 # 1e-13 is numerical accuracy, and we will be taking the square shortly

    with np.errstate(divide='ignore', invalid='ignore'):
        direction = delta / distance[..., None] # i.e. the unit vector

    # calculate forces
    repulsion    = _get_fr_repulsion(distance, direction, k)
    attraction   = _get_fr_attraction(distance, direction, adjacency, k)
    displacement = attraction + repulsion

    # limit maximum displacement using temperature
    displacement_length = np.linalg.norm(displacement, axis=-1)
    displacement = displacement / displacement_length[:, None] * np.clip(displacement_length, None, temperature)[:, None]

    mobile_positions = mobile_positions + displacement

    return mobile_positions
def _get_fr_repulsion(distance, direction, k):
"""Compute repulsive forces."""
with np.errstate(divide='ignore', invalid='ignore'):
magnitude = k**2 / distance
vectors = direction * magnitude[..., None]
# Note that we cannot apply the usual strategy of summing the array
# along either axis and subtracting the trace,
# as the diagonal of `direction` is np.nan, and any sum or difference of
# NaNs is just another NaN.
# Also we do not want to ignore NaNs by using np.nansum, as then we would
# potentially mask the existence of off-diagonal zero distances.
for ii in range(vectors.shape[-1]):
np.fill_diagonal(vectors[:, :, ii], 0)
return np.sum(vectors, axis=0)
def _get_fr_attraction(distance, direction, adjacency, k):
"""Compute attractive forces."""
magnitude = 1./k * distance**2 * adjacency
vectors = -direction * magnitude[..., None] # NB: the minus!
for ii in range(vectors.shape[-1]):
np.fill_diagonal(vectors[:, :, ii], 0)
return np.sum(vectors, axis=0)
def _rescale_to_frame(node_positions, origin, scale):
"""Rescale node positions such that all nodes are within the bounding box."""
node_positions = node_positions.copy() # force copy, as otherwise the `fixed_nodes` argument is effectively ignored
node_positions -= np.min(node_positions, axis=0)
# normalize only when nodes are not already aligned, otherwise we divide by zero
np.divide(node_positions, np.ptp(node_positions, axis=0), where=np.ptp(node_positions, axis=0) != 0, out=node_positions)
# if nodes are aligned in any one dimension, place in the middle along that dimension
np.add(node_positions, np.full(node_positions.ndim, 0.5), where=np.ptp(node_positions, axis=0) == 0, out=node_positions)
node_positions *= scale[None, ...]
node_positions += origin[None, ...]
return node_positions
@_handle_multiple_components
def get_random_layout(edges, origin=(0,0), scale=(1,1)):
    """Place each node uniformly at random within the bounding box given by origin and scale."""
    return {node : np.random.rand(2) * scale + origin
            for node in _get_unique_nodes(edges)}
# [docs]  (Sphinx source-link artifact)
@_handle_multiple_components
def get_sugiyama_layout(edges, origin=(0, 0), scale=(1, 1), pad_by=0.05, node_size=3, total_iterations=3):
    """'Dot' or Sugiyama node layout.

    Uses the Sugiyama algorithm [Sugiyama1981]_ to compute node positions.
    This function is a wrapper around the SugiyamaLayout class in grandalf.

    Parameters
    ----------
    edges : list
        The edges of the graph, with each edge being represented by a (source node ID, target node ID) tuple.
    origin : tuple, default (0, 0)
        The (float x, float y) coordinates corresponding to the lower left hand corner of the bounding box specifying the extent of the canvas.
    scale : tuple, default (1, 1)
        The (float x, float y) dimensions representing the width and height of the bounding box specifying the extent of the canvas.
    pad_by : float, default 0.05
        Padding around node positions to reduce clipping of the node artists with the frame,
        and to create space for routing curved edges including self-loops around nodes.
        This results in the following bounding box:

        :code:`xmin = origin[0] + pad_by * scale[0]`
        :code:`xmax = origin[0] + scale[0] - pad_by * scale[0]`
        :code:`ymin = origin[1] + pad_by * scale[1]`
        :code:`ymax = origin[1] + scale[1] - pad_by * scale[1]`
    node_size : int or float or dict, default 3
        Size (radius) of nodes; used to reserve space between nodes in the layout.
    total_iterations : int, default 3
        Number of iterations.

    Returns
    -------
    node_positions : dict
        Dictionary mapping each node ID to (float x, float y) tuple, the node position.

    References
    ----------
    .. [Sugiyama1981] Sugiyama, K; Tagawa, S; Toda, M (1981) 'Methods for visual understanding of hierarchical system structures',
       IEEE Transactions on Systems, Man, and Cybernetics

    """
    # TODO potentially test that graph is a DAG
    nodes = _get_unique_nodes(edges)
    graph = _get_grandalf_graph(edges, nodes, node_size)

    # graph.C lists the connected components; the @_handle_multiple_components
    # decorator ensures there is only one, so laying out graph.C[0] suffices.
    layout = SugiyamaLayout(graph.C[0])
    layout.init_all()
    layout.draw(total_iterations)

    # extract node positions
    node_positions = dict()
    for layer in layout.layers:
        for vertex in layer:
            if not isinstance(vertex, DummyVertex):
                # The DummyVertex class is used by the sugiyama layout to represent
                # *long* edges, i.e. edges that span over several ranks.
                # For these edges, a DummyVertex is inserted in every inner layer.
                # Here we ignore them, as they are not part of the final layout.
                node_positions[vertex.data] = vertex.view.xy

    # rescale to canvas
    # TODO: by rescaling, we effectively ignore the node_size argument
    nodes, positions = zip(*node_positions.items())
    positions = _rescale_to_frame(np.array(positions), np.array(origin) + pad_by * np.array(scale), (1 - pad_by) * np.array(scale))

    # place roots on top, leaves on bottom (flip the y-axis within the frame)
    positions[:, 1] -= origin[1] + scale[1]
    positions[:, 1] *= -1

    return dict(zip(nodes, positions))
def _get_grandalf_graph(edges, nodes, node_size):
    """Construct a grandalf.Graph object from the given edge list, node list, and node sizes.

    Parameters
    ----------
    edges : list
        The edges of the graph, with each edge being represented by a (source node ID, target node ID) tuple.
    nodes : list
        The node IDs of the graph.
    node_size : int or float or dict
        Size (radius) of nodes, either one value for all nodes or a mapping of node to size.

    Returns
    -------
    grandalf.graphs.Graph
        The grandalf graph object.

    Raises
    ------
    TypeError
        If `node_size` is neither a number nor a dict.
    """
    node_to_grandalf_vertex = dict()
    for node in nodes:
        vertex = Vertex(node)
        # The view records the space reserved for the node artist;
        # grandalf expects width and height, hence twice the radius.
        if isinstance(node_size, (int, float)):
            diameter = 2 * node_size
        elif isinstance(node_size, dict):
            diameter = 2 * node_size[node]
        else:
            # BUG FIX: previously raised a bare TypeError without a message.
            raise TypeError("`node_size` must be an int, float, or dict, not {}.".format(type(node_size)))
        vertex.view = vertex_view(diameter, diameter)
        node_to_grandalf_vertex[node] = vertex

    V = list(node_to_grandalf_vertex.values())
    E = [Edge(node_to_grandalf_vertex[source], node_to_grandalf_vertex[target]) for source, target in edges]
    return Graph(V, E)
class vertex_view(object):
    """Minimal stand-in for a grandalf vertex view, recording a node's width and height."""

    def __init__(self, w, h):
        self.w, self.h = w, h
# [docs]  (Sphinx source-link artifact)
@_handle_multiple_components
def get_radial_tree_layout(edges, origin=(0, 0), scale=(1, 1), pad_by=0.05, node_size=3, total_iterations=3):
    """Radial tree layout.

    Uses the Sugiyama algorithm [Sugiyama1981]_ to compute node positions,
    and then maps the resulting layers onto concentric circles, with roots
    at the centre and leaves on the outermost circle.

    Parameters
    ----------
    edges : list
        The edges of the graph, with each edge being represented by a (source node ID, target node ID) tuple.
    origin : tuple, default (0, 0)
        The (float x, float y) coordinates corresponding to the lower left hand corner of the bounding box specifying the extent of the canvas.
    scale : tuple, default (1, 1)
        The (float x, float y) dimensions representing the width and height of the bounding box specifying the extent of the canvas.
    pad_by : float, default 0.05
        Padding around node positions to reduce clipping of the node artists with the frame,
        and to create space for routing curved edges including self-loops around nodes.
    node_size : int or float or dict, default 3
        Size (radius) of nodes; passed through to the Sugiyama layout.
    total_iterations : int, default 3
        Number of iterations.

    Returns
    -------
    node_positions : dict
        Dictionary mapping each node ID to (float x, float y) tuple, the node position.

    References
    ----------
    .. [Sugiyama1981] Sugiyama, K; Tagawa, S; Toda, M (1981) 'Methods for visual understanding of hierarchical system structures',
       IEEE Transactions on Systems, Man, and Cybernetics

    """
    # Compute a layered layout on the unit square; x maps to angle, y to radius.
    sugiyama_positions = get_sugiyama_layout(
        edges,
        origin           = (0, 0),
        scale            = (1, 1),
        pad_by           = 0,
        node_size        = node_size,
        total_iterations = total_iterations,
    )

    # The widest layer determines the angular range; leave a one-node gap on
    # each circle so the first and last node do not overlap.
    layer_sizes = dict()
    for (_, y) in sugiyama_positions.values():
        layer_sizes[y] = layer_sizes.get(y, 0) + 1
    max_nodes_per_layer = max(layer_sizes.values())
    # alternatively:
    # y_coordinates = np.array(list(sugiyama_positions.values()))[:, 1]
    # (_, max_nodes_per_layer), = Counter(y_coordinates).most_common(1)

    max_angle = 2 * np.pi * max_nodes_per_layer / (max_nodes_per_layer + 1)

    max_radius = np.min(scale) / 2
    max_radius *= (1 - pad_by) # shrink to make room for node artists, labels, and annotations

    centre = np.array([origin[0] + 0.5 * scale[0], origin[1] + 0.5 * scale[1]])

    node_positions = dict()
    for node, (x, y) in sugiyama_positions.items():
        # Low y (leaves) maps to a large radius, high y (roots) to the centre.
        polar = _convert_polar_to_cartesian_coordinates(max_radius * (1 - y), max_angle * x)
        node_positions[node] = polar + centre
    return node_positions
# [docs]  (Sphinx source-link artifact)
@_handle_multiple_components
def get_circular_layout(edges, origin=(0, 0), scale=(1, 1), pad_by=0.05, node_order=None, reduce_edge_crossings=True):
    """Circular node layout.

    By default, a heuristic is used to order the nodes around the circle
    such that the number of edge crossings is reduced.

    Parameters
    ----------
    edges : list
        The edges of the graph, with each edge being represented by a (source node ID, target node ID) tuple.
    origin : tuple, default (0, 0)
        The (float x, float y) coordinates corresponding to the lower left hand corner of the bounding box specifying the extent of the canvas.
    scale : tuple, default (1, 1)
        The (float x, float y) dimensions representing the width and height of the bounding box specifying the extent of the canvas.
    pad_by : float, default 0.05
        Padding around node positions to reduce clipping of the node artists with the frame,
        and to create space for routing curved edges including self-loops around nodes.
    node_order : list or None, default None
        The (initial) ordering of nodes (counter-clockwise) before layout optimisation.
        Set :code:`reduce_edge_crossings` to :code:`False` to skip optimisation and retain the given node order in the returned layout.
    reduce_edge_crossings : bool, default True
        If True, attempts to minimize edge crossings via the algorithm outlined in [Baur2005]_.

    Returns
    -------
    node_positions : dict
        Dictionary mapping each node ID to (float x, float y) tuple, the node position.

    References
    ----------
    .. [Baur2005] Baur & Brandes (2005) Crossing reduction in circular layouts.

    """
    nodes = _get_unique_nodes(edges)

    centre = np.array(origin) + 0.5 * np.array(scale)
    radius = np.min(scale) / 2
    radius *= (1 - pad_by) # fudge factor to make space for self-loops, annotations, etc
    positions = _get_n_points_on_a_circle(centre, radius, len(nodes), start_angle=0)

    if node_order:
        nodes = [node for node in node_order if node in nodes]

    # Crossing reduction is pointless for complete graphs: every order is equivalent.
    if reduce_edge_crossings and not _is_complete(edges):
        # Self-loops have no bearing on crossings; drop them before optimising.
        without_self_loops = [(source, target) for source, target in edges if source != target]
        nodes = _reduce_crossings(without_self_loops)

    return dict(zip(nodes, positions))
def _is_complete(edges):
    """Check if the graph is fully connected, i.e. every pair of distinct nodes shares an edge.

    Parameters
    ----------
    edges : list
        The edges of the graph, with each edge being represented by a (source node ID, target node ID) tuple.

    Returns
    -------
    bool
        True if the graph is complete.
    """
    nodes = _get_unique_nodes(edges)
    minimal_complete_graph = list(combinations(nodes, 2))

    # Quick reject: a complete graph has at least n * (n - 1) / 2 edges.
    if len(edges) < len(minimal_complete_graph):
        return False

    # Use a set for O(1) membership tests instead of scanning the edge
    # list for every node pair (previously O(|V|^2 * |E|)).
    edge_set = set(edges)
    for source, target in minimal_complete_graph:
        if ((source, target) not in edge_set) and ((target, source) not in edge_set):
            return False
    return True
def _reduce_crossings(edges):
    """Implements Baur & Brandes (2005) Crossing reduction in circular layouts.

    Greedily constructs an initial circular node order and then refines it.
    """
    adjacency_list = _edge_list_to_adjacency_list(edges, directed=False)
    initial_order = _initialize_node_order(adjacency_list)
    return _optimize_node_order(adjacency_list, initial_order)
def _initialize_node_order(node_to_neighbours):
    """Implements "Connectivity & Crossings" variant from Baur & Brandes (2005).

    Greedily builds an initial circular node order: starting from an
    arbitrary node, the unplaced node with the most already-placed
    neighbours is placed next (ties broken in favour of fewer unplaced
    neighbours). The selected node is appended or prepended depending on
    which end of the current order yields fewer edge crossings.

    Parameters
    ----------
    node_to_neighbours : dict
        Mapping of each node to its neighbours.

    Returns
    -------
    list
        The initial node order.
    """
    nodes = list(node_to_neighbours.keys())
    ordered = nodes[:1]
    remaining = nodes[1:]
    closed_edges = []
    start, = ordered
    # "Open" edges connect a placed node to an as-yet unplaced one; their
    # crossings still depend on where future nodes end up.
    open_edges = set([(start, neighbour) for neighbour in node_to_neighbours[start]])
    while remaining:
        minimum_unplaced_neighbours = np.inf
        maximum_placed_neighbours = 0
        # Select the unplaced node with the most placed neighbours;
        # ties are broken by the fewest unplaced neighbours.
        for ii, node in enumerate(remaining):
            placed_neighbours = len([neighbour for neighbour in node_to_neighbours[node] if neighbour in ordered])
            unplaced_neighbours = len([neighbour for neighbour in node_to_neighbours[node] if neighbour not in ordered])
            if (placed_neighbours > maximum_placed_neighbours) or \
               ((placed_neighbours == maximum_placed_neighbours) and (unplaced_neighbours < minimum_unplaced_neighbours)):
                maximum_placed_neighbours = placed_neighbours
                minimum_unplaced_neighbours = unplaced_neighbours
                selected = node
                selected_idx = ii
        remaining.pop(selected_idx)
        # Edges from the selected node to already placed nodes become
        # "closed": their crossings are now fully determined.
        closed_edges = set([(node, selected) for node in node_to_neighbours[selected] if node in ordered])
        open_edges -= closed_edges
        # Place the selected node at whichever end of the current order
        # produces fewer total crossings.
        a = _get_total_crossings(ordered + [selected] + remaining, closed_edges, open_edges)
        b = _get_total_crossings([selected] + ordered + remaining, closed_edges, open_edges)
        if a < b:
            ordered.append(selected)
        else:
            ordered.insert(0, selected)
        open_edges |= set([(selected, node) for node in node_to_neighbours[selected] if node not in ordered])
    return ordered
def _get_total_crossings(node_order, edges1, edges2=None):
    """Count edge crossings between two edge sets for a given node order.

    If only one edge set is given, crossings are counted within that set.
    """
    if edges2 is None:
        edges2 = edges1
    return sum(int(_is_cross(node_order, edge_a, edge_b))
               for edge_a, edge_b in itertools.product(edges1, edges2))
def _is_cross(node_order, edge_1, edge_2):
"""Check if two edges cross, given the node order."""
(s1, t1), = np.where(np.in1d(node_order, edge_1, assume_unique=True))
(s2, t2), = np.where(np.in1d(node_order, edge_2, assume_unique=True))
if (s1 < s2 < t1 < t2) or (s2 < s1 < t2 < t2):
return True
else:
return False
def _optimize_node_order(node_to_neighbours, node_order=None, max_iterations=100):
    """Implement circular sifting as outlined in Baur & Brandes (2005).

    Each node in turn is "sifted" through every position in the circular
    order via successive swaps; the position with the fewest total
    crossings is kept. The sweep over all nodes repeats until no further
    improvement is found or `max_iterations` is reached.

    Parameters
    ----------
    node_to_neighbours : dict
        Mapping of each node to its neighbours.
    node_order : list or None, default None
        Initial node order; defaults to the key order of `node_to_neighbours`.
    max_iterations : int, default 100
        Maximum number of sweeps over all nodes.

    Returns
    -------
    numpy.ndarray
        The optimised node order.
    """
    if node_order is None:
        node_order = list(node_to_neighbours.keys())
    node_order = np.array(node_order)
    # Pre-compute, for each node, its list of incident edges.
    node_to_edges = dict()
    for node, neighbours in node_to_neighbours.items():
        node_to_edges[node] = [(node, neighbour) for neighbour in neighbours]
    # De-duplicate so each undirected edge is counted exactly once.
    undirected_edges = set()
    for edges in node_to_edges.values():
        for edge in edges:
            if edge[::-1] not in undirected_edges:
                undirected_edges.add(edge)
    total_crossings = _get_total_crossings(node_order, undirected_edges)
    total_nodes = len(node_order)
    for iteration in range(max_iterations):
        previous_best = total_crossings
        for u in node_order.copy():
            best_order = node_order.copy()
            minimum_crossings = total_crossings
            (ii,), = np.where(node_order == u)
            # Sift u through all other positions by successive adjacent swaps.
            for jj in range(ii+1, ii+total_nodes):
                v = node_order[jj%total_nodes]
                # Swapping two adjacent nodes only changes crossings between
                # their incident edges, so the total is updated incrementally.
                cuv = _get_total_crossings(node_order, node_to_edges[u], node_to_edges[v])
                node_order = _swap_values(node_order, u, v)
                cvu = _get_total_crossings(node_order, node_to_edges[u], node_to_edges[v])
                total_crossings = total_crossings - cuv + cvu
                if total_crossings < minimum_crossings:
                    best_order = node_order.copy()
                    minimum_crossings = total_crossings
            # Keep the best position found for u.
            node_order = best_order
            total_crossings = minimum_crossings
        improvement = previous_best - total_crossings
        if not improvement:
            break
    if (iteration + 1) == max_iterations:
        warnings.warn("Maximum number of iterations reached. Aborting further node layout optimisations.")
    return node_order
def _swap_values(arr, value_1, value_2):
"""Swap all occurrences of two values in a given array."""
arr = arr.copy()
is_value_1 = arr == value_1
is_value_2 = arr == value_2
arr[is_value_1] = value_2
arr[is_value_2] = value_1
return arr
def _reduce_node_overlap(node_positions, origin, scale, fixed_nodes=None, eta=0.1, total_iterations=10):
    """Use a constrained version of Lloyds algorithm to move nodes apart from each other.

    On each iteration, every mobile node takes a small step (of relative
    size `eta`) towards the centroid of its Voronoi cell; steps that would
    leave the bounding box are discarded.

    See also
    --------
    https://en.wikipedia.org/wiki/Lloyd%27s_algorithm
    """
    nodes = list(node_positions.keys())
    positions = np.array(list(node_positions.values()))
    if fixed_nodes:
        is_mobile = np.array([node not in fixed_nodes for node in nodes], dtype=bool)
    else:
        is_mobile = np.ones(len(nodes), dtype=bool)
    for _ in range(total_iterations):
        step = eta * (_get_voronoi_centroids(positions) - positions)
        candidate = positions + step
        # Only accept steps that keep the node inside the bounding box.
        accept = np.logical_and(_is_within_bbox(candidate, origin, scale), is_mobile)
        positions[accept] = candidate[accept]
    return dict(zip(nodes, positions))
def _remove_node_overlap(node_positions, node_size, origin, scale, fixed_nodes=None, tolerance=1e-6, maximum_iterations=100):
    """Use a constrained variation of Lloyds algorithm to move nodes apart from each other until none overlap.

    Parameters
    ----------
    node_positions : dict
        Mapping of node to (x, y) position.
    node_size : dict
        Mapping of node to node size (radius).
    origin : tuple
        (x, y) coordinates of the lower left corner of the bounding box.
    scale : tuple
        (width, height) of the bounding box.
    fixed_nodes : list or None, default None
        Nodes whose positions may not be changed.
    tolerance : float, default 1e-6
        Remaining overlaps smaller than this are ignored.
    maximum_iterations : int, default 100
        Maximum number of iterations.

    Returns
    -------
    dict
        Mapping of node to the new (x, y) position.

    See also
    --------
    https://en.wikipedia.org/wiki/Lloyd%27s_algorithm
    """
    unique_nodes = list(node_positions.keys())
    positions = np.array(list(node_positions.values()))
    radii = np.array([node_size[node] for node in node_positions])
    if fixed_nodes:
        is_mobile = np.array([False if node in fixed_nodes else True for node in unique_nodes], dtype=bool)
    else:
        is_mobile = np.ones((len(unique_nodes)), dtype=bool)
    # Two nodes overlap if their distance is smaller than the sum of their radii.
    minimum_distances = radii[np.newaxis, :] + radii[:, np.newaxis]
    minimum_distances[np.diag_indices_from(minimum_distances)] = 0 # ignore distances to self
    # Initialize the first loop.
    distances = cdist(positions, positions)
    # For each node: the largest amount by which it violates any minimum distance.
    displacements = np.max(np.clip(minimum_distances - distances, 0, None), axis=-1)
    ctr = 0
    # NOTE(review): `&` works here because np.any returns a numpy bool; `and` would be clearer.
    while np.any(displacements > tolerance) & (ctr < maximum_iterations):
        centroids = _get_voronoi_centroids(positions)
        # Compute the direction from each point towards its corresponding Voronoi centroid.
        deltas = centroids - positions
        magnitudes = np.linalg.norm(deltas, axis=-1)
        directions = deltas / magnitudes[:, np.newaxis]
        # Mask NaNs that arise if the magnitude is zero, i.e. the point is already center of the Voronoi cell.
        directions[np.isnan(directions)] = 0
        # Step into the direction of the centroid.
        # Clipping prevents overshooting of the centroid when stepping into the direction of the centroid.
        # We step by half the displacement as the other overlapping point will be moved in approximately the opposite direction.
        new = positions + np.clip(0.5 * displacements, None, magnitudes)[:, np.newaxis] * directions
        # Constrain Lloyd's algorithm by only updating positions where the new position is within the bbox.
        valid = _is_within_bbox(new, origin, scale)
        mask = np.logical_and(valid, is_mobile)
        positions[mask] = new[mask]
        # Initialize next loop.
        distances = cdist(positions, positions)
        displacements = np.max(np.clip(minimum_distances - distances, 0, None), axis=-1)
        ctr += 1
    return dict(zip(unique_nodes, positions))
def _get_voronoi_centroids(positions):
    """Construct a Voronoi diagram from the given node positions and determine the center of each cell."""
    voronoi = Voronoi(positions)
    centroids = np.zeros_like(positions)
    for point_idx, region_idx in enumerate(voronoi.point_region):
        # Vertex index -1 denotes a point at infinity; skip those.
        # TODO: compute correctly clipped regions.
        finite_vertices = [vertex for vertex in voronoi.regions[region_idx] if vertex != -1]
        centroids[point_idx] = _get_centroid(voronoi.vertices[finite_vertices])
    return centroids
def _clip_to_frame(positions, origin, scale):
"""Prevent node positions from leaving the bounding box given by origin and scale."""
origin = np.array(origin)
scale = np.array(scale)
for ii, (minimum, maximum) in enumerate(zip(origin, origin+scale)):
positions[:, ii] = np.clip(positions[:, ii], minimum, maximum)
return positions
def _get_centroid(polygon):
"""Compute the centroid of a polygon."""
# TODO: formula may be incorrect; correct one here:
# https://en.wikipedia.org/wiki/Centroid#Of_a_polygon
return np.mean(polygon, axis=0)
@_handle_multiple_components
def get_linear_layout(edges, origin=(0, 0), scale=(1, 1), pad_by=0.05, node_order=None, reduce_edge_crossings=True):
    """Linear node layout.

    Nodes are spaced evenly along a horizontal line, vertically centered
    within the bounding box.

    If :code:`reduce_edge_crossings` is set to :code:`True`, the algorithm
    attempts to minimize edge crossings via the algorithm outlined in [Baur2005]_.

    Parameters
    ----------
    edges : list
        The edges of the graph, with each edge being represented by a (source node ID, target node ID) tuple.
    origin : tuple, default (0, 0)
        The (float x, float y) coordinates corresponding to the lower left hand corner of the bounding box specifying the extent of the canvas.
    scale : tuple, default (1, 1)
        The (float x, float y) dimensions representing the width and height of the bounding box specifying the extent of the canvas.
    pad_by : float, default 0.05
        Padding around node positions to reduce clipping of the node artists with the frame,
        and to create space for routing curved edges including self-loops around nodes.
        This results in the following bounding box:

        :code:`xmin = origin[0] + pad_by * scale[0]`
        :code:`xmax = origin[0] + scale[0] - pad_by * scale[0]`
        :code:`ymin = origin[1] + pad_by * scale[1]`
        :code:`ymax = origin[1] + scale[1] - pad_by * scale[1]`

    node_order : list or None, default None
        The (initial) ordering of nodes (from left to right) before layout optimisation.
        Set :code:`reduce_edge_crossings` to :code:`False` to skip optimisation and retain the given node order in the returned layout.
    reduce_edge_crossings : bool, default True
        If True, attempts to minimize edge crossings via the algorithm outlined in [Baur2005]_.

    Returns
    -------
    node_positions : dict
        Dictionary mapping each node ID to (float x, float y) tuple, the node position.

    References
    ----------
    .. [Baur2005] Baur & Brandes (2005) Crossing reduction in circular layouts.

    """
    nodes = _get_unique_nodes(edges)
    total_nodes = len(nodes)
    # Evenly spaced x-coordinates within the padded bbox; constant y at the vertical center.
    x = np.linspace(origin[0] + pad_by * scale[0], origin[0] + (1 - pad_by) * scale[0], total_nodes)
    y = np.full(total_nodes, origin[1] + 0.5 * scale[1])
    positions = np.c_[x, y]
    if node_order:
        nodes = [node for node in node_order if node in nodes]
    if reduce_edge_crossings:
        if not _is_complete(edges):
            # remove self-loops as these should have no bearing on the solution
            edges = [(source, target) for source, target in edges if source != target]
            nodes = _reduce_crossings(edges)
            # The algorithm in _reduce_crossings assumes that nodes are arranged in a circle.
            # As a consequence, when nodes are arranged along a line, a community may be split
            # with members on both ends of the line.
            # Here we find a better split by minimising the total edge length.
            nodes = _minimize_total_edge_length(nodes, edges)
    return dict(zip(nodes, positions))
def _minimize_total_edge_length(nodes, edges):
total_iterations = len(nodes)
output = nodes
optimum = np.inf
for ii in range(total_iterations):
node_to_position = dict(zip(nodes, range(len(nodes))))
total_edge_length = np.sum([np.abs(node_to_position[source] - node_to_position[target]) for (source, target) in edges])
if total_edge_length < optimum:
output = nodes
optimum = total_edge_length
nodes = np.r_[nodes[1:], nodes[0]]
return output
# [docs]  (Sphinx "view source" link artifact from HTML extraction; not part of the module source)
def get_bipartite_layout(edges, nodes=None, subsets=None, origin=(0, 0), scale=(1, 1), pad_by=0.05, reduce_edge_crossings=True):
    """Bipartite node layout.

    By default, this implementation uses a heuristic to arrange the nodes such that the edge crossings are reduced.

    Parameters
    ----------
    edges : list
        The edges of the graph, with each edge being represented by a (source node ID, target node ID) tuple.
    nodes : list or None, default None
        The nodes of the graph. Only required if the graph contains nodes without edges.
    subsets : list
        The two layers of the graph. If None, a two-coloring is used to separate the nodes into two subsets.
        However, if the graph consists of multiple components, this partitioning into two layers is ambiguous, as multiple solutions exist.
    origin : tuple
        The (float x, float y) coordinates corresponding to the lower left hand corner of the bounding box specifying the extent of the canvas.
    scale : tuple
        The (float x, float y) dimensions representing the width and height of the bounding box specifying the extent of the canvas.
    pad_by : float, default 0.05
        Padding around node positions to reduce clipping of the node artists with the frame,
        and to create space for routing curved edges including self-loops around nodes.
        This results in the following bounding box:

        :code:`xmin = origin[0] + pad_by * scale[0]`
        :code:`xmax = origin[0] + scale[0] - pad_by * scale[0]`
        :code:`ymin = origin[1] + pad_by * scale[1]`
        :code:`ymax = origin[1] + scale[1] - pad_by * scale[1]`

    reduce_edge_crossings : bool, default True
        If True, attempts to reduce edge crossings via the algorithm outlined in [Eades1994]_.

    Returns
    -------
    node_positions : dict
        Dictionary mapping each node ID to (float x, float y) tuple, the node position.

    References
    ----------
    .. [Eades1994] Eades & Wormald (1994) Edge crossings in drawings of bipartite graphs.

    """
    adjacency_list = _edge_list_to_adjacency_list(edges, directed=False)
    if nodes:
        # Ensure nodes without edges are also assigned to a subset.
        for node in nodes:
            adjacency_list.setdefault(node, set())
    if subsets:
        left, right = subsets
    else:
        if len(_get_connected_components(adjacency_list)) > 1:
            # Typo fixed in the message ("consistst" -> "consists");
            # `warnings` is already imported at module level.
            msg = "The graph consists of multiple components, and hence the partitioning into two subsets/layers is ambiguous!"
            msg += "\n"
            msg += "Use the `subsets` argument to explicitly specify the desired partitioning."
            warnings.warn(msg)
        left, right = _get_bipartite_sets(adjacency_list)
    if reduce_edge_crossings:
        if not _is_complete_bipartite(edges, left, right):
            left, right = _reduce_crossings_bipartite(adjacency_list, left, right)
    # shrink frame to apply padding
    origin = np.array(origin) + pad_by * np.array(scale)
    scale = np.array(scale) * (1 - 2 * pad_by)
    # Space nodes according to the larger subset. NB: `scale` is a float array
    # at this point, so a zero denominator would silently yield `inf` (and
    # NaN positions) rather than raise a ZeroDivisionError; hence the explicit
    # check instead of the previous (dead) try/except.
    larger_subset_size = max(len(left), len(right))
    if larger_subset_size > 1:
        spacing = scale[1] / (larger_subset_size - 1)
    else:
        # Each subset contains at most a single node.
        spacing = 1.
    node_positions = dict()
    for subset, xx in ((left, origin[0]), (right, origin[0] + scale[0])):
        y = spacing * np.arange(len(subset))
        y -= np.mean(y)  # center offsets around zero ...
        # ... then shift to the vertical center of the padded bbox.
        # Bug fix: previously `origin[1]` was omitted, so layouts with a
        # non-zero origin were not vertically centered.
        y += origin[1] + scale[1] / 2
        for node, yy in zip(subset, y):
            node_positions[node] = (xx, yy)
    return node_positions
def _get_bipartite_sets(adjacency_list):
    """Partition the nodes into the two layers of a bipartite graph via two-colouring."""
    colour = _get_two_colouring(adjacency_list)
    zero_coloured = [node for node, c in colour.items() if c == 0]
    one_coloured = [node for node, c in colour.items() if c == 1]
    return zero_coloured, one_coloured
def _get_two_colouring(adjacency_list):
# Adapted from networkx.algorithms.bipartite.color.
colour = dict()
for node in adjacency_list:
if (node in colour):
continue
elif len(adjacency_list[node]) == 0:
colour[node] = 0
else:
queue = [node]
colour[node] = 1
while queue:
node = queue.pop()
for neighbour in adjacency_list[node]:
if neighbour in colour:
if colour[neighbour] == colour[node]:
raise Exception("Graph is not bipartite.")
else:
colour[neighbour] = 1 - colour[node]
queue.append(neighbour)
return colour
def _is_complete_bipartite(edges, left, right):
"""Check if the bipartite graph is fully connected."""
minimal_complete_graph = list(product(left, right))
if len(edges) < len(minimal_complete_graph):
return False
for edge in minimal_complete_graph:
if (edge not in edges) and (edge[::-1] not in edges):
return False
return True
def _reduce_crossings_bipartite(adjacency_list, left, right):
"""Reduce the number of crossings in a bipartite graph using the median heuristic proposed in Eades & Wormald (1994)."""
left_ranks = {node : ii for ii, node in enumerate(left)}
right_ranks = dict()
for node in right:
neighbours = adjacency_list[node]
if neighbours:
right_ranks[node] = np.median([left_ranks[neighbour] for neighbour in neighbours])
else:
right_ranks[node] = 0
# TODO: break ties.
# If one node has an even number of neighbours and the other an odd number, than the odd one should have the lower rank.
# For the other two cases (even/even, odd/odd), no tie-break procedure seems specified in the paper.
return left, sorted(right_ranks, key=right_ranks.get)
# [docs]  (Sphinx "view source" link artifact from HTML extraction; not part of the module source)
def get_multipartite_layout(edges, layers, layer_positions=None, origin=(0, 0), scale=(1, 1), pad_by=0.05, reduce_edge_crossings=True, uniform_node_spacing=True):
    """Layered node layout for a multipartite graph.

    By default, this implementation uses a heuristic to arrange the nodes such that the edge crossings are reduced.

    Parameters
    ----------
    edges : list
        The edges of the graph, with each edge being represented by a (source node ID, target node ID) tuple.
    layers : list
        List of node subsets, one for each layer of the graph.
    layer_positions : list, default None
        A list of x-coordinates, one for each layer.
        If None provided, layers are placed evenly between origin[0] and origin[0] + scale[0].
    origin : tuple, default (0, 0)
        The (float x, float y) coordinates corresponding to the lower left hand corner of the bounding box specifying the extent of the canvas.
    scale : tuple, default (1, 1)
        The (float x, float y) dimensions representing the width and height of the bounding box specifying the extent of the canvas.
    pad_by : float, default 0.05
        Padding around node positions to reduce clipping of the node artists with the frame,
        and to create space for routing curved edges including self-loops around nodes.
        This results in the following bounding box:

        :code:`xmin = origin[0] + pad_by * scale[0]`
        :code:`xmax = origin[0] + scale[0] - pad_by * scale[0]`
        :code:`ymin = origin[1] + pad_by * scale[1]`
        :code:`ymax = origin[1] + scale[1] - pad_by * scale[1]`

    reduce_edge_crossings : bool, default True
        If True, attempts to reduce edge crossings via the algorithm outlined in [Eades1994]_.
    uniform_node_spacing : bool, default True
        If True, the spacing between nodes is uniform across layers.
        Otherwise, nodes in each layer are distributed evenly across the full height of the canvas.

    Returns
    -------
    node_positions : dict
        Dictionary mapping each node ID to (float x, float y) tuple, the node position.

    References
    ----------
    .. [Eades1994] Eades & Wormald (1994) Edge crossings in drawings of bipartite graphs.

    """
    # shrink frame to apply padding
    origin = np.array(origin) + pad_by * np.array(scale)
    scale = np.array(scale) * (1 - 2 * pad_by)
    # set the space between nodes
    if uniform_node_spacing:
        # NB: `scale` is a float array, so a zero denominator would silently
        # yield `inf` (and hence NaN positions) rather than raise a
        # ZeroDivisionError; check explicitly instead of the previous (dead)
        # try/except.
        max_layer_size = max(len(layer) for layer in layers)
        if max_layer_size > 1:
            spacing = scale[1] / (max_layer_size - 1)
        else:
            # Each layer contains at most a single node.
            spacing = 1
        node_spacings = [spacing * np.ones_like(layer, dtype=float) for layer in layers]
    else:
        node_spacings = []
        for ii, layer in enumerate(layers):
            if len(layer) > 1:
                node_spacings.append(1./(len(layer) - 1) * np.ones_like(layer, float))
            else:
                node_spacings.append(np.ones_like(layer, float))
    # set the space between layers
    if layer_positions is None:
        layer_positions = np.linspace(origin[0], origin[0] + scale[0], len(layers))
    else:
        if (np.min(layer_positions) < origin[0]) or (np.max(layer_positions) > origin[0] + scale[0]):
            # `warnings` is already imported at module level.
            warnings.warn("Some layer positions are outside the bounding box defined by `origin` and `scale`.")
    # fix positions of nodes in first layer
    node_positions = dict()
    node_positions.update(_get_node_positions_within_layer(layers[0], node_spacings[0], layer_positions[0], origin, scale))
    # assign the position of nodes in subsequent layers
    total_layers = len(layers)
    for ii in range(1, total_layers):
        left = layers[ii-1]
        right = layers[ii]
        union = set(left) | set(right)
        edges_between_layers = [(source, target) for (source, target) in edges if (source in union) and (target in union)]
        adjacency_list = _edge_list_to_adjacency_list(edges_between_layers, directed=False)
        # add unconnected nodes
        for node in left:
            adjacency_list.setdefault(node, set())
        for node in right:
            adjacency_list.setdefault(node, set())
        if reduce_edge_crossings:
            if not _is_complete_bipartite(edges, left, right):
                left, right = _reduce_crossings_bipartite(adjacency_list, left, right)
        node_positions.update(_get_node_positions_within_layer(right, node_spacings[ii], layer_positions[ii], origin, scale))
    return node_positions
def _get_node_positions_within_layer(node_order, node_spacing, layer_position, origin, scale):
node_positions = dict()
xx = layer_position
y = node_spacing * np.arange(len(node_order))
y -= np.mean(y)
y += origin[1] + scale[1] / 2
for node, yy in zip(node_order, y):
node_positions[node] = np.array((xx, yy))
return node_positions
# [docs]  (Sphinx "view source" link artifact from HTML extraction; not part of the module source)
def get_shell_layout(edges, shells, radii=None, origin=(0, 0), scale=(1, 1), pad_by=0.05, reduce_edge_crossings=True):
    """Shell layout.

    This is a wrapper around `get_multipartite_layout` that arranges nodes in shells around a center instead of in layers.

    Parameters
    ----------
    edges : list
        The edges of the graph, with each edge being represented by a (source node ID, target node ID) tuple.
    shells : list
        List of node subsets, one for each shell of the graph.
    radii : list, default None
        List of radii, one for each shell.
        If None, radii are chosen such that the shells are evenly spaced within the bounding box defined by origin and scale.
    origin : tuple
        The (float x, float y) coordinates corresponding to the lower left hand corner of the bounding box specifying the extent of the canvas.
    scale : tuple
        The (float x, float y) dimensions representing the width and height of the bounding box specifying the extent of the canvas.
    pad_by : float, default 0.05
        Padding around node positions to reduce clipping of the node artists with the frame,
        and to create space for routing curved edges including self-loops around nodes.
        This results in the following bounding box:

        :code:`xmin = origin[0] + pad_by * scale[0]`
        :code:`xmax = origin[0] + scale[0] - pad_by * scale[0]`
        :code:`ymin = origin[1] + pad_by * scale[1]`
        :code:`ymax = origin[1] + scale[1] - pad_by * scale[1]`

    reduce_edge_crossings : bool, default True
        If True, attempts to reduce edge crossings via the algorithm outlined in [Eades1994]_.

    Returns
    -------
    node_positions : dict
        Dictionary mapping each node ID to (float x, float y) tuple, the node position.

    References
    ----------
    .. [Eades1994] Eades & Wormald (1994) Edge crossings in drawings of bipartite graphs.

    """
    if radii is None:
        if len(shells[0]) == 1:
            # Innermost shell consists of a single node, and hence should have no size.
            radii = np.linspace(0, (1 - pad_by) * np.min(scale) / 2, len(shells))
        else:
            # Innermost shell consists of multiple nodes and should hence have a non-zero radius.
            radii = np.linspace(0, (1 - pad_by) * np.min(scale) / 2, len(shells) + 1)[1:]
    # Compute a layered (multipartite) layout in a unit frame; layer x-positions
    # correspond to shell radii normalised to [0, 1].
    # NOTE(review): a single shell containing a single node yields
    # np.max(radii) == 0 here, so this division produces NaNs — confirm and
    # guard upstream if that input is expected.
    relative_radii = np.array(radii) / np.max(radii)
    multipartite_positions = get_multipartite_layout(
        edges, shells, layer_positions=relative_radii,
        reduce_edge_crossings=reduce_edge_crossings,
        uniform_node_spacing=False,
        pad_by=0,
    )
    node_to_shell = {node : shell for shell in shells for node in shell}
    max_radius = np.max(radii)
    center = np.array([origin[0] + 0.5 * scale[0], origin[1] + 0.5 * scale[1]])
    # Map the layered layout to polar coordinates: the x-coordinate becomes
    # the radius, the y-coordinate the angle.
    node_positions = dict()
    for node, (x, y) in multipartite_positions.items():
        shell = node_to_shell[node]
        # Cap the angular range below 2*pi so the first and last node in a
        # shell do not coincide.
        max_angle = 2 * np.pi * len(shell) / (len(shell) + 1)
        angle = max_angle * y
        radius = max_radius * x
        node_positions[node] = _convert_polar_to_cartesian_coordinates(radius, angle) + center
    return node_positions
def _get_community_sizes(node_to_community, scale):
    """Compute the area of the canvas reserved for each community."""
    # Worst-case scalar: all communities lined up like beads on a string
    # along the canvas diagonal; may warrant revisiting.
    per_node_radius = (np.linalg.norm(scale) / 2) / len(node_to_community)
    community_to_nodes = _invert_dict(node_to_community)
    return {community: per_node_radius * len(members)
            for community, members in community_to_nodes.items()}
def _get_community_positions(edges, node_to_community, community_size, origin, scale, pad_by):
    """Compute a centroid position for each community."""
    # Represent the graph as a weighted network of communities: one node per
    # community, with edge weights counting the edges between community pairs.
    community_edges = _find_between_community_edges(edges, node_to_community)
    # Lay out the community graph with a spring layout.
    return get_fruchterman_reingold_layout(
        list(community_edges.keys()), edge_weight=community_edges,
        node_size=community_size, origin=origin, scale=scale, pad_by=pad_by,
    )
def _find_between_community_edges(edges, node_to_community):
"""Convert the graph into a weighted network of communities."""
between_community_edges = dict()
for (ni, nj) in edges:
ci = node_to_community[ni]
cj = node_to_community[nj]
if ci != cj:
if (ci, cj) in between_community_edges:
between_community_edges[(ci, cj)] += 1
elif (cj, ci) in between_community_edges:
# only compute the undirected graph
between_community_edges[(cj, ci)] += 1
else:
between_community_edges[(ci, cj)] = 1
return between_community_edges
def _get_within_community_positions(edges, node_to_community):
    """Position nodes within their communities, in a canvas from (-1, -1) to (1, 1).

    Parameters
    ----------
    edges : list
        The edges of the graph, with each edge being represented by a (source node ID, target node ID) tuple.
    node_to_community : dict
        Mapping of node to community.

    Returns
    -------
    dict
        Mapping of node to its relative (x, y) position within its community.
    """
    community_to_nodes = _invert_dict(node_to_community)
    node_positions = dict()
    for community, nodes in community_to_nodes.items():
        if len(nodes) > 1:
            subgraph = _get_subgraph(edges, list(nodes))
            if subgraph:
                subgraph_node_positions = get_fruchterman_reingold_layout(
                    subgraph, nodes=nodes, origin=np.array([-1, -1]), scale=np.array([2, 2]))
                node_positions.update(subgraph_node_positions)
            else:
                # No within-community edges: fall back to random placement.
                warnings.warn(f"There are no connections within community {community}. The placement of nodes within this community is arbitrary.")
                node_positions.update({node : np.random.rand(2) * 2 + np.array([-1, -1]) for node in nodes})
        elif len(nodes) == 1:
            # Place a lone node at the community center. Use next(iter(...))
            # rather than pop() to avoid mutating the container returned by
            # _invert_dict.
            node_positions[next(iter(nodes))] = np.array([0., 0.])
    return node_positions
def _combine_positions(node_to_community, community_centroids, community_size, relative_node_positions):
node_positions = dict()
for node, community in node_to_community.items():
xy = community_centroids[community]
delta = relative_node_positions[node] * community_size[community]
node_positions[node] = xy + delta
return node_positions
def _rotate_communities(edges, node_to_community, community_centroids, node_positions, step_size=0.1, max_iterations=200):
    """Rotate each community around its centroid to shorten between-community edges.

    Each between-community edge exerts a torque on the communities of both
    of its end points (force proportional to the squared edge length);
    every community is then rotated against its net torque by an angle
    proportional to `step_size`.

    Parameters
    ----------
    edges : list
        The edges of the graph, with each edge being represented by a (source node ID, target node ID) tuple.
    node_to_community : dict
        Mapping of node to community.
    community_centroids : dict
        Mapping of community to its centroid position.
    node_positions : dict
        Mapping of node to position; updated in place.
    step_size : float, default 0.1
        Scales the rotation angle applied per iteration.
    max_iterations : int, default 200
        Number of iterations.
        NOTE(review): there is no convergence check, so all iterations always run.

    Returns
    -------
    dict
        Mapping of node to the rotated position.
    """
    between_community_edges = [(source, target) for (source, target) in edges \
                               if node_to_community[source] != node_to_community[target]]
    for _ in range(max_iterations):
        # compute torques
        community_torque = {community : 0 for community in set(list(node_to_community.values()))}
        for (source, target) in between_community_edges:
            # source
            community = node_to_community[source]
            r = community_centroids[community] - node_positions[source]
            delta = node_positions[target] - node_positions[source]
            F = delta * np.linalg.norm(delta) # direction * distance**2
            community_torque[community] += np.cross(r, F)
            # target
            community = node_to_community[target]
            r = community_centroids[community] - node_positions[target]
            delta = node_positions[source] - node_positions[target]
            F = delta * np.linalg.norm(delta)
            community_torque[community] += np.cross(r, F)
        # update node positions by rotating each community against its torque
        for node, community in node_to_community.items():
            node_positions[node] = _rotate(step_size * -community_torque[community],
                                           node_positions[node],
                                           community_centroids[community])
    return node_positions
def _rotate(angle, points, origin=(0, 0)):
# https://stackoverflow.com/a/58781388/2912349
R = np.array([[np.cos(angle), -np.sin(angle)],
[np.sin(angle), np.cos(angle)]])
origin = np.atleast_2d(origin)
points = np.atleast_2d(points)
return np.squeeze((R @ (points.T-origin.T) + origin.T).T)
# [docs]  (Sphinx "view source" link artifact from HTML extraction; not part of the module source)
@_handle_multiple_components
def get_geometric_layout(edges, edge_length, node_size=0., tol=1e-3, origin=(0, 0), scale=(1, 1), pad_by=0.05):
    """Node layout for defined edge lengths but unknown node positions.

    Node positions are determined through non-linear optimisation: the
    total distance between nodes is maximised subject to the constraint
    imposed by the edge lengths, which are used as upper bounds.
    If provided, node sizes are used to set lower bounds to minimise collisions.

    ..note:: This implementation is slow.

    Parameters
    ----------
    edges : list
        The edges of the graph, with each edge being represented by a (source node ID, target node ID) tuple.
    edge_length : dict
        Mapping of edges to their lengths.
    node_size : scalar or dict, default 0.
        Size (radius) of nodes.
        Providing the correct node size minimises the overlap of nodes in the graph,
        which can otherwise occur if there are many nodes, or if the nodes differ considerably in size.
    tol : float, default 1e-3
        The tolerance of the cost function. Small values increase the accuracy, large values improve the computation time.
    origin : tuple, default (0, 0)
        The (float x, float y) coordinates corresponding to the lower left hand corner of the bounding box specifying the extent of the canvas.
    scale : tuple, default (1, 1)
        The (float x, float y) dimensions representing the width and height of the bounding box specifying the extent of the canvas.
    pad_by : float, default 0.05
        Padding around node positions to reduce clipping of the node artists with the frame,
        and to create space for routing curved edges including self-loops around nodes.
        This results in the following bounding box:

        xmin, xmax = origin[0] + pad_by * scale[0], origin[0] + scale[0] - pad_by * scale[0]
        ymin, ymax = origin[1] + pad_by * scale[1], origin[1] + scale[1] - pad_by * scale[1]

    Returns
    -------
    node_positions : dict
        Dictionary mapping each node ID to (float x, float y) tuple, the node position.

    Raises
    ------
    ValueError
        If any edge length exceeds the diagonal of the canvas.
    TypeError
        If `node_size` is neither a scalar nor a dict.

    """
    # TODO: assert triangle inequality is not violated.
    # HOLD: probably not necessary, as minimisation can still proceed when triangle inequality is violated.

    # assert that the edges fit within the canvas dimensions
    width, height = scale
    max_length = np.sqrt(width**2 + height**2)
    too_long = dict()
    for edge, length in edge_length.items():
        if length > max_length:
            too_long[edge] = length
    if too_long:
        msg = f"The following edges exceed the dimensions of the canvas (`scale={scale}`):"
        for edge, length in too_long.items():
            msg += f"\n\t{edge} : {length}"
        msg += "\nEither increase the `scale` parameter, or decrease the edge lengths."
        raise ValueError(msg)

    # ensure that graph is bi-directional
    edges = edges + [(target, source) for (source, target) in edges] # forces copy
    edges = list(set(edges))

    # upper bound: pairwise distance matrix with unknown distances set to the maximum possible distance given the canvas dimensions
    lengths = []
    for (source, target) in edges:
        if (source, target) in edge_length:
            lengths.append(edge_length[(source, target)])
        else:
            lengths.append(edge_length[(target, source)])
    sources, targets = zip(*edges)
    nodes = sources + targets
    unique_nodes = set(nodes)
    indices = range(len(unique_nodes))
    node_to_idx = dict(zip(unique_nodes, indices))
    source_indices = [node_to_idx[source] for source in sources]
    target_indices = [node_to_idx[target] for target in targets]

    total_nodes = len(unique_nodes)
    distance_matrix = np.full((total_nodes, total_nodes), max_length)
    distance_matrix[source_indices, target_indices] = lengths
    distance_matrix[np.diag_indices(total_nodes)] = 0
    upper_bounds = squareform(distance_matrix)

    # lower bound: sum of node sizes
    if isinstance(node_size, (int, float)):
        sizes = node_size * np.ones((total_nodes))
    elif isinstance(node_size, dict):
        sizes = np.array([node_size[node] if node in node_size else 0. for node in unique_nodes])
    else:
        # Previously, an unsupported type fell through and triggered an
        # opaque NameError below; fail early with a clear message instead.
        raise TypeError(f"`node_size` has to be a scalar or a dict, not {type(node_size)}.")

    sum_of_node_sizes = sizes[np.newaxis, :] + sizes[:, np.newaxis]
    sum_of_node_sizes -= np.diag(np.diag(sum_of_node_sizes)) # squareform requires zeros on diagonal
    lower_bounds = squareform(sum_of_node_sizes)
    # Upper bounds always win if the two conflict.
    invalid = lower_bounds > upper_bounds
    lower_bounds[invalid] = upper_bounds[invalid] - 1e-8

    # For an extended discussion of this cost function and alternatives see:
    # https://stackoverflow.com/q/75137677/2912349
    def cost_function(positions):
        return 1 / np.sum(np.log(pdist(positions.reshape((-1, 2))) + 1))

    def constraint_function(positions):
        positions = np.reshape(positions, (-1, 2))
        return pdist(positions)

    initial_positions = _initialise_geometric_node_layout(edges, edge_length)
    nonlinear_constraint = NonlinearConstraint(constraint_function, lb=lower_bounds, ub=upper_bounds, jac='2-point')
    result = minimize(
        cost_function,
        initial_positions.flatten(),
        method='SLSQP',
        jac='2-point',
        constraints=[nonlinear_constraint],
        options=dict(ftol=tol),
    )

    if not result.success:
        # Report via warnings (consistent with the rest of the module)
        # rather than printing to stdout.
        warnings.warn(
            "Could not compute valid node positions for the given edge lengths.\n"
            f"scipy.optimize.minimize: {result.message}.")

    node_positions_as_array = result.x.reshape((-1, 2))
    node_positions_as_array = _fit_to_frame(node_positions_as_array, np.array(origin), np.array(scale), pad_by)
    node_positions = dict(zip(unique_nodes, node_positions_as_array))
    return node_positions
def _initialise_geometric_node_layout(edges, edge_length):
    """Compute initial node positions for the geometric layout.

    Runs the Fruchterman-Reingold algorithm with each edge weighted by the
    inverse of its desired length, so that nodes joined by short edges
    experience a stronger attractive force and start out closer together.
    """
    inverse_lengths = {edge: 1. / length for edge, length in edge_length.items()}
    layout = get_fruchterman_reingold_layout(edges, edge_weight=inverse_lengths)
    return np.array(list(layout.values()))
def _fit_to_frame(positions, origin, scale, pad_by):
    """Rotate, rescale and translate a set of positions such that they fit
    inside the frame defined by `origin` and `scale`, preserving the
    relative distances between them."""
    # Locate the pair of points with the largest separation; the line
    # between them defines the layout's major axis.
    displacement = positions[np.newaxis, :] - positions[:, np.newaxis]
    squared_distances = np.sum(displacement**2, axis=-1)
    row, col = np.where(np.triu(squared_distances) == np.max(squared_distances))
    # several pairs may attain the maximum distance; keep the first
    row, col = row[0], col[0]
    # Rotate about the midpoint of the major axis so that the axis is
    # aligned with the longer dimension of the frame.
    midpoint = positions[row] + 0.5 * displacement[row, col]
    axis_angle = _get_angle(*displacement[row, col])
    if scale[0] < scale[1]:  # portrait frame: align major axis vertically
        rotated = _rotate((np.pi / 2 - axis_angle) % np.pi, positions, midpoint)
    else:  # landscape frame: align major axis horizontally
        rotated = _rotate(-axis_angle % np.pi, positions, midpoint)
    # Translate so the lower-left corner of the bounding box sits at (0, 0).
    shifted = rotated - np.min(rotated, axis=0)[np.newaxis, :]
    # Scale uniformly so that the more constraining dimension fills the
    # frame minus the `pad_by` margins, then centre along the other axis.
    width, height = np.ptp(rotated, axis=0)
    if width / scale[0] < height / scale[1]:  # height is the limiting dimension
        rescaled = shifted * (1 - 2 * pad_by) * scale[1] / height
        rescaled[:, 0] += (scale[0] - np.ptp(rescaled[:, 0])) / 2
        rescaled[:, 1] += pad_by * scale[1]
    else:  # width is the limiting dimension
        rescaled = shifted * (1 - 2 * pad_by) * scale[0] / width
        rescaled[:, 0] += pad_by * scale[0]
        rescaled[:, 1] += (scale[1] - np.ptp(rescaled[:, 1])) / 2
    # Finally, translate the layout to the requested origin.
    return rescaled + np.array(origin)[np.newaxis, :]