Code reference

Core

class metropc.ViewStage(value)

View execution stage.

A view may be executed at different stages in a pipeline, each with different semantics and limitations.

FRONTEND

Within the frontend, prior to dispatch to the pipeline (NYI).

POOL

In the first pipeline stage, possibly parallelized on an event boundary. All expensive computations required for each event should be done here if possible.

REDUCE

In the second pipeline stage, processing all events synchronously to possibly combine data from several events.

ASYNC

Additional stage running asynchronously alongside the reduce stage for very expensive and long running computations across several events, which are only executed when idling (NYI).

STEP

Virtual stage running within the reduce stage at the end of an operator step.

ACTION

Virtual stage running within the reduce stage when requested by the frontend.

class metropc.ViewOutput(value)

View output type.

The output type defines what kind of output is expected from a view. There are two kinds of output types, generic and hinted. A hinted output type derives from a generic output type, but includes further information about how to handle or visualize the output.

Comparisons between generic and hinted output types evaluate to True if the hinted type derives from the generic type. Comparisons between hinted types evaluate to True only if they are actually identical, i.e. they yield False for different hinted types even if these derive from the same generic type.

Generic types should be single words, while derived hinted types use the same word followed by an underscore and a further description. The value x of a generic type must be smaller than 10, and any corresponding hinted type must use ten times this value as an offset and add a value y smaller than 10 itself, i.e. a hinted type has the value 10x + y with x, y < 10.
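To illustrate the numbering rule and the comparison semantics, here is a minimal sketch using a stand-in enum. It is not metropc's implementation: the class HypotheticalOutput, its member values and the helpers is_hinted, generic_of and output_matches are assumptions made up for this example, and the actual values of the ViewOutput members may differ.

```python
from enum import IntEnum


class HypotheticalOutput(IntEnum):
    # Hypothetical values following the rule: hinted = 10 * generic + y
    SCALAR = 1
    SCALAR_FAST = 10
    SCALAR_SLOW = 11
    VECTOR = 2
    VECTOR_LINE = 20
    VECTOR_DISTRIBUTION = 21


def is_hinted(value: int) -> bool:
    # Generic values are single digits, hinted values are 10 or larger.
    return value >= 10


def generic_of(value: int) -> int:
    # A hinted value 10x + y maps back to its generic value x.
    return value // 10 if is_hinted(value) else value


def output_matches(a: int, b: int) -> bool:
    # Two hinted types only match if they are identical.
    if is_hinted(a) and is_hinted(b):
        return a == b
    # Otherwise compare the generic types both values derive from.
    return generic_of(a) == generic_of(b)
```

Integer division by 10 is all it takes to recover the generic type, which is why the scheme restricts both x and y to single digits.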

COMPUTE

Arbitrary output with no obvious or intended visualization, whose results are not sent to any client.

SCALAR

Each result is a scalar value, which is typically visualized as a waveform.

SCALAR_FAST

Same as SCALAR, but with the additional hint that the value is changing fast.

SCALAR_SLOW

Same as SCALAR, but with the additional hint that the value is changing slowly.

VECTOR

Each result is a one-dimensional vector value, which is typically visualized as a line plot.

VECTOR_LINE

Same as VECTOR, but with the additional hint that the visualization should be a line plot.

VECTOR_DISTRIBUTION

Same as VECTOR, but with the additional hint that the visualization should be a bar plot.

MATRIX

Each result is a two-dimensional matrix value, which is typically visualized as a color map or heatmap.

IMAGE

Identical to MATRIX.

POINTS

Each result is a single data point or a list of data points, each consisting of one or more scalars, which are typically displayed as a binned result.

POINTS_BINNED

Same as POINTS, but with the additional hint that the visualization should bin all data points into a single result.

POINTS_SCATTER

Same as POINTS, but with the additional hint that the visualization should be a scatter plot.

class metropc.ContextStage(value)

Context stages.

Each pipeline stage (and all its instances) possesses a corresponding context object. This type only covers the actual stages with distinct instances, unlike ViewStage, which also includes virtual stages that execute views but run in the context of a different stage (e.g. STEP and ACTION in REDUCE).

FRONTEND

POOL

Primary pipeline stage for event processing, possibly parallelized across a large number of instances.

REDUCE

Secondary processing stage for reduction across events with exactly one instance.

ASYNC

Tertiary processing stage for long-running reductions across events, only performed when idle (NYI).

class metropc.core.Context(source, path='<ctx>', stage=<ContextStage.FRONTEND: 0>, version=0, parameters=None, features=[], event_alias='event', event_id_alias=None, event_counter_alias=None)

Online analysis context.

The context defines the programmable stages of the online analysis pipeline in the form of views. These views allow a Python callable to be executed with the data obtained for any number of devices or other views and return a result.

The context is typically loaded from a file and may contain any valid Python code, e.g. imports to help implement its functionality. When the file is evaluated, several methods of the context object are available in its global scope, in addition to the object itself via the ‘ctx’ symbol.

stage
Type

metropc.ContextStage

Stage at which this context object is created.

increase_view_counts(view_name, delta)

Increase view counts.

The view passed to this method does not need to actually exist, but may be used just for counting purposes. The count value is restricted to an int64. It may only be called from within a view execution process.

Parameters
  • view_name (str) – Name of the view to update.

  • delta (int) – Relative change to counts.

Returns

None

require_feature(*features)

Require support for a specific feature.

Feature strings indicate the presence of particular optional features supported by either metropc or its frontend in the current configuration.

Parameters

*features – All features required to run this context code.

Raises

ContextError – If any required feature is not supported.

require_version(version_str)

Require a minimal metropc version.

Parsing and comparing the version strings is performed via pkg_resources.parse_version() and follows its semantics.

Parameters

version_str (str) – Version string to compare against.

Returns

None

Raises

ContextError – If the version requirement is not satisfied.

set_view_docs(name, label, docstring=None)

Set view documentation.

The view passed to this method does not need to actually exist, but may also be part of an actual view’s path.

This call only takes effect if called in the reduce stage.

Parameters
  • name (str) – Name of the view to update.

  • label (str) – New label.

  • docstring (str, optional) – New docstring, ignored for actual views.

Returns

None

Builtin view implementations

class metropc.builtin.HistogramView

Delegates to metropc.builtin.VectorHistogramView or metropc.builtin.MatrixHistogramView depending on output type.

class metropc.builtin.VectorHistogramView(*args, **kwargs)

View implementation to bin 1D data into histograms.

Accepts kernel data as a generator, single scalars or any Iterable of values.

__init__(*args, bin_step=None, bin_min=None, bin_max=None, bin_count=None, bin_limit=50000, bin_margin=1, dtype=<class 'numpy.float64'>, dim_label='bin', count_clipped=False, output_every=10, clear_every=None, **kwargs)

Initialize a vector histogram view.

All bin_* arguments are identical to HistogramView.BinnedAxis.

Parameters
  • dtype (data-type, optional) – Data type for the resulting histogram buffer, float64 by default.

  • dim_label (str, optional) – Name of the xarray dimension, ‘bin’ by default.

  • count_clipped (bool, optional) – Whether to count values clipped beyond the binning range (off by default). This may occur with fixed bin counts or when the limit is reached.

  • output_every (int, optional) – Output data every nth event with any input, 10 by default.

  • clear_every (int, optional) – Clear data every nth event with any input, disabled by default.

class metropc.builtin.MatrixHistogramView(*args, **kwargs)

View implementation to bin 2D data into histograms.

Accepts kernel data as a generator of individual points (two scalars per iteration), tuple of axis arrays (two 1D arrays, one for each axis), other Iterable of points (two scalars per item) or (N,2) shaped ndarray.

This view uses a bandwidth limit to avoid sending excessive amounts of data to the client. It is computed by the product of x_limit, y_limit, the dtype’s item size and output_every with an assumed input rate of 10 Hz. The default limit is 10 MiB/s, which equates to a square output matrix of 1144x1144 bins for float64.
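The default figures can be checked with a little arithmetic. This sketch assumes the data rate is the size of one output matrix (x bins times y bins times item size) multiplied by the number of outputs per second, i.e. the assumed 10 Hz input rate divided by output_every. The helper names output_rate and max_square_bins are hypothetical. With the defaults this gives one output per second, and 1144x1144 float64 bins is then the largest square matrix that stays below 10 MiB/s.

```python
import math


def output_rate(x_bins, y_bins, itemsize=8, output_every=10, input_rate_hz=10.0):
    """Bytes per second sent for an x_bins * y_bins matrix (assumed formula)."""
    outputs_per_second = input_rate_hz / output_every
    return x_bins * y_bins * itemsize * outputs_per_second


def max_square_bins(bandwidth_limit=10485760, itemsize=8, output_every=10,
                    input_rate_hz=10.0):
    """Largest n such that an n x n matrix stays within bandwidth_limit."""
    outputs_per_second = input_rate_hz / output_every
    max_elements = bandwidth_limit / (itemsize * outputs_per_second)
    return math.isqrt(int(max_elements))
```

Here 10485760 bytes/s / 8 bytes = 1310720 elements per output, and the integer square root of that is 1144 (1144² = 1308736, while 1145² = 1311025 already exceeds it).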

__init__(*args, x_step=None, x_min=None, x_max=None, x_count=None, x_limit=1000, x_label='x', y_step=None, y_min=None, y_max=None, y_count=None, y_limit=1000, y_label='y', bandwidth_limit=10485760, dtype=<class 'numpy.float64'>, count_clipped=False, output_every=10, clear_every=None, **kwargs)

Initialize a matrix histogram view.

All arguments prefixed with x_*, y_* (except *_label) are identical to their respective bin_* equivalents of HistogramView.BinnedAxis.

Parameters
  • x_label (str, optional) – Name of the xarray dimension, ‘x’ by default.

  • y_label (str, optional) – Name of the xarray dimension, ‘y’ by default.

  • bandwidth_limit (int, optional) – Maximum output data rate for the view assuming 10 Hz input, 10 MiB/s by default.

  • dtype (data-type, optional) – Data type for the resulting histogram buffer, float64 by default.

  • count_clipped (bool, optional) – Whether to count values clipped beyond the binning range (off by default). This may occur with fixed bin counts or when the limit is reached.

  • output_every (int, optional) – Output data every nth event with any input, 10 by default.

  • clear_every (int, optional) – Clear data every nth event with any input, disabled by default.

class metropc.builtin.SortedVectorView(*args, **kwargs)

View implementation to sort vectors to binned 1D data.

Instead of the number of counts per bin, a vector value is assigned to each bin. This vector is composed of corresponding vector values assigned to each individual (scalar) data point on the bin axis, which are combined by one of several available reduce methods. It is therefore similar to a combination of a vector histogram on one axis and some vector reducing view such as an average on the other.

The supported reduce methods are:

  • sum

    Each row is the sum of all vector values assigned to its bin.

  • average

    Each row is averaged over all vector values assigned to its bin.

  • normalized

    Each row’s area is normalized to unity.

  • replace

    Each row contains the LAST result assigned to its bin.

An individual input consists of an Iterable as its vector value and an accompanying scalar for binning. Accepts kernel data as a generator of individual inputs, a 2-tuple of a single Iterable (vector value) and a scalar, or a 2-tuple of Iterables for the vector and scalar values, respectively. The most efficient format for several vectors is an (N,M)-shaped ndarray for N vectors of length M. If multiple values are yielded, they must be zipped.
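The four reduce methods listed above can be sketched for a single bin in pure Python. This is a hypothetical helper (reduce_row), not the view's actual code. In particular, treating a row's "area" as the plain sum of its elements for ‘normalized’ is an assumption.

```python
def reduce_row(vectors, method="average"):
    """Combine all vectors assigned to one bin into a single row."""
    if not vectors:
        raise ValueError("no vectors assigned to this bin")
    if method == "replace":
        # Keep only the last vector assigned to the bin.
        return list(vectors[-1])
    # Element-wise sum over all vectors in the bin.
    summed = [sum(column) for column in zip(*vectors)]
    if method == "sum":
        return summed
    if method == "average":
        return [value / len(vectors) for value in summed]
    if method == "normalized":
        # Assumption: the row's area is the plain sum of its elements.
        area = sum(summed)
        return [value / area for value in summed]
    raise ValueError(f"unknown reduce method: {method!r}")
```

For example, two vectors [1, 2] and [3, 4] landing in the same bin yield [4, 6] with ‘sum’ and [2.0, 3.0] with ‘average’.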

__init__(*args, bin_step=None, bin_min=None, bin_max=None, bin_count=None, bin_limit=50, vector_label='data', sort_label='bin', dtype=<class 'numpy.float64'>, include_clipped=False, output_every=10, clear_every=None, reduce_method='average', **kwargs)

Initialize a sorted vector view.

All bin_* arguments are identical to HistogramView.BinnedAxis.

Parameters
  • vector_label (str, optional) – Name of the xarray dimension for the vector axis, ‘data’ by default. Overridden if vector value is passed via xarray.

  • sort_label (str, optional) – Name of the xarray dimension for the scalar sort axis, ‘bin’ by default.

  • dtype (data-type, optional) – Data type for the resulting histogram buffer, float64 by default.

  • include_clipped (bool, optional) – Whether to include values on the scalar sort axis clipped beyond the binning range (off by default).

  • output_every (int, optional) – Output data every nth event with any input, 10 by default.

  • clear_every (int, optional) – Clear data every nth event with any input, disabled by default.

  • reduce_method (str, optional) – How to combine vector values, one of ‘sum’, ‘average’ (default), ‘normalized’ or ‘replace’.

class metropc.builtin.HistogramView.BinnedAxis(bin_step, bin_min, bin_max, bin_count, bin_limit)

Manages coordinate axis for data binning.

Creates and manages a coordinate axis for binned data according to a predefined set of specifications. It may be set to grow automatically up to a certain limit or be fixed on one or two boundaries. After creation, the align() method adapts it to any new set of data points added. This class only covers a single dimension, i.e. 2D data may use two instances for the width and height, respectively.

Bins must have a uniform width with no gaps. The value of a bin is defined as its center with its edges at +/- bin_step/2.

__init__(bin_step, bin_min, bin_max, bin_count, bin_limit)

Initialize a binned axis.

All arguments except for bin_limit may be None to indicate they are unspecified. While conceptually like a default value, all arguments are positional and must be passed explicitly. This results in several different combinations to realize different operating modes:

  • No argument given

    Use bin_step=1.0 and automatically expand the coordinates up to bin_limit.

  • Only bin_step

    Use the supplied bin_step and automatically expand the coordinates up to bin_limit.

  • bin_min or bin_max

    Use the supplied bin_step or 1.0 by default and automatically expand the coordinates up to bin_limit in any non-specified direction.

  • Exactly two of bin_count, bin_min, bin_max

    Use the supplied bin_step or 1.0 by default and construct fixed coordinates, ignoring bin_limit.

  • bin_count and bin_min and bin_max

    Compute bin_step and construct fixed coordinates, ignoring bin_limit.

The buffer is auto-expanding in one or both directions if bin_count is None. However, bin_count is set implicitly if both bin_min and bin_max are specified.

Parameters
  • bin_step (float or None) – Size of a single bin, 1.0 by default if it is not determined unambiguously by the other arguments.

  • bin_min (float or None) – Minimum value to include in binning, expanding by default.

  • bin_max (float or None) – Maximum value to include in binning, expanding by default. Note that the coordinates will end at bin_max - bin_step.

  • bin_count (int or None) – Number of bins, expanding by default up to bin_limit.

  • bin_limit (int) – Maximum number of bins when auto-expanding, but ignored for some configurations.
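The fixed-coordinate modes can be sketched as follows. This assumes coordinates are laid out as bin_min + i * bin_step (consistent with the note that they end at bin_max - bin_step) and that a value belongs to the bin whose center lies within bin_step/2 of it. fixed_coordinates and bin_index are hypothetical helpers, not part of BinnedAxis; the auto-expanding modes and bin_limit are not modelled.

```python
import math


def fixed_coordinates(bin_step=None, bin_min=None, bin_max=None, bin_count=None):
    """Bin coordinates for the fixed modes (at least two of count/min/max)."""
    given = sum(arg is not None for arg in (bin_count, bin_min, bin_max))
    if given < 2:
        raise ValueError("auto-expanding modes are not covered by this sketch")
    if given == 3:
        # All three given: the step follows from the range and the count.
        bin_step = (bin_max - bin_min) / bin_count
    elif bin_step is None:
        bin_step = 1.0
    if bin_min is None:
        bin_min = bin_max - bin_count * bin_step
    if bin_count is None:
        bin_count = round((bin_max - bin_min) / bin_step)
    # The last coordinate is bin_min + (bin_count - 1) * bin_step.
    return [bin_min + i * bin_step for i in range(bin_count)]


def bin_index(value, bin_min, bin_step):
    """Index of the bin whose edges (center +/- bin_step/2) contain value."""
    return math.floor((value - bin_min) / bin_step + 0.5)
```

For example, bin_min=0.0 and bin_max=5.0 with the default step give the centers 0.0 to 4.0, so the last coordinate is bin_max - bin_step.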

class metropc.builtin.LocalAverageView(*args, **kwargs)

View implementation to get local average of view results.

A local average collects a set number of results from view invocations, suppressing their output, and returns their average when the buffer is full. It is then cleared and collection starts again. The buffer size therefore determines the output rate of this view implementation.

Accepts any scalar or ndarray value as input, either directly or multiple values as a generator. An array axis may also be treated as multiple values if reduce_axis is passed to its constructor.

There can only be one result per event at most, so if more inputs are generated than required for the next average, only the first complete average is returned and any further inputs discarded.
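The buffering rule, including the "one result per event" behaviour, can be modelled in a few lines. LocalAverageSketch is a hypothetical pure-Python class for illustration only; it handles scalar inputs and does not model reduce_axis or ndarray inputs.

```python
class LocalAverageSketch:
    """Hypothetical model of the local average buffering."""

    def __init__(self, N=10):
        self.N = N
        self.buffer = []

    def process_event(self, inputs):
        """Feed all inputs of one event; return an average or None."""
        for value in inputs:
            self.buffer.append(value)
            if len(self.buffer) == self.N:
                average = sum(self.buffer) / self.N
                self.buffer.clear()
                # At most one result per event: remaining inputs are discarded.
                return average
        return None
```

With N=3, an event yielding [1.0, 2.0] produces nothing, and a following event yielding [3.0, 4.0, 5.0] completes the buffer with 3.0, returns 2.0 and drops 4.0 and 5.0.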

__init__(*args, N=10, **kwargs)

Initialize a local average view.

Parameters
  • N (int, optional) – Number of view results to average, 10 by default.

  • reduce_axis (int, optional) – Array axis index to reduce when averaging, i.e. add multiple values to the average, disabled by default (the full input is averaged).

class metropc.builtin.GlobalAverageView(*args, **kwargs)

View implementation to get global average of view results.

A global average continuously collects the result of view invocations, suppressing their output, and periodically returns their average at that time. The intermediate result can only be cleared explicitly.

Accepts any scalar or ndarray value as input, either directly or multiple values as a generator. An array axis may also be treated as multiple values if reduce_axis is passed to its constructor.

__init__(*args, output_every=10, throttling=None, **kwargs)

Initialize a global average view.

Parameters
  • output_every (int, optional) – Output data every nth event with any input, 10 by default.

  • throttling (int, optional) – Deprecated name of output_every for compatibility with previous context code, None and ignored by default.

  • reduce_axis (int, optional) – Array axis index to reduce when averaging, i.e. add multiple values to the average, disabled by default (the full input is averaged).

class metropc.builtin.MovingAverageView(*args, **kwargs)

View implementation to get moving average of view results.

A moving average continuously collects the results of view invocations, suppressing their output, and periodically returns the average over a set of recent results. It is similar to a local average, but decouples averaging and output, so the averaging window can be longer than the output interval. If both windows are identical (i.e. N == output_every), it is identical to the local average.

Accepts any scalar or ndarray value as input.
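The decoupling of averaging window and output interval can be sketched with a bounded deque. MovingAverageSketch is hypothetical and only handles scalars; dividing by the current fill level is an assumption about what smooth_start=True amounts to before the buffer is full.

```python
from collections import deque


class MovingAverageSketch:
    """Hypothetical model: average the last N results, output every output_every events."""

    def __init__(self, N=100, output_every=10):
        self.window = deque(maxlen=N)  # oldest results drop out automatically
        self.output_every = output_every
        self.events = 0

    def process_event(self, value):
        self.window.append(value)
        self.events += 1
        if self.events % self.output_every:
            return None  # output suppressed for this event
        # Dividing by the current fill level keeps early results at full scale.
        return sum(self.window) / len(self.window)
```

With N=3 and output_every=2, the inputs 1.0, 2.0, 3.0, 4.0 produce an output on the second event (1.5) and on the fourth event (the average of 2.0, 3.0 and 4.0, i.e. 3.0).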

__init__(*args, N=100, output_every=10, throttling=None, smooth_start=True, **kwargs)

Initialize a moving average view.

Parameters
  • N (int, optional) – Number of view results to average, 100 by default.

  • output_every (int, optional) – Output data every nth event with any input, 10 by default.

  • throttling (int, optional) – Deprecated name of output_every for compatibility with previous context code, None and ignored by default.

  • smooth_start (bool, optional) – Whether the result is scaled to regular signal intensity before the first full buffer, enabled by default. May be disabled for very large buffers to avoid an allocation, copy and rescaling on each output before the buffer is filled.

class metropc.builtin.ExtremumView(*args, **kwargs)

View implementation only returning extreme results.

Each input is expected to consist of two values, an actual result and a comparative value, which is passed to the comparison operator. If the operator evaluates to True when comparing the current value and the previous extremum, the result is returned.

Accepts any number of 2-tuples either directly or as a generator.
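A minimal model of this behaviour, with operator.gt as the comparison so that only new maxima pass, might look like the following. ExtremumSketch is hypothetical, and the assumption that the very first input always counts as an extremum is not stated in the original description.

```python
import operator


class ExtremumSketch:
    """Hypothetical model of returning only results with a new extreme value."""

    def __init__(self, op):
        self.op = op
        self.extremum = None  # comparative value of the current extremum

    def process(self, result, value):
        # Assumption: the very first input always counts as an extremum.
        if self.extremum is None or self.op(value, self.extremum):
            self.extremum = value
            return result
        return None


tracker = ExtremumSketch(operator.gt)  # greater-than keeps the largest values
```

Because operator.gt is strict, a value equal to the current extremum is not reported again.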

__init__(*args, op, **kwargs)

Initialize an extreme value view.

Parameters

op (Callable) – Comparison operator taking two values and returning True if the current value is more extreme than the previous extreme (e.g. greater-than always yields the largest values).

class metropc.builtin.StepAverageView(*args, **kwargs)

View implementation to get the step average of view results.

A step average is computed over all results from view invocations that occurred during an operator step.

Accepts any scalar or ndarray value as input.

__init__(*args, **kwargs)

Initialize a step average view.

class metropc.builtin.StepStackedView(*args, **kwargs)

View implementation to stack results from each step.

This view is always executed on the step boundary, taking the current result from its arguments and stacking it along a new axis, prepended to any existing axes.

Accepts any ndarray value as input.

__init__(*args, dim_label='step', **kwargs)

Initialize a step stacked view.

Parameters

dim_label (str, optional) – Name of the xarray dimension the data per step is stacked on, ‘step’ by default.

Frontend API

class metropc.frontend.Context(source, path='<ctx>', stage=<ContextStage.FRONTEND: 0>, version=0, parameters=None, features=[], event_alias='event', event_id_alias=None, event_counter_alias=None)

Online analysis context.

The context defines the programmable stages of the online analysis pipeline in the form of views. These views allow a Python callable to be executed with the data obtained for any number of devices or other views and return a result.

The context is typically loaded from a file and may contain any valid Python code, e.g. imports to help implement its functionality. When the file is evaluated, several methods of the context object are available in its global scope, in addition to the object itself via the ‘ctx’ symbol.

__init__(source, path='<ctx>', stage=<ContextStage.FRONTEND: 0>, version=0, parameters=None, features=[], event_alias='event', event_id_alias=None, event_counter_alias=None)

Construct a new context.

Parameters
  • source (str) – Context source code to execute.

  • path (str, optional) – Path to a file containing ctx code, which allows for better feedback in case of exceptions.

  • stage (str, optional) – Stage at which this context object is created, may be ‘frontend’, ‘pool’ or ‘reduce’.

  • version (int, optional) – Version number of this context to secure hotplugging of the contexts into a running pipeline.

  • parameters (dict, optional) – Previous parameters mapping to use instead of default values defined in the context source.

  • features (iterable of str, optional) – Additional features supported by the frontend or pipeline stage as string identifiers.

  • event_alias (str, optional) – Alias for the term ‘event’ as it is used throughout the pipeline.

  • event_id_alias (str, optional) – Alias for the event ID used throughout the pipeline, {event_alias}_id by default.

  • event_counter_alias (str, optional) – Alias for the builtin executed events counter path, {event_alias}s by default.

Raises
  • ContextSyntaxError – If the context code contains a syntax error.

  • ViewDefError – If a view definition in the context code is malformed or contains invalid parameters.

  • ContextError – Any other error conditions due to context code.

add_parameters(*args, **kwargs) → None

Add runtime-configurable parameters.

Available in context code as “parameters”.

Parameters
  • Either a single dictionary or keyword arguments for each parameter.

add_path_symbols(*args, **kwargs) → None

Add path symbols to use with symbolic paths.

Available in context code as “symbols”.

Parameters
  • Either a single dictionary or keyword arguments for each parameter.

to_dict()

Serialize this context’s configuration.

This method is typically used to serialize the context when sending an update over ZMQ to re-initialize it. This will be part of a Frontend API later on.

Returns

(dict) Mapping of all initialization parameters to Context.__init__.

class metropc.frontend.StageGroup(stage_runners)

Group of stage runners.

A StageGroup is a dict-like structure for StageRunner instances. It allows key access by identity and forwards calls to all its members. In addition, it aids in spawning all stages for a pipeline in typical configurations with minimal configuration effort.

classmethod create_pipeline(zmq_control, zmq_reduce, zmq_output, sr_cls=<class 'metropc.stage.ProcessRunner'>, n_pool=1, profiling=False, **sr_kwargs)

Create pipeline with default configuration.

A reduce stage with identity ‘reduce’ and n_pool pool stages with enumerated identities starting at ‘pool0’ are created using the provided StageRunner implementation and ZMQ configuration. Note that only the instances are created, the resulting StageGroup still needs to be started.

Parameters
  • zmq_control (str) – ZMQ control address.

  • zmq_reduce (str) – ZMQ reduce address.

  • zmq_output (str) – ZMQ output address.

  • sr_cls (type, optional) – StageRunner implementation to use, ProcessRunner by default.

  • n_pool (int) – Number of pool stage instances to spawn, one by default.

  • profiling (bool, optional) – Whether profiling is enabled for the pipeline stages, disabled by default.

Returns

(StageGroup) Instance wrapping the created stage runners.

metropc.frontend.encode_protocol(identity, opcode, data)

Encode metropc message from frontend.

The frontend uses a ROUTER socket to communicate with the control network, which enables addressing a particular REQUEST socket in an out-of-order fashion. The identity may be any bytes string with a length smaller than 255 and should not begin with a binary zero.

Parameters
  • identity (bytes) – Arbitrary but unique identifier for addressed stage instance.

  • opcode (bytes) – Opcode/path of this message.

  • data (Mapping) – Data payload mapping strings to values.

Returns

(list of ByteString) Message frames.

metropc.frontend.decode_protocol(frames)

Decode metropc message to frontend.

Analogous to encode_protocol, this function decodes a message and additionally returns the identity of the REQUEST socket that sent it.

Parameters

frames (Iterable of ByteString) – Received message frames.

Returns

(bytes) Requesting stage instance identifier, (bytes) opcode/path of this message and (Mapping) contained data payload.

Return type

(tuple of bytes, bytes, Mapping)

class metropc.frontend.EventBuffer(input_names: Set[str], max_event_latency: int)

Event-matching buffer.

This data structure buffers input data as it is received and matches it by event. This abstract implementation assumes a property called buf compatible with a dictionary, but does not implement any actual buffering and/or matching behaviour. Several inheriting classes can be found below employing different matching strategies.

__init__(input_names: Set[str], max_event_latency: int) → None

Initialize a new event buffer.

Parameters
  • input_names (set) – Names for all active inputs.

  • max_event_latency (int) – Maximum number of events (in terms of their ID) an input may lag behind the current event.

__len__() → int

Get the number of actual events in the buffer.

add_input(input_event_id: int, input_name: str, input_data: Any, cur_event_id: int) → bool

Add input data to the buffer.

Parameters
  • input_event_id (int) – Event ID the data belongs to.

  • input_name (str) – Input source name.

  • input_data (Any) – Input data.

  • cur_event_id (int) – Current (“now”) event ID.

Returns

(bool) Whether this data was added to the buffer.

static by_strategy(strategy, *args, **kwargs)

clear() → None

Clear all events (and their data) from the buffer.

ready_events(cur_event_id: int) → Iterator[Tuple[int, Dict]]

Get all ready events.

The event matching strategy decides when an event is considered ready, but any given event is never returned twice, either in the same call or across calls. Typically, an event is ready when its data has been matched as well as possible given the matching strategy’s constraints.

Parameters

cur_event_id (int) – Current (“now”) event ID.

Returns

(Iterable of tuples) The event ID and the matched data in a dictionary keyed by input name.
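To make the buffer interface concrete, here is a sketch of one possible matching strategy: an event is ready once every input has arrived, or once it lags more than max_event_latency behind the current event. CompleteOrLateEventBuffer is hypothetical and not one of the strategies shipped with metropc. It only mirrors the method names and arguments documented above.

```python
class CompleteOrLateEventBuffer:
    """Hypothetical matching strategy, not one shipped with metropc."""

    def __init__(self, input_names, max_event_latency):
        self.input_names = set(input_names)
        self.max_event_latency = max_event_latency
        self.buf = {}  # event ID -> {input name: data}

    def __len__(self):
        return len(self.buf)

    def add_input(self, input_event_id, input_name, input_data, cur_event_id):
        # Reject unknown inputs and data lagging too far behind "now".
        if input_name not in self.input_names:
            return False
        if cur_event_id - input_event_id > self.max_event_latency:
            return False
        self.buf.setdefault(input_event_id, {})[input_name] = input_data
        return True

    def clear(self):
        self.buf.clear()

    def ready_events(self, cur_event_id):
        # Iterate over a sorted copy of the keys so entries can be removed.
        for event_id in sorted(self.buf):
            data = self.buf[event_id]
            complete = data.keys() == self.input_names
            expired = cur_event_id - event_id > self.max_event_latency
            if complete or expired:
                # Removing the entry means this call and later calls skip it;
                # a real strategy must also reject late inputs for it.
                del self.buf[event_id]
                yield event_id, data
```

Here the latency check in add_input doubles as protection against re-creating expired events, which keeps the "never returned twice" guarantee for events that were released because they expired.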

class metropc.stage.StageRunner(context_cls, identity, zmq_control, zmq_reduce, zmq_output, profiling=False)

Stage runtime.

Pipeline stages (except for the frontend) run within a StageRunner instance that provides their environment, e.g. another thread, an asyncio coroutine or its own process. Several StageRunner instances may exist for the same pipeline stage, except for reduce.

An implementation of StageRunner is only required to override the run() method and call the original StageRunner.run() when ready. When said method returns, the runner can shut down safely.

As it is common for implementations of this class to use multiple inheritance, it is recommended to run the parent methods explicitly and not via super().
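The recommended pattern of calling each parent explicitly can be sketched with a stand-in base class, mirroring how ThreadRunner combines threading.Thread with StageRunner. StageRunnerStub and MyThreadRunner are hypothetical and do not use metropc's real StageRunner, whose constructor takes the ZMQ addresses listed below.

```python
import threading


class StageRunnerStub:
    """Hypothetical stand-in for metropc.stage.StageRunner."""

    def __init__(self, identity):
        self.identity = identity
        self.stage_ran = False

    def run(self):
        # The real StageRunner.run() would execute the stage here.
        self.stage_ran = True


class MyThreadRunner(threading.Thread, StageRunnerStub):
    def __init__(self, identity):
        # Call each parent explicitly instead of relying on super().
        threading.Thread.__init__(self, name=identity)
        StageRunnerStub.__init__(self, identity)

    def run(self):
        # Environment-specific setup would go here, then hand over.
        StageRunnerStub.run(self)


runner = MyThreadRunner("pool0")
runner.start()  # Thread.start() invokes MyThreadRunner.run() in a new thread
runner.join()
```

Explicit calls avoid surprises from the cooperative super() chain, since threading.Thread.__init__ does not forward unknown arguments to the next class in the MRO.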

__init__(context_cls, identity, zmq_control, zmq_reduce, zmq_output, profiling=False)

Initialize stage runner instance.

Parameters
  • context_cls (type) – Context type to create context objects for this stage from.

  • identity (str) – Arbitrary but unique identifier for this stage instance. In addition to the rules defined by ZMQ, it may only contain ASCII characters.

  • zmq_control (str) – ZMQ address for control connection.

  • zmq_reduce (str) – ZMQ address for reduce connection.

  • zmq_output (str) – ZMQ address for output connection.

  • profiling (bool, optional) – Whether profiling is enabled. It is up to the inheriting type to define the semantics of profiling and it may not be supported at all.

join(timeout=None)

Join this stage runner.

join_or_kill(timeout=None)

Join this stage runner or kill after timeout.

In addition to join(), this method attempts to kill the stage runner for good if the join timeout expires. Not every implementation may be able to implement this behavior.

start()

Start this stage runner.

class metropc.stage.ThreadRunner(context_cls, identity, profiling=False, **kwargs)

Bases: threading.Thread, metropc.stage.StageRunner

Thread-based stage runner.

This StageRunner implementation runs the stage in a new thread.

class metropc.stage.ProcessRunner(context_cls, identity, profiling=False, **kwargs)

Bases: multiprocessing.context.Process, metropc.stage.StageRunner

Process-based stage runner.

This StageRunner implementation runs the stage in a new process created via the multiprocessing module.

class metropc.stage.PoolContext(*args, **kwargs)

Context for pool stage.

The pool stage is the first pipeline stage outside the frontend and the default for a view. It should perform the majority of work per event and may consist of an arbitrary number of individual workers, with the control network distributing events between them. As such, there is no global state between events, as an event’s views may be executed in any worker. After an event has been processed, the results are forwarded to the reduce stage over the reduce connection and the frontend is notified.

In addition to the generic opcodes handled by StageContext, PoolContext only adds the ‘event’ opcode.

class metropc.stage.ReduceContext(*args, **kwargs)

Context for reduce stage.

The reduce stage sits at the end of the pipeline to reduce the results from the pool stage, possibly across events, and send them over the output connection to any clients. In addition, it generates the pipeline index listing the data flowing through the pipeline. As such, there can be only one instance of this stage in a pipeline, and any view requiring a shared state must run in it. If possible, any heavy computation should be done in the pool stage beforehand and sent downstream, so that only the final integration step is performed here.

ReduceContext implements the opcodes ‘index’, ‘action’, ‘step_begin’ and ‘step_end’ in addition to generic ones by StageContext.

Client API

metropc.client.decode_protocol()

Identical to metropc.protocol.decode_protocol(), but automatically decodes b'index' opcodes into metropc.client.IndexEntry objects.

metropc.protocol.decode_protocol(frames)

Decode metropc message.

This function should not be used directly outside of metropc, e.g. in a frontend or client implementation; such code should instead use the symbols defined in the corresponding API module.

Parameters

frames (Iterable or Iterator of ByteString) – Received message frames.

Returns

(bytes) Opcode/path of this message and (Mapping) contained data payload.

Return type

(tuple of bytes, Mapping)

class metropc.client.IndexEntry(counts, rate)

Generic index entry.

Each item in the pipeline index is encoded with this type in the protocol and then exposed on the client side. Each field is stored in slots to avoid allocating a per-instance dictionary.

class metropc.client.IndexViewEntry(counts, rate, output, stage)

Index entry for views.

Extends IndexEntry by slots for output type and stage.

ZeroMQ Protocol

metropc protocol.

All communication between the frontend, pipeline stages and clients uses ZeroMQ. This allows these components to be distributed transparently, scaling from threads (via inproc transport) over processes (ipc) to nodes (tcp).

A metropc message consists of an opcode or path followed by the data payload in the form of a dict-like mapping of strings to arbitrary values. If possible, an optimized encoding/decoding fastpath for the data values is provided. This fastpath currently supports only the first level of this dictionary (i.e. not nested dictionaries) and values of these types, either directly or in tuples of less than 256 elements: bytes, bool, int up to 64 bit, float, ndarray, DataArray, None. If the fastpath cannot be applied, the value is pickled.

A message always begins with two fixed message frames:

  • Frame 0: UTF8-encoded opcode/path

  • Frame 1: Type IDs of data values

For messages between the frontend and pipeline stages, the first frame is an opcode denoting an operation to perform. Between the output and clients, it is the data path the message content refers to. The type IDs describe the types of the values in the data mapping for fastpath serialization, in the same order as they are encoded in the message. Each data key entry begins with the number of values for this key, representing tuples with up to 255 elements, followed by a single character for each encoded type. For example, the type characters for a data value like (42, b’foo’) are b’ib’, with b’i’ for an integer and b’b’ for a bytes string. The type IDs for all keys are directly concatenated to form a single long string.

The header is followed by the message content with one frame per UTF-8 encoded key and a type-dependent number x of frames for its value:

  • Frame N: UTF8-encoded key

  • Frame N+1 … N+x: Encoded value
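The frame layout described above can be sketched for the two types whose IDs are named (int and bytes). This is a hypothetical encoder, not metropc's protocol module: storing the value count as a single byte, treating a lone value as a 1-tuple and packing integers as little-endian int64 are all assumptions, and the pickle fallback is not modelled.

```python
import struct

# Only the two type characters named in the text; other types are omitted.
TYPE_CHARS = {int: b"i", bytes: b"b"}


def encode_sketch(opcode: str, data: dict) -> list:
    """Hypothetical frame layout following the description above."""
    type_ids = bytearray()
    body = []
    for key, value in data.items():
        values = value if isinstance(value, tuple) else (value,)
        if len(values) > 255:
            raise ValueError("tuples are limited to 255 elements")
        type_ids.append(len(values))  # assumption: count stored as one byte
        body.append(key.encode("utf-8"))  # Frame N: UTF-8 encoded key
        for item in values:
            if type(item) not in TYPE_CHARS:
                raise TypeError("not covered by this sketch (would be pickled)")
            type_ids += TYPE_CHARS[type(item)]
            # Assumption: ints as little-endian int64, bytes passed through.
            body.append(struct.pack("<q", item) if type(item) is int else item)
    # Frame 0: opcode/path, Frame 1: concatenated type IDs, then the body.
    return [opcode.encode("utf-8"), bytes(type_ids)] + body
```

Encoding {"x": (42, b"foo")} under the opcode "event" yields five frames: b"event", the type IDs b"\x02ib", the key b"x" and one frame for each of the two values.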

This module defines the basic encoding/decoding scheme and should not be used directly by any code outside of metropc, but rather via the imports in the corresponding API modules frontend and client. These add additional functionality required for these components, e.g. identity control in the frontend.

metropc.protocol.decode_protocol(frames)

Decode metropc message.

This function should not be used directly outside of metropc, e.g. in a frontend or client implementation; such code should instead use the symbols defined in the corresponding API module.

Parameters

frames (Iterable or Iterator of ByteString) – Received message frames.

Returns

(bytes) Opcode/path of this message and (Mapping) contained data payload.

Return type

(tuple of bytes, Mapping)

metropc.protocol.encode_protocol(opcode, data={}, parts=None)

Encode metropc message.

This function should not be used directly outside of metropc, e.g. in a frontend or client implementation; such code should instead use the symbols defined in the corresponding API module.

Parameters
  • opcode (bytes) – Opcode/path of this message.

  • data (Mapping) – Data payload mapping strings to values.

  • parts (list, optional) – List to fill encoded frames into, a new list is created if None (default).

Returns

(list of ByteString) Message frames.