# -*- coding: iso-8859-1 -*-
# results.py
# Results module
# Copyright 2011-2013 Giuseppe Venturini
# This file is part of the ahkab simulator.
#
# Ahkab is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 2 of the License.
#
# Ahkab is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License v2
# along with ahkab. If not, see <http://www.gnu.org/licenses/>.
"""
This module provides classes for easy, dictionary-like access to simulation
results.
Simulation results are typically returned upon successful simulation of a
circuit and the user is not expected to use their constructor, but rather
to use the methods they provide to access their data set.
Overview of the data interface
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The solution classes define special methods according to their simulation
type but they all subclass :class:`solution`, which provides the shared
data interface.
The interface allows for accessing the values as::
>>> ac_sol.keys()
['f', 'Vn1', 'Vn2', 'I(V1)', 'I(L1)', 'I(L2)']
Where ``ac_sol`` is a generic example instance of :class:`ac_solution`.
Checking with the ``in`` construct::
>>> 'Vn1' in ac_sol
True
Access any variable in the solution object::
>>> ac_sol['f']
array([ 6098.38572827, 6102.08394991, 6105.78441425, 6109.48712265,
6113.19207648, 6116.89927708, 6120.60872583, 6124.32042408,
[... omissis ...]
6463.83880528, 6467.75864729, 6471.68086639])
Iterate over the results::
>>> for var in ac_sol:
... # do something with ac_sol[var]
... pass
Convenience methods are available to identify and access the independent,
swept variable, when it is available::
>>> ac_sol.get_xlabel()
'f'
>>> ac_sol.get_x()
array([ 6098.38572827, 6102.08394991, 6105.78441425, 6109.48712265,
6113.19207648, 6116.89927708, 6120.60872583, 6124.32042408,
[... omissis ...]
6463.83880528, 6467.75864729, 6471.68086639])
Index of the solution classes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
ac_solution
dc_solution
op_solution
pss_solution
pz_solution
symbolic_solution
tran_solution
Module reference
~~~~~~~~~~~~~~~~
"""
from __future__ import (unicode_literals, absolute_import,
division, print_function)
import sys
import time
import pickle
import re
import numpy as np
from . import circuit
from . import components
from . import printing
from . import options
from . import constants
from . import csvlib
from .py3compat import text_type
from .__version__ import __version__
csvlib.SEPARATOR = "\t"
class _mutable_data(object):
def __init__(self):
self._init_file_done = False
def _add_data(self, data):
"""Add the data matrix to the results set."""
csvlib.write_csv(self.filename, data, self.variables, append=self._init_file_done)
self._init_file_done = True
[docs]class solution(object):
"""Base class storing a set of generic simulation results.
This class is not meant to be accessed directly, rather it is
subclassed by the classes for the specific simulation solutions.
**Parameters:**
circ : circuit instance
the circuit instance of the simulated circuit.
outfile : string
the filename of the save file
"""
def __init__(self, circ, outfile):
self.timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
self.netlist_file = circ.filename
self.netlist_title = circ.title
self.vea = options.vea
self.ver = options.ver
self.iea = options.iea
self.ier = options.ier
self.gmin = options.gmin
self.cmin = options.cmin
self.temp = constants.T
self.filename = outfile
self._init_file_done = False
self.skip_nodes_list = [] # nodi da saltare, solo interni
self.variables = []
self.units = case_insensitive_dict()
self.iter_index = 0
# Please redefine this sol_type in the subclasses
self.sol_type = None
[docs] def asarray(self):
"""Return all data.
.. note::
This method loads to memory a possibly huge data matrix.
"""
data, _, _, _ = csvlib.load_csv(self.filename, load_headers=[],
verbose=0)
return data
# Access as a dictionary BY VARIABLE NAME:
def __len__(self):
"""Get the number of variables in the results set."""
return len(self.variables)
def __getitem__(self, name):
"""Get a specific variable, as from a dictionary."""
# data, headers, pos, EOF = csvlib.load_csv(...)
try:
data, _, _, _ = csvlib.load_csv(self.filename, load_headers=[name],
nsamples=None, skip=0, verbose=0)
except ValueError:
raise KeyError(name)
return data.reshape((-1,))
[docs] def get(self, name, default=None):
"""Get a solution by variable name."""
try:
# data, headers, pos, EOF = csvlib.load_csv(...)
data, _, _, _ = csvlib.load_csv(self.filename, load_headers=[name],
nsamples=None, skip=0, verbose=0)
except ValueError:
return default
return data.reshape((-1,))
[docs] def has_key(self, name):
"""Determine whether the result set contains a variable."""
return name.upper() in [v.upper() for v in self.variables]
def __contains__(self, name):
"""Determine whether the result set contains a variable."""
return name.upper() in [v.upper() for v in self.variables]
[docs] def keys(self):
"""Get all of the results set's variables names."""
return self.variables
[docs] def values(self):
"""Get all of the results set's variables values."""
# data, headers, pos, EOF = csvlib.load_csv(...)
data, _, _, _ = csvlib.load_csv(self.filename,
load_headers=self.variables,
nsamples=None, skip=0, verbose=0)
values = [data[i, :] for i in range(data.shape[0])]
return values
[docs] def items(self):
return list(zip(self.keys(), self.values()))
# iterator methods
def __iter__(self):
self.iter_index = 0
self.iter_data, self.iter_headers, _, _ = csvlib.load_csv(self.filename)
return self
[docs] def next(self):
return self.__next__()
def __next__(self):
if self.iter_index == len(self.iter_headers):
self.iter_index = 0
raise StopIteration
else:
next_i = self.iter_headers[self.iter_index]
if hasattr(self.iter_data, 'shape'):
next_d = self.iter_data[self.iter_index, :]
else:
next_d = self.iter_data[self.iter_index]
nxt = next_i, next_d
self.iter_index += 1
return nxt
[docs]class op_solution(solution, _mutable_data):
"""OP results
**Parameters:**
x : ndarray
the result set
error : ndarray
the residual error after solution,
circ : circuit instance
the circuit instance of the simulated circuit
outfile: str
the file to write the results to.
Use "stdout" to write to std output.
iterations, int, optional
The number of iterations needed for convergence, if known.
"""
def __init__(self, x, error, circ, outfile, iterations=0):
solution.__init__(self, circ, outfile)
self.sol_type = "OP"
self.iterations = iterations
# We have mixed current and voltage results
# per primi vengono tanti valori di tensioni quanti sono i nodi del circuito meno
# uno, quindi tante correnti quanti sono gli elementi definiti in tensione presenti
# (per questo, per misurare una corrente, si può fare uso di generatori di tensione
# da 0V)
nv_1 = circ.get_nodes_number() - 1 # numero di soluzioni di tensione (al netto del ref)
self.results = case_insensitive_dict()
self.errors = case_insensitive_dict()
self.x = x
for index in range(nv_1):
varname = ("V" + str(circ.nodes_dict[index + 1])).upper()
self.variables += [varname]
self.results.update({varname: x[index, 0]})
self.errors.update({varname: error[index, 0]})
self.units.update({varname: "V"})
if circ.is_int_node_internal_only(index+1):
self.skip_nodes_list.append(index)
index = nv_1 - 1
for elem in circ:
if circuit.is_elem_voltage_defined(elem):
index = index + 1
varname = ("I("+elem.part_id.upper()+")").upper()
self.variables += [varname]
self.results.update({varname: x[index, 0]})
self.errors.update({varname: error[index, 0]})
self.units.update({varname: "A"})
self._op_keys, self._op_info, self.tot_power = self._get_elements_op(circ, x)
def __str__(self):
str_repr = \
(("OP simulation results for '%s'" % (self.netlist_title,)) +
('(netlist %s)'%(self.netlist_file,) if self.netlist_file else '') +
('.\nRun on %s, data file %s.\n' % \
(self.timestamp, self.filename)))
return str_repr + self.get_table_array()
def __getitem__(self, name):
"""Get a specific variable, as from a dictionary."""
if not name.upper() in [v.upper() for v in self.variables]:
raise KeyError
his = csvlib.get_headers_index(self.variables, [name], verbose=0)
return self.x[his]
[docs] def get(self, name, default=None):
"""Get a solution by variable name."""
try:
data = self.__getitem__(name)
except KeyError:
return default
return data
[docs] def asarray(self):
"""Get all data as a ``numpy`` array"""
return self.x
[docs] def get_table_array(self):
headers = ("Variable", "Units", "Value", "Error", "%")
table = []
for v in self.variables:
if self.results[v] != 0:
relerror = self.errors[v]/self.results[v]*100.0
else:
relerror = 0.0
line = (v, self.units[v], self.results[v], self.errors[v], '%d' %
relerror)
table.append(line)
return printing.table(table, headers=headers)
def _get_elements_op(self, circ, x):
"""Returns"""
tot_power = 0
i_index = 0
nv_1 = circ.get_nodes_number() - 1
op_info = {}
op_keys = {}
for elem in circ:
ports_v_v = []
if hasattr(elem, "get_op_info"):
if elem.is_nonlinear:
# build the drive ports vector
oports = elem.get_output_ports()
for index in range(len(oports)):
dports = elem.get_drive_ports(index)
ports_v = []
for port in dports:
tempv = 0
if port[0]:
tempv = x[port[0]-1]
if port[1]:
tempv = tempv - x[port[1]-1]
ports_v.append(tempv)
ports_v_v.append(ports_v)
else:
port = (elem.n1, elem.n2)
tempv = 0
if port[0]:
tempv = x[port[0]-1]
if port[1]:
tempv = tempv - x[port[1]-1]
ports_v_v = ((tempv,),)
if circuit.is_elem_voltage_defined(elem):
i = circ.find_vde_index(elem.part_id)
nv_1 = circ.get_nodes_number() - 1
opk, opi = elem.get_op_info(ports_v_v, x[nv_1 + i])
else:
opk, opi = elem.get_op_info(ports_v_v)
if elem.part_id[0].upper() != 'M':
if elem.part_id[0].upper() in op_info:
op_info.update({elem.part_id[0].upper():
op_info[elem.part_id[0].upper()]+[opi]})
#assert set(opk) == set(op_keys[elem.part_id[0].upper()])
else:
op_info.update({elem.part_id[0].upper():[opi]})
op_keys.update({elem.part_id[0].upper():[opk]})
else:
op_info.update({elem.part_id.upper():opi})
op_keys.update({elem.part_id.upper():[[]]})
if isinstance(elem, components.sources.GISource):
v = 0
v = v + x[elem.n1-1] if elem.n1 != 0 else v
v = v - x[elem.n2-1] if elem.n2 != 0 else v
vs = 0
vs = vs + x[elem.n1-1] if elem.sn1 != 0 else vs
vs = vs - x[elem.n2-1] if elem.sn2 != 0 else vs
tot_power = tot_power - v*vs*elem.alpha
elif isinstance(elem, components.sources.ISource):
v = 0
v = v + x[elem.n1-1] if elem.n1 != 0 else v
v = v - x[elem.n2-1] if elem.n2 != 0 else v
tot_power = tot_power - v*elem.I()
elif isinstance(elem, components.sources.VSource) or \
isinstance(elem, components.sources.EVSource):
v = 0
v = v + x[elem.n1-1] if elem.n1 != 0 else v
v = v - x[elem.n2-1] if elem.n2 != 0 else v
tot_power = tot_power - v*x[nv_1 + i_index, 0]
i_index = i_index + 1
elif isinstance(elem, components.sources.FISource):
local_i_index = 0
found_source = False
for e in circ:
if circuit.is_elem_voltage_defined(e):
if isinstance(e, components.sources.VSource) and e.part_id.lower() == elem.source_id.lower():
found_source = True
break
else:
local_i_index += 1
if not found_source:
raise RuntimeError("Sensing voltage source %s for %s not found. BUG!" %
(elem.source_id, elem.part_id))
v = 0.
v = v + x[elem.n1 - 1] if elem.n1 != 0 else v
v = v - x[elem.n2 - 1] if elem.n2 != 0 else v
tot_power = tot_power - v * elem.alpha * x[nv_1 + local_i_index, 0]
elif isinstance(elem, components.sources.HVSource):
try:
local_i_index = circ.find_vde_index(elem.source_id)
except ValueError:
raise RuntimeError("Sensing voltage source %s for %s not found. BUG!" %
(elem.source_id, elem.part_id))
local_i_index2 = circ.find_vde_index(elem.part_id)
tot_power = tot_power - elem.alpha*x[nv_1 + local_i_index, 0]* \
x[nv_1 + local_i_index2, 0]
elif circuit.is_elem_voltage_defined(elem):
i_index = i_index + 1
#op_info.append("TOTAL POWER: %e W\n" % (tot_power,))
return op_keys, op_info, tot_power
[docs] def write_to_file(self, filename=None):
if filename is None and self.filename is None:
# maybe warn the user here?
return
if filename is None:
filename = self.filename
if filename != 'stdout':
fp = printing.open_utf8(filename+"info")
else:
fp = sys.stdout
fp.write(self.timestamp+"\n")
fp.write("ahkab v. "+__version__+" (c) 2006-2015 Giuseppe Venturini\n\n")
fp.write("Operating Point (OP) analysis\n\n")
fp.write("Netlist: %s\nTitle: %s\n" % (self.netlist_file, self.netlist_title))
fp.write("At %.2f K\n" % (self.temp,))
fp.write("Options:\n\tvea = %e\n\tver = %f\n\tiea = %e\n\tier = %f\n\tgmin = %e\n" \
% (self.vea, self.ver, self.iea, self.ier, self.gmin))
fp.write("\nConvergence reached in %d iterations.\n" % (self.iterations,))
fp.write("\n========\n")
fp.write("RESULTS:\n")
fp.write("========\n\n")
vtable = self.get_table_array()
fp.write(vtable+'\n')
fp.write("\n========================\n")
fp.write("ELEMENTS OP INFORMATION:\n")
fp.write("========================\n\n")
for k in sorted(self._op_info.keys()):
t = printing.table(self._op_info[k], headers=self._op_keys[k][0])
fp.write(t + '\n\n')
fp.write('Total power dissipation: %g W\n\n' % self.tot_power)
fp.flush()
if filename != 'stdout':
fp.close()
# save to .op file
self._add_data(self.x)
[docs] def print_short(self):
"""Print a short, essential representation of the OP results"""
table = []
line = []
for v in self.variables:
line.append("%s: %g %s" % \
(v, self.results[v], self.units[v]))
if len(line) == 5:
table.append(line)
line = []
if len(line) > 0: # add the last line
line += [""]*(5 - len(line))
table.append(line)
print(printing.table(table))
@staticmethod
[docs] def gmin_check(op2, op1):
"""Checks the differences between two sets of OP results.
It is assumed that one set of results is calculated with Gmin, the other without.
**Parameters:**
op1, op2: op_solution instances
the results vectors, interchangeable
**Returns:**
test_fail_variables : list
The list of the variables that did not pass the test. They are extracted from
the op_solution objects. If the check was passed, this is an empty list.
"""
check_failed_vars = []
for v in op1.variables:
abserr = abs(op2.results[v] - op1.results[v])
if op1.units[v] == 'V':
if abserr > options.ver*max(abs(op1.results[v]),
abs(op2.results[v])) + options.vea:
check_failed_vars.append(v)
elif op1.units[v] == 'A':
if abserr > options.ier*max(abs(op1.results[v]),
abs(op2.results[v])) + options.iea:
check_failed_vars.append(v)
else:
print("Unrecognized unit... Bug.")
return check_failed_vars
[docs] def values(self):
"""Get all of the results set's variables values."""
return np.squeeze(self.x).tolist()
[docs] def items(self):
vlist = []
for j in range(self.x.shape[0]):
vlist.append(self.x[j, 0])
return list(zip(self.variables, vlist))
# iterator methods
def __iter__(self):
self._iter_index = 0
return self
[docs] def next(self):
if self._iter_index == len(self.variables):
self._iter_index = 0
raise StopIteration
else:
nxt = self.variables[self._iter_index], \
self.x[self._iter_index]
self._iter_index += 1
return nxt
def __next__(self):
return self.next()
[docs]class ac_solution(solution, _mutable_data):
"""AC results
**Parameters:**
circ : circuit instance
the circuit instance of the simulated circuit
start : float
the AC sweep frequency start value, in Hz.
stop : float
the AC sweep frequency stop value, in Hz.
points : int
the AC sweep total points.
stype : str
the type of sweep, ``"LOG"``, ``"LIN"`` or arb. ``"POINTS"``.
op : op_solution
the linearization Operating Point used to compute the results.
outfile: str
the file to write the results to. Use ``"stdout"`` to write to the
standard output.
"""
def __init__(self, circ, start, stop, points, stype, op, outfile):
solution.__init__(self, circ, outfile)
self.sol_type = "AC"
self.linearization_op = op
self.stype = stype
self.ostart, self.ostop, self.opoints = start, stop, points
self.variables += ["f"]
self.units.update({"f": "Hz"})
self.csv_headers = [self.variables[0]]
nv_1 = circ.get_nodes_number() - 1 # numero di soluzioni di tensione (al netto del ref)
for index in range(nv_1):
varname = "V%s" % str(circ.nodes_dict[index + 1])
self.variables += [varname]
self.units.update({varname: "V"})
if circ.is_int_node_internal_only(index+1):
self.skip_nodes_list.append(index)
for elem in circ:
if circuit.is_elem_voltage_defined(elem):
varname = "I(%s)" % elem.part_id.upper()
self.variables += [varname]
self.units.update({varname: "A"})
for i in range(1, len(self.variables)):
self.csv_headers.append("|%s|" % self.variables[i])
self.csv_headers.append("arg(%s)" % self.variables[i])
def _add_data(self, data):
"""Remember to call this method with REAL data - already split in ABS and PHASE."""
csvlib.write_csv(self.filename, data, self.csv_headers, append=self._init_file_done)
self._init_file_done = True
def __str__(self):
return ("<AC simulation results for '%s' (netlist %s). %s sweep, " +
"from %g to %g rad/sec, %d points. Run on %s, data file " +
"%s>") % (self.netlist_title, self.netlist_file, self.stype,
self.ostart, self.ostop, self.opoints, self.timestamp,
self.filename)
[docs] def add_line(self, frequency, x):
frequency = np.array([[frequency]])
xsplit = np.zeros((x.shape[0]*2, 1))
for i in range(x.shape[0]):
xsplit[2*i, 0] = np.abs(x[i, 0])
xsplit[2*i+1, 0] = np.angle(x[i, 0], deg=options.ac_phase_in_deg)
data = np.concatenate((frequency, xsplit), axis=0)
self._add_data(data)
[docs] def get_x(self):
return self[self.variables[0]]
[docs] def get_xlabel(self):
return self.variables[0]
[docs] def asarray(self):
"""Return all data as a (possibly huge) python matrix."""
## data, headers, pos, EOF = csvlib.load_csv()
data, headers, _, _ = csvlib.load_csv(self.filename, load_headers=[],
nsamples=None, skip=0, verbose=0)
cplx_data = None
cplx_headers = []
re1 = '\\|(.*?)\\|'
rg = re.compile(re1, re.IGNORECASE|re.DOTALL)
for i in range(len(headers)):
if headers[i].upper() == self.variables[0].upper():
if cplx_data is None:
cplx_data = np.array(data[i, :].reshape((1, -1)), dtype=np.complex_)
else:
cplx_data = np.vstack((cplx_data, data[i, :].reshape((1, -1))))
else:
m = rg.search(headers[i])
if m: # we got a |VAR|
var = m.group(1)
cplx_headers.append(var)
match_phase = ('arg(%s)' % var).upper()
ip = [h.upper() for h in headers].index(match_phase)
if cplx_data is None:
cplx_data = np.array(data[i, :]*
np.exp(1j*data[ip,:]).reshape((1, -1)),
dtype=np.complex_)
else:
cplx_data = np.vstack((cplx_data,
(data[i, :]*
np.exp(1j*data[ip, :])).reshape((1, -1))))
return cplx_data
# Access as a dictionary BY VARIABLE NAME:
def __getitem__(self, name):
"""Get a specific variable, as from a dictionary."""
if name.upper() != 'F':
headers = ['|%s|' % name, 'arg(%s)' % name]
else:
headers = [name]
try:
# data, headers, pos, EOF = csvlib.load_csv()
data, headers, _, _ = csvlib.load_csv(self.filename,
load_headers=headers,
nsamples=None, skip=0,
verbose=0)
except ValueError:
# raise the correct exception
raise KeyError(name)
if len(headers) == 2:
data = data[0, :] * np.exp(1j*data[1, :])
else:
data = data.reshape((-1,))
return data
[docs] def get(self, name, default=None):
"""Get a solution by variable name."""
try:
data = self.__getitem__(name)
except KeyError:
return default
return data
[docs] def values(self):
"""Get all of the results set's variables values."""
data = self.asarray()
values = [np.real_if_close(data[0, :])]
for i in range(1, data.shape[0]):
values.append(data[i, :])
return values
[docs] def items(self):
values = self.values()
return zip(self.variables, values)
# iterator methods
def __iter__(self):
self.iter_index = 0
self.iter_data = self.values()
self.iter_headers = self.variables
return self
[docs]class dc_solution(solution, _mutable_data):
"""DC results
**Parameters:**
circ : circuit instance
the simulated circuit.
start : float
the DC sweep start value.
stop : float
the DC sweep stop value.
sweepvar : str
the swept variable ``part_id``.
stype : str
the type of sweep, ``"LOG"``, ``"LIN"`` or arb. ``"POINTS"``.
outfile : str
the filename of the file where the results will be written.
Use ``"stdout"`` to write to std output.
"""
def __init__(self, circ, start, stop, sweepvar, stype, outfile):
solution.__init__(self, circ, outfile)
self.sol_type = "DC"
self.start, self.stop = start, stop
self.stype = stype
nv_1 = circ.get_nodes_number() - 1 # numero di soluzioni di tensione (al netto del ref)
self.variables = [sweepvar]
self.units = case_insensitive_dict()
if self.variables[0][0] == 'V':
self.units.update({self.variables[0]:'V'})
if self.variables[0][0] == 'I':
self.units.update({self.variables[0]:'A'})
for index in range(nv_1):
varname = "V%s" % (str(circ.nodes_dict[index + 1]),)
self.variables += [varname]
self.units.update({varname:"V"})
if circ.is_int_node_internal_only(index+1):
self.skip_nodes_list.append(index)
for elem in circ:
if circuit.is_elem_voltage_defined(elem):
varname = "I(%s)" % (elem.part_id.upper(),)
self.variables += [varname]
self.units.update({varname:"A"})
def __str__(self):
return ("<DC simulation results for '%s' (netlist %s). %s sweep of" +
" %s from %g to %g %s. Run on %s, data file %s>") % \
(self.netlist_title, self.netlist_file, self.stype,
self.variables[0].upper(), self.start, self.stop,
self.units[self.variables[0]], self.timestamp, self.filename)
[docs] def add_op(self, sweepvalue, op):
"""A DC sweep is made of a set of OP points.
This method adds an OP solution and
its corresponding sweep value to the results set.
"""
sweepvalue = np.array([[sweepvalue]])
x = op.asarray()
data = np.concatenate((sweepvalue, x), axis=0)
self._add_data(data)
[docs] def get_x(self):
return self.get(self.variables[0])
[docs] def get_xlabel(self):
return self.variables[0]
[docs]class tran_solution(solution, _mutable_data):
"""Transient results
**Parameters:**
circ : circuit instance
the circuit instance of the simulated circuit.
tstart : float
the transient simulation start time.
tstop : float
the transient simulation stop time.
op : op_solution instance
the Operating Point (OP) used to start the transient analysis.
method : str
the differentiation method employed.
outfile : str
the filename of the save file.
Use "stdout" to write to the standard output.
"""
def __init__(self, circ, tstart, tstop, op, method, outfile):
solution.__init__(self, circ, outfile)
self.sol_type = "TRAN"
self.start_op = op
self.tstart, self.tstop = tstart, tstop
self.method = method
self._lock = False
nv_1 = circ.get_nodes_number() - 1 # numero di soluzioni di tensione (al netto del ref)
self.variables = ["T"]
self.units.update({"T":"s"})
for index in range(nv_1):
varname = ("V%s" % (str(circ.nodes_dict[index + 1]),)).upper()
self.variables += [varname]
self.units.update({varname:"V"})
if circ.is_int_node_internal_only(index+1):
self.skip_nodes_list.append(index)
for elem in circ:
if circuit.is_elem_voltage_defined(elem):
varname = ("I(%s)" % (elem.part_id.upper(),)).upper()
self.variables += [varname]
self.units.update({varname:"A"})
def __str__(self):
return ("<TRAN simulation results for '%s' (netlist %s), from %g s to" +
" %g s. Diff. method %s. Run on %s, data file %s>") % \
(self.netlist_title, self.netlist_file, self.tstart, self.tstop,
self.method, self.timestamp, self.filename)
[docs] def add_line(self, time, x):
"""This method adds a solution and its corresponding time value to the results set.
"""
if not self._lock:
time = np.array([[time]])
data = np.concatenate((time, x), axis=0)
self._add_data(data)
else:
raise RuntimeError("Attempting to add values to a complete " +
"result set.")
[docs] def lock(self):
self._lock = True
[docs] def get_x(self):
return self.get(self.variables[0])
[docs] def get_xlabel(self):
return self.variables[0]
[docs]class pss_solution(solution, _mutable_data):
"""PSS results
**Parameters:**
circ : circuit instance
the circuit instance of the simulated circuit.
method : str
the PSS algorithm employed.
period : float
the solution period.
outfile : str
the filename of the save file.
Use "stdout" to write to the std output.
.. note::
Instantiating ``pss_solution`` creates an *empty* data set. Call
:func:`set_results` to initialize its data.
"""
def __init__(self, circ, method, period, outfile):
solution.__init__(self, circ, outfile)
self.sol_type = "PSS"
self.period = period
self.method = method
# We have mixed current and voltage results
nv_1 = circ.get_nodes_number() - 1 # numero di soluzioni di tensione (al netto del ref)
self.variables = ["T"]
self.units.update({"T":"s"})
for index in range(nv_1):
varname = "V%s" % (str(circ.nodes_dict[index + 1]),)
self.variables += [varname]
self.units.update({varname:"V"})
if circ.is_int_node_internal_only(index+1):
self.skip_nodes_list.append(index)
for elem in circ:
if circuit.is_elem_voltage_defined(elem):
varname = "I(%s)" % (elem.part_id.upper(),)
self.variables += [varname]
self.units.update({varname:"A"})
def __str__(self):
return ("<PSS simulation results for '%s' (netlist %s), period %g s. " +
"Method: %s. Run on %s, data file %s>") % \
(self.netlist_title, self.netlist_file, self.period, self.method,
self.timestamp, self.filename)
[docs] def set_results(self, t, x):
"""Set the results in the data set
.. note::
* All the data are set at the same time for a PSS results set.
* Instantiating ``pss_solution`` creates an empty data set.
* This method should be called as soon as the data is available.
**Parameters:**
t : ndarray
The time. The array should be 2D with shape ``(1, N)``.
x : ndarray
The data corresponding to the variables.
The array should be 2D with shape ``(M, N)``, where ``M`` is the
number of variables in the data set.
"""
time = np.array(t)
data = np.concatenate((time, x), axis=0)
self._add_data(data)
[docs] def asarray(self):
allvalues, _, _, _ = csvlib.load_csv(self.filename, load_headers=[],
nsamples=None, skip=0, verbose=0)
return allvalues
[docs] def get_x(self):
return self.get(self.variables[0])
[docs] def get_xlabel(self):
return self.variables[0]
[docs]class symbolic_solution(object):
"""Symbolic results
**Parameters:**
results_dict : dict
the results dict returned by ``sympy.solve()``,
substitutions : dict
the substitutions (dictionary) employed before solving,
circ : circuit instance
the circuit instance of the simulated circuit.
outfile : str, optional
the filename of the save file.
Use ``"stdout"`` to write to the standard output.
tf : bool, optional
Transfer function flag: set this to ``True`` if this set of results
corrsponds to a transfer function. Defaults to ``False``.
"""
def __init__(self, results_dict, substitutions, circ, outfile=None, tf=False):
self.sol_type = "Symbolic"
self.timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
self.netlist_file = circ.filename
self.netlist_title = circ.title
self.substitutions = substitutions
self.tf = tf
# the keys are strings
# self.symbols = map(str, results_dict.keys())
self.results = case_insensitive_dict()
for symbol, result in results_dict.items():
self.results.update({str(symbol):result})
self._symbols = list(results_dict.keys()) # keep them, they're useful
for expr in list(results_dict.values()):
if tf:
expr = expr['gain']
for symb in expr.atoms():
if symb.is_Symbol and symb not in self._symbols:
self._symbols.append(symb)
self.filename = outfile if outfile != 'stdout' else None
if self.filename is not None:
self.save()
[docs] def as_symbol(self, variable):
"""Converts a string to the corresponding symbolic variable.
This symbol may then be used by the user as an atom to construct
new expressions, modify the results expressions or it can be passed
to Sympy's functions.
**Parameters:**
variable : string
The string that identifies the variable. Eg. ``'R1'`` for the variable
corresponding to the resistance of the resistor ``R1``. Note that the
case is disregarded and that the first letter defines the type of
the element (resistor, capacitor...).
**Returns:**
symbol : Sympy symbol
The corresponding symbol, if it exists in the result set.
**Raises:**
ValueError : exception
In case no such symbol is found.
"""
symbs = [x for x in self._symbols if x.name.lower() == variable.lower()]
if len(symbs) == 0:
raise ValueError("No symbol %s in the results set."%(variable,))
else:
return symbs[0]
[docs] def as_symbols(self, spacedstr):
"""Convenience function to call :func:`as_symbol` multiple times.
**Parameters:**
spacedstr : string,
A string containing several symbol identifiers separated by spaces.
Eg. ``'R1 C2 L3'``.
**Returns:**
(s1, s2, ...) : tuple of Sympy symbol instances
The symbols corresponding to the identifiers in the string supplied,
ordered as the identifiers in the string.
**Raises:**
ValueError : exception
In case any corresponding symbol is not found.
"""
return list(map(self.as_symbol, spacedstr.split()))
[docs] def save(self):
"""Write the results to disk.
It is necessary first to set the ``filename`` attribute, indicating
which file to write to.
**Raises:**
RuntimeError : exception
If the `filename` attribute is not set.
"""
if not self.filename:
raise RuntimeError("Writing the results to file requires setting the \
'filename' attribute")
with open(self.filename, 'wb') as fp:
pickle.dump(self, fp, protocol=2)
@staticmethod
[docs] def load(filename):
"""Static method to load a symbolic solution from disk.
**Parameters:**
filename : str
The filename corresponding to the file to load from.
**Returns:**
sol : symbolic solution instance
The solution instance loaded from disk.
.. warning::
This method employs ``pickle.load``, which is to be used exclusively
on trusted data. **Only load trusted simulation files!**
"""
with open(filename, 'rb') as fp:
asolution = pickle.load(fp)
return asolution
def __str__(self):
str_repr = "Symbolic %s results for '%s' (netlist %s).\nRun on %s.\n" % \
('simulation'*(not self.tf) + 'transfer function'*self.tf,
self.netlist_title, self.netlist_file, self.timestamp)
keys = list(self.results.keys())
keys.sort(key=str)
if not self.tf:
for key in keys:
str_repr += str(key) + "\t = " + str(self.results[key]) + "\n"
else:
for key in keys:
str_repr += str(key) + ":\n\t%s:\t%s\n" % \
('gain', self.results[key]['gain'])
if self.results[key]['gain'] == self.results[key]['gain0']:
continue
str_repr += "\t%s:\t%s\n" % \
('gain0', self.results[key]['gain0'])
for sing in ('poles', 'zeros'):
if not len(self.results[key][sing]):
continue
str_repr += "\t%s:\n" % sing
for p in self.results[key][sing]:
str_repr += "\t\t" + str(p) + "\n"
return str_repr
# Access as a dictionary:
def __len__(self):
"""Get the number of variables in the results set."""
return len(self.results)
def __getitem__(self, name):
"""Get a specific header, as from a dictionary."""
return self.results[str(name).upper()]
[docs] def get(self, name, default=None):
"""Get the solution corresponding to a variable."""
name = str(name).upper()
try:
return self.results[name]
except KeyError:
return default
[docs] def has_key(self, name):
"""Determine whether the result set contains a variable."""
return str(name).upper() in self.results
def __contains__(self, name):
"""Determine whether the result set contains a variable."""
return str(name).upper() in self.results
[docs] def keys(self):
"""Get all of the results set's variable's names."""
return list(self.results.keys())
[docs] def values(self):
"""Get all of the results set's variable's values."""
return list(self.results.values())
[docs] def items(self):
"""Get all solutions."""
return list(self.results.items())
# iterator methods
def __iter__(self):
self.iter_index = -1
return self
def __next__(self):
"""Iterator method."""
if self.iter_index == len(list(self.results.keys())) - 1:
self.iter_index = 0
raise StopIteration
else:
self.iter_index += 1
return list(self.results.keys())[self.iter_index], \
self.results[self._symbols[self.iter_index]]
[docs] def next(self):
return self.__next__()
[docs]class pz_solution(solution, _mutable_data):
"""PZ results
**Parameters:**
circ : circuit instance
the circuit instance of the simulated circuit.
poles : sequence
the circuit zeros
zeros : sequence
the circuit poles
outfile : str
the filename of the save file.
"""
def __init__(self, circ, poles, zeros, outfile):
solution.__init__(self, circ, outfile)
self.sol_type = "PZ"
self.poles = np.sort_complex(np.array(poles).reshape((-1,)))
self.zeros = np.sort_complex(np.array(zeros).reshape((-1,)))
data = np.vstack((self.poles.reshape((-1, 1)),
self.zeros.reshape((-1, 1))))
if np.prod(self.poles.shape):
for i in range(self.poles.shape[0]):
self.variables += ['p%d' % i]
if np.prod(self.zeros.shape):
for i in range(self.zeros.shape[0]):
self.variables += ['z%d' % i]
for v in self.variables:
self.units.update({v: "rad/s"})
self.csv_headers = []
for i in range(len(self.variables)):
self.csv_headers.append("Re(%s)" % self.variables[i])
self.csv_headers.append("Im(%s)" % self.variables[i])
# save in Re/Im form
sdata = data.reshape(-1).view(np.float_).reshape((-1, 1))
self._add_data(sdata)
# store local data too:
self.data = case_insensitive_dict()
for i in range(len(self.variables)):
self.data.update({self.variables[i]: data[i, 0]})
def _add_data(self, data):
"""Remember to call this method with REAL data - already split in RE and IM."""
csvlib.write_csv(self.filename, data, self.csv_headers, append=self._init_file_done)
self._init_file_done = True
def __str__(self):
return ("PZ simulation results for %s (netlist %s).\n" + \
"Poles: %s\nZeros: %s") % \
(self.netlist_title, self.netlist_file, list(self.poles),
list(self.zeros))
# Access as a dictionary BY VARIABLE NAME:
def __getitem__(self, name):
"""Get a specific variable, as from a dictionary."""
if name in self.data:
return self.data[name]
raise KeyError
[docs] def get(self, name, default=None):
try:
return self.data[name]
except KeyError:
return default
[docs] def has_key(self, name):
"""Determine whether the result set contains a variable."""
return name in self.data
def __contains__(self, name):
"""Determine whether the result set contains a variable."""
return name in self.data
[docs] def keys(self):
"""Get all of the results set's variable's names."""
return self.data.keys()
[docs] def values(self):
"""Get all of the results set's variable's values."""
return self.data.values()
[docs] def items(self):
return self.data.items()
# iterator methods
def __iter__(self):
# take into account that we increment first, then return
# the value
self.iter_index = -1
return self
def __next__(self):
# redefine this or the superclass method will be used on
# PY3, calling the superclass' next() and not *our* next
# method below.
return self.next()
[docs] def next(self):
if self.iter_index == len(self.variables)-1:
self.iter_index = 0
raise StopIteration
else:
self.iter_index += 1
return self.variables[self.iter_index], \
self.data[self.variables[self.iter_index]]
[docs]class case_insensitive_dict(object):
"""A dictionary that uses case-insensitive strings as keys.
"""
def __init__(self):
self._dict = {}
def __len__(self):
"""Get the number of elements in the set."""
return len(self._dict)
def __str__(self):
rpr = "{"
keys = list(self._dict.keys())
for i in range(len(keys)):
rpr += "%s: %s" % (keys[i], self._dict[keys[i]])
if i < len(keys) - 1:
rpr += ", "
else:
rpr += "}"
return rpr
def __getitem__(self, name):
"""Get a specific variable, as from a dictionary."""
keys = list(self._dict.keys())
try:
i = [k.upper() for k in keys].index(text_type(name).upper())
except ValueError:
raise KeyError(name)
return self._dict[keys[i]]
[docs] def get(self, name, default=None):
"""Given the case-insensitive string key ``name``, return its corresponding value.
If not found, return ``default``.
"""
try:
keys = list(self._dict.keys())
i = [k.upper() for k in keys].index(name.upper())
except ValueError:
return default
return self._dict[keys[i]]
[docs] def has_key(self, name):
"""Determine whether the result set contains the variable ``name``."""
return name.upper() in [k.upper() for k in list(self._dict.keys())]
def __contains__(self, name):
"""Determine whether the result set contains a variable."""
return name.upper() in [k.upper() for k in list(self._dict.keys())]
[docs] def keys(self):
"""Get all keys"""
return list(self._dict.keys())
[docs] def values(self):
"""Get all values"""
return list(self._dict.values())
[docs] def items(self):
"""Get all keys and values pairs"""
return list(self._dict.items())
[docs] def update(self, adict):
"""Update the dictionary contents with the mapping in the dictionary ``adict``.
"""
return self._dict.update(adict)
# iterator methods
def __iter__(self):
"""Iterator"""
return self._dict.__iter__()