diff --git a/Resources/Scripts/gha_unit_tests.patch b/Resources/Scripts/gha_unit_tests.patch index 6b3c7c073..dbf04431c 100644 --- a/Resources/Scripts/gha_unit_tests.patch +++ b/Resources/Scripts/gha_unit_tests.patch @@ -1,15 +1,16 @@ diff --git a/Tests/Processors/CMakeLists.txt b/Tests/Processors/CMakeLists.txt -index a89fa4da4..ab53e8d89 100644 +index 5c14d2a0e..7770875b8 100644 --- a/Tests/Processors/CMakeLists.txt +++ b/Tests/Processors/CMakeLists.txt -@@ -5,8 +5,8 @@ add_sources(${COMPONENT_NAME}_tests +@@ -5,9 +5,9 @@ add_sources(${COMPONENT_NAME}_tests DataBufferTests.cpp PluginManagerTests.cpp SourceNodeTests.cpp - RecordNodeTests.cpp ++ # RecordNodeTests.cpp + SIMDConverterTests.cpp - ProcessorGraphTests.cpp -+ #RecordNodeTests.cpp -+ #ProcessorGraphTests.cpp ++ # ProcessorGraphTests.cpp EventTests.cpp DataThreadTests.cpp GenericProcessorTests.cpp diff --git a/Source/Processors/AudioNode/AudioNode.h b/Source/Processors/AudioNode/AudioNode.h index 674c0ff54..13d5049ec 100755 --- a/Source/Processors/AudioNode/AudioNode.h +++ b/Source/Processors/AudioNode/AudioNode.h @@ -73,9 +73,6 @@ class AudioNode : public GenericProcessor /** Constructor */ AudioNode(); - /** Destructor */ - ~AudioNode() {} - /** Handle incoming data and decide which channels to monitor */ void process (AudioBuffer& buffer) override; diff --git a/Source/Processors/DataThreads/DataBuffer.cpp b/Source/Processors/DataThreads/DataBuffer.cpp index fc03172a5..43c8c668e 100755 --- a/Source/Processors/DataThreads/DataBuffer.cpp +++ b/Source/Processors/DataThreads/DataBuffer.cpp @@ -107,7 +107,7 @@ int DataBuffer::getNumSamples() const { return abstractFifo.getNumReady(); } int DataBuffer::readAllFromBuffer (AudioBuffer& data, int64* blockSampleNumber, - double* blockTimestamp, + double* blockTimestamps, uint64* eventCodes, int maxSize, int dstStartChannel, @@ -134,15 +134,15 @@ int DataBuffer::readAllFromBuffer (AudioBuffer& data, blockSize1); // numSamples } - memcpy (blockSampleNumber, sampleNumberBuffer + startIndex1, 8); - memcpy (blockTimestamp, timestampBuffer + startIndex1, 8); - memcpy (eventCodes, eventCodeBuffer + startIndex1, blockSize1 * 8); + memcpy (blockSampleNumber, sampleNumberBuffer + startIndex1, sizeof (int64)); + memcpy (blockTimestamps, timestampBuffer + startIndex1, (size_t) blockSize1 * sizeof (double)); + memcpy (eventCodes, eventCodeBuffer + startIndex1, (size_t) blockSize1 * sizeof (uint64)); } else { // std::cout << "NO SAMPLES" << std::endl; - memcpy (blockSampleNumber, &lastSampleNumber, 8); - memcpy (blockTimestamp, &lastTimestamp, 8); + memcpy (blockSampleNumber, &lastSampleNumber, sizeof (int64)); + memcpy (blockTimestamps, &lastTimestamp, sizeof (double)); } if (blockSize2 > 0) @@ -156,7 +156,8 @@ int DataBuffer::readAllFromBuffer (AudioBuffer& data, startIndex2, // sourceStartSample blockSize2); // numSamples } - memcpy (eventCodes + blockSize1, eventCodeBuffer + startIndex2, blockSize2 * 8); + memcpy (blockTimestamps + blockSize1, timestampBuffer + startIndex2, (size_t) blockSize2 * sizeof (double)); + memcpy (eventCodes + blockSize1, eventCodeBuffer + startIndex2, (size_t) blockSize2 * sizeof (uint64)); } // std::cout << "START SAMPLE FOR READ: " << *blockSampleNumber << std::endl; @@ -164,7 +165,7 @@ int DataBuffer::readAllFromBuffer (AudioBuffer& data, if (numItems > 0) { lastSampleNumber = *blockSampleNumber; - lastTimestamp = *blockTimestamp; + lastTimestamp = *blockTimestamps; // std::cout << "Updating last sample number: " << lastSampleNumber << std::endl; } diff --git a/Source/Processors/DataThreads/DataBuffer.h b/Source/Processors/DataThreads/DataBuffer.h index 7b0a8b727..51733726a 100755 --- a/Source/Processors/DataThreads/DataBuffer.h +++ b/Source/Processors/DataThreads/DataBuffer.h @@ -64,7 +64,11 @@ class PLUGIN_API DataBuffer /** Returns the number of samples currently available in the buffer.*/ int getNumSamples() const; - /** Copies as many samples as possible from the DataBuffer to an AudioBuffer.*/ + /** Copies as many samples as possible from the DataBuffer to an AudioBuffer. + + The first sample number is returned in `sampleNumbers[0]`, while `timestamps` + and `eventCodes` receive one value per copied sample. + */ int readAllFromBuffer (AudioBuffer& data, int64* sampleNumbers, double* timestamps, diff --git a/Source/Processors/Events/Event.cpp b/Source/Processors/Events/Event.cpp index 7bc4e4187..d7ea2e06b 100644 --- a/Source/Processors/Events/Event.cpp +++ b/Source/Processors/Events/Event.cpp @@ -218,6 +218,36 @@ size_t SystemEvent::fillTimestampAndSamplesData (HeapBlock& data, return eventSize; } +size_t SystemEvent::fillTimestampArrayData (HeapBlock& data, + const GenericProcessor* proc, + uint16 streamId, + int64 startSampleForBlock, + const double* timestamps, + uint32 nSamplesInBlock, + int64 processStartTime, + uint16 syncStreamId) +{ + const size_t timestampDataSize = (size_t) nSamplesInBlock * sizeof (double); + const size_t eventSize = EVENT_BASE_SIZE + 4 + 8 + timestampDataSize; + const double startTimestampForBlock = nSamplesInBlock > 0 && timestamps != nullptr ? timestamps[0] : -1.0; + + data.allocate (eventSize, true); + data[0] = SYSTEM_EVENT; + data[1] = TIMESTAMP_ARRAY; + *reinterpret_cast (data.getData() + 2) = proc->getNodeId(); + *reinterpret_cast (data.getData() + 4) = streamId; + *reinterpret_cast (data.getData() + 6) = syncStreamId; + *reinterpret_cast (data.getData() + 8) = startSampleForBlock; + *reinterpret_cast (data.getData() + 16) = startTimestampForBlock; + *reinterpret_cast (data.getData() + EVENT_BASE_SIZE) = nSamplesInBlock; + *reinterpret_cast (data.getData() + EVENT_BASE_SIZE + 4) = processStartTime; + + if (timestampDataSize > 0 && timestamps != nullptr) + memcpy (data.getData() + EVENT_BASE_SIZE + 12, timestamps, timestampDataSize); + + return eventSize; +} + size_t SystemEvent::fillTimestampSyncTextData ( HeapBlock& data, const GenericProcessor* proc, @@ -277,7 +307,12 @@ size_t SystemEvent::fillReferenceSampleEvent (HeapBlock& data, uint32 SystemEvent::getNumSamples (const EventPacket& packet) { - if (getBaseType (packet) != SYSTEM_EVENT && getSystemEventType (packet) != TIMESTAMP_AND_SAMPLES) + if (getBaseType (packet) != SYSTEM_EVENT) + return 0; + + Type type = getSystemEventType (packet); + + if (type != TIMESTAMP_AND_SAMPLES && type != TIMESTAMP_ARRAY) return 0; return *reinterpret_cast (packet.getRawData() + EVENT_BASE_SIZE); @@ -285,7 +320,12 @@ uint32 SystemEvent::getNumSamples (const EventPacket& packet) int64 SystemEvent::getHiResTicks (const EventPacket& packet) { - if (getBaseType (packet) != SYSTEM_EVENT && getSystemEventType (packet) != TIMESTAMP_AND_SAMPLES) + if (getBaseType (packet) != SYSTEM_EVENT) + return 0; + + Type type = getSystemEventType (packet); + + if (type != TIMESTAMP_AND_SAMPLES && type != TIMESTAMP_ARRAY) return 0; return *reinterpret_cast (packet.getRawData() + EVENT_BASE_SIZE + 4); diff --git a/Source/Processors/Events/Event.h b/Source/Processors/Events/Event.h index 7b8f0a858..0cb978ac2 100644 --- a/Source/Processors/Events/Event.h +++ b/Source/Processors/Events/Event.h @@ -220,7 +220,10 @@ class PLUGIN_API SystemEvent : public EventBase TIMESTAMP_SYNC_TEXT = 3, // Indicates reference sample information for each incoming data buffer - REFERENCE_SAMPLE = 4 + REFERENCE_SAMPLE = 4, + + // Per-sample timestamps for the current buffer + TIMESTAMP_ARRAY = 5 }; /* Create a TIMESTAMP_AND_SAMPLES event (used by processors that update timestamps) */ @@ -233,6 +236,16 @@ class PLUGIN_API SystemEvent : public EventBase int64 processStartTime, uint16 syncStreamId = 0); + /* Create a TIMESTAMP_ARRAY event (used by processors that provide per-sample timestamps) */ + static size_t fillTimestampArrayData (HeapBlock& data, + const GenericProcessor* proc, + uint16 streamId, + int64 startSampleForBlock, + const double* timestamps, + uint32 nSamplesInBlock, + int64 processStartTime, + uint16 syncStreamId = 0); + /* Create a TIMESTAMP_SYNC_TEXT event (used by Record Node) */ static size_t fillTimestampSyncTextData (HeapBlock& data, const GenericProcessor* proc, diff --git a/Source/Processors/GenericProcessor/GenericProcessor.cpp b/Source/Processors/GenericProcessor/GenericProcessor.cpp index 2a30667ab..66583fb13 100755 --- a/Source/Processors/GenericProcessor/GenericProcessor.cpp +++ b/Source/Processors/GenericProcessor/GenericProcessor.cpp @@ -46,6 +46,65 @@ #define MS_FROM_START Time::highResolutionTicksToSeconds (Time::getHighResolutionTicks() - start) * 1000 +namespace +{ +using TimestampArraysByStream = std::unordered_map>; + +class TimestampArrayRegistry +{ +public: + void clearProcessor (const GenericProcessor* processor) + { + arrays.erase (processor); + } + + void clearStream (const GenericProcessor* processor, uint16 streamId) + { + auto processorIt = arrays.find (processor); + + if (processorIt == arrays.end()) + return; + + processorIt->second.erase (streamId); + + if (processorIt->second.empty()) + arrays.erase (processorIt); + } + + void store (const GenericProcessor* processor, uint16 streamId, const double* timestamps, uint32 nSamples) + { + auto& streamTimestamps = arrays[processor][streamId]; + streamTimestamps.assign (timestamps, timestamps + nSamples); + } + + const std::vector* find (const GenericProcessor* processor, uint16 streamId) const + { + auto processorIt = arrays.find (processor); + + if (processorIt == arrays.end()) + return nullptr; + + auto streamIt = processorIt->second.find (streamId); + + if (streamIt == processorIt->second.end() || streamIt->second.empty()) + return nullptr; + + return &streamIt->second; + } + +private: + std::unordered_map arrays; +}; + +TimestampArrayRegistry& getTimestampArrayRegistry() +{ + static TimestampArrayRegistry registry; + return registry; +} + +constexpr int timestampArrayOffset = EVENT_BASE_SIZE + 12; +} // namespace + LatencyMeter::LatencyMeter (GenericProcessor* processor_) : processor (processor_), counter (0) @@ -141,6 +200,7 @@ GenericProcessor::GenericProcessor (const String& name, bool headlessMode_) GenericProcessor::~GenericProcessor() { + getTimestampArrayRegistry().clearProcessor (this); editor.reset(); // remove parameter editors before parameters dataStreamParameters.clear (true); @@ -663,6 +723,7 @@ void GenericProcessor::clearSettings() ttlEventChannel = nullptr; + getTimestampArrayRegistry().clearProcessor (this); startTimestampsForBlock.clear(); startSamplesForBlock.clear(); syncStreamIds.clear(); @@ -1256,12 +1317,46 @@ double GenericProcessor::getFirstTimestampForBlock (uint16 streamId) const return startTimestampsForBlock.at (streamId); } +const double* GenericProcessor::getTimestampsForBlock (uint16 streamId) const +{ + const std::vector* timestamps = getTimestampArrayRegistry().find (this, streamId); + + return timestamps != nullptr ? timestamps->data() : nullptr; +} + +bool GenericProcessor::getTimestampForSample (uint16 streamId, int64 sampleNumber, double& timestamp) const +{ + const std::vector* timestamps = getTimestampArrayRegistry().find (this, streamId); + + if (timestamps == nullptr) + return false; + + auto sampleIt = startSamplesForBlock.find (streamId); + auto countIt = numSamplesInBlock.find (streamId); + + if (sampleIt == startSamplesForBlock.end() || countIt == numSamplesInBlock.end()) + return false; + + if (sampleNumber < sampleIt->second) + return false; + + const uint32 sampleOffset = static_cast (sampleNumber - sampleIt->second); + + if (sampleOffset >= countIt->second || sampleOffset >= timestamps->size()) + return false; + + timestamp = (*timestamps)[sampleOffset]; + return true; +} + void GenericProcessor::setTimestampAndSamples (int64 sampleNumber, double timestamp, uint32 nSamples, uint16 streamId, uint16 syncStreamId) { + getTimestampArrayRegistry().clearStream (this, streamId); + HeapBlock data; size_t dataSize = SystemEvent::fillTimestampAndSamplesData (data, this, @@ -1278,7 +1373,39 @@ void GenericProcessor::setTimestampAndSamples (int64 sampleNumber, startTimestampsForBlock[streamId] = timestamp; startSamplesForBlock[streamId] = sampleNumber; syncStreamIds[streamId] = syncStreamId; + numSamplesInBlock[streamId] = nSamples; + processStartTimes[streamId] = m_initialProcessTime; +} + +void GenericProcessor::setTimestampArrayForBlock (int64 sampleNumber, + const double* timestamps, + uint32 nSamples, + uint16 streamId, + uint16 syncStreamId) +{ + getTimestampArrayRegistry().clearStream (this, streamId); + + if (timestamps == nullptr || nSamples == 0) + return; + + HeapBlock data; + size_t dataSize = SystemEvent::fillTimestampArrayData (data, + this, + streamId, + sampleNumber, + timestamps, + nSamples, + m_initialProcessTime, + syncStreamId); + + m_currentMidiBuffer->addEvent (data, int (dataSize), 0); + + startTimestampsForBlock[streamId] = timestamps[0]; + startSamplesForBlock[streamId] = sampleNumber; + syncStreamIds[streamId] = syncStreamId; + numSamplesInBlock[streamId] = nSamples; processStartTimes[streamId] = m_initialProcessTime; + getTimestampArrayRegistry().store (this, streamId, timestamps, nSamples); } int GenericProcessor::getGlobalChannelIndex (uint16 streamId, int localIndex) const @@ -1303,26 +1430,39 @@ int GenericProcessor::processEventBuffer() for (const auto meta : *m_currentMidiBuffer) { const uint8* dataptr = meta.data; + const Event::Type baseType = static_cast (*dataptr); - if (static_cast (*dataptr) == Event::Type::SYSTEM_EVENT - && static_cast (*(dataptr + 1) == SystemEvent::Type::TIMESTAMP_AND_SAMPLES)) + if (baseType == Event::Type::SYSTEM_EVENT) { - uint16 sourceProcessorId = *reinterpret_cast (dataptr + 2); - uint16 sourceStreamId = *reinterpret_cast (dataptr + 4); - uint16 syncStreamId = *reinterpret_cast (dataptr + 6); - - int64 startSample = *reinterpret_cast (dataptr + 8); - double startTimestamp = *reinterpret_cast (dataptr + 16); - uint32 nSamples = *reinterpret_cast (dataptr + 24); - int64 initialTicks = *reinterpret_cast (dataptr + 28); - - startSamplesForBlock[sourceStreamId] = startSample; - startTimestampsForBlock[sourceStreamId] = startTimestamp; - syncStreamIds[sourceStreamId] = syncStreamId; - numSamplesInBlock[sourceStreamId] = nSamples; - processStartTimes[sourceStreamId] = initialTicks; + const SystemEvent::Type systemEventType = static_cast (*(dataptr + 1)); + + if (systemEventType == SystemEvent::Type::TIMESTAMP_AND_SAMPLES + || systemEventType == SystemEvent::Type::TIMESTAMP_ARRAY) + { + uint16 sourceStreamId = *reinterpret_cast (dataptr + 4); + uint16 syncStreamId = *reinterpret_cast (dataptr + 6); + + int64 startSample = *reinterpret_cast (dataptr + 8); + double startTimestamp = *reinterpret_cast (dataptr + 16); + uint32 nSamples = *reinterpret_cast (dataptr + 24); + int64 initialTicks = *reinterpret_cast (dataptr + 28); + + getTimestampArrayRegistry().clearStream (this, sourceStreamId); + + startSamplesForBlock[sourceStreamId] = startSample; + startTimestampsForBlock[sourceStreamId] = startTimestamp; + syncStreamIds[sourceStreamId] = syncStreamId; + numSamplesInBlock[sourceStreamId] = nSamples; + processStartTimes[sourceStreamId] = initialTicks; + + if (systemEventType == SystemEvent::Type::TIMESTAMP_ARRAY && nSamples > 0) + { + const double* timestamps = reinterpret_cast (dataptr + timestampArrayOffset); + getTimestampArrayRegistry().store (this, sourceStreamId, timestamps, nSamples); + } + } } - else if (static_cast (*dataptr) == Event::Type::PROCESSOR_EVENT + else if (baseType == Event::Type::PROCESSOR_EVENT && static_cast (*(dataptr + 1) == EventChannel::Type::TTL)) { uint16 sourceStreamId = *reinterpret_cast (dataptr + 4); @@ -1334,7 +1474,7 @@ int GenericProcessor::processEventBuffer() getEditor()->setTTLState (sourceStreamId, eventBit, eventState); } } - else if (static_cast (*dataptr) == Event::Type::PROCESSOR_EVENT + else if (baseType == Event::Type::PROCESSOR_EVENT && static_cast (*(dataptr + 1) == EventChannel::Type::TEXT)) { TextEventPtr textEvent = TextEvent::deserialize (dataptr, getMessageChannel()); diff --git a/Source/Processors/GenericProcessor/GenericProcessor.h b/Source/Processors/GenericProcessor/GenericProcessor.h index 41ae2ae48..8bd6259ad 100755 --- a/Source/Processors/GenericProcessor/GenericProcessor.h +++ b/Source/Processors/GenericProcessor/GenericProcessor.h @@ -614,6 +614,12 @@ class PLUGIN_API GenericProcessor : public GenericProcessorBase, public PluginCl /** Used to get the current timestamp for a given stream.*/ double getFirstTimestampForBlock (uint16 streamId) const; + /** Returns per-sample timestamps for the current block, or nullptr if unavailable. */ + const double* getTimestampsForBlock (uint16 streamId) const; + + /** Resolves a sample number within the current block to a timestamp. */ + bool getTimestampForSample (uint16 streamId, int64 sampleNumber, double& timestamp) const; + /** Used to set the timestamp for a given buffer, for a given DataStream. */ void setTimestampAndSamples (int64 startSampleForBlock, double startTimestampForBlock, @@ -621,6 +627,13 @@ class PLUGIN_API GenericProcessor : public GenericProcessorBase, public PluginCl uint16 streamId, uint16 syncStreamId = 0); + /** Emits per-sample timestamps for the current buffer. */ + void setTimestampArrayForBlock (int64 startSampleForBlock, + const double* timestampsForBlock, + uint32 nSamples, + uint16 streamId, + uint16 syncStreamId = 0); + // -------------------------------------------- // CHANNEL INDEXING // -------------------------------------------- diff --git a/Source/Processors/RecordNode/DataQueue.cpp b/Source/Processors/RecordNode/DataQueue.cpp index 573b018de..dec2c6731 100644 --- a/Source/Processors/RecordNode/DataQueue.cpp +++ b/Source/Processors/RecordNode/DataQueue.cpp @@ -185,6 +185,30 @@ float DataQueue::writeSynchronizedTimestamps (double start, double step, int des return 1.0f - (float) m_FTSFifos[destChannel]->getFreeSpace() / (float) m_FTSFifos[destChannel]->getTotalSize(); } +float DataQueue::writeSynchronizedTimestamps (const double* timestamps, int destChannel, int nSamples) +{ + int index1, size1, index2, size2; + + m_FTSFifos[destChannel]->prepareToWrite (nSamples, index1, size1, index2, size2); + + if ((size1 + size2) < nSamples) + { + LOGE (__FUNCTION__, " Recording Data Queue Overflow: sz1: ", size1, " sz2: ", size2, " nSamples: ", nSamples); + } + + double* writePtr = m_FTSBuffer.getWritePointer (destChannel); + + if (size1 > 0) + memcpy (writePtr + index1, timestamps, (size_t) size1 * sizeof (double)); + + if (size2 > 0) + memcpy (writePtr + index2, timestamps + size1, (size_t) size2 * sizeof (double)); + + m_FTSFifos[destChannel]->finishedWrite (size1 + size2); + + return 1.0f - (float) m_FTSFifos[destChannel]->getFreeSpace() / (float) m_FTSFifos[destChannel]->getTotalSize(); +} + float DataQueue::writeChannel (const AudioBuffer& buffer, int srcChannel, int destChannel, diff --git a/Source/Processors/RecordNode/DataQueue.h b/Source/Processors/RecordNode/DataQueue.h index d02cd6392..4aa6cf3b7 100644 --- a/Source/Processors/RecordNode/DataQueue.h +++ b/Source/Processors/RecordNode/DataQueue.h @@ -88,6 +88,9 @@ class DataQueue /** Writes an array of timestamps for one stream */ float writeSynchronizedTimestamps (double start, double step, int destChannel, int nSamples); + /** Writes explicit per-sample timestamps for one stream */ + float writeSynchronizedTimestamps (const double* timestamps, int destChannel, int nSamples); + /** Returns the number of samples available to read (minimum across all channels) */ int getNumSamplesReady() const; diff --git a/Source/Processors/RecordNode/RecordNode.cpp b/Source/Processors/RecordNode/RecordNode.cpp index 5ea2af07c..4acbbe896 100755 --- a/Source/Processors/RecordNode/RecordNode.cpp +++ b/Source/Processors/RecordNode/RecordNode.cpp @@ -990,15 +990,7 @@ void RecordNode::handleTTLEvent (TTLEventPtr event) size_t size = event->getChannelInfo()->getDataSize() + event->getChannelInfo()->getTotalEventMetadataSize() + EVENT_BASE_SIZE; uint16 streamId = event->getStreamId(); - double ts = -1.0; - if (synchronizer.streamGeneratesTimestamps (streamKey)) - { - ts = getFirstTimestampForBlock (streamId) + (sampleNumber - getFirstSampleNumberForBlock (streamId)) / getDataStream (streamId)->getSampleRate(); - } - else - { - ts = synchronizer.convertSampleNumberToTimestamp (streamKey, sampleNumber); - } + double ts = resolveRecordedTimestamp (streamKey, streamId, sampleNumber); HeapBlock buffer (size); event->setTimestampInSeconds (ts); @@ -1020,7 +1012,7 @@ void RecordNode::handleEvent (const EventChannel* eventInfo, const EventPacket& String streamKey = getDataStream (eventInfo->getStreamId())->getKey(); - Event::setTimestampInSeconds (packet, synchronizer.convertSampleNumberToTimestamp (streamKey, sampleNumber)); + Event::setTimestampInSeconds (packet, resolveRecordedTimestamp (streamKey, eventInfo->getStreamId(), sampleNumber)); eventQueue->addEvent (packet, sampleNumber, eventIndex); } @@ -1037,15 +1029,7 @@ void RecordNode::handleSpike (SpikePtr spike) uint16 streamId = spike->getStreamId(); int64 sampleNumber = spike->getSampleNumber(); - double ts = -1.0; - if (synchronizer.streamGeneratesTimestamps (streamKey)) - { - ts = getFirstTimestampForBlock (streamId) + (sampleNumber - getFirstSampleNumberForBlock (streamId)) / getDataStream (streamId)->getSampleRate(); - } - else - { - ts = synchronizer.convertSampleNumberToTimestamp (streamKey, sampleNumber); - } + double ts = resolveRecordedTimestamp (streamKey, streamId, sampleNumber); spike->setTimestampInSeconds (ts); writeSpike (spike, spike->getChannelInfo()); @@ -1143,19 +1127,36 @@ void RecordNode::process (AudioBuffer& buffer) { first = synchronizer.convertSampleNumberToTimestamp (streamKey, sampleNumber); second = synchronizer.convertSampleNumberToTimestamp (streamKey, sampleNumber + 1); + + dataQueues[streamIndex]->writeSynchronizedTimestamps ( + first, + second - first, + 0, + numSamples); } else { - first = getFirstTimestampForBlock (streamId); - second = first + 1 / stream->getSampleRate(); + const double* blockTimestamps = getTimestampsForBlock (streamId); + + if (blockTimestamps != nullptr) + { + first = blockTimestamps[0]; + dataQueues[streamIndex]->writeSynchronizedTimestamps (blockTimestamps, 0, (int) numSamples); + } + else + { + first = getFirstTimestampForBlock (streamId); + second = first + 1 / stream->getSampleRate(); + + dataQueues[streamIndex]->writeSynchronizedTimestamps ( + first, + second - first, + 0, + numSamples); + } + synchronizer.setHardwareTimestamp (sampleNumber, first, streamKey); } - // Each per-stream queue has only 1 timestamp stream (index 0) - dataQueues[streamIndex]->writeSynchronizedTimestamps ( - first, - second - first, - 0, // timestamp stream index within this queue - numSamples); } if (numSamples > 0 && recordChanCount > 0 && streamSourceChannels[streamIndex] != nullptr) @@ -1210,6 +1211,22 @@ void RecordNode::process (AudioBuffer& buffer) } } +double RecordNode::resolveRecordedTimestamp (const String& streamKey, uint16 streamId, int64 sampleNumber) +{ + if (synchronizer.streamGeneratesTimestamps (streamKey)) + { + double timestamp = -1.0; + + if (getTimestampForSample (streamId, sampleNumber, timestamp)) + return timestamp; + + return getFirstTimestampForBlock (streamId) + + (sampleNumber - getFirstSampleNumberForBlock (streamId)) / getDataStream (streamId)->getSampleRate(); + } + + return synchronizer.convertSampleNumberToTimestamp (streamKey, sampleNumber); +} + // called in RecordNode::handleSpike void RecordNode::writeSpike (const Spike* spike, const SpikeChannel* spikeElectrode) { diff --git a/Source/Processors/RecordNode/RecordNode.h b/Source/Processors/RecordNode/RecordNode.h index 2af189dc6..f03ce75ae 100755 --- a/Source/Processors/RecordNode/RecordNode.h +++ b/Source/Processors/RecordNode/RecordNode.h @@ -256,6 +256,8 @@ class TESTABLE RecordNode : public GenericProcessor, static bool overrideTimestampWarningShown; private: + double resolveRecordedTimestamp (const String& streamKey, uint16 streamId, int64 sampleNumber); + /** Handles other types of events (text, sync texts, etc.) */ void handleEvent (const EventChannel* channel, const EventPacket& eventPacket); diff --git a/Source/Processors/SourceNode/SourceNode.cpp b/Source/Processors/SourceNode/SourceNode.cpp index 210c37edd..05e7341a3 100644 --- a/Source/Processors/SourceNode/SourceNode.cpp +++ b/Source/Processors/SourceNode/SourceNode.cpp @@ -80,6 +80,7 @@ DataThread* SourceNode::getThread() const void SourceNode::resizeBuffers() { inputBuffers.clear(); + timestampBuffers.clear(); eventCodeBuffers.clear(); eventStates.clear(); @@ -90,6 +91,7 @@ void SourceNode::resizeBuffers() for (int i = 0; i < dataStreams.size(); i++) { inputBuffers.add (dataThread->getBufferAddress (i)); + timestampBuffers.add (new MemoryBlock (10000 * sizeof (double))); eventCodeBuffers.add (new MemoryBlock (10000 * sizeof (uint64))); eventStates.add (0); } @@ -302,10 +304,12 @@ void SourceNode::process (AudioBuffer& buffer) for (int streamIdx = 0; streamIdx < inputBuffers.size(); streamIdx++) { int channelsToCopy = getNumOutputsForStream (streamIdx); + double* streamTimestamps = static_cast (timestampBuffers[streamIdx]->getData()); + uint16 streamId = dataStreams[streamIdx]->getStreamId(); int nSamples = inputBuffers[streamIdx]->readAllFromBuffer (buffer, &sampleNumber, - ×tamp, + streamTimestamps, static_cast (eventCodeBuffers[streamIdx]->getData()), buffer.getNumSamples(), copiedChannels, @@ -313,10 +317,15 @@ void SourceNode::process (AudioBuffer& buffer) copiedChannels += channelsToCopy; + timestamp = streamTimestamps[0]; + setTimestampAndSamples (sampleNumber, timestamp, nSamples, - dataStreams[streamIdx]->getStreamId()); + streamId); + + if (nSamples > 0 && streamTimestamps[0] >= 0.0f) + setTimestampArrayForBlock (sampleNumber, streamTimestamps, (uint32) nSamples, streamId); if (eventChannels[streamIdx]) { diff --git a/Source/Processors/SourceNode/SourceNode.h b/Source/Processors/SourceNode/SourceNode.h index 4fb04d68f..4944cc5f9 100755 --- a/Source/Processors/SourceNode/SourceNode.h +++ b/Source/Processors/SourceNode/SourceNode.h @@ -130,6 +130,7 @@ class PLUGIN_API SourceNode : public GenericProcessor, public Timer int64 sampleNumber = 0; double timestamp = -1.0; + OwnedArray timestampBuffers; OwnedArray eventCodeBuffers; Array eventStates; Array ttlChannels; diff --git a/Tests/Processors/DataBufferTests.cpp b/Tests/Processors/DataBufferTests.cpp index a0b5deb0a..69f47432d 100644 --- a/Tests/Processors/DataBufferTests.cpp +++ b/Tests/Processors/DataBufferTests.cpp @@ -41,4 +41,67 @@ TEST(DataBufferTest, CopyToAudioBuffer) for (int sample = 0; sample < audioBuffer.getNumSamples(); ++sample) EXPECT_EQ(audioBuffer.getSample(channel, sample), sample); } + + for (int sample = 0; sample < numItems; ++sample) + { + EXPECT_EQ(timestamps[sample], sample); + EXPECT_EQ(eventCodes[sample], (uint64) sample); + } +} + +TEST(DataBufferTest, CopyToAudioBufferAcrossWrap) +{ + constexpr int bufferSize = 11; + constexpr int firstWriteSize = 8; + constexpr int secondWriteSize = 6; + constexpr int firstReadSize = 5; + + DataBuffer dataBuffer (1, bufferSize); + + { + float data[firstWriteSize] = { 0, 1, 2, 3, 4, 5, 6, 7 }; + int64 sampleNumbers[firstWriteSize] = { 0, 1, 2, 3, 4, 5, 6, 7 }; + double timestamps[firstWriteSize] = { 0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5 }; + uint64 eventCodes[firstWriteSize] = { 10, 11, 12, 13, 14, 15, 16, 17 }; + + EXPECT_EQ (dataBuffer.addToBuffer (data, sampleNumbers, timestamps, eventCodes, firstWriteSize), firstWriteSize); + } + + { + AudioBuffer discardBuffer (1, firstReadSize); + int64 sampleNumber = -1; + double timestamps[firstReadSize] = { -1, -1, -1, -1, -1 }; + uint64 eventCodes[firstReadSize] = { 0, 0, 0, 0, 0 }; + + EXPECT_EQ (dataBuffer.readAllFromBuffer (discardBuffer, &sampleNumber, timestamps, eventCodes, firstReadSize), firstReadSize); + } + + { + float data[secondWriteSize] = { 100, 101, 102, 103, 104, 105 }; + int64 sampleNumbers[secondWriteSize] = { 8, 9, 10, 11, 12, 13 }; + double timestamps[secondWriteSize] = { 4.0, 4.5, 5.0, 5.5, 6.0, 6.5 }; + uint64 eventCodes[secondWriteSize] = { 18, 19, 20, 21, 22, 23 }; + + EXPECT_EQ (dataBuffer.addToBuffer (data, sampleNumbers, timestamps, eventCodes, secondWriteSize), secondWriteSize); + } + + constexpr int expectedReadSize = firstWriteSize - firstReadSize + secondWriteSize; + AudioBuffer audioBuffer (1, expectedReadSize); + int64 startSampleNumber = -1; + double timestamps[expectedReadSize] = {}; + uint64 eventCodes[expectedReadSize] = {}; + + EXPECT_EQ (dataBuffer.readAllFromBuffer (audioBuffer, &startSampleNumber, timestamps, eventCodes, expectedReadSize), expectedReadSize); + EXPECT_EQ (startSampleNumber, 5); + + const float expectedSamples[expectedReadSize] = { 5, 6, 7, 100, 101, 102, 103, 104, 105 }; + const double expectedTimestamps[expectedReadSize] = { 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5, 6.0, 6.5 }; + const uint64 expectedEventCodes[expectedReadSize] = { 15, 16, 17, 18, 19, 20, 21, 22, 23 }; + + for (int sample = 0; sample < expectedReadSize; ++sample) + { + EXPECT_EQ (audioBuffer.getSample (0, sample), expectedSamples[sample]); + EXPECT_EQ (timestamps[sample], expectedTimestamps[sample]); + EXPECT_EQ (eventCodes[sample], expectedEventCodes[sample]); + } } \ No newline at end of file diff --git a/Tests/Processors/RecordNodeTests.cpp b/Tests/Processors/RecordNodeTests.cpp index e4e0ceb70..f313b167f 100644 --- a/Tests/Processors/RecordNodeTests.cpp +++ b/Tests/Processors/RecordNodeTests.cpp @@ -23,7 +23,8 @@ class RecordNodeTests : public testing::Test { numChannels, sampleRate, bitVolts - })); + }), + ProcessorTesterMode::FullApp); parentRecordingDir = std::filesystem::temp_directory_path() / "record_node_tests"; if (std::filesystem::exists(parentRecordingDir)) { @@ -56,8 +57,8 @@ class RecordNodeTests : public testing::Test { return inputBuffer; } - void writeBlock(AudioBuffer &buffer, TTLEvent* maybeTtlEvent = nullptr) { - auto outBuffer = tester->processBlock(processor, buffer, maybeTtlEvent); + void writeBlock(AudioBuffer &buffer, TTLEvent* maybeTtlEvent = nullptr, const double* sampleTimestamps = nullptr) { + auto outBuffer = tester->processBlock(processor, buffer, maybeTtlEvent, sampleTimestamps); // Assert the buffer hasn't changed after process() ASSERT_EQ(outBuffer.getNumSamples(), buffer.getNumSamples()); ASSERT_EQ(outBuffer.getNumChannels(), buffer.getNumChannels()); @@ -179,6 +180,55 @@ class RecordNodeTests : public testing::Test { *output = loadNpyFileBinaryFullpath(npyFilePath.string()); } + template + std::vector parseNpyPayload(const std::vector& binary) { + if (binary.size() < static_cast(10)) { + ADD_FAILURE() << "NPY payload is too small to contain a valid header"; + return {}; + } + + const uint16_t headerLength = static_cast(binary[8]) + | (static_cast(static_cast(binary[9])) << 8); + const size_t payloadOffset = 10 + headerLength; + + if (payloadOffset > binary.size()) { + ADD_FAILURE() << "NPY payload header exceeds file size"; + return {}; + } + + if (((binary.size() - payloadOffset) % sizeof(T)) != static_cast(0)) { + ADD_FAILURE() << "NPY payload size is not aligned to element size"; + return {}; + } + + std::vector values((binary.size() - payloadOffset) / sizeof(T)); + + if (!values.empty()) { + memcpy(values.data(), binary.data() + payloadOffset, values.size() * sizeof(T)); + } + + return values; + } + + template + std::vector loadNpyData(const std::string& basename) { + bool success = false; + std::vector binary; + loadNpyFileBinary(basename, &binary, &success); + + if (!success) { + ADD_FAILURE() << "Failed to load NPY file: " << basename; + return {}; + } + + return parseNpyPayload(binary); + } + + template + std::vector loadNpyDataFullpath(const std::filesystem::path& path) { + return parseNpyPayload(loadNpyFileBinaryFullpath(path.string())); + } + void compareBinaryFilesHex(const std::string& filename, const std::vector binData, const std::string& expectedBinDataHex) { std::vector expectedBinData; @@ -223,6 +273,33 @@ class RecordNodeTests : public testing::Test { float sampleRate = 1.0; }; +class HardwareSynced_RecordNodeTests : public RecordNodeTests { +protected: + void SetUp() override { + sampleRate = 100.0f; + numChannels = 8; + tester = std::make_unique(TestSourceNodeBuilder + (FakeSourceNodeParams{ + numChannels, + sampleRate, + bitVolts, + 1, + 0, + true + }), + ProcessorTesterMode::FullApp); + + parentRecordingDir = std::filesystem::temp_directory_path() / "record_node_hardware_sync_tests"; + if (std::filesystem::exists(parentRecordingDir)) { + std::filesystem::remove_all(parentRecordingDir); + } + std::filesystem::create_directory(parentRecordingDir); + + tester->setRecordingParentDirectory(parentRecordingDir.string()); + processor = tester->createProcessor(Plugin::Processor::RECORD_NODE); + } +}; + TEST_F(RecordNodeTests, TestInputOutput_Continuous_Single) { int numSamples = 100; tester->startAcquisition(true); @@ -421,6 +498,32 @@ TEST_F(RecordNodeTests, Test_PersistsSampleNumbersAndTimestamps) { compareBinaryFilesHex("timestamps.npy", timeStampsBin, expectedTimeStampsHex); } +TEST_F(HardwareSynced_RecordNodeTests, Test_PersistsPerSampleHardwareTimestamps) { + tester->startAcquisition(true); + + const int numSamples = 5; + auto firstBuffer = createBuffer(1000.0f, 20.0f, numChannels, numSamples); + std::vector firstTimestamps { 10.000, 10.011, 10.021, 10.034, 10.048 }; + writeBlock(firstBuffer, nullptr, firstTimestamps.data()); + + auto secondBuffer = createBuffer(2000.0f, 20.0f, numChannels, numSamples); + std::vector secondTimestamps { 10.061, 10.073, 10.084, 10.098, 10.113 }; + writeBlock(secondBuffer, nullptr, secondTimestamps.data()); + + tester->stopAcquisition(); + + auto persistedTimestamps = loadNpyData("timestamps.npy"); + + std::vector expectedTimestamps = firstTimestamps; + expectedTimestamps.insert(expectedTimestamps.end(), secondTimestamps.begin(), secondTimestamps.end()); + + ASSERT_EQ(persistedTimestamps.size(), expectedTimestamps.size()); + + for (size_t index = 0; index < expectedTimestamps.size(); ++index) { + EXPECT_DOUBLE_EQ(persistedTimestamps[index], expectedTimestamps[index]); + } +} + TEST_F(RecordNodeTests, Test_PersistsStructureOeBin) { tester->startAcquisition(true); @@ -530,6 +633,37 @@ TEST_F(RecordNodeTests, Test_PersistsEvents) { compareBinaryFilesHex("full_words.npy", fullWordsBin, expectedFullWordsHex); } +TEST_F(HardwareSynced_RecordNodeTests, Test_PersistsHardwareEventTimestampFromBlockArray) { + processor->setRecordEvents(true); + processor->updateSettings(); + + tester->startAcquisition(true); + + const int numSamples = 5; + auto streamId = processor->getDataStreams()[0]->getStreamId(); + auto eventChannels = tester->getSourceNodeDataStream(streamId)->getEventChannels(); + ASSERT_GE(eventChannels.size(), 1); + + TTLEventPtr eventPtr = TTLEvent::createTTLEvent( + eventChannels[0], + 1, + 2, + true); + + auto inputBuffer = createBuffer(1000.0f, 20.0f, numChannels, numSamples); + std::vector blockTimestamps { 20.000, 20.031, 20.047, 20.062, 20.081 }; + writeBlock(inputBuffer, eventPtr.get(), blockTimestamps.data()); + + tester->stopAcquisition(); + + std::filesystem::path timestampPath; + ASSERT_TRUE(eventsPathFor("timestamps.npy", ×tampPath)); + auto eventTimestamps = loadNpyDataFullpath(timestampPath); + + ASSERT_EQ(eventTimestamps.size(), static_cast(1)); + EXPECT_DOUBLE_EQ(eventTimestamps[0], blockTimestamps[1]); +} + // ============================================================================ // SEQUENTIAL BLOCK FILE BATCH WRITE TESTS // ============================================================================ @@ -741,7 +875,8 @@ class SingleChannel_RecordNodeTests : public RecordNodeTests { numChannels, sampleRate, bitVolts - })); + }), + ProcessorTesterMode::FullApp); parentRecordingDir = std::filesystem::temp_directory_path() / "record_node_single_ch_tests"; if (std::filesystem::exists(parentRecordingDir)) { @@ -829,7 +964,8 @@ class MultiStream_RecordNodeTests : public testing::Test { 30000, // sample rate 1.0f, // bitVolts 3 // streams - })); + }), + ProcessorTesterMode::FullApp); parentRecordingDir = std::filesystem::temp_directory_path() / "record_node_multi_stream_tests"; if (std::filesystem::exists(parentRecordingDir)) { @@ -923,7 +1059,8 @@ class BufferResize_RecordNodeTests : public RecordNodeTests { numChannels, sampleRate, bitVolts - })); + }), + ProcessorTesterMode::FullApp); parentRecordingDir = std::filesystem::temp_directory_path() / "record_node_buffer_resize_tests"; if (std::filesystem::exists(parentRecordingDir)) { @@ -988,7 +1125,8 @@ class ManyChannels_RecordNodeTests : public RecordNodeTests { numChannels, sampleRate, bitVolts - })); + }), + ProcessorTesterMode::FullApp); parentRecordingDir = std::filesystem::temp_directory_path() / "record_node_many_ch_tests"; if (std::filesystem::exists(parentRecordingDir)) { diff --git a/Tests/TestHelpers/Processors/FakeSourceNode.cpp b/Tests/TestHelpers/Processors/FakeSourceNode.cpp index b945c01fb..2542a14b1 100644 --- a/Tests/TestHelpers/Processors/FakeSourceNode.cpp +++ b/Tests/TestHelpers/Processors/FakeSourceNode.cpp @@ -20,7 +20,8 @@ void FakeSourceNode::updateSettings() "FakeSourceNode" + String (i), "description", "identifier", - params.sampleRate + params.sampleRate, + params.generatesTimestamps }; cachedDataStreams.add (new DataStream (settings)); diff --git a/Tests/TestHelpers/Processors/FakeSourceNode.h b/Tests/TestHelpers/Processors/FakeSourceNode.h index 96cbb9e7b..4ba0fcc40 100644 --- a/Tests/TestHelpers/Processors/FakeSourceNode.h +++ b/Tests/TestHelpers/Processors/FakeSourceNode.h @@ -12,6 +12,7 @@ struct FakeSourceNodeParams float bitVolts = 1.0f; int streams = 1; uint32_t metadataSizeBytes = 0; + bool generatesTimestamps = false; }; class TESTABLE FakeSourceNode : public GenericProcessor diff --git a/Tests/TestHelpers/include/TestFixtures.h b/Tests/TestHelpers/include/TestFixtures.h index 0d217bc22..01e97428d 100644 --- a/Tests/TestHelpers/include/TestFixtures.h +++ b/Tests/TestHelpers/include/TestFixtures.h @@ -10,6 +10,7 @@ #include #include #include +#include enum class TestSourceNodeType { @@ -17,6 +18,12 @@ enum class TestSourceNodeType Base }; +enum class ProcessorTesterMode +{ + GraphOnly, + FullApp +}; + class TestSourceNodeBuilder { public: @@ -64,7 +71,8 @@ class TestSourceNodeBuilder class ProcessorTester { public: - ProcessorTester (TestSourceNodeBuilder sourceNodeBuilder) + ProcessorTester (TestSourceNodeBuilder sourceNodeBuilder, + ProcessorTesterMode mode = ProcessorTesterMode::GraphOnly) { // Singletons... MessageManager::deleteInstance(); @@ -79,10 +87,14 @@ class ProcessorTester customLookAndFeel = std::make_unique(); LookAndFeel::setDefaultLookAndFeel (customLookAndFeel.get()); - // All of these sets the global state in AccessClass in their constructors - audioComponent = std::make_unique(); processorGraph = std::make_unique (true); - controlPanel = std::make_unique (processorGraph.get(), audioComponent.get(), true); + + if (mode == ProcessorTesterMode::FullApp) + { + // These set global state in AccessClass and require a real audio-capable test environment. + audioComponent = std::make_unique(); + controlPanel = std::make_unique (processorGraph.get(), audioComponent.get(), true); + } SourceNode* snTemp = sourceNodeBuilder.buildSourceNode(); sourceNodeId = nextProcessorId++; @@ -98,12 +110,14 @@ class ProcessorTester sn->initialize (false); sn->setDestNode (nullptr); - controlPanel->updateRecordEngineList(); + if (controlPanel != nullptr) + controlPanel->updateRecordEngineList(); // Refresh everything processorGraph->updateSettings (sn); - controlPanel->colourChanged(); + if (controlPanel != nullptr) + controlPanel->colourChanged(); } virtual ~ProcessorTester() @@ -159,6 +173,9 @@ class ProcessorTester void startAcquisition (bool startRecording, bool forceRecording = false) { + if (controlPanel == nullptr) + throw std::logic_error ("ProcessorTester::startAcquisition requires FullApp mode"); + if (startRecording) { // Do it this way to ensure the GUI elements (which apparently control logic) are set properly @@ -196,6 +213,9 @@ class ProcessorTester void stopAcquisition() { + if (controlPanel == nullptr) + throw std::logic_error ("ProcessorTester::stopAcquisition requires FullApp mode"); + controlPanel->stopAcquisition(); } @@ -230,27 +250,43 @@ class ProcessorTester AudioBuffer processBlock ( GenericProcessor* processor, const AudioBuffer& buffer, - TTLEvent* maybeTtlEvent = nullptr) + TTLEvent* maybeTtlEvent = nullptr, + const double* sampleTimestamps = nullptr) { auto audioProcessor = (AudioProcessor*) processor; auto dataStreams = processor->getDataStreams(); + auto* sourceProcessor = getSourceNode(); MidiBuffer eventBuffer; for (const auto* datastream : dataStreams) { HeapBlock data; auto streamId = datastream->getStreamId(); + double startTimestamp = sampleTimestamps != nullptr ? sampleTimestamps[0] : 0.0; size_t dataSize = SystemEvent::fillTimestampAndSamplesData ( data, - processor, + sourceProcessor, streamId, currentSampleIndex, - // NOTE: this timestamp is actually ignored in the current implementation? - 0, + startTimestamp, buffer.getNumSamples(), 0); eventBuffer.addEvent (data, dataSize, 0); + if (sampleTimestamps != nullptr) + { + HeapBlock timestampData; + size_t timestampDataSize = SystemEvent::fillTimestampArrayData ( + timestampData, + sourceProcessor, + streamId, + currentSampleIndex, + sampleTimestamps, + buffer.getNumSamples(), + 0); + eventBuffer.addEvent (timestampData, timestampDataSize, 0); + } + if (maybeTtlEvent != nullptr) { size_t ttlSize = maybeTtlEvent->getChannelInfo()->getDataSize() + maybeTtlEvent->getChannelInfo()->getTotalEventMetadataSize() + EVENT_BASE_SIZE; @@ -288,7 +324,7 @@ class DataThreadTester : public ProcessorTester { public: DataThreadTester (TestSourceNodeBuilder sourceNodeBuilder) : - ProcessorTester (sourceNodeBuilder) + ProcessorTester (sourceNodeBuilder, ProcessorTesterMode::GraphOnly) {} template <