Version 2 (modified by Frédéric, 11 years ago) ( diff )

Draft

Tutorial

Architecture

Vocabulary

TODO

Functional Blocks

This is the central feature of pKNyX: it allows the user to create virtual devices which mimic real KNX devices. The Device itself is implemented as a process.

Here is a very simple example which implements a timer. This timer can monitor the state of a light, and switch it off automatically after a delay. The code is taken from the examples/3_timer directory of pKNyX sources.

from pknyx.api import Logger
from pknyx.api import FunctionalBlock, Stack, ETS
from pknyx.api import Scheduler, Notifier

NAME = "timer"
IND_ADDR = "1.2.3"

# ETS group address map
GAD_MAP = {"1": dict(name="lights", desc="Lights"),
           "1/1": dict(name="cmds", desc="Commands"),
           "1/1/1": dict(name="light_test_cmd", desc="Light 'test' (cmd)"),
           "1/2": dict(name="states", desc="States"),
           "1/2/1": dict(name="light_test_state", desc="Light 'test' (state)"),
           "1/3": dict(name="delay", desc="Delays"),
           "1/3/1": dict(name="light_test_delay", desc="Light 'test' (delay)"),
          }


logger = Logger("%s-%s" % (NAME, IND_ADDR))

stack = Stack(individualAddress=IND_ADDR)
ets = ETS(stack=stack, gadMap=GAD_MAP)

schedule = Scheduler()
notify = Notifier()


class TimerFB(FunctionalBlock):
    """ Timer functional block
    """

    # Datapoints definition
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    GO_02 = dict(dp="state", flags="CWUI", priority="low")
    GO_03 = dict(dp="delay", flags="CWU", priority="low")

    DESC = "Timer"

    def _init(self):
        """ Additional init of the timer
        """
        self._timer = 0

    @schedule.every(seconds=1)
    def updateTimer(self):
        """ Method called every second.
        """
        #logger.trace("TimerFB.updateTimer()")

        if self._timer:
            self._timer -= 1
            if not self._timer:
                logger.info("%s: timer expired; switch off" % self._name)
                self.dp["cmd"].value = "Off"

    @notify.datapoint(dp="state", condition="change")
    def stateChanged(self, event):
        """ Method called when the 'state' datapoint changes
        """
        logger.debug("TimerFB.stateChanged(): event=%s" % repr(event))

        if event['newValue'] == "On":
            delay = self.dp["delay"].value
            Logger().info("%s: start timer for %ds" % (self._name, delay))
            self._timer = delay
        elif event['newValue'] == "Off":
            if self._timer:
                Logger().info("%s: switched off detected; cancel timer" % self._name)
                self._timer = 0

    @notify.datapoint(dp="delay", condition="change")
    def delayChanged(self, event):
        """ Method called when the 'delay' datapoint changes
        """
        logger.debug("TimerFB.delayChanged(): event=%s" % repr(event))

        # If the timer is running, we reset it to the new delay
        if self._timer:
            delay = self.dp["delay"].value
            Logger().info("%s: delay changed; restart timer" % self._name)
            self._timer = delay


def main():

    # Register functional block
    ets.register(TimerFB, name="timer", desc="")

    # Weave datapoints
    ets.weave(fb="timer", dp="cmd", gad="1/1/1")
    ets.weave(fb="timer", dp="state", gad="1/2/1")
    ets.weave(fb="timer", dp="delay", gad="1/3/1")

    print
    ets.printGroat("gad")
    print
    ets.printGroat("go")
    print
    schedule.printJobs()
    print

    # Run the stack main loop (blocking call)
    stack.mainLoop()


if __name__ == "__main__":
    try:
        main()
    except:
        logger.exception("3_main")

That's it! As you can see, the concepts used here are simple. This Functional Block can now be used from any real device of your installation, through the Group Addresses 1/1/1, 1/2/1 and 1/3/1. All you have to do is weave (link, bind...) the Group Objects of your real devices to these Group Addresses, using the real ETS application. Of course, you can change the GADs to match your installation.
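To make the Group Address notation a bit more concrete, here is a small sketch which converts a 3-level address such as 1/1/1 to the 16-bit value used on the bus, and back. It assumes the usual KNX 3-level layout (5 bits for the main group, 3 bits for the middle group, 8 bits for the sub group). The function names are made up for this illustration; they are not part of the pKNyX API.

```python
def parse_group_address(gad):
    """Convert a "main/middle/sub" string to its 16-bit integer value."""
    parts = gad.split("/")
    if len(parts) != 3:
        raise ValueError("expected 'main/middle/sub', got %r" % gad)
    main, middle, sub = (int(part) for part in parts)
    if not (0 <= main <= 31 and 0 <= middle <= 7 and 0 <= sub <= 255):
        raise ValueError("group address out of range: %r" % gad)
    # 5 bits main group, 3 bits middle group, 8 bits sub group
    return (main << 11) | (middle << 8) | sub


def format_group_address(raw):
    """Convert a 16-bit integer value back to a "main/middle/sub" string."""
    return "%d/%d/%d" % ((raw >> 11) & 0x1F, (raw >> 8) & 0x07, raw & 0xFF)


# The three addresses used by the timer example
timer_gads = [parse_group_address(gad) for gad in ("1/1/1", "1/2/1", "1/3/1")]
```

With this layout, 1/1/1 is 0x0901, 1/2/1 is 0x0A01 and 1/3/1 is 0x0B01.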

OK, now let's start this device, by invoking the Python interpreter as for any other Python script.

Let's take a closer look at this example. First, we import some Python objects:

from pknyx.api import Logger
from pknyx.api import FunctionalBlock, Stack, ETS
from pknyx.api import Scheduler, Notifier

These objects are all classes.

We then instantiate some high level objects and helpers:

logger = Logger("%s-%s" % (NAME, IND_ADDR))

stack = Stack(individualAddress=IND_ADDR)
ets = ETS(stack=stack, gadMap=GAD_MAP)

schedule = Scheduler()
notify = Notifier()

The Logger object is a global logger service, which lets you output any information you want, at different levels. pKNyX itself makes heavy use of this logger.

The Stack object is used to communicate over the bus (a real bus, of course, but also a virtual one). We can give it an Individual Address, to mimic the behaviour of real devices. This address is used as the Source Address when communicating over the bus. The address 0/0/0 is used by default.

Important: to avoid internal loops, a device drops all telegrams sent by itself. So, if you want 2 virtual devices to communicate, you must ensure that they don't use the same source address.
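The following sketch illustrates why two virtual devices sharing the same source address cannot talk to each other. It is only a toy model of the rule described above: the VirtualDevice class and the broadcast() function are hypothetical names invented for this illustration, and say nothing about how pKNyX really implements the filtering.

```python
class VirtualDevice(object):
    """Toy device which drops telegrams carrying its own source address."""

    def __init__(self, individual_address):
        self.individual_address = individual_address
        self.received = []

    def on_telegram(self, source, gad, value):
        """Return True if the telegram is accepted, False if it is dropped."""
        if source == self.individual_address:
            # Looks like one of our own telegrams: drop it to avoid loops
            return False
        self.received.append((source, gad, value))
        return True


def broadcast(devices, source, gad, value):
    """Deliver one telegram to every device; return the list of accept flags."""
    return [device.on_telegram(source, gad, value) for device in devices]


# Two devices wrongly configured with the same address, and a third one
dev_a = VirtualDevice("1.2.3")
dev_b = VirtualDevice("1.2.3")
dev_c = VirtualDevice("1.2.4")

# dev_a sends on 1/1/1: dev_b drops the telegram too, only dev_c gets it
accepted = broadcast([dev_a, dev_b, dev_c], "1.2.3", "1/1/1", "On")
```

Here, dev_b never sees what dev_a sends, because it cannot tell the telegram apart from its own.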

The ETS object is a tool which works more or less like the real ETS application (see below).

The Scheduler is a helper implementing a powerful scheduler (see below). The Notifier works the same way the Scheduler does, but does not provide the same features (see below).

The main part is to create our custom Functional Block; this is done by subclassing the FunctionalBlock base class, and adding a few attributes/methods:

class TimerFB(FunctionalBlock):
    """ Timer functional block
    """

    # Datapoints definition
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    GO_02 = dict(dp="state", flags="CWUI", priority="low")
    GO_03 = dict(dp="delay", flags="CWU", priority="low")

    DESC = "Timer"

    def _init(self):
        """ Additional init of the timer
        """
        self._timer = 0

The DP_xx class attributes are the Datapoints of our Functional Block. The GO_xx class attributes are the Group Objects, which map the Datapoints to the bus through the multicast service (Group Addresses). Both are defined as Python dictionaries; the framework automatically instantiates them for us. The names used here do not matter, as long as they start with DP_ for Datapoints, and GO_ for Group Objects.
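To give an idea of how such a naming convention can work, here is a minimal sketch which collects the class attributes by prefix. This is not the pKNyX implementation, only one possible way to do it; collect_definitions() and DemoBlock are hypothetical names. Note that vars() only sees the attributes defined directly in the class, not the inherited ones.

```python
def collect_definitions(cls, prefix):
    """Return the dict class attributes whose name starts with prefix.

    The result is sorted by attribute name.
    """
    return [
        value
        for name, value in sorted(vars(cls).items())
        if name.startswith(prefix) and isinstance(value, dict)
    ]


class DemoBlock(object):
    # Only the prefix matters, not what comes after it
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_anything = dict(name="state", access="input", dptId="1.001", default="Off")

    GO_01 = dict(dp="cmd", flags="CWT", priority="low")

    DESC = "Demo"  # ignored: it has no DP_ or GO_ prefix


datapoint_names = [dp["name"] for dp in collect_definitions(DemoBlock, "DP_")]
group_object_dps = [go["dp"] for go in collect_definitions(DemoBlock, "GO_")]
```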

There are a few parameters in the dicts; some are obvious, some need more explanation, but this is beyond the scope of this tutorial.

The _init() method is called when the functional block is instantiated; it can be used to initialize the variables the block needs, or to run some initial tasks. Here, we just create the _timer variable, and set it to 0.

OK, it's time to dig into the active part of our functional block. Let's start with the stateChanged() method:

    @notify.datapoint(dp="state", condition="change")
    def stateChanged(self, event):
        """ Method called when the 'state' datapoint changes
        """
        logger.debug("TimerFB.stateChanged(): event=%s" % repr(event))

        if event['newValue'] == "On":
            delay = self.dp["delay"].value
            Logger().info("%s: start timer for %ds" % (self._name, delay))
            self._timer = delay
        elif event['newValue'] == "Off":
            if self._timer:
                Logger().info("%s: switched off detected; cancel timer" % self._name)
                self._timer = 0

This method has a decorator. If you don't know what a decorator is, have a look at this excellent documentation.
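For readers who are new to decorators, here is a small self-contained sketch of a decorator which registers methods as callbacks, in the spirit of notify.datapoint(). The SimpleNotifier class, its update() method and the 'oldValue' key are assumptions made for this illustration; only the 'newValue' key and the decorator syntax come from the example above. The real Notifier may work differently.

```python
class SimpleNotifier(object):
    """Toy notifier: stores handlers and calls them on value changes."""

    def __init__(self):
        self._handlers = {}

    def datapoint(self, dp, condition="change"):
        """Decorator factory: register the decorated function for dp."""
        def decorator(func):
            self._handlers.setdefault((dp, condition), []).append(func)
            return func  # the function itself is left untouched
        return decorator

    def update(self, owner, dp, old_value, new_value):
        """Call the 'change' handlers of dp if the value really changed."""
        if old_value == new_value:
            return
        event = dict(dp=dp, oldValue=old_value, newValue=new_value)
        for func in self._handlers.get((dp, "change"), []):
            func(owner, event)


demo_notify = SimpleNotifier()


class Demo(object):
    def __init__(self):
        self.seen = []

    @demo_notify.datapoint(dp="state", condition="change")
    def stateChanged(self, event):
        self.seen.append(event["newValue"])


demo = Demo()
demo_notify.update(demo, "state", "Off", "On")  # change: handler is called
demo_notify.update(demo, "state", "On", "On")   # no change: nothing happens
```

The registration happens once, when the class body is executed; after that, the notifier knows which method to call for which Datapoint.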

The updateTimer() method, in turn, is called periodically, using the schedule.every() method as a Python decorator. This decorator automatically registers our method and, with seconds=1, calls it every second.

In this method, we decrement the timer if it is running; when it reaches 0, we set the 'cmd' Datapoint to "Off".
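The countdown logic of the timer example (the updateTimer(), stateChanged() and delayChanged() methods of the listing at the top) can be tried without the framework and without a bus. The sketch below copies that logic into a plain class; TimerLogic and its method names are made up for this illustration, and the commands list simply stands in for the 'cmd' Datapoint.

```python
class TimerLogic(object):
    """Framework-free copy of the TimerFB countdown logic."""

    def __init__(self, delay=10):
        self.delay = delay
        self.timer = 0
        self.commands = []  # values which would be written to 'cmd'

    def tick(self):
        """Same role as updateTimer(): to be called once per second."""
        if self.timer:
            self.timer -= 1
            if not self.timer:
                self.commands.append("Off")  # timer expired: switch off

    def state_changed(self, new_value):
        """Same role as stateChanged()."""
        if new_value == "On":
            self.timer = self.delay  # start the timer
        elif new_value == "Off":
            self.timer = 0  # switched off by someone else: cancel

    def delay_changed(self, new_delay):
        """Same role as delayChanged()."""
        self.delay = new_delay
        if self.timer:
            self.timer = new_delay  # running: restart with the new delay


logic = TimerLogic(delay=3)
logic.state_changed("On")
for _ in range(3):
    logic.tick()
```

After the third tick, the timer has expired and a single "Off" command has been issued.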

Then, we register our new Functional Block (this automatically instantiates it, and does a few other things):

ets.register(TimerFB, name="timer", desc="")

and use the ETS object to weave (bind, link...) our Datapoints (their matching Group Objects, in fact) to Group Addresses:

ets.weave(fb="timer", dp="cmd", gad="1/1/1")
ets.weave(fb="timer", dp="state", gad="1/2/1")
ets.weave(fb="timer", dp="delay", gad="1/3/1")

And finally, we launch the framework main loop:

stack.mainLoop()

(this call is blocking).

We now have a process (Device) running, listening to the bus. The Datapoints, through their respective Group Objects, will react to requests on the Group Addresses they are woven to ("1/1/1", "1/2/1" and "1/3/1"). According to the flags, they will transmit their internal value on Read requests, or when their value changes internally (is updated). Yes, you don't have to manage this: pKNyX does it for you! This is the whole point of a framework, isn't it?
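To show what "according to the flags" can mean, here is a simplified sketch based on the conventional meaning of the KNX flags: C (communication), R (read), W (write) and T (transmit). The U and I flags are ignored here. SketchGroupObject is a hypothetical class written for this tutorial; the way pKNyX really handles the flags may differ.

```python
class SketchGroupObject(object):
    """Toy Group Object which reacts to requests according to its flags."""

    def __init__(self, value, flags):
        self.value = value
        self.flags = flags.upper()
        self.sent = []  # telegrams this object would put on the bus

    def _allowed(self, flag):
        # Nothing goes through without the Communication flag
        return "C" in self.flags and flag in self.flags

    def on_read_request(self):
        """Bus side: someone asks for our value."""
        if self._allowed("R"):
            self.sent.append(("response", self.value))

    def on_write_request(self, value):
        """Bus side: someone writes a new value."""
        if self._allowed("W"):
            self.value = value

    def set_internal(self, value):
        """Device side: the functional block changes the value."""
        changed = value != self.value
        self.value = value
        if changed and self._allowed("T"):
            self.sent.append(("write", value))


# Same flags as the 'cmd' and 'state' Group Objects of the timer example
cmd = SketchGroupObject("Off", "CWT")
state = SketchGroupObject("Off", "CWUI")

cmd.set_internal("On")         # T flag: the new value is transmitted
cmd.on_read_request()          # no R flag: no response
state.on_write_request("On")   # W flag: the value is accepted
state.set_internal("Off")      # no T flag: nothing is transmitted
```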

That's it for now. This is only a draft version; the final implementation may change, depending on the feedback and suggestions I get. But the core is all there. Again, the goal of the framework is to provide very high level tools to build complete and powerful applications and KNX extensions.
