Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: basic support for polyco doppler and rms #1713

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 70 additions & 52 deletions src/pint/polycos.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@
----------
http://tempo.sourceforge.net/ref_man_sections/tz-polyco.txt
"""
from __future__ import annotations
import astropy.table as table
import astropy.units as u
import numpy as np
from astropy.io import registry
from astropy.time import Time
from collections import OrderedDict
import pathlib
from typing import Optional, Callable

from loguru import logger as log

Expand Down Expand Up @@ -93,17 +96,28 @@ class PolycoEntry:
Middle point of the time span in mjd
mjdspan : int
Time span in minutes
rphase : float
Reference phase
rph_int : int
Integer part of reference phase
rph_frac : float
Fractional part of reference phase
f0 : float
Reference spin frequency
ncoeff : int
Number of coefficients
coeff : numpy.ndarray
coeffs : numpy.ndarray
Polynomial coefficents
"""

def __init__(self, tmid, mjdspan, rph_int, rph_frac, f0, ncoeff, coeffs):
def __init__(
self,
tmid: float,
mjdspan: int,
rph_int: int,
rph_frac: float,
f0: float,
ncoeff: int,
coeffs: np.ndarray,
):
self.tmid = data2longdouble(tmid) * u.day
self.mjdspan = data2longdouble(mjdspan / MIN_PER_DAY) * u.day
self.tstart = self.tmid - (self.mjdspan / 2)
Expand Down Expand Up @@ -140,7 +154,7 @@ def __str__(self):
+ repr(self.coeffs)
)

def evalabsphase(self, t):
def evalabsphase(self, t: float | np.ndarray) -> Phase:
"""Return the phase at time t, computed with this polyco entry

Parameters
Expand All @@ -167,7 +181,7 @@ def evalabsphase(self, t):
phase += self.rphase + Phase(dt * 60.0 * self.f0)
return phase

def evalphase(self, t):
def evalphase(self, t: float | np.ndarray) -> Phase:
"""Return the phase at time t, computed with this polyco entry

Parameters
Expand All @@ -181,7 +195,7 @@ def evalphase(self, t):
"""
return self.evalabsphase(t).frac

def evalfreq(self, t):
def evalfreq(self, t: float | np.ndarray) -> np.ndarray:
"""Return the freq at time t, computed with this polyco entry

Parameters
Expand All @@ -200,7 +214,7 @@ def evalfreq(self, t):
s += data2longdouble(i) * self.coeffs[i] * dt ** (i - 1)
return self.f0 + s / 60.0

def evalfreqderiv(self, t):
def evalfreqderiv(self, t: float | np.ndarray) -> np.ndarray:
"""Return the frequency derivative at time t.

Parameters
Expand All @@ -227,7 +241,7 @@ def evalfreqderiv(self, t):


# Read polycos file data to table
def tempo_polyco_table_reader(filename):
def tempo_polyco_table_reader(filename: str) -> table.Table:
"""Read tempo style polyco file to an astropy table.

Tempo style: The polynomial ephemerides are written to file 'polyco.dat'.
Expand Down Expand Up @@ -355,7 +369,9 @@ def tempo_polyco_table_reader(filename):
return table.Table(entries, meta={"name": "Polyco Data Table"})


def tempo_polyco_table_writer(polycoTable, filename="polyco.dat"):
def tempo_polyco_table_writer(
polycoTable: table.Table, filename: str | pathlib.Path = "polyco.dat"
):
"""Write tempo style polyco file from an astropy table.

Tempo style polyco file:
Expand Down Expand Up @@ -486,7 +502,7 @@ class Polycos:
polycoFormats = []

@classmethod
def _register(cls, formatlist=_polycoFormats):
def _register(cls, formatlist: list = _polycoFormats):
"""
Register the table built-in reading and writing format

Expand All @@ -510,7 +526,9 @@ def _register(cls, formatlist=_polycoFormats):
fmt["format"], "rw", fmt["read_method"], fmt["write_method"]
)

def __init__(self, filename=None, format="tempo"):
def __init__(
self, filename: Optional[str | pathlib.Path] = None, format: str = "tempo"
):
"""Initialize polycos from a file

Parameters
Expand Down Expand Up @@ -541,7 +559,7 @@ def __init__(self, filename=None, format="tempo"):
raise ValueError("Zero polycos found for table")

@classmethod
def read(cls, filename, format="tempo"):
def read(cls, filename: str | pathlib.Path, format: str = "tempo") -> Polycos:
"""Read polyco file to a table.

Parameters
Expand All @@ -560,7 +578,11 @@ def read(cls, filename, format="tempo"):

@classmethod
def add_polyco_file_format(
cls, formatName, methodMood, readMethod=None, writeMethod=None
cls,
formatName: str,
methodMood: str,
readMethod: Optional[Callable] = None,
writeMethod: Optional[Callable] = None,
):
"""
Add a polyco file format and its reading/writing method to the class.
Expand Down Expand Up @@ -631,7 +653,9 @@ def add_polyco_file_format(

cls.polycoFormats.append(pFormat)

def read_polyco_file(self, filename, format="tempo"):
def read_polyco_file(
self, filename: str | pathlib.Path, format: str = "tempo"
) -> Polcos:
"""Read polyco file to a table.

Included for backward compatibility. It is better to use :meth:`pint.polycos.Polycos.read`.
Expand Down Expand Up @@ -676,18 +700,18 @@ def __getitem__(self, item):
@classmethod
def generate_polycos(
cls,
model,
mjdStart,
mjdEnd,
obs,
segLength,
ncoeff,
obsFreq,
maxha=12.0,
method="TEMPO",
numNodes=20,
progress=True,
):
model: pint.models.TimingModel,
mjdStart: float | np.longdouble,
mjdEnd: float | np.longdouble,
obs: str,
segLength: int,
ncoeff: int,
obsFreq: float,
maxha: float = 12.0,
method: str = "TEMPO",
numNodes: int = 20,
progress: bool = True,
) -> Polycos:
"""
Generate the polyco data.

Expand All @@ -712,7 +736,7 @@ def generate_polycos(
method : str, optional
Method to generate polycos. Only the ``TEMPO`` method is supported for now.
numNodes : int, optional
Number of nodes for fitting. It cannot be less then the number of
Number of nodes (sample points within each segment) for fitting. It cannot be less then the number of
coefficents. Default: 20
progress : bool, optional
Whether or not to show the progress bar during calculation
Expand Down Expand Up @@ -751,35 +775,21 @@ def generate_polycos(
obsFreq = float(obsFreq)

# Using tempo1 method to create polycos
# If you want to disable the progress bar, add disable=True to the tqdm() call.
for tmid in tqdm(tmids, disable=not progress):
tStart = tmid - mjdSpan / 2
tStop = tmid + mjdSpan / 2
nodes = np.linspace(tStart, tStop, numNodes)

# midpoint TOA and reference phase for each segment
toaMid = toa.get_TOAs_array(
(np.modf(tmid)[1], np.modf(tmid)[0]),
obs=obs,
freqs=obsFreq,
ephem=ephem,
)
# toaMid = toa.get_TOAs_list(
# [toa.TOA()],
# )

refPhase = model.phase(toaMid, abs_phase=True)

# Create node toas(Time sample using TOA class)
# toaList = [
# toa.TOA(
# (np.modf(toaNode)[1], np.modf(toaNode)[0]),
# obs=obs,
# freq=obsFreq,
# )
# for toaNode in nodes
# ]

# toas = toa.get_TOAs_list(toaList, ephem=ephem)
toas = toa.get_TOAs_array(
(np.modf(nodes)[0], np.modf(nodes)[1]),
obs=obs,
Expand All @@ -793,13 +803,18 @@ def generate_polycos(
rdcPhase = rdcPhase.int - (dt * model.F0.value * 60.0) + rdcPhase.frac
dtd = dt.astype(float) # Truncate to double
rdcPhased = rdcPhase.astype(float)
coeffs = np.polyfit(dtd, rdcPhased, ncoeff - 1)[::-1]
rawcoeffs = np.polyfit(dtd, rdcPhased, ncoeff - 1)
coeffs = rawcoeffs[::-1]

date, hms = Time(tmid, format="mjd", scale="utc").iso.split()
yy, mm, dd = date.split("-")
date = f"{dd}-{MONTHS[int(mm) - 1]}-{yy[-2:]}"
hms = float(hms.replace(":", ""))

ssbfreq = model.barycentric_radio_freq(toaMid)
doppler = float(
1e4 * ((ssbfreq - toaMid["freq"]) / toaMid["freq"]).decompose()
)
entry = PolycoEntry(
tmid,
segLength,
Expand All @@ -809,15 +824,18 @@ def generate_polycos(
ncoeff,
coeffs,
)
ph_polyco = np.polyval(rawcoeffs, dtd)
rms = np.sqrt(((ph_polyco - rdcPhased) ** 2).sum() / (len(dtd) - 1))

entry_dict = OrderedDict()
entry_dict["psr"] = model.PSR.value
entry_dict["date"] = date
entry_dict["utc"] = hms
entry_dict["tmid"] = tmid
entry_dict["dm"] = model.DM.value
entry_dict["doppler"] = 0.0
entry_dict["logrms"] = 0.0
entry_dict["doppler"] = doppler
# truncate so write/read work
entry_dict["logrms"] = max(np.log10(rms), -9.999)
entry_dict["mjd_span"] = segLength
entry_dict["t_start"] = entry.tstart
entry_dict["t_stop"] = entry.tstop
Expand All @@ -840,7 +858,7 @@ def generate_polycos(
raise ValueError("Zero polycos found for table")
return out

def write_polyco_file(self, filename="polyco.dat", format="tempo"):
def write_polyco_file(self, filename: str = "polyco.dat", format: str = "tempo"):
"""Write Polyco table to a file.

Parameters
Expand All @@ -862,7 +880,7 @@ def write_polyco_file(self, filename="polyco.dat", format="tempo"):

self.polycoTable.write(filename, format=format)

def find_entry(self, t):
def find_entry(self, t: float | np.ndarray) -> np.ndarray:
"""Find the right entry for the input time.

Parameters
Expand Down Expand Up @@ -896,7 +914,7 @@ def find_entry(self, t):
raise ValueError("Some input times not covered by Polyco entries.")
return start_idx

def eval_phase(self, t):
def eval_phase(self, t: float | np.ndarray) -> np.ndarray:
"""Polyco evaluation of fractional phase

Parameters
Expand All @@ -917,7 +935,7 @@ def eval_phase(self, t):
t = np.array([t])
return self.eval_abs_phase(t).frac

def eval_abs_phase(self, t):
def eval_abs_phase(self, t: float | np.ndarray) -> pint.phase.Phase:
"""
Polyco evaluate absolute phase for a time array.

Expand Down Expand Up @@ -962,7 +980,7 @@ def eval_abs_phase(self, t):
phaseFrac = np.hstack(phaseFrac).value
return Phase(phaseInt, phaseFrac)

def eval_spin_freq(self, t):
def eval_spin_freq(self, t: float | np.ndarray) -> np.ndarray:
"""
Polyco evaluate spin frequency for a time array.

Expand Down Expand Up @@ -997,7 +1015,7 @@ def eval_spin_freq(self, t):

return spinFreq

def eval_spin_freq_derivative(self, t):
def eval_spin_freq_derivative(self, t: float | np.ndarray) -> np.ndarray:
"""
Polyco evaluate spin frequency derivative for a time array.

Expand Down
Loading