wiki:Tutorial

Version 9 (modified by Frédéric, 11 years ago) ( diff )

--

Tutorial

Vocabulary

Before we dive into this tutorial, lets define a few words. I tried to use KNX vocabulary, and hope I did understand them correclty. Feel free to contact me if you find mistakes.

KNX bases are needed.

  • Group Address: multicast destination address of a bus datagram
  • Individual Address: physical source address of a bus datagram
  • Flags:
  • Stack: code implementing KNX OSI layers
  • Datapoint: internal variable of a Functional Block
  • Group Object: multicast communication proxy of a Datapoint
  • Functional Block: black box containing Datapoints and implementing features
  • Device: set of Functional Block-s running as a process
  • ETS: tool used to weave Device Functional Blocks Groups Objects through Group Address-es

Functional Block

This is the central feature of pKNyX. This is where we will implement needed logic.

A Function Block is implemented as a python class, and have Datapoint-s and Group Object-s attributes. The Datapoint-s reflect the state of the Functional Block; the Group Object-s expose the Datapoint-s to the KNX bus. When implementating a custom Functional Block, we will set/get its Datapoint-s. On the other hand, we won't alter its Group Object-s, which are only used internally for bus communication; we only control how they will be used through their Flags.

Usually, a Functional Block should remains simple; if complex features are needed, it is better to split them in several Function Block-s, weaved through Group Address-es. This is better for readability, reusability and allow better granularity during deployement.

Functional Block-s can be deployed as one or several Device-s. In pKNyX, each Device runs as a process. All Device-s can run on the same machine, or spread over a set of machines. Like in real KNX world.

Create the device

As said, pKNyX main entry is a Device, which will run as a process. As any good framework, pKNyX helps in starting writing code by auto-generating some usefull files, in a specific hierarchy, to implement this Device in a correct way.

The tool to do that is pknyx-admin.py. This a global tool to manage devices (create/check/start). Let's see how to use it to create a fresh device:

$ pknyx-admin.py createdevice timer
create 'timer' from template...
done

This will create a little arborescence under a dir named timer:

$ tree timer
timer
├── admin.py
└── timer
    ├── config.py
    ├── device.py
    └── __init__.py

1 directory, 4 files

The top-level dir timer can be renamed as you want. It contains a script called admin.py; this script is in fact the pknyx-admin.py one, with a pre-defined env var pointing on our timer structure, in order to manage it. It is possible to use the global pknyx-admin.py script, but it would need to define $PKNX_DEVICE_PATH var and make it point to the second timer dir, so that the python interpreter can correctly import our files. So, for now, let's use the admin.py script.

The config.py file contains a few pre-defined constants:

# -*- coding: utf-8 -*-

from pknyx.common import config

DEVICE_NAME = "timer"
DEVICE_IND_ADDR = "1.1.1"
DEVICE_VERSION = "0.1"

config.LOGGER_LEVEL = "info"

This is where we will add new configs values for our Device, if needed.

The last file is device.py; this is the most important one, where we will implement our Device code:

"""# -*- coding: utf-8 -*-

from pknyx.api import Device, FunctionalBlock
from pknyx.api import logger, schedule, notify


class FB(FunctionalBlock):
    DP_01 = dict(name="dp_01", access="output", dptId="1.001", default="Off")

    GO_01 = dict(dp="dp_01", flags="CWT", priority="low")

    DESC = "FB"


class Timer(Device):
    FB_01 = dict(cls=FB, name="fb_01", desc="fb 01")

    LNK_01 = dict(fb="fb_01", dp="dp_01", gad="1/1/1")

    DESC = "Timer"


DEVICE = Timer

As you see, it already contains a dummy Functional Block, and a dummy Device, in order to show how things work.

The Timer example

Ok, lets's start to implement our simple timer example. This timer monitors the state of a light, and switches it off automatically after a delay.

Let's modify the device.py according to this:

from pknyx.api import Device, FunctionalBlock
from pknyx.api import logger, schedule, notify

class TimerFB(FunctionalBlock):
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    GO_02 = dict(dp="state", flags="CWUI", priority="low")
    GO_03 = dict(dp="delay", flags="CWU", priority="low")

    DESC = "Timer FB"

    def _init(self):
        self._timer = 0

    @notify.datapoint(dp="state", condition="change")
    def stateChanged(self, event):
        if event['newValue'] == "On":
            delay = self.dp["delay"].value
            logger.info("%s: start timer for %ds" % (self._name, delay))
            self._timer = delay
        elif event['newValue'] == "Off":
            if self._timer:
                logger.info("%s: switched off detected; cancel timer" % self._name)
                self._timer = 0

    @schedule.every(seconds=1)
    def updateTimer(self):
        if self._timer:
            self._timer -= 1
            if not self._timer:
                logger.info("%s: timer expired; switch off" % self._name)
                self.dp["cmd"].value = "Off"

    @notify.datapoint(dp="delay", condition="change")
    def delayChanged(self, event):
        if self._timer:
            delay = self.dp["delay"].value
            logger.info("%s: delay changed; restart timer" % self._name)
            self._timer = delay


class Timer(Device):
    FB_01 = dict(cls=TimerFB, name="timerfb", desc="timer fb")

    LNK_01 = dict(fb="timerfb", dp="cmd", gad="1/1/1")
    LNK_02 = dict(fb="timerfb", dp="state", gad="1/2/1")
    LNK_03 = dict(fb="timerfb", dp="delay", gad="1/3/1")

    DESC = "Timer device"


DEVICE = Timer

That's it! As you can see, concepts used here are simple... This Functional Block can now be used from any other real device of your installation, through Groups Addresses 1/1/1, 1/2/1 and 1/3/1. All you have to do, is to weave the Group Object-s of your real devices to these Group Address-es, using the real ETS application. Sure, you maye have to change the Group Address-es to match your installation.

Ok, now, lets check our device, using the admin.py script:

$ ./admin.py checkdevice
MainThread::AdminUtility._checkRunDevice(): logger level is 'info'
MainThread::AdminUtility._checkRunDevice(): config path is './timer'
MainThread::AdminUtility._checkRunDevice(): device name is 'timer'
MainThread::AdminUtility._checkRunDevice(): device individual address is '1.1.1'
no error found
$

Looks good. So, we can now really run the device:

$ ./admin.py rundevice
MainThread::AdminUtility._checkRunDevice(): logger level is 'info'
MainThread::AdminUtility._checkRunDevice(): config path is './timer'
MainThread::AdminUtility._checkRunDevice(): device name is 'timer'
MainThread::AdminUtility._checkRunDevice(): device individual address is '1.1.1'
MainThread::AdminUtility._runDevice(): detach is 'False'
MainThread::Scheduler started
MainThread::Stack running

Here, the call is blocking, on the Stack mainloop, waiting for incoming telegrams from the bus. Here is what you can see if you switch on the light:

LinkLayer::timerfb: start timer for 10s

Thread-15::timerfb: timer expired; switch off

And the light should have been switched off after 10s.

To exist from the device, just use Ctrl-C.

Important note: to avoid internal loops, a device drops all telegrams sent by itself. So, if you want 2 virtual devices to communicate, you must ensure that they don't use the same source address, like in a real installation. Keep this in mind ; if you don't see telegrams you are expecting, the problem may be there.

So, lets deep inside this example to explain how things work. First, we import some python objects:

from pknyx.api import Device, FunctionalBlock
from pknyx.api import logger, schedule, notify

Device and FunctionalBlock are classes; logger, schedule, notify are instances.

The logger object is a global logger service, which allows you to output all informations you want, at different levels. pKNyX makes a big usage of that logger. By default, the template set the logger level to info, but you can set it to other levels. See the documentation.

schedule is a helper implementating a powerfull scheduler, based on APScheduler (see below). notify works the same way schedule does, but provides bus notifications rather than time notifications (see below).

Now, it is time to write the central part of our device: a custom Functional Block; this is done by subclassing the FunctionBlock base class, and adding a few attributes/methods:

class TimerFB(FunctionalBlock):
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    GO_02 = dict(dp="state", flags="CWUI", priority="low")
    GO_03 = dict(dp="delay", flags="CWU", priority="low")

    DESC = "Timer FB"

    def _init(self):
        self._timer = 0

The DP_ class attributes are the Datapoint-s of our Functional Block.

The GO_ class attributes are the Group Object-s mapping the Datapoint-s to the bus through multicast service (Group Address).

They are both defined as python dictionnary; they will be automatically instanciated for us by the framework. The named used here do not matter, as long as it starts with DP_ for Datapoints, and GO_ for Group Objects.

There are a few parameters in the dicts; some are obvious, some will need more explanations. But this is out of the scope of this tutorial.

The _init() method is called at the end of the Functional Block instanciation (creation); it can be used to init some additionnal global vars, or make initial tasks. Here, we just create the _timer var, and set it to 0.

Ok, it's time to dig in the active part of our functional block:

    @notify.datapoint(dp="state", condition="change")
    def stateChanged(self, event):
        if event['newValue'] == "On":
            delay = self.dp["delay"].value
            Logger().info("%s: start timer for %ds" % (self._name, delay))
            self._timer = delay
        elif event['newValue'] == "Off":
            if self._timer:
                Logger().info("%s: switched off detected; cancel timer" % self._name)
                self._timer = 0

The role of the stateChanged() method is to start/stop the timer depending of the state.

This method as a decorator, @notify.datapoint(). pKNyX does not use decorators as they usually are made for; here, the decorator registers stateChanged() in notify, which will call this method as soon as the state Datapoint changes because of a bus activity.

Note how we retreive Datapoint value: our Functional Block has a dp dictionnary, which contains all our Datapoint-s; the keys are the names we used to describe them in the DP_ dictionnary. To get/set the value of the Datapoint, just use the .value property.

Let's now have a look at the timer treatement:

    @schedule.every(seconds=1)
    def updateTimer(self):
        if self._timer:
            self._timer -= 1
            if not self._timer:
                logger.info("%s: timer expired; switch off" % self._name)
                self.dp["cmd"].value = "Off"

Note how this method is periodically called, using the schedule.every() decorator, as for notify. This decorator will automatically register the method, and schedule will call it every second.

There, we just decrement the timer (if not null), and check if the delay expired. If it is the case, we change the cmd Datapoint value by assigning a new value to the .value property. Doing this, pKNyX will automatically notify the bus of this change, according to the flags of the corresponding GO_ object! If things do not work as expected, check that you didn't omit the .value property; if it is the case, you just overwrited the Datapoint itself with the value you want to assign to it!

The last method manages the delay Datapoint:

    @notify.datapoint(dp="delay", condition="change")
    def delayChanged(self, event):
        if self._timer:
            delay = self.dp["delay"].value
            Logger().info("%s: delay changed; restart timer" % self._name)
            self._timer = delay

I think you got the point ;o)

Now we need to implement our Device:

class Timer(Device):
    FB_01 = dict(cls=TimerFB, name="timerfb", desc="timer fb")

    LNK_01 = dict(fb="timerfb", dp="cmd", gad="1/1/1")
    LNK_02 = dict(fb="timerfb", dp="state", gad="1/2/1")
    LNK_03 = dict(fb="timerfb", dp="delay", gad="1/3/1")

    DESC = "Timer device"

As you see, it must inherits the Device class. Like for the Function Block, things are defined through class attributes; pKNyX, like python, tries to limit the number of paradigms.

We first need to tell which Function Block(-s) our device will use. This is done by creating a dict, which name must start with FB_. Here, we only have one Functional Block.

Then, we weave our Datapoint-s (our Group Object-s, in fact) to our group addresses. Again, a simple dict is used, with names satring with LNK_.

The last thing we need to write is to tell pKNyX what is our Device implementation class:

DEVICE = Timer

That's it for now with this tutorial. This is the first working dev. release; their are many additional things to do, and final implementation may change, according to feedback/suggestions I will get. But the core is there. Again, the goal of the framework is to provide very high level tools to build complete and powerfull applications and KNX extensions. Unlink linknx, you have a little more to do that simply write a configuration file. But you will find that it is not that complicated (pKNyX handles all nasty stuff for you), and may even simplify things for complex tasks (linknx rules are not that simple in this case).

Note: See TracWiki for help on using the wiki.