04-10-2024 09:30 AM
Hi all!
I am writing a code to control a signal using the NI-DAQmx Python API, to communicate with NI's USB-6314. I want to be able to write a signal (a sine wave for example), but, if needed, be able to stop the writing and start a new function (basically update a function being output on the fly).
To figure this out, I have tried implementing threading with the following code:
def test_stop(task):
print("stopping in 2 seconds")
time.sleep(2)
print("attempting to stop")
task.stop()
print("task has stopped")
device = "SimDev"
print("Signal parameters:\
\nAmplitude: {}\
\nFrequency: {}\
\nDuration: {}\
".format(util.ampl,
util.freq,
util.duration))
dt, signal = util.fct1(util.ampl, util.freq, util.duration, util.sampling_rate)
#WRITE in thread - STOP in main
start_time = time.time()
print("\n=================================\nSTOP in thread - WRITE in main")
with nidaqmx.Task() as task:
task.ao_channels.add_ao_voltage_chan(device+"/ao0")
task.timing.cfg_samp_clk_timing(util.sampling_rate)
task.start()
thread1 = threading.Thread(target=task.write, args=(signal,))
thread1.start()
print("thread start")
test_stop(task)
stop_time = time.time()
print("\nOverall time: {:.2f}s\
\nSignal duration: {}s\
".format(stop_time-start_time,util.duration))
With this code, I would expect the overall time to be 2 seconds (due to the time.sleep(2)), but I get 10 seconds (=util.duration), showing that the signal is not stopped mid-task.
Any help would be greatly appreciated!
04-10-2024 11:03 AM
What if you place the start_time before task.start()?
04-15-2024 01:43 AM
Hi! Thanks for the reply! Unfortunately it comes to the same. I would "simply" like to find a way to update a continuous signal on the fly. There must be something made for this, as it's not an unthinkable feature, but I just can't seem to find anything...
04-15-2024 07:27 AM
I am running it on a simulated device so it only takes 1 second, but it should only takes less than 3 seconds on an actual hardware.
import nidaqmx
import time
import numpy as np
cycles = 2 # how many sine cycles
resolution = 2000 # how many datapoints to generate
length = np.pi * 2 * cycles
my_wave = np.sin(np.arange(0, length, length / resolution))
with nidaqmx.Task() as task:
task.ao_channels.add_ao_voltage_chan('Dev1/ao0')
task.timing.cfg_samp_clk_timing(1000)
start_time = time.time()
print('1 Channel N Samples Write: ')
print(task.write(my_wave, auto_start=True))
task.wait_until_done()
task.stop()
stop_time = time.time()
print("\nOverall time: {:.2f}s\
".format(stop_time-start_time))
# task.ao_channels.add_ao_voltage_chan('Dev1/ao1:3')
# print('N Channel N Samples Write: ')
# print(task.write([[1.1, 2.2, 3.3], [1.1, 2.2, 4.4],
# [2.2, 3.3, 4.4], [2.2, 3.3, 4.4]],
# auto_start=True))
# task.wait_until_done()
# task.stop()