# -*- coding: utf-8 -*-
import logging
import utool as ut
import networkx as nx
import itertools as it
from wbia.algo.graph.nx_utils import edges_inside, e_
print, rrr, profile = ut.inject2(__name__)
logger = logging.getLogger('wbia')
[docs]class GraphHelperMixin(ut.NiceRepr):
def __nice__(self):
return 'nNodes={}, nEdges={}'.format(
self.number_of_nodes(), self.number_of_edges()
)
[docs] def has_nodes(self, nodes):
return (self.has_node(node) for node in nodes)
[docs] def has_edges(self, edges):
return (self.has_edge(*edge) for edge in edges)
[docs] def edges(self, nbunch=None, data=False, default=None):
# Force edges to always be returned in upper triangular form
edges = super(GraphHelperMixin, self).edges(nbunch, data, default)
if data:
return (e_(u, v) + (d,) for u, v, d in edges)
else:
return (e_(u, v) for u, v in edges)
[docs]class NiceGraph(nx.Graph, GraphHelperMixin):
pass
[docs]class nx_UnionFind(object):
"""
Based of nx code
"""
def __init__(self, elements=None):
if elements is None:
elements = ()
self.parents = {}
self.weights = {}
self.add_elements(elements)
[docs] def clear(self):
self.parents = {}
self.weights = {}
def __getitem__(self, element):
# check for previously unknown element
if self.add_element(element):
return element
# find path of objects leading to the root
path = [element]
root = self.parents[element]
while root != path[-1]:
path.append(root)
root = self.parents[root]
# compress the path and return
for ancestor in path:
self.parents[ancestor] = root
return root
def __iter__(self):
return iter(self.parents)
[docs] def rebalance(self, elements=None):
if elements is None:
elements = list(self.parents.keys())
# Make sure only one operation is needed to lookup any node
for x in elements:
parent = self[x]
self.parents[x] = parent
self.weights[x] = 1
[docs] def to_sets(self):
yield from it.groups(self.parents).values()
[docs] def union(self, *objects):
"""Find the sets containing the objects and merge them all."""
roots = [self[x] for x in objects]
# HACK: use the lowest node number to preserve
# node labels through cuts. (has some runtime penalty)
if False:
# Find the best root with the maximum weight
best = max(roots, key=lambda r: self.weights[r])
else:
best = min(roots)
for r in roots:
if r != best:
self.weights[best] += self.weights[r]
self.parents[r] = best
[docs] def remove_entire_cc(self, elements):
# NOTE: this will not work in general. This only
# works if all elements are a unique component.
for x in elements:
del self.weights[x]
del self.parents[x]
[docs] def add_element(self, x):
if x not in self.parents:
self.weights[x] = 1
self.parents[x] = x
return True
return False
[docs] def add_elements(self, elements):
for x in elements:
if x not in self.parents:
self.weights[x] = 1
self.parents[x] = x
[docs]class DynConnGraph(nx.Graph, GraphHelperMixin):
"""
Dynamically connected graph.
Maintains a data structure parallel to a normal networkx graph that
maintains dynamic connectivity for fast connected compoment queries.
Underlying Data Structures and limitations are
Data Structure | Insertion | Deletion | CC Find |
-----------------------------------------------------
* UnionFind | lg(n) | n | No
* UnionFind2 | n* | n | 1
* EulerTourForest | lg^2(n) | lg^2(n) | lg(n) / lglg(n) - - Ammortized
* it seems to be very quick
References:
https://courses.csail.mit.edu/6.851/spring14/lectures/L20.pdf
https://courses.csail.mit.edu/6.851/spring14/lectures/L20.html
http://cs.stackexchange.com/questions/33595/maintaining-connecte
https://en.wikipedia.org/wiki/Dynamic_connectivity#Fully_dynamic_connectivity
CommandLine:
python -m wbia.algo.graph.nx_dynamic_graph DynConnGraph
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.nx_dynamic_graph import * # NOQA
>>> self = DynConnGraph()
>>> self.add_edges_from([(1, 2), (2, 3), (4, 5), (6, 7), (7, 4)])
>>> self.add_edges_from([(10, 20), (20, 30), (40, 50), (60, 70), (70, 40)])
>>> self._ccs
>>> u, v = 20, 1
>>> assert self.node_label(u) != self.node_label(v)
>>> assert self.connected_to(u) != self.connected_to(v)
>>> self.add_edge(u, v)
>>> assert self.node_label(u) == self.node_label(v)
>>> assert self.connected_to(u) == self.connected_to(v)
>>> self.remove_edge(u, v)
>>> assert self.node_label(u) != self.node_label(v)
>>> assert self.connected_to(u) != self.connected_to(v)
>>> ccs = list(self.connected_components())
>>> ut.quit_if_noshow()
>>> import wbia.plottool as pt
>>> pt.qtensure()
>>> pt.show_nx(self)
# todo: check if nodes exist when adding
"""
def __init__(self, *args, **kwargs):
# raise NotImplementedError('unfinished')
self._ccs = {}
self._union_find = nx_UnionFind()
super(DynConnGraph, self).__init__(*args, **kwargs)
[docs] def clear(self):
super(DynConnGraph, self).clear()
self._ccs = {}
self._union_find.clear()
def __nice__(self):
return 'nNodes={}, nEdges={}, nCCs={}'.format(
self.number_of_nodes(), self.number_of_edges(), self.number_of_components()
)
[docs] def number_of_components(self):
return len(self._ccs)
[docs] def component(self, label):
return self._ccs[label]
component_nodes = component
[docs] def connected_to(self, node):
return self._ccs[self._union_find[node]]
[docs] def node_label(self, node):
"""
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.nx_dynamic_graph import * # NOQA
>>> self = DynConnGraph()
>>> self.add_edges_from([(1, 2), (2, 3), (4, 5), (6, 7)])
>>> assert self.node_label(2) == self.node_label(1)
>>> assert self.node_label(2) != self.node_label(4)
"""
return self._union_find[node]
[docs] def node_labels(self, *nodes):
return [self._union_find[node] for node in nodes]
[docs] def are_nodes_connected(self, u, v):
return ut.allsame(self.node_labels(u, v))
[docs] def connected_components(self):
"""
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.nx_dynamic_graph import * # NOQA
>>> self = DynConnGraph()
>>> self.add_edges_from([(1, 2), (2, 3), (4, 5), (6, 7)])
>>> ccs = list(self.connected_components())
>>> result = 'ccs = {}'.format(ut.repr2(ccs, nl=0))
>>> print(result)
ccs = [{1, 2, 3}, {4, 5}, {6, 7}]
"""
yield from self._ccs.values()
[docs] def component_labels(self):
yield from self._ccs.keys()
# -----
def _cut(self, u, v):
"""Decremental connectivity (slow)"""
old_nid1 = self._union_find[u]
old_nid2 = self._union_find[v]
if old_nid1 != old_nid2:
return
# Need to break appart entire component and then reconstruct it
old_cc = self._ccs[old_nid1]
del self._ccs[old_nid1]
self._union_find.remove_entire_cc(old_cc)
# Might be faster to just do DFS to find the CC
internal_edges = edges_inside(self, old_cc)
# Add nodes in case there are no edges to it
for n in old_cc:
self._add_node(n)
for edge in internal_edges:
self._union(*edge)
def _union(self, u, v):
"""Incremental connectivity (fast)"""
# logger.info('Union ({})'.format((u, v)))
self._add_node(u)
self._add_node(v)
old_nid1 = self._union_find[u]
old_nid2 = self._union_find[v]
self._union_find.union(u, v)
new_nid = self._union_find[u]
for old_nid in [old_nid1, old_nid2]:
if new_nid != old_nid:
parts = self._ccs.pop(old_nid)
# FIXME: this step can be quite bad for time complexity.
# An Euler Tour Tree might solve the issue
self._ccs[new_nid].update(parts)
def _add_node(self, n):
if self._union_find.add_element(n):
# logger.info('Add ({})'.format((n)))
self._ccs[n] = {n}
def _remove_node(self, n):
if n in self._union_find.parents:
del self._union_find.weights[n]
del self._union_find.parents[n]
del self._ccs[n]
[docs] def add_edge(self, u, v, **attr):
"""
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.nx_dynamic_graph import * # NOQA
>>> self = DynConnGraph()
>>> self.add_edges_from([(1, 2), (2, 3), (4, 5), (6, 7), (7, 4)])
>>> assert self._ccs == {1: {1, 2, 3}, 4: {4, 5, 6, 7}}
>>> self.add_edge(1, 5)
>>> assert self._ccs == {1: {1, 2, 3, 4, 5, 6, 7}}
"""
self._union(u, v)
super(DynConnGraph, self).add_edge(u, v, **attr)
[docs] def add_edges_from(self, ebunch, **attr):
ebunch = list(ebunch)
# logger.info('add_edges_from %r' % (ebunch,))
for e in ebunch:
self._union(*e)
super(DynConnGraph, self).add_edges_from(ebunch, **attr)
# ----
[docs] def add_node(self, n, **attr):
self._add_node(n)
super(DynConnGraph, self).add_node(n, **attr)
[docs] def add_nodes_from(self, nodes, **attr):
nodes = list(nodes)
for n in nodes:
self._add_node(n)
super(DynConnGraph, self).add_nodes_from(nodes, **attr)
# ----
[docs] def remove_edge(self, u, v):
"""
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.nx_dynamic_graph import * # NOQA
>>> self = DynConnGraph()
>>> self.add_edges_from([(1, 2), (2, 3), (4, 5), (6, 7), (7, 4)])
>>> assert self._ccs == {1: {1, 2, 3}, 4: {4, 5, 6, 7}}
>>> self.add_edge(1, 5)
>>> assert self._ccs == {1: {1, 2, 3, 4, 5, 6, 7}}
>>> self.remove_edge(1, 5)
>>> assert self._ccs == {1: {1, 2, 3}, 4: {4, 5, 6, 7}}
"""
super(DynConnGraph, self).remove_edge(u, v)
self._cut(u, v)
[docs] def remove_edges_from(self, ebunch):
ebunch = list(ebunch)
super(DynConnGraph, self).remove_edges_from(ebunch)
# Can do this more efficiently for bulk edges
for e in ebunch:
self._cut(*e)
# -----
[docs] def remove_nodes_from(self, nodes):
# remove edges as well
nodes = list(nodes)
for n in nodes:
nbrs = list(self.adj[n].keys())
self.remove_edges_from((n, v) for v in nbrs)
for n in nodes:
self._remove_node(n)
super(DynConnGraph, self).remove_nodes_from(nodes)
[docs] def remove_node(self, n):
r"""
CommandLine:
python -m wbia.algo.graph.nx_dynamic_graph remove_node
Example:
>>> # ENABLE_DOCTEST
>>> from wbia.algo.graph.nx_dynamic_graph import * # NOQA
>>> self = DynConnGraph()
>>> self.add_edges_from([(1, 2), (2, 3), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9)])
>>> assert self._ccs == {1: {1, 2, 3}, 4: {4, 5, 6, 7, 8, 9}}
>>> self.remove_node(2)
>>> assert self._ccs == {1: {1}, 3: {3}, 4: {4, 5, 6, 7, 8, 9}}
>>> self.remove_node(7)
>>> assert self._ccs == {1: {1}, 3: {3}, 4: {4, 5, 6}, 8: {8, 9}}
"""
# remove edges as well
nbrs = list(self.adj[n].keys())
self.remove_edges_from((n, v) for v in nbrs)
self._remove_node(n)
super(DynConnGraph, self).remove_node(n)
[docs] @profile
def subgraph(self, nbunch, dynamic=False):
if dynamic is False:
H = nx.Graph()
nbunch = set(nbunch)
H.add_nodes_from(nbunch)
H.add_edges_from(edges_inside(self, nbunch))
else:
H = super(DynConnGraph, self).subgraph(nbunch)
for n in nbunch:
# need to add individual nodes
H._add_node(n)
# Recreate the connected compoment structure
for u, v in H.edges():
H._union(u, v)
return H