Source code for tenpy.linalg.np_conserved

r"""A module to handle charge conservation in tensor networks.

A detailed introduction to this module (including notations) can be found in :doc:`/intro_npc`.

This module `np_conserved` implements a class :class:`Array`
designed to make use of charge conservation in tensor networks.
The idea is that the `Array` class is used in a fashion very similar to
the :class:`numpy.ndarray`, e.g you can call the functions :func:`tensordot` or :func:`svd`
(of this module) on them.
The structure of the algorithms (as DMRG) is thus the same as with basic numpy ndarrays.

Internally, an :class:`Array` saves charge meta data to keep track of blocks which are nonzero.
All possible operations (e.g. tensordot, svd, ...) on such arrays preserve the total charge
structure. In addition, these operations make use of the charges to figure out which of the blocks
it has to use/combine - this is the basis for the speed-up.

Overview
^^^^^^^^

.. rubric:: Classes

.. autosummary::
    Array
    ~tenpy.linalg.charges.ChargeInfo
    ~tenpy.linalg.charges.LegCharge
    ~tenpy.linalg.charges.LegPipe

.. rubric :: Array creation

.. autosummary::
    Array.from_ndarray_trivial
    Array.from_ndarray
    Array.from_func
    Array.from_func_square
    zeros
    eye_like
    diag

.. rubric:: Concatenation

.. autosummary::
    concatenate
    grid_concat
    grid_outer

.. rubric:: Detecting charges of flat arrays

.. autosummary::
    detect_qtotal
    detect_legcharge
    detect_grid_outer_legcharge

.. rubric:: Contraction of some legs

.. autosummary::
    tensordot
    outer
    inner
    trace

.. rubric:: Linear algebra

.. autosummary::
    svd
    pinv
    norm
    qr
    expm

.. rubric:: Eigen systems

.. autosummary::
    eigh
    eig
    eigvalsh
    eigvals
    speigs

"""
# Copyright 2018-2019 TeNPy Developers, GNU GPLv3

import numpy as np
import scipy.linalg
from scipy.linalg import blas as BLAS  # python interface to BLAS
import warnings
import itertools
from numbers import Integral

# import public API from charges
from .charges import ChargeInfo, LegCharge, LegPipe
from . import charges  # for private functions
from .svd_robust import svd as svd_flat

from ..tools.misc import to_iterable, anynan, argsort, inverse_permutation, list_to_dict_list
from ..tools.math import speigs as _sp_speigs
from ..tools.math import qr_li, rq_li
from ..tools.string import vert_join, is_non_string_iterable
from ..tools.optimization import optimize, OptimizationFlag, use_cython

__all__ = [
    'QCUTOFF', 'ChargeInfo', 'LegCharge', 'LegPipe', 'Array', 'zeros', 'eye_like', 'diag',
    'concatenate', 'grid_concat', 'grid_outer', 'detect_grid_outer_legcharge', 'detect_qtotal',
    'detect_legcharge', 'trace', 'outer', 'inner', 'tensordot', 'svd', 'pinv', 'norm', 'eigh',
    'eig', 'eigvalsh', 'eigvals', 'speigs', 'qr', 'expm', 'to_iterable_arrays'
]

#: A cutoff to ignore machine precision rounding errors when determining charges
QCUTOFF = np.finfo(np.float64).eps * 10

# the type used for charges
QTYPE = charges.QTYPE

# ##################################
# Array class
# ##################################


[docs]class Array: r"""A multidimensional array (=tensor) for using charge conservation. An `Array` represents a multi-dimensional tensor, together with the charge structure of its legs (for abelian charges). Further information can be found in :doc:`/intro_npc`. The default :meth:`__init__` (i.e. ``Array(...)``) does not insert any data, and thus yields an Array 'full' of zeros, equivalent to :func:`zeros()`. Further, new arrays can be created with one of :meth:`from_ndarray_trivial`, :meth:`from_ndarray`, or :meth:`from_func`, and of course by copying/tensordot/svd etc. In-place methods are indicated by a name starting with ``i``. (But `is_completely_blocked` is not inplace...) Parameters ---------- legcharges : list of :class:`~tenpy.linalg.charges.LegCharge` The leg charges for each of the legs. The :class:`ChargeInfo` is read out from it. dtype : type or string The data type of the array entries. Defaults to np.float64. qtotal : 1D array of QTYPE The total charge of the array. Defaults to 0. Attributes ---------- size stored_blocks rank : int The rank or "number of dimensions", equivalent to ``len(shape)``. shape : tuple(int) The number of indices for each of the legs. dtype : np.dtype The data type of the entries. chinfo : :class:`~tenpy.linalg.charges.ChargeInfo` The nature of the charge. qtotal : 1D array The total charge of the tensor. legs : list of :class:`~tenpy.linalg.charges.LegCharge` The leg charges for each of the legs. labels : dict (string -> int) Labels for the different legs. _data : list of arrays The actual entries of the tensor. _qdata : 2D array (len(_data), rank), dtype np.intp For each of the _data entries the qindices of the different legs. _qdata_sorted : Bool Whether self._qdata is lexsorted. Defaults to `True`, but *must* be set to `False` by algorithms changing _qdata. """ def __init__(self, legcharges, dtype=np.float64, qtotal=None): """see help(self)""" self.legs = list(legcharges) if len(self.legs) == 0: raise ValueError("can't have 0-rank Tensor") self.chinfo = self.legs[0].chinfo self._set_shape() self.dtype = np.dtype(dtype) self.qtotal = self.chinfo.make_valid(qtotal) self._labels = [None] * len(self.legs) self._data = [] self._qdata = np.empty((0, self.rank), dtype=np.intp, order='C') self._qdata_sorted = True self.test_sanity()
[docs] def copy(self, deep=True): """Return a (deep or shallow) copy of self. **Both** deep and shallow copies will share ``chinfo`` and the `LegCharges` in ``legs``. In contrast to a deep copy, the shallow copy will also share the tensor entries, namely the *same* instances of ``_qdata`` and ``_data`` and ``labels`` (and other 'immutable' properties like the shape or dtype). .. note :: Shallow copies are *not* recommended unless you know the consequences! See the following examples illustrating some of the pitfalls. Examples -------- Be (very!) careful when making non-deep copies: In the following example, the original `a` is changed if and only if the corresponding block existed in `a` before. >>> b = a.copy(deep=False) # shallow copy >>> b[1, 2] = 4. Other `inplace` operations might have no effect at all (although we don't guarantee that): >>> a *= 2 # has no effect on `b` >>> b.iconj() # nor does this change `a` """ cp = Array.__new__(Array) cp.__setstate__(self.__getstate__()) cp.legs = list(self.legs) # different list but same instances cp._set_shape() cp._labels = cp._labels[:] # list copy if deep: cp._data = [b.copy() for b in self._data] cp._qdata = self._qdata.copy('C') cp.qtotal = self.qtotal.copy() # even deep copies share legs & chinfo (!) else: cp._data = self._data[:] return cp
def __getstate__(self): """Allow to pickle and copy.""" return self.__dict__ def __setstate__(self, state): """Allow to pickle and copy.""" # order is important for import of old version! if isinstance(state, dict): # allow to import from the non-compiled version self.__dict__.update(state) self._set_shape() elif isinstance(state, tuple): # allow to import from the compiled versions of TenPy 0.3.0 self._data, self._qdata, self._qdata_sorted, self.chinfo, self.dtype, labels, \ self.legs, self.qtotal, self.rank, self.shape = state self.labels = labels # property, requires rank to be set already else: raise ValueError("setstate with incompatible type of state")
[docs] @classmethod def from_ndarray_trivial(cls, data_flat, dtype=None): """convert a flat numpy ndarray to an Array with trivial charge conservation. Parameters ---------- data_flat : array_like The data to be converted to a Array. dtype : ``np.dtype`` The data type of the array entries. Defaults to dtype of `data_flat`. Returns ------- res : :class:`Array` An Array with data of data_flat. """ data_flat = np.asarray(data_flat) # unspecified dtype if dtype is None: dtype = data_flat.dtype data_flat = data_flat.astype(dtype, copy=False) chinfo = ChargeInfo() legs = [LegCharge.from_trivial(s, chinfo) for s in data_flat.shape] res = cls(legs, dtype) res._data = [data_flat] res._qdata = np.zeros((1, res.rank), np.intp) res._qdata_sorted = True res.test_sanity() return res
[docs] @classmethod def from_ndarray(cls, data_flat, legcharges, dtype=None, qtotal=None, cutoff=None): """convert a flat (numpy) ndarray to an Array. Parameters ---------- data_flat : array_like The flat ndarray which should be converted to a npc `Array`. The shape has to be compatible with legcharges. legcharges : list of :class:`LegCharge` The leg charges for each of the legs. The :class:`ChargeInfo` is read out from it. dtype : ``np.dtype`` The data type of the array entries. Defaults to dtype of `data_flat`. qtotal : None | charges The total charge of the new array. cutoff : float Blocks with ``np.max(np.abs(block)) > cutoff`` are considered as zero. Defaults to :data:`QCUTOFF`. Returns ------- res : :class:`Array` An Array with data of `data_flat`. See also -------- detect_qtotal : used to detect ``qtotal`` if not given. """ if cutoff is None: cutoff = QCUTOFF data_flat = np.asarray(data_flat) # unspecified dtype if dtype is None: dtype = data_flat.dtype data_flat = data_flat.astype(dtype, copy=False) res = cls(legcharges, dtype, qtotal) # without any data if res.shape != data_flat.shape: raise ValueError("Incompatible shapes: legcharges {0!s} vs flat {1!s} ".format( res.shape, data_flat.shape)) if qtotal is None: res.qtotal = qtotal = detect_qtotal(data_flat, legcharges, cutoff) data = [] qdata = [] for qindices in res._iter_all_blocks(): sl = res._get_block_slices(qindices) if np.all(res._get_block_charge(qindices) == qtotal): data.append(np.array(data_flat[sl], dtype=res.dtype)) # copy data qdata.append(qindices) elif np.any(np.abs(data_flat[sl]) > cutoff): warnings.warn("flat array has non-zero entries in blocks incompatible with charge", stacklevel=2) res._data = data res._qdata = np.array(qdata, dtype=np.intp, order='C').reshape((len(qdata), res.rank)) res._qdata_sorted = True res.test_sanity() return res
[docs] @classmethod def from_func(cls, func, legcharges, dtype=None, qtotal=None, func_args=(), func_kwargs={}, shape_kw=None): """Create an Array from a numpy func. This function creates an array and fills the blocks *compatible* with the charges using `func`, where `func` is a function returning a `array_like` when given a shape, e.g. one of ``np.ones`` or ``np.random.standard_normal``. Parameters ---------- func : callable A function-like object which is called to generate the data blocks. We expect that `func` returns a flat array of the given `shape` convertible to `dtype`. If no `shape_kw` is given, it is called like ``func(shape, *fargs, **fkwargs)``, otherwise as ``func(*fargs, `shape_kw`=shape, **fkwargs)``. `shape` is a tuple of int. legcharges : list of :class:`LegCharge` The leg charges for each of the legs. The :class:`ChargeInfo` is read out from it. dtype : None | type | string The data type of the output entries. Defaults to np.float64. Defaults to `None`: obtain it from the return value of the function. Note that this argument is not given to func, but rather a type conversion is performed afterwards. You might want to set a `dtype` in `func_kwargs` as well. qtotal : None | charges The total charge of the new array. Defaults to charge 0. func_args : iterable Additional arguments given to `func`. func_kwargs : dict Additional keyword arguments given to `func`. shape_kw : None | str If given, the keyword with which shape is given to `func`. Returns ------- res : :class:`Array` An Array with blocks filled using `func`. """ if dtype is None: # create a small test block to derive the dtype shape = (2, 2) if shape_kw is None: block = func(shape, *func_args, **func_kwargs) else: kws = func_kwargs.copy() kws[shape_kw] = shape block = func(*func_args, **kws) block = np.asarray(block) dtype = block.dtype res = cls(legcharges, dtype, qtotal) # without any data yet. data = [] qdata = [] # iterate over all qindices compatible with qtotal qindices = np.array([qi for qi in res._iter_all_blocks()], dtype=np.intp) block_charges = res._get_block_charge(qindices.T) # .T: allows to use 2D `qindices` compatible = np.all(block_charges == res.qtotal, axis=1) for qindices in qindices[compatible]: shape = res._get_block_shape(qindices) if shape_kw is None: block = func(shape, *func_args, **func_kwargs) else: kws = func_kwargs.copy() kws[shape_kw] = shape block = func(*func_args, **kws) block = np.asarray(block, dtype=res.dtype) data.append(block) qdata.append(qindices) res._data = data res._qdata = np.array(qdata, dtype=np.intp, order='C').reshape((len(qdata), res.rank)) res._qdata_sorted = True # _iter_all_blocks is in lexiographic order res.test_sanity() return res
[docs] @classmethod def from_func_square(cls, func, leg, dtype=None, func_args=(), func_kwargs={}, shape_kw=None): """Create an Array from a (numpy) function. This function creates an array and fills the blocks *compatible* with the charges using `func`, where `func` is a function returning a `array_like` when given a shape, e.g. one of ``np.ones`` or ``np.random.standard_normal`` or the functions defined in :mod:`~tenpy.linalg.random_matrix`. Parameters ---------- func : callable A function-like object which is called to generate the data blocks. We expect that `func` returns a flat array of the given `shape` convertible to `dtype`. If no `shape_kw` is given, it is called like ``func(shape, *fargs, **fkwargs)``, otherwise as ``func(*fargs, `shape_kw`=shape, **fkwargs)``. `shape` is a tuple of int. leg : :class:`LegCharge` The leg charges for the first leg; the second leg is set to ``leg.conj()``. The :class:`ChargeInfo` is read out from it. dtype : None | type | string The data type of the output entries. Defaults to `None`: obtain it from the return value of the function. Note that this argument is not given to func, but rather a type conversion is performed afterwards. You might want to set a `dtype` in `func_kwargs` as well. func_args : iterable Additional arguments given to `func`. func_kwargs : dict Additional keyword arguments given to `func`. shape_kw : None | str If given, the keyword with which shape is given to `func`. Returns ------- res : :class:`Array` An Array with blocks filled using `func`. """ blocked = leg.is_blocked() if not blocked: pipe = LegPipe([leg]) legs = [pipe, pipe.conj()] else: legs = [leg, leg.conj()] res = Array.from_func(func, legs, dtype, None, func_args, func_kwargs, shape_kw) if not blocked: return res.split_legs() return res
[docs] def zeros_like(self): """Return a copy of self with only zeros as entries, containing no `_data`.""" res = self.copy(deep=False) res._data = [] res._qdata = np.empty((0, res.rank), dtype=np.intp) res._qdata_sorted = True return res
[docs] def test_sanity(self): """Sanity check. Raises ValueErrors, if something is wrong. """ if optimize(OptimizationFlag.skip_arg_checks): return if len(self.legs) == 0: raise ValueError("We don't allow rank-0 tensors without legs") for l in self.legs: if l.chinfo != self.chinfo: raise ValueError("leg has different ChargeInfo:\n{0!s}\n vs {1!s}".format( l.chinfo, self.chinfo)) if self.shape != tuple([lc.ind_len for lc in self.legs]): raise ValueError("shape mismatch with LegCharges\n self.shape={0!s} != {1!s}".format( self.shape, tuple([lc.ind_len for lc in self.legs]))) for l in self.legs: l.test_sanity() if any([self.dtype != d.dtype for d in self._data]): raise ValueError("wrong dtype: {0!s} vs\n {1!s}".format( self.dtype, [self.dtype != d.dtype for d in self._data])) if self._qdata.shape != (self.stored_blocks, self.rank): raise ValueError("_qdata shape wrong") if self._qdata.dtype != np.intp: raise ValueError("wront dtype of _qdata") if np.any(self._qdata < 0) or np.any(self._qdata >= [l.block_number for l in self.legs]): raise ValueError("invalid qind in _qdata") if not self._qdata.flags['C_CONTIGUOUS']: raise ValueError("qdata is not C-contiguous") if self._qdata_sorted: perm = np.lexsort(self._qdata.T) if np.any(perm != np.arange(len(perm))): raise ValueError("_qdata_sorted == True, but _qdata is not sorted") # check total charge block_q = np.sum([l.get_charge(qi) for l, qi in zip(self.legs, self._qdata.T)], axis=0) block_q = self.chinfo.make_valid(block_q) if np.any(block_q != self.qtotal): raise ValueError("some row of _qdata is incompatible with total charge")
# TODO: check labels? # properties ============================================================== @property def size(self): """The number of dtype-objects stored.""" return np.sum([t.size for t in self._data], dtype=np.int_) @property def stored_blocks(self): """The number of (non-zero) blocks stored in :attr:`_data`.""" return len(self._data) @property def labels(self): warnings.warn("Deprecated access of Array.labels as dictionary.", category=FutureWarning, stacklevel=2) dict_lab = {} for i, l in enumerate(self._labels): if l is not None: dict_lab[l] = i return dict_lab @labels.setter def labels(self, dict_lab): warnings.warn("Deprecated setting of Array.labels with dictionary.", category=FutureWarning, stacklevel=2) list_lab = [None] * self.rank for k, v in dict_lab.items(): if list_lab[v] is not None: raise ValueError("Two labels point to the same index " + repr(dict_lab)) list_lab[v] = str(k) self._labels = list_lab # labels ==================================================================
[docs] def get_leg_index(self, label): """translate a leg-index or leg-label to a leg-index. Parameters ---------- label : int | string The leg-index directly or a label (string) set before. Returns ------- leg_index : int The index of the label. See also -------- get_leg_indices : calls get_leg_index for a list of labels. iset_leg_labels : set the labels of different legs. """ if not isinstance(label, Integral): try: label = self._labels.index(label) except ValueError: # not in List msg = "Label not found: {0!r}, current labels: {1!r}".format(label, self._labels) raise KeyError(msg) from None else: if label < 0: label += self.rank if label > self.rank or label < 0: raise ValueError("axis {0:d} out of rank {1:d}".format(label, self.rank)) return label
[docs] def get_leg_indices(self, labels): """Translate a list of leg-indices or leg-labels to leg indices. Parameters ---------- labels : iterable of string/int The leg-labels (or directly indices) to be translated in leg-indices. Returns ------- leg_indices : list of int The translated labels. See also -------- get_leg_index : used to translate each of the single entries. iset_leg_labels : set the labels of different legs. """ return [self.get_leg_index(l) for l in labels]
[docs] def iset_leg_labels(self, labels): """Set labels for the different axes/legs; in place. Introduction to leg labeling can be found in :doc:`/intro_npc`. Parameters ---------- labels : iterable (strings | None), len=self.rank One label for each of the legs. An entry can be None for an anonymous leg. See also -------- get_leg: translate the labels to indices. """ if len(labels) != self.rank: raise ValueError("Need one leg label for each of the legs, got: " + str(list(labels))) for i, l in enumerate(labels): if l is None: continue if l == '': raise ValueError("use `None` for empty labels") if l in labels[i + 1:]: raise ValueError("Duplicate label entry in " + repr(labels)) self._labels = list(labels) return self
[docs] def get_leg_labels(self): """Return list of the leg labels, with `None` for anonymous legs.""" return self._labels[:]
[docs] def has_label(self, label): """Check whether a given label exists.""" return (label in self._labels)
[docs] def get_leg(self, label): """Return ``self.legs[self.get_leg_index(label)]``. Convenient function returning the leg corresponding to a leg label/index. """ return self.legs[self.get_leg_index(label)]
[docs] def ireplace_label(self, old_label, new_label): """Replace the leg label `old_label` with `new_label`; in place.""" old_index = self.get_leg_index(old_label) labels = self._labels[:] labels[old_index] = None new_label = str(new_label) if new_label in labels: msg = "Duplicate label: trying to set {0!r} in {1!r}".format(new_label, labels) raise ValueError(msg) labels[old_index] = new_label self._labels = labels return self
[docs] def replace_label(self, old_label, new_label): """Return a shallow copy with the leg label `old_label` replaced by `new_label`.""" return self.copy(deep=False).ireplace_label(old_label, new_label)
[docs] def ireplace_labels(self, old_labels, new_labels): """Replace leg label ``old_labels[i]`` with ``new_labels[i]``; in place.""" old_inds = self.get_leg_indices(old_labels) labels = self._labels[:] for i in old_inds: labels[i] = None for i, new_label in zip(old_inds, new_labels): new_label = str(new_label) if new_label in labels: msg = "Duplicate label: trying to set {0!r} in {1!r}".format(new_label, labels) raise ValueError(msg) labels[i] = new_label self._labels = labels return self
[docs] def replace_labels(self, old_labels, new_labels): """Return a shallow copy with ``old_labels[i]`` replaced by ``new_labels[i]``.""" return self.copy(deep=False).ireplace_labels(old_labels, new_labels)
[docs] def idrop_labels(self, old_labels=None): """Remove leg labels from self; in place. Parameters ---------- old_labels : list of str|int The leg labels/indices for which the label should be removed. By default (None), remove all labels. """ if old_labels is None: self._labels = [None] * self.rank return self old_inds = self.get_leg_indices(old_labels) labels = self._labels[:] for i in old_inds: labels[i] = None self._labels = labels return self
# string output =========================================================== def __repr__(self): return "<npc.Array shape={0!s} charge={1!s} labels={2!s}>".format( self.shape, self.chinfo, self.get_leg_labels()) def __str__(self): res = [repr(self)[:-1], vert_join([str(l) for l in self.legs], delim='|')] if np.prod(self.shape) < 100: res.append(str(self.to_ndarray())) res.append('>') return '\n'.join(res)
[docs] def sparse_stats(self): """Returns a string detailing the sparse statistics.""" total = np.prod(self.shape) if total == 0: return "Array without entries, one axis is empty." nblocks = self.stored_blocks stored = self.size nonzero = np.sum([np.count_nonzero(t) for t in self._data], dtype=np.int_) bs = np.array([t.size for t in self._data], dtype=np.float) if nblocks > 0: captsparse = float(nonzero) / stored bs_min = int(np.min(bs)) bs_max = int(np.max(bs)) bs_mean = np.sum(bs) / nblocks bs_med = np.median(bs) bs_var = np.var(bs) else: captsparse = 1. bs_min = bs_max = bs_mean = bs_med = bs_var = 0 res = "{nonzero:d} of {total:d} entries (={nztotal:g}) nonzero,\n" \ "stored in {nblocks:d} blocks with {stored:d} entries.\n" \ "Captured sparsity: {captsparse:g}\n" \ "Block sizes min:{bs_min:d} mean:{bs_mean:.2f} median:{bs_med:.1f} " \ "max:{bs_max:d} var:{bs_var:.2f}" return res.format(nonzero=nonzero, total=total, nztotal=nonzero / total, nblocks=nblocks, stored=stored, captsparse=captsparse, bs_min=bs_min, bs_max=bs_max, bs_mean=bs_mean, bs_med=bs_med, bs_var=bs_var)
# accessing entries =======================================================
[docs] def to_ndarray(self): """Convert self to a dense numpy ndarray.""" res = np.zeros(self.shape, dtype=self.dtype) for block, slices, _, _ in self: # that's elegant! :) res[slices] = block return res
def __iter__(self): """Allow to iterate over the non-zero blocks, giving all `_data`. Yields ------ block : ndarray the actual entries of a charge block blockslices : tuple of slices for each of the legs a slice giving the range of the block in the original tensor charges : list of charges the charge value(s) for each of the legs (taking `qconj` into account) qdat : ndarray the qindex for each of the legs """ for block, qdat in zip(self._data, self._qdata): blockslices = [] qs = [] for (qi, l) in zip(qdat, self.legs): blockslices.append(l.get_slice(qi)) qs.append(l.get_charge(qi)) yield block, tuple(blockslices), qs, qdat def __getitem__(self, inds): """Acces entries with ``self[inds]``. Parameters ---------- inds : tuple A tuple specifying the `index` for each leg. An ``Ellipsis`` (written as ``...``) replaces ``slice(None)`` for missing axes. For a single `index`, we currently support: - A single integer, choosing an index of the axis, reducing the dimension of the resulting array. - A ``slice(None)`` specifying the complete axis. - A ``slice``, which acts like a `mask` in :meth:`iproject`. - A 1D array_like(bool): acts like a `mask` in :meth:`iproject`. - A 1D array_like(int): acts like a `mask` in :meth:`iproject`, and if not orderd, a subsequent permuation with :meth:`permute` Returns ------- res : `dtype` Only returned, if a single integer is given for all legs. It is the entry specified by `inds`, giving ``0.`` for non-saved blocks. or sliced : :class:`Array` A copy with some of the data removed by :meth:`take_slice` and/or :meth:`project`. Notes ----- ``self[i]`` is equivalent to ``self[i, ...]``. ``self[i, ..., j]`` is syntactic sugar for ``self[(i, Ellipsis, i2)]`` Raises ------ IndexError If the number of indices is too large, or if an index is out of range. """ int_only, inds = self._pre_indexing(inds) if int_only: pos = np.array([l.get_qindex(i) for i, l in zip(inds, self.legs)]) try: block = self.get_block(pos[:, 0]) except IndexError: return self.dtype.type(0) if block is None: return self.dtype.type(0) else: return block[tuple(pos[:, 1])] # advanced indexing return self._advanced_getitem(inds) def __setitem__(self, inds, other): """Assign ``self[inds] = other``. Should work as expected for both basic and advanced indexing as described in :meth:`__getitem__`. `other` can be: - a single value (if all of `inds` are integer) or for slicing/advanced indexing: - a :class:`Array`, with charges as ``self[inds]`` returned by :meth:`__getitem__`. - or a flat numpy array, assuming the charges as with ``self[inds]``. """ int_only, inds = self._pre_indexing(inds) if int_only: pos = np.array([l.get_qindex(i) for i, l in zip(inds, self.legs)]) block = self.get_block(pos[:, 0], insert=True) block[tuple(pos[:, 1])] = other return # advanced indexing if not isinstance(other, Array): # if other is a flat array, convert it to an npc Array like_other = self.zeros_like() for i, leg in enumerate(like_other.legs): if isinstance(leg, LegPipe): like_other.legs[i] = leg.to_LegCharge() like_other = like_other._advanced_getitem(inds) other = Array.from_ndarray(other, like_other.legs, self.dtype, like_other.qtotal) self._advanced_setitem_npc(inds, other)
[docs] def get_block(self, qindices, insert=False): """Return the ndarray in ``_data`` representing the block corresponding to `qindices`. Parameters ---------- qindices : 1D array of np.intp The qindices, for which we need to look in _qdata. insert : bool If True, insert a new (zero) block, if `qindices` is not existent in ``self._data``. Otherwise just return ``None``. Returns ------- block: ndarray | ``None`` The block in ``_data`` corresponding to qindices. If `insert`=False and there is not block with qindices, return ``None``. Raises ------ IndexError If `qindices` are incompatible with charge and `raise_incomp_q`. """ if not np.all(self._get_block_charge(qindices) == self.qtotal): raise IndexError("trying to get block for qindices incompatible with charges") # find qindices in self._qdata match = np.argwhere(np.all(self._qdata == qindices, axis=1))[:, 0] if len(match) == 0: if insert: res = np.zeros(self._get_block_shape(qindices), dtype=self.dtype) self._data.append(res) self._qdata = np.append(self._qdata, [qindices], axis=0) self._qdata_sorted = False return res else: return None return self._data[match[0]]
[docs] def take_slice(self, indices, axes): """Return a copy of self fixing `indices` along one or multiple `axes`. For a rank-4 Array ``A.take_slice([i, j], [1,2])`` is equivalent to ``A[:, i, j, :]``. Parameters ---------- indices : (iterable of) int The (flat) index for each of the legs specified by `axes`. axes : (iterable of) str/int Leg labels or indices to specify the legs for which the indices are given. Returns ------- sliced_self : :class:`Array` A copy of self, equivalent to taking slices with indices inserted in axes. See also -------- add_leg : opposite action of inserting a new leg. """ axes = self.get_leg_indices(to_iterable(axes)) indices = np.asarray(to_iterable(indices), dtype=np.intp) if len(axes) != len(indices): raise ValueError("len(axes) != len(indices)") if indices.ndim != 1: raise ValueError("indices may only contain ints") res = self.copy(deep=True) if len(axes) == 0: return res # nothing to do # qindex and index_within_block for each of the axes pos = np.array([self.legs[a].get_qindex(i) for a, i in zip(axes, indices)]) # which axes to keep keep_axes = [a for a in range(self.rank) if a not in axes] res.legs = [self.legs[a] for a in keep_axes] res._set_shape() labels = self._labels res._labels = [labels[a] for a in keep_axes] # calculate new total charge for a, (qi, _) in zip(axes, pos): res.qtotal -= self.legs[a].get_charge(qi) res.qtotal = self.chinfo.make_valid(res.qtotal) # which blocks to keep axes = np.array(axes, dtype=np.intp) keep_axes = np.array(keep_axes, dtype=np.intp) keep_blocks = np.all(self._qdata[:, axes] == pos[:, 0], axis=1) res._qdata = np.array(self._qdata[np.ix_(keep_blocks, keep_axes)], copy=False, order='C') # res._qdata_sorted is not changed # determine the slices to take on _data sl = [slice(None)] * self.rank for a, ri in zip(axes, pos[:, 1]): sl[a] = ri # the indices within the blocks sl = tuple(sl) # finally take slices on _data res._data = [block[sl] for block, k in zip(res._data, keep_blocks) if k] return res
[docs] def add_trivial_leg(self, axis=0, label=None, qconj=1): """Add a trivial leg (with just one entry) to `self`. Parameters ---------- axis : int The new leg is inserted before index `axis`. label : str | ``None`` If not ``None``, use it as label for the new leg. qconj : +1 | -1 The direction of the new leg. Returns ------- extended : :class:`Array` A (possibly) *shallow* copy of self with an additional leg of ind_len 1 and charge 0. """ if axis < 0: axis += self.rank res = self.copy(deep=False) leg = LegCharge.from_qflat(self.chinfo, [self.chinfo.make_valid(None)], qconj=qconj) res.legs.insert(axis, leg) if label is not None and label in self._labels: raise ValueError("label already exists") res._labels.insert(axis, label) res._set_shape() res._data = res._data[:] # make a copy for j, T in enumerate(res._data): res._data[j] = T.reshape(T.shape[:axis] + (1, ) + T.shape[axis:]) res._qdata = np.array(np.hstack( [res._qdata[:, :axis], np.zeros([len(res._data), 1], np.intp), res._qdata[:, axis:]]), copy=False, order='C') return res
[docs] def add_leg(self, leg, i, axis=0, label=None): """Add a leg to `self`, setting the current array as slice for a given index. Parameters ---------- leg : :class:`LegCharge` The charge data of the leg to be added. i : int Index within the leg for which the data of `self` should be set. axis : axis The new leg is inserted before this current axis. label : str | ``None`` If not ``None``, use it as label for the new leg. Returns ------- extended : :class:`Array` A copy of self with the new `leg` at axis `axis` , such that ``extended.take_slice(i, axis)`` returns a copy of `self`. See also -------- take_slice : opposite action reducing the number of legs. """ if axis < 0: axis += self.rank legs = list(self.legs) legs.insert(axis, leg) qi, _ = leg.get_qindex(i) labels = self._labels[:] if label is not None and label in labels: raise ValueError("label already exists") labels.insert(axis, label) qtotal = self.chinfo.make_valid(self.qtotal + leg.get_charge(qi)) extended = Array(legs, self.dtype, qtotal) extended._labels = labels slices = [slice(None, None)] * self.rank slices[axis] = i extended[tuple(slices)] = self # use existing implementation return extended
[docs] def extend(self, axis, extra): """Increase the dimension of a given axis, filling the values with zeros. Parameters ---------- axis : int | str The axis (or axis-label) to be extended. extra : :class:`LegCharge` | int By what to extend, i.e. the charges to be appended to the leg of `axis`. An int stands for extending the length of the array by a single new block of that size with zero charges. Returns ------- extended : :class:`Array` A copy of self with the specified axis increased. """ extended = self.copy(deep=True) ax = self.get_leg_index(axis) extended.legs[ax] = extended.legs[ax].extend(extra) extended._set_shape() return extended
# handling of charges =====================================================
[docs] def gauge_total_charge(self, axis, newqtotal=None, new_qconj=None): """Changes the total charge by adjusting the charge on a certain leg. The total charge is given by finding a nonzero entry [i1, i2, ...] and calculating:: qtotal = self.chinfo.make_valid( np.sum([l.get_charge(l.get_qindex(qi)[0]) for i, l in zip([i1,i2,...], self.legs)], axis=0)) Thus, the total charge can be changed by redefining (= shifting) the LegCharge of a single given leg. This is exaclty what this function does. Parameters ---------- axis : int or string The new leg (index or label), for which the charge is changed. newqtotal : charge values, defaults to 0 The new total charge. new_qconj: {+1, -1, None} Whether the new LegCharge points inward (+1) or outward (-1) afterwards. By default (None) use the previous ``self.legs[leg].qconj``. Returns ------- copy : :class:`Array` A shallow copy of self with ``copy.qtotal == newqtotal`` and new ``copy.legs[leg]``. The new leg will be a :class`LegCharge`, even if the old leg was a :class:`LegPipe`. """ res = self.copy(deep=False) ax = self.get_leg_index(axis) old_qconj = self.legs[ax].qconj if new_qconj is None: new_qconj = old_qconj if new_qconj not in [-1, +1]: raise ValueError("invalid new_qconj") chinfo = self.chinfo newqtotal = res.qtotal = chinfo.make_valid(newqtotal).copy() # default zero chdiff = newqtotal - self.qtotal new_charges = self.legs[ax].charges + old_qconj * chdiff if old_qconj != new_qconj: new_charges = -new_charges new_charges = chinfo.make_valid(new_charges) res.legs[ax] = LegCharge.from_qind(chinfo, self.legs[ax].slices, new_charges, new_qconj) return res
[docs] def add_charge(self, add_legs, chinfo=None, qtotal=None): """Add charges. Parameters ---------- add_legs : iterable of :class:`LegCharge` One `LegCharge` for each axis of `self`, to be added to the one in :attr:`legs`. chargeinfo : :class:`ChargeInfo` The ChargeInfo for all charges; create new if ``None``. qtotal : None | charges The total charge with respect to `add_legs`. If ``None``, derive it from non-zero entries of ``self``. Returns ------- charges_added : :class:`Array` A copy of `self`, where the LegCharges `add_legs` where added to `self.legs`. Note that the LegCharges are neither bunched or sorted; you might want to use :meth:`sort_legcharge`. """ if len(add_legs) != self.rank: raise ValueError("wrong number of legs in `add_legs`") if chinfo is not None: chinfo2 = ChargeInfo.add([self.chinfo, add_legs[0].chinfo]) assert chinfo == chinfo2 else: chinfo = ChargeInfo.add([self.chinfo, add_legs[0].chinfo]) legs = [ LegCharge.from_add_charge([leg, leg2], chinfo) for (leg, leg2) in zip(self.legs, add_legs) ] if qtotal is None: for block, slices, _, _ in self: leg_slices = [] for leg, sl in zip(add_legs, slices): mask = np.zeros(leg.ind_len, np.bool) mask[sl] = True leg_slices.append(leg.project(mask)[2]) qtotal = detect_qtotal(self.to_ndarray(), leg_slices) break else: raise ValueError("no non-zero entry: can't detect qtotal") else: qtotal = np.concatenate((self.qtotal, np.array(qtotal, dtype=QTYPE))) res = Array(legs, self.dtype, qtotal) for block, slices, _, _ in self: # use __iter__ res[slices] = block # use __setitem__ return res
[docs] def drop_charge(self, charge=None, chinfo=None): """Drop (one of) the charges. Parameters ---------- charge : int | str Number or `name` of the charge (within `chinfo`) which is to be dropped. ``None`` means dropping all charges. chinfo : :class:`ChargeInfo` The :class:`ChargeInfo` with `charge` dropped; create a new one if ``None``. Returns ------- dropped : :class:`Array` A copy of `self`, where the specified `charge` has been removed. Note that the LegCharges are neither bunched or sorted; you might want to use :meth:`sort_legcharge`. """ chinfo2 = ChargeInfo.drop(self.chinfo, charge) if chinfo is not None: assert chinfo == chinfo2 chinfo2 = chinfo if charge is None: qtotal = None else: if isinstance(charge, str): charge = self.chinfo.names.index(charge) qtotal = np.delete(self.qtotal, charge, 0) res = Array([LegCharge.from_drop_charge(leg, charge, chinfo2) for leg in self.legs], self.dtype, qtotal) for block, slices, _, _ in self: # use __iter__ res[slices] = block # use __setitem__ return res
[docs] def change_charge(self, charge, new_qmod, new_name='', chinfo=None): """Change the `qmod` of one charge in `chinfo`. Parameters ---------- charge : int | str Number or `name` of the charge (within `chinfo`) which is to be changed. ``None`` means dropping all charges. new_qmod : int The new `qmod` to be set. new_name : str The new name of the charge. chinfo : :class:`ChargeInfo` The :class:`ChargeInfo` with `qmod` of `charge` changed; create a new one if ``None``. Returns ------- changed : :class:`Array` A copy of `self`, where the `qmod` of the specified `charge` has been changed. Note that the LegCharges are neither bunched or sorted; you might want to use :meth:`sort_legcharge`. """ chinfo2 = ChargeInfo.change(self.chinfo, charge, new_qmod, new_name) if chinfo is not None: assert chinfo == chinfo2 chinfo2 = chinfo res = self.copy(deep=True) res.chinfo = chinfo2 res.legs = [ LegCharge.from_change_charge(leg, charge, new_qmod, new_name, chinfo2) for leg in self.legs ] res.test_sanity() return res
[docs] def is_completely_blocked(self): """Return bool whether all legs are blocked by charge.""" return all([l.is_blocked() for l in self.legs])
[docs] def sort_legcharge(self, sort=True, bunch=True): """Return a copy with one or all legs sorted by charges. Sort/bunch one or multiple of the LegCharges. Legs which are sorted *and* bunched are guaranteed to be blocked by charge. Parameters ---------- sort : True | False | list of {True, False, perm} A single bool holds for all legs, default=True. Else, `sort` should contain one entry for each leg, with a bool for sort/don't sort, or a 1D array perm for a given permuation to apply to a leg. bunch : True | False | list of {True, False} A single bool holds for all legs, default=True. Whether or not to bunch at each leg, i.e. combine contiguous blocks with equal charges. Returns ------- perm : tuple of 1D arrays The permutation applied to each of the legs, such that ``cp.to_ndarray() = self.to_ndarray()[np.ix_(*perm)]``. result : Array A shallow copy of self, with legs sorted/bunched. """ if sort is False or sort is True: # ``sort in [False, True]`` doesn't work sort = [sort] * self.rank if bunch is False or bunch is True: bunch = [bunch] * self.rank if not len(sort) == len(bunch) == self.rank: raise ValueError("Wrong len for bunch or sort") # idea: encapsulate legs into pipes wich are sorted/bunched ... axes = [] pipes = [] perms = [None] * self.rank for ax in range(self.rank): if sort[ax] or bunch[ax]: axes.append([ax]) leg = self.legs[ax] pipe = LegPipe([leg], sort=sort[ax], bunch=bunch[ax], qconj=leg.qconj) pipes.append(pipe) else: perms[ax] = np.arange(self.shape[ax], dtype=np.intp) cp = self.combine_legs(axes, pipes=pipes) cp._labels = self._labels[:] # reset labels # ... and convert pipes back to leg charges for ax in axes: ax = ax[0] if sort[ax] or bunch[ax]: pipe = cp.legs[ax] if pipe._perm is None: perms[ax] = np.arange(self.shape[ax], dtype=np.intp) else: p_qind = inverse_permutation(pipe._perm) perms[ax] = self.legs[ax].perm_flat_from_perm_qind(p_qind) cp.legs[ax] = pipe.to_LegCharge() return tuple(perms), cp
[docs] def isort_qdata(self): """(Lexiographically) sort ``self._qdata``; in place. Lexsort ``self._qdata`` and ``self._data`` and set ``self._qdata_sorted = True``. """ if self._qdata_sorted: return if len(self._qdata) < 2: self._qdata_sorted = True return perm = np.lexsort(self._qdata.T) self._qdata = self._qdata[perm, :] self._data = [self._data[p] for p in perm] self._qdata_sorted = True
# reshaping ===============================================================
[docs] def make_pipe(self, axes, **kwargs): """Generates a :class:`~tenpy.linalg.charges.LegPipe` for specified axes. Parameters ---------- axes : iterable of str|int The leg labels for the axes which should be combined. Order matters! **kwargs : Additional keyword arguments given to :class:`~tenpy.linalg.charges.LegPipe`. Returns ------- pipe : :class:`~tenpy.linalg.charges.LegPipe` A pipe of the legs specified by axes. """ axes = self.get_leg_indices(axes) legs = [self.legs[a] for a in axes] return LegPipe(legs, **kwargs)
[docs] def combine_legs(self, combine_legs, new_axes=None, pipes=None, qconj=None): """Reshape: combine multiple legs into multiple pipes. If necessary, transpose before. Parameters ---------- combine_legs : (iterable of) iterable of {str|int} Bundles of leg indices or labels, which should be combined into a new output pipes. If multiple pipes should be created, use a list fore each new pipe. new_axes : None | (iterable of) int The leg-indices, at which the combined legs should appear in the resulting array. Default: for each pipe the position of its first pipe in the original array, (taking into account that some axes are 'removed' by combining). Thus no transposition is perfomed if `combine_legs` contains only contiguous ranges. pipes : None | (iterable of) {:class:`LegPipes` | None} Optional: provide one or multiple of the resulting LegPipes to avoid overhead of computing new leg pipes for the same legs multiple times. The LegPipes are conjugated, if that is necessary for compatibility with the legs. qconj : (iterable of) {+1, -1} Specify whether new created pipes point inward or outward. Defaults to +1. Ignored for given `pipes`, which are not newly calculated. Returns ------- reshaped : :class:`Array` A copy of self, whith some legs combined into pipes as specified by the arguments. See also -------- split_legs : inverse reshaping splitting LegPipes. Notes ----- Labels are inherited from self. New pipe labels are generated as ``'(' + '.'.join(*leglabels) + ')'``. For these new labels, previously unlabeled legs are replaced by ``'?#'``, where ``#`` is the leg-index in the original tensor `self`. Examples -------- >>> oldarray.iset_leg_labels(['a', 'b', 'c', 'd', 'e']) >>> c1 = oldarray.combine_legs([1, 2], qconj=-1) # only single output pipe >>> c1.get_leg_labels() ['a', '(b.c)', 'd', 'e'] Indices of `combine_legs` refer to the original array. If transposing is necessary, it is performed automatically: >>> c2 = oldarray.combine_legs([[0, 3], [4, 1]], qconj=[+1, -1]) # two output pipes >>> c2.get_leg_labels() ['(a.d)', 'c', '(e.b)'] >>> c3 = oldarray.combine_legs([['a', 'd'], ['e', 'b']], new_axes=[2, 1], >>> pipes=[c2.legs[0], c2.legs[2]]) >>> c3.get_leg_labels() ['c', '(e.b)', '(a.d)'] """ # bring arguments into a standard form combine_legs = list(combine_legs) # convert iterable to list # check: is combine_legs `iterable(iterable(int|str))` or `iterable(int|str)` ? if not is_non_string_iterable(combine_legs[0]): # the first entry is (int|str) -> only a single new pipe combine_legs = [combine_legs] if new_axes is not None: new_axes = to_iterable(new_axes) if pipes is not None: pipes = to_iterable(pipes) pipes = self._combine_legs_make_pipes(combine_legs, pipes, qconj) # out-sourced # good for index tricks: convert combine_legs into arrays combine_legs = [np.asarray(self.get_leg_indices(cl), dtype=np.intp) for cl in combine_legs] all_combine_legs = np.concatenate(combine_legs) if len(set(all_combine_legs)) != len(all_combine_legs): raise ValueError("got a leg multiple times: " + str(combine_legs)) new_axes, transp = self._combine_legs_new_axes(combine_legs, new_axes) # out-sourced # permute arguments sucht that new_axes is sorted ascending perm_args = np.argsort(new_axes) combine_legs = [combine_legs[p] for p in perm_args] pipes = [pipes[p] for p in perm_args] new_axes = [new_axes[p] for p in perm_args] # labels: replace non-set labels with '?#' (*before* transpose labels = [(l if l is not None else '?' + str(i)) for i, l in enumerate(self._labels)] # transpose if necessary if transp != tuple(range(self.rank)): res = self.copy(deep=False) res.iset_leg_labels(labels) res = res.itranspose(transp) inv_transp = inverse_permutation(transp) tr_combine_legs = [[inv_transp[a] for a in cl] for cl in combine_legs] return res.combine_legs(tr_combine_legs, new_axes=new_axes, pipes=pipes) # if we come here, combine_legs has the form of `tr_combine_legs`. # HERE we have the standard form of arguments # obtain the new legs # non_combined_legs: axes of self which are not in combine_legs non_combined_legs = np.array([a for a in range(self.rank) if a not in all_combine_legs], dtype=np.intp) legs = [self.legs[ax] for ax in non_combined_legs] for na, p in zip(new_axes, pipes): # not reversed legs.insert(na, p) non_new_axes = np.array([i for i in range(len(legs)) if i not in new_axes], dtype=np.intp) # convert to array for index tricks # get new labels pipe_labels = [self._combine_leg_labels([labels[c] for c in cl]) for cl in combine_legs] for na, p, plab in zip(new_axes, pipes, pipe_labels): labels[na:na + p.nlegs] = [plab] res = Array(legs, self.dtype, self.qtotal) res.legs = legs res._set_shape() res.iset_leg_labels(labels) # the **main work** of copying & reshaping the data if self.stored_blocks == 1: # handle self.stored_blocks == 1 separately for optimization qmap_inds = [ p._map_incoming_qind(self._qdata[:, cl])[0] for p, cl in zip(pipes, combine_legs) ] res_qdata = np.empty((1, res.rank), np.intp) res_qdata[0, non_new_axes] = self._qdata[0, non_combined_legs] slices = [slice(None)] * res.rank for na, p, qi in zip(new_axes, pipes, qmap_inds): q_map_row = p.q_map[qi, :] res_qdata[0, na] = q_map_row[2] slices[na] = slice(*q_map_row[:2]) res_block = np.zeros(res._get_block_shape(res_qdata[0, :]), dtype=res.dtype) res._data = [res_block] res._qdata = res_qdata res._qdata_sorted = True res_block_view = res_block[tuple(slices)] res_block_view[:] = self._data[0].reshape(res_block_view.shape) elif self.stored_blocks > 1: # sourced out for optimization new_axes = np.array(new_axes, np.intp) _combine_legs_worker(self, res, combine_legs, non_combined_legs, new_axes, non_new_axes, pipes) return res
[docs] def split_legs(self, axes=None, cutoff=0.): """Reshape: opposite of combine_legs: split (some) legs which are LegPipes. Reverts :meth:`combine_legs` (except a possibly performed `transpose`). The splited legs are replacing the LegPipes at their position, see the examples below. Labels are split reverting what was done in :meth:`combine_legs`. '?#' labels are replaced with ``None``. Parameters ---------- axes : (iterable of) int|str Leg labels or indices determining the axes to split. The corresponding entries in self.legs must be :class:`LegPipe` instances. Defaults to all legs, which are :class:`LegPipe` instances. cutoff : float Splitted data blocks with ``np.max(np.abs(block)) > cutoff`` are considered as zero. Defaults to 0. Returns ------- reshaped : :class:`Array` A copy of self where the specified legs are splitted. See also -------- combine_legs : this is reversed by split_legs. Examples -------- Given a rank-5 Array `old_array`, you can combine it and split it again: >>> old_array.iset_leg_labels(['a', 'b', 'c', 'd', 'e']) >>> comb_array = old_array.combine_legs([[0, 3], [2, 4]] ) >>> comb_array.get_leg_labels() ['(a.d)', 'b', '(c.e)'] >>> split_array = comb_array.split_legs([0, 2]) >>> split_array.get_leg_labels() ['a', 'd', 'b', 'c', 'e'] """ if axes is None: axes = [i for i, l in enumerate(self.legs) if isinstance(l, LegPipe)] else: axes = sorted(self.get_leg_indices(to_iterable(axes))) if len(set(axes)) != len(axes): raise ValueError("can't split a leg multiple times!") for ax in axes: if not isinstance(self.legs[ax], LegPipe): raise ValueError("can't split leg {ax:d} which is not a LegPipe".format(ax=ax)) if len(axes) == 0: return self.copy(deep=True) elif self.stored_blocks == 0: res = self.copy(deep=True) for ax in reversed(axes): res.legs[ax:ax + 1] = self.legs[ax].legs res._set_shape() elif self.stored_blocks == 1 and all([(self.legs[ax].q_map.shape[0] == 1) for ax in axes]): # optimize: just a single block in each pipe res = self.copy(deep=True) qdata = [[qi] for qi in self._qdata[0, :]] for ax in reversed(axes): pipe = self.legs[ax] res.legs[ax:ax + 1] = pipe.legs qdata[ax] = pipe.q_map[0, 3:] res._set_shape() res._qdata = np.ascontiguousarray(np.concatenate(qdata)).reshape((1, res.rank)) new_block_shape = res._get_block_shape(res._qdata[0, :]) res._data = [res._data[0].reshape(new_block_shape)] else: res = _split_legs_worker(self, axes, cutoff) labels = self._labels[:] for a in sorted(axes, reverse=True): labels[a:a + 1] = self._split_leg_label(labels[a], self.legs[a].nlegs) res.iset_leg_labels(labels) return res
[docs] def as_completely_blocked(self): """Gives a version of self which is completely blocked by charges. Functions like :func:`svd` or :func:`eigh` require a complete blocking by charges. This can be achieved by encapsulating each leg which is not completely blocked into a :class:`LegPipe` (containing only that single leg). The LegPipe will then contain all necessary information to revert the blocking. Returns ------- encapsulated_axes : list of int The leg indices which have been encapsulated into Pipes. blocked_self : :class:`Array` Self (if ``len(encapsulated_axes) = 0``) or a copy of self, which is completely blocked. """ enc_axes = [a for a, l in enumerate(self.legs) if not l.is_blocked()] if len(enc_axes) == 0: return enc_axes, self qconj = [self.legs[a].qconj for a in enc_axes] return enc_axes, self.combine_legs([[a] for a in enc_axes], qconj=qconj)
[docs] def squeeze(self, axes=None): """Like ``np.squeeze``. If a squeezed leg has non-zero charge, this charge is added to :attr:`qtotal`. Parameters ---------- axes : None | (iterable of) {int|str} Labels or indices of the legs which should be 'squeezed', i.e. the legs removed. The corresponding legs must be trivial, i.e., have `ind_len` 1. Returns ------- squeezed : :class:Array | scalar A scalar of ``self.dtype``, if all axes were squeezed. Else a copy of ``self`` with reduced ``rank`` as specified by `axes`. """ if axes is None: axes = tuple([a for a in range(self.rank) if self.shape[a] == 1]) else: axes = tuple(self.get_leg_indices(to_iterable(axes))) for a in axes: if self.shape[a] != 1: raise ValueError("Tried to squeeze non-unit leg") keep = [a for a in range(self.rank) if a not in axes] if len(keep) == 0: index = tuple([0] * self.rank) return self[index] res = self.copy(deep=False) # adjust qtotal res.legs = [self.legs[a] for a in keep] res._set_shape() res.qtotal = self.qtotal.copy() # modified! for a in axes: res.qtotal -= self.legs[a].get_charge(0) res.qtotal = self.chinfo.make_valid(res.qtotal) labels = self.get_leg_labels() res.iset_leg_labels([labels[a] for a in keep]) res._data = [np.squeeze(t, axis=axes).copy() for t in self._data] res._qdata = np.array(self._qdata[:, np.array(keep)], copy=False, order='C') # res._qdata_sorted doesn't change return res
# data manipulation =======================================================
[docs] def astype(self, dtype, copy=True): """Return copy with new dtype, upcasting all blocks in ``_data``. Parameters ---------- dtype : convertible to a np.dtype The new data type. If None, deduce the new dtype as common type of ``self._data``. copy : bool Whether to make a copy of the blocks even if the type didn't change. Returns ------- copy : :class:`Array` Deep copy of self with new dtype. """ cp = self.copy(deep=False) # manual deep copy: don't copy every block twice cp._qdata = cp._qdata.copy() if dtype is None: dtype = np.find_common_type([d.dtype for d in self._data], []) cp.dtype = dtype = np.dtype(dtype) if copy or dtype != self.dtype: cp._data = [d.astype(dtype, copy=copy) for d in self._data] return cp
[docs] def ipurge_zeros(self, cutoff=QCUTOFF, norm_order=None): """Removes ``self._data`` blocks with *norm* less than cutoff; in place. Parameters ---------- cutoff : float Blocks with norm <= `cutoff` are removed. defaults to :data:`QCUTOFF`. norm_order : A valid `ord` argument for `np.linalg.norm`. Default ``None`` gives the Frobenius norm/2-norm for matrices/everything else. Note that this differs from other methods, e.g. :meth:`from_ndarray`, which use the maximum norm. """ if len(self._data) == 0: return self norm = np.array([np.linalg.norm(t, ord=norm_order) for t in self._data]) keep = (norm > cutoff) # bool array self._data = [t for t, k in zip(self._data, keep) if k] self._qdata = self._qdata[keep] # self._qdata_sorted is preserved return self
[docs] def iproject(self, mask, axes): """Applying masks to one or multiple axes; in place. This function is similar as `np.compress` with boolean arrays For each specified axis, a boolean 1D array `mask` can be given, which chooses the indices to keep. .. warning :: Although it is possible to use an 1D int array as a mask, the order is ignored! If you need to permute an axis, use :meth:`permute` or :meth:`sort_legcharge`. Parameters ---------- mask : (list of) 1D array(bool|int) For each axis specified by `axes` a mask, which indices of the axes should be kept. If `mask` is a bool array, keep the indices where `mask` is True. If `mask` is an int array, keep the indices listed in the mask, *ignoring* the order or multiplicity. axes : (list of) int | string The `i`th entry in this list specifies the axis for the `i`th entry of `mask`, either as an int, or with a leg label. If axes is just a single int/string, specify just a single mask. Returns ------- map_qind : list of 1D arrays The mapping of qindices for each of the specified axes. block_masks: list of lists of 1D bool arrays ``block_masks[a][qind]`` is a boolen mask which indices to keep in block ``qindex`` of ``axes[a]``. """ if axes is not to_iterable(axes): mask = [mask] axes = self.get_leg_indices(to_iterable(axes)) mask = [np.asarray(m) for m in mask] if len(axes) != len(mask): raise ValueError("len(axes) != len(mask)") if len(axes) == 0: return [], [] # nothing to do. for i, m in enumerate(mask): # convert integer masks to bool masks if m.dtype != np.bool_: mask[i] = np.zeros(self.shape[axes[i]], dtype=np.bool_) np.put(mask[i], m, True) # Array views may share ``_qdata`` views, so make a copy of _qdata before manipulating self._qdata = self._qdata.copy() block_masks = [] proj_data = np.arange(self.stored_blocks) map_qind = [] for m, a in zip(mask, axes): l = self.legs[a] m_qind, bm, self.legs[a] = l.project(m) map_qind.append(m_qind) block_masks.append(bm) q = self._qdata[:, a] = m_qind[self._qdata[:, a]] piv = (q >= 0) self._qdata = self._qdata[piv] # keeps dimension # self._qdata_sorted is preserved proj_data = proj_data[piv] self._set_shape() # finally project out the blocks data = [] for i, iold in enumerate(proj_data): block = self._data[iold] subidx = [slice(d) for d in block.shape] for m, a in zip(block_masks, axes): subidx[a] = m[self._qdata[i, a]] block = np.compress(m[self._qdata[i, a]], block, axis=a) data.append(block) self._data = data return map_qind, block_masks
[docs] def permute(self, perm, axis): """Apply a permutation in the indices of an axis. Similar as np.take with a 1D array. Roughly equivalent to ``res[:, ...] = self[perm, ...]`` for the corresponding `axis`. Note: This function is quite slow, and usually not needed! Parameters ---------- perm : array_like 1D int The permutation which should be applied to the leg given by `axis`. axis : str | int A leg label or index specifying on which leg to take the permutation. Returns ------- res : :class:`Array` A copy of self with leg `axis` permuted, such that ``res[i, ...] = self[perm[i], ...]`` for ``i`` along `axis`. See also -------- sort_legcharge : can also be used to perform a general permutation. Preferable, since it is faster for permutations which don't mix charge blocks. """ axis = self.get_leg_index(axis) perm = np.asarray(perm, dtype=np.intp) oldleg = self.legs[axis] if len(perm) != oldleg.ind_len: raise ValueError("permutation has wrong length") inv_perm = inverse_permutation(perm) newleg = LegCharge.from_qflat(self.chinfo, oldleg.to_qflat()[perm], oldleg.qconj) newleg = newleg.bunch()[1] res = self.copy(deep=False) # data is replaced afterwards res.legs[axis] = newleg qdata_axis = self._qdata[:, axis] new_block_idx = [slice(None)] * self.rank old_block_idx = [slice(None)] * self.rank data = [] qdata = {} # dict for fast look up: tuple(indices) -> _data index for old_qind, (beg, end) in enumerate(oldleg._slice_start_stop()): old_range = range(beg, end) for old_data_index in np.nonzero(qdata_axis == old_qind)[0]: old_block = self._data[old_data_index] old_qindices = self._qdata[old_data_index] new_qindices = old_qindices.copy() for i_old in old_range: i_new = inv_perm[i_old] qi_new, within_new = newleg.get_qindex(i_new) new_qindices[axis] = qi_new # look up new_qindices in `qdata`, insert them if necessary new_data_ind = qdata.setdefault(tuple(new_qindices), len(data)) if new_data_ind == len(data): # insert new block data.append(np.zeros(res._get_block_shape(new_qindices))) new_block = data[new_data_ind] # copy data new_block_idx[axis] = within_new old_block_idx[axis] = i_old - beg new_block[tuple(new_block_idx)] = old_block[tuple(old_block_idx)] # data blocks copied res._data = data res._qdata_sorted = False res_qdata = res._qdata = np.empty((len(data), self.rank), dtype=np.intp) for qindices, i in qdata.items(): res_qdata[i] = qindices return res
[docs] @use_cython(replacement='Array_itranspose') def itranspose(self, axes=None): """Transpose axes like `np.transpose`; in place. Parameters ---------- axes: iterable (int|string), len ``rank`` | None The new order of the axes. By default (None), reverse axes. """ if axes is None: axes = tuple(reversed(range(self.rank))) else: axes = tuple(self.get_leg_indices(axes)) if len(axes) != self.rank or len(set(axes)) != self.rank: raise ValueError("axes has wrong length: " + str(axes)) if axes == tuple(range(self.rank)): return self # nothing to do axes_arr = np.array(axes) self.legs = [self.legs[a] for a in axes] self._set_shape() labs = self.get_leg_labels() self.iset_leg_labels([labs[a] for a in axes]) self._qdata = np.array(self._qdata[:, axes_arr], order='C') self._qdata_sorted = False self._data = [np.transpose(block, axes) for block in self._data] return self
[docs] def transpose(self, axes=None): """Like :meth:`itranspose`, but on a deep copy.""" cp = self.copy(deep=True) cp.itranspose(axes) return cp
[docs] def iswapaxes(self, axis1, axis2): """Similar as ``np.swapaxes``; in place.""" axis1 = self.get_leg_index(axis1) axis2 = self.get_leg_index(axis2) if axis1 == axis2: return self # nothing to do swap = np.arange(self.rank, dtype=np.intp) swap[axis1], swap[axis2] = axis2, axis1 legs = self.legs legs[axis1], legs[axis2] = legs[axis2], legs[axis1] labels = self._labels labels[axis1], labels[axis2] = labels[axis2], labels[axis1] self._set_shape() self._qdata = self._qdata[:, swap] self._qdata_sorted = False self._data = [t.swapaxes(axis1, axis2) for t in self._data] return self
[docs] def iscale_axis(self, s, axis=-1): """Scale with varying values along an axis; in place. Rescale to ``new_self[i1, ..., i_axis, ...] = s[i_axis] * self[i1, ..., i_axis, ...]``. Parameters ---------- s : 1D array, len=self.shape[axis] The vector with which the axis should be scaled. axis : str|int The leg label or index for the axis which should be scaled. See also -------- iproject : can be used to discard indices for which s is zero. """ axis = self.get_leg_index(axis) s = np.asarray(s) if s.shape != (self.shape[axis], ): raise ValueError("s has wrong shape: " + str(s.shape) + " instead of " + str(self.shape[axis])) self.dtype = np.find_common_type([self.dtype], [s.dtype]) leg = self.legs[axis] if axis != self.rank - 1: self._data = [ np.swapaxes(np.swapaxes(t, axis, -1) * s[leg.get_slice(qi)], axis, -1) for qi, t in zip(self._qdata[:, axis], self._data) ] else: # optimize: no need to swap axes, if axis is -1. self._data = [ t * s[leg.get_slice(qi)] # (it's slightly faster for large arrays) for qi, t in zip(self._qdata[:, axis], self._data) ] return self
[docs] def scale_axis(self, s, axis=-1): """Same as :meth:`iscale_axis`, but return a (deep) copy.""" res = self.copy(deep=False) res._qdata = res._qdata.copy() res.iscale_axis(s, axis) return res
# block-wise operations == element wise with numpy ufunc
[docs] def iunary_blockwise(self, func, *args, **kwargs): """Roughly ``self = f(self)``, block-wise; in place. Applies an unary function `func` to the non-zero blocks in ``self._data``. .. note :: Assumes implicitly that ``func(np.zeros(...), *args, **kwargs)`` gives 0, since we don't let `func` act on zero blocks! Parameters ---------- func : function A function acting on flat arrays, returning flat arrays. It is called like ``new_block = func(block, *args, **kwargs)``. *args : Additional arguments given to function *after* the block. **kwargs : Keyword arguments given to the function. Examples -------- >>> a.iunaray_blockwise(np.real) # get real part >>> a.iunaray_blockwise(np.conj) # same data as a.iconj(), but doesn't charge conjugate. """ if len(args) == 0 == len(kwargs): self._data = [func(t) for t in self._data] else: self._data = [func(t, *args, **kwargs) for t in self._data] if len(self._data) > 0: self.dtype = self._data[0].dtype return self
[docs] def unary_blockwise(self, func, *args, **kwargs): """Roughly ``return func(self)``, block-wise. Copies. Same as :meth:`iunary_blockwise`, but makes a **shallow** copy first. """ res = self.copy(deep=False) return res.iunary_blockwise(func, *args, **kwargs)
[docs] def iconj(self, complex_conj=True): """Wraper around :meth:`self.conj` with ``inplace=True``.""" return self.conj(complex_conj, inplace=True)
[docs] def conj(self, complex_conj=True, inplace=False): """Conjugate: complex conjugate data, conjugate charge data. Conjugate all legs, set negative qtotal. Labeling: takes 'a' -> 'a*', 'a*'-> 'a' and '(a,(b*,c))' -> '(a*, (b, c*))' Parameters ---------- complex_conj : bool Whether the data should be complex conjugated. inplace : bool Whether to apply changes to `self`, or to return a *deep* copy. """ if complex_conj and self.dtype.kind == 'c': if inplace: res = self.iunary_blockwise(np.conj) else: res = self.unary_blockwise(np.conj) else: if inplace: res = self else: res = self.copy(deep=True) res.qtotal = self.chinfo.make_valid(-res.qtotal) res.legs = [l.conj() for l in res.legs] labels = res._labels[:] for i, lbl in enumerate(labels): if lbl is not None: labels[i] = self._conj_leg_label(lbl) res._labels = labels return res
[docs] def complex_conj(self): """Return copy which is complex conjugated *without* conjugating the charge data.""" return self.unary_blockwise(np.conj)
[docs] def norm(self, ord=None, convert_to_float=True): """Norm of flattened data. See :func:`norm` for details. """ if ord == 0: return np.sum([np.count_nonzero(t) for t in self._data], dtype=np.int_) if convert_to_float: new_type = np.find_common_type([np.float_, self.dtype], []) # int -> float if new_type != self.dtype: return self.astype(new_type).norm(ord, False) block_norms = [np.linalg.norm(t.reshape(-1), ord) for t in self._data] # ``.reshape(-1) gives a 1D view and is thus faster than ``.flatten()`` # add a [0] in the list to ensure correct results for ``ord=-inf`` return np.linalg.norm(block_norms + [0], ord)
def __neg__(self): """return ``-self``""" return self.unary_blockwise(np.negative)
[docs] def ibinary_blockwise(self, func, other, *args, **kwargs): """Roughly ``self = func(self, other)``, block-wise; in place. Applies a binary function 'block-wise' to the non-zero blocks of ``self._data`` and ``other._data``, storing result in place. Assumes that `other` is an :class:`Array` as well, with the same shape and compatible legs. If leg labels of `other` and `self` are same up to permutations, `other` gets transposed accordingly before the action. .. note :: Assumes implicitly that ``func(np.zeros(...), np.zeros(...), *args, **kwargs)`` gives 0, since we don't let `func` act on zero blocks! Parameters ---------- func : function Binary function, called as ``new_block = func(block_self, block_other, *args, **kwargs)`` for blocks (=Numpy arrays) of equal shape. other : :class:`Array` Other Array from which to the blocks. *args, **kwargs: Extra (keyword) arguments given to `func`. Examples -------- >>> a.ibinary_blockwise(np.add, b) # equivalent to ``a += b``, if ``b`` is an `Array`. >>> a.ibinary_blockwise(np.max, b) # overwrites ``a`` to ``a = max(a, b)`` """ other = other._transpose_same_labels(self._labels) if len(args) > 0 or len(kwargs) > 0: return self.ibinary_blockwise(lambda a, b: func(a, b, *args, **kwargs), other) if not optimize(OptimizationFlag.skip_arg_checks): if self.rank != other.rank: raise ValueError("different rank!") for self_leg, other_leg in zip(self.legs, other.legs): self_leg.test_equal(other_leg) if np.any(self.qtotal != other.qtotal): raise ValueError("Arrays can't have different `qtotal`!") self.isort_qdata() other.isort_qdata() adata = self._data bdata = other._data aq = self._qdata bq = other._qdata Na, Nb = len(aq), len(bq) if Na == Nb and np.all(aq == bq): # If the qdata structure is identical, we can immediately run through the data. self._data = [func(at, bt) for at, bt in zip(adata, bdata)] else: # otherwise we have to step through comparing left and right qdata # F-style strides to preserve sorting! stride = charges._make_stride([l.block_number for l in self.legs], False) aq_ = np.sum(aq * stride, axis=1) bq_ = np.sum(bq * stride, axis=1) i, j = 0, 0 qdata = [] data = [] while i < Na or j < Nb: if i < Na and j < Nb and aq_[i] == bq_[j]: # a and b are non-zero data.append(func(adata[i], bdata[j])) qdata.append(aq[i]) i += 1 j += 1 elif i >= Na or j < Nb and aq_[i] > bq_[j]: # a is 0 data.append(func(np.zeros_like(bdata[j]), bdata[j])) qdata.append(bq[j]) j += 1 elif j >= Nb or aq_[i] < bq_[j]: # b is 0 data.append(func(adata[i], np.zeros_like(adata[i]))) qdata.append(aq[i]) i += 1 else: # tested a == b or a < b or a > b, so this should never happen assert False # if both are zero, we assume f(0, 0) = 0 self._data = data self._qdata = np.array(qdata, dtype=np.intp) # ``self._qdata_sorted = True`` was set by self.isort_qdata if len(self._data) > 0: self.dtype = np.find_common_type([d.dtype for d in self._data], []) self._data = [np.asarray(a, dtype=self.dtype) for a in self._data] return self
[docs] def binary_blockwise(self, func, other, *args, **kwargs): """Roughly ``return func(self, other)``, block-wise. Copies. Same as :meth:`ibinary_blockwise`, but makes a **shallow** copy first. """ res = self.copy(deep=False) return res.ibinary_blockwise(func, other, *args, **kwargs)
[docs] def matvec(self, other): """This function is used by the Lanczos algorithm needed for DMRG. It is supposed to calculate the matrix - vector - product for a rank-2 matrix ``self`` and a rank-1 vector `other`. """ return tensordot(self, other, axes=1)
[docs] @use_cython(replacement="Array_iadd_prefactor_other") def iadd_prefactor_other(self, prefactor, other): """``self += prefactor * other`` for scalar `prefactor` and :class:`Array` `other`. Note that we allow the type of `self` to change if necessary. Moreover, if `self` and `other` have the same labels in different order, other gets **transposed** before the action. """ if not isinstance(other, Array) or not np.isscalar(prefactor): raise ValueError("wrong argument types: {0!r}, {1!r}".format( type(prefactor), type(other))) self.ibinary_blockwise(np.add, other.__mul__(prefactor)) return self
[docs] @use_cython(replacement="Array_iscale_prefactor") def iscale_prefactor(self, prefactor): """``self *= prefactor`` for scalar `prefactor`. Note that we allow the type of `self` to change if necessary. """ if not np.isscalar(prefactor): raise ValueError("prefactor is not scalar: {0!r}".format(type(prefactor))) if prefactor == 0.: self._data = [] self._qdata = np.empty((0, self.rank), np.intp) self._qdata_sorted = True return self return self.iunary_blockwise(np.multiply, prefactor)
def __add__(self, other): """Return ``self + other``.""" if isinstance(other, Array): res = self.copy(deep=True) return res.iadd_prefactor_other(1., other) return NotImplemented # unknown type of other def __iadd__(self, other): """``self += other``.""" if isinstance(other, Array): return self.iadd_prefactor_other(1., other) return NotImplemented # unknown type of other def __sub__(self, other): """Return ``self - other``.""" if isinstance(other, Array): res = self.copy(deep=True) return res.iadd_prefactor_other(-1., other) return NotImplemented # unknown type of other def __isub__(self, other): """``self -= other``.""" if isinstance(other, Array): return self.iadd_prefactor_other(-1., other) return NotImplemented def __mul__(self, other): """Return ``self * other`` for scalar ``other``. Use explicit functions for matrix multiplication etc.""" if np.isscalar(other): res = self.copy(deep=True) return res.iscale_prefactor(other) return NotImplemented def __rmul__(self, other): """Return ``other * self`` for scalar ``other``.""" if np.isscalar(other): res = self.copy(deep=True) return res.iscale_prefactor(other) return NotImplemented def __imul__(self, other): """``self *= other`` for scalar `other`.""" if np.isscalar(other): return self.iscale_prefactor(other) return NotImplemented def __truediv__(self, other): """Return ``self / other`` for scalar `other`.""" if np.isscalar(other): if other == 0.: raise ZeroDivisionError("a/b for b=0. Types: {0!s}, {1!s}".format( type(self), type(other))) res = self.copy(deep=True) return res.iscale_prefactor(1. / other) return NotImplemented def __itruediv__(self, other): """``self /= other`` for scalar `other`.""" if np.isscalar(other): if other == 0.: raise ZeroDivisionError("a/b for b=0. Types: {0!s}, {1!s}".format( type(self), type(other))) return self.iscale_prefactor(1. / other) return NotImplemented # private functions ======================================================= def _set_shape(self): """Deduce self.shape from self.legs.""" if len(self.legs) == 0: raise ValueError("We don't allow 0-dimensional arrays. Why should we?") self.shape = tuple([lc.ind_len for lc in self.legs]) self.rank = len(self.legs) def _iter_all_blocks(self): """Generator to iterate over all combinations of qindices in lexiographic order. Yields ------ qindices : tuple of int A qindex for each of the legs. """ for block_inds in itertools.product(*[range(l.block_number) for l in reversed(self.legs)]): # loop over all charge sectors in lex order (last leg most siginificant) yield tuple(block_inds[::-1]) # back to legs in correct order def _get_block_charge(self, qindices): """Returns the charge of a block selected by `qindices`. The charge of a single block is defined as :: qtotal = sum_{legs l} legs[l].get_charges(qindices[l])) modulo qmod """ q = np.sum([l.get_charge(qi) for l, qi in zip(self.legs, qindices)], axis=0) return self.chinfo.make_valid(q) def _get_block_slices(self, qindices): """Returns tuple of slices for a block selected by `qindices`.""" return tuple([l.get_slice(qi) for l, qi in zip(self.legs, qindices)]) def _get_block_shape(self, qindices): """Return shape for the block given by qindices.""" return tuple([(l.slices[qi + 1] - l.slices[qi]) for l, qi in zip(self.legs, qindices)]) def _bunch(self, bunch_legs): """Return copy and bunch the qind for one or multiple legs. Parameters ---------- bunch : list of {True, False} One entry for each leg, whether the leg should be bunched. See also -------- sort_legcharge: public API calling this function. """ cp = self.copy(deep=False) # lists for each leg: map_qindex = [None] * cp.rank # array mapping old qindex to new qindex, such that # ``new_leg.charges[m_qindex[i]] == old_leg.charges[i]`` bunch_qindex = [None] * cp.rank # bool array whether the *new* qindex was bunched for li, bunch in enumerate(bunch_legs): idx, new_leg = cp.legs[li].bunch() cp.legs[li] = new_leg # generate entries in map_qindex and bunch_qdindex bunch_qindex[li] = ((idx[1:] - idx[:-1]) > 1) m_qindex = np.zeros(idx[-1], dtype=np.intp) m_qindex[idx[:-1]] = 1 map_qindex[li] = np.cumsum(m_qindex, axis=0) # now map _data and _qdata bunched_blocks = {} # new qindices -> index in new _data new_data = [] new_qdata = [] for old_block, old_qindices in zip(self._data, self._qdata): new_qindices = tuple([m[qi] for m, qi in zip(map_qindex, old_qindices)]) bunch = any([b[qi] for b, qi in zip(bunch_qindex, new_qindices)]) if bunch: if new_qindices not in bunched_blocks: # create enlarged block bunched_blocks[new_qindices] = len(new_data) # cp has new legs and thus gives the new shape new_block = np.zeros(cp._get_block_shape(new_qindices), dtype=cp.dtype) new_data.append(new_block) new_qdata.append(new_qindices) else: new_block = new_data[bunched_blocks[new_qindices]] # figure out where to insert the in the new bunched_blocks old_slbeg = [l.slices[qi] for l, qi in zip(self.legs, old_qindices)] new_slbeg = [l.slices[qi] for l, qi in zip(cp.legs, new_qindices)] slbeg = [(o - n) for o, n in zip(old_slbeg, new_slbeg)] sl = [slice(beg, beg + l) for beg, l in zip(slbeg, old_block.shape)] # insert the old block into larger new block new_block[tuple(sl)] = old_block else: # just copy the old block new_data.append(old_block.copy()) new_qdata.append(new_qindices) cp._data = new_data cp._qdata = np.array(new_qdata, dtype=np.intp).reshape((len(new_data), self.rank)) cp._qsorted = False return cp def _perm_qind(self, p_qind, leg): """Apply a permutation `p_qind` of the qindices in leg `leg` to _qdata; in place.""" # entry ``b`` of of old old._qdata[:, leg] refers to old ``old.legs[leg][b]``. # since new ``new.legs[leg][i] == old.legs[leg][p_qind[i]]``, # we have new ``new.legs[leg][reverse_sort_perm(p_qind)[b]] == old.legs[leg][b]`` # thus we replace an entry `b` in ``_qdata[:, leg]``with reverse_sort_perm(q_ind)[b]. p_qind_r = inverse_permutation(p_qind) self._qdata[:, leg] = p_qind_r[self._qdata[:, leg]] # equivalent to # self._qdata[:, leg] = [p_qind_r[i] for i in self._qdata[:, leg]] self._qdata_sorted = False def _pre_indexing(self, inds): """Check if `inds` are valid indices for ``self[inds]`` and replaces Ellipsis by slices. Returns ------- only_integer : bool Whether all of `inds` are (convertible to) np.intp. inds : tuple, len=self.rank `inds`, where ``Ellipsis`` is replaced by the correct number of slice(None). """ if type(inds) != tuple: # for rank 1 inds = (inds, ) i = next((i for i, idx in enumerate(inds) if idx is Ellipsis), None) # i is index of Ellipsis or None if we don't have one if i is None and len(inds) < self.rank: # need an Ellipsis i = len(inds) inds = inds + (Ellipsis, ) if i is not None: # replace Ellipsis with slice(None) fill = tuple([slice(None)] * (self.rank - len(inds) + 1)) inds = inds[:i] + fill + inds[i + 1:] if len(inds) > self.rank: raise IndexError("too many indices for Array") # do we have only integer entries in `inds`? try: only_int = np.array(inds, dtype=np.intp) assert (only_int.shape == (len(inds), )) except: return False, inds else: return True, inds def _advanced_getitem(self, inds, calc_map_qind=False, permute=True): """Calculate self[inds] for non-integer `inds`. This function is called by self.__getitem__(inds). and from _advanced_setitem_npc with ``calc_map_qind=True``. Parameters ---------- inds : tuple Indices for the different axes, as returned by :meth:`_pre_indexing`. calc_map_qind : Whether to calculate and return the additional `map_qind` and `axes` tuple. permute : If False, don't perform permutations in case one of `inds` is an unsorted index array, but consider it as a mask only, ignoring the order of the indices. Returns ------- map_qind_part2self : function Only returned if `calc_map_qind` is True. This function takes qindices from `res` as arguments and returns ``(qindices, block_mask)`` such that ``res.get_block(part_qindices) = self.get_block(qindices)[block_mask]``. permutation are ignored for this. permutations : list((int, 1D array(int))) Only returned if `calc_map_qind` is True. Collects (axes, permutation) applied to `res` *after* `take_slice` and `iproject`. res : :class:`Array` A copy with the data ``self[inds]``. """ # non-integer inds -> slicing / projection slice_inds = [] # arguments for `take_slice` slice_axes = [] project_masks = [] # arguments for `iproject` project_axes = [] permutations = [] # [axis, mask] for all axes for which we need to call `permute` for a, i in enumerate(inds): if isinstance(i, slice): if i != slice(None): m = np.zeros(self.shape[a], dtype=np.bool_) m[i] = True project_masks.append(m) project_axes.append(a) if i.step is not None and i.step < 0: permutations.append((a, np.arange(np.count_nonzero(m), dtype=np.intp)[::-1])) else: try: iter(i) except: # not iterable: single index slice_inds.append(int(i)) slice_axes.append(a) else: # iterable i = np.asarray(i) project_masks.append(i) project_axes.append(a) if i.dtype != np.bool_: # should be integer indexing perm = np.argsort(i) # check if `i` is sorted if np.any(perm != np.arange(len(perm))): # np.argsort(i) gives the reverse permutation, so reverse it again. # In that way, we get the permuation within the projected indices. permutations.append((a, inverse_permutation(perm))) res = self.take_slice(slice_inds, slice_axes) res_axes = np.cumsum([(a not in slice_axes) for a in range(self.rank)]) - 1 p_map_qinds, p_masks = res.iproject(project_masks, [res_axes[p] for p in project_axes]) permutations = [(res_axes[a], p) for a, p in permutations] if permute: for a, perm in permutations: res = res.permute(perm, a) if not calc_map_qind: return res part2self = self._advanced_getitem_map_qind(inds, slice_axes, slice_inds, project_axes, p_map_qinds, p_masks, res_axes) return part2self, permutations, res def _advanced_getitem_map_qind(self, inds, slice_axes, slice_inds, project_axes, p_map_qinds, p_masks, res_axes): """Generate a function mapping from qindices of `self[inds]` back to qindices of self. This function is called only by `_advanced_getitem(calc_map_qind=True)` to obtain the function `map_qind_part2self`, which in turn in needed in `_advanced_setitem_npc` for ``self[inds] = other``. This function returns a function `part2self`, see doc string in the source for details. Note: the function ignores permutations introduced by `inds` - they are handled separately. """ map_qinds = [None] * self.rank map_blocks = [None] * self.rank for a, i in zip(slice_axes, slice_inds): qi, within_block = self.legs[a].get_qindex(inds[a]) map_qinds[a] = qi map_blocks[a] = within_block for a, m_qind in zip(project_axes, p_map_qinds): map_qinds[a] = np.nonzero(m_qind >= 0)[0] # revert m_qind # keep_axes = neither in slice_axes nor in project_axes keep_axes = [a for a, i in enumerate(map_qinds) if i is None] not_slice_axes = sorted(project_axes + keep_axes) bsizes = [l._get_block_sizes() for l in self.legs] def part2self(part_qindices): """Given `part_qindices` of ``res = self[inds]``, return (`qindices`, `block_mask`) such that ``res.get_block(part_qindices) == self.get_block(qindices)``. """ qindices = map_qinds[:] # copy block_mask = map_blocks[:] # copy for a in keep_axes: qindices[a] = qi = part_qindices[res_axes[a]] block_mask[a] = np.arange(bsizes[a][qi], dtype=np.intp) for a, bmask in zip(project_axes, p_masks): old_qi = part_qindices[res_axes[a]] qindices[a] = map_qinds[a][old_qi] block_mask[a] = bmask[old_qi] # advanced indexing in numpy is tricky ^_^ # np.ix_ can't handle integer entries reducing the dimension. # we have to call it only on the entries with arrays ix_block_mask = np.ix_(*[block_mask[a] for a in not_slice_axes]) # and put the result back into block_mask for a, bm in zip(not_slice_axes, ix_block_mask): block_mask[a] = bm return qindices, tuple(block_mask) return part2self def _advanced_setitem_npc(self, inds, other): """Self[inds] = other for non-integer `inds` and :class:`Array` `other`. This function is called by self.__setitem__(inds, other).""" # suppress warning if we project a pipe with warnings.catch_warnings(): warnings.simplefilter("ignore") map_part2self, permutations, self_part = self._advanced_getitem(inds, calc_map_qind=True, permute=False) # permuations are ignored by map_part2self. # instead of figuring out permuations in self, apply the *reversed* permutations ot other for ax, perm in permutations: other = other.permute(inverse_permutation(perm), ax) # now test compatibility of self_part with `other` if self_part.rank != other.rank: raise IndexError("wrong number of indices") for pl, ol in zip(self_part.legs, other.legs): pl.test_contractible(ol.conj()) if np.any(self_part.qtotal != other.qtotal): raise ValueError("wrong charge for assinging self[inds] = other") # note: a block exists in self_part, if and only if its extended version exists in self. # by definition, non-existent blocks in `other` are zero. # instead of checking which blocks are non-existent, # we first set self[inds] completely to zero for p_qindices in self_part._qdata: qindices, block_mask = map_part2self(p_qindices) block = self.get_block(qindices) block[block_mask] = 0. # overwrite data in self # now we copy blocks from other for o_block, o_qindices in zip(other._data, other._qdata): qindices, block_mask = map_part2self(o_qindices) block = self.get_block(qindices, insert=True) block[block_mask] = o_block # overwrite data in self self.ipurge_zeros(0.) # remove blocks identically zero def _combine_legs_make_pipes(self, combine_legs, pipes, qconj): """Argument parsing for :meth:`combine_legs`: make missing pipes. Generates missing pipes & checks compatibility for provided pipes. """ npipes = len(combine_legs) # default arguments for pipes and qconj if pipes is None: pipes = [None] * npipes elif len(pipes) != npipes: raise ValueError("wrong len of `pipes`") qconj = list(to_iterable(qconj)) if len(qconj) == 1 and 1 < npipes: qconj = [qconj[0]] * npipes # same qconj for all pipes if len(qconj) != npipes: raise ValueError("wrong len of `qconj`") pipes = list(pipes) # make pipes as necessary for i, pipe in enumerate(pipes): if pipe is None: qconj_i = qconj[i] if qconj_i is None: qconj_i = +1 # will change in future to qconj_i_new = self.get_leg(combine_legs[i][0]).qconj if qconj_i != qconj_i_new: warnings.warn( "combine_legs default value for `qconj` will change " "from +1 to `qconj` of the first leg, here `-1`", FutureWarning, 3) pipes[i] = self.make_pipe(axes=combine_legs[i], qconj=qconj_i) else: # test for compatibility legs = [self.get_leg(a) for a in combine_legs[i]] if pipe.nlegs != len(legs): raise ValueError("pipe has wrong number of legs") if legs[0].qconj != pipe.legs[0].qconj: pipes[i] = pipe = pipe.conj() # need opposite qind for self_leg, pipe_leg in zip(legs, pipe.legs): self_leg.test_equal(pipe_leg) return pipes def _combine_legs_new_axes(self, combine_legs, new_axes): """Figure out new_axes and how legs have to be transposed.""" all_combine_legs = np.concatenate(combine_legs) non_combined_legs = np.array([a for a in range(self.rank) if a not in all_combine_legs]) if new_axes is None: # figure out default new_legs first_cl = np.array([cl[0] for cl in combine_legs]) new_axes = [(np.sum(non_combined_legs < a) + np.sum(first_cl < a)) for a in first_cl] else: # test compatibility if len(new_axes) != len(combine_legs): raise ValueError("wrong len of `new_axes`") new_rank = len(combine_legs) + len(non_combined_legs) for i, a in enumerate(new_axes): if a < 0: new_axes[i] = a + new_rank elif a >= new_rank: raise ValueError("new_axis larger than the new number of legs") transp = [[a] for a in non_combined_legs] for s in np.argsort(new_axes): transp.insert(new_axes[s], list(combine_legs[s])) transp = sum(transp, []) # flatten: [a] + [b] = [a, b] return new_axes, tuple(transp) @staticmethod def _combine_leg_labels(labels): """Generate label for legs combined in a :class:`LegPipe`. Examples -------- >>> self._combine_leg_labels(['a', 'b', '(c.d)']) '(a.b.(c.d))' """ return '(' + '.'.join(labels) + ')' @staticmethod def _split_leg_label(label, count): """Revert the combination of labels performed in :meth:`combine_leg_labels`. Return a list of labels corresponding to the original labels before 'combine_leg_labels'. Test that it splits into `count` labels. Examples -------- >>> self._split_leg_label('(a.b.(c.d))', 3) ['a', 'b', '(c.d)'] """ if label is None: return [None] * count if label[0] != '(' or label[-1] != ')': warnings.warn("split leg with label not in Form '(...)': " + repr(label), stacklevel=3) return [None] * count beg = 1 depth = 0 # number of non-closed '(' to the left res = [] for i in range(1, len(label) - 1): c = label[i] if c == '(': depth += 1 elif c == ')': depth -= 1 elif c == '.' and depth == 0: res.append(label[beg:i]) beg = i + 1 res.append(label[beg:i + 1]) if len(res) != count: raise ValueError("wrong number of splitted labels.") for i in range(len(res)): if res[i][0] == '?': res[i] = None return res @staticmethod def _conj_leg_label(label): """Conjugate a leg `label`. Examples -------- >>> self._conj_leg_labels('a') 'a*' >>> self._conj_leg_labels('a*') 'a' >>> self._conj_leg_labels('(a.(b*.c))') '(a*.(b.c*))' """ # first insert '*' after each label, taking into account recursion of LegPipes res = [] beg = 0 for i in range(1, len(label)): if label[i - 1] != ')' and label[i] in '.)': res.append(label[beg:i]) beg = i res.append(label[beg:]) label = '*'.join(res) if label[-1] != ')': label += '*' # remove '**' entries return label.replace('**', '') @use_cython(replacement="Array__imake_contiguous") def _imake_contiguous(self): """Make each of the blocks c-style contigous in memory. Might speed up subsequent tensordot & co by fixing the memory layout to contigous blocks. (No need to call it manually: it's called from tensordot & co anyways!) """ self._data = [np.ascontiguousarray(t) for t in self._data] return self def _transpose_same_labels(self, other_labels): """Return `self.transpose(other_labels)` if the labels are the same but shuffled.""" self_labels = self._labels if self_labels == other_labels: return self # fits if None in self_labels or None in other_labels: if set(self_labels) == set(other_labels) != set([None]): warnings.warn("Not all legs labeled, so no transpose for Array addition. " "Did you intend to transpose?") return self # not all legs labeled if set(self_labels) != set(other_labels): return self # different labels # now: same labels, different order. # TODO to keep backwards compatibility, just warn for now warnings.warn( "Arrays with same labels in different order. Transpose intended?" " We will transpose in the future!", category=FutureWarning, stacklevel=3) return self # TODO: do this for the next release return self.transpose(other_labels)
# ################################## # global functions # ##################################
[docs]def zeros(legcharges, dtype=np.float64, qtotal=None): """Create a npc array full of zeros (with no _data). This is just a wrapper around ``Array(...)``, detailed documentation can be found in the class doc-string of :class:`Array`. """ return Array(legcharges, dtype, qtotal)
[docs]def eye_like(a, axis=0): """Return an identity matrix contractible with the leg `axis` of the :class:`Array` `a`.""" axis = a.get_leg_index(axis) return diag(1., a.legs[axis])
[docs]def diag(s, leg, dtype=None): """Returns a square, diagonal matrix of entries `s`. The resulting matrix has legs ``(leg, leg.conj())`` and charge 0. Parameters ---------- s : scalar | 1D array The entries to put on the diagonal. If scalar, all diagonal entries are the same. leg : :class:`LegCharge` The first leg of the resulting matrix. dtype : None | type The data type to be used for the result. By default, use dtype of `s`. Returns ------- diagonal : :class:`Array` A square matrix with diagonal entries `s`. See also -------- Array.scale_axis : similar as ``tensordot(diag(s), ...)``, but faster. """ s = np.asarray(s, dtype) scalar = (s.ndim == 0) if not scalar and len(s) != leg.ind_len: raise ValueError("len(s)={0:d} not equal to leg.ind_len={1:d}".format(len(s), leg.ind_len)) res = Array((leg, leg.conj()), s.dtype) # default charge is 0 # qdata = [[0, 0], [1, 1], ....] res._qdata = np.arange(leg.block_number, dtype=np.intp)[:, np.newaxis] * np.ones(2, np.intp) # ``res._qdata_sorted = True`` was already set if scalar: res._data = [np.diag(s * np.ones(size, dtype=s.dtype)) for size in leg._get_block_sizes()] else: res._data = [np.diag(s[leg.get_slice(qi)]) for qi in range(leg.block_number)] return res
[docs]def concatenate(arrays, axis=0, copy=True): """Stack arrays along a given axis, similar as np.concatenate. Stacks the qind of the array, without sorting/blocking. Labels are inherited from the first array only. Parameters ---------- arrays : iterable of :class:`Array` The arrays to be stacked. They must have the same shape and charge data except on the specified axis. axis : int | str Leg index or label of the first array. Defines the axis along which the arrays are stacked. copy : bool Whether to copy the data blocks. Returns ------- stacked : :class:`Array` Concatenation of the given `arrays` along the specified axis. See also -------- Array.sort_legcharge : can be used to block by charges along the axis. """ arrays = list(arrays) res = arrays[0].zeros_like() axis = res.get_leg_index(axis) not_axis = np.array([a for a in range(res.rank) if a != axis], dtype=np.intp) # test for compatibility for a in arrays: if a.shape[:axis] != res.shape[:axis] or a.shape[axis + 1:] != res.shape[axis + 1:]: raise ValueError("wrong shape to fit " + repr(a.shape) + " into " + repr(res.shape)) if a.chinfo != res.chinfo: raise ValueError("wrong ChargeInfo") if np.any(a.qtotal != res.qtotal): raise ValueError("wrong qtotal") for l in not_axis: a.legs[l].test_equal(res.legs[l]) dtype = res.dtype = np.find_common_type([a.dtype for a in arrays], []) # stack the data res_axis_bl_sizes = [] res_axis_charges = [] res_qdata = [] res_data = [] qind_shift = 0 # sum of previous `block_number` axis_qconj = res.legs[axis].qconj for a in arrays: leg = a.legs[axis] res_axis_bl_sizes.extend(leg._get_block_sizes()) charges = leg.charges if leg.qconj == axis_qconj else res.chinfo.make_valid(-leg.charges) res_axis_charges.append(charges) qdata = a._qdata.copy() qdata[:, axis] += qind_shift res_qdata.append(qdata) if copy: res_data.extend([np.array(t, dtype) for t in a._data]) else: res_data.extend([np.asarray(t, dtype) for t in a._data]) qind_shift += leg.block_number res_axis_slices = np.append([0], np.cumsum(res_axis_bl_sizes)) res_axis_charges = np.concatenate(res_axis_charges, axis=0) res.legs[axis] = LegCharge.from_qind(res.chinfo, res_axis_slices, res_axis_charges, axis_qconj) res._set_shape() res._qdata = np.concatenate(res_qdata, axis=0) res._qdata_sorted = False res._data = res_data res.test_sanity() return res
[docs]def grid_concat(grid, axes, copy=True): """Given an np.array of npc.Arrays, performs a multi-dimensional concatentation along 'axes'. Similar to :func:`numpy.block`, but only for uniform blocking. Stacks the qind of the array, *without* sorting/blocking. Parameters ---------- grid : array_like of :class:`Array` The grid of arrays. axes : list of int The axes along which to concatenate the arrays, same len as the dimension of the grid. Concatenate arrays of the `i`th axis of the grid along the axis ``axes[i]`` copy : bool Whether the _data blocks are copied. Examples -------- Assume we have rank 2 Arrays ``A, B, C, D`` of shapes ``(1, 2), (1, 4), (3, 2), (3, 4)`` sharing the legs of equal sizes. Then the following grid will result in a ``(1+3, 2+4)`` shaped array: >>> g = grid_concat([[A, B], [C, D]], axes=[0, 1]) >>> g.shape (4, 6) If ``A, B, C, D`` were rank 4 arrays, with the first and last leg as before, and sharing *common* legs ``1`` and ``2`` of dimensions 1, 2, then you would get a rank-4 array: >>> g = grid_concat([[A, B], [C, D]], axes=[0, 3]) >>> g.shape (4, 1, 2, 6) See also -------- Array.sort_legcharge : can be used to block by charges. """ grid = np.asarray(grid, dtype=np.object) if grid.ndim < 1 or grid.ndim != len(axes): raise ValueError("grid has wrong dimension") if grid.ndim == 1: if any([g is None for g in grid]): raise ValueError("`None` entry in 1D grid") return concatenate(grid, axes[0], copy) if any([g is None for g in grid.flat]): new_legs = [] for a, ax in enumerate(axes): tr = [a] + [i for i in range(grid.ndim) if i != a] grid_tr = np.transpose(grid, tr) leg = [] for grid_tr_row in grid_tr: # get first g which is not None in grid_tr_row first_g = next((g for g in grid_tr_row.flat if g is not None), None) if first_g is None: raise ValueError("Full row/column with only `None` entries") else: leg.append(first_g.get_leg(ax)) new_legs.append(leg) assert first_g is not None labels = first_g.get_leg_labels() axes = first_g.get_leg_indices(axes) zeros_legs = first_g.legs[:] for idx, entry in np.ndenumerate(grid): if entry is None: for ax, new_leg, i in zip(axes, new_legs, idx): zeros_legs[ax] = new_leg[i] entry = zeros(zeros_legs, first_g.dtype, first_g.qtotal) entry.iset_leg_labels(labels) grid[idx] = entry return _grid_concat_recursion(grid, axes, copy)
[docs]def grid_outer(grid, grid_legs, qtotal=None): """Given an np.array of npc.Arrays, return the corresponding higher-dimensional Array. Parameters ---------- grid : array_like of {:class:`Array` | None} The grid gives the first part of the axes of the resulting array. Entries have to have all the same shape and charge-data, giving the remaining axes. ``None`` entries in the grid are interpreted as zeros. grid_legs : list of :class:`LegCharge` One LegCharge for each dimension of the grid along the grid. qtotal : charge The total charge of the Array. By default (``None``), derive it out from a non-trivial entry of the grid. Returns ------- res : :class:`Array` An Array with shape ``grid.shape + nontrivial_grid_entry.shape``. Constructed such that ``res[idx] == grid[idx]`` for any index ``idx`` of the `grid` the `grid` entry is not trivial (``None``). See also -------- detect_grid_outer_legcharge : can calculate one missing :class:`LegCharge` of the grid. Examples -------- A typical use-case for this function is the generation of an MPO. Say you have npc.Arrays ``Splus, Sminus, Sz``, each with legs ``[phys.conj(), phys]``. Further, you have to define appropriate LegCharges `l_left` and `l_right`. Then one 'matrix' of the MPO for a nearest neighbour Heisenberg Hamiltonian could look like: >>> Id = np.eye_like(Sz) >>> W_mpo = grid_outer([[Id, Splus, Sminus, Sz, None], ... [None, None, None, None, J*Sminus], ... [None, None, None, None, J*Splus], ... [None, None, None, None, J*Sz], ... [None, None, None, None, Id]], ... leg_charges=[l_left, l_right]) >>> W_mpo.shape (5, 5, 2, 2) """ grid_shape, entries = _nontrivial_grid_entries(grid) if len(grid_shape) != len(grid_legs): raise ValueError("wrong number of grid_legs") if grid_shape != tuple([l.ind_len for l in grid_legs]): raise ValueError("grid shape incompatible with grid_legs") idx, entry = entries[0] # first non-trivial entry chinfo = entry.chinfo dtype = np.find_common_type([e.dtype for _, e in entries], []) legs = list(grid_legs) + entry.legs labels = entry._labels[:] if qtotal is None: # figure out qtotal from first non-zero entry grid_charges = [l.get_charge(l.get_qindex(i)[0]) for i, l in zip(idx, grid_legs)] qtotal = chinfo.make_valid(np.sum(grid_charges + [entry.qtotal], axis=0)) else: qtotal = chinfo.make_valid(qtotal) res = Array(legs, dtype, qtotal) # main work: iterate over all non-trivial entries to fill `res`. for idx, entry in entries: res[idx] = entry # insert the values with Array.__setitem__ partial slicing. if labels is not None and entry._labels != labels: labels = None if labels is not None: res.iset_leg_labels([None] * len(grid_shape) + labels) res.test_sanity() return res
[docs]def detect_grid_outer_legcharge(grid, grid_legs, qtotal=None, qconj=1, bunch=False): """Derive a LegCharge for a grid used for :func:`grid_outer`. Note: The resulting LegCharge is *not* bunched. Parameters ---------- grid : array_like of {:class:`Array` | None} The grid as it will be given to :func:`grid_outer`. grid_legs : list of {:class:`LegCharge` | None} One LegCharge for each dimension of the grid, except for one entry which is ``None``. This missing entry is to be calculated. qtotal : charge The desired total charge of the array. Defaults to 0. Returns ------- new_grid_legs : list of :class:`LegCharge` A copy of the given `grid_legs` with the ``None`` replaced by a compatible LegCharge. The new LegCharge is neither bunched nor sorted! See also -------- detect_legcharge : similar functionality for a flat numpy array instead of a grid. """ grid_shape, entries = _nontrivial_grid_entries(grid) if len(grid_shape) != len(grid_legs): raise ValueError("wrong number of grid_legs") if any([s != l.ind_len for s, l in zip(grid_shape, grid_legs) if l is not None]): raise ValueError("grid shape incompatible with grid_legs") idx, entry = entries[0] # first non-trivial entry chinfo = entry.chinfo axis = [a for a, l in enumerate(grid_legs) if l is None] if len(axis) > 1: raise ValueError("can only derive one grid_leg") axis = axis[0] grid_legs = list(grid_legs) qtotal = chinfo.make_valid(qtotal) # charge 0, if qtotal is not set. qflat = [None] * grid_shape[axis] for idx, entry in entries: grid_charges = [ l.get_charge(l.get_qindex(i)[0]) for a, (i, l) in enumerate(zip(idx, grid_legs)) if a != axis ] qflat_entry = chinfo.make_valid(qtotal - entry.qtotal - np.sum(grid_charges, axis=0)) i = idx[axis] if qflat[i] is None: qflat[i] = qflat_entry elif np.any(qflat[i] != qflat_entry): raise ValueError("different grid entries lead to different charges at index " + str(i)) if any([q is None for q in qflat]): raise ValueError("can't derive flat charge for all indices:" + str(qflat)) grid_legs[axis] = LegCharge.from_qflat(chinfo, chinfo.make_valid(qconj * np.array(qflat)), qconj) return grid_legs
[docs]def detect_qtotal(flat_array, legcharges, cutoff=None): """Returns the total charge (w.r.t `legs`) of first non-zero sector found in `flat_array`. Parameters ---------- flat_array : array The flat numpy array from which you want to detect the charges. legcharges : list of :class:`LegCharge` For each leg the LegCharge. cutoff : float Blocks with ``np.max(np.abs(block)) > cutoff`` are considered as zero. Defaults to :data:`QCUTOFF`. Returns ------- qtotal : charge The total charge fo the first non-zero (i.e. > cutoff) charge block. See also -------- detect_legcharge : detects the charges of one missing LegCharge if `qtotal` is known. detect_grid_outer_legcharge : similar functionality if the flat array is given by a 'grid'. """ if cutoff is None: cutoff = QCUTOFF chinfo = legcharges[0].chinfo test_array = zeros(legcharges) # Array prototype with correct charges for qindices in test_array._iter_all_blocks(): sl = test_array._get_block_slices(qindices) if np.any(np.abs(flat_array[sl]) > cutoff): return test_array._get_block_charge(qindices) warnings.warn("can't detect total charge: no entry larger than cutoff. Return 0 charge.", stacklevel=2) return chinfo.make_valid()
[docs]def detect_legcharge(flat_array, chargeinfo, legcharges, qtotal=None, qconj=+1, cutoff=None): """Calculate a missing `LegCharge` by looking for nonzero entries of a flat array. Parameters ---------- flat_array : ndarray A flat array, in which we look for non-zero entries. chargeinfo : :class:`~tenpy.linalg.charges.ChargeInfo` The nature of the charge. legcharges : list of :class:`LegCharge` One LegCharge for each dimension of flat_array, except for one entry which is ``None``. This missing entry is to be calculated. qconj : {+1, -1} `qconj` for the new calculated LegCharge. qtotal : charges Desired total charge of the array. Defaults to zeros. cutoff : float Blocks with ``np.max(np.abs(block)) > cutoff`` are considered as zero. Defaults to :data:`QCUTOFF`. Returns ------- new_legcharges : list of :class:`LegCharge` A copy of the given `legcharges` with the ``None`` replaced by a compatible LegCharge. The new legcharge is 'bunched', but not sorted! See also -------- detect_grid_outer_legcharge : similar functionality if the flat array is given by a 'grid'. detect_qtotal : detects the total charge, if all legs are known. """ flat_array = np.asarray(flat_array) legs = list(legcharges) if cutoff is None: cutoff = QCUTOFF if flat_array.ndim != len(legs): raise ValueError("wrong number of grid_legs") if any([s != l.ind_len for s, l in zip(flat_array.shape, legs) if l is not None]): raise ValueError("array shape incompatible with legcharges") axis = [a for a, l in enumerate(legs) if l is None] if len(axis) > 1: raise ValueError("can only derive charges for one leg.") axis = axis[0] axis_len = flat_array.shape[axis] if chargeinfo.qnumber == 0: legs[axis] = LegCharge.from_trivial(axis_len, chargeinfo, qconj=qconj) return legs qtotal = chargeinfo.make_valid(qtotal) # charge 0, if qtotal is not set. legs_known = legs[:axis] + legs[axis + 1:] qflat = np.empty([axis_len, chargeinfo.qnumber], dtype=QTYPE) for i in range(axis_len): A_i = np.take(flat_array, i, axis=axis) qflat[i] = detect_qtotal(A_i, legs_known, cutoff) qflat = chargeinfo.make_valid((qtotal - qflat) * qconj) legs[axis] = LegCharge.from_qflat(chargeinfo, qflat, qconj).bunch()[1] return legs
[docs]def trace(a, leg1=0, leg2=1): """Trace of `a`, summing over leg1 and leg2. Requires that the contracted legs are contractible (i.e. have opposite charges). Labels are inherited from `a`. Parameters ---------- leg1, leg2: str|int The leg label or index for the two legs which should be contracted (i.e. summed over). Returns ------- traced : :class:`Array` | ``a.dtype`` A scalar if ``a.rank == 2``, else an :class:`Array` of rank ``a.rank - 2``. Equivalent to ``sum([a.take_slice([i, i], [leg1, leg2]) for i in range(a.shape[leg1])])``. """ ax1, ax2 = a.get_leg_indices([leg1, leg2]) if ax1 == ax2: raise ValueError("leg1 = {0!r} == leg2 = {1!r} ???".format(leg1, leg2)) a.legs[ax1].test_contractible(a.legs[ax2]) if a.rank == 2: # full contraction: ax1, ax2 = 0, 1 or vice versa res = a.dtype.type(0.) for qdata_row, block in zip(a._qdata, a._data): if qdata_row[0] == qdata_row[1]: res += np.trace(block) return res # non-complete contraction keep = np.array([ax for ax in range(a.rank) if ax != ax1 and ax != ax2], dtype=np.intp) legs = [a.legs[ax] for ax in keep] res = Array(legs, a.dtype, a.qtotal) if a.stored_blocks > 0: res_data = {} # dictionary qdata_row -> block for qdata_row, block in zip(a._qdata, a._data): if qdata_row[ax1] != qdata_row[ax2]: continue # not on the diagonal => doesn't contribute new_qdata_row = tuple(qdata_row[keep]) if new_qdata_row in res_data: res_data[new_qdata_row] += np.trace(block, axis1=ax1, axis2=ax2) else: res_data[new_qdata_row] = np.trace(block, axis1=ax1, axis2=ax2) if len(res_data) > 0: res._data = list(res_data.values()) res._qdata = np.array(list(res_data.keys()), np.intp) res._qdata_sorted = False # labels a_labels = a._labels res._labels = [a_labels[ax] for ax in keep] return res
[docs]def outer(a, b): """Forms the outer tensor product, equivalent to ``tensordot(a, b, axes=0)``. Labels are inherited from `a` and `b`. In case of a collision (same label in both `a` and `b`), they are both dropped. Parameters ---------- a, b : :class:`Array` The arrays for which to form the product. Returns ------- c : :class:`Array` Array of rank ``a.rank + b.rank`` such that (for ``Ra = a.rank; Rb = b.rank``):: c[i_1, ..., i_Ra, j_1, ... j_R] = a[i_1, ..., i_Ra] * b[j_1, ..., j_rank_b] """ if a.chinfo != b.chinfo: raise ValueError("different ChargeInfo") dtype = np.find_common_type([a.dtype, b.dtype], []) qtotal = a.chinfo.make_valid(a.qtotal + b.qtotal) res = Array(a.legs + b.legs, dtype, qtotal) # fill with data qdata_a = a._qdata qdata_b = b._qdata grid = np.mgrid[:len(qdata_a), :len(qdata_b)].T.reshape(-1, 2) # grid is lexsorted like qdata, with rows as all combinations of a/b block indices. qdata_res = np.empty((len(qdata_a) * len(qdata_b), res.rank), dtype=np.intp) qdata_res[:, :a.rank] = qdata_a[grid[:, 0]] qdata_res[:, a.rank:] = qdata_b[grid[:, 1]] # use numpys broadcasting to obtain the tensor product idx_reshape = (Ellipsis, ) + tuple([np.newaxis] * b.rank) data_a = [ta[idx_reshape] for ta in a._data] idx_reshape = tuple([np.newaxis] * a.rank) + (Ellipsis, ) data_b = [tb[idx_reshape] for tb in b._data] res._data = [data_a[i] * data_b[j] for i, j in grid] res._qdata = qdata_res res._qdata_sorted = a._qdata_sorted and b._qdata_sorted # since grid is lex sorted # labels res._labels = _drop_duplicate_labels(a._labels, b._labels) return res
[docs]def inner(a, b, axes=None, do_conj=False): """Contract all legs in `a` and `b`, return scalar. Parameters ---------- a, b : class:`Array` The arrays for which to calculate the product. Must have same rank, and compatible LegCharges. axes : ``(axes_a, axes_b)`` | ``'range'``, ``'labels'`` `axes_a` and `axes_b` specifiy the legs of `a` and `b`, respectively, which should be contracted. Legs can be specified with leg labels or indices. We contract leg ``axes_a[i]`` of `a` with leg ``axes_b[i]`` of `b`. The default ``axes='range'`` is equivalent to ``(range(rank), range(rank))``. ``axes='labels'`` is equivalent to either ``(a.get_leg_labels(), a.get_leg_labels())`` for ``do_conj=True``, or to ``(a.get_leg_labels(), conj_labels(a.get_leg_labels()))`` for ``do_conj=False``. In other words, ``axes='labels'`` requires `a` and `b` to have the same/conjugated labels up to a possible transposition, which is then reverted. do_conj : bool If ``False`` (Default), ignore it. if ``True``, conjugate `a` before, i.e., return ``inner(a.conj(), b, axes)`` Returns ------- inner_product : dtype A scalar (of common dtype of `a` and `b`) giving the full contraction of `a` and `b`. """ if a.rank != b.rank: raise ValueError("different rank!") if axes is None or axes == 'range': if axes is None: msg = "inner(): `axes` currently defaults to 'range', will change to 'labels'" warnings.warn(msg, FutureWarning, stacklevel=2) transp = False else: if axes == 'labels': a_labels = a.get_leg_labels() if do_conj: axes = (a_labels, a_labels) else: a_conj_labels = [Array._conj_leg_label(lbl) for lbl in a_labels] axes = (a_labels, a_conj_labels) axes_a, axes_b = axes axes_a = a.get_leg_indices(to_iterable(axes_a)) axes_b = b.get_leg_indices(to_iterable(axes_b)) if len(axes_a) != a.rank or len(axes_b) != b.rank: raise ValueError("no full contraction. Use tensordot instead!") # we can permute axes_a and axes_b. Use that to ensure axes_b = range(b.rank) sort_axes_b = np.argsort(axes_b) axes_a = [axes_a[i] for i in sort_axes_b] transp = (tuple(axes_a) != tuple(range(a.rank))) if transp: a = a.copy(deep=False) a.itranspose(axes_a) # check charge compatibility if not optimize(OptimizationFlag.skip_arg_checks): if a.chinfo != b.chinfo: raise ValueError("different ChargeInfo") for lega, legb in zip(a.legs, b.legs): if do_conj: lega.test_equal(legb) else: lega.test_contractible(legb) return _inner_worker(a, b, do_conj)
[docs]def tensordot(a, b, axes=2): """Similar as ``np.tensordot`` but for :class:`Array`. Builds the tensor product of `a` and `b` and sums over the specified axes. Does not require complete blocking of the charges. Labels are inherited from `a` and `b`. In case of a collision (= the same label would be inherited from `a` and `b` after the contraction), both labels are dropped. Detailed implementation notes are available in the doc-string of :func:`_tensordot_worker`. Parameters ---------- a, b : :class:`Array` The first and second npc Array for which axes are to be contracted. axes : ``(axes_a, axes_b)`` | int A single integer is equivalent to ``(range(-axes, 0), range(axes))``. Alternatively, `axes_a` and `axes_b` specifiy the legs of `a` and `b`, respectively, which should be contracted. Legs can be specified with leg labels or indices. Contract leg ``axes_a[i]`` of `a` with leg ``axes_b[i]`` of `b`. Returns ------- a_dot_b : :class:`Array` The tensorproduct of `a` and `b`, summed over the specified axes. Returns a scalar in case of a full contraction. """ # for details on the implementation, see _tensordot_worker. a, b, axes = _tensordot_transpose_axes(a, b, axes) # optimize/check for special cases no_block = (a.stored_blocks == 0 or b.stored_blocks == 0) # result is zero one_block = (a.stored_blocks == 1 and b.stored_blocks == 1) if axes == a.rank and axes == b.rank: return _inner_worker(a, b, False) # full contraction yields a single number elif no_block or one_block: cut_a = a.rank - axes res = Array(a.legs[:cut_a] + b.legs[axes:], np.find_common_type([a.dtype, b.dtype], []), a.chinfo.make_valid(a.qtotal + b.qtotal)) if one_block: # optimize for special case that a and b have only 1 entry # this is (usually) the case if we have trivial charges if np.all(a._qdata[0, cut_a:] == b._qdata[0, :axes]): # blocks fit together # contract innner axes res._data = [np.tensordot(a._data[0], b._data[0], axes=axes)] c_qdata = np.empty([1, res.rank], np.intp) c_qdata[0, :cut_a] = a._qdata[0, :cut_a] c_qdata[0, cut_a:] = b._qdata[0, axes:] res._qdata = c_qdata res._qdata_sorted = True # else: zero elif axes == 0: return outer(a, b) # no sum necessary else: # #### the main work res = _tensordot_worker(a, b, axes) # labels res._labels = _drop_duplicate_labels(a._labels[:-axes], b._labels[axes:]) return res
[docs]def svd(a, full_matrices=False, compute_uv=True, cutoff=None, qtotal_LR=[None, None], inner_labels=[None, None], inner_qconj=+1): """Singualar value decomposition of an Array `a`. Factorizes ``U, S, VH = svd(a)``, such that ``a = U*diag(S)*VH`` (where ``*`` stands for a :func:`tensordot` and `diag` creates an correctly shaped Array with `S` on the diagonal). For a non-zero `cutoff` this holds only approximately. There is a gauge freedom regarding the charges, see also :meth:`Array.gauge_total_charge`. We ensure contractibility by setting ``U.legs[1] = VH.legs[0].conj()``. Further, we gauge the LegCharge such that `U` and `V` have the desired `qtotal_LR`. Parameters ---------- a : :class:`Array`, shape ``(M, N)`` The matrix to be decomposed. full_matrices : bool If ``False`` (default), `U` and `V` have shapes ``(M, K)`` and ``(K, N)``, where ``K=len(S)``. If ``True``, `U` and `V` are full square unitary matrices with shapes ``(M, M)`` and ``(N, N)``. Note that the arrays are not directly contractible in that case; ``diag(S)`` would need to be a rectangluar ``(M, N)`` matrix. compute_uv : bool Whether to compute and return `U` and `V`. cutoff : ``None`` | float Keep only singular values which are (strictly) greater than `cutoff`. (Then the factorization holds only approximately). If ``None`` (default), ignored. qtotal_LR : [{charges|None}, {charges|None}] The desired `qtotal` for `U` and `VH`, respectively. ``[None, None]`` (Default) is equivalent to ``[None, a.qtotal]``. A single `None` entry is replaced the unique charge satisfying the requirement ``U.qtotal + VH.qtotal = a.qtotal (modulo qmod)``. inner_labels_LR: [{str|None}, {str|None}] The first label corresponds to ``U.legs[1]``, the second to ``VH.legs[0]``. inner_qconj : {+1, -1} Direction of the charges for the new leg. Default +1. The new LegCharge is constructed such that ``VH.legs[0].qconj = qconj``. Returns ------- U : :class:`Array` Matrix with left singular vectors as columns. Shape ``(M, M)`` or ``(M, K)`` depending on `full_matrices`. S : 1D ndarray The singluar values of the array. If no `cutoff` is given, it has lenght ``min(M, N)``. VH : :class:`Array` Matrix with right singular vectors as rows. Shape ``(N, N)`` or ``(K, N)`` depending on `full_matrices`. """ # check arguments if a.rank != 2: raise ValueError("SVD is only defined for a 2D matrix. Use LegPipes!") if full_matrices and ((not compute_uv) or cutoff is not None): raise ValueError("What do you want? Check your goals!") labL, labR = inner_labels a_labels = a._labels # ensure complete blocking piped_axes, a = a.as_completely_blocked() # figure out qtotal_LR qtotal_L, qtotal_R = qtotal_LR if qtotal_L is None and qtotal_R is None: qtotal_R = a.qtotal if qtotal_L is None: qtotal_L = a.chinfo.make_valid(a.qtotal - qtotal_R) elif qtotal_R is None: qtotal_R = a.chinfo.make_valid(a.qtotal - qtotal_L) elif np.any(a.qtotal != a.chinfo.make_valid(qtotal_L + qtotal_R)): raise ValueError("The entries of `qtotal_LR` have to add up to ``a.qtotal``!") qtotal_LR = qtotal_L, qtotal_R # the main work overwrite_a = (len(piped_axes) > 0) U, S, VH = _svd_worker(a, full_matrices, compute_uv, overwrite_a, cutoff, qtotal_LR, inner_qconj) if not compute_uv: return S # 'split' pipes introduced to ensure complete blocking if 0 in piped_axes: U = U.split_legs(0) if 1 in piped_axes: VH = VH.split_legs(1) U.iset_leg_labels([a_labels[0], labL]) VH.iset_leg_labels([labR, a_labels[1]]) return U, S, VH
[docs]def pinv(a, cutoff=1.e-15): """Compute the (Moore-Penrose) pseudo-inverse of a matrix. Equivalent to the following procedure: Perform a SVD, ``U, S, VH = svd(a, cutoff=cutoff)`` with a `cutoff` > 0, calculate ``P = U * diag(1/S) * VH`` (with ``*`` denoting tensordot) and return ``P.conj.transpose()``. Parameters ---------- a : (M, N) :class:`Array` Matrix to be pseudo-inverted. cuttof : float Cutoff for small singular values, as given to :func:`svd`. (Note: different convetion than numpy.) Returns ------- B : (N, M) :class:`Array` The pseudo-inverse of `a`. """ if cutoff <= 0.: raise ValueError("invalid cutoff") # follow exactly the procedure lined out. # however, use inplace methods and don't construct the diagonal matrix explicitly. U, S, VH = svd(a, cutoff=cutoff) X = VH.itranspose().iconj().iscale_axis(1. / S, axis=-1) Z = U.itranspose().iconj() return tensordot(X, Z, axes=1)
[docs]def norm(a, ord=None, convert_to_float=True): r"""Norm of flattened data. Equivalent to ``np.linalg.norm(a.to_ndarray().flatten(), ord)``. In contrast to numpy, we don't distinguish between matrices and vectors, but simply calculate the norm for the **flat** (block) data. The usual `ord`-norm is defined as :math:`(\sum_i |a_i|^{ord} )^{1/ord}`. ========== ====================================== ord norm ========== ====================================== None/'fro' Frobenius norm (same as 2-norm) np.inf ``max(abs(x))`` -np.inf ``min(abs(x))`` 0 ``sum(a != 0) == np.count_nonzero(x)`` other ususal `ord`-norm ========== ====================================== Parameters ---------- a : :class:`Array` | np.ndarray The array of which the norm should be calculated. ord : The order of the norm. See table above. convert_to_float : Convert integer to float before calculating the norm, avoiding int overflow. Returns ------- norm : float The norm over the *flat* data of the array. """ if isinstance(a, Array): return a.norm(ord, convert_to_float) elif isinstance(a, np.ndarray): if convert_to_float: new_type = np.find_common_type([np.float_, a.dtype], []) # int -> float a = np.asarray(a, new_type) # doesn't copy, if the dtype did not change. return np.linalg.norm(a.reshape((-1, )), ord) else: raise ValueError("unknown type of a")
[docs]def eigh(a, UPLO='L', sort=None): r"""Calculate eigenvalues and eigenvectors for a hermitian matrix. ``W, V = eigh(a)`` yields :math:`a = V diag(w) V^{\dagger}`. **Assumes** that a is hermitian, ``a.conj().transpose() == a``. Parameters ---------- a : :class:`Array` The hermitian square matrix to be diagonalized. UPLO : {'L', 'U'} Whether to take the lower ('L', default) or upper ('U') triangular part of `a`. sort : {'m>', 'm<', '>', '<', ``None``} How the eigenvalues should are sorted *within* each charge block. Defaults to ``None``, which is same as '<'. See :func:`argsort` for details. Returns ------- W : 1D ndarray The eigenvalues, sorted within the same charge blocks according to `sort`. V : :class:`Array` Unitary matrix; ``V[:, i]`` is normalized eigenvector with eigenvalue ``W[i]``. The first label is inherited from `A`, the second label is ``'eig'``. Notes ----- Requires the legs to be contractible. If `a` is not blocked by charge, a blocked copy is made via a permutation ``P``, :math:` a' = P a P = V' W' (V')^{\dagger}`. The eigenvectors `V` are then obtained by the reverse permutation, :math:`V = P^{-1} V'` such that `A = V W V^{\dagger}`. """ w, v = _eig_worker(True, a, sort, UPLO) # hermitian v.iset_leg_labels([a._labels[0], 'eig']) return w, v
[docs]def eig(a, sort=None): r"""Calculate eigenvalues and eigenvectors for a non-hermitian matrix. ``W, V = eig(a)`` yields :math:`a V = V diag(w)`. Parameters ---------- a : :class:`Array` The hermitian square matrix to be diagonalized. sort : {'m>', 'm<', '>', '<', ``None``} How the eigenvalues should are sorted *within* each charge block. Defaults to ``None``, which is same as '<'. See :func:`argsort` for details. Returns ------- W : 1D ndarray The eigenvalues, sorted within the same charge blocks according to `sort`. V : :class:`Array` Unitary matrix; ``V[:, i]`` is normalized eigenvector with eigenvalue ``W[i]``. The first label is inherited from `A`, the second label is ``'eig'``. Notes ----- Requires the legs to be contractible. If `a` is not blocked by charge, a blocked copy is made via a permutation ``P``, :math:` a' = P a P = V' W' (V')^{\dagger}`. The eigenvectors `V` are then obtained by the reverse permutation, :math:`V = P^{-1} V'` such that `A = V W V^{\dagger}`. """ w, v = _eig_worker(False, a, sort) # non-hermitian v.iset_leg_labels([a._labels[0], 'eig']) return w, v
[docs]def eigvalsh(a, UPLO='L', sort=None): r"""Calculate eigenvalues for a hermitian matrix. **Assumes** that a is hermitian, ``a.conj().transpose() == a``. Parameters ---------- a : :class:`Array` The hermitian square matrix to be diagonalized. UPLO : {'L', 'U'} Whether to take the lower ('L', default) or upper ('U') triangular part of `a`. sort : {'m>', 'm<', '>', '<', ``None``} How the eigenvalues should are sorted *within* each charge block. Defaults to ``None``, which is same as '<'. See :func:`argsort` for details. Returns ------- W : 1D ndarray The eigenvalues, sorted within the same charge blocks according to `sort`. Notes ----- The eigenvalues are sorted within blocks of the completely blocked legs. """ return _eigvals_worker(True, a, sort, UPLO)
[docs]def eigvals(a, sort=None): r"""Calculate eigenvalues for a hermitian matrix. Parameters ---------- a : :class:`Array` The hermitian square matrix to be diagonalized. sort : {'m>', 'm<', '>', '<', ``None``} How the eigenvalues should are sorted *within* each charge block. Defaults to ``None``, which is same as '<'. See :func:`argsort` for details. Returns ------- W : 1D ndarray The eigenvalues, sorted within the same charge blocks according to `sort`. Notes ----- The eigenvalues are sorted within blocks of the completely blocked legs. """ return _eigvals_worker(False, a, sort)
[docs]def speigs(a, charge_sector, k, *args, **kwargs): """Sparse eigenvalue decomposition ``w, v`` of square `a` in a given charge sector. Finds `k` right eigenvectors (chosen by ``kwargs['which']``) in a given charge sector, ``tensordot(A, V[i], axes=1) = W[i] * V[i]``. Parameters ---------- a : :class:`Array` A square array with contractible legs and vanishing total charge. charge_sector : charges `ndim` charges to select the block. k : int How many eigenvalues/vectors should be calculated. If the block of `charge_sector` is smaller than `k`, `k` may be reduced accordingly. *args, **kwargs : Additional arguments given to `scipy.sparse.linalg.eigs`. Returns ------- W : ndarray `k` (or less) eigenvalues V : list of :class:`Array` `k` (or less) right eigenvectors of `A` with total charge `charge_sector`. Note that when interpreted as a matrix, this is the transpose of what ``np.eigs`` normally gives. """ charge_sector = a.chinfo.make_valid(charge_sector).reshape((a.chinfo.qnumber, )) if a.rank != 2 or a.shape[0] != a.shape[1]: raise ValueError("expect a square matrix!") a.legs[0].test_contractible(a.legs[1]) if np.any(a.qtotal != a.chinfo.make_valid()): raise ValueError("Non-trivial qtotal -> Nilpotent. Not diagonizable!?") ret_eigv = kwargs.get('return_eigenvectors', args[7] if len(args) > 7 else True) piped_axes, a = a.as_completely_blocked() # ensure complete blocking # find the block correspoding to `charge_sector` in `a` block_exists = False for qinds, block in zip(a._qdata, a._data): qi = qinds[0] if np.any(a.chinfo.make_valid(a.legs[0].get_charge(qi)) != charge_sector): continue block_exists = True # found the correct `block` res = _sp_speigs(block, k, *args, **kwargs) if ret_eigv: W, V_flat = res else: W = res break if not block_exists: # block corresponding to charge_sector is zero for qi in range(a.legs[0].block_number): if np.all(a.chinfo.make_valid(a.legs[0].get_charge(qi)) == charge_sector): sl = a.legs[0].slices block_size = sl[qi + 1] - sl[qi] break else: raise ValueError("desired charge sector not present in the leg of `a`") k = min(block_size, k) W = np.zeros(k, a.dtype) V_flat = np.zeros((block_size, k), a.dtype) V_flat[:k, :k] = np.eye(k, a.dtype) # chose standard basis as eigenvectors # convert V_flat to npc Arrays and return if ret_eigv: V = [] for j in range(V_flat.shape[1]): U = zeros([a.legs[0]], dtype=a.dtype, qtotal=charge_sector) U._data = [V_flat[:, j]] U._qdata = np.array([[qi]], dtype=np.intp) if len(piped_axes) > 0: U = U.split_legs(0) V.append(U) return W, V else: return W
[docs]def expm(a): """Use scipy.linalg.expm to calculate the matrix exponential of a square matrix. Parameters ---------- a : :class:`Array` A square matrix to be exponentiated. Returns ------- exp_a : :class:`Array` The matrix exponential ``expm(a)``, calculated using scipy.linalg.expm. Same legs/labels as `a`. """ if a.rank != 2 or a.shape[0] != a.shape[1]: raise ValueError("expect a square matrix!") a.legs[0].test_contractible(a.legs[1]) if np.any(a.qtotal != a.chinfo.make_valid()): raise NotImplementedError("A*A has different qtotal than A; nilpotent matrix") piped_axes, a = a.as_completely_blocked() # ensure complete blocking res_dtype = np.find_common_type([a.dtype], [np.float64]) res = diag(1., a.legs[0], dtype=res_dtype) res._labels = a._labels[:] for qindices, block in zip(a._qdata, a._data): # non-zero blocks on the diagonal exp_block = np.asarray(scipy.linalg.expm(block), dtype=res_dtype, order='C') # main work qi = qindices[0] # `res` has all diagonal blocks, # so res._qdata = [[0, 0], [1, 1], [2, 2]...] res._data[qi] = exp_block # replace idendity block if len(piped_axes) > 0: res = res.split_legs(piped_axes) # revert the permutation in the axes return res
[docs]def qr(a, mode='reduced', inner_labels=[None, None], cutoff=None): r"""Q-R decomposition of a matrix. Decomposition such that ``A == npc.tensordot(q, r, axes=1)`` up to numerical rounding errors. Parameters ---------- a : :class:`Array` A square matrix to be exponentiated, shape ``(M,N)``. mode : 'reduced', 'complete' 'reduced': return `q` and `r` with shapes (M,K) and (K,N), where K=min(M,N) 'complete': return `q` with shape (M,M). inner_labels: [{str|None}, {str|None}] The first label is used for ``Q.legs[1]``, the second for ``R.legs[0]``. cutoff : ``None`` or float If not ``None``, discard linearly dependent vectors to given precision, which might reduce `K` of the 'reduced' mode even further. Returns ------- q : :class:`Array` If `mode` is 'complete', a unitary matrix. For `mode` 'reduced' such thatOtherwise such that :math:`q^{*}_{j,i} q_{j,k} = \delta_{i,k}` r : :class:`Array` Upper triangular matrix if both legs of A are sorted by charges; Otherwise a simple transposition (performed when sorting by charges) brings it to upper triangular form. """ if a.rank != 2: raise ValueError("expect a matrix!") a_labels = a._labels label_Q, label_R = inner_labels piped_axes, a = a.as_completely_blocked() # ensure complete blocking & sort q_data = [] r_data = [] i0 = 0 a_leg0 = a.legs[0] inner_leg_mask = np.zeros(a_leg0.ind_len, dtype=np.bool_) for qindices, block in zip(a._qdata, a._data): # non-zero blocks on the diagonal if cutoff is None: q_block, r_block = np.linalg.qr(block, mode) else: q_block, r_block = qr_li(block, cutoff) q_data.append(q_block) r_data.append(r_block) if mode != 'complete': q1, q2 = qindices i0 = a_leg0.slices[q1] inner_leg_mask[i0:i0 + q_block.shape[1]] = True if mode != 'complete': # map qindices with warnings.catch_warnings(): warnings.simplefilter("ignore") map_qind, _, inner_leg = a_leg0.project(inner_leg_mask) else: inner_leg = a_leg0 q = Array([a_leg0, inner_leg.conj()], a.dtype) q._data = q_data q._qdata = a._qdata.copy() q._qdata_sorted = False r = Array([inner_leg, a.legs[1]], a.dtype, a.qtotal) r._data = r_data r._qdata = a._qdata.copy() r._qdata_sorted = False if mode != 'complete': q._qdata[:, 1] = map_qind[q._qdata[:, 0]] r._qdata[:, 0] = q._qdata[:, 1] # copy map_qind[q._qdata[:, 0]] from q if len(piped_axes) > 0: # revert the permutation in the axes if 0 in piped_axes: if mode != 'complete': q = q.split_legs(0) else: q = q.split_legs(0, 1) r = r.split_legs(0) if 1 in piped_axes: r = r.split_legs(-1) q.iset_leg_labels([a_labels[0], label_Q]) r.iset_leg_labels([label_R, a_labels[1]]) return q, r
[docs]def to_iterable_arrays(array_list): """Similar as :func:`~tenpy.tools.misc.to_iterable`, but also enclose npc Arrays in a list.""" if isinstance(array_list, Array): array_list = [array_list] array_list = to_iterable(array_list) return array_list
# ################################## # internal helper functions # ################################## def _find_calc_dtype(a_dtype, b_dtype): """return (calc_dtype, res_dtype) suitable for BLAS calculations.""" res_dtype = np.find_common_type([a_dtype, b_dtype], []) _, calc_dtype, _ = BLAS.find_best_blas_type(dtype=res_dtype) return calc_dtype, res_dtype @use_cython def _combine_legs_worker(self, res, combine_legs, non_combined_legs, new_axes, non_new_axes, pipes): """The main work of :meth:`Array.combine_legs`: create a copy and reshape the data blocks. Assumes standard form of parameters. Parameters ---------- self : Array The array from where legs are being combined. res : Array The array to be returned, already filled with correct legs (pipes); needs `_data` and `_qdata` to be filled. Labels are set outside. combine_legs : list(1D np.array) Axes of self which are collected into pipes. non_combined_legs : 1D array ``[i for i in range(self.rank) if i not in flatten(combine_legs)]`` new_axes : 1D array The axes of the pipes in the new array. Ascending. non_new_axes 1D array ``[i for i in range(res.rank) if i not in new_axes]`` pipes : list of :class:`LegPipe` All the correct output pipes, already generated. """ # non_combined_legs: axes of self which are not in combine_legs # map `self._qdata[:, combine_leg]` to `pipe.q_map` indices for each new pipe q_map_inds = [p._map_incoming_qind(self._qdata[:, cl]) for p, cl in zip(pipes, combine_legs)] self._imake_contiguous() # get new qdata qdata = np.empty((self.stored_blocks, res.rank), dtype=np.intp) qdata[:, non_new_axes] = self._qdata[:, non_combined_legs] for j in range(len(pipes)): ax = new_axes[j] qdata[:, ax] = pipes[j].q_map[q_map_inds[j], 2] # now we have probably many duplicate rows in qdata, # since for the pipes many `q_map_ind` map to the same `qindex` # find unique entries by sorting qdata sort = np.lexsort(qdata.T) qdata = qdata[sort] old_data = [self._data[s] for s in sort] q_map_inds = [qm[sort] for qm in q_map_inds] block_start = np.zeros((self.stored_blocks, res.rank), np.intp) block_shape = np.empty((self.stored_blocks, res.rank), np.intp) block_sizes = [leg._get_block_sizes() for leg in res.legs] for ax in non_new_axes: block_shape[:, ax] = block_sizes[ax][qdata[:, ax]] for j in range(len(pipes)): ax = new_axes[j] sizes = pipes[j].q_map[q_map_inds[j], :2] block_start[:, ax] = sizes[:, 0] block_shape[:, ax] = sizes[:, 1] - sizes[:, 0] # TODO size directly in pipe!? # divide qdata into parts, which give a single new block diffs = charges._find_row_differences(qdata) # including the first and last row res_stored_blocks = len(diffs) - 1 qdata = qdata[diffs[:res_stored_blocks], :] # (keeps the dimensions) res_blockshapes = np.empty((res_stored_blocks, res.rank), np.intp) for ax in range(res.rank): res_blockshapes[:, ax] = block_sizes[ax][qdata[:, ax]] # now the hard part: map data data = [] # iterate over ranges of equal qindices in qdata for res_blockshape, beg, end in zip(res_blockshapes, diffs[:-1], diffs[1:]): new_block = np.zeros(res_blockshape, dtype=res.dtype) data.append(new_block) # copy blocks for old_row in range(beg, end): shape = block_shape[old_row] old_block = old_data[old_row].reshape(shape) charges._sliced_copy(new_block, block_start[old_row], old_block, None, shape) res._data = data res._qdata = qdata res._qdata_sorted = True # done @use_cython def _split_legs_worker(self, split_axes, cutoff): """The main work of split_legs: create a copy and reshape the data blocks. Called by :meth:`split_legs`. Assumes that the corresponding legs are LegPipes. """ # calculate mappings of axes new_split_axes_first = [] nonsplit_axes = [] new_nonsplit_axes = [] pipes = [] res_legs = self.legs[:] new_axis = 0 for axis in range(self.rank): if axis in split_axes: pipe = self.legs[axis] pipes.append(pipe) res_legs[new_axis:new_axis + 1] = pipe.legs new_split_axes_first.append(new_axis) new_axis += pipe.nlegs else: nonsplit_axes.append(axis) new_nonsplit_axes.append(new_axis) new_axis += 1 split_axes = np.array(split_axes, dtype=np.intp) N_split = split_axes.shape[0] new_split_axes_first = np.array(new_split_axes_first, np.intp) nonsplit_axes = np.array(nonsplit_axes, np.intp) new_nonsplit_axes = np.array(new_nonsplit_axes, np.intp) res = self.copy(deep=False) res.legs = res_legs res._set_shape() if self.stored_blocks == 0: return res # get new qdata q_map_slices_beg = np.zeros((self.stored_blocks, N_split), np.intp) q_map_slices_shape = np.zeros((self.stored_blocks, N_split), np.intp) for j in range(N_split): pipe = pipes[j] q_map_slices = pipe.q_map_slices qinds = self._qdata[:, split_axes[j]] q_map_slices_beg[:, j] = q_map_slices[qinds] q_map_slices_shape[:, j] = q_map_slices[ qinds + 1] # - q_map_slices[qinds] # one line below # TODO: in pipe q_map_slices_shape -= q_map_slices_beg new_data_blocks_per_old_block = np.prod(q_map_slices_shape, axis=1) old_block_inds = charges._map_blocks(new_data_blocks_per_old_block) res_stored_blocks = old_block_inds.shape[0] q_map_rows = [] for beg, shape in zip(q_map_slices_beg, q_map_slices_shape): q_map_rows.append(np.indices(shape, np.intp).reshape(N_split, -1).T + beg[np.newaxis, :]) q_map_rows = np.concatenate(q_map_rows, axis=0) # shape (res_stored_blocks, N_split) new_qdata = np.empty((res_stored_blocks, res.rank), dtype=np.intp) new_qdata[:, new_nonsplit_axes] = self._qdata[np.ix_( old_block_inds, nonsplit_axes)] # TODO faster to implement by hand? old_block_beg = np.zeros((res_stored_blocks, self.rank), dtype=np.intp) old_block_shapes = np.empty((res_stored_blocks, self.rank), dtype=np.intp) for j in range(N_split): pipe = pipes[j] a = new_split_axes_first[j] a2 = a + pipe.nlegs q_map = pipe.q_map[q_map_rows[:, j], :] new_qdata[:, a:a2] = q_map[:, 3:] old_block_beg[:, split_axes[j]] = q_map[:, 0] old_block_shapes[:, split_axes[j]] = q_map[:, 1] - q_map[:, 0] new_block_shapes = np.empty((res_stored_blocks, res.rank), dtype=np.intp) block_sizes = [leg._get_block_sizes() for leg in res.legs] for ax in range(res.rank): new_block_shapes[:, ax] = block_sizes[ax][new_qdata[:, ax]] old_block_shapes[:, nonsplit_axes] = new_block_shapes[:, new_nonsplit_axes] dtype = self.dtype new_data = [] old_data = self._data # the actual loop to split the blocks for i in range(res_stored_blocks): old_block = old_data[old_block_inds[i]] new_block = np.empty(old_block_shapes[i], dtype) charges._sliced_copy(new_block, None, old_block, old_block_beg[i], old_block_shapes[i]) new_data.append(new_block.reshape(new_block_shapes[i])) res._qdata = new_qdata res._qdata_sorted = False res._data = new_data return res def _nontrivial_grid_entries(grid): """Return a list [(idx, entry)] of non-``None`` entries in an array_like grid.""" grid = np.asarray(grid, dtype=np.object) entries = [(idx, entry) for idx, entry in np.ndenumerate(grid) if entry is not None] if len(entries) == 0: raise ValueError("No non-trivial entries in grid") return grid.shape, entries def _grid_concat_recursion(grid, axes, copy): """Simple recursion on ndim for grid_concat.""" # Copy only required on last go if grid.ndim > 1: grid = [_grid_concat_recursion(row, axes=axes[1:], copy=False) for row in grid] return concatenate(grid, axes[0], copy=copy) # (in cython, but with different arguments) def _iter_common_sorted(a, b): """Yield indices ``i, j`` for which ``a[i] == b[j]``. *Assumes* that ``a[i_start:i_stop]`` and ``b[j_start:j_stop]`` are strictly ascending. Given that, it is equivalent to (but faster than) ``[(i, j) for j, i in itertools.product(range(len(a)), range(len(b)) if a[i] == b[j]]`` """ l_a = len(a) l_b = len(b) i, j = 0, 0 res = [] while i < l_a and j < l_b: if a[i] < b[j]: i += 1 elif b[j] < a[i]: j += 1 else: res.append((i, j)) i += 1 j += 1 return res @use_cython def _inner_worker(a, b, do_conj): """Full contraction of `a` and `b` with axes in matching order.""" calc_dtype, res_dtype = _find_calc_dtype(a.dtype, b.dtype) res = res_dtype.type(0) check_qtotal = b.qtotal - a.qtotal if do_conj else b.qtotal + a.qtotal if np.any(a.chinfo.make_valid(check_qtotal) != 0): return res # can't have blocks to be contracted. if a.stored_blocks == 0 or b.stored_blocks == 0: return res # also trivial a = a.astype(calc_dtype, False) b = b.astype(calc_dtype, False) func_name = 'dotc' if do_conj else 'dotu' blas_dot = BLAS.get_blas_funcs(func_name, dtype=calc_dtype) # need to find common blocks in a and b, i.e. equal leg charges. # for faster comparison, generate 1D arrays with a combined index # F-style strides to preserve sorting! stride = charges._make_stride([l.block_number for l in a.legs], False) a_qdata = np.sum(a._qdata * stride, axis=1) a_data = a._data if not a._qdata_sorted: perm = np.argsort(a_qdata) a_qdata = a_qdata[perm] a_data = [a_data[i] for i in perm] b_qdata = np.sum(b._qdata * stride, axis=1) b_data = b._data if not b._qdata_sorted: perm = np.argsort(b_qdata) b_qdata = b_qdata[perm] b_data = [b_data[i] for i in perm] for i, j in _iter_common_sorted(a_qdata, b_qdata): res += blas_dot(a_data[i], b_data[j]) # same as res += np.inner(a_data[i].reshape((-1, )), b_data[j].reshape((-1, ))) # (or with complex conj if 'do_conj') return res def _drop_duplicate_labels(a_labels, b_labels): """Combine lists `a_labels` and `b_labels` into a new list, dropping any duplicates.""" a_labels = list(a_labels) b_labels = list(b_labels) for i, lbl in enumerate(a_labels): if lbl in b_labels: # collision: drop labels j = b_labels.index(lbl) a_labels[i] = None b_labels[j] = None a_labels.extend(b_labels) return a_labels @use_cython def _tensordot_transpose_axes(a, b, axes): """Step 1: Transpose a,b if necessary.""" if a.chinfo != b.chinfo: raise ValueError("Different ChargeInfo") try: axes_a, axes_b = axes axes_int = False except TypeError: axes = int(axes) axes_int = True if not axes_int: a = a.copy(deep=False) # shallow copy allows to call itranspose b = b.copy(deep=False) # which would otherwise break views. # step 1.) of the implementation notes: bring into standard form by transposing axes_a = a.get_leg_indices(to_iterable(axes_a)) axes_b = b.get_leg_indices(to_iterable(axes_b)) if len(axes_a) != len(axes_b): raise ValueError("different lens of axes for a, b: " + repr(axes)) not_axes_a = [i for i in range(a.rank) if i not in axes_a] not_axes_b = [i for i in range(b.rank) if i not in axes_b] a.itranspose(not_axes_a + axes_a) b.itranspose(axes_b + not_axes_b) axes = len(axes_a) # now `axes` is integer # check for contraction compatibility if not optimize(OptimizationFlag.skip_arg_checks): for lega, legb in zip(a.legs[-axes:], b.legs[:axes]): lega.test_contractible(legb) elif a.shape[-axes:] != b.shape[:axes]: # check at least the shape raise ValueError("Shape mismatch for tensordot") return a, b, axes def _tensordot_pre_reshape(data, cut, dtype, same_shape_before_cut=True): """Reshape blocks to (fortran) matrix/vector (depending on `cut`)""" if cut == 0 or cut == data[0][0].ndim: # special case: reshape to 1D vectors return [[np.reshape(T, (-1, )).astype(dtype, order='F', copy=False) for T in blocks] for blocks in data] res = [] for blocks in data: if same_shape_before_cut: p = 1 for s in blocks[0].shape[:cut]: p *= s shape = (p, -1) else: p = 1 for s in blocks[0].shape[cut:]: p *= s shape = (-1, p) res.append([np.reshape(T, shape).astype(dtype, order='F', copy=False) for T in blocks]) return res def _tensordot_pre_worker(a, b, cut_a, cut_b): """Pre-calculations before the actual matrix procut. Called by :func:`_tensordot_worker`. See doc-string of :func:`tensordot` for details on the implementation. Parameters ---------- a, b : :class:`Array` the arrays to be contracted with tensordot. Should have non-empty ``a._data`` cut_a, cut_b : int contract `a.legs[cut_a:]` with `b.legs[:cut_b]` Returns ------- a_pre_result, b_pre_result : tuple In the following order, it contains for `a`, and `b` respectively: a_data : list of reshaped tensors a_qdata_contr : 2D array with qindices of `a` which we need to sum over a_qdata_keep : 2D array of the qindices of `a` which will appear in the final result a_slices : partition to map the indices of a_*_keep to a_data f_dot_sum : function a wrapper around a suitable BLAS function for perfoming the matrix product of single blocks sum over the results. For ``a, a2, ...`` from ``a_data`` (and similar for ``b_data``) the code ``s = f_dot_sum(a, b, None); s = f_dot_sum(a2, b2, s); ....`` should be equivalent to (yet faster than) ``s = np.dot(a, b); s += np.dot(a2, b2); ... ``. res_dtype : np.dtype The data type which should be chosed for the result. (The `dtype` of the ``s`` above might differ from `res_dtype`!). """ # convert qindices over which we sum to a 1D array for faster lookup/iteration # F-style strides to preserve sorting stride = charges._make_stride([l.block_number for l in a.legs[cut_a:]], False) a_qdata_contr = np.sum(a._qdata[:, cut_a:] * stride, axis=1) # lex-sort a_qdata, dominated by the axes kept, then the axes summed over. a_sort = np.lexsort(np.append(a_qdata_contr[:, np.newaxis], a._qdata[:, :cut_a], axis=1).T) a_qdata_keep = a._qdata[a_sort, :cut_a] a_qdata_contr = a_qdata_contr[a_sort] a_data = a._data a_data = [a_data[i] for i in a_sort] # combine all b_qdata[axes_b] into one column (with the same stride as before) b_qdata_contr = np.sum(b._qdata[:, :cut_b] * stride, axis=1) # lex-sort b_qdata, dominated by the axes summed over, then the axes kept. b_data = b._data if not b._qdata_sorted: b_sort = np.lexsort(np.append(b_qdata_contr[:, np.newaxis], b._qdata[:, cut_b:], axis=1).T) b_qdata_keep = b._qdata[b_sort, cut_b:] b_qdata_contr = b_qdata_contr[b_sort] b_data = [b_data[i] for i in b_sort] else: b_qdata_keep = b._qdata[:, cut_b:] # find blocks where qdata_a[not_axes_a] and qdata_b[not_axes_b] change a_slices = charges._find_row_differences(a_qdata_keep) b_slices = charges._find_row_differences(b_qdata_keep) # the slices divide a_data and b_data into rows and columns of the final result a_data = [a_data[i:i2] for i, i2 in zip(a_slices[:-1], a_slices[1:])] b_data = [b_data[j:j2] for j, j2 in zip(b_slices[:-1], b_slices[1:])] a_qdata_contr = [a_qdata_contr[i:i2] for i, i2 in zip(a_slices[:-1], a_slices[1:])] b_qdata_contr = [b_qdata_contr[i:i2] for i, i2 in zip(b_slices[:-1], b_slices[1:])] a_qdata_keep = a_qdata_keep[a_slices[:-1]] b_qdata_keep = b_qdata_keep[b_slices[:-1]] a_shape_keep = [blocks[0].shape[:cut_a] for blocks in a_data] b_shape_keep = [blocks[0].shape[cut_b:] for blocks in b_data] # determine calculation type and result type calc_dtype, res_dtype = _find_calc_dtype(a.dtype, b.dtype) # reshape a_data and b_data to matrix/vector in fortran order a_data = _tensordot_pre_reshape(a_data, cut_a, calc_dtype, same_shape_before_cut=True) b_data = _tensordot_pre_reshape(b_data, cut_b, calc_dtype, same_shape_before_cut=False) # determine blas function f_name = 'gemv' if (cut_a == 0 or cut_b == b.rank) else 'gemm' blas_dot = BLAS.get_blas_funcs(f_name, dtype=calc_dtype) kw_overwrite = 'overwrite_c' if f_name == 'gemm' else 'overwrite_y' kw_overwrite = {kw_overwrite: True} if cut_a > 0: def fast_dot_sum(a, b, a_qdata, b_qdata): """BLAS wrapper to perform contraction in a fast way. Equivalent to:: np.sum([np.dot(a[k1], b[k2]) for k1, k2 in _iter_common_sorted(a_qdata, b_qdata)], axis=0) Returns ``None`` if no ``(k1, k2)`` pair existed. """ ks = _iter_common_sorted(a_qdata, b_qdata) if len(ks) == 0: return None k1, k2 = ks[0] sum_ = blas_dot(1., a[k1], b[k2]) for k1, k2 in ks[1:]: sum_ = blas_dot(1., a[k1], b[k2], 1., sum_, **kw_overwrite) return sum_ else: # special case: `a` contains 1D vectors, so we need blas_dot(b, a, trans=True) kw_no_overwrite = {'trans': True} kw_overwrite.update(kw_no_overwrite) def fast_dot_sum(a, b, a_qdata, b_qdata): # same as above fast_dot_sum, but for special case that a contains vectors ks = _iter_common_sorted(a_qdata, b_qdata) if len(ks) == 0: return None k1, k2 = ks[0] sum_ = blas_dot(1., b[k2], a[k1], **kw_no_overwrite) for k1, k2 in ks[1:]: sum_ = blas_dot(1., b[k2], a[k1], 1., sum_, **kw_overwrite) return sum_ # collect and return the results a_pre_result = a_data, a_qdata_contr, a_qdata_keep, a_shape_keep b_pre_result = b_data, b_qdata_contr, b_qdata_keep, b_shape_keep return a_pre_result, b_pre_result, fast_dot_sum, res_dtype @use_cython def _tensordot_worker(a, b, axes): """Main work of tensordot, called by :func:`tensordot`. Assumes standard form of parameters: axes is integer, sum over the last `axes` legs of `a` and first `axes` legs of `b`. Notes ----- Looking at the source of numpy's tensordot (which is just 62 lines of python code), you will find that it has the following strategy: 1. Transpose `a` and `b` such that the axes to sum over are in the end of `a` and front of `b`. 2. Combine the legs `axes`-legs and other legs with a `np.reshape`, such that `a` and `b` are matrices. 3. Perform a matrix product with `np.dot`. 4. Split the remaining axes with another `np.reshape` to obtain the correct shape. The main work is done by `np.dot`, which calls LAPACK to perform the simple matrix product. [This matrix multiplication of a ``NxK`` times ``KxM`` matrix is actually faster than the O(N*K*M) needed by a naive implementation looping over the indices.] We follow the same overall strategy, viewing the :class:`Array` as a tensor with data block entries. Step 1) is performed directly in :func:`tensordot`. The steps 2) and 4) could be implemented with :meth:`Array.combine_legs` and :meth:`Array.split_legs`. However, that would actually be an overkill: we're not interested in the full charge data of the combined legs (which would be generated in the LegPipes). Instead, we just need to track the qindices of the `a._qdata` and `b._qdata` carefully. Our step 2) is implemented in :func:`_tensordot_pre_worker`: We split `a._qdata` in `a_qdata_keep` and `a_qdata_sum`, and similar for `b`. Then, view `a` is a matrix :math:`A_{i,k1}` and `b` as :math:`B_{k2,j}`, where `i` can be any row of `a_qdata_keep`, `j` can be any row of `b_qdata_keep`. The `k1` and `k2` are rows of `a_qdata_sum` and `b_qdata_sum`, which stem from the same legs (up to a :meth:`LegCharge.conj()`). In our storage scheme, `a._data[s]` then contains the block :math:`A_{i,k1}` for ``j = a_qdata_keep[s]`` and ``k1 = a_qdata_sum[s]``. To identify the different indices `i` and `j`, it is easiest to lexsort in the `s`. Note that we give priority to the `#_qdata_keep` over the `#_qdata_sum`, such that equal rows of `i` are contiguous in `#_qdata_keep`. Then, they are identified with :func:`charges._find_row_differences`. Now, the goal is to calculate the sums :math:`C_{i,j} = sum_k A_{i,k} B_{k,j}`, analogous to step 3) above. This is implemented in :func:`_tensordot_worker`. It is done 'naively' by explicit loops over ``i``, ``j`` and ``k``. However, this is not as bad as it sounds: First, we loop only over existent ``i`` and ``j`` (in the sense that there is at least some non-zero block with these ``i`` and ``j``). Second, if the ``i`` and ``j`` are not compatible with the new total charge, we know that ``C_{i,j}`` will be zero. Third, given ``i`` and ``j``, the sum over ``k`` runs only over ``k1`` with nonzero :math:`A_{i,k1}`, and ``k2` with nonzero :math:`B_{k2,j}`. How many multiplications :math:`A_{i,k} B_{k,j}` we actually have to perform depends on the sparseness. In the ideal case, if ``k`` (i.e. a LegPipe of the legs summed over) is completely blocked by charge, the 'sum' over ``k`` will contain at most one term! """ chinfo = a.chinfo if a.stored_blocks == 0 or b.stored_blocks == 0: # special case: `a` or `b` is 0 return zeros(a.legs[:-axes] + b.legs[axes:], np.find_common_type([a.dtype, b.dtype], []), a.qtotal + b.qtotal) cut_a = a.rank - axes cut_b = axes a_pre_result, b_pre_result, fast_dot_sum, res_dtype = _tensordot_pre_worker(a, b, cut_a, cut_b) a_data, a_qdata_contr, a_qdata_keep, a_shape_keep = a_pre_result b_data, b_qdata_contr, b_qdata_keep, b_shape_keep = b_pre_result # Step 3) loop over column/row of the result # first find output colum/row indices of the result, which are compatible with the charges qtotal = chinfo.make_valid(a.qtotal + b.qtotal) a_charges_keep = charges._partial_qtotal(a.chinfo, a.legs[:cut_a], a_qdata_keep, +1, None) # fast way to matb_charges_keep, ch the find the compatible indices a_lookup_charges = list_to_dict_list(a_charges_keep) # lookup table ``charge -> [row_a]`` # b_charges_match: for each row in a, which charge in b is compatible? b_charges_match = charges._partial_qtotal(a.chinfo, b.legs[cut_b:], b_qdata_keep, -1, qtotal) # (rows_a changes faster than cols_b, such that the resulting array is qdata lex-sorted) # determine output qdata res_data = [] res_qdata_a = [] res_qdata_b = [] for col_b, charge_match in enumerate(b_charges_match): rows_a = a_lookup_charges.get(tuple(charge_match), []) # empty list if no match for row_a in rows_a: block_contr = fast_dot_sum(a_data[row_a], b_data[col_b], a_qdata_contr[row_a], b_qdata_contr[col_b]) if block_contr is not None: # no common blocks # Step 4) reshape back to tensors block_contr = block_contr.reshape(a_shape_keep[row_a] + b_shape_keep[col_b]) res_data.append(block_contr.astype(res_dtype, copy=False)) res_qdata_a.append(a_qdata_keep[row_a]) res_qdata_b.append(b_qdata_keep[col_b]) res = Array(a.legs[:cut_a] + b.legs[cut_b:], res_dtype, qtotal) if len(res_data) == 0: return res # (at least one entry is non-empty, so res_qdata[keep] is also not empty) res._qdata = np.concatenate((res_qdata_a, res_qdata_b), axis=1) res._qdata_sorted = True res._data = res_data return res def _svd_worker(a, full_matrices, compute_uv, overwrite_a, cutoff, qtotal_LR, inner_qconj): """Main work of svd. Assumes that `a` is 2D and completely blocked. """ chinfo = a.chinfo qtotal_L, qtotal_R = qtotal_LR at = 0 # will be gradually increased, counting the number of singular values S = [] if compute_uv: U_data = [] VH_data = [] new_leg_slices = [] new_leg_slices_full = [] at_full = 0 blocks_kept = [] # main loop for i in range(len(a._data)): block = a._data[i] if compute_uv: U_b, S_b, VH_b = svd_flat(block, full_matrices, True, overwrite_a, check_finite=True) if anynan(U_b) or anynan(VH_b) or anynan(S_b): warnings.warn("Svd (gesdd) gave NaNs. Try again with gesvd") # give it another try with the other (more stable) svd driver U_b, S_b, VH_b = svd_flat(block, full_matrices, True, overwrite_a, check_finite=True, lapack_driver='gesvd') if anynan(U_b) or anynan(VH_b) or anynan(S_b): raise ValueError("NaN in U_b {0:d} and/or VH_b: {1:d}".format( np.sum(np.isnan(U_b)), np.sum(np.isnan(VH_b)))) else: S_b = svd_flat(block, False, False, overwrite_a, check_finite=True) if anynan(S_b): raise ValueError("NaN in S: " + str(np.sum(np.isnan(S_b)))) if cutoff is not None: keep = (S_b > cutoff) # bool array S_b = S_b[keep] if compute_uv: U_b = U_b[:, keep] VH_b = VH_b[keep, :] num = len(S_b) if num > 0: # have new singular values S.append(S_b) if compute_uv: blocks_kept.append(i) new_leg_slices.append(at) new_leg_slices_full.append(at_full) at_full += max(block.shape) at += num U_data.append(U_b.astype(a.dtype, copy=False)) VH_data.append(VH_b.astype(a.dtype, copy=False)) if len(S) == 0: raise RuntimeError("SVD found no singluar values") # (at least none > cutoff) S = np.concatenate(S) if not compute_uv: return (None, S, None) # else: compute_uv is True blocks_kept = np.array(blocks_kept, np.intp) nblocks = blocks_kept.shape[0] qi_L, qi_R = a._qdata[blocks_kept, :].T qi_C = np.arange(nblocks, dtype=np.intp) U_qdata = np.stack([qi_L, qi_C], axis=1) VH_qdata = np.stack([qi_C, qi_R], axis=1) new_leg_slices.append(at) new_leg_slices = np.array(new_leg_slices, np.intp) new_leg_charges = (qtotal_R - a.legs[1].get_charge(qi_R)) * inner_qconj new_leg_charges = chinfo.make_valid(new_leg_charges) new_leg_R = LegCharge.from_qind(chinfo, new_leg_slices, new_leg_charges, inner_qconj) new_leg_L = new_leg_R.conj() if full_matrices: new_leg_slices_full.append(at_full) new_leg_slices_full = np.array(new_leg_slices_full, np.intp) new_leg_full = LegCharge.from_qind(chinfo, new_leg_slices_full, new_leg_charges, inner_qconj) if a.shape[0] >= a.shape[1]: # new_leg_R is fine new_leg_L = new_leg_full.conj() else: # new_leg_L is fine new_leg_R = new_leg_full U = Array([a.legs[0], new_leg_L], a.dtype, qtotal_L) VH = Array([new_leg_R, a.legs[1]], a.dtype, qtotal_R) U._data = U_data U._qdata = np.array(U_qdata, dtype=np.intp) U._qdata_sorted = a._qdata_sorted VH._data = VH_data VH._qdata = np.array(VH_qdata, dtype=np.intp) VH._qdata_sorted = a._qdata_sorted return U, S, VH def _eig_worker(hermitian, a, sort, UPLO='L'): """Worker for ``eig``, ``eigh``""" if a.rank != 2 or a.shape[0] != a.shape[1]: raise ValueError("expect a square matrix!") a.legs[0].test_contractible(a.legs[1]) if np.any(a.qtotal != a.chinfo.make_valid()): raise ValueError("Non-trivial qtotal -> Nilpotent. Not diagonizable!?") piped_axes, a = a.as_completely_blocked() # ensure complete blocking dtype = np.float if hermitian else np.complex resw = np.zeros(a.shape[0], dtype=dtype) resv = diag(1., a.legs[0], dtype=np.promote_types(dtype, a.dtype)) # w, v now default to 0 and the Identity for qindices, block in zip(a._qdata, a._data): # non-zero blocks on the diagonal if hermitian: rw, rv = np.linalg.eigh(block, UPLO) else: rw, rv = np.linalg.eig(block) if sort is not None: # apply sorting options perm = argsort(rw, sort) rw = np.take(rw, perm) rv = np.take(rv, perm, axis=1) qi = qindices[0] # both `a` and `resv` are sorted and share the same qindices resv._data[qi] = rv # replace idendity block resw[a.legs[0].get_slice(qi)] = rw # replace eigenvalues if len(piped_axes) > 0: resv = resv.split_legs(0) # the 'outer' facing leg is permuted back. return resw, resv def _eigvals_worker(hermitian, a, sort, UPLO='L'): """Worker for ``eigvals``, ``eigvalsh``""" if a.rank != 2 or a.shape[0] != a.shape[1]: raise ValueError("expect a square matrix!") a.legs[0].test_contractible(a.legs[1]) if np.any(a.qtotal != a.chinfo.make_valid()): raise ValueError("Non-trivial qtotal -> Nilpotent. Not diagonizable!?") piped_axes, a = a.as_completely_blocked() # ensure complete blocking dtype = np.float if hermitian else np.complex resw = np.zeros(a.shape[0], dtype=dtype) # w now default to 0 for qindices, block in zip(a._qdata, a._data): # non-zero blocks on the diagonal if hermitian: rw = np.linalg.eigvalsh(block, UPLO) else: rw = np.linalg.eigvals(block) if sort is not None: # apply sorting options perm = argsort(rw, sort) rw = np.take(rw, perm) qi = qindices[0] # both `a` and `resv` are sorted and share the same qindices resw[a.legs[0].get_slice(qi)] = rw # replace eigenvalues return resw def __pyx_unpickle_Array(type_, checksum, state): """Allow to unpickle Arrays created with Cython-compiled TenPy version 0.3.0.""" res = Array.__new__(Array) if state is not None: # doesn't happen on my computer... res.__setstate__(state) return res