wiki:Tutorial

Version 13 (modified by Frédéric, 11 years ago) ( diff )

--

Tutorial

This is a draft version.

Vocabulary

Before we dive into this tutorial, lets define a few words. I tried to use KNX vocabulary, and hope I did understand them correclty. Feel free to contact me if you find mistakes.

KNX bases are needed.

  • Group Address: multicast destination address of a bus datagram
  • Individual Address: physical source address of a bus datagram
  • Flags:
  • Stack: code implementing KNX OSI layers
  • Datapoint: internal variable of a Functional Block
  • Group Object: multicast communication proxy of a Datapoint
  • Functional Block: black box containing Datapoints and implementing features
  • Device: set of Functional Block-s running as a process
  • ETS: tool used to weave Device Functional Blocks Groups Objects through Group Address-es

Functional Block

This is the central feature of pKNyX. This is where we will implement needed logic.

A Function Block is implemented as a python class, and have Datapoint-s and Group Object-s attributes. The Datapoint-s reflect the state of the Functional Block; the Group Object-s expose the Datapoint-s to the KNX bus. When implementating a custom Functional Block, we will set/get its Datapoint-s. On the other hand, we won't alter its Group Object-s, which are only used internally for bus communication; we only control how they will be used through their Flags.

Usually, a Functional Block should remains simple; if complex features are needed, it is better to split them in several Function Block-s, weaved through Group Address-es. This is better for readability, reusability and allow better granularity during deployement.

Functional Block-s can be deployed as one or several Device-s. In pKNyX, each Device runs as a process. All Device-s can run on the same machine, or spread over a set of machines. Like in real KNX world.

Create device structure

As any good framework, pKNyX helps in starting writing code by auto-generating some usefull files, in a specific hierarchy, to implement a Device in a correct way.

The tool to do that is pknyx-admin.py. This a global tool to manage devices (create/check/run). Let's see how to use it to create a fresh device:

$ pknyx-admin.py createdevice timer
Generating 'timer' structure from template...
'timer' dir created
'timer/admin.py' file generated
'timer/timer' dir created
'timer/timer/__init__.py' file generated
'timer/timer/config.py' file generated
'timer/timer/device.py' file generated
'timer/timer/fb' dir created
'timer/timer/fb/__init__.py' file generated
'timer/timer/fb/timerFB.py' file generated
'timer/timer/plugins' dir created
'timer/timer/plugins/__init__.py' file generated
'timer' structure done

This will create a little arborescence under a dir named timer:

$ tree timer
timer/
├── admin.py
└── timer
    ├── config.py
    ├── device.py
    ├── fb
    │   ├── __init__.py
    │   └── timerFB.py
    ├── __init__.py
    └── plugins
        └── __init__.py

3 directories, 7 files

The top-level dir timer can be renamed as you want. All further references are from this dir.

This dir contains a script called admin.py; this script is in fact the pknyx-admin.py one, with a pre-defined env var pointing on our timer structure, in order to manage it. It is possible to use the global pknyx-admin.py script, but it would need to define $PKNX_DEVICE_PATH var and make it point to the second timer dir, so that the python interpreter can correctly import our files. So, for now, let's use the admin.py script.

The timer/config.py file contains a few pre-defined constants:

# -*- coding: utf-8 -*-

from pknyx.common import config

DEVICE_NAME = "timer"
DEVICE_IND_ADDR = "1.1.1"
DEVICE_VERSION = "0.1"

config.LOGGER_LEVEL = "info"

This is where we will add new configs values for our Device, if needed.

The next file, timer/device.py, contains a Device example:

# -*- coding: utf-8 -*-

from pknyx.api import Device

from fb.timerFB import TimerFB


class Timer(Device):
    FB_01 = dict(cls=TimerFB, name="timer_fb", desc="timer fb")

    LNK_01 = dict(fb="timer_fb", dp="dp_01", gad="1/1/1")

    DESC = "Timer"


DEVICE = Timer

Last generated file is timer/fb/timerFB.py; it contains a Functional Block example:

# -*- coding: utf-8 -*-

from pknyx.api import FunctionalBlock
from pknyx.api import logger, schedule, notify


class TimerFB(FunctionalBlock):
    DP_01 = dict(name="dp_01", access="output", dptId="1.001", default="Off")

    GO_01 = dict(dp="dp_01", flags="CWT", priority="low")

    DESC = "Timer FB"

Timer implementation

Ok, lets's start to implement our simple timer example. This timer monitors the state of a light, and switches it off automatically after a delay.

First, modify the timerFB.py file according to this:

# -*- coding: utf-8 -*-

from pknyx.api import FunctionalBlock
from pknyx.api import logger, schedule, notify

class TimerFB(FunctionalBlock):
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    GO_02 = dict(dp="state", flags="CWUI", priority="low")
    GO_03 = dict(dp="delay", flags="CWU", priority="low")

    DESC = "Timer FB"

    def _init(self):
        self._timer = 0

    @notify.datapoint(dp="state", condition="change")
    def stateChanged(self, event):
        if event['newValue'] == "On":
            delay = self.dp["delay"].value
            logger.info("%s: start timer for %ds" % (self._name, delay))
            self._timer = delay
        elif event['newValue'] == "Off":
            if self._timer:
                logger.info("%s: switched off detected; cancel timer" % self._name)
                self._timer = 0

    @schedule.every(seconds=1)
    def updateTimer(self):
        if self._timer:
            self._timer -= 1
            if not self._timer:
                logger.info("%s: timer expired; switch off" % self._name)
                self.dp["cmd"].value = "Off"

    @notify.datapoint(dp="delay", condition="change")
    def delayChanged(self, event):
        if self._timer:
            delay = self.dp["delay"].value
            logger.info("%s: delay changed; restart timer" % self._name)
            self._timer = delay

Then, modify device.py like this:

# -*- coding: utf-8 -*-

from pknyx.api import Device

from fb.timerFB import TimerFB


class Timer(Device):
    FB_01 = dict(cls=TimerFB, name="timer_fb", desc="timer fb")

    LNK_01 = dict(fb="timerfb", dp="cmd", gad="1/1/1")
    LNK_02 = dict(fb="timerfb", dp="state", gad="1/2/1")
    LNK_03 = dict(fb="timerfb", dp="delay", gad="1/3/1")

    DESC = "Timer"


DEVICE = Timer

That's it! As you can see, concepts used here are simple... This Functional Block can now be used from any other real device of your installation, through Groups Addresses 1/1/1, 1/2/1 and 1/3/1. All you have to do, is to weave the Group Object-s of your real devices to these Group Address-es, using the real ETS application. Sure, you maye have to change the Group Address-es to match your installation.

Ok, now, lets check our device, using the admin.py script:

$ ./admin.py checkdevice
Logger level is 'info'
Config path is './timer'
Device name is 'timer'
Device Individual Address is '1.1.1'
No error found
$

Looks good. So, we can now really run the device:

$ ./admin.py rundevice
Logger level is 'info'
Config path is './timer'
Device name is 'timer'
Device Individual Address is '1.1.1'
Detaching is 'False'
Scheduler started
Stack running

Here, the call is blocking, on the Stack mainloop, waiting for incoming telegrams from the bus. Here is what you can see if you switch on the light:

timerfb: start timer for 10s

timerfb: timer expired; switch off

And the light should have been switched off after 10s.

To exist from the device, just use Ctrl-C.

Important note: to avoid internal loops, a device drops all telegrams sent by itself. So, if you want 2 virtual devices to communicate, you must ensure that they don't use the same source address, like in a real installation. Keep this in mind ; if you don't see telegrams you are expecting, the problem may be there.

So, lets deep inside this example to explain how things work, starting with the timer/timer/gb/timerFB.py module.

First, we import some python objects:

from pknyx.api import FunctionalBlock
from pknyx.api import logger, schedule, notify

FunctionalBlock is a class; logger, schedule, notify are instances.

The logger object is a global logger service, which allows you to output all informations you want, at different levels. pKNyX makes a big usage of that logger. By default, the template set the logger level to info, but you can set it to other levels. See the documentation.

schedule is a helper implementating a powerfull scheduler, based on APScheduler (see below). notify works the same way schedule does, but provides bus notifications rather than time notifications (see below).

Now, it is time to write the central part of our device: a custom Functional Block; this is done by subclassing the FunctionBlock base class, and adding a few attributes/methods:

class TimerFB(FunctionalBlock):
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    GO_02 = dict(dp="state", flags="CWUI", priority="low")
    GO_03 = dict(dp="delay", flags="CWU", priority="low")

    DESC = "Timer FB"

    def _init(self):
        self._timer = 0

The DP_ class attributes are the Datapoint-s of our Functional Block.

The GO_ class attributes are the Group Object-s mapping the Datapoint-s to the bus through multicast service (Group Address).

They are both defined as python dictionnary; they will be automatically instanciated for us by the framework. The named used here do not matter, as long as it starts with DP_ for Datapoints, and GO_ for Group Objects.

There are a few parameters in the dicts; some are obvious, some will need more explanations. But this is out of the scope of this tutorial.

The _init() method is called at the end of the Functional Block instanciation (creation); it can be used to init some additionnal global vars, or make initial tasks. Here, we just create the _timer var, and set it to 0.

Ok, it's time to dig in the active part of our functional block:

    @notify.datapoint(dp="state", condition="change")
    def stateChanged(self, event):
        if event['newValue'] == "On":
            delay = self.dp["delay"].value
            Logger().info("%s: start timer for %ds" % (self._name, delay))
            self._timer = delay
        elif event['newValue'] == "Off":
            if self._timer:
                Logger().info("%s: switched off detected; cancel timer" % self._name)
                self._timer = 0

The role of the stateChanged() method is to start/stop the timer depending of the state.

This method as a decorator, @notify.datapoint(). pKNyX does not use decorators as they usually are made for; here, the decorator registers stateChanged() in notify, which will call this method as soon as the state Datapoint changes because of a bus activity.

Note how we retreive Datapoint value: our Functional Block has a dp dictionnary, which contains all our Datapoint-s; the keys are the names we used to describe them in the DP_ dictionnary. To get/set the value of the Datapoint, just use the .value property.

Let's now have a look at the timer treatement:

    @schedule.every(seconds=1)
    def updateTimer(self):
        if self._timer:
            self._timer -= 1
            if not self._timer:
                logger.info("%s: timer expired; switch off" % self._name)
                self.dp["cmd"].value = "Off"

Note how this method is periodically called, using the schedule.every() decorator, as for notify. This decorator will automatically register the method, and schedule will call it every second.

There, we just decrement the timer (if not null), and check if the delay expired. If it is the case, we change the cmd Datapoint value by assigning a new value to the .value property. Doing this, pKNyX will automatically notify the bus of this change, according to the flags of the corresponding GO_ object! If things do not work as expected, check that you didn't omit the .value property; if it is the case, you just overwrited the Datapoint itself with the value you want to assign to it!

The last method manages the delay Datapoint:

    @notify.datapoint(dp="delay", condition="change")
    def delayChanged(self, event):
        if self._timer:
            delay = self.dp["delay"].value
            Logger().info("%s: delay changed; restart timer" % self._name)
            self._timer = delay

I think you got the point ;o)

Now we need to implement our Device, in timer/timer/device.py:

# -*- coding: utf-8 -*-

from pknyx.api import Device

from fb.timerFB import TimerFB

A few imports; most important one is, of course, our custom Functional Block.

Then, we implement our Device:

class Timer(Device):
    FB_01 = dict(cls=TimerFB, name="timer_fb", desc="timer fb")

    LNK_01 = dict(fb="timer_fb", dp="cmd", gad="1/1/1")
    LNK_02 = dict(fb="timer_fb", dp="state", gad="1/2/1")
    LNK_03 = dict(fb="timer_fb", dp="delay", gad="1/3/1")

    DESC = "Timer"

As you see, it inherits the Device class. Like for the Function Block, things are defined through class attributes; pKNyX, like python, tries to limit the number of paradigms.

We first need to tell which Function Block(-s) our device will use. This is done by creating a dict, which name must start with FB_. Here, we only have one Functional Block.

Then, we weave our Datapoint-s (our Group Object-s, in fact) to our group addresses. Again, a simple dict is used, with names satring with LNK_.

The last thing we need to write is to tell pKNyX what is our Device implementation class:

DEVICE = Timer

Complex example

Have a look at pknyx/examples/3_weather/ to see how to use several Functional Block-s within the same device.

Conclusion

That's it for now with this tutorial. This is the first working dev. release; their are many additional things to do, and final implementation may change, according to feedback/suggestions I will get. But the core is there. Again, the goal of the framework is to provide very high level tools to build complete and powerfull applications and KNX extensions. Unlink linknx, you have a little more to do that simply write a configuration file. But you will find that it is not that complicated (pKNyX handles all nasty stuff for you), and may even simplify things for complex tasks (linknx rules are not that simple in this case).

Note: See TracWiki for help on using the wiki.