# -*- coding: utf-8 -*-
# csvlib.py
# Implementation of routines for CSV data I/O
# Copyright 2012 Giuseppe Venturini
#
# This file is part of the ahkab simulator.
#
# Ahkab is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 2 of the License.
#
# Ahkab is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License v2
# along with ahkab. If not, see <http://www.gnu.org/licenses/>.
"""The ``csvlib`` module contains common routines for handling
Comma Separated Values (CSV) or Tab Separated Values (TSV) files.
Functions:
1. CSV write/load:
* :func:`write_csv`
* :func:`load_csv`
2. MISC utilities
* :func:`get_headers`
* :func:`write_headers`
* :func:`get_headers_index`
The separator can be selected setting:
::
csvlib.SEPARATOR = '\\t' # default value
"""
# Additionally the following internal functions are
# available:
#3. Internal routines
# _get_fp(filename, mode='rb')
# _close_fp(fp, filename)
from __future__ import (unicode_literals, absolute_import,
division, print_function)
import io
import sys
import copy
import os
import numpy as np
from . import options
SEPARATOR = u"\t"
[docs]def write_csv(filename, data, headers, append=False):
"""Writes data in CVS format to filename.
The headers have to be ordered according to the data order.
**Parameters:**
filename : string
the path to the file to be written.
Use 'stdout' to write to stdout
data : ndarray
The data to be written. Notice that variables are swept across *rows*,
time samples are swept along *columns*.
Or equivalently: ``data[variable_index, sample_number]``
headers : list of strings
the signal names, ordered so that ``headers[i]`` corresponds to
``data[i, :]``.
append : bool, optional
If False, the file (if it exists) will be rewritten, otherwise
it will be appended to.
"""
mode = 'ab' if append else 'wb'
fp = _get_fp(filename, mode)
if not data.shape[0] == len(headers):
print("(W): write_csv(): data and headers don't match. Continuing anyway.")
print("DATA: " + str(data.shape) + " headers length: " + str(len(headers)))
headers = SEPARATOR.join(headers) if not append else ""
np.savetxt(fp, data.T, delimiter=SEPARATOR, header=headers, comments='#')
_close_fp(fp, filename)
def _get_fp(filename, mode="r"):
if filename == 'stdout' or filename == '-' or filename == sys.stdout:
if mode == 'w' or mode == 'a' or mode == 'wb' or mode == 'ab':
fp = sys.stdout if not hasattr(sys.stdout, 'buffer') else sys.stdout.buffer
else:
print("(EE) Mode %s is not supported for stdout." % (mode,))
fp = None
else:
fp = io.open(filename, mode)#, encoding=options.encoding)
return fp
def _close_fp(fp, filename):
try:
fp.flush()
except IOError:
pass
if filename == 'stdout':
pass
else:
fp.close()
[docs]def load_csv(filename, load_headers=None, nsamples=None, skip=0, verbose=3):
"""Reads data in CVS format from filename.
Supports:
* selective signal loading,
* loading up to a certain number of samples,
* skipping to a certain line, to allow incremental reading of big files.
**Parameters:**
filename : string
the path to the file to be read.
load_headers : list of strings, optional
Each one being a signal to be loaded. An empty list (or None) is
interpreted as "read all signals".
nsamples : int, optional
The number of samples to be read for each signal. If ``None``,
read all available samples.
skip : int, optional
The index of the first sample to be read. Default: 0
**Returns:**
data : ndarray
The data, ordered according to the order of ``load_headers``
(or the order on file if ``load_headers`` was empty),
headers : list of strings
the names of the signals read from file,
pos : int
position of the last sample read +1, referred to the
sample #0 in the file.
EOF : bool
A flag set to true is all the data in the file were read.
"""
if filename == 'stdout':
print("Can't load data from stdout.")
return None, None, None, None
headers = get_headers(filename)
his = get_headers_index(headers, load_headers, verbose=verbose)
if load_headers and len(his) != len(load_headers):
raise ValueError("Specified header not found")
fp = _get_fp(filename)
data = np.loadtxt(fp, delimiter=SEPARATOR, usecols=his, unpack=True, skiprows=skip, ndmin=2)
_close_fp(fp, filename)
# prepare return values
EOF = (nsamples is None) or (nsamples == data.shape[1])
if nsamples is not None:
data = data[:, :min(nsamples, data.shape[1])]
pos = skip + data.shape[1]
headers = list(map(headers.__getitem__, his))
return data, headers, pos, EOF