Network tutorial

In this second tutorial we present the same simple example as in the first tutorial. The real difference is that we now use the networking code of PyF instead of wiring the low-level components by hand.

Setup

First of all we define a source of objects by creating a dummy class that we call User:

class User(object):
    def __init__(self, name, email, level):
        self.name = name
        self.email = email
        self.level = level

Once we have this dummy user class we create our source of dummy users. In this tutorial we use a small generator function. It yields only six users here, but widening the range turns it into an arbitrarily large source of data, so you can verify for yourself that PyF handles huge data volumes easily.

def get_source():
    for index in range(0, 6):
        yield User(
                "John%04d" % index,
                "john%04d@some-where.com" % index,
                ['high', 'low', 'high', 'high', 'low'][index%5])
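
The claim about large volumes rests on the source being a generator: items are created only when something asks for them. The sketch below illustrates this in plain Python, without PyF. The get_big_source function and its count parameter are hypothetical additions for illustration and are not part of the tutorial code.

```python
from itertools import islice


class User(object):
    def __init__(self, name, email, level):
        self.name = name
        self.email = email
        self.level = level


def get_big_source(count):
    """Hypothetical variant of get_source() with a configurable size."""
    levels = ['high', 'low', 'high', 'high', 'low']
    # On Python 2, use xrange here so that the range itself stays lazy.
    for index in range(count):
        yield User(
            "John%04d" % index,
            "john%04d@some-where.com" % index,
            levels[index % 5])


# Nothing is built up front: only the three users we ask for are created.
first_three = list(islice(get_big_source(10 ** 9), 3))
names = [user.name for user in first_three]
print(names)
```

Because the generator is lazy, asking for a billion users costs nothing until they are actually consumed.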

In this tutorial we define our components without explanation. If you need details about the following code, please refer to the first tutorial.

from pyf.dataflow import component


def high_only(item):
    if item.level == "high":
        return True


def low_or_med(item):
    if item.level == "low" or item.level == "med":
        return True


@component('IN', 'OUT')
def set_numeric_level(values, out, value=None):
    for item in values:
        item.numeric_level = value
        # yield explicitly on out port...
        # a simple "yield item" would work in this case
        yield (out, item)

Now we also create a component out of the small user printer that was defined in the first tutorial.

from pyf.dataflow import BYPASS_VALS
@component('IN', 'OUT')
def userprinter(values, out):
    for item in values:
        if item not in BYPASS_VALS:
            print "User: '%s', email: '%s', level: '%02d'" % (
                    item.name,
                    item.email,
                    item.numeric_level)
            yield True

No surprise here: we just added a @component decorator to a function that prints our “users”. Here is what we will create in the next step.

[Figure: network usage scenario]

Networking

And now for the big change! We will link up all our components using the networking layer of PyF.

from pyf.manager.network import Network
from pyf.dataflow.components import generator_to_component
from pyf.dataflow.components import filter_values
network = Network(nodes=dict(
    source1=dict(component=generator_to_component(get_source()), after=['filter1', 'filter2']),
    filter1=dict(component=filter_values, args=dict(comp=high_only)),
    filter2=dict(component=filter_values, args=dict(comp=low_or_med)),
    adapter1=dict(component=set_numeric_level, args=dict(value=10), before=['filter1']),
    adapter2=dict(component=set_numeric_level, args=dict(value=0), before=['filter2']),
    printer=dict(component=userprinter, before=['adapter1', 'adapter2']),
    ))
network.initialize()

The Network code makes everything easier. If you are coming from the first tutorial, note that this time we bind all our components with a single (if long) statement.

The Network class takes a dictionary that maps a name to each node of your network graph. Each node is itself a dictionary that holds a component, its arguments and its relations to the other nodes in the graph. A relation is declared with either the before or the after keyword, each of which takes a list of names of nodes in the same network. In the example above, source1 lists filter1 and filter2 under after because they come after it in the flow, while adapter1 lists filter1 under before because filter1 feeds it.
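
To make the before and after relations concrete, here is a minimal sketch in plain Python that turns such a node dictionary into a list of directed links. The edges_from_nodes helper is hypothetical and is not part of PyF. It assumes, as the example network suggests, that after names downstream nodes and before names upstream nodes.

```python
def edges_from_nodes(nodes):
    """Return sorted (upstream, downstream) pairs.

    Hypothetical helper for illustration only, not part of PyF.
    """
    edges = set()
    for name, spec in nodes.items():
        # Assumption: "after" lists the nodes that follow this one.
        for downstream in spec.get('after', []):
            edges.add((name, downstream))
        # Assumption: "before" lists the nodes that feed this one.
        for upstream in spec.get('before', []):
            edges.add((upstream, name))
    return sorted(edges)


# Same shape as the tutorial network, with the components left out.
nodes = dict(
    source1=dict(after=['filter1', 'filter2']),
    filter1=dict(),
    filter2=dict(),
    adapter1=dict(before=['filter1']),
    adapter2=dict(before=['filter2']),
    printer=dict(before=['adapter1', 'adapter2']),
)

edges = edges_from_nodes(nodes)
for upstream, downstream in edges:
    print("%s -> %s" % (upstream, downstream))
```

Reading the printed links from source1 down to printer gives the same graph as the one declared in the Network call.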

The next big difference from the first tutorial is that we do not explicitly define and link the splitters and mergers. The network splits and merges automatically at “compilation time” by introspecting the graph and discovering how many components are linked together.
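
For readers who want a feel for what a split followed by a merge does to a stream, the following plain Python sketch uses itertools.tee to split and a simple alternating merge to join. It is only an illustration under these assumptions and does not show how PyF implements its splitters and mergers. The round_robin function is a hypothetical helper.

```python
from itertools import tee


def round_robin(*iterables):
    """Alternate between inputs, dropping each one once it is exhausted."""
    iterators = [iter(it) for it in iterables]
    while iterators:
        # Iterate over a copy so that exhausted inputs can be removed.
        for iterator in list(iterators):
            try:
                yield next(iterator)
            except StopIteration:
                iterators.remove(iterator)


levels = ['high', 'low', 'high', 'high', 'low', 'high']
source = enumerate(levels)

# Split: each branch gets its own view of the same stream.
branch_a, branch_b = tee(source, 2)
highs = (index for index, level in branch_a if level == 'high')
lows = (index for index, level in branch_b if level == 'low')

# Merge: take one item from each branch in turn.
merged = list(round_robin(highs, lows))
print(merged)  # [0, 1, 2, 4, 3, 5]
```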

Once the network is defined we just “compile” it by calling the .initialize() method without arguments.

Running

Running Basics

To make the network run we need to consume its output. The simplest way is to iterate over the status handler:

# status resolves as True or False in Boolean
# and reflects if there are errors in the network
for status in network.get_status_handler():
    if not status:
        raise ValueError('Error status received')
User: 'John0000', email: 'john0000@some-where.com', level: '10'
User: 'John0001', email: 'john0001@some-where.com', level: '00'
User: 'John0002', email: 'john0002@some-where.com', level: '10'
User: 'John0004', email: 'john0004@some-where.com', level: '00'
User: 'John0003', email: 'john0003@some-where.com', level: '10'
User: 'John0005', email: 'john0005@some-where.com', level: '10'

The status handler expects your components to yield Boolean statuses. If your output components yield real values, do not use the status handler; iterate over the iterators of the end nodes of your graph yourself.
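
The reason the network must be consumed is that generators do no work until something iterates over them. The sketch below shows this with a stand-in end node in plain Python. The printer function and the log list are hypothetical and only mimic a component that yields Boolean statuses.

```python
log = []


def printer(items):
    """Stand-in end node: records each item, then yields a Boolean status."""
    for item in items:
        log.append(item)
        yield True


statuses = printer(['John0000', 'John0001'])
ran_before = list(log)  # still empty: creating the generator runs nothing

# Consuming the statuses is what actually drives the processing.
for status in statuses:
    if not status:
        raise ValueError('Error status received')

ran_after = list(log)
print("%r %r" % (ran_before, ran_after))
```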

Notice that the items were shuffled during the process. This is because the default joiner is a linear_merger (as seen in Filtering), which alternates items from each of its inputs. To keep the original order, the filters can insert Ellipsis elements in place of the filtered-out items:

network2 = Network(nodes=dict(
    source1=dict(component=generator_to_component(get_source()), after=['filter1', 'filter2']),
    filter1=dict(component=filter_values, args=dict(comp=high_only, add_ellipsis=True)),
    filter2=dict(component=filter_values, args=dict(comp=low_or_med, add_ellipsis=True)),
    adapter1=dict(component=set_numeric_level, args=dict(value=10), before=['filter1']),
    adapter2=dict(component=set_numeric_level, args=dict(value=0), before=['filter2']),
    printer=dict(component=userprinter, before=['adapter1', 'adapter2']),
    ))
network2.initialize()
for status2 in network2.get_status_handler():
    if not status2:
        raise ValueError('Error status received')
User: 'John0000', email: 'john0000@some-where.com', level: '10'
User: 'John0001', email: 'john0001@some-where.com', level: '00'
User: 'John0002', email: 'john0002@some-where.com', level: '10'
User: 'John0003', email: 'john0003@some-where.com', level: '10'
User: 'John0004', email: 'john0004@some-where.com', level: '00'
User: 'John0005', email: 'john0005@some-where.com', level: '10'
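
One way to see why placeholders restore the order: when every branch emits exactly one element per input item, a merger that reads the branches in turn stays aligned with the source and only has to skip the placeholders. The sketch below shows this idea in plain Python. Both helper functions are hypothetical and are not PyF's implementation of filter_values or of its merger.

```python
def filter_with_placeholder(items, keep):
    """Yield kept items, and Ellipsis in place of rejected ones."""
    for item in items:
        yield item if keep(item) else Ellipsis


def merge_in_lockstep(*branches):
    """Read one slot from every branch at a time and emit the real items."""
    for slot in zip(*branches):
        for item in slot:
            if item is not Ellipsis:
                yield item


levels = ['high', 'low', 'high', 'high', 'low', 'high']
indexed = list(enumerate(levels))

highs = filter_with_placeholder(indexed, lambda pair: pair[1] == 'high')
lows = filter_with_placeholder(indexed, lambda pair: pair[1] == 'low')

ordered = [index for index, level in merge_in_lockstep(highs, lows)]
print(ordered)  # [0, 1, 2, 3, 4, 5]
```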

Running Internals

If you need more control or cannot use the status handler method, you also have access to a list of output generators on network.consumers. Iterate over those generators, instead of the status handler, when you want the real output values.

# python 2.6 example
#from itertools import izip_longest
# pyf chooses the right izip_longest for you depending on your python version
from pyf.dataflow.components import izip_longest

# the source generator was consumed by the previous run, so reset it
network.nodes['source1']['component'] = generator_to_component(get_source())
network.initialize()
consumers = network.consumers
for result_tuple in izip_longest(*consumers):
    if result_tuple[0] not in BYPASS_VALS:
        print result_tuple
User: 'John0000', email: 'john0000@some-where.com', level: '10'
(True,)
User: 'John0001', email: 'john0001@some-where.com', level: '00'
(True,)
User: 'John0002', email: 'john0002@some-where.com', level: '10'
(True,)
User: 'John0004', email: 'john0004@some-where.com', level: '00'
(True,)
User: 'John0003', email: 'john0003@some-where.com', level: '10'
(True,)
User: 'John0005', email: 'john0005@some-where.com', level: '10'
(True,)

Since the network was already consumed in the previous example, we reset the source inside the network data structure and initialize the network again before iterating.
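
Both points, iterating over several output generators together and the need to reset a consumed source, can be tried out in plain Python. In the sketch below, make_consumer is a hypothetical stand-in for an output generator and is not part of PyF. The import fallback is there so the example runs on both Python 2 and Python 3.

```python
try:
    from itertools import zip_longest  # Python 3
except ImportError:
    from itertools import izip_longest as zip_longest  # Python 2


def make_consumer(values):
    """Stand-in for one output generator; hypothetical, not PyF's API."""
    for value in values:
        yield value


consumers = [make_consumer([1, 2, 3]), make_consumer(['a'])]

# Shorter generators are padded with None until the longest one ends.
rows = list(zip_longest(*consumers))
print(rows)

# The generators are now exhausted: a second pass yields nothing
# until fresh generators are created.
second_pass = list(zip_longest(*consumers))
print(second_pass)
```

The empty second pass is the plain Python counterpart of a consumed network, which is why the source is replaced before calling initialize() again.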