#include <serialization.h>

#include <chrono>
#ifdef CreateService
#undef CreateService
#endif
// Component IDs (CIDs) of the foreign application streaming sources and sinks under test.
// The test cases below pass them to create_graph() to select the TCP or the UDP variant.
constexpr char CID_TCP_SOURCE[] = "demo_foreign_application_tcp_receiver.streaming_source.adtf.cid";
constexpr char CID_TCP_SINK[] = "demo_foreign_application_tcp_sender.streaming_sink.adtf.cid";
constexpr char CID_UDP_SOURCE[] = "demo_foreign_application_udp_receiver.streaming_source.adtf.cid";
constexpr char CID_UDP_SINK[] = "demo_foreign_application_udp_sender.streaming_sink.adtf.cid";
// Port used by all tests: it is handed to the demo application ("-p=") and to the
// "port" / "remote_port" properties of the sources and sinks.
// Builds with _DEBUG defined use 54300, all other builds use 54301
// (presumably so both build flavours can run side by side - confirm).
static const cString TEST_PORT{
#ifdef _DEBUG
    "54300"
#else
    "54301"
#endif
};
{
cMyTestSystem(bool bWithLogging = false): cTestSystem({}, {}, false, bWithLogging)
{
LoadPlugin("adtf_clock.adtfplugin");
LoadPlugin("adtf_kernel.adtfplugin");
CreateService("adtf_media_description.adtfplugin",
"mds",
tADTFRunLevel::RL_System,
{{"media_description_files", ADTF_TESTING_SOURCE_DIR "/test_serialization.description"}});
configure_media_description_service();
LoadPlugin("foreign_application_udp.adtfplugin");
LoadPlugin("foreign_application_tcp.adtfplugin");
}
~cMyTestSystem()
{
}
void configure_media_description_service()
{
object_ptr<adtf::services::IMediaDescriptionService> pService;
IConfiguration* pConfig = ucom_cast<IConfiguration*>(pService.Get());
set_property<cString>(*pConfig, "media_description_files", ADTF_TESTING_SOURCE_DIR "/serialization.description");
}
};
/// Fixture variant that constructs cMyTestSystem with bWithLogging set to true.
struct cMyTestSystemWithLog : public cMyTestSystem
{
    cMyTestSystemWithLog()
        : cMyTestSystem(true)
    {}
};
// Bundle returned by create_graph(): the streaming graph plus the helpers the tests use
// to observe the source and to feed the sink.
// The member order matters: create_graph() fills this struct with a braced initializer list.
struct tTestGraph
{
// graph that contains the "source" and "sink" objects
object_ptr<cStreamingGraph> pStreamingGraph;
// recorder attached to the "output" of the source
std::unique_ptr<cOutputRecorder> pSourceOutput;
// writer attached to the "input" of the sink
std::unique_ptr<cTestWriter> pSinkInput;
};
/**
 * Runs DEMO_APPLICATION as a child process for the lifetime of the object.
 *
 * The process is started in the constructor and terminated in the destructor.
 * The class is non-copyable: a copy would hold the same process id and terminate
 * the child a second time.
 */
class cTestApplication
{
public:
    /**
     * Starts the demo application.
     * @param strArguments command line arguments passed to the application.
     */
    cTestApplication(
        const cString& strArguments)
    {
        REQUIRE_OK(adtf::util::cSystem::ChildExecute(&m_nPID, DEMO_APPLICATION, strArguments));
    }

    cTestApplication(const cTestApplication&) = delete;
    cTestApplication& operator=(const cTestApplication&) = delete;

    /// Terminates the child process started by the constructor.
    ~cTestApplication()
    {
        adtf::util::cSystem::ChildTerminate(m_nPID);
    }

private:
    /// Process id written by ChildExecute().
    uint64_t m_nPID = 0;
};
tTestGraph create_graph(
const cString& strSourceCid,
const std::map<cString, cString>& oSourceConfig,
const std::map<cString, cString>& oSinkConfig,
bool bConnectSockets = true)
{
auto pGraph = make_object_ptr<cStreamingGraph>();
object_ptr<IObject> pSource;
REQUIRE_OK(
add_graph_object(*pGraph, strSourceCid,
"source", oSourceConfig, 0, pSource));
object_ptr<IObject> pSink;
REQUIRE_OK(
add_graph_object(*pGraph, strSinkCid,
"sink", oSinkConfig, 0, pSink));
if (bConnectSockets)
{
REQUIRE_OK(pGraph->AddConnection("connection1", "source", "socket", "socket", "", 0, true));
REQUIRE_OK(pGraph->AddConnection("connection2", "socket", "", "sink", "socket", 0, true));
}
REQUIRE_OK(pGraph->SetState(IStreamingGraph::tStreamingState::State_Initialized));
return { pGraph, std::make_unique<cOutputRecorder>(pSource, "output"), std::make_unique<cTestWriter>(pSink, "input", oType) };
}
/**
 * Switches the graph to streaming and checks 100 consecutive counter values
 * (123456789 ... 123456888) against the first sample recorded at the source output.
 *
 * NOTE(review): nothing in this function visibly writes the counter to oGraph.pSinkInput;
 * presumably a write step is missing from this listing - confirm.
 *
 * @param oGraph graph bundle as returned by create_graph().
 */
void test_raw(tTestGraph& oGraph)
{
    constexpr uint64_t nFirstValue = 123456789;
    constexpr uint64_t nValueCount = 100;

    REQUIRE_OK(oGraph.pStreamingGraph->SetState(IStreamingGraph::tStreamingState::State_Streaming));

    for (uint64_t nIndex = 0; nIndex < nValueCount; ++nIndex)
    {
        uint64_t nExpected = nFirstValue + nIndex;

        auto oRecorded = oGraph.pSourceOutput->GetCurrentOutput();
        REQUIRE(!oRecorded.GetSamples().empty());

        sample_data<uint64_t> oFirstSample(oRecorded.GetSamples().front());
        REQUIRE(oFirstSample == nExpected);
    }
}
/**
 * Switches the graph to streaming and checks the first sample recorded at the source
 * output against a tTestSerialization reference of { 1, 2 }.
 *
 * The members are compared crosswise (nValue1 against nValue2 and vice versa).
 * NOTE(review): this looks intentional - presumably the serialized layout swaps the two
 * members - confirm against the description file.
 *
 * @param oGraph graph bundle as returned by create_graph().
 */
void test_serialization(tTestGraph& oGraph)
{
    REQUIRE_OK(oGraph.pStreamingGraph->SetState(IStreamingGraph::tStreamingState::State_Streaming));

    tTestSerialization sReference{ 1, 2 };

    auto oRecorded = oGraph.pSourceOutput->GetCurrentOutput();
    REQUIRE(!oRecorded.GetSamples().empty());

    sample_data<tTestSerialization> oFirstSample(oRecorded.GetSamples().front());
    REQUIRE(oFirstSample->nValue1 == sReference.nValue2);
    REQUIRE(oFirstSample->nValue2 == sReference.nValue1);
}
// Runs test_raw() with the TCP source/sink pair; the demo application is started with "-tcp"
// and the source is pointed at 127.0.0.1:TEST_PORT.
TEST_CASE_METHOD(cMyTestSystem, "Test Raw TCP / IPv4", "[req:ACORE-8868][req:ACORE-10086]")
{
    cTestApplication oDemoApplication("-p=" + TEST_PORT + " -tcp");

    const std::map<cString, cString> oSourceProperties{
        {"remote_host", "127.0.0.1"},
        {"remote_port", TEST_PORT}};
    const std::map<cString, cString> oSinkProperties;

    auto oTestGraph = create_graph(CID_TCP_SOURCE, oSourceProperties, CID_TCP_SINK, oSinkProperties);
    test_raw(oTestGraph);
}
// Runs test_raw() with the UDP source/sink pair; the sink targets 127.0.0.1:TEST_PORT,
// the source uses its default properties.
TEST_CASE_METHOD(cMyTestSystem, "Test Raw UDP / IPv4", "[req:ACORE-8868][req:ACORE-10086]")
{
    cTestApplication oDemoApplication("-p=" + TEST_PORT);

    const std::map<cString, cString> oSourceProperties;
    const std::map<cString, cString> oSinkProperties{
        {"remote_host", "127.0.0.1"},
        {"port", TEST_PORT}};

    auto oTestGraph = create_graph(CID_UDP_SOURCE, oSourceProperties, CID_UDP_SINK, oSinkProperties);
    test_raw(oTestGraph);
}
// Runs test_raw() with a UDP source whose "socket_receive_buffer_size" is set to 2 MiB and
// then checks that matching log messages were recorded. Uses the fixture with logging enabled.
TEST_CASE_METHOD(cMyTestSystemWithLog, "Test Raw receive buffer size set", "[req:gitlab-#2939]")
{
cTestApplication oApplication("-p=" + TEST_PORT);
auto oGraph = create_graph(CID_UDP_SOURCE,
{
{
"socket_receive_buffer_size",
// the temporary std::string lives until the end of the full expression, so c_str() stays valid here
std::to_string(2 * 1024 * 1024).c_str()}
},
CID_UDP_SINK,
{
{"remote_host", "127.0.0.1"},
{"port", TEST_PORT}
});
test_raw(oGraph);
// NOTE(review): oMessages is not declared anywhere in this listing; presumably a statement that
// fetches the recorded log messages from the test system is missing before these checks - confirm.
CHECK(oMessages.ContainsMessage("Sockets buffersize set from"));
CHECK(oMessages.ContainsMessage(
"to " +
std::to_string(2 * 1024 * 1024) +
" bytes"));
}
// Runs test_raw() with the UDP pair; both the demo application ("-m=") and the sink
// ("remote_host") use the multicast address 224.0.0.4.
TEST_CASE_METHOD(cMyTestSystem, "Test Raw UDP Multicast / IPv4", "[req:ACORE-11192]")
{
    cTestApplication oDemoApplication("-p=" + TEST_PORT + " -m=224.0.0.4");

    const std::map<cString, cString> oSourceProperties;
    const std::map<cString, cString> oSinkProperties{
        {"remote_host", "224.0.0.4"},
        {"port", TEST_PORT}};

    auto oTestGraph = create_graph(CID_UDP_SOURCE, oSourceProperties, CID_UDP_SINK, oSinkProperties);
    test_raw(oTestGraph);
}
// Runs test_raw() with the TCP pair over IPv6; the demo application is started with
// "-tcp -ipv6" and the source is pointed at ::1 on TEST_PORT.
TEST_CASE_METHOD(cMyTestSystem, "Test Raw TCP / IPv6", "[req:ACORE-8868][req:ACORE-10086]")
{
    cTestApplication oDemoApplication("-p=" + TEST_PORT + " -tcp -ipv6");

    const std::map<cString, cString> oSourceProperties{
        {"remote_host", "::1"},
        {"remote_port", TEST_PORT}};
    const std::map<cString, cString> oSinkProperties;

    auto oTestGraph = create_graph(CID_TCP_SOURCE, oSourceProperties, CID_TCP_SINK, oSinkProperties);
    test_raw(oTestGraph);
}
// Runs test_raw() with the UDP pair over IPv6; the source sets "interface" to ::1 and the
// sink targets ::1 on TEST_PORT.
TEST_CASE_METHOD(cMyTestSystem, "Test Raw UDP / IPv6", "[req:ACORE-8868][req:ACORE-10086]")
{
    cTestApplication oDemoApplication("-p=" + TEST_PORT + " -ipv6");

    const std::map<cString, cString> oSourceProperties{
        {"interface", "::1"}};
    const std::map<cString, cString> oSinkProperties{
        {"remote_host", "::1"},
        {"port", TEST_PORT}};

    auto oTestGraph = create_graph(CID_UDP_SOURCE, oSourceProperties, CID_UDP_SINK, oSinkProperties);
    test_raw(oTestGraph);
}
// Runs test_serialization() with a TCP source (IPv4) that has "deserialize_via_media_description"
// enabled for the struct "tTestSerialization".
TEST_CASE_METHOD(cMyTestSystem, "Test Source Deserialization TCP / IPv4", "[req:ACORE-8868][req:ACORE-10086]")
{
cTestApplication oApplication("-p=" + TEST_PORT + " -tcp");
auto oGraph = create_graph(
CID_TCP_SOURCE,
{
{"remote_host", "127.0.0.1"},
{"remote_port", TEST_PORT},
{"ddl_struct_name", "tTestSerialization"},
{"deserialize_via_media_description", "true"}
},
CID_TCP_SINK, {},
// NOTE(review): the create_graph(...) call above is never closed - its last argument and the
// closing ");" appear to be missing from this listing. Restore them before building.
test_serialization(oGraph);
}
// Runs test_serialization() with a TCP source (IPv6, ::1) that has
// "deserialize_via_media_description" enabled for the struct "tTestSerialization".
TEST_CASE_METHOD(cMyTestSystem, "Test Source Deserialization TCP / IPv6", "[req:ACORE-8868][req:ACORE-10086]")
{
cTestApplication oApplication("-p=" + TEST_PORT + " -tcp -ipv6");
auto oGraph = create_graph(
CID_TCP_SOURCE,
{
{"remote_host", "::1"},
{"remote_port", TEST_PORT},
{"ddl_struct_name", "tTestSerialization"},
{"deserialize_via_media_description", "true"}
},
CID_TCP_SINK, {},
// NOTE(review): the create_graph(...) call above is never closed - its last argument and the
// closing ");" appear to be missing from this listing. Restore them before building.
test_serialization(oGraph);
}
// Runs test_serialization() with a UDP source that has "deserialize_via_media_description"
// enabled for the struct "tTestSerialization"; the sink targets 127.0.0.1:TEST_PORT.
TEST_CASE_METHOD(cMyTestSystem, "Test Source Deserialization UDP / IPv4", "[req:ACORE-8868][req:ACORE-10086]")
{
    cTestApplication oDemoApplication("-p=" + TEST_PORT);

    const std::map<cString, cString> oSourceProperties{
        {"ddl_struct_name", "tTestSerialization"},
        {"deserialize_via_media_description", "true"}};
    const std::map<cString, cString> oSinkProperties{
        {"remote_host", "127.0.0.1"},
        {"port", TEST_PORT}};

    auto oTestGraph = create_graph(CID_UDP_SOURCE, oSourceProperties, CID_UDP_SINK, oSinkProperties);
    test_serialization(oTestGraph);
}
// Runs test_serialization() with a UDP source on interface ::1 that has
// "deserialize_via_media_description" enabled; the sink targets ::1 on TEST_PORT.
TEST_CASE_METHOD(cMyTestSystem, "Test Source Deserialization UDP / IPv6", "[req:ACORE-8868][req:ACORE-10086]")
{
    cTestApplication oDemoApplication("-p=" + TEST_PORT + " -ipv6");

    const std::map<cString, cString> oSourceProperties{
        {"interface", "::1"},
        {"ddl_struct_name", "tTestSerialization"},
        {"deserialize_via_media_description", "true"}};
    const std::map<cString, cString> oSinkProperties{
        {"remote_host", "::1"},
        {"port", TEST_PORT}};

    auto oTestGraph = create_graph(CID_UDP_SOURCE, oSourceProperties, CID_UDP_SINK, oSinkProperties);
    test_serialization(oTestGraph);
}
// Runs test_serialization() with a TCP sink (IPv4) that has "serialize_via_media_description"
// enabled; the source is configured with the struct name "tTestSerialization".
TEST_CASE_METHOD(cMyTestSystem, "Test Sink Serialization TCP / IPv4", "[req:ACORE-10086]")
{
cTestApplication oApplication("-p=" + TEST_PORT + " -tcp");
auto oGraph = create_graph(
CID_TCP_SOURCE,
{
{"remote_host", "127.0.0.1"},
{"remote_port", TEST_PORT},
{"ddl_struct_name", "tTestSerialization"}
},
CID_TCP_SINK,
{
{"serialize_via_media_description", "true"}
},
// NOTE(review): the create_graph(...) call above is never closed - its last argument and the
// closing ");" appear to be missing from this listing. Restore them before building.
test_serialization(oGraph);
}
// Runs test_serialization() with a TCP sink (IPv6, ::1) that has "serialize_via_media_description"
// enabled; the source is configured with the struct name "tTestSerialization".
TEST_CASE_METHOD(cMyTestSystem, "Test Sink Serialization TCP / IPv6", "[req:ACORE-10086]")
{
cTestApplication oApplication("-p=" + TEST_PORT + " -tcp -ipv6");
auto oGraph = create_graph(
CID_TCP_SOURCE,
{
{"remote_host", "::1"},
{"remote_port", TEST_PORT},
{"ddl_struct_name", "tTestSerialization"}
},
CID_TCP_SINK,
{
{"serialize_via_media_description", "true"}
},
// NOTE(review): the create_graph(...) call above is never closed - its last argument and the
// closing ");" appear to be missing from this listing. Restore them before building.
test_serialization(oGraph);
}
// Runs test_serialization() with a UDP sink (127.0.0.1) that has "serialize_via_media_description"
// enabled; the source is configured with the struct name "tTestSerialization".
TEST_CASE_METHOD(cMyTestSystem, "Test Sink Serialization UDP / IPv4", "[req:ACORE-10086]")
{
cTestApplication oApplication("-p=" + TEST_PORT);
auto oGraph = create_graph(
CID_UDP_SOURCE,
{
{"ddl_struct_name", "tTestSerialization"}
},
CID_UDP_SINK,
{
{"serialize_via_media_description", "true"},
{"remote_host", "127.0.0.1"},
{"port", TEST_PORT},
},
// NOTE(review): the create_graph(...) call above is never closed - its last argument and the
// closing ");" appear to be missing from this listing. Restore them before building.
test_serialization(oGraph);
}
// Runs test_serialization() with a UDP sink (::1) that has "serialize_via_media_description"
// enabled; the source listens on interface ::1 and is configured with "tTestSerialization".
TEST_CASE_METHOD(cMyTestSystem, "Test Sink Serialization UDP / IPv6", "[req:ACORE-10086]")
{
cTestApplication oApplication("-p=" + TEST_PORT + " -ipv6");
auto oGraph = create_graph(
CID_UDP_SOURCE,
{
{"interface", "::1"},
{"ddl_struct_name", "tTestSerialization"}
},
CID_UDP_SINK,
{
{"serialize_via_media_description", "true"},
{"remote_host", "::1"},
{"port", TEST_PORT},
},
// NOTE(review): the create_graph(...) call above is never closed - its last argument and the
// closing ");" appear to be missing from this listing. Restore them before building.
test_serialization(oGraph);
}
// Configures the TCP source with "fixed_packet_size" 8 and expects the first recorded sample
// to hold two uint32_t values, 1 and 2.
// NOTE(review): nothing in this test visibly sends those values; presumably a write step is
// missing from this listing - confirm.
TEST_CASE_METHOD(cMyTestSystem, "Test TCP Source Fixed Packet Size", "[req:ACORE-10086]")
{
    cTestApplication oDemoApplication("-p=" + TEST_PORT + " -tcp");

    const std::map<cString, cString> oSourceProperties{
        {"remote_host", "127.0.0.1"},
        {"remote_port", TEST_PORT},
        {"fixed_packet_size", "8"}};
    const std::map<cString, cString> oSinkProperties;

    auto oTestGraph = create_graph(CID_TCP_SOURCE, oSourceProperties, CID_TCP_SINK, oSinkProperties);
    REQUIRE_OK(oTestGraph.pStreamingGraph->SetState(IStreamingGraph::tStreamingState::State_Streaming));

    uint32_t nFirstExpected = 1;
    uint32_t nSecondExpected = 2;

    auto oRecorded = oTestGraph.pSourceOutput->GetCurrentOutput();
    REQUIRE(!oRecorded.GetSamples().empty());

    sample_data<uint32_t[2]> oPacket(oRecorded.GetSamples().front());
    REQUIRE((*oPacket)[0] == nFirstExpected);
    REQUIRE((*oPacket)[1] == nSecondExpected);
}
// Builds the UDP source and sink without connecting their sockets (bConnectSockets = false)
// and without the demo application; both use TEST_PORT, the sink targets 127.0.0.1.
// Expects the first recorded sample to be the uint32_t value 1.
// NOTE(review): no write to the sink is visible in this listing - confirm.
TEST_CASE_METHOD(cMyTestSystem, "Independent UDP Source and Sink Sockets", "[req:ACORE-10086]")
{
    const std::map<cString, cString> oSourceProperties{
        {"port", TEST_PORT}};
    const std::map<cString, cString> oSinkProperties{
        {"remote_host", "127.0.0.1"},
        {"port", TEST_PORT}};

    auto oTestGraph = create_graph(CID_UDP_SOURCE,
                                   oSourceProperties,
                                   CID_UDP_SINK,
                                   oSinkProperties,
                                   stream_meta_type_anonymous(),
                                   false);
    REQUIRE_OK(oTestGraph.pStreamingGraph->SetState(IStreamingGraph::tStreamingState::State_Streaming));

    uint32_t nExpected = 1;

    auto oRecorded = oTestGraph.pSourceOutput->GetCurrentOutput();
    REQUIRE(!oRecorded.GetSamples().empty());

    sample_data<uint32_t> oFirstSample(oRecorded.GetSamples().front());
    REQUIRE(oFirstSample == nExpected);
}
// Same as the independent UDP test, but the source sets "multicast_group" to 224.0.0.2 and the
// sink sends to that address. Expects the first recorded sample to be the uint32_t value 1.
// NOTE(review): no write to the sink is visible in this listing - confirm.
TEST_CASE_METHOD(cMyTestSystem, "Independent UDP Source and Sink Sockets Multicast", "[req:ACORE-11192]")
{
    const std::map<cString, cString> oSourceProperties{
        {"port", TEST_PORT},
        {"multicast_group", "224.0.0.2"}};
    const std::map<cString, cString> oSinkProperties{
        {"remote_host", "224.0.0.2"},
        {"port", TEST_PORT}};

    auto oTestGraph = create_graph(CID_UDP_SOURCE,
                                   oSourceProperties,
                                   CID_UDP_SINK,
                                   oSinkProperties,
                                   stream_meta_type_anonymous(),
                                   false);
    REQUIRE_OK(oTestGraph.pStreamingGraph->SetState(IStreamingGraph::tStreamingState::State_Streaming));

    uint32_t nExpected = 1;

    auto oRecorded = oTestGraph.pSourceOutput->GetCurrentOutput();
    REQUIRE(!oRecorded.GetSamples().empty());

    sample_data<uint32_t> oFirstSample(oRecorded.GetSamples().front());
    REQUIRE(oFirstSample == nExpected);
}
// Server side helper that the "Independent TCP Source and Sink Sockets" test runs via std::async.
// NOTE(review): as listed here the sockets are only declared - no open/listen/accept/read/write
// calls are visible and nBuffer is never used; the socket operations appear to be missing from
// this listing. Confirm against the original file before relying on this helper.
void tcp_server(std::promise<void> oListening)
{
cServerSocket oServer;
// unblocks the test, which waits on the matching future before it starts streaming
oListening.set_value();
cStreamSocket oClient1;
cStreamSocket oClient2;
uint8_t nBuffer;
}
// Builds the TCP source and sink without connecting their sockets (bConnectSockets = false);
// both point at 127.0.0.1:TEST_PORT. tcp_server() runs on a separate thread and the graph is
// only switched to streaming after the server has signalled that it is listening.
// Expects the first recorded sample to be the uint8_t value 123.
TEST_CASE_METHOD(cMyTestSystem, "Independent TCP Source and Sink Sockets", "[req:ACORE-10086]")
{
    // source and sink use the very same endpoint properties
    const std::map<cString, cString> oEndpointProperties{
        {"remote_host", "127.0.0.1"},
        {"remote_port", TEST_PORT}};

    auto oTestGraph = create_graph(CID_TCP_SOURCE,
                                   oEndpointProperties,
                                   CID_TCP_SINK,
                                   oEndpointProperties,
                                   stream_meta_type_anonymous(),
                                   false);

    std::promise<void> oServerReady;
    auto oServerReadyFuture = oServerReady.get_future();
    auto oServerTask = std::async(std::launch::async, tcp_server, std::move(oServerReady));
    oServerReadyFuture.get();

    REQUIRE_OK(oTestGraph.pStreamingGraph->SetState(IStreamingGraph::tStreamingState::State_Streaming));

    uint8_t nExpected = 123;

    auto oRecorded = oTestGraph.pSourceOutput->GetCurrentOutput();
    REQUIRE(!oRecorded.GetSamples().empty());
    {
        // scoped so the sample accessor is released before waiting for the server thread
        sample_data<uint8_t> oFirstSample(oRecorded.GetSamples().front());
        REQUIRE(oFirstSample == nExpected);
    }

    // rethrows anything the server thread has thrown
    oServerTask.get();
}
// Server side helper that the "Test TCP Source Reconnection" test runs via std::async,
// once per loop iteration.
// NOTE(review): as listed here the sockets are only declared - no open/listen/accept/write calls
// are visible and nBuffer (123) is never sent; the socket operations appear to be missing from
// this listing. Confirm against the original file.
void tcp_server_source(std::promise<void> oListening)
{
cServerSocket oServer;
// unblocks the test, which waits on the matching future before it checks the recorder
oListening.set_value();
cStreamSocket oClient1;
uint8_t nBuffer = 123;
}
// Checks the "enable_automatic_reconnection" property of the TCP source.
//  - disabled: switching to State_Streaming has to fail (no server is started in that section).
//  - enabled:  streaming starts, then a server is started five times in a row and each time
//              exactly one sample is expected at the source output.
// NOTE(review): no host/port properties are set on the source here; confirm its defaults match
// the endpoint tcp_server_source() is supposed to serve.
TEST_CASE_METHOD(cMyTestSystem, "Test TCP Source Reconnection", "[req:ACORE-11249]")
{
    object_ptr<IStreamingSource> pSource;
    // the source has to be instantiated first, otherwise pSource stays empty and the
    // configuration pointer below would be null
    REQUIRE_OK(_runtime->CreateInstance(CID_TCP_SOURCE, pSource));

    auto pConfiguration = ucom_cast<IConfiguration*>(pSource.Get());
    // fail the test instead of dereferencing a null pointer
    REQUIRE(pConfiguration != nullptr);

    SECTION("disabled reconnection")
    {
        set_property(*pConfiguration,
                     "enable_automatic_reconnection",
                     false);
        REQUIRE_OK(pSource->SetState(IStreamingService::tStreamingState::State_Initialized));
        REQUIRE_FAILED(pSource->SetState(IStreamingService::tStreamingState::State_Streaming));
    }

    SECTION("enabled reconnection")
    {
        set_property(*pConfiguration,
                     "enable_automatic_reconnection",
                     true);
        REQUIRE_OK(pSource->SetState(IStreamingService::tStreamingState::State_Initialized));
        cOutputRecorder oRecorder(pSource, "output");
        REQUIRE_OK(pSource->SetState(IStreamingService::tStreamingState::State_Streaming));

        for (size_t nCounter = 0; nCounter < 5; ++nCounter)
        {
            std::promise<void> oListening;
            auto oListeningStarted = oListening.get_future();
            auto oServerResult = std::async(std::launch::async, tcp_server_source, std::move(oListening));
            // wait until the server thread reports that it is listening
            oListeningStarted.get();

            REQUIRE(oRecorder.GetCurrentOutput().GetSamples().size() == 1);

            // rethrows anything the server thread has thrown
            oServerResult.get();
        }
    }
}
// Server side helper that the "Test TCP Sink Reconnection" test runs via std::async,
// once per loop iteration.
// NOTE(review): as listed here the sockets are only declared - no open/listen/accept/read calls
// are visible and nBuffer is never used; the socket operations appear to be missing from this
// listing. Confirm against the original file.
void tcp_server_sink(std::promise<void> oListening, std::promise<void> oRecievedData)
{
cServerSocket oServer;
// unblocks the test, which waits on the matching future before it starts writing
oListening.set_value();
cStreamSocket oClient1;
uint8_t nBuffer = 123;
// second signal for the test; presumably meant to be raised after data has been read - confirm
oRecievedData.set_value();
}
// Checks the "enable_automatic_reconnection" property of the TCP sink.
//  - disabled: switching to State_Streaming has to fail (no server is started in that section).
//  - enabled:  streaming starts, then a server is started five times in a row; each time the
//              test keeps writing the loop counter until the server signals that it got data.
// NOTE(review): no host/port properties are set on the sink here; confirm its defaults match
// the endpoint tcp_server_sink() is supposed to serve.
TEST_CASE_METHOD(cMyTestSystem, "Test TCP Sink Reconnection", "[req:ACORE-11249]")
{
    object_ptr<IStreamingSink> pSink;
    // the sink has to be instantiated first, otherwise pSink stays empty and the
    // configuration pointer below would be null
    REQUIRE_OK(_runtime->CreateInstance(CID_TCP_SINK, pSink));

    auto pConfiguration = ucom_cast<IConfiguration*>(pSink.Get());
    // fail the test instead of dereferencing a null pointer
    REQUIRE(pConfiguration != nullptr);

    SECTION("disabled reconnection")
    {
        set_property(*pConfiguration,
                     "enable_automatic_reconnection",
                     false);
        REQUIRE_OK(pSink->SetState(IStreamingService::tStreamingState::State_Initialized));
        REQUIRE_FAILED(pSink->SetState(IStreamingService::tStreamingState::State_Streaming));
    }

    SECTION("enabled reconnection")
    {
        set_property(*pConfiguration,
                     "enable_automatic_reconnection",
                     true);
        REQUIRE_OK(pSink->SetState(IStreamingService::tStreamingState::State_Initialized));
        cTestWriter oWriter(pSink, "input");
        REQUIRE_OK(pSink->SetState(IStreamingService::tStreamingState::State_Streaming));

        for (uint8_t nCounter = 0; nCounter < 5; ++nCounter)
        {
            std::promise<void> oListening;
            auto oListeningStarted = oListening.get_future();
            std::promise<void> oRecievedData;
            auto oReceivedDataDone = oRecievedData.get_future();
            auto oServerResult = std::async(std::launch::async, tcp_server_sink, std::move(oListening), std::move(oRecievedData));
            // wait until the server thread reports that it is listening
            oListeningStarted.get();

            // keep writing until the server thread signals that it has received data;
            // NOTE(review): the 100 ms polling interval is an assumption - adjust if needed
            do
            {
                oWriter.Write(nCounter, true);
            }
            while (oReceivedDataDone.wait_for(std::chrono::milliseconds(100)) != std::future_status::ready);

            // rethrows anything the server thread has thrown
            oServerResult.get();
        }
    }
}
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
#define LOG_INFO(...)
Logs an info message.
Helper class that wraps different objects into an adtf::streaming::ant::IStreamType.
This class enables you to setup an ADTF system where you can test your filters and services.
Utility class to record all log messages in an ADTF system during tests.
tMessages GetCurrentMessages(bool bClear=true)
Returns all messages currently received.
virtual tResult CreateInstance(const char *strCID, iobject_ptr< IObject > &pObject, const tChar *strNameOfObject="") const =0
Creates a new instance of an object.
virtual tResult GetObject(iobject_ptr< IObject > &pObject, const char *strNameOID) const =0
Get registered object from object registry.
cString to_string(const tResult &i_oResult, eResultFormatFlags i_eFormatFlags=eResultFormatFlags::RFF_DisableNone, const tChar *i_strFormat=nullptr)
Copy all information of an assigned result object to a (formatted) string.
string_base< cStackString > cString
cString implementation for a stack string which works on stack if string is lower than A_UTILS_DEFAUL...
std::chrono::seconds seconds
Compatibility to C++11 std::chrono::seconds
std::chrono::milliseconds milliseconds
Compatibility to C++11 std::chrono::milliseconds
@ RL_Session
The session level.
tResult set_property(IConfiguration &oConfiguration, const char *strNameOfValue, VALUETYPE oValue)
Set the property.
Namespace for the ADTF Base SDK.
Namespace for all testing functionality of the ADTF Filter SDK.
tResult add_binding_proxy(ant::cGraph &oGraph, const util::cString &strName, int32_t nOrderNumber, ucom::iobject_ptr< ucom::IObject > &pBindingProxy)
Adds a binding proxy to a graph and returns the created proxy.
tResult add_graph_object(ant::cGraph &oGraph, const util::cString &strCID, const util::cString &strName, const std::map< util::cString, util::cString > &oProperties, int32_t nOrderNumber, ucom::iobject_ptr< ucom::IObject > &pObject)
Convenience functionality to create a graph object and add it to an existing graph.
Namespace for the ADTF Streaming SDK.
Namespace for the ADTF uCOM3 SDK.
alias namespace for the A_UTILS Library.
adtf::ucom::IRuntime * _runtime
Global Runtime Pointer to reference to the current runtime.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
#define THROW_IF_FAILED(s)
throws if the expression returns a failed tResult