import datetime
from pyoperant import hwio, utils, ComponentError
import logging
logger = logging.getLogger()
class BaseComponent(object):
    """Base class for a physical component.

    Parameters
    ----------
    name : str, optional
        label stored on the instance as ``name`` (default=None)

    Any further positional or keyword arguments are accepted and ignored.
    """

    def __init__(self, name=None, *args, **kwargs):
        # Extra arguments are swallowed here so that subclasses can forward
        # their own *args/**kwargs without filtering them first.
        self.name = name
## Hopper ##
class HopperActiveError(ComponentError):
    """Signals that the hopper is up at a moment when it should be down."""
class HopperInactiveError(ComponentError):
    """Signals that the hopper is down at a moment when it should be up."""
class HopperAlreadyUpError(HopperActiveError):
    """Signals that the hopper was already up before it was asked to rise."""
class HopperWontComeUpError(HopperInactiveError):
    """Signals that the hopper failed to come up."""
class HopperWontDropError(HopperActiveError):
    """Signals that the hopper failed to drop."""
class Hopper(BaseComponent):
    """ Class which holds information about a hopper

    Parameters
    ----------
    IR : :class:`hwio.BooleanInput`
        input channel for the IR beam to check if the hopper is up
    solenoid : :class:`hwio.BooleanOutput`
        output channel to activate the solenoid & raise the hopper
    max_lag : float, optional
        time in seconds to wait before checking to make sure the hopper is up (default=0.3)
    inverted : bool, optional
        when true, `check` negates the IR reading before comparing it with
        the solenoid state (default=False)

    Attributes
    ----------
    solenoid : hwio.BooleanOutput
        output channel to activate the solenoid & raise the hopper
    IR : hwio.BooleanInput
        input channel for the IR beam to check if the hopper is up
    max_lag : float
        time in seconds to wait before checking to make sure the hopper is up
    inverted : bool
        whether the IR reading is negated in `check`
    bypass_check : bool
        when True (the default) `check` returns True without reading any
        hardware; set it to False on the class or an instance to enable the
        IR/solenoid consistency verification

    """

    # Historically `check` began with an unconditional `return True`, which
    # left its verification logic unreachable. The bypass is kept as the
    # default so existing rigs behave the same, but it is now switchable.
    bypass_check = True

    def __init__(self, IR, solenoid, max_lag=0.3, inverted=False, *args, **kwargs):
        super(Hopper, self).__init__(*args, **kwargs)
        self.max_lag = max_lag

        if not isinstance(IR, hwio.BooleanInput):
            raise ValueError('%s is not an input channel' % IR)
        self.IR = IR

        if not isinstance(solenoid, hwio.BooleanOutput):
            raise ValueError('%s is not an output channel' % solenoid)
        self.solenoid = solenoid

        self.inverted = bool(inverted)

    def check(self):
        """reads the status of solenoid & IR beam, then throws an error if they don't match

        While `bypass_check` is True (the default) nothing is read and True
        is returned immediately.

        Returns
        -------
        bool
            True if the hopper is up (always True while `bypass_check` is set).

        Raises
        ------
        HopperActiveError
            The Hopper is up and it shouldn't be. (The IR beam is tripped, but the solenoid is not active.)
        HopperInactiveError
            The Hopper is down and it shouldn't be. (The IR beam is not tripped, but the solenoid is active.)

        """
        if self.bypass_check:
            return True

        IR_status = self.IR.read()
        if self.inverted:
            IR_status = not IR_status
        solenoid_status = self.solenoid.read()

        if IR_status == solenoid_status:
            return IR_status
        if IR_status:
            raise HopperActiveError
        if solenoid_status:
            raise HopperInactiveError
        # Reached only when the two readings differ yet both are falsy
        # (e.g. None versus False).
        raise ComponentError("the IR & solenoid don't match: IR:%s,solenoid:%s" % (IR_status,solenoid_status))

    def up(self):
        """Raises the hopper up.

        Returns
        -------
        object
            The value returned by ``IR.poll`` (presumably a datetime
            timestamp, since `feed` subtracts it from one -- confirm
            against hwio).

        Raises
        ------
        HopperWontComeUpError
            The Hopper did not raise.

        """
        self.solenoid.write(True)
        time_up = self.IR.poll(timeout=self.max_lag)

        if time_up is None:  # poll timed out
            # NOTE: the solenoid is left active on timeout; the reset below
            # is disabled in the original source.
            #self.solenoid.write(False)
            raise HopperWontComeUpError
        return time_up

    def down(self):
        """Lowers the hopper.

        Returns
        -------
        datetime
            Timestamp taken immediately after the solenoid was switched off.

        Raises
        ------
        HopperWontDropError
            The Hopper did not drop. (Only possible when `bypass_check`
            is False.)

        """
        self.solenoid.write(False)
        time_down = datetime.datetime.now()
        # Give the hopper max_lag seconds to fall before verifying.
        utils.wait(self.max_lag)
        try:
            self.check()
        except HopperActiveError as e:
            raise HopperWontDropError(e)
        return time_down

    def feed(self, dur=2.0, error_check=True):
        """Performs a feed

        Parameters
        ---------
        dur : float, optional
            duration of feed in seconds
        error_check : bool, optional
            currently unused; accepted for interface compatibility

        Returns
        -------
        (datetime, timedelta)
            Value returned by `up` and the difference between the `down`
            and `up` timestamps

        Raises
        ------
        HopperAlreadyUpError
            The Hopper was already up at the beginning of the feed.
        HopperWontComeUpError
            The Hopper did not raise for the feed.
        HopperWontDropError
            The Hopper did not drop after the feed.

        """
        logger.debug("Feeding..")
        # NOTE: this guard is an assert, so it is skipped when Python runs with -O.
        assert self.max_lag < dur, "max_lag (%ss) must be shorter than duration (%ss)" % (self.max_lag,dur)
        try:
            self.check()
        except HopperActiveError as e:
            # Force the solenoid off before reporting the bad starting state.
            self.solenoid.write(False)
            raise HopperAlreadyUpError(e)
        feed_time = self.up()
        utils.wait(dur)
        feed_over = self.down()
        feed_duration = feed_over - feed_time
        return (feed_time, feed_duration)

    def reward(self, value=2.0):
        """wrapper for `feed`, passes *value* into *dur* """
        return self.feed(dur=value)
## Peck Port ##
class PeckPort(BaseComponent):
    """ Class which holds information about peck ports

    Parameters
    ----------
    IR : hwio.BooleanInput
        input channel for the IR beam to check for a peck
    LED : hwio.BooleanOutput or hwio.PWMOutput
        output channel to activate the LED in the peck port
    inverted : bool, optional
        when true, `status` negates the IR reading (default=False)

    Attributes
    ----------
    LED : hwio.BooleanOutput or hwio.PWMOutput
        output channel to activate the LED in the peck port
    LEDtype : str
        "boolean" or "pwm", depending on the class of *LED*
    IR : hwio.BooleanInput
        input channel for the IR beam to check for a peck
    inverted : bool
        whether the IR reading is negated in `status`

    """

    def __init__(self, IR, LED, inverted=False, *args, **kwargs):
        super(PeckPort, self).__init__(*args, **kwargs)

        if not isinstance(IR, hwio.BooleanInput):
            raise ValueError('%s is not an input channel' % IR)
        self.IR = IR

        # The original tested the two output types with independent `if`
        # statements, so a plain BooleanOutput fell through to the final
        # `else` and was rejected. PWM is tested first so that an object
        # matching both types is still labelled "pwm", as before.
        if isinstance(LED, hwio.PWMOutput):
            self.LED = LED
            self.LEDtype = "pwm"
        elif isinstance(LED, hwio.BooleanOutput):
            self.LED = LED
            self.LEDtype = "boolean"
        else:
            raise ValueError('%s is not an output channel' % LED)

        self.inverted = bool(inverted)

    def status(self):
        """reads the status of the IR beam

        Returns
        -------
        bool
            True if beam is broken

        """
        if self.inverted:
            return not self.IR.read()
        return self.IR.read()

    def off(self):
        """ Turns the LED off

        Returns
        -------
        bool
            True if successful

        """
        if self.LEDtype == "boolean":
            self.LED.write(False)
        else:
            self.LED.write(0.0)
        return True

    def on(self, val=100.0):
        """Turns the LED on

        Parameters
        ----------
        val : float, optional
            value written to a PWM LED; ignored for a boolean LED (default=100.0)

        Returns
        -------
        bool
            True if successful

        """
        if self.LEDtype == "boolean":
            self.LED.write(True)
        else:
            self.LED.write(val)
        return True

    def flash(self, dur=1.0, isi=0.1):
        """Flashes the LED on and off with *isi* seconds high and low for *dur* seconds, then revert LED to prior state.

        Parameters
        ----------
        dur : float, optional
            Duration of the light flash in seconds.
        isi : float,optional
            Time interval between toggles. (0.5 * period)

        Returns
        -------
        (datetime, timedelta)
            Timestamp of the flash and the flash duration

        """
        # NOTE(review): relies on the LED channel offering read() and
        # toggle(); confirm that PWM outputs support both.
        LED_state = self.LED.read()
        flash_time = datetime.datetime.now()
        flash_duration = datetime.datetime.now() - flash_time
        while flash_duration < datetime.timedelta(seconds=dur):
            self.LED.toggle()
            utils.wait(isi)
            flash_duration = datetime.datetime.now() - flash_time
        # Restore whatever state the LED had before flashing.
        self.LED.write(LED_state)
        return (flash_time, flash_duration)

    def poll(self, timeout=None):
        """ Polls the peck port until there is a peck

        Parameters
        ----------
        timeout : optional
            passed straight through to ``IR.poll``

        Returns
        -------
        datetime
            Timestamp of the IR beam being broken.

        """
        return self.IR.poll(timeout)
## House Light ##
class HouseLight(BaseComponent):
    """House light driven by a single boolean output channel.

    Parameters
    ----------
    light : hwio.BooleanOutput
        output channel to turn the light on and off

    Attributes
    ----------
    light : hwio.BooleanOutput
        output channel to turn the light on and off

    """

    def __init__(self, light, *args, **kwargs):
        super(HouseLight, self).__init__(*args, **kwargs)
        if not isinstance(light, hwio.BooleanOutput):
            raise ValueError('%s is not an output channel' % light)
        self.light = light

    def off(self):
        """Switch the house light off.

        Returns
        -------
        bool
            Always True once the write has been issued.

        """
        self.light.write(False)
        return True

    def on(self):
        """Switch the house light on.

        Returns
        -------
        bool
            Always True once the write has been issued.

        """
        self.light.write(True)
        return True

    def timeout(self, dur=10.0):
        """Darken the house light for *dur* seconds, then switch it back on.

        Parameters
        ----------
        dur : float, optional
            The amount of time (in seconds) to keep the light off.

        Returns
        -------
        (datetime, timedelta)
            Timestamp taken before the light went off, and the time elapsed
            when the wait finished

        """
        started = datetime.datetime.now()
        self.light.write(False)
        utils.wait(dur)
        # The elapsed time is measured before the light is switched back on.
        elapsed = datetime.datetime.now() - started
        self.light.write(True)
        return (started, elapsed)

    def punish(self, value=10.0):
        """Wrapper for `timeout`; passes *value* as *dur*."""
        return self.timeout(dur=value)
## Cue Light ##
class RGBLight(BaseComponent):
    """RGB cue light built from three boolean output channels.

    Parameters
    ----------
    red : hwio.BooleanOutput
        output channel for the red LED
    green : hwio.BooleanOutput
        output channel for the green LED
    blue : hwio.BooleanOutput
        output channel for the blue LED

    """

    def __init__(self, red, green, blue, *args, **kwargs):
        super(RGBLight, self).__init__(*args, **kwargs)

        # Channels are validated and stored one at a time, in R, G, B order.
        if not isinstance(red, hwio.BooleanOutput):
            raise ValueError('%s is not an output channel' % red)
        self._red = red

        if not isinstance(green, hwio.BooleanOutput):
            raise ValueError('%s is not an output channel' % green)
        self._green = green

        if not isinstance(blue, hwio.BooleanOutput):
            raise ValueError('%s is not an output channel' % blue)
        self._blue = blue

    def red(self):
        """Show red: green and blue are switched off, then red is switched on.

        Returns
        -------
        object
            Whatever the red channel's ``write(True)`` returns.

        """
        self._green.write(False)
        self._blue.write(False)
        outcome = self._red.write(True)
        return outcome

    def green(self):
        """Show green: red and blue are switched off, then green is switched on.

        Returns
        -------
        object
            Whatever the green channel's ``write(True)`` returns.

        """
        self._red.write(False)
        self._blue.write(False)
        outcome = self._green.write(True)
        return outcome

    def blue(self):
        """Show blue: red and green are switched off, then blue is switched on.

        Returns
        -------
        object
            Whatever the blue channel's ``write(True)`` returns.

        """
        self._red.write(False)
        self._green.write(False)
        outcome = self._blue.write(True)
        return outcome

    def off(self):
        """Switch all three LEDs off.

        Returns
        -------
        bool
            Always True once the three writes have been issued.

        """
        self._red.write(False)
        self._green.write(False)
        self._blue.write(False)
        return True
## LED Strip House Light ##
class LEDStripHouseLight(BaseComponent):
    """ Class which holds information about the RGBW LED Strip PWM house light

    Parameters
    ----------
    lights : iterable of hwio.PWMOutput
        [R, G, B, W]
        output channels to turn the light on and off
    color : list of float, optional
        level written to each channel by `on`, in the same order as
        *lights* (default=[100.0, 100.0, 100.0, 100.0])

    Attributes
    ----------
    lights : list of hwio.PWMOutput
        the validated output channels
    color : list of float
        levels written to the channels by `on`

    """

    def __init__(self, lights, color=None, *args, **kwargs):
        super(LEDStripHouseLight, self).__init__(*args, **kwargs)
        self.lights = []
        for light in lights:
            if not isinstance(light, hwio.PWMOutput):
                raise ValueError('%s is not an output channel' % light)
            self.lights.append(light)
        # Build the default per instance: a list literal in the signature
        # would be one object shared (and mutable) across every instance.
        if color is None:
            color = [100.0, 100.0, 100.0, 100.0]
        self.color = color

    def off(self):
        """Turns the house light off.

        Returns
        -------
        bool
            True if successful.

        """
        for light in self.lights:
            light.write(0.0)
        return True

    def on(self):
        """Turns the house light on.

        Each channel is written with the matching entry of `color`;
        pairing stops at the shorter of `lights` and `color`.

        Returns
        -------
        bool
            True if successful.

        """
        # zip replaces a hard-coded range(4), which raised IndexError when
        # fewer than four channels or levels were configured.
        for light, level in zip(self.lights, self.color):
            light.write(level)
        return True

    def timeout(self, dur=10.0):
        """Turn off the light for *dur* seconds

        Parameters
        ----------
        dur : float, optional
            The amount of time (in seconds) to turn off the light.

        Returns
        -------
        (datetime, timedelta)
            Timestamp of the timeout and the timeout duration

        """
        timeout_time = datetime.datetime.now()
        self.off()
        utils.wait(dur)
        # The duration is measured before the light is switched back on.
        timeout_duration = datetime.datetime.now() - timeout_time
        self.on()
        return (timeout_time, timeout_duration)

    def punish(self, value=10.0):
        """Calls `timeout(dur)` with *value* as *dur* """
        return self.timeout(dur=value)

    def set_color(self, color):
        """Stores *color* for the next `on` call without writing to the lights."""
        self.color = color

    def change_color(self, color):
        """Stores *color* and immediately writes it to the lights via `on`."""
        self.color = color
        self.on()
# ## Perch ##
# class Perch(BaseComponent):
# """Class which holds information about a perch
# Has parts:
# - IR Beam (input)
# - speaker
# """
# def __init__(self,*args,**kwargs):
# super(Perch, self).__init__(*args,**kwargs)