Getting started
Installation
To install this package, use:
pip install --extra-index-url=https://artefact.skao.int/repository/pypi-internal/simple ska-sdp-realtime-calibration
The RCalProcessor derives from the SDP Receive Processors class BaseProcessor and requires a number of SKA SDP packages for real-time data access and calibration routines.
See the SDP Receive Processors documentation for detailed information about the class and how it interacts with other processing components.
Running the RCal processor
The SDP Receive Processors package provides the program used to load and run the processors. This program is called plasma-processor. It loads the user-supplied processor class and sets up the necessary infrastructure to connect it to the Plasma store. See the processors documentation for more information.
With access to the Plasma store taken care of, a Kafka producer can be
enabled for transmission of gain arrays and provided to a new RCalProcessor:
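from aiokafka import AIOKafkaProducer

# Inside a coroutine: start() must be awaited before the producer is used.
producer = AIOKafkaProducer(bootstrap_servers=KAFKA_HOST)
await producer.start()

rcal_processor = RCalProcessor(
    max_calibration_intervals=NUM_TIMESTAMPS,
    rcal_producer=RCalProducer(
        producer,
        bandpass_topic,
    ),
)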
This processor can be used within a Processor runner:
rcal_runner = Runner(
    PLASMA_SOCKET,
    rcal_processor,
    polling_rate=0.001,
    use_sdp_metadata=False,
)
and run alongside the Plasma store and receiver. Another processing component, such as the CBF beamformer, can launch a Kafka consumer to receive the gain arrays.
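As an illustration of the receiving side, the following is a minimal sketch of such a consumer, not the CBF implementation. It assumes each message value is the .npy byte stream written by numpy.save, as in the Gain arrays section. The names decode_gains and consume_gains are hypothetical helpers introduced here for illustration only.

```python
from io import BytesIO

import numpy


def decode_gains(payload: bytes) -> numpy.ndarray:
    """Rebuild the gain array from one Kafka message value."""
    # Assumption: the payload is the .npy byte stream written by numpy.save.
    return numpy.load(BytesIO(payload), allow_pickle=False)


async def consume_gains(kafka_host: str, topic: str) -> None:
    """Print the shape and dtype of every gain array received on `topic`."""
    # Imported here so that decode_gains can be used without aiokafka installed.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(topic, bootstrap_servers=kafka_host)
    await consumer.start()
    try:
        async for message in consumer:
            gains = decode_gains(message.value)
            print(gains.shape, gains.dtype)
    finally:
        # Stop the consumer even if decoding raises.
        await consumer.stop()
```

The coroutine could be started with asyncio.run(consume_gains(KAFKA_HOST, bandpass_topic)), using the same host and topic that were given to the producer.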
Gain arrays
The RCalProcessor fills an SDP datamodels GainTable xarray with antenna-based gain solutions of jones_type “B”. These are re-channelised to a spectral sampling that is appropriate for CBF beamforming, as determined by SDP func-python calibration.beamformer_utils. To help determine the appropriate sampling, an additional RCalProcessor constructor argument, array, can be set to “LOW” or “MID”.
The CBF beamformer expects bandpass calibration Jones matrices, sent as an array of complex floats with dimensions (Nantenna, Nfrequency, Npolarisation). These are extracted from the re-channelised GainTable and sent to Kafka as a BytesIO memory buffer:
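from io import BytesIO

import numpy

nant = len(gaintable.antenna)
nfreq = len(gaintable.frequency)
npol = len(gaintable.receptor1) * len(gaintable.receptor2)
# Flatten each Jones matrix and serialise the array in .npy format.
bio = BytesIO()
numpy.save(bio, gaintable.gain.data.reshape(nant, nfreq, npol))
data = bio.getbuffer()
# send_and_wait() is a coroutine and must be awaited.
await producer.send_and_wait(bandpass_topic, data)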
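The layout of the transmitted array can be checked without Kafka or a real GainTable. The sketch below uses a synthetic array in place of gaintable.gain.data. It assumes dimensions (time, antenna, frequency, receptor1, receptor2) with a single time sample, which the reshape above implies. The array sizes are arbitrary illustrative values.

```python
from io import BytesIO

import numpy

# Synthetic stand-in for gaintable.gain.data (assumed dimension order:
# time, antenna, frequency, receptor1, receptor2) with one time sample.
nant, nfreq, nrec = 3, 5, 2
shape = (1, nant, nfreq, nrec, nrec)
rng = numpy.random.default_rng(0)
gain = rng.normal(size=shape) + 1j * rng.normal(size=shape)

# Flatten each 2x2 Jones matrix into 4 polarisation terms (row-major order).
npol = nrec * nrec
flat = gain.reshape(nant, nfreq, npol)

# Serialise in .npy format, as the producer does.
bio = BytesIO()
numpy.save(bio, flat)
payload = bio.getvalue()

# Recover the array on the receiving side and restore the 2x2 matrices.
received = numpy.load(BytesIO(payload), allow_pickle=False)
jones = received.reshape(nant, nfreq, nrec, nrec)

# Flat element [a, f, nrec * i + j] is Jones element [i, j] of antenna a,
# channel f, so the round trip reproduces the original matrices exactly.
assert numpy.array_equal(jones, gain[0])
```

Because numpy.reshape uses row-major order by default, the polarisation index runs over receptor2 fastest. A consumer that needs the Jones matrices can therefore reshape the last axis back to (2, 2) without any reordering.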