Built-In Data Joiners | PyF, flow-based python programming

Built-In Data Joiners

Joiner plugin “linear”

class pyf.componentized.builtins.joiners.LinearJoiner(config_node)

This joiner takes items from all incoming ports in turn, cycling through them (for example: one item from the first port, one item from the second, one item from the first again, and so on).

Optional attribute: “type” (default: “longest”). Available types:

  • longest: takes values from all sources, filling with an Ellipsis when a source is finished, and stops when there are no active sources anymore.

    <joiner pluginname="linear" type="longest" />
    
  • simple: takes values from all sources, and stops when any of the sources is finished.

    <joiner pluginname="linear" type="simple" />
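The two types can be illustrated with a minimal Python sketch. This is not PyF's implementation: `linear_join` and its parameters are hypothetical names, and the sketch assumes that the Ellipsis filler only marks a finished source internally and is not passed downstream, which the description above does not specify.

```python
from itertools import zip_longest

def linear_join(sources, join_type="longest"):
    """Yield items from several iterables in turn, one per source per round."""
    iterators = [iter(source) for source in sources]
    if join_type == "longest":
        # Finished sources are padded with Ellipsis until every source is done.
        rounds = zip_longest(*iterators, fillvalue=Ellipsis)
    elif join_type == "simple":
        # zip() stops as soon as any source is finished.
        rounds = zip(*iterators)
    else:
        raise ValueError("unknown joiner type: %r" % (join_type,))
    for round_items in rounds:
        for item in round_items:
            # Assumption of this sketch: the filler is dropped, not emitted.
            if item is not Ellipsis:
                yield item
```

With the sources `[1, 2, 3]` and `["a"]`, the "longest" type gives 1, "a", 2, 3, while the "simple" type stops after 1, "a".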
    
Configuration available :
  • advanced(label: Advanced): Compound key (each sub key is an individual tag)
    • separate_process(label: Separate Process): boolean
  • name(label: Name): Simple key/value (text-based)

    unique name

Joiner plugin “sequence”

class pyf.componentized.builtins.joiners.SequencialJoiner(config_node)

This joiner takes items from all the sources, one source after another: it consumes a source entirely, then moves on to the next one.

Warning

If you use this joiner on sources that come from a single producer, all of the data may end up being loaded into memory.

You may optionally set the order of the sources (useful if you want to chain two separate branches of a network):

<joiner pluginname="sequence">
    <source name="my_first_upper_node" priority="0" />
    <source name="my_second_upper_node" priority="1" />
</joiner>

Note

Lower integer means higher priority.
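As a rough illustration of the ordering rule, here is a minimal Python sketch that chains named sources by priority. `sequence_join`, `sources` and `priorities` are hypothetical names, not part of PyF, and the handling of sources without a priority (consumed last) is an assumption of the sketch.

```python
from itertools import chain

def sequence_join(sources, priorities=None):
    """Chain named sources one after another, lowest priority number first.

    `sources` maps a source name to an iterable; `priorities` maps a source
    name to an integer.
    """
    priorities = priorities or {}
    # Assumption: sources without an explicit priority are consumed last.
    # sorted() is stable, so ties keep the order in which sources were given.
    ordered_names = sorted(
        sources, key=lambda name: priorities.get(name, float("inf"))
    )
    # Each source is exhausted completely before the next one is started.
    return chain.from_iterable(sources[name] for name in ordered_names)

branches = {
    "my_second_upper_node": iter(["c", "d"]),
    "my_first_upper_node": iter(["a", "b"]),
}
order = {"my_first_upper_node": 0, "my_second_upper_node": 1}
joined = list(sequence_join(branches, order))  # ['a', 'b', 'c', 'd']
```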

Configuration available :
  • advanced(label: Advanced): Compound key (each sub key is an individual tag)
    • separate_process(label: Separate Process): boolean
  • name(label: Name): Simple key/value (text-based)

    unique name

Joiner plugin “zip”

class pyf.componentized.builtins.joiners.ZipJoiner(config_node)

This joiner yields dictionaries with source names as keys and items as values. For each set of values taken from the upstream sources (one per source), you get one dictionary.

Optional attribute: “type” (default: “longest”). Available types:

  • longest: takes values from all sources, filling with None when a source is finished, and stops when there are no active sources anymore.

    <joiner pluginname="zip" type="longest" />
    
  • simple: takes values from all sources, and stops when any of the sources is finished.

    <joiner pluginname="zip" type="simple" />
    

Example :

If source_1 yields ‘A’, ‘B’ and ‘C’, and source_2 yields ‘aa’ and ‘bb’,

the joiner will yield:
  1. {‘source_1’: ‘A’, ‘source_2’: ‘aa’}
  2. {‘source_1’: ‘B’, ‘source_2’: ‘bb’}
  3. {‘source_1’: ‘C’, ‘source_2’: None} (only if type is “longest”, the default)
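The same behaviour can be sketched in plain Python with the standard `zip` and `itertools.zip_longest` functions. This is an illustration only, not PyF's code; `zip_join` is a hypothetical name.

```python
from itertools import zip_longest

def zip_join(sources, join_type="longest"):
    """Yield one dict per round, mapping each source name to its item.

    `sources` maps a source name to an iterable.
    """
    names = list(sources)
    iterators = [iter(sources[name]) for name in names]
    if join_type == "longest":
        # Finished sources are filled with None until all sources are done.
        rows = zip_longest(*iterators, fillvalue=None)
    elif join_type == "simple":
        # Stops as soon as any source is finished.
        rows = zip(*iterators)
    else:
        raise ValueError("unknown joiner type: %r" % (join_type,))
    for row in rows:
        yield dict(zip(names, row))

example_sources = {"source_1": ["A", "B", "C"], "source_2": ["aa", "bb"]}
longest_result = list(zip_join(example_sources))
simple_result = list(zip_join(example_sources, "simple"))
```

Here `simple_result` holds the first two dictionaries only, while `longest_result` also holds a third one in which ‘source_2’ is None.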
Configuration available :
  • advanced(label: Advanced): Compound key (each sub key is an individual tag)
    • separate_process(label: Separate Process): boolean
  • name(label: Name): Simple key/value (text-based)

    unique name

Joiner plugin “orderedkey”

class pyf.componentized.builtins.joiners.OrderedKeyJoiner(config_node)

A joiner that synchronises the input sources and yields groups similar to the ones in the “zip” joiner.

Note

To synchronise data sources, it uses pyf.dataflow.merging.merge_iterators. Each source has to be sorted by its comparison key. For example, if you set the AccountCode attribute as the key on a source, you have to do an order_by AccountCode in the extractor for this source.

To synchronise items, you have to set comparison keys. The merger reads the items from the input sources and compares their keys: if two items in sources A and B have the same key, they are yielded together; if only one source has an item with this key, it is yielded alone (and the values for the other sources are set to None).

If you don’t set keys, the objects themselves will be compared. If you set only one key, it will be used for all sources.
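The synchronisation described above can be sketched as a small merge over sorted inputs. This is not the code of pyf.dataflow.merging.merge_iterators, whose signature is not shown here; `ordered_key_join` and its parameters are hypothetical, and the sketch assumes every source is sorted in ascending key order.

```python
_DONE = object()  # marks a source that has no items left

def ordered_key_join(sources, keys=None):
    """Group items from sorted sources by key, yielding one dict per key step.

    `sources` maps a source name to an iterable sorted by its key;
    `keys` maps a source name to a key function (default: the item itself).
    """
    keys = keys or {}
    key_of = {name: keys.get(name, lambda item: item) for name in sources}
    iterators = {name: iter(source) for name, source in sources.items()}
    heads = {name: next(it, _DONE) for name, it in iterators.items()}
    while True:
        active = [name for name in heads if heads[name] is not _DONE]
        if not active:
            return
        # The smallest pending key decides which sources contribute now.
        smallest = min(key_of[name](heads[name]) for name in active)
        group = {}
        for name in iterators:
            head = heads[name]
            if head is not _DONE and key_of[name](head) == smallest:
                group[name] = head
                heads[name] = next(iterators[name], _DONE)
            else:
                group[name] = None  # no item with this key in this source
        yield group

groups = list(ordered_key_join({"a": [1, 3, 4], "b": [1, 2, 4]}))
```

In this run, keys 1 and 4 appear in both sources and are yielded together, while keys 2 and 3 are yielded alone with None for the other source.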

Example with same key for all sources :

<joiner pluginname="orderedkey">
    <key type="attribute">AccountCode</key>
</joiner>

Example with individual keys for each source :

<joiner pluginname="orderedkey">
    <!-- Default type is "attribute", other supported are "item" (value[item])
        or "code" (eval, with "item" as available variable). -->
    <key source="my_first_upper_node">AccountCode</key>
    <key source="my_second_upper_node" type="code">item.Customer.CustomerCode</key>
    <key source="my_other_source" type="item">1</key> <!-- imagine this is a tuple, it will be second item of it :) -->
</joiner>
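The three key types mentioned in the comment above could map to Python key functions roughly as follows. This is a sketch under assumptions, not PyF's implementation: `make_key_function` is a hypothetical helper, and converting a numeric "item" key such as 1 to an integer index is a guess about how the XML text is interpreted.

```python
from operator import attrgetter
from types import SimpleNamespace

def make_key_function(expression, key_type="attribute"):
    """Build a key function from the text of a <key> tag."""
    if key_type == "attribute":
        # AccountCode -> value.AccountCode
        return attrgetter(expression)
    if key_type == "item":
        # Assumption: text that looks like an integer is used as an index.
        index = int(expression) if expression.isdigit() else expression
        return lambda value: value[index]
    if key_type == "code":
        # The expression is evaluated with "item" as the available variable.
        compiled = compile(expression, "<key>", "eval")
        return lambda item: eval(compiled, {}, {"item": item})
    raise ValueError("unknown key type: %r" % (key_type,))

# Hypothetical sample object, only used to exercise the three key types.
account = SimpleNamespace(
    AccountCode="A1", Customer=SimpleNamespace(CustomerCode="C9")
)
by_attribute = make_key_function("AccountCode")(account)
by_code = make_key_function("item.Customer.CustomerCode", "code")(account)
by_item = make_key_function("1", "item")(("first", "second"))
```

Note that the "code" type relies on `eval`, so key expressions should only come from trusted configuration.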
Configuration available :
  • advanced(label: Advanced): Compound key (each sub key is an individual tag)
    • separate_process(label: Separate Process): boolean
  • name(label: Name): Simple key/value (text-based)

    unique name

Joiner plugin “orderedkeymerge”

class pyf.componentized.builtins.joiners.OrderedKeyMerger(config_node)

Deprecated joiner, similar to using “orderedkey” with output="packet".

Configuration available :
  • advanced(label: Advanced): Compound key (each sub key is an individual tag)
    • separate_process(label: Separate Process): boolean
  • name(label: Name): Simple key/value (text-based)

    unique name