#!/usr/bin/env python import os import sys import argh from argh import arg from subprocess import call import re import glob import ConfigParser import json as JSON VERSION = 0.1 defaults = {} _boards = [] _outlets = [] def version(): 'show program version and quit' return 'OpenPDU version %s - Copyright (C) 2018 by Paolo Asperti.' % VERSION @arg('-j', '--json', help="output in json") def boards(json=False): 'dump boards configuration' if json: b = [board.toJSON() for board in _boards] return JSON.dumps({'boards':b}) else: b = '' for board in _boards: b += board.toSTR() return '\nBoards:\n' + b def initboards(): 'initialize boards' for board in _boards: board.init() return @arg('-j', '--json', help="output in json") def outlets(json=False): 'dump outlets configuration' if json: o = [outlet.toJSON() for outlet in _outlets] return JSON.dumps({'outlets':o}) else: o = '' if len(_outlets)>0: for outlet in _outlets: o += outlet.toSTR() return '\nOutlets:\n' + o @arg("outlet", help="outlet number") @arg("onoff", help="1=on [everything else]=off") @arg('-j', '--json', help="output in json") def setpower(outlet, onoff, json=False): 'enable or disable power to an outlet' outlet=int(outlet) o = [o for o in _outlets if o.outletnum==outlet] if len(o) != 1: msg = 'wrong outlet number: %s' % str(outlet) print JSON.dumps({'message':msg}) if json else msg sys.exit(1) theOutlet = o[0] status = (onoff == '1') out = theOutlet.setpower(status) if out is None: msg = "Cannot set power status for outlet %s" % outlet return JSON.dumps({'message':msg,'outlet':outlet}) if json else msg else: pwrstr = 'on' if out==1 else 'off' msg = "Outlet #%s powered %s" % (outlet, pwrstr) return JSON.dumps({'powerstatus':out==1,'outlet':outlet}) if json else msg @arg("outlet", help="outlet number") @arg('-j', '--json', help="output in json") def getpower(outlet, json=False): 'get outlet power status' outlet=int(outlet) o = [o for o in _outlets if o.outletnum==outlet] if len(o) != 1: msg = 'wrong outlet number: %s' % str(outlet) print JSON.dumps({'message':msg}) if json else msg sys.exit(1) theOutlet = o[0] out = theOutlet.getpower() pwrstr = 'on' if out else 'off' msg = "Outlet #%s powered %s" % (outlet, pwrstr) return JSON.dumps({'powerstatus':out,'outlet':outlet}) if json else msg # MCP23008 class BoardI2COut(object): def __init__(self, boardnum, channels=None, address=None, bus=None): self.boardnum = boardnum if channels is None: self.channels = 0 else: self.channels = channels if address is None: self.address = 0x20 else: self.address = address if bus is None: self.bus = 1 else: self.bus = bus if not glob.glob('/dev/i2c*'): raise OSError('Cannot access I2C. Please ensure I2C is enabled') def toJSON(self): return {'boardnum':self.boardnum,'type':'i2c-out','channels':self.channels,'address':self.address} def toSTR(self): return ' Board %s\n Type: i2c-out\n Channels: %s\n Address: %s\n\n' % (self.boardnum,self.channels,self.address) def setpower(self, channel, power): d = self.getpower(channel) mask = 1 << channel d &= ~mask if power: d |= mask #i2cset -y 1 0x20 0x09 0xFF return os.popen("/usr/sbin/i2cset -y %s %s 0x09 0x%s" % (self.bus, self.address, format(d, '02x'))).read() def getpower(self, channel): return os.popen("/usr/sbin/i2cget -y %s %s 0x09" % (self.bus, self.address)).read() def init(self): call(["/usr/sbin/i2cset", "-y", self.bus, self.address, "0x00", "0x00"]) class BoardGpioOut(object): gpios = [] def __init__(self, boardnum, channels=None, gpios=None): self.boardnum = boardnum if channels is None: self.channels = 0 else: self.channels = int(channels) if not isinstance(gpios, list): print 'No gpios specified for gpio board %s' % self.boardnum if len(gpios) != self.channels: print 'Wrong number of gpios specified for gpio board %s' % self.boardnum self.gpios = [int(gpio) for gpio in gpios] def toJSON(self): return {'boardnum':self.boardnum,'type':'gpio-out','channels':self.channels} def toSTR(self): return ' Board %s\n Type: gpio-out\n Channels: %s\n\n' % (self.boardnum,self.channels) def setpower(self, channel, power): io = self.gpios[channel] fn = '/sys/class/gpio/gpio%s/value' % io f = open(fn,'w') out = '0' if power else '1' return f.write(out) def getpower(self, channel): io = self.gpios[channel] fn = '/sys/class/gpio/gpio%s/value' % io f = open(fn,'r') return int(f.read()) == 0 def init(self): for gpio in self.gpios: open('/sys/class/gpio/export','w').write(str(gpio)) fn = '/sys/class/gpio/gpio%s/direction' % gpio open(fn,'w').write('out') return class Outlet(object): def __init__(self, outletnum, boardnum, channel): self.outletnum = int(outletnum) b = [b for b in _boards if b.boardnum==int(boardnum)] self.board = b[0] self.channel = int(channel) self.description = 'Outlet # %s' % self.outletnum def setpower(self, power): self.board.setpower(self.channel,power) if self.board.getpower(self.channel) == power: return power else: return None def getpower(self): return self.board.getpower(self.channel) def toSTR(self): status = self.board.getpower(self.channel) return ' Outlet %s\n Description: %s\n Board #: %s\n Channel: %s\n Power Status:%s\n\n' % (self.outletnum,self.description,self.board.boardnum,self.channel,status) def toJSON(self): status = self.board.getpower(self.channel) return {'outlet':self.outletnum,'description':self.description,'board':self.board.boardnum,'channel':self.channel,'powerstatus':status} configParser = ConfigParser.SafeConfigParser(defaults=defaults) configFile = '/etc/openpdu/openpdu.conf' if os.path.isfile(configFile) and os.access(configFile, os.R_OK): configParser.read(configFile) for s in configParser.sections(): if re.match('^board.*',s): bType = configParser.get(s, 'type') num = int(re.sub(r'^board','',s)) if bType=='gpio-out': channels = int(configParser.get(s, 'channels')) gpios = [] for g in range(0,channels): gpios.append(int(configParser.get(s, 'channel%s' % g))) b = BoardGpioOut(boardnum=num, channels=channels, gpios=gpios) _boards.append(b) elif bType=='i2c-out': channels = int(configParser.get(s, 'channels')) address = configParser.get(s, 'address') bus = configParser.get(s, 'bus') b = BoardI2COut(boardnum=num, channels=channels, address=address, bus=bus) _boards.append(b) if re.match('^outlet.*',s): num = int(re.sub(r'^outlet','',s)) description = configParser.get(s, 'description') boardnum = configParser.get(s, 'board') channel = configParser.get(s, 'channel') o = Outlet(outletnum=num, boardnum=boardnum, channel=channel) o.description = description _outlets.append(o) parser = argh.ArghParser() parser.add_commands([setpower, getpower, outlets, boards, initboards, version]) # dispatching: if __name__ == '__main__': parser.dispatch()