# Incidence matrix and related manipulations
import numpy as np
# Adjacency tensor construction
from itertools import permutations
# User output
import warnings
# Hypergraph metadata
import pandas as pd
import networkx as nx
# Nice printing
from rich import print
# Possibly remove this dependency
import copy
# HAT modules
from HAT import graph
from HAT import export
from HAT import laplacian
[docs]
class Hypergraph:
"""Represents a hypergraph structure, enabling complex multi-way relationships between nodes.
This class implements a hypergraph where edges (or hyperedges) can connect multiple vertices.
Internally, the hypergraph can be represented by an incidence matrix, an adjacency tensor, or an
edge list, depending on the available inputs and the uniformity of the hypergraph.
Formally, a hypergraph :math:`H=(V,E)` is defined by a set of vertices :math:`V` and a set of
edges :math:`E`, where each edge :math:`e \\in E` may contain any subset of vertices in :math:`V`.
Unlike traditional graphs, hypergraph edges can connect more than two vertices, allowing for
multi-way interactions. Uniform hypergraphs, where all edges have the same number of vertices,
can be efficiently represented using tensors.
Attributes
----------
edge_list : list of lists or None
Stores the list of edges if provided during initialization.
adjacency_tensor : ndarray or None
Stores the adjacency tensor if provided during initialization.
incidence_matrix : ndarray or None
Stores the incidence matrix if generated or provided during initialization.
nodes : ndarray or None
Array of nodes (vertices) in the hypergraph.
edges : ndarray or None
Array of edges (hyperedges) in the hypergraph.
uniform : bool
Indicates if the hypergraph is uniform.
order : int
Edge size of a uniform hypergraph (set to -1 when a non-uniform edge list is detected).
directed : bool, default=False
Indicates if the hypergraph is directed.
verbose : bool, default=False
Additional warning messages display when true
"""
def __init__(self,
edge_list=None,
adjacency_tensor=None,
incidence_matrix=None,
nodes=None,
edges=None,
uniform=None,
order=None,
directed=None,
compress=True,# this argument tells the constructor to rearrange the numerical
# representation based on the set .edges dataframe.
verbose=False
):
# Auth: Joshua Pickard
# jpic@umich.edu
# Date: Nov 30, 2022
# print("Hypergraph constructor!")
# Validate arguments
self._validate_constructor_arguments(
edge_list = edge_list,
adjacency_tensor = adjacency_tensor,
incidence_matrix = incidence_matrix,
nodes = nodes,
edges = edges,
uniform = uniform,
order = order,
directed = directed
)
# Convert all nodes to integers, and build a map between integers and names
name_to_int = None
if edge_list is not None:
new_edge_list, name_to_int = convert_nodes_to_integers(edge_list)
original_edge_list = copy.deepcopy(edge_list)
edge_list = new_edge_list
# Assign object data
self._edge_list = edge_list
self._adjacency_tensor = adjacency_tensor
self._incidence_matrix = incidence_matrix
self._nodes = nodes
self._edges = edges
self._uniform = uniform
self._order = order
self._directed = directed
self._verbose = verbose
self._reset = {
'adjacency tensor': True,
'incidence matrix': True,
'edge list': True
}
if self._adjacency_tensor is not None:
self._reset['adjacency tensor'] = False
if self._incidence_matrix is not None:
self._reset['incidence matrix'] = False
if self._edge_list is not None:
self._reset['edge list'] = False
# Set uniform and order of the hypergraph
self._detect_uniform_and_order(uniform, order)
# Set directed property of the hypergraph
self._detect_directed(directed)
# Set the nodes dataframe
if self._nodes is None:
flat_set = None
if self._adjacency_tensor is not None:
num_nodes = self._adjacency_tensor.shape[0]
elif self._edge_list is not None:
if isinstance(edge_list[0][0], list):
flat_list = [item for sublist in edge_list for subsublist in sublist for item in subsublist]
flat_set = list(set(flat_list))
num_nodes = len(flat_set)
else:
flat_set = list(set([j for edge in edge_list for j in edge]))
num_nodes = len(flat_set)
elif self._incidence_matrix is not None:
num_nodes = self.incidence_matrix.shape[0]
self._nodes = pd.DataFrame({'Nodes': np.arange(num_nodes)})
if name_to_int is not None:
int_to_name = {v: k for k, v in name_to_int.items()}
node_labels = []
for k in self._nodes['Nodes'].values:
node_labels.append(int_to_name[k])
self._nodes['Names'] = node_labels
elif 'Nodes' not in self._nodes.columns:
self._nodes['Nodes'] = np.arange(self._nodes.shape[0])
warnings.warn('`Nodes` column not found in the provided in self._nodes. It has been added') if self._verbose else None
# Set the edges dataframe
if self._edges is None:
# Determine the number of edges, the nodes, the head, and the tail
num_edges, edge_nodes, head, tail = 0, [], [], []
if self._edge_list is not None:
num_edges = len(self._edge_list)
if self._directed:
for edge in self._edge_list:
edge_nodes.append(sorted(edge[0] + edge[1]))
head.append(sorted(edge[0]))
tail.append(sorted(edge[1]))
else:
edge_nodes = self._edge_list
elif self._incidence_matrix is not None:
num_edges = self.incidence_matrix.shape[1]
for iedge in range(num_edges):
edge_nodes.append(np.where(self._incidence_matrix[:,iedge] != 0)[0])
if self._directed:
head.append(np.where(self._incidence_matrix[:,iedge] > 0)[0])
tail.append(np.where(self._incidence_matrix[:,iedge] < 0)[0])
elif self._adjacency_tensor is not None:
compress = True
idxs = np.where(self._adjacency_tensor != 0)
head = [[node] for node in list(idxs[0])]
for iedge in range(len(idxs[0])):
tail.append(sorted([idxs[k][iedge] for k in range(1,len(idxs))]))
edge_nodes.append(sorted([head[iedge][0]] + tail[-1]))
num_edges = len(head)
if self.directed:
self._edges = pd.DataFrame(
{
'Nodes' : edge_nodes,
"head" : head,
"tail" : tail
}
)
else:
self._edges = pd.DataFrame(
{
'Nodes' : edge_nodes,
}
)
if 'Edges' not in self._edges.columns:
self.edges['Edges'] = list(np.arange(self._edges.shape[0]))
# = pd.DataFrame({'Edges': list(np.arange(self._edges.shape[0]))})
warnings.warn('"Edges" column not found in the provided nodes dataframe.') if self._verbose else None
warnings.warn('This column has been appended.') if self._verbose else None
if compress:
# TODO: filter self._edges to achieve:
# 1. no duplicate rows (done)
# 2. no duplicate tails (if directed)
# Identify duplicate rows based on string representations
self._edges['row_hash'] = self._edges.drop(["Edges"], axis=1).apply(lambda row: str(row.values), axis=1)
self._edges = self._edges.drop_duplicates(subset=['row_hash']).drop(columns=['row_hash'])
self.edges['Edges'] = list(np.arange(self._edges.shape[0]))
# reset the incidence matrix, edge list, or adjacency tensor based on self.edges
if incidence_matrix is not None:
self._set_incidence_matrix()
if edge_list is not None:
self._set_edge_list()
if adjacency_tensor is not None:
self._set_adjacency_tensor()
def _validate_constructor_arguments(
self,
edge_list=None,
adjacency_tensor=None,
incidence_matrix=None,
nodes=None,
edges=None,
uniform=None,
order=None,
directed=None
):
'''
This appears to do nothing and can be killed
'''
# At least one numerical representation should be supplied
if edge_list is None and adjacency_tensor is None and incidence_matrix is None and edges is None:
warnings.warn("The edge list, incidence matrix, adjacency tensor, and edge DataFrame are None.", UserWarning) if self._verbose else None
if order is not None and not (uniform == True):
warnings.warn("If the hypergraph order is fixed then it should be k-uniform") if self._verbose else None
def _detect_uniform_and_order(self, uniform, order):
if self._adjacency_tensor is not None:
self._uniform = True
self._order = len(self._adjacency_tensor.shape)
elif self._incidence_matrix is not None:
nonzero_counts = np.count_nonzero(self._incidence_matrix, axis=0)
self._uniform = np.all(nonzero_counts == nonzero_counts[0])
self._order = nonzero_counts[0]
elif self._edge_list is not None:
# Determine if the hypergraph is directed
if not isinstance(self._edge_list[0][0], list): # (undirected)
k = len(self._edge_list[0])
self._uniform = True
self._order = k
for e in self._edge_list:
if len(e) != k:
self._uniform = False
break
else: # (directed)
k = len(self._edge_list[0][0])
self._uniform = True
self._order = k
for e in self._edge_list:
if len(e[0]) != k:
self._uniform = False
break
if self._uniform == False:
self._order = -1
if self._uniform != uniform and uniform is not None:
warnings.warn('The provided and detected `uniform` are not in agreement!') if self._verbose else None
if self._order != order and order is not None:
warnings.warn('The provided and detected `order` are not in agreement!') if self._verbose else None
def _detect_directed(self, directed):
if self._edges is not None and "head" in self._edges.columns and "tail" in self._edges.columns:
self._directed = True
# The adjacency tensor is directed
elif self._adjacency_tensor is not None:
self._directed = True
# The indicence matrix is directed if there are +/- terms
elif self._incidence_matrix is not None and np.sum(self._incidence_matrix > 0) > 0 and np.sum(self._incidence_matrix < 0) > 0:
self._directed = True
# The edge list is directed if an edge has the form: [[head], [tail]]
elif self._edge_list is not None:
self._directed = True
for edge in self._edge_list:
if not (len(edge) >= 2 and isinstance(edge[0], list) and isinstance(edge[1], list)):
self._directed = False
# By default, the system is undirected
else:
self._directed = False
if self._directed != directed and directed is not None:
warnings.warn('The provided and detected `directed` are not in agreement!') if self._verbose else None
@property
def nodes(self):
return self._nodes
@property
def edges(self):
return self._edges
@property
def directed(self):
return self._directed
@property
def nnodes(self):
"""Returns the number of nodes (vertices) in the hypergraph.
Returns
-------
int
Number of nodes in the hypergraph.
"""
return self._nodes.shape[0] if self._nodes is not None else 0
@property
def nedges(self):
"""Returns the number of edges (hyperedges) in the hypergraph.
Returns
-------
int or bool
Number of edges if available, otherwise False if edges are not defined.
"""
return self.edges.shape[0]
@property
def edge_weights(self):
if 'weight' not in self.edges.columns:
self.edges['weight'] = np.ones((self.nedges,))
return np.array(self.edges['weight'].values)
@property
def node_weights(self):
if 'weight' not in self.nodes.columns:
self.nodes['weight'] = np.ones((self.nnodes,))
return np.array(self.nodes['weight'].values)
@property
def uniform(self):
"""Returns if the hypergraph was uniform."""
return self._uniform
@property
def order(self):
return self._order
@property
def incidence_matrix(self):
"""Returns the incidence matrix of the hypergraph, constructing it if necessary."""
if self._incidence_matrix is None or self._reset['incidence matrix']:
self._set_incidence_matrix() # Automatically set if not yet defined
self._reset['incidence matrix'] = False
return self._incidence_matrix
@property
def edge_list(self):
"""Returns the edge list of the hypergraph, constructing it if necessary."""
if self._edge_list is None or self._reset['edge list']:
self._set_edge_list() # Automatically set if not yet defined
self._reset['edge list'] = False
return self._edge_list
@property
def adjacency_tensor(self):
"""Returns the adjacency tensor of the hypergraph, constructing it if necessary."""
if self._adjacency_tensor is None or self._reset['adjacency tensor']:
self._set_adjacency_tensor() # Automatically set if not yet defined
self._reset['adjacency tensor'] = False
return self._adjacency_tensor
@property
def star_graph(self):
    """Return the result of ``graph.star_graph`` applied to this hypergraph."""
    expansion = graph.star_graph(self)
    return expansion
@property
def clique_graph(self):
    """Return the result of ``graph.clique_graph`` applied to this hypergraph."""
    expansion = graph.clique_graph(self)
    return expansion
@property
def connected_components(self):
    """Label every node with the id of its connected component.

    Components are computed on the clique graph with
    ``networkx.connected_components``. The labels are written to the
    ``'connected components'`` column of the nodes DataFrame, and the
    ``'Nodes'`` / ``'connected components'`` columns are returned.
    """
    clique = self.clique_graph
    # node -> component id, ids following networkx's enumeration order
    membership = {
        node: component_id
        for component_id, members in enumerate(list(nx.connected_components(clique)))
        for node in members
    }
    labels = [membership[i] for i in range(self._nodes.shape[0])]
    self._nodes['connected components'] = labels
    return self._nodes[['Nodes', 'connected components']]
@property
def laplacian_matrix(self, laplacian_type='bolla'):
    """Return ``laplacian.laplacian_matrix`` evaluated on this hypergraph.

    NOTE(review): this is a property, so attribute access cannot pass
    ``laplacian_type``; the default ``'bolla'`` is what gets used.
    """
    result = laplacian.laplacian_matrix(self, laplacian_type=laplacian_type)
    return result
def _set_incidence_matrix(self):
"""Sets self._incidence_matrix based on self._edges
"""
incidence_matrix = np.zeros((self.nnodes, self.nedges))
for iedge in range(self.nedges):
if self.directed:
tail = self.edges["tail"].iloc[iedge]
head = self.edges["head"].iloc[iedge]
for node in tail:
incidence_matrix[node, iedge] = -1
for node in head:
incidence_matrix[node, iedge] = 1
else:
nodes = self.edges['Nodes'].iloc[iedge]
for node in nodes:
incidence_matrix[node, iedge] = 1
self._incidence_matrix = incidence_matrix
def _set_edge_list(self):
"""Sets self._edge_lsit based on self._edges
"""
edge_list = []
if self.directed:
for iedge in range(self.nedges):
edge_list.append(list([self.edges["head"].iloc[iedge], self.edges["tail"].iloc[iedge]]))
else:
for iedge in range(self.nedges):
edge_list.append(list(self.edges['Nodes'].iloc[iedge]))
self._edge_list = edge_list
def _set_adjacency_tensor(self):
"""Sets self._adjacency_tensor based on self._edges
"""
adjacency_tensor = np.zeros((self.nnodes,) * self.order, dtype=int)
if self.directed:
for iedge in range(self.nedges):
tail = self.edges["tail"].iloc[iedge]
head = self.edges["head"].iloc[iedge]
if not isinstance(head, list):
head = [head]
if not isinstance(tail, list):
tail = [tail]
for head_node in head:
for perm in permutations(tail):
adjacency_tensor[tuple([head_node] + list(perm))] = 1
else:
for iedge in range(self.nedges):
edge_nodes = self.edges['Nodes'].iloc[iedge]
for perm in permutations(edge_nodes):
adjacency_tensor[tuple(perm)] = 1
self._adjacency_tensor = adjacency_tensor
def _remove_duplicates(self, list_of_lists):
if isinstance(list_of_lists[0][0], int):
return list_of_lists
# Use a set to track unique items
unique_items = set()
deduplicated_list = []
for item in list_of_lists:
# Convert the two inner lists to a tuple of sorted tuples
normalized_item = tuple(sorted(map(tuple, item)))
if normalized_item not in unique_items:
unique_items.add(normalized_item)
deduplicated_list.append(item)
return deduplicated_list
[docs]
def add_node(self, properties=None):
"""
Adds a new node to `self._nodes`. Properties is a dictionary specifying
values for the columns in `self._nodes`. Missing properties are filled
with `None`.
Parameters:
properties (dict, optional): A dictionary containing column names as keys
and their corresponding values. Defaults to None.
"""
if properties is None:
properties = {}
properties['Nodes'] = self.nnodes
# Convert properties to a DataFrame with one row
new_row = pd.DataFrame([properties])
# Align new_row columns with self._nodes, filling missing columns with None
self._nodes = pd.concat([self._nodes, new_row])
self._nodes.reset_index(drop=True, inplace=True)
# Representations must be reset based on the dataframes
for key in self._reset.keys():
self._reset[key] = True
[docs]
def add_edge(self, nodes, properties=None):
# Validate nodes
if isinstance(nodes[0], list):
for node in nodes[0] + nodes[1]:
if node not in self.nodes.index:
warnings.warn(f'Node {node} not found in the hypergraph') if self._verbose else None
else:
for node in nodes:
if node not in self.nodes.index:
warnings.warn(f'Node {node} not found in the hypergraph') if self._verbose else None
# Initialize properties if None
properties = properties or {}
# Handle directed vs. undirected edges
if self.directed:
if len(nodes) < 2:
raise ValueError("Directed edges require at least two nodes (Head and Tail).")
properties["head"] = nodes[0]
properties["tail"] = nodes[1]
properties['Nodes'] = nodes[0] + nodes[1]
else:
properties['Nodes'] = nodes
properties['Edges'] = self.nedges
# Append the edge to the DataFrame
new_row = pd.DataFrame([properties])
self._edges = pd.concat([self._edges, new_row], ignore_index=True)
self._edges.reset_index(drop=True, inplace=True)
# Representations must be reset based on the dataframes
for key in self._reset.keys():
self._reset[key] = True
@property
def dual(self):
"""The dual hypergraph is constructed.
Let :math:`H=(V,E)` be a hypergraph. In the dual hypergraph each original edge :math:`e in E`
is represented as a vertex and each original vertex :math:`v in E` is represented as an edge. Numerically, the
transpose of the incidence matrix of a hypergraph is the incidence matrix of the dual hypergraph.
:return: Hypergraph object
:rtype: *Hypergraph*
References
----------
- Yang, Chaoqi, et al. "Hypergraph learning with line expansion." arXiv preprint arXiv:2005.04843 (2020).
"""
# Auth: Joshua Pickard
# jpic@umich.edu
# Date: Nov 30, 2022
return Hypergraph(incidence_matrix=self.incidence_matrix.T)
@property
def hif(self):
    """Return the result of ``export.to_hif`` applied to this hypergraph.

    The value was previously discarded, so the property always evaluated
    to ``None``; it is now returned like the sibling export properties.
    """
    return export.to_hif(self)
@property
def hypernetx(self):
    """Return the result of ``export.to_hypernetx`` applied to this hypergraph."""
    converted = export.to_hypernetx(self)
    return converted
[docs]
@classmethod
def from_hypernetx(cls, HG_hnx):
    """Build a hypergraph from an object exposing a HyperNetX-style incidence table.

    Parameters
    ----------
    HG_hnx
        Object whose ``dataframe`` attribute has ``'nodes'`` and
        ``'edges'`` columns (one row per incidence) and optionally a
        ``'weight'`` column.

    Returns
    -------
    Hypergraph
        Nodes are numbered in order of first appearance; one edge is
        created per group of the ``'edges'`` column.
    """
    incidences = HG_hnx.dataframe
    unique_names = list(incidences['nodes'].unique())
    node_df = pd.DataFrame(
        {
            'Nodes': np.arange(len(unique_names)),
            'Names': unique_names
        }
    )
    # Constant-time name lookup yielding plain Python ints
    name_to_idx = {name: idx for idx, name in enumerate(unique_names)}
    has_weight = 'weight' in incidences.columns

    edge_list, node_names, weight = [], [], []
    for _, df in incidences.groupby('edges'):
        names = list(df['nodes'].unique())
        node_names.append(names)
        edge_list.append([name_to_idx[name] for name in names])
        # The first incidence's weight stands for the whole edge
        weight.append(df['weight'].iloc[0] if has_weight else 1)

    # NOTE(review): this column is 'Weight', while `edge_weights` and
    # `to_hif` read 'weight' - confirm which spelling is intended.
    edge_df = pd.DataFrame(
        {
            'Edges': np.arange(len(edge_list)),
            'Nodes': edge_list,
            'Node Names': node_names,
            'Weight': weight
        }
    )
    return cls(
        edge_list=edge_list,
        nodes=node_df,
        edges=edge_df
    )
[docs]
@classmethod
def from_networkx(cls, nxg):
    """Build a 2-uniform hypergraph from a networkx graph.

    Parameters
    ----------
    nxg : networkx graph
        Directed graphs give a directed hypergraph in which, for an arc
        ``u -> v``, ``u`` is the tail and ``v`` is the head.

    Returns
    -------
    Hypergraph
        The node table holds the original node objects (``'node'``), their
        attribute dicts (``'attributes'``) and integer ids (``'Nodes'``).
        The edge table is ``networkx.to_pandas_edgelist(nxg)`` extended
        with ``'Nodes'`` (and ``'head'`` / ``'tail'`` when directed).
    """
    node_data = list(nxg.nodes(data=True))
    nodes_df = pd.DataFrame(node_data, columns=['node', 'attributes'])
    nodes_df['Nodes'] = list(range(len(node_data)))
    # Constant-time lookup of a node's integer id
    node_to_idx = {node: idx for idx, (node, _) in enumerate(node_data)}

    edges_df = nx.to_pandas_edgelist(nxg)
    directed = nxg.is_directed()

    nodes, head, tail, edge_list = [], [], [], []
    for edge in nxg.edges:
        i0 = node_to_idx[edge[0]]
        i1 = node_to_idx[edge[1]]
        nodes.append([i0, i1])
        if directed:
            # Head/tail are stored as lists, and the edge list follows the
            # class convention [[head], [tail]] so both views agree.
            tail.append([i0])
            head.append([i1])
            edge_list.append([[i1], [i0]])
        else:
            edge_list.append([i0, i1])

    if directed:
        edges_df['head'] = head
        edges_df['tail'] = tail
    edges_df['Nodes'] = nodes

    return cls(
        nodes=nodes_df,
        edges=edges_df,
        edge_list=edge_list,
        compress=False,
        directed=directed
    )
[docs]
@classmethod
def from_hif(cls, hif):
    """Build a hypergraph from a Hypergraph Interchange Format (HIF) dictionary.

    Parameters
    ----------
    hif : dict
        Must contain ``'incidences'`` (records with ``'edge'`` and
        ``'node'``). ``'nodes'`` and ``'edges'`` records are optional;
        when absent they are derived from the incidences in order of
        first appearance. ``'attrs'`` dictionaries are expanded into columns.

    Returns
    -------
    Hypergraph

    Raises
    ------
    ValueError
        If an incidence refers to an edge or node missing from the tables.
    """
    incidences = hif['incidences']
    node_records = hif.get('nodes') or [
        {'node': key} for key in dict.fromkeys(inc['node'] for inc in incidences)
    ]
    edge_records = hif.get('edges') or [
        {'edge': key} for key in dict.fromkeys(inc['edge'] for inc in incidences)
    ]

    # Create nodes dataframe
    nodes = pd.DataFrame(node_records)
    if 'node' in nodes.columns:
        nodes = nodes.rename(columns={'node': 'Nodes'})

    # Create edges dataframe
    edges = pd.DataFrame(edge_records)
    if 'nodes' in edges.columns:
        edges = edges.rename(columns={'nodes': 'Nodes'})

    # Expand attribute dictionaries; records without attrs count as empty
    if 'attrs' in edges.columns:
        attrs = [a if isinstance(a, dict) else {} for a in edges['attrs']]
        edges = pd.concat([edges.drop(columns=['attrs']), pd.json_normalize(attrs)], axis=1)
    if 'attrs' in nodes.columns:
        attrs = [a if isinstance(a, dict) else {} for a in nodes['attrs']]
        nodes = pd.concat([nodes.drop(columns=['attrs']), pd.json_normalize(attrs)], axis=1)

    # Position lookups (first occurrence wins, like list.index)
    edge_pos, node_pos = {}, {}
    for pos, key in enumerate(edges['edge'].tolist()):
        edge_pos.setdefault(key, pos)
    for pos, key in enumerate(nodes['Nodes'].tolist()):
        node_pos.setdefault(key, pos)

    # Create the edge list
    edge_list = [[] for _ in range(edges.shape[0])]
    for incidence in incidences:
        try:
            edge_idx = edge_pos[incidence['edge']]
            node_idx = node_pos[incidence['node']]
        except KeyError as err:
            raise ValueError(f"{err.args[0]!r} is not in list") from err
        edge_list[edge_idx].append(node_idx)
    edges['Nodes'] = edge_list

    return cls(
        nodes=nodes,
        edges=edges,
        edge_list=edge_list,
        compress=False
    )
[docs]
def to_hif(self):
"""Converts HAT.Hypergraph to Hypergraph Interchange Format (HIF)
"""
network_type = 'undirected'
metadata = {
'uniform' : self.uniform,
'order' : self.order,
'directed' : self.directed
}
incidence, nodes, edges = [], [], []
for inode in range(self.nodes.shape[0]):
node = {
'node': inode,
# 'weight':None,
'attrs': {}
}
for property in self.nodes.columns:
if property in ['Nodes']:
continue
elif property == 'weight':
node['weight'] = self.nodes[property].iloc[inode]
else:
node['attrs'][property] = self.nodes[property].iloc[inode]
nodes.append(node)
for iedge in range(self.edges.shape[0]):
incident_nodes = list(self.edges['Nodes'].iloc[iedge])
edge = {
'edge' : iedge,
# 'weight': None,
'attrs': {
'nodes' : incident_nodes
}
}
edge_incidence = {
'edge' : iedge,
'node' : None,
# 'weight': None,
# 'direction': None,
'attrs': {}
}
for property in self.edges.columns:
if property in ['Nodes', "head", "tail"]:
continue
if property == 'weight':
edge['weight'] = self.edges[property].iloc[iedge]
edge_incidence['weight'] = self.edges[property].iloc[iedge]
else:
edge['attrs'][property] = self.edges[property].iloc[iedge]
edge_incidence['attrs'][property] = self.edges[property].iloc[iedge]
if self.directed:
edge['attrs']['head'] = self.edges['head'].iloc[iedge]
edge['attrs']['tail'] = self.edges['tail'].iloc[iedge]
edges.append(edge)
for node in incident_nodes:
edge_incidence['node'] = node
if self.directed:
if node in edge['attrs']['head']:
edge_incidence['direction'] = 'head'
else:
edge_incidence['direction'] = 'tail'
incidence.append(edge_incidence.copy())
hif = {
'network-type': network_type,
'metadata': metadata,
'nodes': nodes,
'edges': edges,
'incidences': incidence
}
return hif
@property
def hypergraphx(self):
    """Return the result of ``export.to_hypergraphx`` applied to this hypergraph."""
    converted = export.to_hypergraphx(self)
    return converted
[docs]
def convert_nodes_to_integers(edge_list):
    """Replace node names in an edge list by integer ids.

    Parameters
    ----------
    edge_list : list
        Undirected edges ``[n0, n1, ...]`` and/or directed edges
        ``[[head...], [tail...]]``.

    Returns
    -------
    tuple
        ``(edge_list, None)`` with the input unchanged when every node is
        already an integer (Python or numpy); otherwise
        ``(new_edge_list, name_to_int)``, where ids are assigned in order
        of first appearance and the nesting of each edge is preserved.
    """
    def _is_nested(edge):
        # Directed edges hold lists (head, tail) instead of nodes
        return len(edge) > 0 and all(isinstance(part, (list, tuple)) for part in edge)

    def _members(edge):
        if _is_nested(edge):
            for part in edge:
                yield from part
        else:
            yield from edge

    # Step 1: If all nodes are integers, no need to do anything
    all_elements = [node for edge in edge_list for node in _members(edge)]
    if all(isinstance(node, (int, np.integer)) for node in all_elements):
        return edge_list, None  # No changes, return original edge_list

    # Step 2: Map each unique name to an integer. First-appearance order
    # keeps the mapping reproducible (set order is not).
    name_to_int = {}
    for name in all_elements:
        name_to_int.setdefault(name, len(name_to_int))

    # Step 3: Create a new edge_list with integer mappings
    new_edge_list = []
    for edge in edge_list:
        if _is_nested(edge):
            new_edge_list.append([[name_to_int[name] for name in part] for part in edge])
        else:
            new_edge_list.append([name_to_int[name] for name in edge])

    # Step 4: Return the new edge_list and the mapping
    return new_edge_list, name_to_int