Task Development Guide
To create a task, create a Python file, and within it, a class with the task’s name,
inheriting functionality from the generic TaskBase class. This process is straightforward.
Let’s look at an example:
Let’s explore the file habituation.py inside your code folder. The structure of the
task is the following:
from village.classes.task_base import Event, Output, TaskBase
from village.manager import manager
class Habituation(TaskBase):
"""
This class defines the task.
Required methods to implement:
- __init__: Initialize the task
- start: Called when the task starts.
- create_trial: Called once per trial to create the state machine.
- after_trial: Called once after each trial to register the values in the .csv file.
- close: Called when the task is finished.
"""
def __init__(self):
"""
Initialize the training protocol. The text in the self.info variable
will be shown when the task is selected in the GUI to be run manually.
"""
super().__init__()
self.info = """
Habituation Task
-------------------
This task is a simple visual task where the mouse has
to poke in illuminated ports.
The center port illuminates when a trial starts.
After the center port is poked,
both side ports are illuminated and give reward.
"""
The task is named Habituation. It is initialized with init, and we acquire all the properties of the generic Task class using super().init(). The naming conventions follow Python standards: CamelCase for class names and lower_case for filenames and function names. Certain methods must be implemented in your class. These methods are:
The start() Method
def start(self):
"""
This function is called when the task starts.
It is used to calculate values needed for the task.
The following variables are accessible by default:
- self.bpod: (Bpod object)
- self.name: (str) the name of the task
(it is the name of the class, in this case Habituation)
- self.subject: (str) the name of the subject performing the task
- self.current_trial: (int) the current trial number starting from 1
- self.system_name: (str) the name of the system as defined in the
tab settings of the GUI
- self.settings: (Settings object) the settings defined in training_protocol.py
- self.trial_data: (dict) information about the current trial
- self.force_stop: (bool) if made true the task will stop
Al the variables created in training_protocol.py are accessible.
- self.settings.reward_amount_ml: reward volume
- self.settings.stage: current training stage
- self.settings.light_intensity_high: high light intensity
- self.settings.light_intensity_low: low light intensity
- self.settings.trial_types: possible trial types
- self.settings.punishment_time: punishment duration
- self.settings.iti_time: inter-trial interval
"""
# First we calculate the time that the valves (or pumps) need to open to deliver
# the reward amount
# Make sure to calibrate the valves before using this function, otherwise
# it will return an Exception
self.left_valve_opening_time = manager.water_calibration.get_valve_time(
port=1, volume=self.settings.reward_amount_ml
)
self.right_valve_opening_time = manager.water_calibration.get_valve_time(
port=3, volume=self.settings.reward_amount_ml
)
The create_trial() Method
def create_trial(self):
"""
This function is called once per trial, first it modifies variables and then
sends the state machine to the bpod that will run the trial.
"""
# 'ready_to_initiate': state that turns on the central port light and
# waits for a poke in the central port (Port2)
self.bpod.add_state(
state_name="ready_to_initiate",
state_timer=0,
state_change_conditions={Event.Port2In: "stimulus_state"},
output_actions=[(Output.PWM2, self.settings.light_intensity_high)],
)
# 'stimulus_state': state that turns on the side ports and
# waits for a poke in one of the side ports (Port1 or Port3)
self.bpod.add_state(
state_name="stimulus_state",
state_timer=0,
state_change_conditions={
Event.Port1In: "reward_state_left",
Event.Port3In: "reward_state_right",
},
output_actions=[
(Output.PWM1, self.settings.light_intensity_high),
(Output.PWM3, self.settings.light_intensity_high),
],
)
# 'reward_state_left' and 'reward_state_right': states that deliver the reward
self.bpod.add_state(
state_name="reward_state_left",
state_timer=self.left_valve_opening_time,
state_change_conditions={Event.Tup: "exit"},
output_actions=[Output.Valve1],
)
self.bpod.add_state(
state_name="reward_state_right",
state_timer=self.right_valve_opening_time,
state_change_conditions={Event.Tup: "exit"},
output_actions=[Output.Valve3],
)
The after_trial() Method
def after_trial(self):
"""
Here you can register all the values you need to save for each trial.
It is essential to always include a variable named water, which stores the
amount of water consumed during each trial.
The system will calculate the total water consumption in each session
by summing this variable.
If the total water consumption falls below a certain threshold,
an alarm will be triggered.
This threshold can be adjusted in the Settings tab of the GUI.
"""
self.register_value("water", self.settings.reward_amount_ml)
The close() Method
def close(self):
"""
Here you can perform any actions you want to take once the task is completed,
such as sending a message via email or Slack, creating a plot, and more.
"""
pass
The FollowTheLight Task
Now you can explore the more complex FollowTheLight Task, where we use other variables that were created in the settings training_protocol.
from village.classes.task import Event, Output, Task
from village.manager import manager
import random
class FollowTheLight(Task):
def __init__(self):
super().__init__()
self.info = """
Follow The Light Task
-------------------
This task is a simple visual task where the mouse has
to poke the center port to start a trial.
After the center port is poked,
one of the two side ports will be illuminated.
If the mouse licks the correct side port, it receives a reward.
If the mouse licks the wrong side port, it receives a punishment.
It contains 2 training stages:
- Training stage 1: Only one side port is illuminated and gives reward.
No punishment is given, and the mouse can choose again.
- Training stage 2: Both ports are illuminated with different intensity.
Brighter port gives reward, the other one gives punishment.
The progression through the stages is defined in the training_settings.py file.
"""
def start(self):
"""
Al the variables created in training_protocol.py are accessible.
- self.settings.reward_amount_ml: reward volume
- self.settings.stage: current training stage
- self.settings.light_intensity_high: high light intensity
- self.settings.light_intensity_low: low light intensity
- self.settings.trial_types: possible trial types
- self.settings.punishment_time: punishment duration
- self.settings.iti_time: inter-trial interval
"""
# First we calculate the time that the valves (or pumps) need to open to deliver
# the reward amount
# Make sure to calibrate the valves before using this function, otherwise
# it will return an Exception
self.left_valve_opening_time = manager.water_calibration.get_valve_time(
port=1, volume=self.settings.reward_amount_ml
)
self.right_valve_opening_time = manager.water_calibration.get_valve_time(
port=3, volume=self.settings.reward_amount_ml
)
# determine if punishment will be used depending on stage
if self.settings.stage == 1:
# no punishment is used, let the mouse choose again
self.punish_condition = "stimulus_state"
else:
# punishment is used
self.punish_condition = "punish_state"
def create_trial(self):
# Pick a trial type at random
self.this_trial_type = random.choice(self.settings.trial_types)
# Set the variables for the stimulus states and the possible choices
# based on the trial type
self.stimulus_state_output = []
if "left" in self.this_trial_type:
self.stimulus_state_output.append(
(Output.PWM1, self.settings.light_intensity_high)
)
if "hard" in self.this_trial_type:
self.stimulus_state_output.append(
(Output.PWM3, self.settings.light_intensity_low)
)
self.left_poke_action = "reward_state"
self.valve_to_open = Output.Valve1
self.valve_opening_time = self.left_valve_opening_time
self.right_poke_action = self.punish_condition
elif "right" in self.this_trial_type:
self.stimulus_state_output.append(
(Output.PWM3, self.settings.light_intensity_high)
)
if "hard" in self.this_trial_type:
self.stimulus_state_output.append(
(Output.PWM1, self.settings.light_intensity_low)
)
self.left_poke_action = self.punish_condition
self.right_poke_action = "reward_state"
self.valve_to_open = Output.Valve3
self.valve_opening_time = self.right_valve_opening_time
# 'ready_to_initiate' state that waits for the poke in the middle port
self.bpod.add_state(
state_name="ready_to_initiate",
state_timer=0,
state_change_conditions={Event.Port2In: "stimulus_state"},
output_actions=[(Output.PWM2, self.settings.light_intensity_high)],
)
# 'stimulus_state' lights the side ports
self.bpod.add_state(
state_name="stimulus_state",
state_timer=self.settings.timer_for_response,
state_change_conditions={
Event.Port1In: self.left_poke_action,
Event.Port3In: self.right_poke_action,
Event.Tup: "exit",
},
output_actions=self.stimulus_state_output,
)
# 'reward_state' delivers the reward
self.bpod.add_state(
state_name="reward_state",
state_timer=self.valve_opening_time,
state_change_conditions={Event.Tup: "iti_state"},
output_actions=[self.valve_to_open],
)
# 'punish_state' waits during the punishment time
self.bpod.add_state(
state_name="punish_state",
state_timer=self.settings.punishment_time,
state_change_conditions={Event.Tup: "iti_state"},
output_actions=[],
)
# 'iti_state' waits for certain time before starting the next trial
# (inter-trial interval)
self.bpod.add_state(
state_name="iti_state",
state_timer=self.settings.iti_time,
state_change_conditions={Event.Tup: "exit"},
output_actions=[],
)
def after_trial(self):
# First, we calculates the performance of a trial, comparing the trial type
# to the first port that the mouse poked.
# We can access the trial information in self.trial_data
# get the side port that the mouse poked first
first_poke = self.find_first_occurrence(
self.trial_data["ordered_list_of_events"],
["Port1In", "Port3In"],
)
# check if the mouse poked the correct port
if first_poke == "Port1In" and "left" in self.this_trial_type:
correct = True
elif first_poke == "Port3In" and "right" in self.this_trial_type:
correct = True
else:
correct = False
# register the amount of water given to the mouse in this trial
# (this is always mandatory)
self.register_value("water", self.settings.reward_amount_ml)
# we will also record the trial type
self.register_value("trial_type", self.this_trial_type)
# we will also record if the trial was correct or not
self.register_value("correct", correct)
def close(self):
pass
def find_first_occurrence(self, event_list, targets):
"""
Helper function to find the first occurrence of any target event in the list.
Args:
event_list: List of events
targets: List of target events to look for
Returns:
The first target event found, or "NaN" if none are found
"""
for event in event_list:
if event in targets:
return event
return "NaN"