from __future__ import (absolute_import, division,
print_function, unicode_literals)
import numpy as np
from scipy import stats
from .profiles import GaussProfile
from .profiles import UserProfile
from .profiles import DataProfile
from ..utils.utils import make_quant, shift_t
[docs]class Pulsar(object):
"""class for pulsars
The minimal data to instantiate a pulsar is the period, Smean, and
pulse profile. The Profile is supplied via a :class:`PulseProfile`-like
object.
Parameters
----------
period : float
Pulse period (sec)
Smean : float
Mean pulse flux density (Jy)
profile : :class:`PulseProfile`
Pulse profile or 2-D pulse portrait
name : str
Name of pulsar
"""
#TODO Other data could be supplied via a `.par` file.
def __init__(self, period, Smean, profiles=None, name=None):
self._period = make_quant(period, 's')
self._Smean = make_quant(Smean, 'Jy')
self._name = name
# Assign profile class; default to GaussProfile if nothing is specified
if profiles is None:
self._Profiles = GaussProfile()
else:
self._Profiles = profiles
def __repr__(self):
namestr = "" if self.name is None else self.name+", "
return "Pulsar("+namestr+"{})".format(self.period.to('ms'))
@property
def Profiles(self):
return self._Profiles
@property
def name(self):
return self._name
@property
def period(self):
return self._period
@property
def Smean(self):
return self._Smean
[docs] def make_pulses(self, signal, tobs):
"""generate pulses from Profiles, :class:`PulsePortrait` object
Required Args:
signal (:class:`Signal`-like): signal object to store pulses
tobs (float): observation time (sec)
"""
signal._tobs = make_quant(tobs, 's')
# init base profile at correct sample rate
Nph = int((signal.samprate * self.period).decompose())
# If there isn't enough frequencies in the profiles
# And if it is a :class:`Profile` instance, reshape.
self.Profiles.init_profiles(Nph, signal.Nchan)
# if (signal.Nchan != self.Profiles.profiles.shape[0]
# and hasattr(self.Profiles, 'set_Nchan')):
# self.Profiles.set_Nchan(signal.Nchan)
# select pulse generation method
if signal.sigtype in ["RFSignal", "BasebandSignal"]:
self._make_amp_pulses(signal)
elif signal.sigtype is "FilterBankSignal":
self._make_pow_pulses(signal)
else:
msg = "no pulse method for signal: {}".format(signal.sigtype)
raise NotImplementedError(msg)
# compute Smax (needed for radiometer noise level)
#pr = self.Profiles()
pr = self.Profiles._max_profile
nbins = len(pr) # Think this assumes a single profile for now...
signal._Smax = self.Smean * nbins / np.sum(pr)
def _make_amp_pulses(self, signal):
"""generate amplitude pulses
This method should be used for radio frequency and basebanded
pulses.
Parameters
----------
signal : :class:`Signal`-like
Signal object to store pulses.
"""
# generate several pulses in time
distr = stats.norm()
Nsamp = int((signal.tobs * signal.samprate).decompose())
signal.init_data(Nsamp)
# TODO break into blocks
# TODO phase from .par file
# calc profile at phases
phs = (np.arange(Nsamp) /
(signal.samprate * self.period).decompose().value)
phs %= 1 # clip integer part
# convert intensity profile to amplitude!
full_prof = np.sqrt(self.Profiles.calc_profiles(phs))
signal._data = full_prof * distr.rvs(size=signal.data.shape)
def _make_pow_pulses(self, signal):
"""generate a power pulse
This method should be used for filter bank pulses
Parameters
----------
signal : :class:`Signal`-like
Signal object to store pulses.
"""
if signal.fold:
# Determine how many subints to make
if signal.sublen is None:
signal._sublen = signal.tobs
signal._nsub = 1
else:
# This should be an integer, if not, will round
signal._nsub = int(np.round(signal.tobs / signal.sublen))
# determine the number of data samples necessary
signal._nsamp = int((signal.nsub*(self.period*signal.samprate)).decompose())
# Need to make an initial empty data array
signal.init_data(signal.nsamp)
# Tile the profiles to number of desired subints
sngl_prof = np.tile(self.Profiles(), signal.nsub)
# changed to number of subints
signal._Nfold = (signal.sublen / self.period).decompose()
distr = stats.chi2(df=signal.Nfold)
signal._set_draw_norm(df=signal.Nfold)
#Why is there a second call to init_data?
signal.init_data(sngl_prof.shape[1])
signal._data = (sngl_prof * distr.rvs(size=signal.data.shape)
* signal._draw_norm)
else:
# fold is false and will make single pulses
signal._sublen = self.period
# This should be an integer, if not, will round; may not be exact
signal._nsub = int(np.round((signal.tobs / signal.sublen).decompose()))
# generate several pulses in time
distr = stats.chi2(df=1)
signal._set_draw_norm(df=1)
signal._nsamp = int((signal.tobs * signal.samprate).decompose())
signal.init_data(signal.nsamp)
# TODO break into blocks
# TODO phase from .par file
# calc profiles at phases
phs = (np.arange(signal.nsamp) /
(signal.samprate * self.period).decompose().value)
phs %= 1 # clip integer part
full_prof = self.Profiles.calc_profiles(phs,signal.Nchan)
signal._data = (full_prof * distr.rvs(size=signal.data.shape)
* signal._draw_norm)
[docs] def null(self, signal, null_frac, length=None, frequency=None):
"""
Function to simulate pulsar pulse nulling. Given some nulling fraction,
will replace simulated pulses with noise until nulling fraction is met.
This function should only be run after running any ism or other delays
have been added, e.g. disperison, FD, profile evolution, etc., but
should be run before adding the radiometer noise ('telescope.observe()`),
if nulling is desired.
Parameters
----------
signal [class] : signal class containing the simulated pulses
null_frac [float] : desired pulsar nulling fraction, given as a
decimal; range of 0.0 to 1.0.
length [float] : desired length of each null in seconds. If not given,
will randomly null pulses. Default is None.
frequency [float] : frequency of pulse nulling, e.g. how often the pulsar
nulls per hour. E.g. if frequency is 2, then the
pulsar will null twice per hour for some length
of time. If not given, will randomly null pulses.
Default is None.
"""
# Determine how many pulses need to be nulled out
null_pulses = int(np.round(signal.nsub * null_frac)) # needs to be int, may not be exact
# Get the number of bins per pulse
Nph = int((signal.samprate * self.period).decompose())
# determine the off pulse window
opw = self.Profiles._calcOffpulseWindow(Nphase = Nph)
# define the random noise distribution
if signal.fold:
distr = stats.chi2(df=signal.Nfold)
else:
distr = stats.chi2(df=1)
# have a test ditribution to determine null bins if Nfold is 1
if not signal.fold or signal.Nfold < 100:
check_distr = stats.chi2(df=100)
else:
check_distr = stats.chi2(df=signal.Nfold)
# Figure out how off-center the pulses are
shift_val = Nph//2 - np.where(signal.data[0,:Nph]==np.max(signal.data[0,:Nph]))[0]
# Now null randomly if no length or frequency is given
if length==None and frequency==None:
# randomly draw pulse numbers to null
rand_pulses = np.random.choice(signal.nsub, null_pulses, replace=False)
# replace each pulse number with random noise
if signal.delay == None:
for p in rand_pulses:
# convert to bins
null_bins = np.arange(Nph*p, Nph*(p+1)) + shift_val
# make sure we don't go beyond the data array length
null_bins = null_bins[null_bins<np.shape(signal.data)[1]]
# replace pulses with noise
if len(null_bins) != Nph:
noise = (distr.rvs(size=len(null_bins)) * signal._draw_norm)
else:
noise = (distr.rvs(size=Nph) * signal._draw_norm)
# if no extra delays have been added to the signal
signal._data[:,null_bins] = noise * np.mean(self.Profiles._max_profile[opw.astype(int)])
# if if signal has been delayed (e.g dispersed, etc.)
else:
# First initialize new array
null_array = np.zeros(np.shape(signal.data))
for p in rand_pulses:
#null_bins = np.arange(Nph*p, Nph*(p+1))
null_bins = np.arange(Nph*p, Nph*(p+1)) + shift_val
# make sure we don't go beyond the data array length
null_bins = null_bins[null_bins<np.shape(signal.data)[1]]
# replace the appropriate arrays
if len(null_bins) != Nph:
null_array[:,null_bins] = (check_distr.rvs(size=len(null_bins)) * signal._draw_norm)
else:
null_array[:,null_bins] = (check_distr.rvs(size=Nph) * signal._draw_norm)
# now shift the data
freq_array = signal._dat_freq
shift_dt = (1/signal._samprate).to('ms')
for ii, freq in enumerate(freq_array):
null_array[ii,:] = shift_t(null_array[ii,:],
signal.delay[ii].value,
dt=shift_dt.value)
# Now replace pulses with noise
off_pulse_mean = np.mean(self.Profiles._max_profile[opw.astype(int)])
noise_shape = np.shape(np.where(null_array>1))[1]
noise = (distr.rvs(size=noise_shape) * signal._draw_norm)
signal._data[np.where(null_array>1)] = noise*off_pulse_mean
else:
raise NotImplementedError("Length and Frequency not been implimented yet")