Commit 118c4731 authored by wcb's avatar wcb
Browse files

Merge pull request 'release 0.17.8' (#111) from develop into master

Reviewed-on: https://git.sloong.com:8000/sloongnet/engine/pulls/111
Showing with 66 additions and 87 deletions
+66 -87
......@@ -156,7 +156,7 @@ ResultType Sloong::ConnectSession::sendDataPackage(UniquePackage pack)
return ResultType::Error;
}
}
// In ET_MODE, we don't care about the connection is not in the monitor list. just add to the list and send it.
// In ET_MODE, we don't care about the connection is not in the monitor list. just add to the list and send it.
else
{
_addToSendList(move(pack));
......@@ -166,8 +166,10 @@ ResultType Sloong::ConnectSession::sendDataPackage(UniquePackage pack)
void Sloong::ConnectSession::_addToSendList(UniquePackage pack)
{
// In here we set the send list empty to false before lock the mutex. becuase it will help the other thread to check the send list is empry, and the can add to the list in first time.
_isSendListEmpty = false;
lock_guard<mutex> lock(_mutexPrepare);
_isSendListEmpty = false;
_log->trace(format("Add package {} to prepare send list {}. list size:[{}]", pack->id(), _connection->id(), _prepareSendList.size() + 1));
_prepareSendList.push(std::move(pack));
if (_setMonitor)
......@@ -193,6 +195,8 @@ ReceivePackageListResult Sloong::ConnectSession::onDataCanReceive()
{
auto package = res.takeResultObject();
_log->debug(format("Received new package {}.", package->id()));
#ifdef ENABLE_RECV_PACK_LIMIT
if (IsOverflowPackage(package.get()))
{
......@@ -271,60 +275,37 @@ ReceivePackageListResult Sloong::ConnectSession::onDataCanReceive()
ResultType Sloong::ConnectSession::onDataCanSend()
{
if (_mode == TiggerMode::LT_MODE)
if (!_mutexSendList.try_lock())
{
if (!_mutexSendList.try_lock())
{
_log->trace(format("Connection {} send socket lock fialed.", _connection->id()));
return ResultType::Invalid;
}
_log->trace("send socket lock successful. prepare to send data.");
unique_lock<mutex> srlck(_mutexSendList, std::adopt_lock);
while (!isSendListEmpty())
{
_processPrepareSendList();
auto res = _processSendList();
if (res != ResultType::Succeed)
return res;
}
// If all data sent, lock the prepare and recheck, and then modify the monitor.
unique_lock<mutex> lock(_mutexPrepare);
if (!isSendListEmpty())
{
_log->trace(format("all {} data sent successfully. But recheck send list is not empty.", _connection->id()));
lock.unlock();
srlck.unlock();
return onDataCanSend();
}
_setMonitor(_connection->id(), false);
_isSendListEmpty = true;
_log->trace(format("Connection {} send socket lock fialed.", _connection->id()));
return ResultType::Invalid;
}
_log->trace("send socket lock successful. prepare to send data.");
unique_lock<mutex> srlck(_mutexSendList, std::adopt_lock);
return ResultType::Succeed;
while (!isSendListEmpty())
{
_processPrepareSendList();
auto res = _processSendList();
if (res != ResultType::Succeed)
return res;
}
else
// If all data sent, lock the prepare and recheck, and then modify the monitor.
unique_lock<mutex> lock(_mutexPrepare);
if (!isSendListEmpty())
{
while (!isSendListEmpty())
{
_processPrepareSendList();
auto res = _processSendList();
if (res != ResultType::Succeed)
return res;
}
// If all data sent, lock the prepare and recheck, and then modify the monitor.
unique_lock<mutex> lock(_mutexPrepare);
_log->trace(format("all {} data sent successfully. But recheck send list is not empty.", _connection->id()));
lock.unlock();
srlck.unlock();
return onDataCanSend();
}
if (_setMonitor)
_setMonitor(_connection->id(), false);
if (!isSendListEmpty())
{
_log->trace(format("all {} data sent successfully. But recheck send list is not empty.", _connection->id()));
lock.unlock();
return onDataCanSend();
}
_isSendListEmpty = true;
_isSendListEmpty = true;
return ResultType::Succeed;
}
return ResultType::Succeed;
}
void Sloong::ConnectSession::_processPrepareSendList()
......@@ -342,21 +323,7 @@ void Sloong::ConnectSession::_processPrepareSendList()
ResultType Sloong::ConnectSession::_processSendList()
{
if (_mode == TiggerMode::ET_MODE)
{
if (!_mutexSend.try_lock())
{
_log->trace(format("Connection {} send socket lock fialed.", _connection->id()));
return ResultType::Invalid;
}
_log->trace("send socket lock successful. prepare to send data.");
}
else
{
_mutexSend.lock();
}
unique_lock<mutex> srlck(_mutexSend, std::adopt_lock);
lock_guard<mutex> lock(_mutexSend);
_log->trace("Start process connection {} send list.", _connection->id());
if (_connection->isSending())
......@@ -398,8 +365,7 @@ ResultType Sloong::ConnectSession::_processSendList()
if (pack == nullptr)
continue;
id = pack->id();
_log->trace(format("Send new package {}, the list size[{}], package size[{}],", id, list.size(), pack->ByteSizeLongEx()));
_log->trace(format("Take new package, the list size[{}].", list.size()));
break;
}
......@@ -409,6 +375,9 @@ ResultType Sloong::ConnectSession::_processSendList()
return ResultType::Succeed;
}
id = pack->id();
_log->debug(format("Send new package {}, package size [{}]. timeline [{}]", id, pack->ByteSizeLongEx(), pack->format_timeline()));
auto res = _connection->sendPackage(move(pack));
if (res.result() == ResultType::Error)
......
......@@ -221,6 +221,7 @@ void Sloong::NetworkHub::_addMessageToSendList(UniquePackage pack)
if (_recordFunc != nullptr)
_recordFunc(_ic, true, pack.get());
pack->record_point("SEND");
auto res = session->sendDataPackage(std::move(pack));
if (res == ResultType::Error)
{
......
......@@ -103,6 +103,20 @@ class Package
uint64_t sessionid() { return SessionID; }
void set_sessionid(uint64_t sessionid) { SessionID = sessionid; }
string format_timeline()
{
string str = "";
if (_timeline.size() == 0)
return str;
auto prev = _timeline.front().first;
for (auto &t : _timeline)
{
str = format("{}[{}:{}]", str, t.second, std::chrono::duration_cast<std::chrono::milliseconds>(t.first - prev).count());
prev = t.first;
}
return str;
}
auto get_timeline() { return _timeline; }
void merge_timeline(Package *other)
......@@ -326,8 +340,7 @@ class Package
* Only used it when you need return and extend data.
* This package will be see big package, and add to send list.
*/
static unique_ptr<Package> MakeResponse(Package *request_pack, ResultType result, const string &message,
const string &extend)
static unique_ptr<Package> MakeResponse(Package *request_pack, ResultType result, const string &message, const string &extend)
{
auto response_pack = MakeResponse(request_pack);
response_pack->set_result(result);
......@@ -348,8 +361,7 @@ class Package
* Only used it when you need return and extend data.
* This package will be see big package, and add to send list.
*/
static unique_ptr<Package> MakeResponse(Package *request_pack, ResultType result, const string &message,
const char *extend, int size)
static unique_ptr<Package> MakeResponse(Package *request_pack, ResultType result, const string &message, const char *extend, int size)
{
auto response_pack = MakeResponse(request_pack);
response_pack->set_result(result);
......@@ -370,8 +382,7 @@ class Package
* Only used it when you need return and extend data.
* This package will be see big package, and add to send list.
*/
static unique_ptr<Package> MakeResponse(Package *request_pack, ResultType result, const string &message,
vector<char> *extend)
static unique_ptr<Package> MakeResponse(Package *request_pack, ResultType result, const string &message, vector<char> *extend)
{
auto response_pack = MakeResponse(request_pack);
response_pack->set_result(result);
......
......@@ -434,7 +434,7 @@ SResult Sloong::FileManager::UploadingHandler(const string &str_req, Package *pa
auto sha256 = CSHA256::Encode(pack->extend());
if (data.sha256() != sha256)
return SResult::MakeError(format("Hasd check error.[{}]<->[{}]", sha256, data.sha256()));
return SResult::MakeError(format("Uploading package hash check error.[{}]<->[{}]", sha256, data.sha256()));
FileRange range;
range.Start = data.start();
......@@ -481,7 +481,7 @@ SResult Sloong::FileManager::UploadedHandler(const string &str_req, Package *pac
_log->debug(format("Save file to [{}]. Hash [{}]", temp_path.native(), info->sha256));
auto sha256 = Utility::SHA256EncodeFile(temp_path);
if (info->sha256 != sha256)
return SResult::MakeError(format("Hasd check error.[{}]<->[{}]", sha256, info->sha256));
return SResult::MakeError(format("File {} hash check error.[{}]<->[{}]", info->name, sha256, info->sha256));
auto repo = _mapIDToRepository->tryGet(info->repo);
if (repo == nullptr)
......@@ -511,7 +511,7 @@ SResult Sloong::FileManager::SimpleUploadHandler(const string &str_req, Package
// if (req->sha256() != CSHA256::Encode(trans_pack->extend()))
// {
// return SResult::MakeError("Hasd check error.");
// return SResult::MakeError("Hass check error.");
// }
// if (req->size() != trans_pack->extend().length())
// {
......
......@@ -93,14 +93,11 @@ PackageResult Sloong::GatewayTranspond::MessageToProcesser(Package *pack)
trans_pack->set_extend(pack->extend());
trans_pack->set_function(info.function_id);
trans_pack->set_sender(pack->sessionid());
auto id = snowflake::Instance->nextid();
trans_pack->set_id(id);
trans_pack->set_sessionid(info.connection_id);
_log->info(format("Trans package [{}][{}] -> [{}][{}]", pack->sessionid(), pack->id(), trans_pack->sessionid(), trans_pack->id()));
_log->info(format("Trans package [{}] [{}] -> [{}]", pack->id(),pack->sessionid(), trans_pack->sessionid()));
GatewayService::Instance->_mapSerialToPackage.set(trans_pack->id(), move(response));
GatewayService::Instance->_mapSerialToPackage.set(pack->id(), move(response));
return PackageResult::MakeOKResult(move(trans_pack));
}
else
......@@ -113,11 +110,11 @@ PackageResult Sloong::GatewayTranspond::MessageToProcesser(Package *pack)
PackageResult Sloong::GatewayTranspond::MessageToClient(UniquePackage info, Package *pack)
{
_log->debug(format("Receive forward response <<< {}", pack->ShortDebugString()));
info->record_point("ResponseToClient");
info->record_point("ForwardToClient");
info->set_result(pack->result());
info->set_content(pack->content());
info->set_extend(pack->extend());
_log->info(format("Response package [{}][{}] <- [{}][{}]", info->sessionid(), info->id(), pack->sessionid(), pack->id()));
_log->info(format("Response package [{}] [{}] <- [{}]", pack->id(), info->sessionid(), pack->sessionid()));
return PackageResult::MakeOKResult(move(info));
}
\ No newline at end of file
}
......@@ -283,12 +283,13 @@ void Sloong::ServerManage::onConnectionBreaked(SharedEvent event)
return;
}
tplItem->Created.remove(nodeID);
_notifyReferences(nodeID, Manager::Events::ReferenceModuleOffline);
_mapSessionToNodeID->erase(con, false);
_mapUUIDToNodeItem->erase(nodeID);
l.unlock();
_notifyReferences(nodeID, Manager::Events::ReferenceModuleOffline);
}
PackageResult Sloong::ServerManage::processHandler(Package *pack)
......
#ifndef VERSION_H
#define VERSION_TEXT "0.17.7.1530"
#define VERSION_TEXT "0.17.8.1539"
#define PRODUCT_TEXT "Sloong Network Engine"
#define COPYRIGHT_TEXT "Copyright 2015-2022 Sloong.com. All Rights Reserved "
......
0.17.7.1530
0.17.8.1539
#ifndef VERSION_H
#define VERSION_TEXT "0.17.7.GIT_VERSION"
#define VERSION_TEXT "0.17.8.GIT_VERSION"
#define PRODUCT_TEXT "Sloong Network Engine"
#define COPYRIGHT_TEXT "Copyright 2015-2022 Sloong.com. All Rights Reserved "
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment