import wave
import sys
import struct
import time
import subprocess
import threading
import traceback
import shlex
import os
import string
import random
import datetime as dt
import numpy as np
import scipy as sp
import scipy.special
from contextlib import closing
from argparse import ArgumentParser
from pyoperant import Error
try:
import simplejson as json
except ImportError:
import json
[docs]class NumpyAwareJSONEncoder(json.JSONEncoder):
""" this json encoder converts numpy arrays to lists so that json can write them.
example usage:
>>> import numpy as np
>>> dict_to_save = {'array': np.zeros((5,))}
>>> json.dumps(dict_to_save,
cls=NumpyAwareJSONEncoder
)
'{"array": [0.0, 0.0, 0.0, 0.0, 0.0]}'
"""
[docs] def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist()
return json.JSONEncoder.default(self, obj)
# consider importing this from python-neo
[docs]class Event(object):
"""docstring for Event"""
def __init__(self, time=None, duration=None, label='', name=None, description=None, file_origin=None, *args, **kwargs):
super(Event, self).__init__()
self.time = time
self.duration = duration
self.label = label
self.name = name
self.description = description
self.file_origin = file_origin
self.annotations = {}
self.annotate(**kwargs)
[docs] def annotate(self,**kwargs):
self.annotations.update(kwargs)
[docs]class Stimulus(Event):
"""docstring for Stimulus"""
def __init__(self, *args, **kwargs):
super(Stimulus, self).__init__(*args, **kwargs)
if self.label=='':
self.label = 'stimulus'
[docs]class AuditoryStimulus(Stimulus):
"""docstring for AuditoryStimulus"""
def __init__(self, *args, **kwargs):
super(AuditoryStimulus, self).__init__(*args, **kwargs)
if self.label=='':
self.label = 'auditory_stimulus'
[docs]def run_state_machine(start_in='pre', error_state=None, error_callback=None, **state_functions):
"""runs a state machine defined by the keyword arguments
>>> def run_start():
>>> print "in 'run_start'"
>>> return 'next'
>>> def run_next():
>>> print "in 'run_next'"
>>> return None
>>> run_state_machine(start_in='start',
>>> start=run_start,
>>> next=run_next)
in 'run_start'
in 'run_next'
None
"""
# make sure the start state has a function to run
assert (start_in in state_functions.keys())
# make sure all of the arguments passed in are callable
for func in state_functions.values():
assert hasattr(func, '__call__')
state = start_in
while state is not None:
try:
state = state_functions[state]()
except Exception, e:
if error_callback:
error_callback(e)
raise
else:
raise
state = error_state
[docs]class Trial(Event):
"""docstring for Trial"""
def __init__(self,
index=None,
type_='normal',
class_=None,
*args, **kwargs):
super(Trial, self).__init__(*args, **kwargs)
self.label = 'trial'
self.session = None
self.index = index
self.type_ = type_
self.stimulus = None
self.class_ = class_
self.response = None
self.correct = None
self.rt = None
self.reward = False
self.punish = False
self.events = []
self.stim_event = None
[docs]class Command(object):
"""
Enables to run subprocess commands in a different thread with TIMEOUT option.
via https://gist.github.com/kirpit/1306188
Based on jcollado's solution:
http://stackoverflow.com/questions/1191374/subprocess-with-timeout/4825933#4825933
"""
command = None
process = None
status = None
output, error = '', ''
def __init__(self, command):
if isinstance(command, basestring):
command = shlex.split(command)
self.command = command
[docs] def run(self, timeout=None, **kwargs):
""" Run a command then return: (status, output, error). """
def target(**kwargs):
try:
self.process = subprocess.Popen(self.command, **kwargs)
self.output, self.error = self.process.communicate()
self.status = self.process.returncode
except:
self.error = traceback.format_exc()
self.status = -1
# default stdout and stderr
if 'stdout' not in kwargs:
kwargs['stdout'] = subprocess.PIPE
if 'stderr' not in kwargs:
kwargs['stderr'] = subprocess.PIPE
# thread
thread = threading.Thread(target=target, kwargs=kwargs)
thread.start()
thread.join(timeout)
if thread.is_alive():
self.process.terminate()
thread.join()
return self.status, self.output, self.error
[docs]def parse_commandline(arg_str=sys.argv[1:]):
""" parse command line arguments
note: optparse is depreciated w/ v2.7 in favor of argparse
"""
parser=ArgumentParser()
parser.add_argument('-B', '--box',
action='store', type=int, dest='box', required=False,
help='(int) box identifier')
parser.add_argument('-S', '--subject',
action='store', type=str, dest='subj', required=False,
help='subject ID and folder name')
parser.add_argument('-c','--config',
action='store', type=str, dest='config_file', default='config.json', required=True,
help='configuration file [default: %(default)s]')
args = parser.parse_args(arg_str)
return vars(args)
[docs]def check_cmdline_params(parameters, cmd_line):
# if someone is using red bands they should ammend the checks I perform here
allchars=string.maketrans('','')
nodigs=allchars.translate(allchars, string.digits)
if not ('box' not in cmd_line or cmd_line['box'] == int(parameters['panel_name'].encode('ascii','ignore').translate(allchars, nodigs))):
print "box number doesn't match config and command line"
return False
if not ('subj' not in cmd_line or int(cmd_line['subj'].encode('ascii','ignore').translate(allchars, nodigs)) == int(parameters['subject'].encode('ascii','ignore').translate(allchars, nodigs))):
print "subject number doesn't match config and command line"
return False
return True
[docs]def time_in_range(start, end, x):
"""Return true if x is in the range [start, end]"""
if start <= end:
return start <= x <= end
else:
return start <= x or x <= end
[docs]def is_day(latitude = '32.82', longitude = '-117.14'):
"""Is it daytime?
(lat,long) -- latitude and longitude of location to check (default is San Diego)
Returns True if it is daytime
"""
import ephem
obs = ephem.Observer()
obs.lat = latitude # San Diego, CA
obs.long = longitude
sun = ephem.Sun()
sun.compute()
next_sunrise = ephem.localtime(obs.next_rising(sun))
next_sunset = ephem.localtime(obs.next_setting(sun))
return next_sunset < next_sunrise
[docs]def check_time(schedule,fmt="%H:%M"):
""" determine whether trials should be done given the current time and the light schedule
returns Boolean if current time meets schedule
schedule='sun' will change lights according to local sunrise and sunset
schedule=[('07:00','17:00')] will have lights on between 7am and 5pm
schedule=[('06:00','12:00'),('18:00','24:00')] will have lights on between
"""
if schedule == 'sun':
if is_day():
return True
else:
for epoch in schedule:
assert len(epoch) is 2
now = dt.datetime.time(dt.datetime.now())
start = dt.datetime.time(dt.datetime.strptime(epoch[0],fmt))
end = dt.datetime.time(dt.datetime.strptime(epoch[1],fmt))
if time_in_range(start,end,now):
return True
return False
[docs]def wait(secs=1.0, final_countdown=0.0,waitfunc=None):
"""Smartly wait for a given time period.
secs -- total time to wait in seconds
final_countdown -- time at end of secs to wait and constantly poll the clock
waitfunc -- optional function to run in a loop during hogCPUperiod
If secs=1.0 and final_countdown=0.2 then for 0.8s python's time.sleep function will be used,
which is not especially precise, but allows the cpu to perform housekeeping. In
the final hogCPUsecs the more precise method of constantly polling the clock
is used for greater precision.
"""
#initial relaxed period, using sleep (better for system resources etc)
if secs > final_countdown:
time.sleep(secs-final_countdown)
secs = final_countdown # only this much is now left
#It's the Final Countdown!!
#hog the cpu, checking time
t0 = time.time()
while (time.time()-t0) < secs:
#let's see if any events were collected in meantime
try:
waitfunc()
except:
pass
[docs]def auditory_stim_from_wav(wav):
with closing(wave.open(wav,'rb')) as wf:
(nchannels, sampwidth, framerate, nframes, comptype, compname) = wf.getparams()
duration = float(nframes)/sampwidth
duration = duration * 2.0 / framerate
stim = AuditoryStimulus(time=0.0,
duration=duration,
name=wav,
label='wav',
description='',
file_origin=wav,
annotations={'nchannels': nchannels,
'sampwidth': sampwidth,
'framerate': framerate,
'nframes': nframes,
'comptype': comptype,
'compname': compname,
}
)
return stim
[docs]def concat_wav(input_file_list, output_filename='concat.wav'):
""" concat a set of wav files into a single wav file and return the output filename
takes in a tuple list of files and duration of pause after the file
input_file_list = [
('a.wav', 0.1),
('b.wav', 0.09),
('c.wav', 0.0),
]
returns a list of AuditoryStimulus objects
TODO: add checks for sampling rate, number of channels, etc.
"""
cursor = 0
epochs = [] # list of file epochs
audio_data = ''
with closing(wave.open(output_filename, 'wb')) as output:
for input_filename, isi in input_file_list:
# read in the wav file
with closing(wave.open(input_filename,'rb')) as wav_part:
try:
params = wav_part.getparams()
output.setparams(params)
fs = output.getframerate()
except: # TODO: what was I trying to except here? be more specific
pass
audio_frames = wav_part.readframes(wav_part.getnframes())
# append the audio data
audio_data += audio_frames
part_start = cursor
part_dur = len(audio_frames)/params[1]
epochs.append(AuditoryStimulus(time=float(part_start)/fs,
duration=float(part_dur)/fs,
name=input_filename,
file_origin=input_filename,
annotations=params,
label='motif'
))
cursor += part_dur # move cursor length of the duration
# add isi
if isi > 0.0:
isi_frames = ''.join([struct.pack('h', fr) for fr in [0]*int(fs*isi)])
audio_data += isi_frames
cursor += len(isi_frames)/params[1]
# concat all of the audio together and write to file
output.writeframes(audio_data)
description = 'concatenated on-the-fly'
concat_wav = AuditoryStimulus(time=0.0,
duration=epochs[-1].time+epochs[-1].duration,
name=output_filename,
label='wav',
description=description,
file_origin=output_filename,
annotations=output.getparams(),
)
return (concat_wav,epochs)
[docs]def get_num_open_fds():
'''
return the number of open file descriptors for current process
.. warning: will only work on UNIX-like os-es.
'''
pid = os.getpid()
procs = subprocess.check_output(
[ "lsof", '-w', '-Ff', "-p", str( pid ) ] )
nprocs = len(
filter(
lambda s: s and s[ 0 ] == 'f' and s[1: ].isdigit(),
procs.split( '\n' ) )
)
return nprocs
[docs]def rand_from_log_shape_dist(alpha=10):
"""
randomly samples from a distribution between 0 and 1 with pdf shaped like the log function
low probability of getting close to zero, increasing probability going towards 1
alpha determines how sharp the curve is, higher alpha, sharper curve.
"""
beta = (alpha + 1) * np.log(alpha + 1) - alpha
t = random.random()
ret = ((beta * t-1)/(sp.special.lambertw((beta*t-1)/np.e)) - 1) / alpha
return max(min(np.real(ret), 1), 0)