ADTF
samplereader.h
Go to the documentation of this file.
1 
7 #pragma once
8 #include "samplestreamer_intf.h"
9 #include "streamtype_intf.h"
10 #include "sample_intf.h"
11 #include "sample.h"
12 #include "streamtype.h"
14 #include "streammetatypeplain.h"
15 #include "pin.h"
16 #include "sampleinstream_intf.h"
17 #include "samplestream_intf.h"
18 #include "streamitem_intf.h"
20 #include "streamingrequests_intf.h"
21 
22 #include <adtf_utils.h>
23 #include <adtfucom3/adtf_ucom3.h>
24 #include <adtfbase/string_intf.h>
25 
26 namespace adtf
27 {
28 namespace streaming
29 {
30 namespace ant
31 {
32 
39 template <typename INTERFACE, typename PINTYPE>
40 class sample_streamer : public INTERFACE
41 {
42  public:
43  typedef PINTYPE pin_type;
44 protected:
51 
53  sample_streamer(const sample_streamer&) = delete;
60 
61 public:
63  sample_streamer() = default;
67  sample_streamer(const char* strName,
68  const ucom::ant::iobject_ptr<const IStreamType>& pStreamType) : m_strName(strName), m_pStreamType(pStreamType)
69  {
70  }
72  virtual ~sample_streamer()
73  {
74  ResetPin();
75  }
76 
77 public:
78 
84  void SetName(const char* strName)
85  {
86  m_strName = strName;
87  }
88 
96  {
97  if (m_poPin)
98  {
99  ResetPin();
100  }
101  if (pPin)
102  {
103  pPin->RegisterStreamer(*this);
104  m_poPin = pPin;
105  }
106  else
107  {
108  ResetPin();
109  }
110  }
116  void ResetPin()
117  {
118  if (m_poPin)
119  {
120  m_poPin->UnregisterStreamer(*this);
121  m_poPin = nullptr;
122  }
123  }
130  {
131  return strName.Set(m_strName);
132  }
133 
141  {
142  if (pStreamType.Get())
143  {
144  m_pStreamType = ucom::make_object_ptr<cStreamType>(*pStreamType.Get());
145  }
146  else
147  {
148  m_pStreamType = nullptr;
149  }
151  }
152 
153 public: //implements ISampleStreamer
154  tResult GetType(ucom::ant::iobject_ptr<const IStreamType>& pStreamType) const override
155  {
156  RETURN_IF_FAILED(pStreamType.Reset(m_pStreamType));
158  }
159 
160  tResult EndStreaming() override { RETURN_NOERROR; }
161 };
162 
171 class cSampleReader : public sample_streamer<ISampleReader, cInPin>,
172  public ISampleReaderQueue,
173  public base::ant::runnable<base::ant::IRunnable::RUN_PUSH, ISampleStream::IPushReadEventSink>
174 {
175 private:
179  std::mutex m_oReadLock;
185  std::function<tResult(tResult oStreamError)> m_fnStreamErrorCallback;
186 
191 
196  bool m_bValidType = true;
201 
204 
205 public:
212  {
213  }
214 
222  m_eAccessMode(eAccessMode)
223  {
224  }
225 
242 
243 public:
244  tResult BeginStreaming(ISampleStream& oSampleStream) override
245  {
246  EndStreaming();
247 
248  // async mode is not working yet.
249  if (m_eAccessMode != ISampleStreamAccess::tMode::PushRead)
250  {
251  RETURN_ERROR_DESC(ERR_NOT_SUPPORTED, "Currently only PushRead mode is supported.");
252  }
253 
254  {
255  std::lock_guard<std::mutex> _sync(m_oReadLock);
256 
257  ISampleStream::IPushReadEventSink* pSink = this;
258  //@TODO : Something for TYPE Checking !!!
259  RETURN_IF_FAILED(oSampleStream.Open(m_strName,
260  m_pInStream,
262  pSink,
264  0));
265  }
266 
268  }
269 
271  {
272  {
273  std::lock_guard<std::mutex> _sync(m_oReadLock);
274  //this will automatically close
275  m_pInStream.Reset();
276  Clear();
277  }
279  }
280 
288  void SetAcceptTypeCallback(const std::function<
289  tResult(const ucom::iobject_ptr<const IStreamType>& pStreamType)> & fnAcceptTypeCallback )
290  {
291  m_fnAcceptTypeCallback = fnAcceptTypeCallback;
292  }
293 
298  void SetStreamErrorCallback(const std::function<tResult(tResult oStreamError)> & fnStreamErrorCallback)
299  {
300  m_fnStreamErrorCallback = fnStreamErrorCallback;
301  }
302 
303 public:
306  {
307  std::lock_guard<std::mutex> _sync(m_oReadLock);
308  if (m_pInStream)
309  {
310  //report the error !!
311  return m_pInStream->SetStreamError(oError);
312  }
313  RETURN_ERROR(ERR_NOT_CONNECTED);
314  }
315 
320  {
321  pSampleInStream.Reset(m_pInStream);
322  }
323 
328  {
330  {
332  }
333  else
334  {
335  pType.Reset(m_pStreamType);
336  }
337  }
338 
347  {
348  return ReadNextSample(pSample);
349  }
350 
351 protected:
353  tResult Push(tTimeStamp tmTimeofActivation)
354  {
355  //implementing Push
356  //this will be done sync
357  #ifdef _PIPES_DEBUG_LOG
358  LOG_DUMP("Run Push Event in Reader");
359  #endif
360  if (!m_pInStream)
361  {
362  RETURN_ERROR(ERR_NOT_CONNECTED);
363  }
364 
365  while (true)
366  {
367  cStreamItem oItem;
368  {
369  //very short LOCK !!
370  std::lock_guard<std::mutex> _sync(m_oReadLock);
371  if (IS_FAILED(m_pInStream->Read(oItem, tTimeStamp(0))))
372  {
373  break;
374  }
375  }
376 
377  RETURN_IF_FAILED(Push(oItem, tmTimeofActivation));
378  }
379 
380 
382  }
383 
384 protected:
395  {
396  if (!m_pInStream)
397  {
398  return ERR_NOT_CONNECTED;
399  }
400 
401  while (true)
402  {
403  cStreamItem oItem;
405  {
406  RETURN_IF_FAILED(m_pInStream->Read(oItem, tTimeStamp(0)));
407  }
408  else
409  {
410  RETURN_IF_FAILED(Pop(oItem));
411  }
412 
413  if (IS_OK(ProcessStreamItem(oItem)))
414  {
415  if (oItem.GetType() == IStreamItem::tType::Sample)
416  {
417  return oItem.GetSample(pSample);
418  }
419  }
420  }
421  }
422 
435  {
436  switch (oStreamItem.GetType())
437  {
439  {
441  RETURN_IF_FAILED(oStreamItem.GetStreamType(pHelper));
442  tResult nError = AcceptType(pHelper);
443  if (IS_FAILED(nError))
444  {
445  m_bValidType = false;
446  RETURN_IF_FAILED(HandleStreamError(ADTF_BASE_COMPOSED_RESULT(nError, "Stream type is not accepted by reader.")));
447  }
448  else
449  {
450  m_bValidType = true;
451  }
452  break;
453  }
454  default:
455  {
456  if (!m_bValidType)
457  {
458  RETURN_ERROR(ERR_INVALID_TYPE);
459  }
460  break;
461  }
462  }
463 
465  }
466 
467 public:
468  void Reset() override
469  {
470  m_pLastReadStreamType.Reset();
471  Clear();
472  }
473 
474 public:
478 
479 protected:
480  tResult HandleStreamError(tResult nError)
481  {
483  {
484  return m_fnStreamErrorCallback(nError);
485  }
486  else
487  {
488  return SetStreamError(nError);
489  }
490  }
491 };
492 
493 
498 {
499 private:
501  std::set<ISampleReaderQueue*> m_lstExternalQueues;
502 
503 public:
505 
506  tResult Push(const IStreamItem& oStreamItem, tTimeStamp tsTime) override
507  {
508  for (auto pQueue : m_lstExternalQueues)
509  {
510  RETURN_IF_FAILED(pQueue->Push(oStreamItem, tsTime));
511  }
512 
514  };
515 
516  void Clear() override
517  {
518  for (auto pQueue: m_lstExternalQueues)
519  {
520  pQueue->Clear();
521  }
522  }
523 
524  tResult Pop(IStreamItem& /* oStreamItem */) override
525  {
526  RETURN_ERROR(ERR_EMPTY);
527  }
528 
529  tResult ReadAllAvailableItems()
530  {
531  return cSampleReader::Push(0);
532  }
533 
534 public:
535  void RegisterExternalQueue(ISampleReaderQueue * pExternalBuffer)
536  {
537  m_lstExternalQueues.insert(pExternalBuffer);
538  }
539 
540  void UnregisterExternalQueue(ISampleReaderQueue * pExternalBuffer)
541  {
542  m_lstExternalQueues.erase(pExternalBuffer);
543  }
544 
545 };
546 
551 {
552 protected:
555 
556 public:
557 
559 
569  {
571  while (IS_OK(ReadNextSample(pDummy)));
572 
573  if (!m_pLastValidSample)
574  {
575  RETURN_ERROR(ERR_EMPTY);
576  }
577 
578  return pSample.Reset(m_pLastValidSample);
579  }
580 
581  void Reset() override
582  {
583  m_pLastValidSample = nullptr;
584  cSampleReader::Reset();
585  }
586 
587 protected:
588 
593  {
595  m_pLastValidSample.Reset(pSample);
597  }
598 
599 };
600 
605  public ISampleReaderQueue,
606  private adtf_util::lock_free_queue<cStreamItem>
607 {
609  typedef adtf_util::lock_free_queue<cStreamItem> base_type;
610 
611 public: // implements ISampleReaderQueue
612  tResult Push(const IStreamItem& oStreamItem, tTimeStamp /* tmTime */) override
613  {
614  return base_type::Push(oStreamItem);
615  }
616 
617  void Clear() override
618  {
619  cStreamItem oItem;
620  while (IS_OK(base_type::Pop(&oItem)));
621  }
622 
623  tResult Pop(IStreamItem& oStreamItem) override
624  {
625  cStreamItem oQueueItem;
626  auto oResult = base_type::Pop(&oQueueItem);
627  // this is a performance optimization in order to prevent the annotation
628  // in the RETURN_IF_FAILED macro below.
629  if (oResult == ERR_EMPTY)
630  {
631  return oResult;
632  }
633  RETURN_IF_FAILED(oResult);
634  return oQueueItem.CopyTo(oStreamItem);
635  }
636 
637 };
638 
639 
641  public ISampleReaderQueue
642 {
643  private:
644  std::mutex m_oQueueMutex;
645  std::deque<cStreamItem> m_oItems;
646  cStreamItem m_oLastType;
647  size_t m_nSampleCount = 0;
648 
649  public:
650  tResult Push(const IStreamItem& oStreamItem, tTimeStamp /* tmTime */) override
651  {
652  std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
653  m_oItems.emplace_back(oStreamItem);
654  if (oStreamItem.GetType() == IStreamItem::tType::Sample)
655  {
656  ++m_nSampleCount;
657  }
658  CheckQueue(m_oItems);
660  }
661 
662  void Clear() override
663  {
664  std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
665  m_oItems.clear();
666  m_oLastType = cStreamItem();
667  m_nSampleCount = 0;
668  }
669 
671  {
672  std::lock_guard<std::mutex> oGuard(m_oQueueMutex);
673  if (m_oLastType.GetType() == IStreamItem::tType::StreamType)
674  {
675  RETURN_IF_FAILED(m_oLastType.CopyTo(oItem));
676  m_oLastType = cStreamItem();
677  }
678  else
679  {
680  if (m_oItems.empty())
681  {
682  RETURN_ERROR(ERR_EMPTY);
683  }
684 
685  RETURN_IF_FAILED(m_oItems.front().CopyTo(oItem));
686  if (m_oItems.front().GetType() == IStreamItem::tType::Sample)
687  {
688  --m_nSampleCount;
689  }
690  m_oItems.pop_front();
691  }
692 
694  }
695 
696  protected:
697  virtual void CheckQueue(const std::deque<cStreamItem>& oItems) = 0;
698  void PopFront()
699  {
700  if (m_oItems.front().GetType() == IStreamItem::tType::StreamType)
701  {
702  m_oLastType = m_oItems.front();
703  }
704  else
705  {
706  --m_nSampleCount;
707  }
708  m_oItems.pop_front();
709  }
710 
711  size_t GetSampleCount()
712  {
713  return m_nSampleCount;
714  }
715 };
716 
717 template<size_t MaxSize>
720 {
721  protected:
722  void CheckQueue(const std::deque<cStreamItem>& oItems) override
723  {
724  while (!oItems.empty() &&
725  (GetSampleCount() > MaxSize ||
726  oItems.front().GetType() == IStreamItem::tType::StreamType))
727  {
728  PopFront();
729  }
730  }
731 };
732 
733 template<tTimeStamp TimeRange>
736 {
737  protected:
738  void CheckQueue(const std::deque<cStreamItem>& oItems) override
739  {
740  if (oItems.back().GetType() == IStreamItem::tType::Sample)
741  {
743  if (IS_OK(oItems.back().GetSample(pLastSample)))
744  {
745  tTimeStamp nLastTime = pLastSample->GetTime();
746  while (oItems.size() > 1)
747  {
748  auto& oItem = oItems.front();
749  if (oItem.GetType() == IStreamItem::tType::Sample)
750  {
752  if (IS_OK(oItem.GetSample(pSample)))
753  {
754  if (nLastTime - pSample->GetTime() < TimeRange)
755  {
756  break;
757  }
758  }
759 
760  }
761 
762  PopFront();
763  }
764  }
765  }
766  }
767 };
768 
774 template<typename INTERNAL_QUEUE,
775  bool STORE_LAST_SAMPLE = true,
776  ISampleStreamAccess::tMode ACCESS_MODE = ISampleStreamAccess::tMode::PushRead>
777 class sample_reader : public std::conditional<STORE_LAST_SAMPLE, cLastSampleReader, cSampleReader>::type
778 {
779 protected:
781  INTERNAL_QUEUE m_oInternalQueue;
782 
784  typedef typename std::conditional<STORE_LAST_SAMPLE, cLastSampleReader, cSampleReader>::type base_class;
785 public:
786 
787  sample_reader(): base_class(ACCESS_MODE)
788  {
789  }
790 
791  tResult Push(const IStreamItem& oItem, tTimeStamp tsTime) override
792  {
793  return m_oInternalQueue.Push(oItem, tsTime);
794  };
795 
796  void Clear() override
797  {
798  m_oInternalQueue.Clear();
799  };
800 
801  tResult Pop(IStreamItem& oItem) override
802  {
803  return m_oInternalQueue.Pop(oItem);
804  }
805 };
806 
814 
823 template<tTimeStamp TIME_RANGE, bool STORELASTSAMPLE = true>
825 
834 template<size_t MAX_SIZE, bool STORELASTSAMPLE=true>
836 
842 
852 {
854  pSample.Reset(pEmpty);
855  oSampleReader.GetNextSample(pSample);
856  return oSampleReader;
857 }
858 
867 template<typename DATATYPE>
869 {
871  if (IS_OK(oSampleReader.GetNextSample(pSample)))
872  {
873  oSampleData.Reset(pSample);
874  }
875  else
876  {
877  oSampleData.Reset();
878  }
879  return oSampleReader;
880 }
890 {
891  oSampleReader.GetLastType(pType);
892  return oSampleReader;
893 }
902 inline cSampleReader& operator>>(cSampleReader& oSampleReader, cSampleReader& (*pStreamfunc)(cSampleReader&))
903 {
904  return pStreamfunc(oSampleReader);
905 }
906 
915  const char* strNameOfReader,
916  const ucom::iobject_ptr<const IStreamType>& pStreamType)
917 {
918  oReader.SetName(strNameOfReader);
919  return oReader.SetType(pStreamType);
920 }
921 
928 {
929 public:
930  using cExternalQueueSampleReader::cExternalQueueSampleReader;
931 
932  ADTF3_DEPRECATED("The class 'cExternelQueueSampleReader' is deprecated. Please use 'cExternalQueueSampleReader' instead.")
935  {
936 
937  }
938 };
939 
940 } //namespace ant
941 
942 namespace flash
943 {
944 
949 {
950  public:
953  bool bStoreLastSample);
954 
955  ~cSampleReader() override;
956 
957  void SetName(const char* strName) override;
958  tResult GetName(base::ant::IString&& strName) override;
959 
962 
963  tResult BeginStreaming(ISampleStream& oSampleStream) override;
964  tResult EndStreaming() override;
965 
967 
971  void SetAcceptTypeCallback(const std::function<tResult(const ucom::ant::iobject_ptr<const ant::IStreamType>& pStreamType)> & fnAcceptTypeCallback);
972 
976  void SetStreamErrorCallback(const std::function<tResult(tResult oStreamError)> & fnStreamErrorCallback);
977  tResult SetStreamError(tResult oError) override;
978 
981 
983  void Reset();
984 
986 
991  uint32_t nSubStreamId,
992  const base::ant::IProperties* pRequestProperties = nullptr);
993  void SetSynchronousTypeUpdateCallback(const std::function<tResult(const ucom::ant::iobject_ptr<const ant::IStreamType>& pStreamType)>& fnRequestTypeUpdateCallback);
994 
995  protected:
996  class cImplementation;
997  std::unique_ptr<cImplementation> m_pImplementation;
998 
999 };
1000 
1008 template<typename INTERNAL_QUEUE,
1009  bool STORE_LAST_SAMPLE = true,
1010  ant::ISampleStreamAccess::tMode ACCESS_MODE = ant::ISampleStreamAccess::tMode::Push>
1012 {
1013  protected:
1014  INTERNAL_QUEUE m_oQueue;
1015 
1016  public:
1017  sample_reader(): cSampleReader(m_oQueue, ACCESS_MODE, STORE_LAST_SAMPLE)
1018  {}
1019 };
1020 
1021 
1022 
1024 {
1025  public:
1026  void RegisterExternalQueue(ISampleReaderQueue * pExternalBuffer)
1027  {
1028  m_oExternalQueues.insert(pExternalBuffer);
1029  }
1030 
1031  void UnregisterExternalQueue(ISampleReaderQueue * pExternalBuffer)
1032  {
1033  m_oExternalQueues.erase(pExternalBuffer);
1034  }
1035 
1036  public:
1037  tResult Push(const IStreamItem& oStreamItem, tTimeStamp tsTime) override
1038  {
1039  for (auto pQueue : m_oExternalQueues)
1040  {
1041  RETURN_IF_FAILED(pQueue->Push(oStreamItem, tsTime));
1042  }
1043 
1045  };
1046 
1047  void Clear() override
1048  {
1049  for (auto pQueue: m_oExternalQueues)
1050  {
1051  pQueue->Clear();
1052  }
1053  }
1054 
1055  tResult Pop(IStreamItem& /* oStreamItem */) override
1056  {
1057  RETURN_ERROR(ERR_EMPTY);
1058  }
1059 
1060  protected:
1062  std::set<ISampleReaderQueue*> m_oExternalQueues;
1063 };
1064 
1066 {
1067  protected:
1068  virtual ~IExternalReaderQueues() = default;
1069 
1070  public:
1071  virtual void RegisterExternalQueue(ISampleReaderQueue * pExternalBuffer) = 0;
1072  virtual void UnregisterExternalQueue(ISampleReaderQueue * pExternalBuffer) = 0;
1073 };
1074 
1075 class cExternalQueueSampleReader : public sample_reader<cExternalQueuesWrapper, false>,
1076  public IExternalReaderQueues
1077 {
1078  public:
1079  void RegisterExternalQueue(ISampleReaderQueue * pExternalBuffer) override
1080  {
1081  m_oQueue.RegisterExternalQueue(pExternalBuffer);
1082  }
1083 
1084  void UnregisterExternalQueue(ISampleReaderQueue * pExternalBuffer) override
1085  {
1086  m_oQueue.UnregisterExternalQueue(pExternalBuffer);
1087  }
1088 };
1089 
1095 
1096 
1101 template<tTimeStamp TIME_RANGE, bool STORELASTSAMPLE = true>
1103 
1108 template<size_t MAX_SIZE, bool STORELASTSAMPLE=true>
1110 
1116 
1117 inline tResult make_sample_reader(ISampleReader& oReader,
1118  const char* strNameOfReader,
1119  const ucom::iobject_ptr<const IStreamType>& pStreamType)
1120 {
1121  oReader.SetName(strNameOfReader);
1122  return oReader.SetType(pStreamType);
1123 }
1124 
1125 inline ISampleReader& operator>>(ISampleReader& oSampleReader, ucom::ant::iobject_ptr<const ant::ISample>& pSample)
1126 {
1128  pSample.Reset(pEmpty);
1129  oSampleReader.GetNextSample(pSample);
1130  return oSampleReader;
1131 }
1132 
1133 template<typename DATATYPE>
1134 ISampleReader& operator>>(ISampleReader& oSampleReader, sample_data<DATATYPE>& oSampleData)
1135 {
1137  if (IS_OK(oSampleReader.GetNextSample(pSample)))
1138  {
1139  oSampleData.Reset(pSample);
1140  }
1141  else
1142  {
1143  oSampleData.Reset();
1144  }
1145  return oSampleReader;
1146 }
1147 
1148 inline ISampleReader& operator>>(ISampleReader& oSampleReader, ucom::ant::iobject_ptr<const ant::IStreamType>& pType)
1149 {
1150  oSampleReader.GetLastType(pType);
1151  return oSampleReader;
1152 }
1153 
1154 inline ISampleReader& operator>>(ISampleReader& oSampleReader, ISampleReader& (*pStreamfunc)(ISampleReader&))
1155 {
1156  return pStreamfunc(oSampleReader);
1157 }
1158 
1159 }
1160 
1161 namespace kiwi
1162 {
1163 
1168 {
1169 public:
1174 
1178  ~cNullReader() override;
1179 
1180  void SetName(const char* strName) override;
1181  tResult GetName(base::ant::IString&& strName) override;
1185  tResult EndStreaming() override;
1187 
1191  tResult SetStreamError(tResult oError) override;
1193 
1194  void SetAcceptTypeCallback(const std::function<tResult(const ucom::ant::iobject_ptr<const ant::IStreamType>& pStreamType)> & fnAcceptTypeCallback);
1195  void SetStreamErrorCallback(const std::function<tResult(tResult oStreamError)> & fnStreamErrorCallback);
1196 
1197 protected:
1198  class cImplementation;
1199  std::unique_ptr<cImplementation> m_pImplementation;
1200 };
1201 
1202 }
1203 
1207 
1208 using flash::cSampleReader;
1211 using flash::sample_reader;
1216 
1217 using flash::make_sample_reader;
1218 
1219 using kiwi::cNullReader;
1220 
1221 
1222 } //namespace streaming
1223 } //namespace adtf
#define ADTF3_DEPRECATED(_depr_message_)
Mark a function or variable as deprecated.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
#define ADTF_BASE_COMPOSED_RESULT(_result,...)
for internal use
A_UTILS_NS::cResult tResult
For backwards compatibility and to bring latest version into scope.
#define RETURN_ERROR_DESC(_code,...)
Same as RETURN_ERROR(_error) using a printf like parameter list for detailed error description.
#define RETURN_IF_FAILED(s)
Return if expression is failed, which requires the calling function's return type to be tResult.
#define RETURN_NOERROR
Return status ERR_NOERROR, which requires the calling function's return type to be tResult.
#define RETURN_ERROR(code)
Return specific error code, which requires the calling function's return type to be tResult.
Defintion of a property set container interface.
The IString interface provides methods for getting and setting strings through abstract interfaces.
Definition: string_intf.h:28
Runnable helper implementaton template.
Definition: runnable.h:58
Interface to create a sample reader buffer.
virtual tResult Pop(IStreamItem &oStreamItem)=0
Returns the next sample from the queue.
virtual void Clear()=0
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
virtual tResult Push(const IStreamItem &oStreamItem, tTimeStamp tmTime)=0
Push a new value to the internal sample queue.
Access definiton for the ISampleStream::Open.
@ AsyncQueue
Asynch Operation mode for the ISampleStream::Open (Not implemented yet). See Asynchronous Mode - Asyn...
Interface of the SampleStream.
virtual tResult Open(const char *strName, adtf::ucom::ant::iobject_ptr< ISampleInStream > &pInStream, const adtf::ucom::ant::iobject_ptr< const IStreamType > &pInitialAcceptedStreamType, IPushReadEventSink *&pPushEventSink, ISampleStreamAccess::tMode ui32Mode, size_t szQueueSize)=0
Opens The SampleStream for reading access.
The IStreamItem interface is the base type for all object which are passed through a stream.
virtual tType GetType() const =0
Retrieves the type of the sample item.
virtual tResult GetStreamType(ucom::ant::iobject_ptr< const IStreamType > &pStreamType) const =0
Retrieves the StreamType of the StreamItem if GetType is tType::StreamType.
@ StreamType
item is a IStreamType. Mind: All StreamType changes will be queue too !!
@ Sample
item is a queue item contains a ISample
Implementation of a adtf::streaming::ant::ISampleReaderQueue with dynamic growing sample queue.
Definition: samplereader.h:607
adtf_util::lock_free_queue< cStreamItem > base_type
base type of cDynamicSampleReaderQueue
Definition: samplereader.h:609
tResult Pop(IStreamItem &oStreamItem) override
Returns the next sample from the queue.
Definition: samplereader.h:623
tResult Push(const IStreamItem &oStreamItem, tTimeStamp) override
Push a new value to the internal sample queue.
Definition: samplereader.h:612
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
Definition: samplereader.h:617
Sample reader which allows the registration of external queue.
Definition: samplereader.h:498
tResult Pop(IStreamItem &) override
Returns the next sample from the queue.
Definition: samplereader.h:524
std::set< ISampleReaderQueue * > m_lstExternalQueues
A set of other registered buffer.
Definition: samplereader.h:501
tResult Push(const IStreamItem &oStreamItem, tTimeStamp tsTime) override
Push a new value to the internal sample queue.
Definition: samplereader.h:506
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
Definition: samplereader.h:516
Sample reader which allows the registration of external queue.
Definition: samplereader.h:928
Sample reader which always provides the last successful received sample.
Definition: samplereader.h:551
tResult ReadNextSample(ucom::ant::iobject_ptr< const ISample > &pSample) override
Collect the next sample by overriding the adtf::streaming::ant::cSampleReader::ReadNextSample.
Definition: samplereader.h:592
ucom::object_ptr< const ISample > m_pLastValidSample
Last Sample Reference.
Definition: samplereader.h:554
tResult GetLastSample(ucom::ant::iobject_ptr< const ISample > &pSample)
Returns the last Sample received.
Definition: samplereader.h:568
tResult Pop(IStreamItem &oItem)
Returns the next sample from the queue.
Definition: samplereader.h:670
tResult Push(const IStreamItem &oStreamItem, tTimeStamp) override
Push a new value to the internal sample queue.
Definition: samplereader.h:650
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
Definition: samplereader.h:662
The default Sample Reader will read the incomung Stream of a IInPin.
Definition: samplereader.h:174
ISampleStreamAccess::tMode m_eAccessMode
current access method
Definition: samplereader.h:203
void SetAcceptTypeCallback(const std::function< tResult(const ucom::iobject_ptr< const IStreamType > &pStreamType)> &fnAcceptTypeCallback)
Sets a callback function which is called while a Stream Type is received - see also AcceptType and Is...
Definition: samplereader.h:288
sample_streamer< ISampleReader, cInPin > base_type
base type of cSampleReader
Definition: samplereader.h:181
virtual tResult GetNextSample(ucom::ant::iobject_ptr< const ISample > &pSample)
Gets the next Sample within internal queue which not has been read.
Definition: samplereader.h:346
void GetSampleInStream(ucom::ant::iobject_ptr< ISampleInStream > &pSampleInStream)
Returns the connected ISampleInStream.
Definition: samplereader.h:319
std::mutex m_oReadLock
Read synchronization.
Definition: samplereader.h:179
ucom::object_ptr< ISampleInStream > m_pInStream
Reader where to read.
Definition: samplereader.h:177
std::function< tResult(tResult oStreamError)> m_fnStreamErrorCallback
Callback to react on stream errors.
Definition: samplereader.h:185
bool m_bValidType
state wether the current type is a valid type or not this will be set to false in AcceptType fails.
Definition: samplereader.h:196
base::ant::runnable< base::ant::IRunnable::RUN_PUSH, ISampleStream::IPushReadEventSink > runnable_type
internal base type.
Definition: samplereader.h:190
ucom::object_ptr< const IStreamType > m_pLastReadStreamType
Last accepted incoming type.
Definition: samplereader.h:200
tResult SetStreamError(const tResult &oError)
Forward an error to the corresponding stream.
Definition: samplereader.h:305
tResult EndStreaming() override
Sample Stream disconnected.
Definition: samplereader.h:270
virtual tResult Pop(IStreamItem &oStreamItem)=0
Returns the next sample from the queue.
std::function< tResult(const ucom::iobject_ptr< const IStreamType > &pStreamType)> m_fnAcceptTypeCallback
Callback to reject type changes.
Definition: samplereader.h:183
virtual tResult ReadNextSample(ucom::ant::iobject_ptr< const ISample > &pSample)
This will read the stream items until a sample is reached and return it.
Definition: samplereader.h:394
void GetLastType(ucom::ant::iobject_ptr< const IStreamType > &pType)
Returns the connected ISampleInStream.
Definition: samplereader.h:327
virtual tResult AcceptType(const ucom::ant::iobject_ptr< const IStreamType > &pStreamType)
Accept or reject a new stream type - see also AcceptType and IsCompatible implementations.
cSampleReader(ISampleStreamAccess::tMode eAccessMode)
Default CTOR which defines the access method.
Definition: samplereader.h:220
tResult ProcessStreamItem(const IStreamItem &oStreamItem)
This will process the stream items.
Definition: samplereader.h:434
tResult Push(tTimeStamp tmTimeofActivation)
internal Push operation to implement pushread mode
Definition: samplereader.h:353
virtual void Clear()=0
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
virtual tResult Push(const IStreamItem &oStreamItem, tTimeStamp tmTime)=0
Push a new value to the internal sample queue.
void SetStreamErrorCallback(const std::function< tResult(tResult oStreamError)> &fnStreamErrorCallback)
A callback function to react on stream errors.
Definition: samplereader.h:298
tResult BeginStreaming(ISampleStream &oSampleStream) override
BeginStreaming will open the given Sample Stream for Reading while a connection is establishing.
Definition: samplereader.h:244
Default implementation of an StreamItem as container of the Sample Stream s Queue.
tResult GetSample(ucom::ant::iobject_ptr< const ISample > &pSample) const
Retrieves the sample of the StreamItem if GetType is tType::Sample.
tType GetType() const
Retrieves the type of the sample item.
tResult CopyTo(IStreamItem &oDest) const
Copy its content to the oDest container.
Sample Data getter for an easy use of samples with samplebuffer set to the type T.
Definition: sample_data.h:33
A Possible Sample Reader of a Trigger Function! Sample reader with a internal queue,...
Definition: samplereader.h:778
INTERNAL_QUEUE m_oInternalQueue
A internal sample queue.
Definition: samplereader.h:781
std::conditional< STORE_LAST_SAMPLE, cLastSampleReader, cSampleReader >::type base_class
base class
Definition: samplereader.h:784
Helper template can be used to implement ISampleStreamer.
Definition: samplereader.h:41
void ResetPin()
Resets the pin reference to nullptr.
Definition: samplereader.h:116
sample_streamer(const char *strName, const ucom::ant::iobject_ptr< const IStreamType > &pStreamType)
CTOR initializer.
Definition: samplereader.h:67
sample_streamer(const sample_streamer &)=delete
deleted copy CTOR
ucom::object_ptr< PINTYPE > m_poPin
pin reference reading/writing from
Definition: samplereader.h:50
void SetName(const char *strName)
Sets the name of the streamer.
Definition: samplereader.h:84
sample_streamer & operator=(sample_streamer &&)=delete
deleted move operator
tResult SetType(const ucom::ant::iobject_ptr< const IStreamType > &pStreamType)
Sets the StreamType of the streamer.
Definition: samplereader.h:140
sample_streamer(sample_streamer &&)=delete
deleted move CTOR
ucom::object_ptr< const IStreamType > m_pStreamType
stream type of the streamer
Definition: samplereader.h:48
void ResetPin(ucom::ant::object_ptr< PINTYPE > &pPin)
Resets the pin reference This is only internaly used.
Definition: samplereader.h:95
adtf_util::cString m_strName
name of the streamer (used i.e. to create the pins name)
Definition: samplereader.h:46
sample_streamer & operator=(const sample_streamer &)=delete
deleted copy operator
tResult GetName(base::ant::IString &&strName) const
Gets the name of the streamer.
Definition: samplereader.h:129
Interface for sample reads that read from sample streams via input pins.
virtual tResult GetNextSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample)=0
Reads the next available sample from the associated sample stream.
virtual void SetName(const char *strName)=0
Sets the name of the streamer.
virtual tResult SetType(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType)=0
Sets the initial stream type of a streamer.
std::set< ISampleReaderQueue * > m_oExternalQueues
A set of other registered buffer.
tResult Pop(IStreamItem &) override
Returns the next sample from the queue.
tResult Push(const IStreamItem &oStreamItem, tTimeStamp tsTime) override
Push a new value to the internal sample queue.
void Clear() override
clears the local queue of the reader either only EndStreaming or incoming event ISampleInStream::IQue...
The default Sample Reader will read the incomung Stream of a IInPin.
Definition: samplereader.h:949
tResult GetLastSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the latest available sample from the associated sample stream.
tResult GetType(ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) const override
Returns the initial stream type of the streamer.
tResult RequestSamples(ucom::ant::iobject_ptr< hollow::IStreamingRequest > &pRequest, uint32_t nSubStreamId, const base::ant::IProperties *pRequestProperties=nullptr)
RequestSamples of the given Substream to be generated and/or transmitted.
tResult GetLastType(ucom::ant::iobject_ptr< const ant::IStreamType > &pType) override
Returns the last stream type that was read from the sample stream.
void SetName(const char *strName) override
Sets the name of the streamer.
tResult GetNextSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the next available sample from the associated sample stream.
tResult GetName(base::ant::IString &&strName) override
Retrieves the name of the streamer.
tResult EndStreaming() override
End streaming.
tResult SetStreamError(tResult oError) override
Sets an error on the associated sample stream.
tResult SetStreamerPin(const ucom::ant::iobject_ptr< IStreamerPin > &pStreamerPin) override
Sets the pin that the streamer is associated with.
tResult SetType(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) override
Sets the initial stream type of a streamer.
tResult ReadAllAvailableItems() override
Reads all available items from the sample stream into internal queues.
void SetStreamErrorCallback(const std::function< tResult(tResult oStreamError)> &fnStreamErrorCallback)
A callback function to react on stream errors.
void SetAcceptTypeCallback(const std::function< tResult(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType)> &fnAcceptTypeCallback)
Sets a callback function which is called while a Stream Type is received - see also AcceptType and Is...
tResult BeginStreaming(ISampleStream &oSampleStream) override
Begin streaming on the given sample stream.
Reads and stores Samples within the given queue implementation INTERNAL_QUEUE.
A reader that does not read anything.
tResult BeginStreaming(ant::ISampleStream &pStream) override
Begin streaming on the given sample stream.
tResult GetLastSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the latest available sample from the associated sample stream.
tResult GetType(ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) const override
Returns the initial stream type of the streamer.
tResult GetLastType(ucom::ant::iobject_ptr< const ant::IStreamType > &pType) override
Returns the last stream type that was read from the sample stream.
void SetName(const char *strName) override
Sets the name of the streamer.
tResult GetNextSample(ucom::ant::iobject_ptr< const ant::ISample > &pSample) override
Reads the next available sample from the associated sample stream.
tResult GetName(base::ant::IString &&strName) override
Retrieves the name of the streamer.
tResult EndStreaming() override
End streaming.
tResult SetStreamError(tResult oError) override
Sets an error on the associated sample stream.
tResult SetStreamerPin(const ucom::ant::iobject_ptr< flash::IStreamerPin > &pStreamerPin) override
Sets the pin that the streamer is associated with.
tResult SetType(const ucom::ant::iobject_ptr< const ant::IStreamType > &pStreamType) override
Sets the initial stream type of a streamer.
~cNullReader() override
Destructor.
tResult ReadAllAvailableItems() override
Reads all available items from the sample stream into internal queues.
virtual tResult Reset(const iobject_ptr< T > &i_oOther)=0
Reset this object_ptr<> with the content of another iobject_ptr<>
Base object pointer to realize binary compatible reference counting in interface methods.
Object pointer implementation used for reference counting on objects of type IObject.
Definition: object_ptr.h:163
string_base< cStackString > cString
cString implementation for a stack string which works on stack if string is lower than A_UTILS_DEFAUL...
Definition: string.h:2784
tResult make_sample_reader(cSampleReader &oReader, const char *strNameOfReader, const ucom::iobject_ptr< const IStreamType > &pStreamType)
Initializes a cSampleReader with given name and StreamType param[in] oReader Reader to intialize para...
Definition: samplereader.h:914
const ISampleInStream & operator>>(const ISampleInStream &oStreamReader, IStreamItem &oItem)
Streaming Operator>> to read a sample from the readers queue.
sample_reader< ant::cDynamicSampleReaderQueue > cDynamicSampleReader
The cDynamicSampleReader will create a sample reader which will create a internal sample queue with u...
sample_reader< ant::time_limited_sample_reader_queue< TIME_RANGE >, STORELASTSAMPLE > time_limited_sample_reader
The time_limited_sample_reader will create a sample reader which will create a internal sample queue ...
size_limited_sample_reader< 1 > cSingleSampleReader
The cSingleSampleReader will create a sample reader which will create a internal sample queue with on...
sample_reader< ant::size_limited_sample_reader_queue< MAX_SIZE >, STORELASTSAMPLE > size_limited_sample_reader
The size_limited_sample_reader will create a sample reader which will create a internal sample queue ...
Namespace for entire ADTF SDK.
Copyright © Audi Electronics Venture GmbH.
#define ADTF_RUN_FUNCTION(_fcName_)
Helper Macro to define Run function for adtf::base::ant::runnable<>
Definition: runnable.h:196
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.
Copyright © Audi Electronics Venture GmbH.