39 template <
typename INTERFACE,
typename PINTYPE>
43 typedef PINTYPE pin_type;
103 pPin->RegisterStreamer(*
this);
120 m_poPin->UnregisterStreamer(*
this);
142 if (pStreamType.Get())
144 m_pStreamType = ucom::make_object_ptr<cStreamType>(*pStreamType.Get());
173 public base::ant::runnable<base::ant::IRunnable::RUN_PUSH, ISampleStream::IPushReadEventSink>
251 RETURN_ERROR_DESC(ERR_NOT_SUPPORTED,
"Currently only PushRead mode is supported.");
357 #ifdef _PIPES_DEBUG_LOG
358 LOG_DUMP(
"Run Push Event in Reader");
398 return ERR_NOT_CONNECTED;
468 void Reset()
override
529 tResult ReadAllAvailableItems()
540 void UnregisterExternalQueue(ISampleReaderQueue * pExternalBuffer)
581 void Reset()
override
584 cSampleReader::Reset();
606 private adtf_util::lock_free_queue<cStreamItem>
609 typedef adtf_util::lock_free_queue<cStreamItem>
base_type;
614 return base_type::Push(oStreamItem);
620 while (
IS_OK(base_type::Pop(&oItem)));
626 auto oResult = base_type::Pop(&oQueueItem);
629 if (oResult == ERR_EMPTY)
634 return oQueueItem.
CopyTo(oStreamItem);
644 std::mutex m_oQueueMutex;
645 std::deque<cStreamItem> m_oItems;
647 size_t m_nSampleCount = 0;
652 std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
653 m_oItems.emplace_back(oStreamItem);
658 CheckQueue(m_oItems);
664 std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
672 std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
680 if (m_oItems.empty())
690 m_oItems.pop_front();
697 virtual void CheckQueue(
const std::deque<cStreamItem>& oItems) = 0;
702 m_oLastType = m_oItems.front();
708 m_oItems.pop_front();
711 size_t GetSampleCount()
713 return m_nSampleCount;
717 template<
size_t MaxSize>
722 void CheckQueue(
const std::deque<cStreamItem>& oItems)
override
724 while (!oItems.empty() &&
725 (GetSampleCount() > MaxSize ||
733 template<tTimeStamp TimeRange>
738 void CheckQueue(
const std::deque<cStreamItem>& oItems)
override
743 if (
IS_OK(oItems.back().GetSample(pLastSample)))
745 tTimeStamp nLastTime = pLastSample->GetTime();
746 while (oItems.size() > 1)
748 auto& oItem = oItems.front();
752 if (
IS_OK(oItem.GetSample(pSample)))
754 if (nLastTime - pSample->GetTime() < TimeRange)
774 template<
typename INTERNAL_QUEUE,
775 bool STORE_LAST_SAMPLE =
true,
777 class sample_reader :
public std::conditional<STORE_LAST_SAMPLE, cLastSampleReader, cSampleReader>::type
784 typedef typename std::conditional<STORE_LAST_SAMPLE, cLastSampleReader, cSampleReader>::type
base_class;
796 void Clear()
override
801 tResult Pop(IStreamItem& oItem)
override
823 template<tTimeStamp TIME_RANGE,
bool STORELASTSAMPLE = true>
834 template<
size_t MAX_SIZE,
bool STORELASTSAMPLE=true>
854 pSample.
Reset(pEmpty);
856 return oSampleReader;
867 template<
typename DATATYPE>
873 oSampleData.Reset(pSample);
879 return oSampleReader;
892 return oSampleReader;
904 return pStreamfunc(oSampleReader);
915 const char* strNameOfReader,
918 oReader.
SetName(strNameOfReader);
919 return oReader.
SetType(pStreamType);
930 using cExternalQueueSampleReader::cExternalQueueSampleReader;
932 ADTF3_DEPRECATED(
"The class 'cExternelQueueSampleReader' is deprecated. Please use 'cExternalQueueSampleReader' instead.")
953 bool bStoreLastSample);
991 uint32_t nSubStreamId,
996 class cImplementation;
997 std::unique_ptr<cImplementation> m_pImplementation;
1008 template<
typename INTERNAL_QUEUE,
1009 bool STORE_LAST_SAMPLE =
true,
1014 INTERNAL_QUEUE m_oQueue;
1081 m_oQueue.RegisterExternalQueue(pExternalBuffer);
1086 m_oQueue.UnregisterExternalQueue(pExternalBuffer);
1101 template<tTimeStamp TIME_RANGE,
bool STORELASTSAMPLE = true>
1108 template<
size_t MAX_SIZE,
bool STORELASTSAMPLE=true>
1118 const char* strNameOfReader,
1121 oReader.
SetName(strNameOfReader);
1122 return oReader.
SetType(pStreamType);
1128 pSample.
Reset(pEmpty);
1130 return oSampleReader;
1133 template<
typename DATATYPE>
1134 ISampleReader& operator>>(ISampleReader& oSampleReader, sample_data<DATATYPE>& oSampleData)
1137 if (
IS_OK(oSampleReader.GetNextSample(pSample)))
1139 oSampleData.Reset(pSample);
1143 oSampleData.Reset();
1145 return oSampleReader;
1148 inline ISampleReader& operator>>(ISampleReader& oSampleReader, ucom::ant::iobject_ptr<const ant::IStreamType>& pType)
1150 oSampleReader.GetLastType(pType);
1151 return oSampleReader;
1154 inline ISampleReader& operator>>(ISampleReader& oSampleReader, ISampleReader& (*pStreamfunc)(ISampleReader&))
1156 return pStreamfunc(oSampleReader);
1195 void SetStreamErrorCallback(
const std::function<
tResult(
tResult oStreamError)> & fnStreamErrorCallback);
1198 class cImplementation;
1199 std::unique_ptr<cImplementation> m_pImplementation;
1217 using flash::make_sample_reader;
#define ADTF3_DEPRECATED(_depr_message_)
Mark a function or variable as deprecated.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
#define ADTF_BASE_COMPOSED_RESULT(_result,...)
for internal use
#define LOG_DUMP(...)
Logs a dump message.
tInt64 tTimeStamp
type definition for a time value.
A_UTILS_NS::cResult tResult
For backwards compatibility and to bring latest version into scope.
#define IS_FAILED(s)
Check if result is failed.
#define RETURN_ERROR_DESC(_code,...)
Same as RETURN_ERROR(_error) using a printf like parameter list for detailed error description.
#define RETURN_IF_FAILED(s)
Return if expression is failed, which requires the calling function's return type to be tResult.
#define RETURN_NOERROR
Return status ERR_NOERROR, which requires the calling function's return type to be tResult.
#define RETURN_ERROR(code)
Return specific error code, which requires the calling function's return type to be tResult.
#define IS_OK(s)
Check if result is OK.
A common result class usable as return value throughout.
Definition of a property set container interface.
The IString interface provides methods for getting and setting strings through abstract interfaces.
Runnable helper implementation template.
Interface to create a sample reader buffer.
virtual tResult Pop(IStreamItem &oStreamItem)=0
Returns the next sample from the queue.
virtual void Clear()=0
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
virtual tResult Push(const IStreamItem &oStreamItem, tTimeStamp tmTime)=0
Push a new value to the internal sample queue.
Access definition for the ISampleStream::Open.
tMode
Open mode of ISampleStream::Open.
@ AsyncQueue
Asynchronous operation mode for the ISampleStream::Open (Not implemented yet). See Asynchronous Mode - Asyn...
Interface of the SampleStream.
virtual tResult Open(const char *strName, adtf::ucom::ant::iobject_ptr< ISampleInStream > &pInStream, const adtf::ucom::ant::iobject_ptr< const IStreamType > &pInitialAcceptedStreamType, IPushReadEventSink *&pPushEventSink, ISampleStreamAccess::tMode ui32Mode, size_t szQueueSize)=0
Opens The SampleStream for reading access.
The IStreamItem interface is the base type for all object which are passed through a stream.
virtual tType GetType() const =0
Retrieves the type of the sample item.
virtual tResult GetStreamType(ucom::ant::iobject_ptr< const IStreamType > &pStreamType) const =0
Retrieves the StreamType of the StreamItem if GetType is tType::StreamType.
@ StreamType
Item is an IStreamType. Note: all StreamType changes will be queued too!
@ Sample
Item is a queue item that contains an ISample.
Implementation of an adtf::streaming::ant::ISampleReaderQueue with a dynamically growing sample queue.
adtf_util::lock_free_queue< cStreamItem > base_type
base type of cDynamicSampleReaderQueue
tResult Pop(IStreamItem &oStreamItem) override
Returns the next sample from the queue.
tResult Push(const IStreamItem &oStreamItem, tTimeStamp) override
Push a new value to the internal sample queue.
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
Sample reader which allows the registration of external queues.
tResult Pop(IStreamItem &) override
Returns the next sample from the queue.
std::set< ISampleReaderQueue * > m_lstExternalQueues
A set of other registered buffers.
tResult Push(const IStreamItem &oStreamItem, tTimeStamp tsTime) override
Push a new value to the internal sample queue.
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
Sample reader which allows the registration of external queues.
Sample reader which always provides the last successfully received sample.
tResult ReadNextSample(ucom::ant::iobject_ptr< const ISample > &pSample) override
Collect the next sample by overriding the adtf::streaming::ant::cSampleReader::ReadNextSample.
ucom::object_ptr< const ISample > m_pLastValidSample
Last Sample Reference.
tResult GetLastSample(ucom::ant::iobject_ptr< const ISample > &pSample)
Returns the last Sample received.
tResult Pop(IStreamItem &oItem)
Returns the next sample from the queue.
tResult Push(const IStreamItem &oStreamItem, tTimeStamp) override
Push a new value to the internal sample queue.
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
The default Sample Reader will read the incoming Stream of an IInPin.
ISampleStreamAccess::tMode m_eAccessMode
current access method
void SetAcceptTypeCallback(const std::function< tResult(const ucom::iobject_ptr< const IStreamType > &pStreamType)> &fnAcceptTypeCallback)
Sets a callback function which is called while a Stream Type is received - see also AcceptType and Is...
sample_streamer< ISampleReader, cInPin > base_type
base type of cSampleReader
virtual tResult GetNextSample(ucom::ant::iobject_ptr< const ISample > &pSample)
Gets the next Sample within the internal queue which has not been read yet.
void GetSampleInStream(ucom::ant::iobject_ptr< ISampleInStream > &pSampleInStream)
Returns the connected ISampleInStream.
std::mutex m_oReadLock
Read synchronization.
ucom::object_ptr< ISampleInStream > m_pInStream
Reader where to read.
std::function< tResult(tResult oStreamError)> m_fnStreamErrorCallback
Callback to react on stream errors.
bool m_bValidType
State whether the current type is a valid type or not; this will be set to false if AcceptType fails.
base::ant::runnable< base::ant::IRunnable::RUN_PUSH, ISampleStream::IPushReadEventSink > runnable_type
internal base type.
ucom::object_ptr< const IStreamType > m_pLastReadStreamType
Last accepted incoming type.
tResult SetStreamError(const tResult &oError)
Forward an error to the corresponding stream.
tResult EndStreaming() override
Sample Stream disconnected.
virtual tResult Pop(IStreamItem &oStreamItem)=0
Returns the next sample from the queue.
std::function< tResult(const ucom::iobject_ptr< const IStreamType > &pStreamType)> m_fnAcceptTypeCallback
Callback to reject type changes.
virtual tResult ReadNextSample(ucom::ant::iobject_ptr< const ISample > &pSample)
This will read the stream items until a sample is reached and return it.
void GetLastType(ucom::ant::iobject_ptr< const IStreamType > &pType)
Returns the last stream type that was read from the sample stream.
virtual tResult AcceptType(const ucom::ant::iobject_ptr< const IStreamType > &pStreamType)
Accept or reject a new stream type - see also AcceptType and IsCompatible implementations.
cSampleReader(ISampleStreamAccess::tMode eAccessMode)
Default CTOR which defines the access method.
tResult ProcessStreamItem(const IStreamItem &oStreamItem)
This will process the stream items.
tResult Push(tTimeStamp tmTimeofActivation)
internal Push operation to implement pushread mode
virtual void Clear()=0
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
cSampleReader()
Default CTOR.
virtual tResult Push(const IStreamItem &oStreamItem, tTimeStamp tmTime)=0
Push a new value to the internal sample queue.
void SetStreamErrorCallback(const std::function< tResult(tResult oStreamError)> &fnStreamErrorCallback)
A callback function to react on stream errors.
tResult BeginStreaming(ISampleStream &oSampleStream) override
BeginStreaming will open the given Sample Stream for Reading while a connection is establishing.
Default implementation of a StreamItem as container of the Sample Stream's queue.
tResult GetSample(ucom::ant::iobject_ptr< const ISample > &pSample) const
Retrieves the sample of the StreamItem if GetType is tType::Sample.
tType GetType() const
Retrieves the type of the sample item.
tResult CopyTo(IStreamItem &oDest) const
Copy its content to the oDest container.
Sample Data getter for an easy use of samples with samplebuffer set to the type T.
A possible Sample Reader of a Trigger Function! Sample reader with an internal queue,...
INTERNAL_QUEUE m_oInternalQueue
An internal sample queue.
std::conditional< STORE_LAST_SAMPLE, cLastSampleReader, cSampleReader >::type base_class
base class
Helper template can be used to implement ISampleStreamer.
void ResetPin()
Resets the pin reference to nullptr.
sample_streamer(const char *strName, const ucom::ant::iobject_ptr< const IStreamType > &pStreamType)
CTOR initializer.
sample_streamer(const sample_streamer &)=delete
deleted copy CTOR
ucom::object_ptr< PINTYPE > m_poPin
pin reference reading/writing from
virtual ~sample_streamer()
DTOR.
void SetName(const char *strName)
Sets the name of the streamer.
sample_streamer & operator=(sample_streamer &&)=delete
deleted move operator
tResult SetType(const ucom::ant::iobject_ptr< const IStreamType > &pStreamType)
Sets the StreamType of the streamer.
sample_streamer(sample_streamer &&)=delete
deleted move CTOR
ucom::object_ptr< const IStreamType > m_pStreamType
stream type of the streamer
void ResetPin(ucom::ant::object_ptr< PINTYPE > &pPin)
Resets the pin reference. This is only used internally.
adtf_util::cString m_strName
name of the streamer (used i.e. to create the pins name)
sample_streamer & operator=(const sample_streamer &)=delete
deleted copy operator
tResult GetName(base::ant::IString &&strName) const
Gets the name of the streamer.
sample_streamer()=default
CTOR.
Interface for sample reads that read from sample streams via input pins.
virtual tResult GetNextSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample)=0
Reads the next available sample from the associated sample stream.
virtual void SetName(const char *strName)=0
Sets the name of the streamer.
virtual tResult SetType(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType)=0
Sets the initial stream type of a streamer.
std::set< ISampleReaderQueue * > m_oExternalQueues
A set of other registered buffers.
tResult Pop(IStreamItem &) override
Returns the next sample from the queue.
tResult Push(const IStreamItem &oStreamItem, tTimeStamp tsTime) override
Push a new value to the internal sample queue.
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
The default Sample Reader will read the incoming Stream of an IInPin.
tResult GetLastSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the latest available sample from the associated sample stream.
tResult GetType(ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) const override
Returns the initial stream type of the streamer.
tResult RequestSamples(ucom::ant::iobject_ptr< hollow::IStreamingRequest > &pRequest, uint32_t nSubStreamId, const base::ant::IProperties *pRequestProperties=nullptr)
RequestSamples of the given Substream to be generated and/or transmitted.
tResult GetLastType(ucom::ant::iobject_ptr< const ant::IStreamType > &pType) override
Returns the last stream type that was read from the sample stream.
void SetName(const char *strName) override
Sets the name of the streamer.
tResult GetNextSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the next available sample from the associated sample stream.
tResult GetName(base::ant::IString &&strName) override
Retrieves the name of the streamer.
tResult EndStreaming() override
End streaming.
tResult SetStreamError(tResult oError) override
Sets an error on the associated sample stream.
tResult SetStreamerPin(const ucom::ant::iobject_ptr< IStreamerPin > &pStreamerPin) override
Sets the pin that the streamer is associated with.
tResult SetType(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) override
Sets the initial stream type of a streamer.
tResult ReadAllAvailableItems() override
Reads all available items from the sample stream into internal queues.
void SetStreamErrorCallback(const std::function< tResult(tResult oStreamError)> &fnStreamErrorCallback)
A callback function to react on stream errors.
void SetAcceptTypeCallback(const std::function< tResult(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType)> &fnAcceptTypeCallback)
Sets a callback function which is called while a Stream Type is received - see also AcceptType and Is...
tResult BeginStreaming(ISampleStream &oSampleStream) override
Begin streaming on the given sample stream.
Reads and stores Samples within the given queue implementation INTERNAL_QUEUE.
A reader that does not read anything.
tResult BeginStreaming(ant::ISampleStream &pStream) override
Begin streaming on the given sample stream.
tResult GetLastSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the latest available sample from the associated sample stream.
tResult GetType(ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) const override
Returns the initial stream type of the streamer.
tResult GetLastType(ucom::ant::iobject_ptr< const ant::IStreamType > &pType) override
Returns the last stream type that was read from the sample stream.
void SetName(const char *strName) override
Sets the name of the streamer.
tResult GetNextSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the next available sample from the associated sample stream.
tResult GetName(base::ant::IString &&strName) override
Retrieves the name of the streamer.
tResult EndStreaming() override
End streaming.
tResult SetStreamError(tResult oError) override
Sets an error on the associated sample stream.
cNullReader()
Constructor.
tResult SetStreamerPin(const ucom::ant::iobject_ptr< flash::IStreamerPin > &pStreamerPin) override
Sets the pin that the streamer is associated with.
tResult SetType(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) override
Sets the initial stream type of a streamer.
~cNullReader() override
Destructor.
tResult ReadAllAvailableItems() override
Reads all available items from the sample stream into internal queues.
virtual tResult Reset(const iobject_ptr< T > &i_oOther)=0
Reset this object_ptr<> with the content of another iobject_ptr<>
Base object pointer to realize binary compatible reference counting in interface methods.
Object pointer implementation used for reference counting on objects of type IObject.
string_base< cStackString > cString
cString implementation for a stack string which works on stack if string is lower than A_UTILS_DEFAUL...
tResult make_sample_reader(cSampleReader &oReader, const char *strNameOfReader, const ucom::iobject_ptr< const IStreamType > &pStreamType)
Initializes a cSampleReader with given name and StreamType param[in] oReader Reader to initialize para...
const ISampleInStream & operator>>(const ISampleInStream &oStreamReader, IStreamItem &oItem)
Streaming Operator>> to read a sample from the readers queue.
sample_reader< ant::cDynamicSampleReaderQueue > cDynamicSampleReader
The cDynamicSampleReader will create a sample reader which will create an internal sample queue with u...
sample_reader< ant::time_limited_sample_reader_queue< TIME_RANGE >, STORELASTSAMPLE > time_limited_sample_reader
The time_limited_sample_reader will create a sample reader which will create an internal sample queue ...
size_limited_sample_reader< 1 > cSingleSampleReader
The cSingleSampleReader will create a sample reader which will create an internal sample queue with on...
sample_reader< ant::size_limited_sample_reader_queue< MAX_SIZE >, STORELASTSAMPLE > size_limited_sample_reader
The size_limited_sample_reader will create a sample reader which will create an internal sample queue ...
Namespace for entire ADTF SDK.
Copyright © Audi Electronics Venture GmbH.
#define ADTF_RUN_FUNCTION(_fcName_)
Helper Macro to define Run function for adtf::base::ant::runnable<>
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.