Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Pablo Ortega
HYGRIP
Commits
cfdfc58b
Commit
cfdfc58b
authored
May 27, 2020
by
pablosmig
Browse files
HYGRIP cleaning
parent
9521579c
Changes
4
Expand all
Hide whitespace changes
Inline
Side-by-side
presentation.ipynb
View file @
cfdfc58b
This diff is collapsed.
Click to expand it.
This diff is collapsed.
Click to expand it.
utils/ica.py
View file @
cfdfc58b
...
...
@@ -3,10 +3,15 @@ import warnings
from
.pipelines
import
*
def
compute_ica
(
x
,
n_components
=
None
,
tol
=
1e-3
,
max_iter
=
500
):
""" Computes sources and mixing matrices.
args:
x : array, shape (num_samps, num_chans)
** thanks to ** scikit-learn.org
""" Computes sources and mixing matrices
** thanks to ** scikit-learn.org
:param x: (array) shape (num_samps, num_chans)
:param n_components: (int) number of component to use in decomposition
:param tol: (float) tolerance in convergence
:param max_iter: (int) max number of iterations
:return: (tuple) sources, mixing, unmixing mean and bool for convergence
"""
assert
len
(
x
.
shape
)
==
2
,
f
"x is
{
len
(
x
.
shape
)
}
D but only 2D is accepted"
assert
x
.
shape
[
0
]
>
x
.
shape
[
1
],
f
"x shape is
{
x
.
shape
}
but time axis should be last"
...
...
@@ -31,10 +36,11 @@ def compute_ica(x, n_components=None, tol=1e-3, max_iter=500):
def
align
(
x
,
y
,
fs
,
threshold
=
1
):
""" Aligns to signals with same sfreq in time
args:
x : array, size (tsamps,)
y : array, size (tsamps',)
threshold : float, maxmimum delay possible
:param x: (array) shape (tsamps,)
:param y: (array) shape (tsamps',)
:param fs: (float) sampling frequency [Hz]
:param threshold: (float) maxmimum delay possible in [s]
:return:
"""
assert
len
(
x
.
shape
)
==
1
and
len
(
y
.
shape
)
==
1
,
\
f
"align only accepts 1D vectors but got
{
len
(
x
.
shape
)
}
and
{
len
(
y
.
shape
)
}
"
...
...
@@ -54,11 +60,15 @@ def align(x, y, fs, threshold=1):
def
reject
(
S_
,
ref
,
max_num
,
threshold
,
fs
,
_debug
=
False
):
""" Find estimated sources to reject based on `ref` signals. Signals should be high pass filtered.
Rejects the max_num of sources that are most correlated to any of the ref signals.
args:
S_ : array, shape (num_samps, num_comps)
ref: array, shape (num_samps')
""" Find estimated sources to reject based on `ref` signals. Signals should be high pass filtered
Rejects the max_num of sources that are most correlated to any of the ref signals
:param S_: (array) sources of shape (num_samps, num_comps)
:param ref: (array) confound reference of shape (num_samps')
:param max_num: (int) maximum number of source to remove
:param threshold: (float) correlation with reference threshold above which accept rejection
:param fs: (float) sampling frequency [Hz]
:param _debug: (bool) If `True` plots sources and reference
:return: (tuple) list of rejected sources and correlations of sources with references
"""
assert
len
(
S_
.
shape
)
==
2
,
f
"`S_` should be 2D array but found
{
len
(
S_
.
shape
)
}
dims"
assert
len
(
ref
.
shape
)
==
2
,
f
"`ref` should be 2D array but found
{
len
(
ref
.
shape
)
}
dims"
...
...
@@ -99,15 +109,13 @@ def reject(S_, ref, max_num, threshold, fs, _debug=False):
def
reconstruct
(
x
,
M_
,
U_
,
mean_
,
reject
):
""" Reconstruct original signal without rejected estimated sources
Data should not be standardised since U_/M_ already whiten/color the signals
args:
X : original signal, array (num_samps, num_chans)
U_ : unmixing matrix, array (num_comps, num_chans)
M_ : array, shape (num_chans, num_comps)
pinv(np.dot(unmixing_matrix, self.whitening_))
mean_ : array, (num_chans,)
reject : list of int
components to zero in the recovered sources
Data doesn't need to be standardised since U_/M_ already whiten/color the signals
:param x: (array) original signal of shape (num_samps, num_chans)
:param M_: (array) mixing matrix of shape (num_chans, num_comps)
:param U_: (array) unmixing matrix of shape (num_comps, num_chans) == pinv(np.dot(unmixing_matrix, self.whitening_))
:param mean_: (array) means for coloring, shape (num_chans,)
:param reject: (list) components to zero in the recovered sources
:return: (array) reconstructed signals
"""
assert
type
(
reject
)
==
list
,
\
f
"rejected_components is type
{
type
(
reject
)
}
but can only be list"
...
...
@@ -122,10 +130,11 @@ def reconstruct(x, M_, U_, mean_, reject):
class
IcaReconstructor
(
object
):
""" ICA gives better results when there are more samples. The `gathered` mode concatenates samples per channel.
On the other if there are many sources along a recording this might be less well captured. If we guess there are
different sources for different conditions or at different recording times selecting the time-periods (`tlim`) to
apply the ICA to might improve results.
""" Pipeline process
ICA gives better results when there are more samples. The `gathered` mode concatenates samples per channel.
On the other if there are many sources along a recording this might be less well captured. If we guess there are
different sources for different conditions or at different recording times selecting the time-periods (`tlim`) to
apply the ICA to might improve results.
"""
def
__init__
(
self
,
fs
,
n_components
=
None
,
max_components
=
2
,
threshold
=
0.3
,
tolerance
=
1e-2
,
max_iter
=
500
,
...
...
@@ -163,8 +172,6 @@ class IcaReconstructor(object):
f
" mode=
{
self
.
_mode
}
, standarize=
{
self
.
_stdze
}
)
\n
"
def
__call__
(
self
,
x
,
evts
,
fs
,
**
kwargs
):
"""
"""
assert
"ref"
in
kwargs
,
\
'IcaReconstructor requires a reference signal with key
\"
ref
\"
'
assert
"ref_fs"
in
kwargs
,
\
...
...
utils/pipelines.py
View file @
cfdfc58b
...
...
@@ -4,7 +4,35 @@ import multiprocessing
from
joblib
import
Parallel
,
delayed
def
get_task
(
force
,
evts
,
fs
,
tup
=
1.55
,
tdown
=
0.55
,
repeats
=
10
):
""" Builds task starting at times indicated in events
:param force: (array) force data
:param evts: (array) force events
:param fs: (float) force sampling frequency [Hz]
:param tup: (float) contraction time [s]
:param tdown: (float) relaxation time [s]
:param repeats: (int) number of repetitions per trial
:return: (array) task
"""
task
=
np
.
zeros
((
1
,
force
.
shape
[
-
1
]))
trial
=
np
.
concatenate
([.
5
*
np
.
ones
((
1
,
int
(
tup
*
fs
))),
np
.
zeros
((
1
,
int
(
tdown
*
fs
)))],
-
1
)
trial
=
[
trial
for
i
in
range
(
repeats
)]
trial
=
np
.
concatenate
(
trial
,
-
1
)
n
=
trial
.
shape
[
-
1
]
times
=
np
.
arange
(
force
.
shape
[
-
1
])
/
fs
for
i
in
range
(
evts
.
shape
[
0
]):
if
evts
[
i
,
1
]
!=
-
1
:
i0
=
np
.
argmin
(
np
.
abs
(
times
-
evts
[
i
,
0
]))
task
[:,
i0
:
i0
+
n
]
=
trial
return
task
def
primes
(
n
,
threshold
):
""" Decomponses an integer in prime numbers
:param n: (int) integer to decompose
:param threshold: (int) maximum integer for decomposition
:return: (list) prime factors
"""
primfac
=
[]
d
=
2
while
d
*
d
<=
n
:
...
...
@@ -23,6 +51,12 @@ def primes(n, threshold):
def
decimate
(
x
,
in_fs
,
out_fs
,
axis
=-
1
,
threshold
=
13
):
""" Decimates signal to out_fs avoiding downsampling factors greater than `thres`
:param x: (array) signal to decimate
:param in_fs: (float) input signal sampling frequency [Hz]
:param out_fs: (float) desired output signal sampling frequency [Hz]
:param axis: (int) axis along which decimate the signal. Last axis by default.
:param threshold: (int) maximum decimation factor
:return: (array) decimated signal
"""
if
in_fs
!=
out_fs
:
assert
in_fs
%
out_fs
==
0
,
\
...
...
@@ -38,6 +72,8 @@ def decimate(x, in_fs, out_fs, axis=-1, threshold=13):
class
Pipeline
(
object
):
""" Class storing the list of processes defining the pipeline and applying them when called
"""
def
__init__
(
self
,
processes
,
name
=
""
):
self
.
_name
=
name
if
len
(
processes
)
==
0
:
...
...
@@ -61,8 +97,11 @@ class Pipeline(object):
return
x
,
evts
,
fs
# Crop
class
EpochExtractor
(
object
):
""" Pipeline process
Extract the epochs indicated by events within `tbound` time boundaries
"""
def
__init__
(
self
,
tbound
):
self
.
_tbound
=
tbound
...
...
@@ -93,8 +132,11 @@ class EpochExtractor(object):
return
epochs
,
events
,
fs
# Gather by labels
class
LabelGatherer
(
object
):
""" Pipeline process
Gathers epochs by "left" and "right" hand labels in first axis of array
"""
def
__init__
(
self
,
labels
=
[
0
,
1
]):
self
.
_labels
=
labels
...
...
@@ -115,8 +157,11 @@ class LabelGatherer(object):
return
x
,
evts
,
fs
# Filter
class
Filter
(
object
):
""" Pipeline process
Instantiates filter and filters signal when called
"""
def
__init__
(
self
,
bands
,
fs
,
gpass
=
3
,
gstop
=
60
,
filttype
=
"butter"
):
self
.
_fs
=
fs
...
...
@@ -177,8 +222,11 @@ class Filter(object):
return
x
,
evts
,
fs
# Downsample to specified group fs
class
Downsampler
(
object
):
""" Pipeline process
Downsample to specified `out_fs` sampling frequency
"""
def
__init__
(
self
,
in_fs
,
out_fs
,
axis
=-
1
,
threshold
=
13
):
assert
in_fs
%
out_fs
==
0
,
\
f
"input sfreq (
{
in_fs
}
Hz) should be divisible by output sfreq (
{
out_fs
}
Hz)"
...
...
@@ -198,11 +246,12 @@ class Downsampler(object):
class
VCconverter
(
object
):
""" Converts force in V to Voluntary Contraction per unit.
Force in V needs to be filtered to remove offset.
Requires kwarg `mvc` with Maximum Voluntary Contraction in V.
""" Pipeline process
Converts force in V to Voluntary Contraction per unit (VC[pu])
Notes
- Force in V needs to be filtered to remove offset
- Requires kwarg `mvc` with Maximum Voluntary Contraction in V available in dataset
"""
def
__repr__
(
self
):
return
f
"VCconverter()
\n
"
...
...
@@ -213,14 +262,16 @@ class VCconverter(object):
return
x
,
evts
,
fs
# Compute HbO, HbR
class
HbConverter
(
object
):
""" Converts light intensity in Oxy and Deoxy Hemoglobin concentration.
Parameters may be introduced as a list in the same order and units as specified in `ds.attrs["nirs_..."]`
or as text in the same format.
Compared to other processes, this one takes 2 signal inputs and returns...
""" Pipeline process
Converts light intensity in Oxy and Deoxy Hemoglobin concentration ([HbO] and [HbR])
Notes:
- Parameters may be introduced as a list in the same order and units as specified in `ds.attrs["nirs_..."]`
or as text in the same format
- Compared to other processes, this one takes 2 signal inputs and returns single array with concatenated
HbO and HbR measures
"""
def
__init__
(
self
,
DPF
,
SD
,
exc
,
tbound
=
None
):
self
.
_DPF
=
self
.
__parse__
(
DPF
)
self
.
_SD
=
self
.
__parse__
(
SD
)
...
...
@@ -276,6 +327,9 @@ class HbConverter(object):
class
Joiner
(
object
):
""" Pipeline process
Joins `x` input signal with additonal keyworded argument `y` signal along declared axis
"""
def
__init__
(
self
,
axis
):
self
.
_axis
=
axis
...
...
@@ -288,10 +342,11 @@ class Joiner(object):
return
x
,
evts
,
fs
# Correct Baseline
class
BaselineRemover
(
object
):
""" Removes baseline by substracting signal average between specified time bound `tbound`.
Bound is specified with time starting at 0 at the beginning of the signal to correct.
""" Pipeline process
Removes baseline by substracting signal average between specified time bound `tbound`
Bound is specified with time starting at 0 at the beginning of the signal to correct
"""
def
__init__
(
self
,
tbound
):
...
...
@@ -308,6 +363,9 @@ class BaselineRemover(object):
class
MeanReferencer
(
object
):
""" Pipeline process
Removes accross channels mean of a set of measures
"""
def
__init__
(
self
,
axis
=-
2
):
self
.
_axis
=
axis
...
...
@@ -323,6 +381,9 @@ class MeanReferencer(object):
class
StartEndCropper
(
object
):
""" Pipeline process
Crops a signal from the synchronised beginning to the synchronised end of the recording
"""
def
__repr__
(
self
):
return
f
"StartEndCropper()
\n
"
...
...
@@ -336,8 +397,12 @@ class StartEndCropper(object):
evts
[:,
0
]
-=
t_on
return
x
,
evts
,
fs
# Subject parallel processing
class
EegPipe
(
object
):
""" Superclass making parallel processing of a pipeline across subjects
To use as inspiration for subjects parallel processing
"""
def
__init__
(
self
,
dataset
,
eeg_fs
=
250
):
self
.
_num_cores
=
multiprocessing
.
cpu_count
()
pipe
=
[]
...
...
utils/plots.py
View file @
cfdfc58b
...
...
@@ -10,29 +10,21 @@ font = {
}
matplotlib
.
rc
(
'font'
,
**
font
)
def
plot_signal
(
x
,
fs
,
events
=
None
,
off
=
0
,
ax
=
None
,
figsize
=
(
18
,
9
)):
if
ax
is
None
:
fig
,
ax
=
plt
.
subplots
(
1
,
1
,
figsize
=
(
18
,
9
))
else
:
fig
=
plt
.
gcf
()
times
=
np
.
arange
(
x
.
shape
[
-
1
])
/
fs
offset
=
0
for
i
in
range
(
x
.
shape
[
0
]):
offset
+=
off
*
x
[
i
,
:].
min
()
ax
.
plot
(
times
,
x
[
i
,
:]
+
offset
)
offset
+=
off
*
x
[
i
,
:].
max
()
if
events
is
not
None
:
ylim
=
ax
.
get_ylim
()
for
i
in
range
(
events
.
shape
[
0
]):
ax
.
plot
([
events
[
i
,
0
],
events
[
i
,
0
]],
ylim
,
[
'r'
,
'b'
,
'--k'
][
int
(
events
[
i
,
1
])])
ax
.
grid
(
True
)
return
fig
,
ax
def
plot_raw
(
ds
,
sbj
,
xlim
=
None
,
measures
=
None
,
labels
=
None
,
ylims
=
None
):
def
plot_raw
(
ds
,
sbj
,
xlim
=
None
,
measures
=
None
,
labels
=
None
,
figsize
=
(
6
,
12
)):
""" Plots figure 2 A
:param ds: (h5py instance) dataset
:param sbj: (str) id of subject to plot
:param xlim: (list) x coordinates limits
:param measures: (list) strings of desired measures to plot
:param labels: (list) strings of ylabels for units
:param figsize: (tuple) size of figure
:return: fig and axes handles
"""
if
measures
is
None
:
measures
=
[
"
eeg
"
,
"
oxy
"
,
"
dxy
"
,
"e
m
g"
,
"
frc
"
,
"eog"
,
"brt"
,
"ecg"
]
measures
=
[
"
task
"
,
"
frc
"
,
"
emg
"
,
"e
e
g"
,
"
oxy"
,
"dxy
"
,
"eog"
,
"brt"
,
"ecg"
]
labels
=
{
"task"
:
"VC [pu]"
,
"eeg"
:
r
"EEG [$\mu$V]"
,
"oxy"
:
"HbO [mol]"
,
"dxy"
:
"HbR [mol]"
,
...
...
@@ -44,40 +36,68 @@ def plot_raw(ds, sbj, xlim=None, measures=None, labels=None, ylims=None):
}
cropper_
=
StartEndCropper
()
fig
,
ax
=
plt
.
subplots
(
len
(
measures
),
1
,
figsize
=
(
6
,
12
)
,
sharex
=
True
)
fig
,
ax
=
plt
.
subplots
(
len
(
measures
),
1
,
figsize
=
figsize
,
sharex
=
True
)
for
i
,
k
in
enumerate
(
measures
):
x
,
evts
,
fs
=
cropper_
(
ds
[
sbj
+
"/"
+
k
],
ds
[
sbj
+
"/"
+
k
].
attrs
[
'events'
],
ds
.
attrs
[
k
+
"_sfreq"
])
if
k
==
"emg"
:
x
=
x
[
0
]
if
k
==
"task"
:
x
,
evts
,
fs
=
cropper_
(
ds
[
sbj
+
"/frc"
],
ds
[
sbj
+
"/frc"
].
attrs
[
'events'
],
ds
.
attrs
[
"frc_sfreq"
])
x
=
get_task
(
x
,
evts
,
fs
)
else
:
x
=
x
[
0
]
x
,
evts
,
fs
=
cropper_
(
ds
[
sbj
+
"/"
+
k
],
ds
[
sbj
+
"/"
+
k
].
attrs
[
'events'
],
ds
.
attrs
[
k
+
"_sfreq"
])
times
=
np
.
arange
(
x
.
shape
[
-
1
])
/
fs
if
xlim
is
not
None
:
# Remove offset
if
xlim
is
not
None
and
k
not
in
[
"task"
]:
inds
=
[
np
.
argmin
(
np
.
abs
(
times
-
t
))
for
t
in
[
xlim
[
0
],
xlim
[
0
]
+
1
]]
x
-=
x
[...,
inds
[
0
]:
inds
[
1
]].
mean
(
-
1
,
keepdims
=
True
)
ax
[
i
].
plot
(
times
,
x
,
'k'
,
alpha
=
0.8
)
# Signal time selection
if
xlim
is
not
None
:
inds
=
[
np
.
argmin
(
np
.
abs
(
times
-
t
))
for
t
in
xlim
]
else
:
inds
=
[
0
,
-
1
]
cmap
=
plt
.
get_cmap
(
"flag"
)
norm
=
matplotlib
.
colors
.
Normalize
(
vmin
=
0
,
vmax
=
24
)
offset
,
ylims
=
0
,
[]
for
ch
in
range
(
x
.
shape
[
0
]):
if
ch
>
0
:
offset
-=
x
[
ch
,
inds
[
0
]:
inds
[
1
]].
min
()
else
:
ylims
.
append
(
x
[
ch
,
inds
[
0
]:
inds
[
1
]].
min
())
if
k
in
[
"eeg"
,
"oxy"
,
"dxy"
,
"emg"
]:
color
=
cmap
(
norm
(
ch
))
else
:
color
=
"k"
ax
[
i
].
plot
(
times
,
x
[
ch
]
+
offset
,
color
=
color
,
alpha
=
0.75
)
offset
+=
x
[
ch
,
inds
[
0
]:
inds
[
1
]].
max
()
ylims
.
append
(
offset
)
mn
,
mx
=
x
[...,
inds
[
0
]:
inds
[
1
]].
min
(),
x
[...,
inds
[
0
]:
inds
[
1
]].
max
()
for
j
in
range
(
evts
.
shape
[
0
]):
ev
=
evts
[
j
,
0
]
lb
=
int
(
evts
[
j
,
1
])
ax
[
i
].
plot
([
ev
,
ev
],
[
mn
,
mx
]
,
[
'r'
,
'b'
,
'--k'
][
lb
])
ax
[
i
].
set_ylim
(
mn
,
mx
)
ax
[
i
].
plot
([
ev
,
ev
],
ylims
,
[
'r'
,
'b'
,
'--k'
][
lb
])
ax
[
i
].
set_ylim
(
ylims
[
0
]
*
1.1
,
ylims
[
1
]
*
1.1
)
if
labels
is
not
None
:
ax
[
i
].
set_ylabel
(
labels
[
k
])
ax
[
i
].
ticklabel_format
(
axis
=
'y'
,
style
=
'sci'
,
scilimits
=
(
0
,
0
),
useOffset
=
True
)
if
xlim
is
not
None
:
ax
[
0
].
set_xlim
(
xlim
)
ax
[
-
1
].
set_xlabel
(
"t[s]"
)
for
px
in
ax
:
px
.
yaxis
.
tick_right
()
px
.
grid
(
True
)
return
fig
,
ax
def
make_scalp_grid
(
grid
,
figsize
=
(
14
,
8
)):
""" Returns 2D axes handles for NIRS & EEG layout
:param grid: (array) grid of layout
:param figsize: (tuple) figsize
:return: figure and axes handle
"""
n
,
m
=
grid
.
shape
fig
,
ax
=
plt
.
subplots
(
n
,
m
,
sharex
=
True
,
sharey
=
True
,
figsize
=
figsize
)
for
i
in
range
(
n
):
...
...
@@ -88,10 +108,19 @@ def make_scalp_grid(grid, figsize=(14, 8)):
def
plot_hb
(
nirs
,
evts
,
fs
,
grid
,
st_err
=
True
,
figsize
=
(
10
,
6
)):
""" Plots HbO and HbR in 2D layour
:param nirs: (array) nirs data
:param evts: (array) nirs events
:param fs: (float) samping frequency [Hz]
:param grid: (array) grid layout
:param st_err: (bool) plot standard error if `True` else standard deviation
:param figsize: (tuple) figure size
:return: figure and axes handle
"""
assert
len
(
nirs
.
shape
)
==
4
,
"Nirs needs to be gathered by trials."
fig
,
ax
=
make_scalp_grid
(
grid
,
figsize
=
figsize
)
colors
=
[[
'#4477AA'
,
'#117733'
],
[
'#DDCC77'
,
'#CC6677'
]]
# Compute mean and standard deviation (or standard error) ac
c
ross trials
# Compute mean and standard deviation (or standard error) across trials
m
,
s
=
1e6
*
nirs
.
mean
(
1
),
1e6
*
nirs
.
std
(
1
)
if
st_err
:
s
=
s
/
np
.
sqrt
(
nirs
.
shape
[
1
])
...
...
@@ -112,11 +141,21 @@ def plot_hb(nirs, evts, fs, grid, st_err=True, figsize=(10, 6)):
xlim
,
ylim
=
ax
[
r
,
c
].
get_xlim
(),
ax
[
r
,
c
].
get_ylim
()
ax
[
-
2
,
0
].
set_ylabel
(
r
"$\Delta Hb~[\mu mol]$"
)
ax
[
-
1
,
1
].
set_xlabel
(
"t [s]"
)
# fig.legend()
return
fig
,
ax
def
plot_emg_power
(
emg
,
evts
,
fs
,
face
=
"ant"
,
tbound
=
None
,
st_err
=
True
,
plotsd
=
False
,
figsize
=
(
10
,
6
)):
""" Computes and plots the power profile of EMG
:param emg: (array) emg data
:param evts: (array) emg events
:param fs: (float) sampling frequency [Hz]
:param face: (str) selects the EMG channels to plot. Default is "ant"erior channels
:param tbound: (list) selects time to plot
:param st_err: (bool) plot standard error if `True` else standard deviation
:param plotsd: (bool) plot or not standard error or deviation
:param figsize: (tuple) figure size
:return: figure and axes handles
"""
fig
,
ax
=
plt
.
subplots
(
2
,
2
,
sharex
=
True
,
sharey
=
True
,
figsize
=
figsize
)
times
=
np
.
arange
(
emg
.
shape
[
-
1
])
/
fs
-
evts
[
0
,
0
,
0
]
# Compute mean and standard deviation (or standard error) accross trials
...
...
@@ -143,7 +182,15 @@ def plot_emg_power(emg, evts, fs, face="ant", tbound=None, st_err=True, plotsd=F
return
fig
,
ax
def
plot_emg_amp
(
emg
,
evts
,
fs
,
face
=
"ant"
,
tbound
=
None
,
figsize
=
(
10
,
6
)):
def
plot_emg_amp
(
emg
,
evts
,
fs
,
face
=
"ant"
,
figsize
=
(
10
,
6
)):
""" Plots EMG amplitudes
:param emg: (array) emg data
:param evts: (array) emg events
:param fs: (float) sampling frequency [Hz]
:param face: (str) selects the EMG channels to plot. Default is "ant"erior channels
:param figsize: (tuple) figure size
:return: figure and axes handles
"""
fig
,
ax
=
plt
.
subplots
(
2
,
2
,
sharex
=
True
,
sharey
=
True
,
figsize
=
figsize
)
times
=
np
.
arange
(
emg
.
shape
[
-
1
])
/
fs
-
evts
[
0
,
0
,
0
]
f
=
0
if
face
==
"ant"
else
1
...
...
@@ -158,32 +205,14 @@ def plot_emg_amp(emg, evts, fs, face="ant", tbound=None, figsize=(10, 6)):
return
fig
,
ax
def
plot_eeg_times
(
eeg
,
evts
,
fs
,
tbound
=
None
,
off
=
6
,
figsize
=
(
13
,
9
)):
assert
len
(
eeg
.
shape
)
==
2
,
"This function only accepts non-epoched data"
fig
,
ax
=
plt
.
subplots
(
1
,
1
,
figsize
=
figsize
)
if
tbound
is
None
:
ind
=
[
0
,
-
1
]
else
:
ind
=
[
int
(
t
*
fs
)
for
t
in
tbound
]
times
=
np
.
arange
(
eeg
.
shape
[
-
1
])
/
fs
offset
,
off
=
0
,
off
*
eeg
.
std
()
for
ch
in
range
(
eeg
.
shape
[
0
]):
ax
.
plot
(
times
[
ind
[
0
]:
ind
[
1
]],
eeg
[
ch
,
ind
[
0
]:
ind
[
1
]]
+
offset
,
'k'
,
alpha
=
0.8
)
ax
.
text
(
times
[
ind
[
1
]],
eeg
[
ch
,
ind
[
1
]]
+
offset
,
str
(
ch
))
offset
+=
off
ylim
=
ax
.
get_ylim
()
for
i
in
range
(
evts
.
shape
[
0
]):
if
(
evts
[
i
,
0
]
>=
times
[
ind
[
0
]])
and
(
evts
[
i
,
0
]
<=
times
[
ind
[
1
]]):
ax
.
plot
([
evts
[
i
,
0
],
evts
[
i
,
0
]],
ylim
,
[
'r'
,
'b'
,
'--k'
][
int
(
evts
[
i
,
1
])])
return
fig
,
ax
def
plot_brt
(
brt
,
evts
,
fs
,
figsize
=
(
16
,
4
)):
""" Plots breath
:param brt: (array) breath data
:param evts: (array) events
:param fs: (float) sampling frequency
:param figsize: (tuple) figure size
:return: figure and axes handles
"""
fig
,
ax
=
plt
.
subplots
(
1
,
2
,
sharey
=
True
,
sharex
=
True
,
figsize
=
figsize
)
times
=
np
.
arange
(
brt
.
shape
[
-
1
])
/
fs
-
evts
[
0
,
0
,
0
]
for
i
in
range
(
brt
.
shape
[
0
]):
...
...
@@ -196,33 +225,58 @@ def plot_brt(brt, evts, fs, figsize=(16, 4)):
def
plot_ecg
(
ecg
,
evts
,
fs
,
figsize
=
(
16
,
4
)):
""" Plots ECG
:param ecg: (array) ECG data
:param evts: (array) events
:param fs: (float) sampling frequency
:param figsize: (tuple) figure size
:return: figure and axes handles
"""
fig
,
ax
=
plot_brt
(
ecg
,
evts
,
fs
,
figsize
=
figsize
)
ax
[
0
].
set_ylabel
(
"ECG [mV]"
)
return
fig
,
ax
def
plot_eog
(
eog
,
evts
,
fs
,
figsize
=
(
16
,
4
)):
""" Plots EOG
:param eog: (array) EOG data
:param evts: (array) events
:param fs: (float) sampling frequency
:param figsize: (tuple) figure size
:return: figure and axes handles
"""
fig
,
ax
=
plot_brt
(
eog
,
evts
,
fs
,
figsize
=
figsize
)
ax
[
0
].
set_ylabel
(
"EOG [mV]"
)
return
fig
,
ax
def
plot_force
(
force
,
evts
,
fs
,
perc
=
99.5
,
alpha
=
0.1
,
figsize
=
(
10
,
5
),
width_ratios
=
[
5
,
1
]):
def
plot_force
(
force
,
evts
,
fs
,
perc
=
95
,
alpha
=
0.1
,
figsize
=
(
10
,
5
),
width_ratios
=
[
5
,
1
],
plot_task
=
False
):
""" Plots force
:param force: (array) force data
:param evts: (array) events
:param fs: (float) sampling frequency [Hz]
:param perc: (float) percentile to get representative levels of force
:param alpha: (float) transparency of force trials
:param figsize: (tuple) figure size in inches
:param width_ratios: (list) ratio of widths for time plot versus violin plots
:param plot_task: (bool) plot task on top of produced forces
:return: figure and axes handles
"""
fig
,
ax
=
plt
.
subplots
(
2
,
2
,
figsize
=
figsize
,
gridspec_kw
=
{
"width_ratios"
:
width_ratios
})
times
=
np
.
arange
(
force
.
shape
[
-
1
])
/
fs
-
evts
[
0
,
0
,
0
]
for
i
in
range
(
force
.
shape
[
0
]):
vp
=
[]
# Plot delimiters for forces
for
j
,
y
in
enumerate
([
0.5
,
0.75
,
0.25
]):
ax
[
i
,
0
].
plot
([
-
6
,
26
],
[
y
,
y
],
[
'--k'
,
'k'
,
'--k'
][
i
],
alpha
=
0.9
)
for
j
,
y
in
enumerate
([
0.5
,
0.25
]):
ax
[
i
,
0
].
plot
([
-
6
,
26
],
[
y
,
y
],
[
'--k'
,
'--k'
][
j
],
alpha
=
0.9
)
# Plot forces and gather percentiles
for
j
in
range
(
force
.
shape
[
1
]):
ax
[
i
,
0
].
plot
(
times
,
force
[
i
,
j
,
0
].
T
,
'gray'
,
alpha
=
alpha
);
vp
.
append
(
np
.
percentile
(
force
[
i
,
j
,
0
],
perc
))
ax
[
i
,
0
].
set_ylabel
(
f
"
{
[
'Left'
,
'Right'
][
i
]
}
hand VC [pu]"
)
# Horizontal delimiters for violins
for
j
,
y
in
enumerate
([
0.5
,
0.75
,
0.25
]):
ax
[
i
,
1
].
plot
([.
75
,
1.25
],
[
y
,
y
],
[
'--k'
,
'k'
,
'--k'
][
i
],
alpha
=
0.9
)
for
j
,
y
in
enumerate
([
0.5
,
0.25
]):
ax
[
i
,
1
].
plot
([.
75
,
1.25
],
[
y
,
y
],
[
'--k'
,
'--k'
][
j
],
alpha
=
0.9
)
# Plot violins
ax
[
i
,
1
].
violinplot
(
vp
)
# Adjust axes
...
...
@@ -232,16 +286,30 @@ def plot_force(force, evts, fs, perc=99.5, alpha=0.1, figsize=(10, 5), width_rat
ax
[
i
,
1
].
grid
(
True
)
ax
[
i
,
1
].
set_xticklabels
([])
ax
[
i
,
1
].
set_yticklabels
([])
if
plot_task
:
x
=
get_task
(
force
,
evts
[
0
,
0
:
1
],
fs
)
for
i
in
range
(
2
):
ax
[
i
,
0
].
plot
(
times
,
x
[
0
],
"k"
)
ax
[
1
,
0
].
set_xlabel
(
"t [s]"
)
return
fig
,
ax
def
plot_eeg
(
eeg
,
evts
,
fs
,
grid
,
hand
,
tlim
=
None
,
flim
=
None
,
mode
=
"specgram"
,
figsize
=
(
16
,
8
),
**
kwargs
):
"""
mode:
1. specgram : plots spectrogram of average signal accross trials
2. average : plots average signal accross trials
3. trials : overlay all trials
""" Plot EEG in 2D scalp layout
:param eeg: (array) EEG data
:param evts: (array) events
:param fs: (float) sampling frequency