Tubes

class pyf.services.model.Tube(**kwargs)

A tube is an item containing a pyf.componentized configuration used to process a flow.

The flow can come either from a pyf.componentized extractor or from a source together with a descriptor used to read it.

Set needs_source to flag a tube that needs a source (a file-like object or a flow). Set needs_descriptor to flag a tube whose input must be read through a descriptor.

active

Boolean column that flags the tube as active.

apply_includes(config_tree, variant=None)

Applies the includes to a config_tree. Includes can be layered. If an include does not specify its own variant, the variant passed to this method is used as the default (typically the name of the current run's variant).

Parameters:
  • variant (None, string) : the variant name to check for (optional).

Examples

Example of a simple, non-layered include:

<include name="foo" />

Example of a layered include (applies the layers to the included tube):

<include name="foo" layered="True" />

Example of a layered include with a variant name (applies the variant “bar” to the included tube):

<include name="foo" layered="True" variant="bar" />

Note

If the included tube contains a producer, the producer is removed and all of its children are inserted where the include takes place.
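
The producer-splicing rule above can be sketched with the standard library's xml.etree.ElementTree. This is a minimal, single-pass illustration under stated assumptions, not PyF's implementation: the TUBES registry, the expand_includes helper and the <producer>/<node> tag names are hypothetical, and layered includes and variants are ignored.

```python
import xml.etree.ElementTree as ET

# Hypothetical registry of tube configs keyed by tube name (not a PyF API).
TUBES = {
    "foo": "<process><producer><node id='a'/><node id='b'/></producer></process>",
}


def expand_includes(config_tree, tubes):
    """Replace each <include name="..."/> with the included tube's content.

    Assumption: if the included tube has a top-level <producer>, the producer
    is dropped and its children are spliced in where the include was.
    Single pass only: includes inside included content are not expanded.
    """
    # Snapshot the elements first because the tree is mutated while walking it.
    for parent in list(config_tree.iter()):
        # Walk children backwards so splicing does not shift pending indices.
        for index, child in reversed(list(enumerate(parent))):
            if child.tag != "include":
                continue
            included = ET.fromstring(tubes[child.get("name")])
            producer = included.find("producer")
            # Use the producer's children if present, else the tube's top-level children.
            replacement = list(producer) if producer is not None else list(included)
            parent.remove(child)
            for offset, node in enumerate(replacement):
                parent.insert(index + offset, node)
    return config_tree


root = ET.fromstring(
    "<process><producer><include name='foo'/><node id='c'/></producer></process>"
)
expand_includes(root, TUBES)
print([node.get("id") for node in root.find("producer")])  # ['a', 'b', 'c']
```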

config_tree

Returns the raw ElementTree (ET) config tree, without layers or includes applied.

description

Description of the tube (max. 250 characters)

display_name

Display name of the tube (max. 50 characters)

flow(source=None, variant=None, process_name=None, progression_callback=None, message_callback=None, descriptor=None, params=None, options=None)

Launches the tube.

Parameters:
  • source (None, generator, file) : the source of the flow, only available on descriptorsource-like producers. Should be a file-like object if a descriptor is set, or a generator otherwise.
  • variant (None, string) : the variant name to check for (optional).
  • process_name (None, string) : the name of the process to launch (optional).
  • progression_callback (None, function) : the callback to run on progression (optional).
  • message_callback (None, function) : the callback to run on messages (optional).
  • descriptor (None, pyjon.descriptor) : the Pyjon Descriptor used to read the file given as source.
  • params (None, dict) : the params to pass to the producer.
  • options (None, dict) : the options that override the global config.
Returns:
List of finalization info for the flow...

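
The source rule for flow (a file-like object when a descriptor is set, a generator otherwise) can be checked before launching a tube. The check_flow_source helper below is a hypothetical pre-flight sketch that mirrors the documented contract; it is not part of PyF, and treating "file-like" as "has a read method" is an assumption.

```python
import inspect
import io


def check_flow_source(source, descriptor=None):
    """Hypothetical check mirroring the documented source contract of Tube.flow."""
    if source is None:
        return  # source is optional
    if descriptor is not None:
        # The descriptor reads the source, so it must be file-like (assumed: has .read()).
        if not hasattr(source, "read"):
            raise TypeError("source must be a file-like object when a descriptor is set")
    elif not inspect.isgenerator(source):
        raise TypeError("source must be a generator when no descriptor is set")


# Passing cases; object() stands in for a real Pyjon Descriptor.
check_flow_source(io.StringIO("col_a;col_b\n"), descriptor=object())
check_flow_source(row for row in [{"col_a": 1}])

# Usage sketch (`tube` and `my_descriptor` are placeholders):
# with open("input.csv") as handle:
#     check_flow_source(handle, my_descriptor)
#     results = tube.flow(source=handle, descriptor=my_descriptor)
```
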
get_config_tree()

Returns the raw ElementTree (ET) config tree, without layers or includes applied.

get_layered_config(variant=None)

Returns the layered ET config tree. Also applies the includes before and after applying the layers.

Parameters:
  • variant (None, string) : the variant name to check for (optional).

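
The order of operations described for get_layered_config (includes, then layers, then includes again) can be outlined with injected callables. This sketch only captures that ordering: layered_config, trace_includes and trace_layer are hypothetical names, and it assumes the layers arrive already sorted, for example as returned by get_ordered_layers.

```python
def layered_config(raw_tree, ordered_layers, apply_includes, apply_layer):
    """Outline of the documented order: includes, then layers, then includes again."""
    tree = apply_includes(raw_tree)        # 1. expand includes on the raw tree
    for layer in ordered_layers:           # 2. apply layers in the given (priority) order
        tree = apply_layer(tree, layer)
    return apply_includes(tree)            # 3. expand includes once more


# Tracing stand-ins make the call order visible.
calls = []


def trace_includes(tree):
    calls.append("includes")
    return tree


def trace_layer(tree, layer):
    calls.append("layer:" + layer)
    return tree


layered_config("<raw/>", ["base", "override"], trace_includes, trace_layer)
print(calls)  # ['includes', 'layer:base', 'layer:override', 'includes']
```
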
get_manager(variant=None)

Returns a Componentised PyF manager for the layered config.

Parameters:
  • variant (None, string) : the variant name to check for (optional).

get_ordered_layers(variant=None, only_active=True)

Returns the layer objects ordered by priority. If no variant is specified, all layers are returned; with only_active=True (the default), inactive layers are left out.
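
The filtering and ordering can be sketched over plain objects. The Layer dataclass and its fields are hypothetical stand-ins for PyF's layer objects, and two behaviours are assumptions: a given variant keeps only layers whose variant matches, and lower priority values come first.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Layer:
    # Hypothetical stand-in for a PyF layer; field names are assumptions.
    name: str
    priority: int
    variant: Optional[str] = None
    active: bool = True


def ordered_layers(layers: List[Layer], variant: Optional[str] = None,
                   only_active: bool = True) -> List[Layer]:
    """Filter by activity and variant, then sort by priority (ascending, assumed)."""
    selected = [
        layer for layer in layers
        if (not only_active or layer.active)
        and (variant is None or layer.variant == variant)
    ]
    return sorted(selected, key=lambda layer: layer.priority)


layers = [
    Layer("b", 20, "prod"),
    Layer("a", 10, "prod"),
    Layer("c", 5, "test"),
    Layer("d", 1, "prod", active=False),
]
print([layer.name for layer in ordered_layers(layers, variant="prod")])  # ['a', 'b']
print([layer.name for layer in ordered_layers(layers)])                  # ['c', 'a', 'b']
```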

get_parameters()

Returns the parameters for the Tube. Reads the <params> element of the process and returns a dictionary mapping each parameter key to its associated values.

Example:

<params>
    <param>
        <key>target_date</key>
        <type>datetime</type>
        <name>Target Date</name>
        <description>End date for the extraction</description>
        <default>10/10/1998</default>
    </param>
    <param>
        <key>target_type</key>
        <type>select</type>
        <name>Target Type</name>
        <default>oo</default>
        <options>
            <option label="toto">oo</option>
            <option label="titi">ii</option>
        </options>
    </param>
</params>

This translates into:

{'target_date': {'key': 'target_date',
                 'type': 'datetime',
                 'name': 'Target Date',
                 'description': 'End date for the extraction',
                 'default': '10/10/1998',
                 'order': 0},
 'target_type': {'key': 'target_type',
                 'type': 'select',
                 'name': 'Target Type',
                 'default': 'oo',
                 'options': [{'option_label': 'toto',
                              'option': 'oo'},
                             {'option_label': 'titi',
                              'option': 'ii'}],
                 'order': 1}
}

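
The <params> to dictionary mapping can be reproduced with xml.etree.ElementTree. The parse_params function below is a hypothetical sketch, not PyF's code: it copies every child element of a <param> as a string field, turns <options> into option_label/option pairs and adds the position as order. Because it copies fields generically, its target_type entry also carries 'default': 'oo'.

```python
import xml.etree.ElementTree as ET

PARAMS_XML = """<params>
    <param>
        <key>target_date</key>
        <type>datetime</type>
        <name>Target Date</name>
        <description>End date for the extraction</description>
        <default>10/10/1998</default>
    </param>
    <param>
        <key>target_type</key>
        <type>select</type>
        <name>Target Type</name>
        <default>oo</default>
        <options>
            <option label="toto">oo</option>
            <option label="titi">ii</option>
        </options>
    </param>
</params>"""


def parse_params(params_element):
    """Map a <params> element to {key: {field: value, ..., 'order': n}}."""
    result = {}
    if params_element is None:
        return result
    for order, param in enumerate(params_element.findall("param")):
        entry = {}
        for child in param:
            if child.tag == "options":
                # Each <option label="x">value</option> becomes a small dict.
                entry["options"] = [
                    {"option_label": option.get("label"), "option": option.text}
                    for option in child.findall("option")
                ]
            else:
                # Every other child element is copied as a plain string field.
                entry[child.tag] = (child.text or "").strip()
        # Position of the <param> in the document, starting at 0.
        entry["order"] = order
        result[entry["key"]] = entry
    return result


params = parse_params(ET.fromstring(PARAMS_XML))
print(sorted(params))  # ['target_date', 'target_type']
```
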
get_post_process_config(variant=None)

Returns the post-process config if there is one, or None otherwise. A post process is defined in a <postprocess> tag placed just after the <process name="..."> tag.

Parameters:
  • variant (None, string) : the variant name to check for (optional).

has_postprocess

Boolean: whether the tube has a post process.
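
Locating the post process and deriving has_postprocess can be sketched with ElementTree. The helpers find_post_process and has_post_process are hypothetical, the <config> root and <node> tags are assumptions about the config layout, and reading "just after the <process> tag" as "a direct child of the first <process>" is an interpretation.

```python
import xml.etree.ElementTree as ET

# Hypothetical tube config; the <config> root and <node> tags are assumptions.
CONFIG = ET.fromstring("""
<config>
  <process name="main">
    <postprocess>
      <node id="notify"/>
    </postprocess>
    <node id="extract"/>
  </process>
</config>
""".strip())


def find_post_process(config_root):
    """Return the <postprocess> element of the first <process>, or None."""
    process = config_root.find("process")
    if process is None:
        return None
    return process.find("postprocess")


def has_post_process(config_root):
    # Mirrors the has_postprocess flag: True when a <postprocess> element exists.
    return find_post_process(config_root) is not None


print(has_post_process(CONFIG))  # True
```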

id

Numerical unique identifier of the tube (primary key)

last_run_date

Public-facing descriptor, placed in the mapped class dictionary.

launch_post_process(source, variant=None, progression_callback=None, message_callback=None)

Launches the tube post process.

Parameters:
  • source (None, generator, file) : the source of the flow; should be a generator of EventTrack objects.
  • variant (None, string) : the variant name to check for (optional).
  • progression_callback (None, function) : the callback to run on progression (optional).
  • message_callback (None, function) : the callback to run on messages (optional).
Returns:
List of finalization info for the flow...

name

Unique identifier of the tube, used for simple retrieval

needs_source

Boolean column that flags the tube as needing a source (either to be included in another tube, or to be launched with a dispatch and a descriptor).

payload

Public-facing descriptor, placed in the mapped class dictionary.

process_name

Returns the name of the tube's first process.

process_names

Returns the process names for the tube.

Warning

Multiple processes aren’t supported yet.
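
The relationship between process_names and process_name can be sketched over an ElementTree config. The helpers below are hypothetical and assume <process name="..."> elements are direct children of a <config> root; given the warning above, in practice the list is expected to hold a single name.

```python
import xml.etree.ElementTree as ET


def process_names(config_root):
    """Names of the <process> children of the config root, in document order."""
    return [process.get("name") for process in config_root.findall("process")]


def first_process_name(config_root):
    # Mirrors process_name: the first process name, or None when there is none.
    names = process_names(config_root)
    return names[0] if names else None


root = ET.fromstring('<config><process name="main"/></config>')
print(process_names(root), first_process_name(root))  # ['main'] main
```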