Getting started

Installation

To install this package, run:

pip install --extra-index-url=https://artefact.skao.int/repository/pypi-internal/simple ska-sdp-realtime-calibration

The RCalProcessor derives from the BaseProcessor class of the SDP Receive Processors package and requires a number of SKA SDP packages for real-time data access and calibration routines.

See the SDP Receive Processors documentation for detailed information about the class and how it interacts with other processing components.

Running the RCal processor

The SDP Receive Processors package provides plasma-processor, the program used to load and run processors. It loads the user-supplied processor class and sets up the infrastructure needed to connect it to the Plasma store. See the processors documentation for more information. With access to the Plasma store taken care of, a Kafka producer can be created to transmit gain arrays and passed to a new RCalProcessor:

from aiokafka import AIOKafkaProducer

producer = AIOKafkaProducer(bootstrap_servers=KAFKA_HOST)
await producer.start()  # start() is a coroutine; call it from async code

rcal_processor = RCalProcessor(
    max_calibration_intervals=NUM_TIMESTAMPS,
    rcal_producer=RCalProducer(
        producer,
        bandpass_topic,
    ),
)
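Because the producer's start() and stop() methods are coroutines, the producer has to be created and torn down from asynchronous code. The following is a minimal sketch of one way to manage that lifecycle; the helper name started_producer and the make_producer argument are hypothetical and are not part of this package or of aiokafka.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def started_producer(make_producer):
    """Start a producer, yield it, and always stop it afterwards.

    `make_producer` is a hypothetical zero-argument factory, e.g.
    lambda: AIOKafkaProducer(bootstrap_servers=KAFKA_HOST)
    """
    producer = make_producer()
    await producer.start()  # connect to the Kafka cluster
    try:
        yield producer
    finally:
        # stop() runs even if the body raises, so the connection is released
        await producer.stop()
```

Inside an async function this could be used as "async with started_producer(factory) as producer:", with the RCalProcessor constructed in the body of the with block.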

This processor can be used within a Processor runner:

rcal_runner = Runner(
    PLASMA_SOCKET,
    rcal_processor,
    polling_rate=0.001,
    use_sdp_metadata=False,
)

and run alongside the Plasma store and receiver. Another processing component, such as the CBF beamformer, can launch a Kafka consumer to receive the gain arrays.

Gain arrays

The RCalProcessor fills an SDP datamodels GainTable xarray with antenna-based gain solutions of jones_type “B”. These are re-channelised to a spectral sampling that is appropriate for CBF beamforming, as determined by the SDP func-python module calibration.beamformer_utils. To help determine what sampling is appropriate, an additional RCalProcessor constructor argument, array, can be set to “LOW” or “MID”.

The CBF beamformer expects bandpass calibration Jones matrices, sent as an array of complex floats with dimensions (Nantenna, Nfrequency, Npolarisation). These are extracted from the re-channelised GainTable, serialised with numpy.save into a BytesIO memory buffer, and sent to Kafka:

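from io import BytesIO
import numpy

nant = len(gaintable.antenna)
nfreq = len(gaintable.frequency)
# Flatten each 2x2 Jones matrix into a single polarisation axis
npol = len(gaintable.receptor1) * len(gaintable.receptor2)
bio = BytesIO()
# Write the array in .npy format so dtype and shape travel with the data
numpy.save(bio, gaintable.gain.data.reshape(nant, nfreq, npol))
data = bio.getbuffer()
# send_and_wait() is a coroutine; call it from async code
await producer.send_and_wait(bandpass_topic, data)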
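On the receiving side, a consumer can recover the array by reversing the serialisation. The sketch below round-trips a dummy gain array through the same .npy encoding; the helper names encode_gains and decode_gains are hypothetical, and it assumes the consumer receives the Kafka message value as bytes.

```python
from io import BytesIO

import numpy


def encode_gains(gains):
    """Serialise a gain array to .npy bytes, as done before sending."""
    bio = BytesIO()
    numpy.save(bio, gains)
    return bio.getvalue()


def decode_gains(payload):
    """Rebuild the gain array from the bytes of a received message."""
    # allow_pickle=False: plain numeric arrays never need pickle support
    return numpy.load(BytesIO(payload), allow_pickle=False)


# Dummy data with dimensions (Nantenna, Nfrequency, Npolarisation)
nant, nfreq, npol = 4, 8, 4
gains = numpy.ones((nant, nfreq, npol), dtype=numpy.complex64)

payload = encode_gains(gains)
decoded = decode_gains(payload)
```

Because the .npy header records the dtype and shape, the consumer does not need to know the array dimensions in advance.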