Version 2 (modified by 11 years ago) ( diff ) | ,
---|
Table of Contents
Draft
Tutorial
Architecture
Vocabulary
TODO
Functional Blocks
This is the central feature of pKNyX, allowing user to create virtual devices which mimics real KNX devices. The Device itself is implemented as a process.
Here is a very simple example which implements a timer. This timer can monitor the state of a light, and switch it off automatically after a delay. The code is taken from the examples/3_timer
directory of pKNyX sources.
from pknyx.api import Logger from pknyx.api import FunctionalBlock, Stack, ETS from pknyx.api import Scheduler, Notifier NAME = "timer" IND_ADDR = "1.2.3" # ETS group address map GAD_MAP = {"1": dict(name="lights", desc="Lights"), "1/1": dict(name="cmds", desc="Commands"), "1/1/1": dict(name="light_test_cmd", desc="Light 'test' (cmd)"), "1/2": dict(name="states", desc="States"), "1/2/1": dict(name="light_test_state", desc="Ligh 'test' (state)"), "1/3": dict(name="delay", desc="Delays"), "1/3/1": dict(name="light_test_delay", desc="Light 'test' (delay)"), } logger = Logger("%s-%s" % (NAME, IND_ADDR)) stack = Stack(individualAddress=IND_ADDR) ets = ETS(stack=stack, gadMap=GAD_MAP)) schedule = Scheduler() notify = Notifier() class TimerFB(FunctionalBlock): """ Timer functional block """ # Datapoints definition DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off") DP_02 = dict(name="state", access="input", dptId="1.001", default="Off") DP_03 = dict(name="delay", access="input", dptId="7.005", default=10) GO_01 = dict(dp="cmd", flags="CWT", priority="low") GO_02 = dict(dp="state", flags="CWUI", priority="low") GO_03 = dict(dp="delay", flags="CWU", priority="low") DESC = "Timer" def _init(self): """ Additionnal init of the timer """ self._timer = 0 @schedule.every(seconds=1) def updateTimer(self): """ Method called every second. """ #logger.trace("TimerFB.updateTimer()") if self._timer: self._timer -= 1 if not self._timer: logger.info("%s: timer expired; switch off" % self._name) self.dp["cmd"].value = "Off" @notify.datapoint(dp="state", condition="change") def stateChanged(self, event): """ Method called when the 'state' datapoint changes """ logger.debug("TimerFB.stateChanged(): event=%s" % repr(event)) if event['newValue'] == "On": delay = self.dp["delay"].value Logger().info("%s: start timer for %ds" % (self._name, delay)) self._timer = delay elif event['newValue'] == "Off": if self._timer: Logger().info("%s: switched off detected; cancel timer" % self._name) self._timer = 0 @notify.datapoint(dp="delay", condition="change") def delayChanged(self, event): """ Method called when the 'delay' datapoint changes """ logger.debug("TimerFB.delayChanged(): event=%s" % repr(event)) # If the timer is running, we reset it to the new delay if self._timer: delay = self.dp["delay"].value Logger().info("%s: delay changed; restart timer" % self._name) self._timer = delay def main(): # Register functional block ets.register(TimerFB, name="timer", desc="") # Weave datapoints ets.weave(fb="timer", dp="cmd", gad="1/1/1") ets.weave(fb="timer", dp="state", gad="1/2/1") ets.weave(fb="timer", dp="delay", gad="1/3/1") print ets.printGroat("gad") print ets.printGroat("go") print schedule.printJobs() print # Run the stack main loop (blocking call) stack.mainLoop() if __name__ == "__main__": try: main() except: logger.exception("3_main")
That's it! As you can see, concepts used here are simple... This Functional Block can now be used from any other real device of your installation, through Groups Addresses 1/1/1
, 1/3/1
and 1/3/1
. All you have to do, is to weave (link, bind...) the Group Objects of your real devices to these Groups Addresses, using the real ETS application. Sure, you can change the GAD to match your installation.
Ok, now, lets start this device (invoke the python interpreter as for any other python script):
Lets have a closer look to this example. First, we import some python objects:
from pknyx.api import Logger from pknyx.api import FunctionalBlock, Stack, ETS from pknyx.api import Scheduler, Notifier
These objects are all classes.
We then instanciante some high level objects and helpers:
logger = Logger("%s-%s" % (NAME, IND_ADDR)) stack = Stack(individualAddress=IND_ADDR) ets = ETS(stack=stack, gadMap=GAD_MAP)) schedule = Scheduler() notify = Notifier()
The Logger
object is a global logger service, which allow you to output all informations you want, at different levels. pKNyX make a big usage of that logger.
The Stack
object is used to communicate over the bus (real bus, of course, but also virtual bus). We can give it an Individual Address, to mimic real devices behaviour. This address will be used as Source Address when communicating over the bus. The address 0/0/0
is used by default.
Important: to avoid internal loops, a device drops all telegrams sent by itself. So, if you want 2 virtal devices to communicate, you must ensure that they don't use the same source address.
The ETS
object is a tool which works more or less like the real ETS application (see below).
The Scheduler
is a helper implementating a powerfull scheduler (see below).
The Notifier
works the same way the Scheduler
does, but does not provide the same features (see below).
The main part is to create our custom Functional Block; this is done by subclassing the FunctionBlock base class, and adding a few attributes/methods:
class TimerFB(FunctionalBlock): """ Timer functional block """ # Datapoints definition DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off") DP_02 = dict(name="state", access="input", dptId="1.001", default="Off") DP_03 = dict(name="delay", access="input", dptId="7.005", default=10) GO_01 = dict(dp="cmd", flags="CWT", priority="low") GO_02 = dict(dp="state", flags="CWUI", priority="low") GO_03 = dict(dp="delay", flags="CWU", priority="low") DESC = "Timer" def _init(self): """ Additionnal init of the timer """ self._timer = 0
The DP_xx
class attributes are the Datapoints of our Functional Block. The GO_xx
class attributes are the Group Objects mapping the Datapoints to the bus through multicast service (Group Address). They are both defined as python dictionnary; they will be automatically instanciated for us by the framework. The named used here do not matter, as long as they start with DP_
for Datapoints, and GO_
for Group Objects.
There are a few parameters in the dicts; some are obvious, some will need more explanations. But this is out of the scope of this tutorial.
The _init()
method is called when the functional block is instanciated; it can be used to init some needed global vars, or make initial tasks. Here, we just create the _timer
var, and set it to 0.
Ok, it's time to dig in the active part of our functional block. Let's start with the stateChanged()
method:
@notify.datapoint(dp="state", condition="change") def stateChanged(self, event): """ Method called when the 'state' datapoint changes """ logger.debug("TimerFB.stateChanged(): event=%s" % repr(event)) if event['newValue'] == "On": delay = self.dp["delay"].value Logger().info("%s: start timer for %ds" % (self._name, delay)) self._timer = delay elif event['newValue'] == "Off": if self._timer: Logger().info("%s: switched off detected; cancel timer" % self._name) self._timer = 0
This method as a decorator. If you don't know what is a decorator, have a look at this [excellent documentation.
Note how this method is periodically called, using the schedule.every()
method as python decorator. This decorator will automatically register our method and call it every 5 minutes.
In this method, we get the temperature and humidity values (not explained here), and give these values to the respective Datapoints.
Then, we register our new Funtional Block (this will automatically instanciate it - and do other things):
ets.register(WeatherTemperatureBlock, name="weather_temperature", desc="A simple weather block example")
and use the ETS
object to weave (bind, link...) our Datapoints (their matching Group Objects, in fact) to Group Addresses:
ets.weave(fb="weatherTempBlock", dp="temperature", gad="1/1/1") ets.weave(fb="weatherTempBlock", dp="humidity", gad="1/1/2")
And finally, we launch the framework main loop:
stack.mainLoop()
(this call is blocking).
We now have a process (Device) running, listening to the bus. The Datapoints, through their respective Group Objects, will react to requests on the Group Addresses they are weaved to ("1/1/1" and "1/1/2"). According to the flags, they will transmit their internal value on Read requests or if their value internally change (updated). Yes, you don't have to manage this: pKNyX does it for you! This is the all point of a framework, isn't it?
That's it for now. This is only a draft version; final implementation may change, according to feedback/suggestions I will get. But the core is all there. Again, the goal of the framework is to provide very high level tools to build complete and powerfull applications and KNX extensions.