openpdu/openpdu

196 lines
5.3 KiB
Python
Executable File

#!/usr/bin/env python
import os
import sys
import argh
from argh import arg
import re
import ConfigParser
import json as JSON
# Program version; interpolated into the banner returned by version().
VERSION = 0.1
# Default option values handed to the config parser (none are defined here).
defaults = {}
# Board objects built from the configuration file at import time; the CLI
# commands look boards up in this list by their boardnum attribute.
boards = []
def version():
    """show program version and quit"""
    # NOTE(review): docstring wording kept verbatim -- argh presumably shows
    # it as this command's help text; confirm before rewording.
    banner = 'OpenPDU version %s - Copyright (C) 2018 by Paolo Asperti.'
    return banner % (VERSION,)
@arg('-j', '--json', help="output in json")
def dumpcfg(json=False):
    """dump configuration"""
    # NOTE(review): docstring wording kept verbatim -- argh presumably shows
    # it as this command's help text; confirm before rewording.
    # The parameter is called `json` because it backs the --json flag; the
    # json module itself is available under the alias JSON.
    if not json:
        # Plain-text listing: one toSTR() paragraph per configured board.
        listing = ''.join(board.toSTR() for board in boards)
        return '\nBoards:\n' + listing
    # JSON output: a single object with the list of per-board dictionaries.
    payload = {'boards': [board.toJSON() for board in boards]}
    return JSON.dumps(payload)
@arg("board", help="board number")
@arg("out", help="out number")
@arg("onoff", help="1=on [everything else]=off")
@arg('-j', '--json', help="output in json")
def setpower(board, out, onoff, json=False):
    """enable or disable power to a socket"""
    # NOTE(review): docstring wording kept verbatim -- argh presumably shows
    # it as this command's help text; confirm before rewording.
    #
    # board / out / onoff arrive as command-line strings. `json` is accepted
    # but not used by this command. Prints a message and exits with status 1
    # when the board or the out number is not valid; otherwise returns
    # whatever the board's setpower() returns.

    # A non-numeric board can never match, so treat it as "no such board"
    # instead of letting int() raise.
    try:
        board_num = int(board)
    except (TypeError, ValueError):
        board_num = None
    matches = [candidate for candidate in boards
               if candidate.boardnum == board_num]
    if len(matches) != 1:
        print('wrong board number: %s' % str(board))
        sys.exit(1)
    theBoard = matches[0]

    channel_count = int(theBoard.channels)
    try:
        channel = int(out)
    except (TypeError, ValueError):
        channel = -1  # forces the range check below to fail
    # Reject negative numbers too: a negative index would silently address
    # a channel counted from the end of the board's list.
    if not 0 <= channel < channel_count:
        print('wrong out number: %s, board has %s channels (0..%s)' % (str(out), str(theBoard.channels), str(channel_count - 1)))
        sys.exit(1)

    # Only the exact string '1' means "on"; anything else switches off.
    status = (onoff == '1')
    return theBoard.setpower(channel, status)
@arg("board", help="board number")
@arg("out", help="out number")
@arg('-j', '--json', help="output in json")
def getpower(board, out, json=False):
    """get socket power status"""
    # NOTE(review): docstring wording kept verbatim -- argh presumably shows
    # it as this command's help text; confirm before rewording.
    #
    # board / out arrive as command-line strings. `json` is accepted but not
    # used by this command. Prints a message and exits with status 1 when the
    # board or the out number is not valid; otherwise returns whatever the
    # board's getpower() returns.

    # A non-numeric board can never match, so treat it as "no such board"
    # instead of letting int() raise.
    try:
        board_num = int(board)
    except (TypeError, ValueError):
        board_num = None
    matches = [candidate for candidate in boards
               if candidate.boardnum == board_num]
    if len(matches) != 1:
        print('wrong board number: %s' % str(board))
        sys.exit(1)
    theBoard = matches[0]

    channel_count = int(theBoard.channels)
    try:
        channel = int(out)
    except (TypeError, ValueError):
        channel = -1  # forces the range check below to fail
    # Reject negative numbers too: a negative index would silently address
    # a channel counted from the end of the board's list.
    if not 0 <= channel < channel_count:
        print('wrong out number: %s, board has %s channels (0..%s)' % (str(out), str(theBoard.channels), str(channel_count - 1)))
        sys.exit(1)

    return theBoard.getpower(channel)
class BoardI2COut(object):
    """Configuration record for an output board of type ``i2c-out``.

    Stores the board number, the channel count and the address exactly as
    they were passed in. Power control is a placeholder here: setpower()
    does nothing and getpower() always returns False.
    """

    def __init__(self, boardnum, channels=None, address=None):
        """Store the board settings.

        channels defaults to 0 and address to 0x20 when not given.
        """
        self.boardnum = boardnum
        self.channels = 0 if channels is None else channels
        self.address = 0x20 if address is None else address

    def dumpcfg(self, json=False):
        """Return the board settings as a dict (json=True) or as text."""
        if json:
            return {'boardnum':self.boardnum,'type':'i2c-out','channels':self.channels,'address':self.address}
        # %s rather than %i: channels and address are stored unconverted and
        # may be strings, which %i would reject with a TypeError.
        return 'Board %s\nType: i2c-out\nChannels: %s\nAddress: %s' % (self.boardnum,self.channels,self.address)

    def toJSON(self):
        """Return the board settings as a JSON-serialisable dict."""
        return {'boardnum':self.boardnum,'type':'i2c-out','channels':self.channels,'address':self.address}

    def toSTR(self):
        """Return the board settings as an indented text paragraph."""
        return ' Board %s\n Type: i2c-out\n Channels: %s\n Address: %s\n\n' % (self.boardnum,self.channels,self.address)

    def setpower(self, channel, power):
        """Placeholder: does nothing and returns None."""
        return

    def getpower(self, channel):
        """Placeholder: always returns False."""
        return False
class BoardGpioOut(object):
    """Output board of type ``gpio-out`` driven through sysfs gpio files.

    Each channel maps to one gpio number; power is switched by writing to
    /sys/class/gpio/gpio<N>/value. As written, '0' is stored to switch a
    channel on and '1' to switch it off, and a channel reads back as
    powered when the stored value is 0.
    """

    # Class-level fallback; every instance replaces it in __init__.
    gpios = []

    def __init__(self, boardnum, channels=None, gpios=None):
        """Store the board settings and the per-channel gpio numbers.

        channels defaults to 0. A warning is printed when gpios is not a
        list or when its length differs from channels; construction still
        proceeds with whatever gpio numbers were supplied.
        """
        self.boardnum = boardnum
        self.channels = 0 if channels is None else int(channels)
        if not isinstance(gpios, list):
            print('No gpios specified for gpio board %s' % self.boardnum)
            # Fall back to an empty list for the default None so the length
            # check and the conversion below no longer crash.
            gpios = [] if gpios is None else list(gpios)
        if len(gpios) != self.channels:
            print('Wrong number of gpios specified for gpio board %s' % self.boardnum)
        self.gpios = [int(gpio) for gpio in gpios]

    def dumpcfg(self, json=False):
        """Return the board settings as a dict (json=True) or as text."""
        if json:
            return {'boardnum':self.boardnum,'type':'gpio-out','channels':self.channels}
        return 'Board %i\nType: gpio-out\nChannels: %i ' % (self.boardnum,self.channels)

    def toJSON(self):
        """Return the board settings as a JSON-serialisable dict."""
        return {'boardnum':self.boardnum,'type':'gpio-out','channels':self.channels}

    def toSTR(self):
        """Return the board settings as an indented text paragraph."""
        return ' Board %s\n Type: gpio-out\n Channels: %s\n\n' % (self.boardnum,self.channels)

    def setpower(self, channel, power):
        """Write the new state of ``channel`` to its gpio value file.

        Writes '0' when power is truthy and '1' otherwise. Returns the
        result of the underlying file write.
        """
        io = self.gpios[channel]
        fn = '/sys/class/gpio/gpio%s/value' % io
        out = '0' if power else '1'
        # Close the file deterministically so the value is flushed now
        # rather than whenever the file object is garbage-collected.
        with open(fn, 'w') as f:
            return f.write(out)

    def getpower(self, channel):
        """Return True when the gpio value file of ``channel`` holds 0."""
        io = self.gpios[channel]
        fn = '/sys/class/gpio/gpio%s/value' % io
        with open(fn, 'r') as f:
            return int(f.read()) == 0
# Build the global `boards` list from the configuration file, if one is
# present and readable. Every section whose name starts with "board"
# describes one board; the rest of the section name is the board number.
configFile = '/etc/openpdu/openpdu.conf'
configParser = ConfigParser.SafeConfigParser(defaults=defaults)
if os.path.isfile(configFile) and os.access(configFile, os.R_OK):
    configParser.read(configFile)
    for section in configParser.sections():
        if not section.startswith('board'):
            continue
        bType = configParser.get(section, 'type')
        num = int(section[len('board'):])
        if bType == 'gpio-out':
            # One "out<N>" option per channel gives that channel's gpio.
            channels = int(configParser.get(section, 'channels'))
            gpios = [int(configParser.get(section, 'out%s' % idx))
                     for idx in range(0, channels)]
            boards.append(
                BoardGpioOut(boardnum=num, channels=channels, gpios=gpios))
        elif bType == 'i2c-out':
            # The address is passed on exactly as read from the file.
            channels = int(configParser.get(section, 'channels'))
            address = configParser.get(section, 'address')
            boards.append(
                BoardI2COut(boardnum=num, channels=channels, address=address))
        # Sections with any other type are ignored.
# Command-line interface: each function below becomes one sub-command.
parser = argh.ArghParser()
parser.add_commands([
    setpower,
    getpower,
    dumpcfg,
    version,
])

# Only dispatch when run as a script, not when imported.
if __name__ == '__main__':
    parser.dispatch()