Skip to content

Commit

Permalink
small safety fixes, add save_raw_frames option
Browse files Browse the repository at this point in the history
  • Loading branch information
lrlunin committed Nov 8, 2024
1 parent d20ea9d commit 4c6d91f
Show file tree
Hide file tree
Showing 14 changed files with 326 additions and 159 deletions.
96 changes: 46 additions & 50 deletions tango-moenchzmq/src/backend/CPUComputationBackend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ CPUComputationBackend::CPUComputationBackend(FileWriter *fileWriter,
THREAD_AMOUNT(THREAD_AMOUNT) {
thread_pool = new boost::asio::thread_pool(THREAD_AMOUNT);
dispatcher_thread = std::thread(&CPUComputationBackend::dispatchTasks, this);
// initThreads();
resetAccumulators();
};

Expand All @@ -34,38 +33,10 @@ CPUComputationBackend::~CPUComputationBackend() {
destoy_dispatcher = true;
dispatcher_thread.join();
thread_pool->join();
// delete dispatcher_thread;
delete thread_pool;
// destroyThreads();
#ifdef SINGLE_FRAMES_DEBUG
delete[] pedestal_storage_ptr;
delete[] pedestal_rms_storage_ptr;
delete[] frame_classes_storage_ptr;
#endif
delete[] individual_analog_storage_ptr;
delete frameindex_storage_ptr;
deleteIndividualStorage();
};

void CPUComputationBackend::initThreads() {
/*
* Creation of kind of thread pool instead of spawning threads.
* Maybe can be done better with async and futures.
*/
for (unsigned int x = 0; x < THREAD_AMOUNT; x++) {
auto t = thread(&CPUComputationBackend::threadTask, this);
threads.push_back(move(t));
}
}

void CPUComputationBackend::destroyThreads() {
// send signal to threads to stop in threadTask loop
destroy_threads = true;
// before this object is destroyed, we need to join all threads
for (auto &thread : threads) {
thread.join();
}
}

void CPUComputationBackend::pause() { threads_sleep = true; }

void CPUComputationBackend::resume() {
Expand All @@ -74,27 +45,55 @@ void CPUComputationBackend::resume() {
}

void CPUComputationBackend::allocateIndividualStorage() {
delete[] frameindex_storage_ptr;
/*** To be sure that we don't delete the same storage pointer
* twice we delete the storage only here and allocate it
* only here too.
*
* So the individual storage pointers are garanteed to be
* either nullptr or valid allocation address.
*/
deleteIndividualStorage();
frameindex_storage_ptr = new int[individual_frame_buffer_capacity];
// fill with zeros, because we will take the max value later
std::fill(frameindex_storage_ptr,
frameindex_storage_ptr + individual_frame_buffer_capacity, 0);
delete[] individual_analog_storage_ptr;
individual_analog_storage_ptr
= new float[individual_frame_buffer_capacity * consts::LENGTH];
// need to consider the case if after the acquistion the
// save raw frames will be disabled -> the allocated memory will be wasted
if (saveRawFrames) {
individual_raw_storage_ptr
= new unsigned short[individual_frame_buffer_capacity
* consts::LENGTH];
}
#ifdef SINGLE_FRAMES_DEBUG
delete[] pedestal_storage_ptr;
pedestal_storage_ptr
= new float[individual_frame_buffer_capacity * consts::LENGTH];
delete[] pedestal_rms_storage_ptr;
pedestal_rms_storage_ptr
= new float[individual_frame_buffer_capacity * consts::LENGTH];
delete[] frame_classes_storage_ptr;
frame_classes_storage_ptr
= new char[individual_frame_buffer_capacity * consts::LENGTH];
#endif
}

void CPUComputationBackend::deleteIndividualStorage() {
delete[] frameindex_storage_ptr;
frameindex_storage_ptr = nullptr;
delete[] individual_analog_storage_ptr;
individual_analog_storage_ptr = nullptr;
// I believe this would be safe now and we would not waste any memory
delete[] individual_raw_storage_ptr;
individual_raw_storage_ptr = nullptr;
#ifdef SINGLE_FRAMES_DEBUG
delete[] pedestal_storage_ptr;
pedestal_storage_ptr = nullptr;
delete[] pedestal_rms_storage_ptr;
pedestal_rms_storage_ptr = nullptr;
delete[] frame_classes_storage_ptr;
frame_classes_storage_ptr = nullptr;
#endif
}

void CPUComputationBackend::resetAccumulators() {
memory_pool::release_memory();
analog_sum.zero();
Expand All @@ -119,12 +118,17 @@ void CPUComputationBackend::dumpAccumulators() {
int max_frame_index = *std::max_element(
frameindex_storage_ptr,
frameindex_storage_ptr + individual_frame_buffer_capacity);
// for the frameindex of 0...max_frame_index there are max_frame_index+1
// frames
// for the frameindex of (0, ..., max_frame_index) there are
// max_frame_index+1 frames
int max_stack_length = max_frame_index + 1;
fileWriter->writeFrameStack("individual_frames", "analog",
individual_analog_storage_ptr,
max_stack_length);
if (saveRawFrames) {
fileWriter->writeFrameStack("individual_frames", "raw",
individual_raw_storage_ptr,
max_stack_length);
}
#ifdef SINGLE_FRAMES_DEBUG
fileWriter->writeFrameStack("individual_frames", "pedestal",
pedestal_storage_ptr, max_stack_length);
Expand Down Expand Up @@ -178,6 +182,11 @@ void CPUComputationBackend::processFrame(FullFrame *ff_ptr) {
float *frame_ptr
= individual_analog_storage_ptr + frameindex * consts::LENGTH;
frame_subtracted_pedestal.copy_to_buffer<float *>(frame_ptr, true);
if (saveRawFrames) {
unsigned short *raw_frame_ptr
= individual_raw_storage_ptr + frameindex * consts::LENGTH;
ff_ptr->f.copy_to_buffer<unsigned short *>(raw_frame_ptr, true);
}
#ifdef SINGLE_FRAMES_DEBUG
std::memcpy(pedestal_storage_ptr + frameindex * consts::LENGTH,
pedestal_current.arr,
Expand Down Expand Up @@ -323,16 +332,3 @@ void CPUComputationBackend::dispatchTasks() {
break;
}
}

void CPUComputationBackend::threadTask() {
FullFrame *ff_ptr;
while (true) {
while (!threads_sleep && frame_ptr_queue.pop(ff_ptr)) {
CPUComputationBackend::processFrame(ff_ptr);
}
this_thread::sleep_for(0.03s);
// if the threads should be destroyed, we should break the loop
if (destroy_threads)
break;
}
}
6 changes: 3 additions & 3 deletions tango-moenchzmq/src/backend/CPUComputationBackend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ class CPUComputationBackend {
unsigned long long THREAD_AMOUNT);
CPUComputationBackend(FileWriter *fileWriter);
~CPUComputationBackend();
void initThreads();
void destroyThreads();
void pause();
void resume();
void allocateIndividualStorage();
void deleteIndividualStorage();
void resetAccumulators();
void resetPedestalAndRMS();
void dumpAccumulators();
Expand All @@ -54,7 +53,6 @@ class CPUComputationBackend {
OrderedFrame<unsigned short, consts::LENGTH> &raw_frame,
OrderedFrame<char, consts::LENGTH> &frame_classes, bool isPedestal);
void dispatchTasks();
void threadTask();
void processFrame(FullFrame *ptr);

OrderedFrame<float, consts::LENGTH> pedestal_counter_counting;
Expand All @@ -75,8 +73,10 @@ class CPUComputationBackend {
std::atomic_bool isPedestal = true;
long updatePedestalPeriod = 1;
std::atomic_bool saveIndividualFrames = true;
std::atomic_bool saveRawFrames = true;
int *frameindex_storage_ptr = nullptr;
float *individual_analog_storage_ptr = nullptr;
unsigned short *individual_raw_storage_ptr = nullptr;
#ifdef SINGLE_FRAMES_DEBUG
float *pedestal_storage_ptr = nullptr;
float *pedestal_rms_storage_ptr = nullptr;
Expand Down
5 changes: 5 additions & 0 deletions tango-moenchzmq/src/backend/FileWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,9 @@ class FileWriter {
char *frame_stack_ptr,
size_t frame_stack_length)
= 0;
virtual void writeFrameStack(const std::string group_name,
const std::string frame_stack_name,
unsigned short *frame_stack_ptr,
size_t frame_stack_length)
= 0;
};
22 changes: 22 additions & 0 deletions tango-moenchzmq/src/backend/HDFWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,25 @@ void HDFWriter::writeFrameStack(const std::string group_name,
image_stack_dataspace);
dataset.write(frame_stack_ptr, image_datatype, image_stack_dataspace);
};

void HDFWriter::writeFrameStack(const std::string group_name,
const std::string frame_stack_name,
unsigned short *frame_stack_ptr,
size_t frame_stack_length) {
const H5::DataType image_datatype(H5::PredType::NATIVE_UINT16);
const hsize_t image_stack_dimension[3]
= { static_cast<hsize_t>(frame_stack_length), consts::FRAME_HEIGHT,
consts::FRAME_WIDTH };
const H5::DataSpace image_stack_dataspace(3, image_stack_dimension);

const hsize_t image_single_dimension[3]
= { 1, consts::FRAME_HEIGHT, consts::FRAME_WIDTH };
const H5::DataSpace image_single_dataspace(3, image_single_dimension, NULL);
H5::H5File h5_file(buildFullFilePath(), H5F_ACC_RDWR);
if (!h5_file.exists(group_name))
h5_file.createGroup(group_name);
std::string image_path = fmt::format("{}/{}", group_name, frame_stack_name);
H5::DataSet dataset = h5_file.createDataSet(image_path, image_datatype,
image_stack_dataspace);
dataset.write(frame_stack_ptr, image_datatype, image_stack_dataspace);
};
4 changes: 4 additions & 0 deletions tango-moenchzmq/src/backend/HDFWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,8 @@ class HDFWriter : public FileWriter {
const std::string frame_stack_name,
char *frame_stack_ptr,
size_t frame_stack_length) override;
void writeFrameStack(const std::string group_name,
const std::string frame_stack_name,
unsigned short *frame_stack_ptr,
size_t frame_stack_length) override;
};
8 changes: 5 additions & 3 deletions tango-moenchzmq/src/backend/ZMQListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ void ZMQListener::listen_socket() {
if (d["data"].GetUint() == 1) {
std::cout << "Received data" << std::endl;
if (socket.recv(data_zmq_msg) && receive_data) {
// if data is not a full frame, skip it
if (data_zmq_msg.size() < FRAME_SIZE) {
continue;
}
FullFrame *ff_ptr = static_cast<FullFrame *>(
CPUComputationBackend::memory_pool::malloc());
ff_ptr->m.frameIndex = d["frameIndex"].GetUint64();
ff_ptr->m.bitmode = d["bitmode"].GetUint();
std::memcpy(ff_ptr->f.arr, data_zmq_msg.data(),
std::min(data_zmq_msg.size(),
static_cast<size_t>(sizeof(FullFrame::f.arr))));
std::memcpy(ff_ptr->f.arr, data_zmq_msg.data(), FRAME_SIZE);
comp_backend_ptr->frame_ptr_queue.push(ff_ptr);
received_frames_amount++;
}
Expand Down
1 change: 1 addition & 0 deletions tango-moenchzmq/src/backend/ZMQListener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class ZMQListener {
std::thread zmq_listener_thread;
std::atomic_bool receive_data;
std::atomic_bool abort_wait;
constexpr static size_t FRAME_SIZE = sizeof(FullFrame::f.arr);

public:
CPUComputationBackend *comp_backend_ptr;
Expand Down
34 changes: 32 additions & 2 deletions tango-moenchzmq/src/tangods/MoenchZMQ.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
// threshold | Tango::DevDouble Scalar
// counting_sigma | Tango::DevFloat Scalar
// process_pedestal | Tango::DevBoolean Scalar
// save raw frames | Tango::DevBoolean Scalar
// analog_img | Tango::DevFloat Image ( max = 400 x 400)
// counting_img | Tango::DevFloat Image ( max = 400 x 400)
//================================================================
Expand Down Expand Up @@ -79,6 +80,7 @@ void MoenchZMQ::delete_device() {
delete[] attr_threshold_read;
delete[] attr_counting_sigma_read;
delete[] attr_process_pedestal_read;
delete[] attr_save_raw_frames_read;
delete[] attr_analog_img_read;
delete[] attr_counting_img_read;
delete[] attr_analog_img_pumped_read;
Expand Down Expand Up @@ -114,6 +116,7 @@ void MoenchZMQ::init_device() {
attr_threshold_read = new Tango::DevDouble[1];
attr_counting_sigma_read = new Tango::DevFloat[1];
attr_process_pedestal_read = new Tango::DevBoolean[1];
attr_save_raw_frames_read = new Tango::DevBoolean[1];
attr_split_pumped_read = new Tango::DevBoolean[1];
attr_analog_img_read = new Tango::DevFloat[400 * 400];
attr_analog_img_pumped_read = new Tango::DevFloat[400 * 400];
Expand All @@ -138,7 +141,7 @@ void MoenchZMQ::get_device_property() {
// Initialize property data members
ZMQ_RX_IP = "127.0.0.1";
ZMQ_RX_PORT = 50003;
SAVE_ROOT_PATH = "/home/data";
SAVE_ROOT_PATH = "/tmp";
THREAD_AMOUNT = 2;
PEDESTAL_BUFFER_LENGTH = 5000;
mandatoryNotDefined = false;
Expand Down Expand Up @@ -556,6 +559,25 @@ void MoenchZMQ::read_process_pedestal(Tango::Attribute &attr) {
attr.set_value(attr_process_pedestal_read);
}

//--------------------------------------------------------
/**
* Read attribute save_raw_frames related method
*
*
* Data type: Tango::DevBoolean
* Attr type: Scalar
*/
//--------------------------------------------------------
void MoenchZMQ::read_save_raw_frames(Tango::Attribute &attr) {
DEBUG_STREAM << "MoenchZMQ::read_save_raw_frames(Tango::Attribute &attr) "
"entering... "
<< std::endl;
// Set the attribute value
*attr_save_raw_frames_read
= zmq_listener_ptr->comp_backend_ptr->saveRawFrames;
attr.set_value(attr_save_raw_frames_read);
}

//--------------------------------------------------------
/**
* Read attribute split_pumped related method
Expand Down Expand Up @@ -621,6 +643,14 @@ void MoenchZMQ::write_process_pedestal(Tango::WAttribute &attr) {
zmq_listener_ptr->comp_backend_ptr->isPedestal = w_val;
}

void MoenchZMQ::write_save_raw_frames(Tango::WAttribute &attr) {
Tango::DevBoolean w_val;
attr.get_write_value(w_val);

*attr_save_raw_frames_read = w_val;
zmq_listener_ptr->comp_backend_ptr->saveRawFrames = w_val;
}

//--------------------------------------------------------
/**
* Read attribute analog_img related method
Expand Down Expand Up @@ -737,7 +767,7 @@ void MoenchZMQ::stop_receiver() {
void MoenchZMQ::abort_receiver() {
DEBUG_STREAM << "MoenchZMQ::abort_receiver() - " << device_name
<< std::endl;

zmq_listener_ptr->abort_receive();
// Add your own code
}

Expand Down
11 changes: 11 additions & 0 deletions tango-moenchzmq/src/tangods/MoenchZMQ.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class MoenchZMQ : public TANGO_BASE_CLASS {
Tango::DevDouble *attr_threshold_read;
Tango::DevFloat *attr_counting_sigma_read;
Tango::DevBoolean *attr_process_pedestal_read;
Tango::DevBoolean *attr_save_raw_frames_read;
Tango::DevBoolean *attr_split_pumped_read;
Tango::DevLong *attr_acquired_frames_read;
Tango::DevFloat *attr_analog_img_read;
Expand Down Expand Up @@ -213,6 +214,16 @@ class MoenchZMQ : public TANGO_BASE_CLASS {
virtual void read_process_pedestal(Tango::Attribute &attr);
virtual void write_process_pedestal(Tango::WAttribute &attr);
virtual bool is_process_pedestal_allowed(Tango::AttReqType type);
/**
* Attribute save_raw_frames related methods
*
*
* Data type: Tango::DevBoolean
* Attr type: Scalar
*/
virtual void read_save_raw_frames(Tango::Attribute &attr);
virtual void write_save_raw_frames(Tango::WAttribute &attr);
virtual bool is_save_raw_frames_allowed(Tango::AttReqType type);
/**
* Attribute split_pumped related methods
*
Expand Down
Loading

0 comments on commit 4c6d91f

Please sign in to comment.