Low-level tutorial

In this first tutorial we will present a simple example. Let's imagine that we want to produce a security listing from one of our systems. This listing will end up in a CSV file. The source may be some kind of SQL server; for the moment we will consider that the source is an iterator.

First of all we will define a source of objects by creating a dummy class that we call User:

class User(object):
    def __init__(self, name, email, level):
        self.name = name
        self.email = email
        self.level = level

Once we have this dummy user class we create our source of dummy users:

source = [
    User("John Doe", "john.doe@some-where.com", "high"),
    User("Jane Doe", "jane.doe@some-where.com", "high"),
    User("Joe Dalton", "joe.dalton@some-where.com", "high"),
    User("Jack Dalton", "jack.dalton@some-where.com", "med"),
    User("Averell Dalton", "averell.dalton@some-where.com", "low"),
    ]

For the purpose of this example we want to split the user population into two categories only. Each category of users will then be manipulated to prepare the items in the flow before merging again and giving the result back. In this example we do not yet write to a CSV file; we just print the flow to stdout:

[Figure: low-level scenario — a splitter feeding two filter branches that are merged back together]

As you can see in this schema, we use a splitter and a filter on each branch of the resulting graph. This is because what we call a splitter is only that: a splitter. The splitter node represented in this schema ensures that each branch of the graph receives all the objects of the original source.

Some of you may already wonder: what happens if the original object is modified differently in two different branches? The answer is simple: you must not modify the original object. If you need to modify an object, you return a wrapped object instead, using the tools provided by our library (more on that later).
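The fan-out semantics can be illustrated with plain Python generators. This sketch uses itertools.tee, not pyf's splitm, and only exists to show why every branch receives the very same objects, and thus why mutating them in one branch is dangerous:

```python
import itertools

class Item(object):
    def __init__(self, name):
        self.name = name

source = iter([Item("a"), Item("b")])

# tee gives each branch its own iterator over the same underlying objects
branch1, branch2 = itertools.tee(source, 2)

first1 = next(branch1)
first2 = next(branch2)
# both branches received the very same object, not a copy...
assert first1 is first2
# ...so a mutation made in one branch is visible in the other
first1.name = "mutated"
assert first2.name == "mutated"
```

This is exactly why the wrapped-object approach above matters as soon as branches need diverging versions of an item.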

Splitting

That said, let's show some code. The first thing we need to do is set up a splitter to cut the flow:

from pyf.dataflow import runner
from pyf.dataflow.components import splitm
# splitm takes a size kwarg which defines the size of
# the output array
splitter = runner(splitm, dict(size=2))
# let's pretend that the source is fed to us in an
# iterator (memory consumption)
splitter.connect_in('source', iter(source))

As you can see in this example, setting up a splitter for your data flow is as simple as importing the splitm base component from pyf.dataflow.components.

In the low levels of our API every component must be manually wrapped in a pyf.dataflow.runner. The runner takes care of wrapping a component, adding the functionality expected of a node in a dataflow network, and provides the functions needed to manage input and output ports.

The second important operation is to connect our data source to the input port of our splitter (wrapped in a runner). Every runner object exposes connect_in and connect_out methods that take two arguments: the first is always the name of the port as defined on the component, and the second is the source or target (more on that later).

If the component is something you import from elsewhere (as opposed to one you wrote yourself), you will have to read its documentation to learn its port names. In this example the splitm component has a 'source' input port, to which we connect our data source.

You may note that we wrapped the simple list of users in an iter() call to show that we work with a real data flow, not only with in-memory lists.
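The runner-and-ports idea itself is simple. The toy class below is not pyf's runner implementation, just a minimal stand-in that mimics the same wiring API (connect_in to attach a named input, calling the node with a port name to read its output):

```python
class MiniRunner(object):
    """A toy stand-in for pyf.dataflow.runner: wires named input
    ports to sources and exposes outputs by calling the runner."""
    def __init__(self, func, kwargs=None):
        self.func = func
        self.kwargs = kwargs or {}
        self.inputs = {}

    def connect_in(self, port, source):
        self.inputs[port] = source

    def __call__(self, port):
        # drive the wrapped function, keeping only values for `port`
        for out_port, value in self.func(self.inputs, **self.kwargs):
            if out_port == port:
                yield value

def doubler(inputs):
    # reads the 'source' input port and emits on the 'out' port
    for value in inputs["source"]:
        yield ("out", value * 2)

node = MiniRunner(doubler)
node.connect_in("source", iter([1, 2, 3]))
doubled = list(node("out"))
assert doubled == [2, 4, 6]
```

The real runner does considerably more (port declarations, arrays of ports, placeholder handling), but the connect-then-iterate shape is the same one used throughout this tutorial.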

Filtering

Now that we have separated the flow in two, we will set up some filters. In this example we create our own filters using the primitives provided by pyf. First we define two very simple functions that return True when we want to keep the item and False otherwise:

def high_only(item):
    return item.level == "high"

def low_or_med(item):
    return item.level in ("low", "med")

Then we just make use of these functions in the main() function we began with this example:

from pyf.dataflow.components import filter_values
# we need two filters for the 2 output ports of the splitter
# comp is the callable that does the comparison to know what should
# be passed to the next step and what should be discarded
# the default 'filter_values' component can yield a special object
# called Ellipsis to make sure that if you filter out a value a placeholder
# is inserted in its place in the flow.
# This is important if you want to keep the original ordering of the flow
high_filter = runner(filter_values, dict(comp=high_only, add_ellipsis=True))
low_filter = runner(filter_values, dict(comp=low_or_med, add_ellipsis=True))
# we now connect the filters to the splitter out ports
high_filter.connect_in('values', splitter('out', 0))
low_filter.connect_in('values', splitter('out', 1))

In this code chunk we create two new runners that wrap another core component named filter_values. The filter_values component, provided by pyf.dataflow.components, does nothing by itself: it needs a comp keyword argument pointing to a callable. This callable is fed each item of the flow, one by one, and must answer True or False for each of them.

When the filtering function returns True, the runner takes the item from the input port and forwards it to the output port as is. When the filtering function returns False, the runner emits nothing. For flow-regulation reasons you may want to ensure that, even when your filter function returns False, the downstream dataflow still sees an item. This is what we do in our example, because we want the resulting flow to keep the ordering of the source.

To keep the number of items on the output port equal to the number on the input port of a filter (wrapped by a runner), just add the add_ellipsis=True keyword argument. Each time your filtering function returns False, the runner will then emit an Ellipsis object (also called a continuation object).

Other components of the network (even at this low level, it is already a network we are building) will receive those Ellipsis objects and know how to treat them. The other components give you access to the same kind of arguments to handle Ellipsis objects.
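The placeholder mechanism is easy to picture with a plain generator. This is not pyf's filter_values implementation, only an illustration of what add_ellipsis=True buys you: the filtered stream keeps the same length and ordering as the input:

```python
def filter_with_placeholders(items, keep):
    """Yield each item that `keep` accepts; emit Ellipsis otherwise,
    so downstream nodes see exactly one value per input item."""
    for item in items:
        if keep(item):
            yield item
        else:
            yield Ellipsis

data = [1, 2, 3, 4]
evens = list(filter_with_placeholders(data, lambda n: n % 2 == 0))
# the stream keeps its length and ordering
assert evens == [Ellipsis, 2, Ellipsis, 4]
```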

Manipulating

We now have two data flows that "know" which elements they need to manage, because our filters do the selection work. We will now enrich the items with a new attribute, not present on the originals, before outputting the result. This shows how to manipulate the objects (note that, contrary to the earlier advice, we modify the original objects here; this is safe in this particular example only because the filters guarantee that each item reaches exactly one setter):

from pyf.dataflow import runner, component
@component('IN', 'OUT')
def set_numeric_level(values, out, value=None):
    for item in values:
        item.numeric_level = value
        # yield explicitly on out port...
        # a simple "yield item" would work in this case
        yield (out, item)

This is the first time we write our own component instead of importing a core one from pyf. As you can see, this is extremely simple: import the pyf.dataflow.component decorator and decorate your function with it. The component decorator lets you define the input and output ports and their types. The possible values are "IN", "OUT", "INA", "OUTA":

  • IN: means a simple input generator for your underlying function
  • OUT: means a simple output generator for your underlying function
  • INA: means an input array of generators for your underlying function
  • OUTA: means an output array of generators for your underlying function

Note: if you use an output array ("OUTA"), the size must be yielded first on a specific virtual port to configure the output array. Values destined for the output array are then yielded on port(index) instead of just the port itself.

@component('IN', 'OUTA')
def splitm(source, out, size=3):
    """ Splits a data source in n sources (size kwarg)"""
    yield (out.size(), size)
    for row in source:
        for i in range(size):
            yield (out(i), row)

This example is the internal implementation of the pyf.dataflow.components.splitm component from zflow (the underlying flow-programming library we use). As you can see, it takes one generator as its input port and outputs each object on an array of size generators (3 by default).

Note that we do not keep objects inside our component; we yield each object as soon as we have performed the desired action on it.
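This streaming discipline is what lets the network handle sources of arbitrary size with constant memory. A quick plain-Python illustration (not pyf code): because the component below yields immediately, it can even consume an unbounded source:

```python
import itertools

def tag(items):
    # yields each item as soon as it arrives, keeping memory constant;
    # buffering items in a list here would defeat the whole flow
    for item in items:
        yield ("out", item)

# itertools.count() never terminates, yet the streaming node copes fine
tagged = tag(itertools.count())
first = next(tagged)
second = next(tagged)
assert first == ("out", 0)
assert second == ("out", 1)
```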

For our example we will just set a new attribute on every object, reflecting the way we want to see it in our final reporting. The value we set is a fixed value per branch of our graph. To do this we add two more nodes that use the component we just wrote. These new nodes are connected to the outputs of the two filters and yield modified items carrying numeric representations of the level.

# now add the numeric level on the two branches
high_setter = runner(set_numeric_level, dict(value=10))
low_setter = runner(set_numeric_level, dict(value=0))
# connect the setters on the relevant branches
high_setter.connect_in('values', high_filter('out'))
low_setter.connect_in('values', low_filter('out'))

Merging

Once the two branches have been properly manipulated, it is time to merge them back and finally output something real to the console to see if it works :)

from pyf.dataflow.components import linear_merging
from pyf.dataflow import BYPASS_VALS
# create a merger
merger = runner(linear_merging)
# connect the sources in the input of our merger
merger.connect_in('sources', high_setter('out'))
merger.connect_in('sources', low_setter('out'))
# now we can iterate on the out port of the merger
# to get the resulting items
# since the filters and also the merger are using Ellipsis objects
# as placeholders we need to ignore those special items.
for item in merger('out'):
    if item not in BYPASS_VALS:
        print "User: '%s', email: '%s', level: '%02d'" % (
                item.name,
                item.email,
                item.numeric_level)

The output should look like this:

User: 'John Doe', email: 'john.doe@some-where.com', level: '10'
User: 'Jane Doe', email: 'jane.doe@some-where.com', level: '10'
User: 'Joe Dalton', email: 'joe.dalton@some-where.com', level: '10'
User: 'Jack Dalton', email: 'jack.dalton@some-where.com', level: '00'
User: 'Averell Dalton', email: 'averell.dalton@some-where.com', level: '00'

Here we use a linear merger, which is provided as a standard merger in pyf. See the pyf documentation for other available mergers. Keep in mind that you can write your own if you don't find a ready-to-use component.

BYPASS_VALS is a list of values that pyf considers ignorable, since internal components use them as placeholder values. Ellipsis objects are part of this list.
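The placeholders are also what makes an order-preserving merge possible in the first place. The sketch below is not pyf's linear_merging implementation, only one plausible illustration of the idea: because both branches emit exactly one value (real or Ellipsis) per source item, a simple round-robin over the two streams restores the original ordering:

```python
def merge_preserving_order(*streams):
    """Round-robin over equally long placeholder-padded streams,
    dropping the Ellipsis placeholders along the way."""
    for values in zip(*streams):
        for value in values:
            if value is not Ellipsis:
                yield value

# one slot per source item in each branch, placeholders where filtered out
high = ["John", Ellipsis, "Jane"]
low = [Ellipsis, "Jack", Ellipsis]
merged = list(merge_preserving_order(high, low))
# the original source ordering is recovered
assert merged == ["John", "Jack", "Jane"]
```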

Full Program

Below you can admire the full code of the program we just created:

from pyf.dataflow import runner, component
from pyf.dataflow.components import splitm, filter_values
from pyf.dataflow import BYPASS_VALS
from pyf.dataflow.components import linear_merging
class User(object):
    def __init__(self, name, email, level):
        self.name = name
        self.email = email
        self.level = level
source = [
        User("John Doe", "john.doe@some-where.com", "high"),
        User("Jane Doe", "jane.doe@some-where.com", "high"),
        User("Joe Dalton", "joe.dalton@some-where.com", "high"),
        User("Jack Dalton", "jack.dalton@some-where.com", "med"),
        User("Averell Dalton", "averell.dalton@some-where.com", "low"),
        ]
def high_only(item):
    return item.level == "high"
def low_or_med(item):
    return item.level in ("low", "med")
@component('IN', 'OUT')
def set_numeric_level(values, out, value=None):
    for item in values:
        item.numeric_level = value
        # yield explicitly on out port...
        # a simple "yield item" would work in this case
        yield (out, item)
def main():
    # splitm takes a size kwarg which defines the size of
    # the output array
    splitter = runner(splitm, dict(size=2))
    # let's pretend that the source is fed to us in an
    # iterator (memory consumption)
    splitter.connect_in('source', iter(source))
    # we need two filters for the 2 output ports of the splitter
    # comp is the callable that does the comparison to know what should
    # be passed to the next step and what should be discarded
    # the default 'filter_values' component can yield a special object
    # called Ellipsis to make sure that if you filter out a value a placeholder
    # is inserted in its place in the flow.
    # This is important if you want to keep the original ordering of the flow
    high_filter = runner(filter_values, dict(comp=high_only, add_ellipsis=True))
    low_filter = runner(filter_values, dict(comp=low_or_med, add_ellipsis=True))
    # we now connect the filters to the splitter out ports
    high_filter.connect_in('values', splitter('out', 0))
    low_filter.connect_in('values', splitter('out', 1))
    # now add the numeric level on the two branches
    high_setter = runner(set_numeric_level, dict(value=10))
    low_setter = runner(set_numeric_level, dict(value=0))
    # connect the setters on the relevant branches
    high_setter.connect_in('values', high_filter('out'))
    low_setter.connect_in('values', low_filter('out'))
    # create a merger
    merger = runner(linear_merging)
    # connect the sources in the input of our merger
    merger.connect_in('sources', high_setter('out'))
    merger.connect_in('sources', low_setter('out'))
    # now we can iterate on the out port of the merger
    # to get the resulting items
    # since the filters and also the merger are using Ellipsis objects
    # as placeholders we need to ignore those special items.
    for item in merger('out'):
        if item not in BYPASS_VALS:
            print "User: '%s', email: '%s', level: '%02d'" % (
                    item.name,
                    item.email,
                    item.numeric_level)
if __name__ == '__main__':
    main()

And its output should be the same as above:

User: 'John Doe', email: 'john.doe@some-where.com', level: '10'
User: 'Jane Doe', email: 'jane.doe@some-where.com', level: '10'
User: 'Joe Dalton', email: 'joe.dalton@some-where.com', level: '10'
User: 'Jack Dalton', email: 'jack.dalton@some-where.com', level: '00'
User: 'Averell Dalton', email: 'averell.dalton@some-where.com', level: '00'