Python API

Processors

class realtime.calibration.processors.rcal_processor.RCalProcessor(beam_id: str, calibrator_to_use: ~realtime.calibration.calibrators.base_calibrator.BaseCalibrator = <realtime.calibration.calibrators.rcal_calibrator.RCalCalibrator object>, output_h5: str | None = None, kafka_server: str | None = None, kafka_topic: str | None = None, qa_kafka_topic: str | None = None, qa_metrics_kafka_topic: str | None = None, ignore_config_db: bool = False, everybeam_ms: str | None = None, sky_model: str | None = None, fov: float | None = 5, flux_limit: float | None = 0.5, uv_min: float | None = 150, autoflag: bool = True, rfi_masks: ~numpy.ndarray[~typing.Any, ~numpy.dtype[~numpy._typing._array_like._ScalarType_co]] | None = None, preproc_ave_time: int | None = None, preproc_ave_frequency: int | None = None, flip_uvw: bool = False, scale_gains: bool = False, scale_factor: float | None = None, low_gain_threshold: float | None = 0.1, threshold_headroom: bool = False, gain_headroom: float | None = 0.1, station_data_key_path: str = 'stations')

A processor that can present Visibility data to the RCal library. Child of BaseProcessor.

Gain scaling

Before sending calibration solutions to the data queues for application in CBF, they need to be scaled to have real and imaginary values in the range [-1, 1). This is enabled using option scale_gains. If possible, a single or slowly varying scaling factor will be used to do this, to simplify its correction down stream in the formed beams. Most of the variation in time is expected to be included in the beam models, and so the following approach is used:

  • Before multiplication with the beam models, calculate the median bandpass gain amplitude. This may need to be more sophisticated if there is significant variation of the amplitudes with frequency.

  • Set a low gain amplitude threshold as a fraction of the median amplitude level (using option low_gain_threshold).

  • Before finalising the low gain amplitude threshold, option threshold_headroom can be used to reduce it to a less variable number. Until there is a better option, the highest factor of 2 below the median amplitude is used.

  • After setting the low gain amplitude threshold, option gain_headroom is used to reduce it further to account for station beam variations. Gains may drop by an order of magnitude at low elevations. Gains will also drop for tied-array beams that are far from station beam centre.

  • The inverse of this final threshold value is used to scale the Jones matrices. After matrix inversion, any matrix with an element outside the range (-1, 1) is flagged.

Attributes

beam_id: str

The visibility beam to process. Visibility datasets for other beams will be ignored.

calibrator_to_use: BaseCalibrator

Calibration support class to use. Default = RCalCalibrator()

kafka_server: Optional[str]

Bootstrap server for RCal Kafka producer. Default = None (i.e. no Kafka output)

kafka_topic: Optional[str]

Kafka topic for RCal Kafka producer. Default = None (i.e. no Kafka output)

sky_model: Optional[str]

Define the sky model. Options are:

  • sky_model=<my_file.csv>: A CSV file containing a sky component list that is readable by the ska_sdp_datamodels.global_sky_model.LocalSkyModel class, as generated by the GSM service.

  • sky_model=None: A spectrally-flat point source will be placed at the phase centre.

Default = None

fov: Optional[float]

Field of view (in degrees) used in the sky model cone search. Default = 5.0,

flux_limit: Optional[float]

Minimum flux density (in Jy) used in the sky model cone search. Default = 0.5,

uv_min: Optional[float]

Minimum baseline length (in wavelengths) to use in calibration. Default = 150.0,

autoflag: bool

Whether or not to run RFI peak finding. Default is True. See pre_process() for details.

rfi_masks: Optional[ndarray]

Flagging limits for known bad frequencies. Default is None. See pre_process() for details.

preproc_ave_time: Optional[int]

Pre-average visibility dataset in time by this number of samples. See pre_process() for defaults.

preproc_ave_frequency: Optional[int]

Pre-average visibility dataset in frequency by this number of channels. See pre_process() for defaults.

output_h5: Optional[str]

Optional output h5parm file for the bandpass Jones matrices. If this option is set, additional files will be written for the combined matrices (J_bandpass @ J_beam) with “_combined” added before the file-type suffix, and for the final inverted and scaled matrices, with “_final” added before the file-type suffix. In all cases, the scan and beam IDs will also be added before the file-type suffix. Default = None

flip_uvw: bool

Change the sign of the uvw coordinates. This is a temporary switch to deal with an inconsistency in receiver options. Default = False

scale_gains: bool

Whether or not to scale output Jones matrices to the range [-1, 1). A single scale factor is used for all solution matrices. Default = False

scale_factor: Optional[float]

User-defined scale factor. This will override all other scaling settings. It should be used with care, as inappropriate scaling can result in over flagging or insufficient bit depth in CBF beamforming. Default = None

low_gain_threshold: Optional[float]

The fraction of the median bandpass gain amplitude below which to flag matrices. This is also used as a starting point for the scale factor, since its inverse will be the approximate maximum value in the inverse matrices. See also threshold_headroom. Default = 0.1.

threshold_headroom: bool

If using low_gain_threshold, decrease the median gain amplitude to the next lowest power of 2 before setting the threshold. This will move the threshold level lower, but should also limit variation from one RCAL cycle to the next.

gain_headroom: Optional[float]

How much gain variation to build into the scaling, expressed as a fraction of maximum gain. Default = 0.1.

station_data_key_path: str

For SKA Low, path in the Telescope Model structure under which documents describing individual stations can be found. Telescope Model keys to each station are assumed to be of the form <station_data_key_path>/<station-geolabel>.yaml (e.g. stations/s8-1.yaml).

param BaseProcessor:

processor base class

__init__(beam_id: str, calibrator_to_use: ~realtime.calibration.calibrators.base_calibrator.BaseCalibrator = <realtime.calibration.calibrators.rcal_calibrator.RCalCalibrator object>, output_h5: str | None = None, kafka_server: str | None = None, kafka_topic: str | None = None, qa_kafka_topic: str | None = None, qa_metrics_kafka_topic: str | None = None, ignore_config_db: bool = False, everybeam_ms: str | None = None, sky_model: str | None = None, fov: float | None = 5, flux_limit: float | None = 0.5, uv_min: float | None = 150, autoflag: bool = True, rfi_masks: ~numpy.ndarray[~typing.Any, ~numpy.dtype[~numpy._typing._array_like._ScalarType_co]] | None = None, preproc_ave_time: int | None = None, preproc_ave_frequency: int | None = None, flip_uvw: bool = False, scale_gains: bool = False, scale_factor: float | None = None, low_gain_threshold: float | None = 0.1, threshold_headroom: bool = False, gain_headroom: float | None = 0.1, station_data_key_path: str = 'stations')

Create a new BaseProcessor.

async close()

Flush and close any remaining resources.

DataProducts are gracefully finalized transitioned to COMPLETED status where possible.

If cancel() was invoked, pending work is cancelled and pending DataProducts transition to CANCELLED.

static create(argv: Iterable[str]) RCalProcessor

Create an instance of this class from the given command line parameters. This allows user-provided classes to have their own command line parsing logic, and receive arbitrary user-provided parameters.

Parameters:

argv – A list of command line parameters.

Returns:

A new instance of this class.

async end_scan(scan_id: int) Visibility | Iterable[Visibility] | None

Called when a scan has ended. The default implementation ignores this event, but subclasses might want to react to this.

Parameters:

scan_id – the ID of the scan that has ended.

Returns:

Visibility to pass to the next processor, or None to skip subsequent processors.

async process(_dataset: Visibility) bool

Process the given visibility dataset.

Parameters:

dataset – the visibility to process

Returns:

Visibility or iterable collection of Visibility objects to pass to the next processor, or None to skip subsequent processors.

async start_scan(scan_id: int) None

Called when a new scan has started. The default implementation ignores this event, but subclasses might want to react to this.

Parameters:

scan_id – the ID of the scan that has started.

Processor helper functions

realtime.calibration.processors.utilities.create_h5parm(gaintable: GainTable, filename: str, squeeze: bool = False) File

Set up a H5Parm HDF5 file to receive one or more gaintable datasets.

The time dimension of tables in the new file is left empty. Function update_h5parm can be used to resize the tables and add gains as new time samples are processed.

The file is flushed before returning.

Parameters:
  • gaintable – GainTable

  • filename – Name of H5Parm file

  • squeeze – If True, remove axes of length one from dataset

Output:

the new h5py file object

realtime.calibration.processors.utilities.pre_process(vis: Dataset, timestep: int | None = None, freqstep: int | None = None, uv_min: float | None = 100, autoflag: bool | None = True, rfi_masks: ndarray[Any, dtype[float]] | None = None) Dataset

Pre-process a visibility dataset.

Pre-processing options:

  • Average in time

  • Average in frequency

  • Flag baselines below a minimum uv length cutoff

  • Flag based on outlier detection

  • Flag frequencies ranges

Parameters:
  • vis – Visibility data to be processed

  • timestep – Pre-average visibility dataset in time by this number of samples. For default values, see set_pre_process_averaging().

  • freqstep – Pre-average visibility dataset in frequency by this number of channels. For default values, see set_pre_process_averaging().

  • uv_min – Minimum baseline length (in wavelengths) to use in calibration. Default = 100

  • autoflag – Whether or not to run an adaptive flagger. Currently the ska-sdp-func-python preprocessing rfi_flagger is used with nominal settings.

  • rfi_masks – Flagging limits for known bad frequencies. Should be a 2D array of [start,end] frequencies in Hz, for example: rfi_masks = np.array([[137.4e6, 137.6e6], [230e6, 280e6]])

Returns:

Pre-processed the Visibility dataset

realtime.calibration.processors.utilities.set_beamformer_frequencies(gain_table: GainTable, array=None)

Generate a list of CBF beamformer frequencies

The main version of this function is in ska_sdp_func_python.calibration, however it is likely to undergo multiple changes while RCAL is developed. Use a local copy during this development and update func-python at the end.

SKA-Low beamformer:
  • Need Jones matrices for 384, 304 or 152 channels, per antenna, per beam

  • Station channels/beams are centred on integer multiples of 781.25 kHz

    781.25 kHz = 400 MHz / 512 channels

  • Station channels run from 50 MHz (64*df) to 350 MHz (448*df)

  • CBF beamformer calibration is done at the station channel resolution

  • Will assume that no padding is needed if input band < beamformer band

MID beamformer
  • Need Jones matrices for 4096 channels, per antenna, per beam

  • Timing beam bandwidth : 200 MHz (channel width : 48.8281250 kHz?)

  • Search beam bandwidth : 300 MHz (channel width : 73.2421875 kHz?)

  • Set first beamformer channel centre frequency to first input channel

Parameters:
  • gain_table – GainTable

  • array – optional argument to explicitly set the array. Should be “LOW” or “MID”. By default the gaintable configuration name will be used to set the array automatically.

Returns:

np array of shape [nfreq,]

realtime.calibration.processors.utilities.update_h5parm(gaintable: GainTable, file: File)

Update a H5Parm file with the contents of a gaintable.

Extend a H5Parm file along the time dimension by the shape of gaintable and insert the gains. The file is flushed before returning.

Parameters:
  • gaintable – GainTable

  • filename – Name of H5Parm file

  • squeeze – If True, remove axes of length one from dataset

realtime.calibration.processors.utilities.update_metadata(vis: Dataset, flip_uvw: bool | None = False) Dataset

Update visibility metadata.

Various bits of metadata need updating. So of these updates are temporary and will be eventually be retired, while others are more permanent. Updates include:

  • Option to reverse the sign of the uvw coordinates

  • Change internal uvw sample order (and sign) if baselines have upper-triangle order

  • Change time samples from end-of-integration to centre-of-integration

Parameters:
  • vis – Visibility data to be processed

  • flip_uvw – Temporary switch to deal with an inconsistency in receiver options. If true, the sign of the uvw data is inverted. Default is False.

Returns:

Pre-processed the Visibility dataset

Calibrators

class realtime.calibration.calibrators.base_calibrator.BaseCalibrator(config=NotImplemented)

A base class to inherit from when implementing a calibrator

__init__(config=NotImplemented)

_summary_

__weakref__

list of weak references to the object (if defined)

apply_flags_to_solutions() None

Set Jones matrices with zero weight to zero.

CBF expects that flagged calibration solutions are set to zero. Make sure that is the case.

calculate_chisq(vis: Visibility)

Calculate the chisq for the visibility model and Jones matrices.

Note that the visibility weights may not be set to the inverse variance of the visibility noise, in which case the expected value will not be 1, but rather a function of the noise variance.

Parameters:

vis – Visibility data to be calibrated

Returns:

Chisq estimate

abstract calibrate(vis: Visibility, beams=None)

Update calibration solutions in gain_table.

delete_skymodel()

Return the sky model to the unset state.

flag_low_gain_solutions(low_gain_threshold: float) None

Flag low-gain stations.

Flag any Jones matrix with either either X or Y gain less than low_gain_threshold multiplied by the median gain.

This might be better done in RCalCalibrator.calibrate, where the flagging can help to improve calibration.

Parameters:

low_gain_threshold – Proportion of median gain below which to flag.

get_calibration() GainTable

Return the calibration gain_table.

If it has been resampled, the resampled version is returned.

Returns:

GainTable

initialise_gaintable(vis, keep_gains=True)

Initialise the gaintable.

Initialise the gaintable before calibration. This is done separately so that it can be skipped if cumulative solving is required.

Parameters:
  • vis – Visibility data to be calibrated

  • keep_gains – If true and the gain table is already initialised with matching metadata, the existing gains will be retained. The metadata that must match are the antenna and frequency axes. Default is True.

resample(array=None)

Resample the calibration solutions.

The gain_table is re-channelised to have spectral sampling that is appropriate for CBF beamforming. The results are written into a new GainTable dataset.

Parameters:

array – Array type for re-channelisation (“MID” or “LOW”). Default = None

reset_skymodel()

Whether or not to reset (or initialise) the sky model.

class realtime.calibration.calibrators.rcal_calibrator.RCalCalibrator(config=NotImplemented)

Realtime calibration support class.

Child of BaseCalibrator.

calibrate(vis, beams: GenericBeams | None = None)

Update calibration solutions in gain_table.

Generate bandpass calibration solutions using ska_sdp_func_python function solve_gaintable.

Parameters:
  • vis – Visibility data to calibrate

  • beams – Optional GenericBeams object. If supplied, the central beam response will be divided out during initialisation

update_table(gain_table: GainTable)

Update the main bandpass table with a new one.

Any channels that are entirely flagged will be flagged in the main table and the gain values will be unchanged.

Any channels that contain non-finite gains (NaNs or Infs) will be flagged in the main table and the gain values will be unchanged.

Parameters:

gain_table – new GainTable

Sky and visibility models

Functions for generating beam models

class realtime.calibration.sky_model.beams.GenericBeams(vis: Dataset, array: str | None = None, direction: SkyCoord | None = None, ms_path: str | None = None)

A generic class for beam handling.

Generic interface to Everybeam beam models.

Currently for Mid, all beam values are set to 2x2 identity matrices.

Parameters

vis: xr.Dataset

Dataset containing required metadata.

array: str

array type (e.g. “low”). If unset or None, will check Vis model for an obvious match.

direction: SkyCoord

Beam direction. By default the vis phase centre will be used.

ms_path: str

Location of measurement set for everybeam (e.g. OSKAR_MOCK.ms).

__init__(vis: Dataset, array: str | None = None, direction: SkyCoord | None = None, ms_path: str | None = None)
__weakref__

list of weak references to the object (if defined)

array_response(direction: SkyCoord, frequency: ndarray[Any, dtype[float]], time: Time | None = None) ndarray[Any, dtype[complex]]

Return the response of each antenna or station in a given direction

Parameters:
  • direction – Direction of desired response

  • frequency – 1D array of frequencies

  • time – obstime

Returns:

array of beam matrices [nant, nfreq, 2, 2]

init_models(vis: Dataset, array: str | None = None, ms_path: str | None = None)

Initialise the beam models.

Parameters:
  • vis – Visibility data to be calibrated

  • array – array type. If unset or None, will check Vis model for an obvious match.

update_beam(frequency: ndarray[Any, dtype[float]], time: Time)

Update the ITRF coordinates of the beam and normalisation factors.

Parameters:
  • frequency – 1D array of frequencies

  • time – obstime

update_beam_direction(direction: SkyCoord)

Return the response of each antenna or station in a given direction.

Parameters:

direction – Pointing direction of the beams

Functions for generating the local sky model.

realtime.calibration.sky_model.lsm.convert_model_to_skycomponents(model: list[Component], freq: ndarray[Any, dtype[float]]) list[SkyComponent]

Convert the LocalSkyModel to a list of SkyComponents.

All sources are unpolarised and specified in the linear polarisation frame using XX = YY = Stokes I/2.

Function deconvolve_gaussian() is used to deconvolve the MWA synthesised beam from catalogue shape parameters of each component. Components with non-zero widths after this process are stored with shape = “GAUSSIAN”. Otherwise shape = “POINT”.

Parameters:
  • model – Component list

  • freq – Frequency list in Hz

  • freq0 – Reference Frequency for flux scaling in Hz. Default is 200e6. Note: freq0 should really be part of the sky model

Returns:

SkyComponent list

realtime.calibration.sky_model.lsm.deconvolve_gaussian(comp: Component) tuple[float]

Deconvolve MWA synthesised beam from Gaussian shape parameters.

This follows the approach of the analysisutilities function deconvolveGaussian in the askap-analysis repository, written by Matthew Whiting. This is based on the approach described in Wild (1970), AuJPh 23, 113.

Parameters:

compComponent data for a source

Returns:

Tuple of deconvolved parameters (same units as data in comp)

realtime.calibration.sky_model.lsm.generate_lsm_from_csv(csvfile: str, phasecentre: SkyCoord, fov: float = 5.0, flux_limit: float = 0.0) list[Component]

Generate a local sky model using a LocalSkyModel csv file.

Form a list of Component objects.

All components are Gaussians, although many will typically default to point sources. See data class Component for more information.

Parameters:
  • csvfile – CSV filename.

  • phasecentre – astropy SkyCoord for the centre of the sky model.

  • fov – Field of view in degrees. Default is 5.

  • flux_limit – minimum flux density in Jy. Default is 0.

Returns:

Component list

realtime.calibration.sky_model.lsm.generate_skycomponents(vis: Visibility, sky_model: str | None = None, fov: float = 5.0, flux_limit: float = 0.0) List[SkyComponent]

Initialise the local sky model.

Component based local sky model for the current dataset. For a large number of components or for significant diffuse emission, the full SkyModel class could be used instead.

Parameters:
  • vis – Visibility dataset

  • sky_model

    Sky model source. Options are:

    • sky_model=<my_file.csv>: A CSV file containing a sky component list readable by the ska_sdp_datamodels.global_sky_model.LocalSkyModel class, as generated by the GSM service.

    • sky_model=None: A spectrally-flat point source will be placed at the phase centre.

    Default = None

  • fov – Field of view (in degrees) used in the sky model cone search. Default = 5.0

  • flux_limit – Minimum flux density (in Jy) used in the sky model cone search. Default = 0.0

Returns:

SkyComponent List

Functions for generating/predicting model visibilities

realtime.calibration.sky_model.predict.predict_model_vis(vis: Visibility, skycomponents: List[SkyComponent], beams: GenericBeams | None = None, decorrelate: bool = True) Visibility

Predict model visibilities.

Parameters:
  • vis – Visibility dataset to be modelled

  • skycomponents – SkyComponent List containing the local sky model

  • beams – Optional GenericBeams object. Defaults to no beams.

  • decorrelate – Whether to taper by bandwidth and time-average smearing factors

Returns:

Visibility model