[[PageOutline(2-5, Table of Contents, floated)]]

'''Draft'''

= Tutorial =

== Architecture ==

== Vocabulary ==

TODO

== Functional Blocks ==

This is the central feature of '''pKNyX''': it lets the user create virtual devices which mimic real KNX devices. The Device itself is implemented as a process.

Here is a very simple example which implements a timer. This timer monitors the state of a light and switches it off automatically after a delay. The code is taken from the {{{examples/3_timer}}} directory of the '''pKNyX''' sources.

{{{
#!python
from pknyx.api import Logger
from pknyx.api import FunctionalBlock, Stack, ETS
from pknyx.api import Scheduler, Notifier

NAME = "timer"
IND_ADDR = "1.2.3"

# ETS group address map
GAD_MAP = {"1": dict(name="lights", desc="Lights"),
           "1/1": dict(name="cmds", desc="Commands"),
           "1/1/1": dict(name="light_test_cmd", desc="Light 'test' (cmd)"),
           "1/2": dict(name="states", desc="States"),
           "1/2/1": dict(name="light_test_state", desc="Light 'test' (state)"),
           "1/3": dict(name="delay", desc="Delays"),
           "1/3/1": dict(name="light_test_delay", desc="Light 'test' (delay)"),
          }


logger = Logger("%s-%s" % (NAME, IND_ADDR))

stack = Stack(individualAddress=IND_ADDR)
ets = ETS(stack=stack, gadMap=GAD_MAP)

schedule = Scheduler()
notify = Notifier()


class TimerFB(FunctionalBlock):
    """ Timer functional block
    """

    # Datapoints definition
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

    # Group Objects definition
    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    GO_02 = dict(dp="state", flags="CWUI", priority="low")
    GO_03 = dict(dp="delay", flags="CWU", priority="low")

    DESC = "Timer"

    def _init(self):
        """ Additional init of the timer
        """
        self._timer = 0

    @schedule.every(seconds=1)
    def updateTimer(self):
        """ Method called every second.
        """
        #logger.trace("TimerFB.updateTimer()")

        if self._timer:
            self._timer -= 1
            if not self._timer:
                logger.info("%s: timer expired; switch off" % self._name)
                self.dp["cmd"].value = "Off"

    @notify.datapoint(dp="state", condition="change")
    def stateChanged(self, event):
        """ Method called when the 'state' datapoint changes
        """
        logger.debug("TimerFB.stateChanged(): event=%s" % repr(event))

        if event['newValue'] == "On":
            delay = self.dp["delay"].value
            Logger().info("%s: start timer for %ds" % (self._name, delay))
            self._timer = delay
        elif event['newValue'] == "Off":
            if self._timer:
                Logger().info("%s: switched off detected; cancel timer" % self._name)
                self._timer = 0

    @notify.datapoint(dp="delay", condition="change")
    def delayChanged(self, event):
        """ Method called when the 'delay' datapoint changes
        """
        logger.debug("TimerFB.delayChanged(): event=%s" % repr(event))

        # If the timer is running, we reset it to the new delay
        if self._timer:
            delay = self.dp["delay"].value
            Logger().info("%s: delay changed; restart timer" % self._name)
            self._timer = delay


def main():

    # Register functional block
    ets.register(TimerFB, name="timer", desc="")

    # Weave datapoints
    ets.weave(fb="timer", dp="cmd", gad="1/1/1")
    ets.weave(fb="timer", dp="state", gad="1/2/1")
    ets.weave(fb="timer", dp="delay", gad="1/3/1")

    print ets.printGroat("gad")
    print ets.printGroat("go")
    print schedule.printJobs()
    print

    # Run the stack main loop (blocking call)
    stack.mainLoop()


if __name__ == "__main__":
    try:
        main()
    except:
        logger.exception("3_main")
}}}

That's it! As you can see, the concepts used here are simple...

This Functional Block can now be used from any other real device of your installation, through the Group Addresses {{{1/1/1}}}, {{{1/2/1}}} and {{{1/3/1}}}. All you have to do is weave (link, bind...) the Group Objects of your real devices to these Group Addresses, using the real '''ETS''' application. Of course, you can change the GAD to match your installation.

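To get a feel for the timer behaviour without a bus or the framework, the countdown logic of {{{TimerFB}}} can be modelled in plain Python. The sketch below is only an illustration: {{{TimerLogic}}} and {{{run_scenario()}}} are hypothetical names that are not part of '''pKNyX''', and the scheduler and notifier are replaced by direct method calls.

{{{
#!python
class TimerLogic(object):
    """Framework-free model of the TimerFB countdown (hypothetical helper)."""

    def __init__(self, delay=10):
        self.delay = delay    # seconds; stands in for the 'delay' datapoint
        self.remaining = 0    # stands in for TimerFB._timer
        self.cmd = None       # last value written to the 'cmd' datapoint

    def tick(self):
        """One scheduler tick, i.e. one second elapsed."""
        if self.remaining:
            self.remaining -= 1
            if not self.remaining:
                # Timer expired: switch the light off
                self.cmd = "Off"

    def state_changed(self, new_value):
        """The light state changed on the bus."""
        if new_value == "On":
            self.remaining = self.delay   # start the countdown
        elif new_value == "Off":
            self.remaining = 0            # cancel any running countdown

    def delay_changed(self, new_delay):
        """The delay changed; restart a running countdown with the new value."""
        self.delay = new_delay
        if self.remaining:
            self.remaining = new_delay


def run_scenario():
    """Switch the light on with a 3 s delay and record three ticks."""
    timer = TimerLogic(delay=3)
    timer.state_changed("On")
    history = []
    for _ in range(3):
        timer.tick()
        history.append((timer.remaining, timer.cmd))
    return history
}}}

With a 3 second delay, {{{run_scenario()}}} returns {{{[(2, None), (1, None), (0, "Off")]}}}: nothing is written to {{{cmd}}} until the third tick, when the countdown reaches zero.
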
OK, now let's start this device (invoke the Python interpreter as for any other Python script):

{{{
}}}

Let's have a closer look at this example. First, we import some Python objects:

{{{
#!python
from pknyx.api import Logger
from pknyx.api import FunctionalBlock, Stack, ETS
from pknyx.api import Scheduler, Notifier
}}}

These objects are all classes. We then instantiate some high-level objects and helpers:

{{{
#!python
logger = Logger("%s-%s" % (NAME, IND_ADDR))

stack = Stack(individualAddress=IND_ADDR)
ets = ETS(stack=stack, gadMap=GAD_MAP)

schedule = Scheduler()
notify = Notifier()
}}}

The {{{Logger}}} object is a global logger service, which lets you output any information you want, at different levels. '''pKNyX''' itself makes heavy use of that logger.

The {{{Stack}}} object is used to communicate over the bus (the real bus, of course, but also a virtual bus). We can give it an Individual Address, to mimic the behaviour of real devices. This address will be used as the Source Address when communicating over the bus. The address {{{0.0.0}}} is used by default.

'''Important: to avoid internal loops, a device drops all telegrams sent by itself. So, if you want 2 virtual devices to communicate, you must ensure that they don't use the same source address.'''

The {{{ETS}}} object is a tool which works more or less like the real ETS application (see below).

The {{{Scheduler}}} is a helper implementing a powerful scheduler (see below).

The {{{Notifier}}} works the same way the {{{Scheduler}}} does, but does not provide the same features (see below).

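The loop-avoidance rule described above for the {{{Stack}}} can be pictured with a toy in-memory bus. This is a sketch under stated assumptions, not how '''pKNyX''' implements it: {{{ToyBus}}}, {{{ToyDevice}}} and {{{demo()}}} are hypothetical names, and a telegram is reduced to a plain dict.

{{{
#!python
class ToyBus(object):
    """In-memory stand-in for a KNX bus (hypothetical, not part of pKNyX)."""

    def __init__(self):
        self.devices = []

    def attach(self, device):
        self.devices.append(device)
        device.bus = self

    def broadcast(self, telegram):
        # Every attached device sees every telegram, including the sender
        for device in self.devices:
            device.receive(telegram)


class ToyDevice(object):
    """Virtual device identified by its individual address (hypothetical)."""

    def __init__(self, individual_address):
        self.individual_address = individual_address
        self.bus = None
        self.received = []

    def send(self, gad, value):
        telegram = {"src": self.individual_address, "gad": gad, "value": value}
        self.bus.broadcast(telegram)

    def receive(self, telegram):
        # Loop avoidance: drop anything carrying our own source address
        if telegram["src"] == self.individual_address:
            return
        self.received.append(telegram)


def demo():
    """Return how many telegrams each device accepted after one send."""
    bus = ToyBus()
    a = ToyDevice("1.2.3")
    b = ToyDevice("1.2.4")
    clone = ToyDevice("1.2.3")   # same source address as 'a'
    for device in (a, b, clone):
        bus.attach(device)
    a.send("1/1/1", "On")
    return len(a.received), len(b.received), len(clone.received)
}}}

Here {{{demo()}}} returns {{{(0, 1, 0)}}}: the sender ignores its own telegram, and so does the second device sharing its address, which is why two virtual devices that need to talk to each other must use distinct individual addresses.
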
The main part is to create our custom Functional Block; this is done by subclassing the '''!FunctionalBlock''' base class, and adding a few attributes/methods:

{{{
#!python
class TimerFB(FunctionalBlock):
    """ Timer functional block
    """

    # Datapoints definition
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_02 = dict(name="state", access="input", dptId="1.001", default="Off")
    DP_03 = dict(name="delay", access="input", dptId="7.005", default=10)

    # Group Objects definition
    GO_01 = dict(dp="cmd", flags="CWT", priority="low")
    GO_02 = dict(dp="state", flags="CWUI", priority="low")
    GO_03 = dict(dp="delay", flags="CWU", priority="low")

    DESC = "Timer"

    def _init(self):
        """ Additional init of the timer
        """
        self._timer = 0
}}}

The {{{DP_xx}}} class attributes are the Datapoints of our Functional Block. The {{{GO_xx}}} class attributes are the Group Objects mapping the Datapoints to the bus through the multicast service (Group Address). Both are defined as Python dictionaries; they will be automatically instantiated for us by the framework. The names used here do not matter, as long as they start with {{{DP_}}} for Datapoints, and {{{GO_}}} for Group Objects.

There are a few parameters in the dicts; some are obvious, some need more explanation, but that is out of the scope of this tutorial.

The {{{_init()}}} method is called when the functional block is instantiated; it can be used to initialise instance variables or to perform initial tasks. Here, we just create the {{{_timer}}} variable, and set it to 0.

OK, it's time to dig into the active part of our functional block.
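As a side note on the {{{DP_}}}/{{{GO_}}} naming convention described above, here is one way a base class could discover such attributes by prefix. This is purely illustrative and makes no claim about how '''pKNyX''' does it internally: {{{PrefixCollector}}}, {{{DemoFB}}} and {{{datapoint_names()}}} are hypothetical names.

{{{
#!python
class PrefixCollector(object):
    """Hypothetical base class gathering DP_xx / GO_xx class attributes."""

    @classmethod
    def collect(cls, prefix):
        """Return {attribute name: dict} for class attributes starting with prefix."""
        found = {}
        for attr in dir(cls):
            if attr.startswith(prefix):
                value = getattr(cls, attr)
                if isinstance(value, dict):
                    found[attr] = value
        return found


class DemoFB(PrefixCollector):
    # The suffix after the prefix is free-form: only the prefix matters
    DP_01 = dict(name="cmd", access="output", dptId="1.001", default="Off")
    DP_state = dict(name="state", access="input", dptId="1.001", default="Off")
    GO_01 = dict(dp="cmd", flags="CWT", priority="low")

    DESC = "Demo"   # ignored: no DP_ or GO_ prefix


def datapoint_names(fb_class):
    """Sorted 'name' entries of all DP_ definitions found on fb_class."""
    return sorted(d["name"] for d in fb_class.collect("DP_").values())
}}}

With these definitions, {{{datapoint_names(DemoFB)}}} returns {{{['cmd', 'state']}}} and {{{DemoFB.collect("GO_")}}} contains only {{{GO_01}}}, which shows why the part of the name after the prefix is irrelevant.
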
Let's start with the {{{stateChanged()}}} method:

{{{
#!python
    @notify.datapoint(dp="state", condition="change")
    def stateChanged(self, event):
        """ Method called when the 'state' datapoint changes
        """
        logger.debug("TimerFB.stateChanged(): event=%s" % repr(event))

        if event['newValue'] == "On":
            delay = self.dp["delay"].value
            Logger().info("%s: start timer for %ds" % (self._name, delay))
            self._timer = delay
        elif event['newValue'] == "Off":
            if self._timer:
                Logger().info("%s: switched off detected; cancel timer" % self._name)
                self._timer = 0
}}}

This method has a '''decorator'''. If you don't know what a decorator is, have a look at this excellent documentation.

Here, the {{{notify.datapoint()}}} decorator automatically registers our method and calls it each time the value of the {{{state}}} Datapoint changes. When the light is switched on, we load the timer with the current value of the {{{delay}}} Datapoint; when it is switched off, we cancel any running timer. The {{{delayChanged()}}} method is registered the same way on the {{{delay}}} Datapoint.

The {{{updateTimer()}}} method, on the other hand, is called periodically, using the {{{schedule.every()}}} method as a Python decorator. This decorator automatically registers our method and calls it every second. In this method, we decrement the timer and, when it reaches 0, write {{{"Off"}}} to the {{{cmd}}} Datapoint.

Then, we register our new Functional Block (this will automatically instantiate it, and do other things):

{{{
#!python
ets.register(TimerFB, name="timer", desc="")
}}}

and use the {{{ETS}}} object to weave (bind, link...) our Datapoints (their matching Group Objects, in fact) to Group Addresses:

{{{
#!python
ets.weave(fb="timer", dp="cmd", gad="1/1/1")
ets.weave(fb="timer", dp="state", gad="1/2/1")
ets.weave(fb="timer", dp="delay", gad="1/3/1")
}}}

And finally, we launch the framework main loop:

{{{
#!python
stack.mainLoop()
}}}

(this call is blocking).

We now have a process (Device) running, listening to the bus. The Datapoints, through their respective Group Objects, will react to requests on the Group Addresses they are woven to ({{{1/1/1}}}, {{{1/2/1}}} and {{{1/3/1}}}). According to their flags, they will transmit their internal value on Read requests, or when that value changes internally. And you don't have to manage any of this: '''pKNyX''' does it for you!
This is the whole point of a framework, isn't it?

That's it for now. This is only a draft version; the final implementation may change, according to the feedback and suggestions I get. But the core is all there. Again, the goal of the framework is to provide very high-level tools to build complete and powerful applications and KNX extensions.