Source code for psrsigsim.io.txtfile

# -*- coding: utf-8 -*-
# encoding=utf8

from __future__ import (absolute_import, division,
                        print_function, unicode_literals)
import numpy as np
from ..utils import make_quant
from .file import BaseFile
from astropy import log

[docs]class TxtFile(BaseFile): """A class for saving PsrSigSim signals as text files. Multiple different text file data types may be supported, but currently only the PSRCHIVE pdv function output is supported. Parameters ---------- path: name and path of new text file that will be saved """ _path = None _signal = None _file = None def __init__(self, path=None): """ """ self._path = path self._tbin = None self._nbin = None self._nchan = None self._npol = None self._nrows = None self._tsubint = None self._chan_bw = None self._obsbw = None self._obsfreq = None
[docs] def save_psrchive_pdv(self, signal, pulsar): """ Function to save simulated data in the same format as the PSRCHIVE pdf function. To avoid large file sizes, every hundred pulses the data will be saved as a text file. Currently one a single polarization (total intensity) is supported. Inputs are: Parameters ---------- signal [class] : signal class object, currently only filterbank is supported pulsar [class] : pulsar class object filename [string] : desired name of source/output file. Output files will be saved as 'filename'_#.txt, where # is the chronological number of the files being saved. """ # Get the signal parameters self._get_signal_params(signal, pulsar) # Check if there's a save path if self.path == None: self._path = "PsrSigSim_Simulated_Pulsar.ar" # We first need to make the header rms = np.sqrt((1.0/len(signal.data))*np.sum((signal.data)**2)) header = "# File: %s Src: %s Nsub: %s Nch: %s Npol: %s Nbin: %s RMS: %s \n" % \ (self.path, pulsar.name, str(self.nrows), str(self.nchan), \ str(self.npol), str(self.nbin), str(rms)) # Now we make a list of the lines to save lines = [header] if self.npol != 1: log.warning("Only saving total intensity, multiple polarizations not yet implemented") # We need to dump the data every so often to avoid files that are too big dump_val = 0 # Now we need to loop through each subint, frequency channel, and polarization for ii in range(self.nrows): mjd_mid = 56000.0+(ii+1)*(self.tsubint.to('day').value)/2.0 for ff in range(self.nchan): freq = signal.dat_freq[ff].value pulse_header = "# MJD(mid): %s Tsub: %s Freq: %s BW: %s \n" \ % (mjd_mid, self.tsubint.value, freq, self.obsbw.value/self.nchan) lines.append(pulse_header) for bb in range(self.nbin): lines.append("%s %s %s %s \n" % (ii, ff, bb, signal.data[ff,bb])) dump_val += 1 # Now save the lines in a text file if dump_val >= 100: file_num = str(int(np.floor(dump_val/100))) with open(self.path+"_%s.txt" % (file_num), 'w') as pdv_file: pdv_file.writelines(lines) pdv_file.close() lines = [header] # and then we want to dump the last few lines file_num = str(int(np.floor(dump_val/100))) with open(self.path+"_%s.txt" % (file_num), 'w') as pdv_file: pdv_file.writelines(lines) pdv_file.close()
def _get_signal_params(self, signal, pulsar): """ Retrieve the various parameters needed to save the file from a PSS signal class. """ # get parameters from signal class self.nchan = signal.Nchan self.tbin = 1.0/signal.samprate self.nbin = int((signal.samprate * pulsar.period).decompose()) self.npol = signal.Npols self.nrows = signal.nsub self.obsfreq = signal.fcent self.obsbw = signal.bw self.chan_bw = signal.bw / signal.Nchan self.tsubint = signal.sublen # length of subint in seconds self.nsubint = self.nrows #### Define various PSRFITS parameters @property def tbin(self): return self._tbin @tbin.setter def tbin(self, value): self._tbin = make_quant(value,'s') @property def npol(self): return self._npol @npol.setter def npol(self, value): self._npol = value @property def nchan(self): return self._nchan @nchan.setter def nchan(self, value): self._nchan = value @property def nbin(self): return self._nbin @nbin.setter def nbin(self, value): self._nbin = value @property def nrows(self): return self._nrows @nrows.setter def nrows(self, value): self._nrows = value @property def obsfreq(self): return self._obsfreq @obsfreq.setter def obsfreq(self, value): self._obsfreq = make_quant(value,'MHz') @property def obsbw(self): return self._obsbw @obsbw.setter def obsbw(self, value): self._obsbw = make_quant(value,'MHz') @property def chan_bw(self): return self._chan_bw @chan_bw.setter def chan_bw(self, value): self._chan_bw = make_quant(value,'MHz') @property def tsubint(self): return self._tsubint @tsubint.setter def tsubint(self, value): self._tsubint = make_quant(value, 'second')