pyf.station

PyF.Station is a protocol, with a client and a server, for transferring Python generators across TCP networks. Items in the generator must be pyf.transport.Packet instances.

Best practice is to describe the flow in the first packet, which acts as a header (containing, for example, authentication data, method, target, and so on).
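
As a minimal sketch of the header convention, the helper below takes the first item off a flow and hands back the remaining items untouched. It is not part of pyf.station: split_header is a hypothetical name, and plain dicts stand in for pyf.transport.Packet instances so that the snippet runs on its own.

```python
def split_header(flow):
    """Return (header, remaining_items) for a flow whose first item is a header."""
    flow = iter(flow)
    header = next(flow)  # raises StopIteration if the flow is empty
    return header, flow


# Plain dicts stand in for Packet instances (assumption for illustration).
sample_flow = iter([
    {'action': 'my_action', 'authtkt': 'token'},  # header
    {'num': 1},
    {'num': 2},
])
header, items = split_header(sample_flow)
remaining = list(items)
```

Because the rest of the flow stays a lazy iterator, the header can be checked (for example, for authentication) before any further item is consumed.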

Errors are propagated to both ends of the connection.

Examples of use

Server

Note that the server requires tgscheduler (to spawn tasks, passing them generators) and Twisted.

An example:

from twisted.internet import reactor
from pyf.station import FlowServer

def sample_handler(flow, client=None):
    # the first packet is the header
    header = flow.next()
    print header
    for i, item in enumerate(flow):
        # every 50 items...
        if not i % 50:
            print i, item
    print "end of flow..."

factory = FlowServer(sample_handler)
reactor.listenTCP(8000, factory)
reactor.run()

Another example, for an environment that is already threaded (such as a WSGI server):

from tgscheduler import scheduler
from twisted.internet import reactor
from pyf.station import FlowServer
from pyf.transport import Packet

def sample_handler(flow, client=None):
    # the first packet is the header
    header = flow.next()
    print header
    for i, item in enumerate(flow):
        # every 50 items...
        if not i % 50:
            print i, item
            # we send a message to the client
            client.message(Packet({'type': 'info',
                                   'message': 'hello ! (%s)' % i}))
    print "end of flow..."

factory = FlowServer(sample_handler)
reactor.listenTCP(8000, factory)
# run the reactor as a scheduled task, without installing signal handlers
scheduler.add_single_task(reactor.run,
                          kw=dict(installSignalHandlers=0),
                          initialdelay=0)

Client

Example of client:

from pyf.station.client import StationClient
from pyf.transport import Packet

# the port must match the one the server listens on
client = StationClient('127.0.0.1', 6789, True)

def message_handler(message_packet):
    # the handler for messages that come back from the server
    print message_packet

# we register our callback
client.add_listener('message_received', message_handler)

# we generate sample packets
flow = (Packet(dict(Field1=i+1,
                    Field2=('titi', 'tata')[i%2], num=i+1,
                    Field3=(i+1)*10))
        for i in range(10000))

values = client.call(
     flow,
     header=dict(authtkt='my false auth token :)',
                 action='my_action'))

# here each value is either True (meaning that the message got through)
# or a packet coming back from the server.
for i, value in enumerate(values):
    if not i % 5000:
        print i
    if isinstance(value, Packet):
        print value
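
The result loop above mixes acknowledgements and server messages in one stream. As a rough sketch, assuming acknowledgements are the literal True as the comment in the example describes, the two can be separated with a small helper. split_results is a hypothetical name, not a pyf.station function, and a plain dict stands in for a Packet so that the snippet runs on its own.

```python
def split_results(values):
    """Count True acknowledgements and collect everything else as messages."""
    acknowledged = 0
    messages = []
    for value in values:
        if value is True:
            acknowledged += 1
        else:
            messages.append(value)
    return acknowledged, messages


# A plain dict stands in for a Packet coming back from the server (assumption).
sample_values = [True, True, {'type': 'info', 'message': 'hello ! (0)'}, True]
acknowledged, messages = split_results(sample_values)
```

The helper consumes the whole iterable before returning, so it suits finite flows; for long-running flows, handling each value inside the loop as in the example above is probably the better fit.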

Module documentation

pyf.station.client

class pyf.station.client.StationClient(host, port=6789, waits_for_success=False, separator='\r\n\x00')

The station client. To send a flow to a server running on localhost, while receiving messages:

>>> client = StationClient('127.0.0.1', 6789, waits_for_success=True)
>>> values = client.call(my_flow)
>>> for value in values:
...     print value # prints every message until the server has finished processing

To just send values without waiting for results:

>>> client = StationClient('127.0.0.1', 6789, waits_for_success=False)
>>> values = client.call(my_flow)
>>> for value in values:
...     print value # will print every message until all the data has been sent, but not afterward.

add_listener(name, callback, *args, **kwargs)

Adds an event listener on the instance.

Parameters:
  • name (unicode or str) – event name to listen for
  • callback (callable) – the callable to fire when the event is emitted

Additional args and kwargs are passed to the callback when the event is fired.

If you want to stop the callback chain, your callback should return False. All other return values are discarded.

emit_event(name, *args, **kwargs)

Emit a named event. This will fire all the callbacks registered for the named event.

Parameters:
  • name (unicode or str) – name of the event to emit

Additional args and kwargs are passed to the callbacks (before the ones that were passed to add_listener).

remove_listener(name, func)

Removes a callback from the callback list for the given event name.

Parameters:
  • name (unicode or str) – name of the event the callback was registered for
  • func (method) – the function of the callback to unregister
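
The listener semantics described above (extra arguments, emit-time arguments placed first, and a False return value stopping the chain) can be illustrated with a small stand-alone class. This is only a sketch of the documented behaviour, not the pyf.station implementation: the EventEmitter class, the registration-order dispatch, and the way keyword arguments are merged are all assumptions.

```python
class EventEmitter(object):
    """Illustrative stand-in; not the pyf.station implementation."""

    def __init__(self):
        self._listeners = {}

    def add_listener(self, name, callback, *args, **kwargs):
        self._listeners.setdefault(name, []).append((callback, args, kwargs))

    def remove_listener(self, name, func):
        self._listeners[name] = [
            entry for entry in self._listeners.get(name, [])
            if entry[0] != func
        ]

    def emit_event(self, name, *args, **kwargs):
        for callback, extra_args, extra_kwargs in list(self._listeners.get(name, [])):
            merged_kwargs = dict(extra_kwargs)
            merged_kwargs.update(kwargs)  # assumption: emit-time kwargs win
            # Emit-time args come first, then the ones given to add_listener.
            if callback(*(args + extra_args), **merged_kwargs) is False:
                break  # a callback returning False stops the chain


calls = []

def first(message, tag):
    calls.append(('first', message, tag))
    return False  # stop the chain here

def second(message):
    calls.append(('second', message))

emitter = EventEmitter()
emitter.add_listener('message_received', first, 'extra')
emitter.add_listener('message_received', second)
emitter.emit_event('message_received', 'hello')   # only `first` runs
emitter.remove_listener('message_received', first)
emitter.emit_event('message_received', 'again')   # now `second` runs
```

In the first emit, second is never called because first returns False; once first is removed, the next emit reaches second.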

pyf.station.main

class pyf.station.main.FlowServer(data_handler, max_items=1000, max_clients=100)

The flow server factory.

Example use:

>>> factory = FlowServer(my_handler) # handler is a function receiving a flow
                                     # and a "client" keyword argument
                                     # client being a FlowProtocol instance.
>>> reactor.listenTCP(8000,factory)
>>> reactor.run()
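
Since a handler is just a function that receives a flow and a client keyword argument, it can be exercised without any network by passing it an ordinary iterator and a stub client. The sketch below makes several assumptions: StubClient and counting_handler are hypothetical names, the stub only mimics the client.message(...) call used in the server example, plain dicts stand in for Packet instances, and the return value exists purely so the sketch can be checked (nothing here says FlowServer uses it).

```python
class StubClient(object):
    """Records messages instead of sending them over the network."""

    def __init__(self):
        self.sent = []

    def message(self, packet):
        self.sent.append(packet)


def counting_handler(flow, client=None):
    """Read the header, then report progress every 50 items."""
    header = next(flow)
    count = 0
    for i, item in enumerate(flow):
        count += 1
        if client is not None and not i % 50:
            client.message({'type': 'info', 'message': 'seen %s' % i})
    return header, count


stub = StubClient()
# One header followed by 120 items; dicts stand in for Packet instances.
fake_flow = iter([{'action': 'my_action'}] + [{'num': n} for n in range(120)])
header, count = counting_handler(fake_flow, client=stub)
```

With 120 items, the stub receives progress messages for items 0, 50 and 100.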