Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added SSL read/write queues. #952

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 34 additions & 32 deletions ACE/ace/SSL/SSL_Asynch_Stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ ACE_SSL_Asynch_Stream::ACE_SSL_Asynch_Stream (
: type_ (s_type),
proactor_ (0),
ext_handler_ (0),
ext_read_result_ (0),
ext_write_result_(0),
flags_ (0),
ssl_ (0),
handshake_complete_(false),
Expand Down Expand Up @@ -151,6 +149,15 @@ ACE_SSL_Asynch_Stream::~ACE_SSL_Asynch_Stream (void)
// leave that to the application developer? We do not reference
// count reactors (for example) and following some simple rules
// seems to work fine!

while (!ext_read_result_queue_.empty()) {
delete ext_read_result_queue_.front();
ext_read_result_queue_.pop_front();
}
while (!ext_write_result_queue_.empty()) {
delete ext_write_result_queue_.front();
ext_write_result_queue_.pop_front();
}
}

// ************************************************************
Expand Down Expand Up @@ -332,14 +339,9 @@ ACE_SSL_Asynch_Stream::read (ACE_Message_Block & message_block,
if (this->flags_ & SF_REQ_SHUTDOWN)
return -1;

// only one read operation is allowed now
// later it will be possible to make a queue

if (this->ext_read_result_ != 0)
return -1;

ACE_SSL_Asynch_Read_Stream_Result* result = 0;
// create result for future notification
ACE_NEW_RETURN (this->ext_read_result_,
ACE_NEW_RETURN (result,
ACE_SSL_Asynch_Read_Stream_Result (
*this->ext_handler_,
this->handle (),
Expand All @@ -351,6 +353,8 @@ ACE_SSL_Asynch_Stream::read (ACE_Message_Block & message_block,
signal_number),
-1);

ext_read_result_queue_.push_back(result);

this->do_SSL_state_machine (); // ignore return code

return 0;
Expand All @@ -375,14 +379,9 @@ ACE_SSL_Asynch_Stream::write (ACE_Message_Block & message_block,
if (this->flags_ & SF_REQ_SHUTDOWN)
return -1;

// only one read operation is allowed now
// later it will be possible to make a queue

if (this->ext_write_result_ != 0)
return -1;

ACE_SSL_Asynch_Write_Stream_Result* result = 0;
// create result for future notification
ACE_NEW_RETURN (this->ext_write_result_,
ACE_NEW_RETURN (result,
ACE_SSL_Asynch_Write_Stream_Result (
*this->ext_handler_,
this->handle (),
Expand All @@ -394,6 +393,9 @@ ACE_SSL_Asynch_Stream::write (ACE_Message_Block & message_block,
signal_number),
-1);


ext_write_result_queue_.push_back(result);

this->do_SSL_state_machine ();

return 0;
Expand Down Expand Up @@ -565,7 +567,7 @@ ACE_SSL_Asynch_Stream::post_handshake_check (void)
int
ACE_SSL_Asynch_Stream::do_SSL_read (void)
{
if (this->ext_read_result_ == 0) // nothing to do
if (this->ext_read_result_queue_.empty()) // nothing to do
{
return 0;
}
Expand All @@ -576,8 +578,8 @@ ACE_SSL_Asynch_Stream::do_SSL_read (void)
return -1;
}

ACE_Message_Block & mb = this->ext_read_result_->message_block ();
size_t bytes_req = this->ext_read_result_->bytes_to_read ();
ACE_Message_Block & mb = this->ext_read_result_queue_.front()->message_block ();
size_t bytes_req = this->ext_read_result_queue_.front()->bytes_to_read ();

ERR_clear_error ();

Expand Down Expand Up @@ -627,7 +629,7 @@ ACE_SSL_Asynch_Stream::do_SSL_read (void)
int
ACE_SSL_Asynch_Stream::do_SSL_write (void)
{
if (this->ext_write_result_ == 0) // nothing to do
if (ext_write_result_queue_.empty()) // nothing to do
{
return 0;
}
Expand All @@ -638,8 +640,8 @@ ACE_SSL_Asynch_Stream::do_SSL_write (void)
return -1;
}

ACE_Message_Block & mb = this->ext_write_result_->message_block ();
size_t bytes_req = this->ext_write_result_->bytes_to_write ();
ACE_Message_Block & mb = ext_write_result_queue_.front()->message_block ();
size_t bytes_req = ext_write_result_queue_.front()->bytes_to_write ();

ERR_clear_error ();

Expand Down Expand Up @@ -729,18 +731,18 @@ int
ACE_SSL_Asynch_Stream::notify_read (int bytes_transferred,
int error)
{
if (ext_read_result_ == 0) //nothing to notify
if (ext_read_result_queue_.empty()) //nothing to notify
return 1;

this->ext_read_result_->set_bytes_transferred (bytes_transferred);
this->ext_read_result_->set_error (error);
this->ext_read_result_queue_.front()->set_bytes_transferred (bytes_transferred);
this->ext_read_result_queue_.front()->set_error (error);

int retval =
this->ext_read_result_->post_completion (proactor_->implementation ());
this->ext_read_result_queue_.front()->post_completion (proactor_->implementation ());

if (retval == 0)
{
this->ext_read_result_ = 0;
this->ext_read_result_queue_.pop_front();
return 0; // success
}

Expand All @@ -759,19 +761,19 @@ int
ACE_SSL_Asynch_Stream::notify_write (int bytes_transferred,
int error)
{
if (this->ext_write_result_ == 0) //nothing to notify
if (this->ext_write_result_queue_.empty()) //nothing to notify
return 1;

this->ext_write_result_->set_bytes_transferred (bytes_transferred);
this->ext_write_result_->set_error (error);
this->ext_write_result_queue_.front()->set_bytes_transferred (bytes_transferred);
this->ext_write_result_queue_.front()->set_error (error);

int retval =
this->ext_write_result_->post_completion (
this->ext_write_result_queue_.front()->post_completion (
this->proactor_->implementation ());

if (retval == 0)
{
this->ext_write_result_ = 0;
this->ext_write_result_queue_.pop_front();
return 0; // success
}

Expand Down
8 changes: 5 additions & 3 deletions ACE/ace/SSL/SSL_Asynch_Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include /**/ "ace/pre.h"
#include "SSL_Context.h"
#include <list>

#if !defined (ACE_LACKS_PRAGMA_ONCE)
#pragma once
Expand Down Expand Up @@ -383,12 +384,13 @@ class ACE_SSL_Export ACE_SSL_Asynch_Stream
/// External,i.e user handler
ACE_Handler * ext_handler_;

private:
/// External, i.e. read result faked for user
ACE_SSL_Asynch_Read_Stream_Result * ext_read_result_ ;
std::list<ACE_SSL_Asynch_Read_Stream_Result*> ext_read_result_queue_;

/// External, i.e. write result faked for user
ACE_SSL_Asynch_Write_Stream_Result * ext_write_result_ ;

std::list<ACE_SSL_Asynch_Write_Stream_Result*> ext_write_result_queue_;
protected:
/// Stream state/flags
enum Stream_Flag
{
Expand Down