Introduction
The MetroProcessor expresses transformations via Python code that can be injected at runtime as a collection of functions, so-called views. A view takes input from one or more data sources in Karabo, e.g. properties or fast pipeline data, and may return a result, which in turn can serve as input to other views. Views are executed once per train, and all data is matched on the train boundary for this purpose.
Typical applications in online analysis range from slicing data to a region of interest, over simple statistical analyses, to extensive pipelines processing raw data in real time. As the underlying analysis code can be changed quickly at runtime, it allows for fast iterations and for writing explicit implementations to solve a given problem in the very same moment. In general, any operation that can be expressed in Python may be performed. This also includes calling native code or bindings to larger frameworks, e.g. OpenCL or CUDA. The pipeline can scale from running in a single process to balancing trains across cores or even nodes.
The device takes care of wiring the data flow to and from the pipeline code. Its output may be sent back to Karabo as pipeline data (WIP!) and is accessible via a bridge-like ZMQ protocol for consumption by external tools, e.g. for visualization or further analysis. Currently, the code is typically supplied via a file on the respective node the device is running on.
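The bridge output can be consumed with plain ZMQ from any language. The exact endpoint, socket types and wire format of the device's protocol are not specified here; the following pyzmq sketch only illustrates the general pattern, with an in-process stand-in for the device and a hypothetical JSON message in place of serialized view results:

```python
import zmq

ctx = zmq.Context.instance()

# Hypothetical in-process stand-in for the device's bridge endpoint;
# the real endpoint, socket types and serialization are device-defined.
server = ctx.socket(zmq.PUSH)
server.bind('inproc://metro-bridge')

client = ctx.socket(zmq.PULL)
client.connect('inproc://metro-bridge')

# A made-up message carrying one view result for one train.
server.send_json({'trainId': 1001, 'diff': [0.0, 0.1, 0.0]})
msg = client.recv_json()
print(msg['trainId'])
```

An external visualization tool would run such a client in its own process and connect to the device's actual TCP endpoint instead of the in-process socket used above.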
As an example, consider two spectrometers measuring the spectral distribution of a laser pulse before and after some optics. The laser operator would like to monitor both the respective spectra as well as their normalized difference. As each spectrometer has a unique wavelength calibration of its sensor, the difference requires interpolation. The corresponding analysis code could look like this:
import numpy as np
from scipy.interpolate import interp1d


@View.Vector
def src(x: 'SQS_ILH_LAS/SPEC/SRC.wavelengths',
        y: 'SQS_ILH_LAS/SPEC/SRC:output.data.spectrum'):
    """Normalized spectral distribution at the source.
    """

    return x, y / y.max()


@View.Vector
def exp(x: 'SQS_ILH_LAS/SPEC/EXP.wavelengths',
        y: 'SQS_ILH_LAS/SPEC/EXP:output.data.spectrum'):
    """Normalized spectral distribution at the experiment.
    """

    return x, y / y.max()


@View.Vector
def diff(src_data: 'src', exp_data: 'exp'):
    """Difference spectrum on interpolated X vector.
    """

    # Unpack the view arguments.
    x_src, y_src = src_data
    x_exp, y_exp = exp_data

    # Define a common X axis for both spectrometers.
    x_both = np.linspace(max(x_src.min(), x_exp.min()),
                         min(x_src.max(), x_exp.max()),
                         min(len(x_src), len(x_exp)))

    # Obtain interpolation functions from scipy.
    f_src = interp1d(x_src, y_src)
    f_exp = interp1d(x_exp, y_exp)

    return x_both, f_src(x_both) - f_exp(x_both)
Here, three views are defined, called src, exp and diff.
The first two views src and exp combine the wavelength calibration with the spectral measurement, normalized to unity, for each of the two spectrometers. Each view receives two arguments: the wavelengths property and the key data.spectrum within the output pipeline hash. Argument annotations are used to declare the source of a view's argument, which in this case all come from Karabo. Both views are executed individually for each train in which their respective devices send out fast data with a spectral measurement, while the wavelengths property, being slow data, is always available.
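The source declarations rely on standard Python function annotations, which remain attached to the function object and can be read back by the device at injection time. A minimal sketch, reusing the source strings from the example above (the actual resolution logic inside the MetroProcessor is not shown here):

```python
# A plain function whose annotations declare its Karabo sources, as in
# the src view above (without the View.Vector decorator).
def spectrum(x: 'SQS_ILH_LAS/SPEC/SRC.wavelengths',
             y: 'SQS_ILH_LAS/SPEC/SRC:output.data.spectrum'):
    return x, y

# Standard Python exposes the declarations via __annotations__,
# mapping each argument name to its declared source string.
sources = spectrum.__annotations__
print(sources['x'])  # SQS_ILH_LAS/SPEC/SRC.wavelengths
```

Because annotations are ordinary strings here, they carry no meaning to Python itself; the framework is free to interpret them as device properties, pipeline keys or names of other views.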
The third view diff computes the difference between both spectra. As its input, it takes the results of the aforementioned views; hence, it executes whenever both views have run for a given train and returned an actual result. Just as with Karabo sources before, another view may be declared in an argument annotation. Since the wavelength vectors of the two spectrometers may differ, the view uses interpolation to obtain both spectra on a common wavelength axis and then performs the subtraction.
As all views are declared with the Vector decorator, a client will know to display their results as line plots. It is customary to interpret a tuple of vectors as the respective X and Y data.