initial commit

This commit is contained in:
Paolo Asperti 2018-04-16 22:08:38 +02:00
commit 12ba988bd3
29 changed files with 2156 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
packages/
*.apk
.on-save.json

26
APKBUILD Normal file
View File

@ -0,0 +1,26 @@
# Contributor: Paolo Asperti <paolo@asperti.com>
# Maintainer: Paolo Asperti <paolo@asperti.com>
pkgname=openpdu-libs
pkgver=0.1.0
pkgrel=1
pkgdesc="OpenPDU project - misc python libraries"
url="https://github.com/openpdu/openpdu-libs"
arch="noarch"
license="GPL2"
depends="python"
makedepends=""
subpackages=""
source=""
options="!check"
build() {
:
}
package() {
mkdir -p "$pkgdir"
# create directory tree
install -Dd usr "$pkgdir"/usr
cp -r usr "$pkgdir"/
}

View File

@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Richard Hull and contributors
# See LICENSE.rst for details.
"""
OLED display driver for SSD1306, SSD1325, SSD1331 and SH1106 devices.
"""
__version__ = '1.5.0'

Binary file not shown.

View File

@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Richard Hull and contributors
# See LICENSE.rst for details.
class common(object):
DISPLAYOFF = 0xAE
DISPLAYON = 0xAF
DISPLAYALLON = 0xA5
DISPLAYALLON_RESUME = 0xA4
NORMALDISPLAY = 0xA6
INVERTDISPLAY = 0xA7
SETREMAP = 0xA0
SETMULTIPLEX = 0xA8
SETCONTRAST = 0x81
class ssd1306(common):
CHARGEPUMP = 0x8D
COLUMNADDR = 0x21
COMSCANDEC = 0xC8
COMSCANINC = 0xC0
EXTERNALVCC = 0x1
MEMORYMODE = 0x20
PAGEADDR = 0x22
SETCOMPINS = 0xDA
SETDISPLAYCLOCKDIV = 0xD5
SETDISPLAYOFFSET = 0xD3
SETHIGHCOLUMN = 0x10
SETLOWCOLUMN = 0x00
SETPRECHARGE = 0xD9
SETSEGMENTREMAP = 0xA1
SETSTARTLINE = 0x40
SETVCOMDETECT = 0xDB
SWITCHCAPVCC = 0x2
sh1106 = ssd1306
class ssd1331(common):
ACTIVESCROLLING = 0x2F
CLOCKDIVIDER = 0xB3
CONTINUOUSSCROLLINGSETUP = 0x27
DEACTIVESCROLLING = 0x2E
DISPLAYONDIM = 0xAC
LOCKMODE = 0xFD
MASTERCURRENTCONTROL = 0x87
NORMALDISPLAY = 0xA4
PHASE12PERIOD = 0xB1
POWERSAVEMODE = 0xB0
SETCOLUMNADDR = 0x15
SETCONTRASTA = 0x81
SETCONTRASTB = 0x82
SETCONTRASTC = 0x83
SETDISPLAYOFFSET = 0xA2
SETDISPLAYSTARTLINE = 0xA1
SETMASTERCONFIGURE = 0xAD
SETPRECHARGESPEEDA = 0x8A
SETPRECHARGESPEEDB = 0x8B
SETPRECHARGESPEEDC = 0x8C
SETPRECHARGEVOLTAGE = 0xBB
SETROWADDR = 0x75
SETVVOLTAGE = 0xBE
class ssd1325(common):
SETCOLUMNADDR = 0x15
SETROWADDR = 0x75
SETCURRENT = 0x84
SETSTARTLINE = 0xA1
SETOFFSET = 0xA2
NORMALDISPLAY = 0xA4
DISPLAYALLOFF = 0xA6
MASTERCONFIG = 0xAD
SETPRECHARGECOMPENABLE = 0xB0
SETPHASELEN = 0xB1
SETROWPERIOD = 0xB2
SETCLOCK = 0xB3
SETPRECHARGECOMP = 0xB4
SETGRAYTABLE = 0xB8
SETPRECHARGEVOLTAGE = 0xBC
SETVCOMLEVEL = 0xBE
SETVSL = 0xBF

Binary file not shown.

View File

@ -0,0 +1,410 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Richard Hull and contributors
# See LICENSE.rst for details.
# Example usage:
#
# from oled.serial import i2c, spi
# from oled.device import ssd1306, sh1106
# from oled.render import canvas
# from PIL import ImageDraw
#
# serial = i2c(port=1, address=0x3C)
# device = ssd1306(serial)
#
# with canvas(device) as draw:
# draw.rectangle(device.bounding_box, outline="white", fill="black")
# draw.text(30, 40, "Hello World", fill="white")
#
# As soon as the with-block scope level is complete, the graphics primitives
# will be flushed to the device.
#
# Creating a new canvas is effectively 'carte blanche': If you want to retain
# an existing canvas, then make a reference like:
#
# c = canvas(device)
# for X in ...:
# with c as draw:
# draw.rectangle(...)
#
# As before, as soon as the with block completes, the canvas buffer is flushed
# to the device
import atexit
from oled.serial import i2c
import oled.mixin as mixin
import oled.error
import oled.const
class device(mixin.capabilities):
"""
Base class for OLED driver classes
.. warning::
Direct use of the :func:`command` and :func:`data` methods are
discouraged: Screen updates should be effected through the
:func:`display` method, or preferably with the
:class:`oled.render.canvas` context manager.
"""
def __init__(self, const=None, serial_interface=None):
self._const = const or oled.const.common
self._serial_interface = serial_interface or i2c()
atexit.register(self.cleanup)
def command(self, *cmd):
"""
Sends a command or sequence of commands through to the delegated
serial interface.
"""
self._serial_interface.command(*cmd)
def data(self, data):
"""
Sends a data byte or sequence of data bytes through to the delegated
serial interface.
"""
self._serial_interface.data(data)
def show(self):
"""
Sets the display mode ON, waking the device out of a prior
low-power sleep mode.
"""
self.command(self._const.DISPLAYON)
def hide(self):
"""
Switches the display mode OFF, putting the device in low-power
sleep mode.
"""
self.command(self._const.DISPLAYOFF)
def contrast(self, level):
"""
Switches the display contrast to the desired level, in the range
0-255. Note that setting the level to a low (or zero) value will
not necessarily dim the display to nearly off. In other words,
this method is **NOT** suitable for fade-in/out animation.
:param level: Desired contrast level in the range of 0-255.
:type level: int
"""
assert(level >= 0)
assert(level <= 255)
self.command(self._const.SETCONTRAST, level)
def cleanup(self):
self.hide()
self.clear()
self._serial_interface.cleanup()
class sh1106(device):
"""
Encapsulates the serial interface to the monochrome SH1106 OLED display
hardware. On creation, an initialization sequence is pumped to the display
to properly configure it. Further control commands can then be called to
affect the brightness and other settings.
"""
def __init__(self, serial_interface=None, width=128, height=64, rotate=0):
super(sh1106, self).__init__(oled.const.sh1106, serial_interface)
self.capabilities(width, height, rotate)
self._pages = self._h // 8
# FIXME: Delay doing anything here with alternate screen sizes
# until we are able to get a device to test with.
if width != 128 or height != 64:
raise oled.error.DeviceDisplayModeError(
"Unsupported display mode: {0} x {1}".format(width, height))
self.command(
self._const.DISPLAYOFF,
self._const.MEMORYMODE,
self._const.SETHIGHCOLUMN, 0xB0, 0xC8,
self._const.SETLOWCOLUMN, 0x10, 0x40,
self._const.SETSEGMENTREMAP,
self._const.NORMALDISPLAY,
self._const.SETMULTIPLEX, 0x3F,
self._const.DISPLAYALLON_RESUME,
self._const.SETDISPLAYOFFSET, 0x00,
self._const.SETDISPLAYCLOCKDIV, 0xF0,
self._const.SETPRECHARGE, 0x22,
self._const.SETCOMPINS, 0x12,
self._const.SETVCOMDETECT, 0x20,
self._const.CHARGEPUMP, 0x14)
self.contrast(0x7F)
self.clear()
self.show()
def display(self, image):
"""
Takes a 1-bit :py:mod:`PIL.Image` and dumps it to the SH1106
OLED display.
"""
assert(image.mode == self.mode)
assert(image.size == self.size)
image = self.preprocess(image)
set_page_address = 0xB0
image_data = image.getdata()
pixels_per_page = self.width * 8
buf = bytearray(self.width)
for y in range(0, int(self._pages * pixels_per_page), pixels_per_page):
self.command(set_page_address, 0x02, 0x10)
set_page_address += 1
offsets = [y + self.width * i for i in range(8)]
for x in range(self.width):
buf[x] = \
(image_data[x + offsets[0]] and 0x01) | \
(image_data[x + offsets[1]] and 0x02) | \
(image_data[x + offsets[2]] and 0x04) | \
(image_data[x + offsets[3]] and 0x08) | \
(image_data[x + offsets[4]] and 0x10) | \
(image_data[x + offsets[5]] and 0x20) | \
(image_data[x + offsets[6]] and 0x40) | \
(image_data[x + offsets[7]] and 0x80)
self.data(list(buf))
class ssd1306(device):
"""
Encapsulates the serial interface to the monochrome SSD1306 OLED display
hardware. On creation, an initialization sequence is pumped to the display
to properly configure it. Further control commands can then be called to
affect the brightness and other settings.
"""
def __init__(self, serial_interface=None, width=128, height=64, rotate=0):
super(ssd1306, self).__init__(oled.const.ssd1306, serial_interface)
self.capabilities(width, height, rotate)
self._pages = self._h // 8
self._buffer = [0] * self._w * self._pages
self._offsets = [n * self._w for n in range(8)]
# Supported modes
settings = {
(128, 64): dict(multiplex=0x3F, displayclockdiv=0x80, compins=0x12),
(128, 32): dict(multiplex=0x1F, displayclockdiv=0x80, compins=0x02),
(96, 16): dict(multiplex=0x0F, displayclockdiv=0x60, compins=0x02)
}.get((width, height))
if settings is None:
raise oled.error.DeviceDisplayModeError(
"Unsupported display mode: {0} x {1}".format(width, height))
self.command(
self._const.DISPLAYOFF,
self._const.SETDISPLAYCLOCKDIV, settings['displayclockdiv'],
self._const.SETMULTIPLEX, settings['multiplex'],
self._const.SETDISPLAYOFFSET, 0x00,
self._const.SETSTARTLINE,
self._const.CHARGEPUMP, 0x14,
self._const.MEMORYMODE, 0x00,
self._const.SETREMAP,
self._const.COMSCANDEC,
self._const.SETCOMPINS, settings['compins'],
self._const.SETPRECHARGE, 0xF1,
self._const.SETVCOMDETECT, 0x40,
self._const.DISPLAYALLON_RESUME,
self._const.NORMALDISPLAY)
self.contrast(0xCF)
self.clear()
self.show()
def display(self, image):
"""
Takes a 1-bit :py:mod:`PIL.Image` and dumps it to the SSD1306
OLED display.
"""
assert(image.mode == self.mode)
assert(image.size == self.size)
image = self.preprocess(image)
self.command(
# Column start/end address
self._const.COLUMNADDR, 0x00, self._w - 1,
# Page start/end address
self._const.PAGEADDR, 0x00, self._pages - 1)
w = self._w
pix = list(image.getdata())
step = w * 8
buf = self._buffer
os0, os1, os2, os3, os4, os5, os6, os7 = self._offsets
j = 0
for y in range(0, self._pages * step, step):
i = y + w - 1
while i >= y:
buf[j] = \
(0x01 if pix[i] > 0 else 0) | \
(0x02 if pix[i + os1] > 0 else 0) | \
(0x04 if pix[i + os2] > 0 else 0) | \
(0x08 if pix[i + os3] > 0 else 0) | \
(0x10 if pix[i + os4] > 0 else 0) | \
(0x20 if pix[i + os5] > 0 else 0) | \
(0x40 if pix[i + os6] > 0 else 0) | \
(0x80 if pix[i + os7] > 0 else 0)
i -= 1
j += 1
self.data(buf)
class ssd1331(device):
"""
Encapsulates the serial interface to the 16-bit color (5-6-5 RGB) SSD1331
OLED display hardware. On creation, an initialization sequence is pumped to
the display to properly configure it. Further control commands can then be
called to affect the brightness and other settings.
"""
def __init__(self, serial_interface=None, width=96, height=64, rotate=0):
super(ssd1331, self).__init__(oled.const.ssd1331, serial_interface)
self.capabilities(width, height, rotate, mode="RGB")
self._buffer = [0] * self._w * self._h * 2
if width != 96 or height != 64:
raise oled.error.DeviceDisplayModeError(
"Unsupported display mode: {0} x {1}".format(width, height))
self.command(
self._const.DISPLAYOFF,
self._const.SETREMAP, 0x72,
self._const.SETDISPLAYSTARTLINE, 0x00,
self._const.SETDISPLAYOFFSET, 0x00,
self._const.NORMALDISPLAY,
self._const.SETMULTIPLEX, 0x3F,
self._const.SETMASTERCONFIGURE, 0x8E,
self._const.POWERSAVEMODE, 0x0B,
self._const.PHASE12PERIOD, 0x74,
self._const.CLOCKDIVIDER, 0xD0,
self._const.SETPRECHARGESPEEDA, 0x80,
self._const.SETPRECHARGESPEEDB, 0x80,
self._const.SETPRECHARGESPEEDC, 0x80,
self._const.SETPRECHARGEVOLTAGE, 0x3E,
self._const.SETVVOLTAGE, 0x3E,
self._const.MASTERCURRENTCONTROL, 0x0F)
self.contrast(0xFF)
self.clear()
self.show()
def display(self, image):
"""
Takes a 24-bit RGB :py:mod:`PIL.Image` and dumps it to the SSD1331 OLED
display.
"""
assert(image.mode == self.mode)
assert(image.size == self.size)
image = self.preprocess(image)
self.command(
self._const.SETCOLUMNADDR, 0x00, self._w - 1,
self._const.SETROWADDR, 0x00, self._h - 1)
i = 0
buf = self._buffer
for r, g, b in image.getdata():
# 65K format 1
buf[i] = r & 0xF8 | g >> 5
buf[i + 1] = g << 5 & 0xE0 | b >> 3
i += 2
self.data(buf)
def contrast(self, level):
"""
Switches the display contrast to the desired level, in the range
0-255. Note that setting the level to a low (or zero) value will
not necessarily dim the display to nearly off. In other words,
this method is **NOT** suitable for fade-in/out animation.
:param level: Desired contrast level in the range of 0-255.
:type level: int
"""
assert(level >= 0)
assert(level <= 255)
self.command(self._const.SETCONTRASTA, level,
self._const.SETCONTRASTB, level,
self._const.SETCONTRASTC, level)
class ssd1325(device):
"""
Encapsulates the serial interface to the 4-bit greyscale SSD1325 OLED
display hardware. On creation, an initialization sequence is pumped to the
display to properly configure it. Further control commands can then be
called to affect the brightness and other settings.
"""
def __init__(self, serial_interface=None, width=128, height=64, rotate=0):
super(ssd1325, self).__init__(oled.const.ssd1325, serial_interface)
self.capabilities(width, height, rotate, mode="RGB")
self._buffer = [0] * (self._w * self._h // 2)
if width != 128 or height != 64:
raise oled.error.DeviceDisplayModeError(
"Unsupported display mode: {0} x {1}".format(width, height))
self.command(
self._const.DISPLAYOFF,
self._const.SETCLOCK, 0xF1,
self._const.SETMULTIPLEX, 0x3F,
self._const.SETOFFSET, 0x4C,
self._const.SETSTARTLINE, 0x00,
self._const.MASTERCONFIG, 0x02,
self._const.SETREMAP, 0x50,
self._const.SETCURRENT + 2,
self._const.SETGRAYTABLE, 0x01, 0x11, 0x22, 0x32, 0x43, 0x54, 0x65, 0x76)
self.contrast(0xFF)
self.command(
self._const.SETROWPERIOD, 0x51,
self._const.SETPHASELEN, 0x55,
self._const.SETPRECHARGECOMP, 0x02,
self._const.SETPRECHARGECOMPENABLE, 0x28,
self._const.SETVCOMLEVEL, 0x1C,
self._const.SETVSL, 0x0F,
self._const.NORMALDISPLAY)
self.clear()
self.show()
def display(self, image):
"""
Takes a 24-bit RGB :py:mod:`PIL.Image` and dumps it to the SSD1325 OLED
display, converting the image pixels to 4-bit greyscale using a
simplified Luma calculation, based on *Y'=0.299R'+0.587G'+0.114B'*.
"""
assert(image.mode == self.mode)
assert(image.size == self.size)
image = self.preprocess(image)
self.command(
self._const.SETCOLUMNADDR, 0x00, self._w - 1,
self._const.SETROWADDR, 0x00, self._h - 1)
i = 0
buf = self._buffer
for r, g, b in image.getdata():
# RGB->Greyscale luma calculation into 4-bits
grey = (r * 306 + g * 601 + b * 117) >> 14
if i % 2 == 0:
buf[i // 2] = grey
else:
buf[i // 2] |= (grey << 4)
i += 1
self.data(buf)

Binary file not shown.

View File

@ -0,0 +1,237 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Richard Hull and contributors
# See LICENSE.rst for details.
import os
import sys
import atexit
import logging
from PIL import Image
from oled.device import device
from oled.serial import noop
logger = logging.getLogger(__name__)
class emulator(device):
"""
Base class for emulated OLED driver classes
"""
def __init__(self, width, height, rotate, mode, transform, scale):
super(emulator, self).__init__(serial_interface=noop())
try:
import pygame
except:
raise RuntimeError("Emulator requires pygame to be installed")
self._pygame = pygame
self.capabilities(width, height, rotate, mode)
self.scale = 1 if transform == "none" else scale
self._transform = getattr(transformer(pygame, width, height, scale),
"none" if scale == 1 else transform)
def cleanup(self):
pass
def to_surface(self, image):
"""
Converts a :py:mod:`PIL.Image` into a :class:`pygame.Surface`,
transforming it according to the ``transform`` and ``scale``
constructor arguments.
"""
im = image.convert("RGB")
mode = im.mode
size = im.size
data = im.tobytes()
del im
surface = self._pygame.image.fromstring(data, size, mode)
return self._transform(surface)
class capture(emulator):
"""
Pseudo-device that acts like an OLED display, except that it writes
the image to a numbered PNG file when the :func:`display` method
is called.
While the capability of an OLED device is monochrome, there is no
limitation here, and hence supports 24-bit color depth.
"""
def __init__(self, width=128, height=64, rotate=0, mode="RGB",
transform="scale2x", scale=2, file_template="oled_{0:06}.png",
**kwargs):
super(capture, self).__init__(width, height, rotate, mode, transform, scale)
self._count = 0
self._file_template = file_template
def display(self, image):
"""
Takes a :py:mod:`PIL.Image` and dumps it to a numbered PNG file.
"""
assert(image.size == self.size)
self._count += 1
filename = self._file_template.format(self._count)
image = self.preprocess(image)
surface = self.to_surface(image)
logger.debug("Writing: {0}".format(filename))
self._pygame.image.save(surface, filename)
class gifanim(emulator):
"""
Pseudo-device that acts like an OLED display, except that it collects
the images when the :func:`display` method is called, and on exit,
assembles them into an animated GIF image.
While the capability of an OLED device is monochrome, there is no
limitation here, and hence supports 24-bit color depth, albeit with
an indexed color palette.
"""
def __init__(self, width=128, height=64, rotate=0, mode="RGB",
transform="scale2x", scale=2, filename="oled_anim.gif",
duration=0.01, loop=0, max_frames=None, **kwargs):
super(gifanim, self).__init__(width, height, rotate, mode, transform, scale)
self._images = []
self._count = 0
self._max_frames = max_frames
self._filename = filename
self._loop = loop
self._duration = duration
atexit.register(self.write_animation)
def display(self, image):
"""
Takes an image, scales it according to the nominated transform, and
stores it for later building into an animated GIF.
"""
assert(image.size == self.size)
image = self.preprocess(image)
surface = self.to_surface(image)
rawbytes = self._pygame.image.tostring(surface, "RGB", False)
im = Image.frombytes("RGB", (self._w * self.scale, self._h * self.scale), rawbytes)
self._images.append(im)
self._count += 1
logger.debug("Recording frame: {0}".format(self._count))
if self._max_frames and self._count >= self._max_frames:
sys.exit(0)
def write_animation(self):
logger.debug("Please wait... building animated GIF")
with open(self._filename, "w+b") as fp:
self._images[0].save(fp, save_all=True, loop=self._loop,
duration=int(self._duration * 1000),
append_images=self._images[1:],
format="GIF")
logger.debug("Wrote {0} frames to file: {1} ({2} bytes)".format(
len(self._images), self._filename, os.stat(self._filename).st_size))
class pygame(emulator):
"""
Pseudo-device that acts like an OLED display, except that it renders
to an displayed window. The frame rate is limited to 60FPS (much faster
than a Raspberry Pi can acheive, but this can be overridden as necessary).
While the capability of an OLED device is monochrome, there is no
limitation here, and hence supports 24-bit color depth.
:mod:`pygame` is used to render the emulated display window, and it's
event loop is checked to see if the ESC key was pressed or the window
was dismissed: if so :func:`sys.exit()` is called.
"""
def __init__(self, width=128, height=64, rotate=0, mode="RGB", transform="scale2x",
scale=2, frame_rate=60, **kwargs):
super(pygame, self).__init__(width, height, rotate, mode, transform, scale)
self._pygame.init()
self._pygame.font.init()
self._pygame.display.set_caption("OLED Emulator")
self._clock = self._pygame.time.Clock()
self._fps = frame_rate
self._screen = self._pygame.display.set_mode((width * self.scale, height * self.scale))
self._screen.fill((0, 0, 0))
self._pygame.display.flip()
def _abort(self):
keystate = self._pygame.key.get_pressed()
return keystate[self._pygame.K_ESCAPE] or self._pygame.event.peek(self._pygame.QUIT)
def display(self, image):
"""
Takes a :py:mod:`PIL.Image` and renders it to a pygame display surface.
"""
assert(image.size == self.size)
image = self.preprocess(image)
self._clock.tick(self._fps)
self._pygame.event.pump()
if self._abort():
self._pygame.quit()
sys.exit()
surface = self.to_surface(image)
self._screen.blit(surface, (0, 0))
self._pygame.display.flip()
class dummy(emulator):
"""
Pseudo-device that acts like an OLED display, except that it does nothing
other than retain a copy of the displayed image. It is mostly useful for
testing. While the capability of an OLED device is monochrome, there is no
limitation here, and hence supports 24-bit color depth.
"""
def __init__(self, width=128, height=64, rotate=0, mode="RGB", transform="scale2x",
scale=2, **kwargs):
super(dummy, self).__init__(width, height, rotate, mode, transform, scale)
self.image = None
def display(self, image):
"""
Takes a :py:mod:`PIL.Image` and makes a copy of it for later
use/inspection.
"""
assert(image.size == self.size)
self.image = self.preprocess(image).copy()
class transformer(object):
"""
Helper class used to dispatch transformation operations.
"""
def __init__(self, pygame, width, height, scale):
self._pygame = pygame
self._output_size = (width * scale, height * scale)
self.scale = scale
def none(self, surface):
"""
No-op transform - used when ``scale`` = 1
"""
return surface
def scale2x(self, surface):
"""
Scales using the AdvanceMAME Scale2X algorithm which does a
'jaggie-less' scale of bitmap graphics.
"""
assert(self.scale == 2)
return self._pygame.transform.scale2x(surface)
def smoothscale(self, surface):
"""
Smooth scaling using MMX or SSE extensions if available
"""
return self._pygame.transform.smoothscale(surface, self._output_size)
def identity(self, surface):
"""
Fast scale operation that does not sample the results
"""
return self._pygame.transform.scale(surface, self._output_size)

Binary file not shown.

View File

@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Richard Hull and contributors
# See LICENSE.rst for details.
"""
Exceptions for this library.
"""
class Error(Exception):
"""
Base class for exceptions in this library.
"""
pass
class DeviceNotFoundError(Error):
"""
Exception raised when a device cannot be found.
"""
class DevicePermissionError(Error):
"""
Exception raised when permission to access the device is denied.
"""
class DeviceAddressError(Error):
"""
Exception raised when an invalid device address is detected.
"""
class DeviceDisplayModeError(Error):
"""
Exception raised when an invalid device display mode is detected.
"""

Binary file not shown.

View File

@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Richard Hull and contributors
# See LICENSE.rst for details.
from PIL import Image
class capabilities(object):
def capabilities(self, width, height, rotate, mode="1"):
assert mode in ("1", "RGB", "RGBA")
assert rotate in (0, 1, 2, 3)
self._w = width
self._h = height
self.width = width if rotate % 2 == 0 else height
self.height = height if rotate % 2 == 0 else width
self.size = (self.width, self.height)
self.bounding_box = (0, 0, self.width - 1, self.height - 1)
self.rotate = rotate
self.mode = mode
def clear(self):
"""
Initializes the device memory with an empty (blank) image.
"""
self.display(Image.new(self.mode, self.size))
def preprocess(self, image):
if self.rotate == 0:
return image
angle = self.rotate * -90
return image.rotate(angle, expand=True).crop((0, 0, self._w, self._h))
def display(self, image):
raise NotImplementedError()

Binary file not shown.

View File

@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Richard Hull and contributors
# See LICENSE.rst for details.
from PIL import Image, ImageDraw
class canvas(object):
"""
A canvas returns a properly-sized :py:mod:`PIL.ImageDraw` object onto
which the caller can draw upon. As soon as the with-block completes, the
resultant image is flushed onto the device.
By default, any color (other than black) will be treated as white and
displayed on the device. However, this behaviour can be changed by adding
``dither=True`` and the image will be converted from RGB space into a 1-bit
monochrome image where dithering is employed to differentiate colors at the
expense of resolution.
"""
def __init__(self, device, dither=False):
self.draw = None
self.image = Image.new("RGB" if dither else device.mode, device.size)
self.device = device
self.dither = dither
def __enter__(self):
self.draw = ImageDraw.Draw(self.image)
return self.draw
def __exit__(self, type, value, traceback):
if type is None:
if self.dither:
self.image = self.image.convert(self.device.mode)
# do the drawing onto the device
self.device.display(self.image)
del self.draw # Tidy up the resources
return False # Never suppress exceptions

Binary file not shown.

View File

@ -0,0 +1,173 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Richard Hull and contributors
# See LICENSE.rst for details.
import errno
import oled.error
class i2c(object):
"""
Wrap an `I2C <https://en.wikipedia.org/wiki/I%C2%B2C>`_ interface to
provide data and command methods.
:param bus: I2C bus instance.
:type bus:
:param port: I2C port number.
:type port: int
:param address: I2C address.
:type address:
:raises oled.error.DeviceAddressError: I2C device address is invalid.
:raises oled.error.DeviceNotFoundError: I2C device could not be found.
:raises oled.error.DevicePermissionError: Permission to access I2C device
denied.
.. note::
1. Only one of ``bus`` OR ``port`` arguments should be supplied;
if both are, then ``bus`` takes precedence.
2. If ``bus`` is provided, there is an implicit expectation
that it has already been opened.
"""
def __init__(self, bus=None, port=1, address=0x3C):
import smbus2
self._cmd_mode = 0x00
self._data_mode = 0x40
try:
self._addr = int(str(address), 0)
except ValueError:
raise oled.error.DeviceAddressError(
'I2C device address invalid: {}'.format(address))
try:
self._bus = bus or smbus2.SMBus(port)
except (IOError, OSError) as e:
if e.errno == errno.ENOENT:
# FileNotFoundError
raise oled.error.DeviceNotFoundError(
'I2C device not found: {}'.format(e.filename))
elif e.errno == errno.EPERM or e.errno == errno.EACCES:
# PermissionError
raise oled.error.DevicePermissionError(
'I2C device permission denied: {}'.format(e.filename))
else:
raise
def command(self, *cmd):
"""
Sends a command or sequence of commands through to the I2C address
- maximum allowed is 32 bytes in one go.
"""
assert(len(cmd) <= 32)
self._bus.write_i2c_block_data(self._addr, self._cmd_mode, list(cmd))
def data(self, data):
"""
Sends a data byte or sequence of data bytes through to the I2C
address - maximum allowed in one transaction is 32 bytes, so if
data is larger than this, it is sent in chunks.
"""
i = 0
n = len(data)
write = self._bus.write_i2c_block_data
while i < n:
write(self._addr, self._data_mode, list(data[i:i + 32]))
i += 32
def cleanup(self):
"""
Clean up I2C resources
"""
self._bus.close()
class spi(object):
"""
Wraps an `SPI <https://en.wikipedia.org/wiki/Serial_Peripheral_Interface_Bus>`_
interface to provide data and command methods.
* The DC pin (Data/Command select) defaults to GPIO 24 (BCM).
* The RST pin (Reset) defaults to GPIO 25 (BCM).
:raises oled.error.DeviceNotFoundError: SPI device could not be found.
"""
def __init__(self, spi=None, gpio=None, port=0, device=0,
bus_speed_hz=8000000, bcm_DC=24, bcm_RST=25):
self._gpio = gpio or self.__rpi_gpio__()
self._spi = spi or self.__spidev__()
try:
self._spi.open(port, device)
except (IOError, OSError) as e:
if e.errno == errno.ENOENT:
# FileNotFoundError
raise oled.error.DeviceNotFoundError('SPI device not found')
else:
raise
self._spi.max_speed_hz = bus_speed_hz
self._bcm_DC = bcm_DC
self._bcm_RST = bcm_RST
self._cmd_mode = self._gpio.LOW # Command mode = Hold low
self._data_mode = self._gpio.HIGH # Data mode = Pull high
self._gpio.setmode(self._gpio.BCM)
self._gpio.setup(self._bcm_DC, self._gpio.OUT)
self._gpio.setup(self._bcm_RST, self._gpio.OUT)
self._gpio.output(self._bcm_RST, self._gpio.HIGH) # Keep RESET pulled high
def __rpi_gpio__(self):
# RPi.GPIO _really_ doesn't like being run on anything other than
# a Raspberry Pi... this is imported here so we can swap out the
# implementation for a mock
import RPi.GPIO
return RPi.GPIO
def __spidev__(self):
# spidev cant compile on macOS, so use a similar technique to
# initialize (mainly so the tests run unhindered)
import spidev
return spidev.SpiDev()
def command(self, *cmd):
"""
Sends a command or sequence of commands through to the SPI device.
"""
self._gpio.output(self._bcm_DC, self._cmd_mode)
self._spi.xfer2(list(cmd))
def data(self, data):
"""
Sends a data byte or sequence of data bytes through to the SPI device.
If the data is more than 4Kb in size, it is sent in chunks.
"""
self._gpio.output(self._bcm_DC, self._data_mode)
i = 0
n = len(data)
write = self._spi.xfer2
while i < n:
write(data[i:i + 4096])
i += 4096
def cleanup(self):
"""
Clean up SPI & GPIO resources
"""
self._spi.close()
self._gpio.cleanup()
class noop(object):
"""
Does nothing, used for pseudo-devices / emulators, which dont have a serial
interface.
"""
def command(self, *cmd):
pass
def data(self, data):
pass
def cleanup(self):
pass

Binary file not shown.

View File

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Richard Hull and contributors
# See LICENSE.rst for details.
# Adapted from http://code.activestate.com/recipes/577187-python-thread-pool/
# Attribution: Created by Emilio Monti on Sun, 11 Apr 2010 (MIT License).
from threading import Thread
class worker(Thread):
"""
Thread executing tasks from a given tasks queue
"""
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
func, args, kargs = self.tasks.get()
try:
func(*args, **kargs)
except Exception as e:
print(e)
self.tasks.task_done()
class threadpool:
"""
Pool of threads consuming tasks from a queue
"""
def __init__(self, num_threads):
try:
from Queue import Queue
except ImportError:
from queue import Queue
self.tasks = Queue(num_threads)
for _ in range(num_threads):
worker(self.tasks)
def add_task(self, func, *args, **kargs):
"""
Add a task to the queue
"""
self.tasks.put((func, args, kargs))
def wait_completion(self):
"""
Wait for completion of all the tasks in the queue
"""
self.tasks.join()

View File

@ -0,0 +1,371 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Richard Hull and contributors
# See LICENSE.rst for details.
import time
from PIL import Image, ImageDraw, ImageFont
import oled.mixin as mixin
from oled.threadpool import threadpool
pool = threadpool(4)
def calc_bounds(xy, entity):
"""
For an entity with width and height attributes, determine
the bounding box if were positioned at (x, y).
"""
left, top = xy
right, bottom = left + entity.width, top + entity.height
return [left, top, right, bottom]
def range_overlap(a_min, a_max, b_min, b_max):
"""
Neither range is completely greater than the other
"""
return (a_min < b_max) and (b_min < a_max)
class viewport(mixin.capabilities):
def __init__(self, device, width, height):
self.capabilities(width, height, rotate=0, mode=device.mode)
self._device = device
self._backing_image = Image.new(self.mode, self.size)
self._position = (0, 0)
self._hotspots = []
def display(self, image):
assert(image.mode == self.mode)
assert(image.size == self.size)
self._backing_image.paste(image)
self.refresh()
def set_position(self, xy):
self._position = xy
self.refresh()
def add_hotspot(self, hotspot, xy):
"""
Add the hotspot at (x, y). The hotspot must fit inside the bounds
of the virtual device. If it does not then an AssertError is raised.
"""
(x, y) = xy
assert(0 <= x <= self.width - hotspot.width)
assert(0 <= y <= self.height - hotspot.height)
# TODO: should it check to see whether hotspots overlap each other?
# Is sensible to _allow_ them to overlap?
self._hotspots.append((hotspot, xy))
def remove_hotspot(self, hotspot, xy):
"""
Remove the hotspot at (x, y): Any previously rendered image where the
hotspot was placed is erased from the backing image, and will be
"undrawn" the next time the virtual device is refreshed. If the
specified hotspot is not found (x, y), a ValueError is raised.
"""
self._hotspots.remove((hotspot, xy))
eraser = Image.new(self.mode, hotspot.size)
self._backing_image.paste(eraser, xy)
def is_overlapping_viewport(self, hotspot, xy):
"""
Checks to see if the hotspot at position (x, y)
is (at least partially) visible according to the
position of the viewport
"""
l1, t1, r1, b1 = calc_bounds(xy, hotspot)
l2, t2, r2, b2 = calc_bounds(self._position, self._device)
return range_overlap(l1, r1, l2, r2) and range_overlap(t1, b1, t2, b2)
def refresh(self):
should_wait = False
for hotspot, xy in self._hotspots:
if hotspot.should_redraw() and self.is_overlapping_viewport(hotspot, xy):
pool.add_task(hotspot.paste_into, self._backing_image, xy)
should_wait = True
if should_wait:
pool.wait_completion()
im = self._backing_image.crop(box=self._crop_box())
self._device.display(im)
del im
def _crop_box(self):
(left, top) = self._position
right = left + self._device.width
bottom = top + self._device.height
assert(0 <= left <= right <= self.width)
assert(0 <= top <= bottom <= self.height)
return (left, top, right, bottom)
class hotspot(mixin.capabilities):
"""
A hotspot (`a place of more than usual interest, activity, or popularity`)
is a live display which may be added to a virtual viewport - if the hotspot
and the viewport are overlapping, then the :func:`update` method will be
automatically invoked when the viewport is being refreshed or its position
moved (such that an overlap occurs).
You would either:
* create a ``hotspot`` instance, suppling a render function (taking an
:py:mod:`PIL.ImageDraw` object, ``width`` & ``height`` dimensions. The
render function should draw within a bounding box of (0, 0, width,
height), and render a full frame.
* sub-class ``hotspot`` and override the :func:``should_redraw`` and
:func:`update` methods. This might be more useful for slow-changing
values where it is not necessary to update every refresh cycle, or
your implementation is stateful.
"""
def __init__(self, width, height, draw_fn=None):
self.capabilities(width, height, rotate=0) # TODO: set mode?
self._fn = draw_fn
def paste_into(self, image, xy):
im = Image.new(image.mode, self.size)
draw = ImageDraw.Draw(im)
self.update(draw)
image.paste(im, xy)
del draw
del im
def should_redraw(self):
"""
Override this method to return true or false on some condition
(possibly on last updated member variable) so that for slow changing
hotspots they are not updated too frequently.
"""
return True
def update(self, draw):
if self._fn:
self._fn(draw, self.width, self.height)
class snapshot(hotspot):
"""
A snapshot is a `type of` hotspot, but only updates once in a given
interval, usually much less frequently than the viewport requests refresh
updates.
"""
def __init__(self, width, height, draw_fn=None, interval=1.0):
super(snapshot, self).__init__(width, height, draw_fn)
self.interval = interval
self.last_updated = 0.0
def should_redraw(self):
"""
Only requests a redraw after ``interval`` seconds have elapsed
"""
return time.time() - self.last_updated > self.interval
def paste_into(self, image, xy):
super(snapshot, self).paste_into(image, xy)
self.last_updated = time.time()
class terminal(object):
"""
Provides a terminal-like interface to a device (or a device-like object
that has :class:`mixin.capabilities` characteristics).
"""
def __init__(self, device, font=None, color="white", bgcolor="black", tabstop=4, line_height=None, animate=True):
self._device = device
self.font = font or ImageFont.load_default()
self.color = color
self.bgcolor = bgcolor
self.animate = animate
self.tabstop = tabstop
self._cw, self._ch = (0, 0)
for i in range(32, 128):
w, h = self.font.getsize(chr(i))
self._cw = max(w, self._cw)
self._ch = max(h, self._ch)
self._ch = line_height or self._ch
self.width = device.width // self._cw
self.height = device.height // self._ch
self.size = (self.width, self.height)
self._backing_image = Image.new(self._device.mode, self._device.size, self.bgcolor)
self._canvas = ImageDraw.Draw(self._backing_image)
self.clear()
def clear(self):
"""
Clears the display and resets the cursor position to (0, 0).
"""
self._cx, self._cy = (0, 0)
self._canvas.rectangle(self._device.bounding_box, fill=self.bgcolor)
self.flush()
def println(self, text=""):
"""
Prints the supplied text to the device, scrolling where necessary.
The text is always followed by a newline.
"""
self.puts(text)
self.newline()
def puts(self, text):
"""
Prints the supplied text, handling special character codes for carriage
return (\\r), newline (\\n), backspace (\\b) and tab (\\t).
If the ``animate`` flag was set to True (default), then each character
is flushed to the device, giving the effect of 1970's teletype device.
"""
for line in str(text).split("\n"):
for char in line:
if char == '\r':
self.carriage_return()
elif char == '\n':
self.newline()
elif char == '\b':
self.backspace()
elif char == '\t':
self.tab()
else:
self.putch(char, flush=self.animate)
def putch(self, ch, flush=True):
"""
Prints the specific character, which must be a valid printable ASCII
value in the range 32..127 only.
"""
assert(32 <= ord(ch) <= 127)
w = self.font.getsize(ch)[0]
if self._cx + w >= self._device.width:
self.newline()
self.erase()
self._canvas.text((self._cx, self._cy), text=ch, font=self.font, fill=self.color)
self._cx += w
if flush:
self.flush()
def carriage_return(self):
"""
Returns the cursor position to the left-hand side without advancing
downwards.
"""
self._cx = 0
def tab(self):
"""
Advances the cursor position to the next (soft) tabstop.
"""
soft_tabs = self.tabstop - ((self._cx // self._cw) % self.tabstop)
for _ in range(soft_tabs):
self.putch(" ", flush=False)
def newline(self):
"""
Advances the cursor position ot the left hand side, and to the next
line. If the cursor is on the lowest line, the displayed contents are
scrolled, causing the top line to be lost.
"""
self.carriage_return()
if self._cy + (2 * self._ch) >= self._device.height:
# Simulate a vertical scroll
copy = self._backing_image.crop((0, self._ch, self._device.width, self._device.height))
self._backing_image.paste(copy, (0, 0))
self._canvas.rectangle((0, copy.height, self._device.width, self._device.height), fill=self.bgcolor)
else:
self._cy += self._ch
self.flush()
if self.animate:
time.sleep(0.2)
def backspace(self):
"""
Moves the cursor one place to the left, erasing the character at the
current position. Cannot move beyound column zero, nor onto the
previous line
"""
if self._cx + self._cw >= 0:
self.erase()
self._cx -= self._cw
self.flush()
def erase(self):
"""
Erase the contents of the cursor's current postion without moving the
cursor's position.
"""
self._canvas.rectangle((self._cx, self._cy, self._cx + self._cw, self._cy + self._ch), fill=self.bgcolor)
def flush(self):
"""
Cause the current backing store to be rendered on the nominated device.
"""
self._device.display(self._backing_image)
class history(mixin.capabilities):
"""
Wraps a device (or emulator) to provide a facility to be able to make a
savepoint (a point at which the screen display can be "rolled-back" to).
This is mostly useful for displaying transient error/dialog messages
which could be subsequently dismissed, reverting back to the previous
display.
"""
def __init__(self, device):
self.capabilities(device.width, device.height, rotate=0, mode=device.mode)
self._savepoints = []
self._device = device
self._last_image = None
def display(self, image):
self._last_image = image.copy()
self._device.display(image)
def savepoint(self):
"""
Copies the last displayed image.
"""
if self._last_image:
self._savepoints.append(self._last_image)
self._last_image = None
def restore(self, drop=0):
"""
Restores the last savepoint. If ``drop`` is supplied and greater than
zero, then that many savepoints are dropped, and the next savepoint is
restored.
"""
assert(drop >= 0)
while drop > 0:
self._savepoints.pop()
drop -= 1
img = self._savepoints.pop()
self.display(img)
def __len__(self):
"""
Indication of the number of savepoints retained.
"""
return len(self._savepoints)

View File

@ -0,0 +1,202 @@
Metadata-Version: 1.1
Name: smbus2
Version: 0.2.0
Summary: smbus2 is a drop-in replacement for smbus-cffi/smbus-python in pure Python
Home-page: https://github.com/kplindegaard/smbus2
Author: Karl-Petter Lindegaard
Author-email: kp.lindegaard@gmail.com
License: MIT
Description: smbus2
======
A drop-in replacement for smbus-cffi/smbus-python in pure Python
|travis|
.. |travis| image:: https://travis-ci.org/kplindegaard/smbus2.svg?branch=master
:target: https://travis-ci.org/kplindegaard/smbus2
Introduction
============
smbus2 is (yet another) pure Python implementation of the `python-smbus <http://www.lm-sensors.org/browser/i2c-tools/trunk/py-smbus/>`_ package.
It was designed from the ground up with two goals in mind:
1. It should be a drop-in replacement of smbus. The syntax shall be the same.
2. Use the inherent i2c structs and unions to a greater extent than other pure Python implementations like `pysmbus <https://github.com/bjornt/pysmbus>`_ does. By doing so, it will be more feature complete and easier to extend.
Currently supported features are:
* Get i2c capabilities (I2C_FUNCS)
* read_byte
* write_byte
* read_byte_data
* write_byte_data
* read_word_data
* write_word_data
* read_i2c_block_data
* write_i2c_block_data
* i2c_rdwr - *combined write/read transactions with repeated start*
It is developed on Python 2.7 but works without any modifications in Python 3.X too.
SMBus code examples
===================
smbus2 installs next to smbus as the package, so it's not really a 100% replacement. You must change the module name.
Example 1a: Read a byte
-----------------------
.. code:: python
from smbus2 import SMBus
# Open i2c bus 1 and read one byte from address 80, offset 0
bus = SMBus(1)
b = bus.read_byte_data(80, 0)
print(b)
bus.close()
Example 1b: Read a byte using 'with'
------------------------------------
This is the very same example but safer to use since the smbus will be closed automatically when exiting the with block.
.. code:: python
from smbus2 import SMBusWrapper
with SMBusWrapper(1) as bus:
b = bus.read_byte_data(80, 0)
print(b)
Example 2: Read a block of data
-------------------------------
You can read up to 32 bytes at once.
.. code:: python
from smbus2 import SMBusWrapper
with SMBusWrapper(1) as bus:
# Read a block of 16 bytes from address 80, offset 0
block = bus.read_i2c_block_data(80, 0, 16)
# Returned value is a list of 16 bytes
print(block)
Example 3: Write a byte
-----------------------
.. code:: python
from smbus2 import SMBusWrapper
with SMBusWrapper(1) as bus:
# Write a byte to address 80, offset 0
data = 45
bus.write_byte_data(80, 0, data)
Example 4: Write a block of data
--------------------------------
It is possible to write 32 bytes at the time, but I have found that error-prone. Write less and add a delay in between if you run into trouble.
.. code:: python
from smbus2 import SMBusWrapper
with SMBusWrapper(1) as bus:
# Write a block of 8 bytes to address 80 from offset 0
data = [1, 2, 3, 4, 5, 6, 7, 8]
bus.write_i2c_block_data(80, 0, data)
I2C
===
Starting with v0.2, the smbus2 library also has support for combined read and write transactions. *i2c_rdwr* is not really a SMBus feature but comes in handy when the master needs to:
1. read or write bulks of data larger than SMBus' 32 bytes limit.
1. write some data and then read from the slave with a repeated start and no stop bit between.
Each operation is represented by a *i2c_msg* message object.
Example 5: Single i2c_rdwr
--------------------------
.. code:: python
from smbus2 import SMBus, ic_msg
with SMBusWrapper(1) as bus:
# Read 64 bytes from address 80
msg = i2c_msg.read(80, 64)
bus.i2c_rdwr(msg)
# Write some bytes to address 80
msg = i2c_msg.write(80, [65, 66, 67, 68])
bus.i2c_rdwr(msg)
Example 6: Dual i2c_rdwr
------------------------
To perform dual operations just add more i2c_msg instances to the bus call:
.. code:: python
from smbus2 import SMBus, ic_msg
# Single transaction writing two bytes then read two at address 80
write = i2c_msg.write(80, [40, 50])
read = i2c_msg.read(80, 2)
with SMBusWrapper(1) as bus:
bus.i2c_rdwr(write, read)
Example 7: Access i2c_msg data
------------------------------
All data is contained in the i2c_msg instances. Here are some data access alternatives.
.. code:: python
# 1: Convert message content to list
msg = i2c.write(60, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
data = list(msg) # data = [1, 2, 3, ...]
print(len(data)) # => 10
# 2: i2c_msg is iterable
for value in msg:
print(value)
# 3: Through i2c_msg properties
for k in range(msg.len):
print(msg.buf[k])
Installation instructions
=========================
smbus2 is pure Python code and requires no compilation. Installation is easy:
.. code:: bash
python setup.py install
Or just use pip
.. code:: bash
pip install smbus2
Keywords: smbus,smbus2,python,i2c,raspberrypi,linux
Platform: UNKNOWN
Classifier: Development Status :: 4 - Beta
Classifier: Topic :: Utilities
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3

View File

@ -0,0 +1,9 @@
README.rst
setup.cfg
setup.py
smbus2/__init__.py
smbus2/smbus2.py
smbus2.egg-info/PKG-INFO
smbus2.egg-info/SOURCES.txt
smbus2.egg-info/dependency_links.txt
smbus2.egg-info/top_level.txt

View File

@ -0,0 +1,8 @@
../smbus2/smbus2.py
../smbus2/__init__.py
../smbus2/smbus2.pyc
../smbus2/__init__.pyc
top_level.txt
SOURCES.txt
PKG-INFO
dependency_links.txt

View File

@ -0,0 +1 @@
smbus2

View File

@ -0,0 +1,3 @@
from .smbus2 import SMBus, SMBusWrapper, i2c_msg
__version__ = "0.2.0"

Binary file not shown.

View File

@ -0,0 +1,451 @@
"""smbus2 - A drop-in replacement for smbus-cffi/smbus-python"""
# The MIT License (MIT)
# Copyright (c) 2017 Karl-Petter Lindegaard
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import sys
from fcntl import ioctl
from ctypes import c_uint32, c_uint8, c_uint16, c_char, POINTER, Structure, Array, Union, create_string_buffer
# Commands from uapi/linux/i2c-dev.h
I2C_SLAVE = 0x0703 # Use this slave address
I2C_SLAVE_FORCE = 0x0706 # Use this slave address, even if it is already in use by a driver!
I2C_FUNCS = 0x0705 # Get the adapter functionality mask
I2C_RDWR = 0x0707 # Combined R/W transfer (one STOP only)
I2C_SMBUS = 0x0720 # SMBus transfer. Takes pointer to i2c_smbus_ioctl_data
# SMBus transfer read or write markers from uapi/linux/i2c.h
I2C_SMBUS_WRITE = 0
I2C_SMBUS_READ = 1
# Size identifiers uapi/linux/i2c.h
I2C_SMBUS_BYTE = 1
I2C_SMBUS_BYTE_DATA = 2
I2C_SMBUS_WORD_DATA = 3
I2C_SMBUS_BLOCK_DATA = 5 # Can't get this one to work on my Raspberry Pi
I2C_SMBUS_I2C_BLOCK_DATA = 8
I2C_SMBUS_BLOCK_MAX = 32
# To determine what functionality is present (uapi/linux/i2c.h)
I2C_FUNC_I2C = 0x00000001
I2C_FUNC_10BIT_ADDR = 0x00000002
I2C_FUNC_PROTOCOL_MANGLING = 0x00000004 # I2C_M_IGNORE_NAK etc.
I2C_FUNC_SMBUS_PEC = 0x00000008
I2C_FUNC_NOSTART = 0x00000010 # I2C_M_NOSTART
I2C_FUNC_SLAVE = 0x00000020
I2C_FUNC_SMBUS_BLOCK_PROC_CALL = 0x00008000 # SMBus 2.0
I2C_FUNC_SMBUS_QUICK = 0x00010000
I2C_FUNC_SMBUS_READ_BYTE = 0x00020000
I2C_FUNC_SMBUS_WRITE_BYTE = 0x00040000
I2C_FUNC_SMBUS_READ_BYTE_DATA = 0x00080000
I2C_FUNC_SMBUS_WRITE_BYTE_DATA = 0x00100000
I2C_FUNC_SMBUS_READ_WORD_DATA = 0x00200000
I2C_FUNC_SMBUS_WRITE_WORD_DATA = 0x00400000
I2C_FUNC_SMBUS_PROC_CALL = 0x00800000
I2C_FUNC_SMBUS_READ_BLOCK_DATA = 0x01000000
I2C_FUNC_SMBUS_WRITE_BLOCK_DATA = 0x02000000
I2C_FUNC_SMBUS_READ_I2C_BLOCK = 0x04000000 # I2C-like block xfer
I2C_FUNC_SMBUS_WRITE_I2C_BLOCK = 0x08000000 # w/ 1-byte reg. addr.
# i2c_msg flags from uapi/linux/i2c.h
I2C_M_RD = 0x0001
# Pointer definitions
LP_c_uint8 = POINTER(c_uint8)
LP_c_uint16 = POINTER(c_uint16)
LP_c_uint32 = POINTER(c_uint32)
#############################################################
# Type definitions as in i2c.h
class i2c_smbus_data(Array):
"""
Adaptation of the i2c_smbus_data union in i2c.h
Data for SMBus messages.
"""
_length_ = I2C_SMBUS_BLOCK_MAX+2
_type_ = c_uint8
class union_i2c_smbus_data(Union):
_fields_ = [
("byte", c_uint8),
("word", c_uint16),
("block", i2c_smbus_data)
]
union_pointer_type = POINTER(union_i2c_smbus_data)
class i2c_smbus_ioctl_data(Structure):
"""
As defined in i2c-dev.h
"""
_fields_ = [
('read_write', c_uint8),
('command', c_uint8),
('size', c_uint32),
('data', union_pointer_type)]
__slots__ = [name for name, type in _fields_]
@staticmethod
def create(read_write=I2C_SMBUS_READ, command=0, size=I2C_SMBUS_BYTE_DATA):
u = union_i2c_smbus_data()
return i2c_smbus_ioctl_data(
read_write=read_write, command=command, size=size,
data=union_pointer_type(u))
#############################################################
# Type definitions for i2c_rdwr combined transactions
class i2c_msg(Structure):
"""
As defined in i2c.h
"""
_fields_ = [
('addr', c_uint16),
('flags', c_uint16),
('len', c_uint16),
('buf', POINTER(c_char))]
__slots__ = [name for name, type in _fields_]
def __iter__(self):
return i2c_msg_iter(self)
@staticmethod
def read(address, length):
"""
Prepares an i2c read transaction
:param address: Slave address
:param length: Number of bytes to read
:return: New i2c_msg instance for read operation
:rtype: i2c_msg
"""
arr = create_string_buffer(length)
return i2c_msg(
addr=address, flags=I2C_M_RD, len=length,
buf=arr)
@staticmethod
def write(address, buf):
"""
Prepares an i2c write transaction
:param address: Slave address
:param buf: Bytes to write. Either list of values or string
:return: New i2c_msg instance for write operation
:rtype: i2c_msg
"""
if sys.version_info.major >= 3:
if type(buf) is str:
buf = bytes(buf, 'UTF-8')
else:
buf = bytes(buf)
else:
if type(buf) is not str:
buf = ''.join([chr(x) for x in buf])
arr = create_string_buffer(buf, len(buf))
return i2c_msg(
addr=address, flags=0, len=len(arr),
buf=arr)
class i2c_rdwr_ioctl_data(Structure):
"""
As defined in i2c-dev.h
"""
_fields_ = [
('msgs', POINTER(i2c_msg)),
('nmsgs', c_uint32)
]
__slots__ = [name for name, type in _fields_]
@staticmethod
def create(*i2c_msg_instances):
"""
Factory method for creating a i2c_rdwr_ioctl_data struct that can
be called with ioctl(fd, I2C_RDWR, data)
:param i2c_msg_instances: Up to 42 i2c_msg instances
:return:
:rtype: i2c_rdwr_ioctl_data
"""
n_msg = len(i2c_msg_instances)
msg_array = (i2c_msg * n_msg)(*i2c_msg_instances)
return i2c_rdwr_ioctl_data(
msgs=msg_array,
nmsgs=n_msg
)
class i2c_msg_iter:
"""
i2c_msg iterator. For convenience.
"""
def __init__(self, msg):
self.msg = msg
self.idx = 0
def __iter__(self):
return self
def __next__(self):
if self.idx < self.msg.len:
val = ord(self.msg.buf[self.idx])
self.idx += 1
return val
else:
raise StopIteration()
def next(self):
return self.__next__()
#############################################################
class SMBus(object):
def __init__(self, bus=None, force=False):
# type: (int, bool) -> None
"""
Initialize and (optionally) open an i2c bus connection.
:param bus: i2c bus number (e.g. 0 or 1). If not given, a subsequent call to open() is required.
:param force: force using the slave address even when driver is already using it
:type force: Boolean
"""
self.fd = None
self.funcs = 0
if bus is not None:
self.open(bus)
self.address = None
self.force = force
def open(self, bus):
# type: (int) -> None
"""
Open a given i2c bus.
:param bus: i2c bus number (e.g. 0 or 1)
"""
self.fd = os.open("/dev/i2c-{}".format(bus), os.O_RDWR)
self.funcs = self._get_funcs()
def close(self):
"""
Close the i2c connection.
"""
if self.fd:
os.close(self.fd)
self.fd = None
def _set_address(self, address):
# type: (int) -> None
"""
Set i2c slave address to use for subsequent calls.
:param address:
"""
if self.address != address:
self.address = address
if self.force:
ioctl(self.fd, I2C_SLAVE_FORCE, address)
else:
ioctl(self.fd, I2C_SLAVE, address)
def _get_funcs(self):
"""
Returns a 32-bit value stating supported I2C functions.
:rtype: int
"""
f = c_uint32()
ioctl(self.fd, I2C_FUNCS, f)
return f.value
def read_byte(self, i2c_addr):
# type: (int) -> int
"""
Read a single byte from a device
:rtype: int
:param i2c_addr: i2c address
:return: Read byte value
"""
self._set_address(i2c_addr)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_READ, command=0, size=I2C_SMBUS_BYTE
)
ioctl(self.fd, I2C_SMBUS, msg)
return msg.data.contents.byte
def write_byte(self, i2c_addr, value):
# type: (int, int) -> None
"""
Write a single byte to a device
:param i2c_addr: i2c address
:param value: value to write
"""
self._set_address(i2c_addr)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=value, size=I2C_SMBUS_BYTE
)
ioctl(self.fd, I2C_SMBUS, msg)
def read_byte_data(self, i2c_addr, register):
# type: (int, int) -> int
"""
Read a single byte from a designated register.
:rtype: int
:param i2c_addr: i2c address
:param register: Register to read
:return: Read byte value
"""
self._set_address(i2c_addr)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_READ, command=register, size=I2C_SMBUS_BYTE_DATA
)
ioctl(self.fd, I2C_SMBUS, msg)
return msg.data.contents.byte
def write_byte_data(self, i2c_addr, register, value):
# type: (int, int, int) -> None
"""
Write a byte to a given register
:param i2c_addr: i2c address
:param register: Register to write to
:param value: Byte value to transmit
"""
self._set_address(i2c_addr)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=register, size=I2C_SMBUS_BYTE_DATA
)
msg.data.contents.byte = value
ioctl(self.fd, I2C_SMBUS, msg)
def read_word_data(self, i2c_addr, register):
# type: (int, int) -> int
"""
Read a single word (2 bytes) from a given register
:rtype: int
:param i2c_addr: i2c address
:param register: Register to read
:return: 2-byte word
"""
self._set_address(i2c_addr)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_READ, command=register, size=I2C_SMBUS_WORD_DATA
)
ioctl(self.fd, I2C_SMBUS, msg)
return msg.data.contents.word
def write_word_data(self, i2c_addr, register, value):
# type: (int, int, int) -> None
"""
Write a byte to a given register
:param i2c_addr: i2c address
:param register: Register to write to
:param value: Word value to transmit
"""
self._set_address(i2c_addr)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=register, size=I2C_SMBUS_WORD_DATA
)
msg.data.contents.word = value
ioctl(self.fd, I2C_SMBUS, msg)
def read_i2c_block_data(self, i2c_addr, register, length):
# type: (int, int, int) -> list
"""
Read a block of byte data from a given register
:rtype: list
:param i2c_addr: i2c address
:param register: Start register
:param length: Desired block length
:return: List of bytes
"""
if length > I2C_SMBUS_BLOCK_MAX:
raise ValueError("Desired block length over %d bytes" % I2C_SMBUS_BLOCK_MAX)
self._set_address(i2c_addr)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_READ, command=register, size=I2C_SMBUS_I2C_BLOCK_DATA
)
msg.data.contents.byte = length
ioctl(self.fd, I2C_SMBUS, msg)
return msg.data.contents.block[1:length+1]
def write_i2c_block_data(self, i2c_addr, register, data):
# type: (int, int, list) -> None
"""
Write a block of byte data to a given register
:param i2c_addr: i2c address
:param register: Start register
:param data: List of bytes
"""
length = len(data)
if length > I2C_SMBUS_BLOCK_MAX:
raise ValueError("Data length cannot exceed %d bytes" % I2C_SMBUS_BLOCK_MAX)
self._set_address(i2c_addr)
msg = i2c_smbus_ioctl_data.create(
read_write=I2C_SMBUS_WRITE, command=register, size=I2C_SMBUS_I2C_BLOCK_DATA
)
msg.data.contents.block[0] = length
msg.data.contents.block[1:length + 1] = data
ioctl(self.fd, I2C_SMBUS, msg)
def i2c_rdwr(self, *i2c_msgs):
# type: (i2c_msg) -> None
"""
Combine a series of i2c read and write operations in a single
transaction (with repeted start bits but no stop bits in between).
This method takes i2c_msg instances as input, which must be created
first with i2c_msg.create_read() or i2c_msg.create_write().
:type i2c_msgs: i2c_msg
:param i2c_msgs: One or more i2c_msg class instances.
:return: None
"""
ioctl_data = i2c_rdwr_ioctl_data.create(*i2c_msgs)
ioctl(self.fd, I2C_RDWR, ioctl_data)
class SMBusWrapper:
"""
Wrapper class around the SMBus. Enables the user to wrap access to
the SMBus class in a "with" statement. Will automatically close the SMBus handle upon
exit of the with block.
"""
def __init__(self, bus_number=0, auto_cleanup=True, force=False):
"""
:param auto_cleanup: Close bus when leaving scope.
:type auto_cleanup: Boolean
:param force: Force using the slave address even when driver is already using it.
:type force: Boolean
"""
self.bus_number = bus_number
self.auto_cleanup = auto_cleanup
self.force = force
def __enter__(self):
self.bus = SMBus(bus=self.bus_number, force=self.force)
return self.bus
def __exit__(self, exc_type, exc_val, exc_tb):
if self.auto_cleanup:
self.bus.close()

Binary file not shown.