All about the Filter Graph
ADTF enables you to develop concrete (automotive) functionality and complex applications that transform, record and visualize data. To stay flexible, configurable and reusable, you use the ADTF Configuration Editor to create a Filter Graph. This Filter Graph is the central concept of ADTF - hence "all about the Filter Graph".
The Filter Graph gives you the possibility to design an application out of small processing parts (Filters) that each fulfill a particular task. It is recommended to implement and design Filters so that they can be reused in more than one Filter Graph configuration.
To clarify some terms in advance, consider the following list:
Time | The term time is used for a continuously increasing value with a clearly defined time base. Time is not a single value!
Timestamp | The term timestamp describes a moment of Time. This is a static value with a static offset to its time base.
type tTimeStamp | The type tTimeStamp usually holds a timestamp in microseconds (if not, the declaration of its use needs to state this explicitly).
Data | Usually data is any kind of user-defined signal, struct or memory.
Streaming Data | Streaming Data is a continuous data flow in which each data item has a time relation given by a timestamp.
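Since tTimeStamp is defined in microseconds while the trigger times passed to the Filter examples later on this page use tNanoSeconds (whose nCount member holds nanoseconds), converting between the two is a plain division. A minimal sketch, using the tmTimeOfTriggering variable from the Process() examples below:

// sketch: convert a nanosecond trigger time to a microsecond tTimeStamp
tTimeStamp tmMicroSeconds = tmTimeOfTriggering.nCount / 1000;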
Filter Graph
To create and set up a Filter Graph you have different building blocks and components. These components can be connected to each other.
Filter
The Filters are processing units that provide the functionality of:
- receiving data via so called In Pins (Input) and Sample Readers
- processing data, manipulating or calculating them
- containing control algorithms, e.g. Kalman filters
- visualizing data
- sending processed data via so called Out Pins (Output) and Sample Writers
Interface API: streaming::IFilter
Base Class API: filter::cFilter
Delivered Binaries: Delivered Filters
Detailed Description: Filter
Named Graph Objects
Each component that can be added to the Filter Graph uses the basic interface INamedGraphObject.
Filter
Filters are small processing units.
For getting started on programming your own Filter see: My First Filter.
Some predefined utility Filters are part of the ADTF delivery, see a summary here.
The Filter is able to:
- receive data via In Pins and their corresponding Sample Readers
- send data via Out Pins and their corresponding Sample Writers
- process data in the context of Runners (also called trigger contexts, runnables, runnable functions, callables)
Filters can be used to:
- decode streaming data from streaming sources, e.g. CAN raw data, MOST or FlexRay cycles
- pre-process incoming data and prepare it for an algorithm implementation
- re-calculate and merge incoming data with your own algorithm implementation
- implement a loop controller
- receive incoming data and show, log or display it in a human-readable way
- ... and many things more.
You can set up a Filter with Properties to adjust its concrete behaviour or its initial state. A simple Filter State Machine creates, initializes and starts a Filter step by step.
The Filter itself is designed to separate data transmission and runtime behaviour from each other. This gives users of a specific Filter the flexibility of Trigger configuration while using the Filter in a specific Filter Graph and connecting the so-called Runners. Users can decide at configuration time whether the Filter runs by
- a Data Trigger ... the functionality runs on an incoming data event
- or a Time Trigger ... the functionality runs on an incoming time event (a minimal sketch of both variants follows below).
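A minimal sketch of both variants, taken from the examples further down this page (the Runner and Pin names are examples); the hint passed to CreateRunner describes the intended triggering when the Runner is created in the Filter's constructor:

// runs on an incoming data event of "InPin_1" (Data Trigger)
m_pRunner = CreateRunner("Runner_1", cDataTriggerHint("InPin_1"), true);

// runs periodically every 100 ms (Time Trigger)
using namespace std::chrono_literals;
m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms));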
- Note
- Within the ADTF Streaming SDK a Filter base class implementation does exist, but we recommend using the convenience API in the ADTF Filter SDK.
Use the Filter SDK as follows:
// Copyright © Audi Electronics Venture GmbH.
// Minimal Filter skeleton (sketch: the class id string and label are examples; the usual
// using-directives of the ADTF Filter SDK headers are assumed here and in the snippets below).
class cMyFilter: public cFilter
{
public:
    ADTF_CLASS_ID_NAME(cMyFilter,
                       "my_filter.filter.example.cid",
                       "My Filter");
    cMyFilter()
    {
    }
};
Usually the Filter will be packed into a Plugin. This Plugin mechanism of ADTF allows you to enrich the framework's functionality and enables it to load e.g. Filters into the ADTF Runtime. To create a Plugin for the above Filter cMyFilter
you must use the following code:
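The plugin entry point is generated by the ADTF_PLUGIN macro, which adds the code of an adtf::ucom::ant::IPlugin implementation to the binary. A minimal sketch (the plugin label string is an example):

// export cMyFilter from this plugin binary
ADTF_PLUGIN("My Filter Plugin", cMyFilter);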
Out Pins and Sample Writers/Pin Writers
The data a Filter creates and writes out are Samples. The formal description and classification of those Samples and their content is done by a Stream Type. Within the Filter you need to create Out Pins to identify connection points of this Filter where data can be written out. Write Samples and set Stream Types onto a connected stream with the help of a Sample Writer (interface streaming::ISampleWriter).
Have a look at the documentation of the CreateOutputPin function to add an Out Pin to the Filter:
class cMyFilter: public cFilter
{
public:
    cMyFilter()
    {
        // create an Out Pin named "OutPin_1" and keep its Sample Writer (sketch; names and member type are examples)
        m_pWriter = CreateOutputPin("OutPin_1");
    }
private:
    ISampleWriter* m_pWriter = nullptr;
};
Runners and Triggers
Each Filter needs at least one function that reacts on Trigger events while the Filter is processing data in the running state. Within ADTF Filters these events are called Triggers. To "catch" a Trigger you define a so-called Runner. This Runner is the entry point where the Trigger Pipe of the Filter Graph connects to the Filter.
The above Filter with one Out Pin and one Runner can be created by the following code:
#include <chrono>
#include <cinttypes>

// Filter with one Out Pin and one Runner (sketch: class declaration, Process signature,
// the Out Pin creation and the member declarations are reconstructed; names are examples).
class cMyFilter: public cFilter
{
public:
    cMyFilter()
    {
        // Out Pin with its Sample Writer
        m_pWriter = CreateOutputPin("OutPin_1");

        using namespace std::chrono_literals;
        // Runner with a timer trigger hint of 100 ms
        m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms));
    }

    tResult Process(tNanoSeconds tmTimeOfTriggering, IRunner* pRunner) override
    {
        if (m_pRunner == pRunner)
        {
            LOG_INFO(
                "Time of triggering %" PRIi64, tmTimeOfTriggering.nCount);
            m_nCounter++;
        }
        else
        {
            // a Trigger from an unexpected Runner is treated as an error here (example policy)
            RETURN_ERROR_DESC(ERR_INVALID_ARG, "unexpected Runner");
        }
        return {};
    }

private:
    ISampleWriter* m_pWriter = nullptr;
    IRunner*       m_pRunner = nullptr;
    uint64_t       m_nCounter = 0;
};
Sample
Data is packed into so-called Samples. These Samples are containers for any kind of data and refer to streaming data as well as to some important meta information.
Interface API: streaming::ISample
Samples have:
- a timestamp
- a buffer holding the actual data
- optional additional Sample Information (see below)
- Note
- There is no classifier on the Sample itself that reveals what kind of data is being sent. Classification is done via the Stream Type, which is also part of the stream.
Examples of how to create and write Samples
Samples are memory buffers. You should always copy your data into these sample buffers.
The following examples of writing data to a sample are recommended:
- Use of output_sample_data<T> template
m_nCounter = 124;
*m_pWriter << output_sample_data<uint64_t>(tmTimeOfTriggering, m_nCounter);
- Use of output_sample_data<T> template and set the Sample Time
m_nCounter = 124;
output_sample_data<uint64_t> oSampleData(tmTimeOfTriggering, m_nCounter);
*m_pWriter << oSampleData.Release();
- Set a memory buffer (and so copy the content) to the sample
m_nCounter = 124;
object_ptr<flash::ISample> pSample;
// allocate a default sample instance first (sketch; see the alloc_sample helpers of the Streaming SDK)
RETURN_IF_FAILED(alloc_sample(pSample));
RETURN_IF_FAILED(pSample->Set(tmTimeOfTriggering, &m_nCounter,
                              sizeof(m_nCounter)));
*m_pWriter << ucom_object_ptr_cast<ISample>(pSample);
- Write the content of a memory buffer to the sample's buffer
m_nCounter = 124;
object_ptr<ISample> pSample;
// sketch: allocate the sample, then lock its buffer for writing;
// the lock/write calls below are assumptions, see ISample and ISampleBuffer in the Streaming SDK
RETURN_IF_FAILED(alloc_sample(pSample, tmTimeOfTriggering));
{
    object_ptr_locked<ISampleBuffer> oBuffer;
    RETURN_IF_FAILED(pSample->WriteLock(oBuffer, sizeof(m_nCounter)));
    RETURN_IF_FAILED(oBuffer->Write(adtf_memory<uint64_t>(m_nCounter)));
}
*m_pWriter << pSample;
- Write a string content to the sample's buffer
std::string strWriteToSampleString("This is a cString I want to write!");
object_ptr<ISample> pSample;
// sketch: allocate the sample and copy the string content into its buffer;
// the lock/write calls below are assumptions, see ISample and ISampleBuffer in the Streaming SDK
RETURN_IF_FAILED(alloc_sample(pSample, tmTimeOfTriggering));
{
    object_ptr_locked<ISampleBuffer> oBuffer;
    RETURN_IF_FAILED(pSample->WriteLock(oBuffer, strWriteToSampleString.size()));
    RETURN_IF_FAILED(oBuffer->Write(adtf_memory<std::string>(strWriteToSampleString)));
}
*m_pWriter << pSample;
- Type based writing to a Pin Writer
// sketch: class declaration, Process signature and the typed Write call are reconstructed
class cMyFilterTypeBased: public cFilter
{
public:
    cMyFilterTypeBased()
    {
        m_pWriter = CreateOutputPin<adtf::filter::pin_writer<uint64_t>>("OutPin_1");
        using namespace std::chrono_literals;
        m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms));
    }

    tResult Process(tNanoSeconds tmTimeOfTriggering, IRunner* /*pRunner*/) override
    {
        // the pin_writer copies the typed value into a new sample and writes it out
        m_pWriter->Write(tmTimeOfTriggering, ++m_nCounter);
        return {};
    }

private:
    adtf::filter::pin_writer<uint64_t>* m_pWriter = nullptr;
    IRunner* m_pRunner = nullptr;
    uint64_t m_nCounter = 0;
};
Additional Sample Information
It is possible to add additional information to a Sample. See the ISample::GetSampleInfo method to learn how to add these additional values to a sample instance:
- Set an additional Sample Info value
double m_fValueToWrite;
m_fValueToWrite = 124;
output_sample_data<double> oSampleData(tmTimeOfTriggering, m_fValueToWrite);
// sketch: before releasing, set the additional value (e.g. under the SAI_Counter hash key)
// on the sample's ISampleInfo; the exact call was elided here, see ISample::GetSampleInfo
*m_pWriter << oSampleData.Release();
- Get an additional Sample Info value
- Use get_sample_info(oSampleInfo, oHash, oDefault) from the ADTF Streaming SDK: it retrieves a variant value for the hash key oHash out of the sample information oSampleInfo, falling back to oDefault if the key is not set.
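A minimal sketch of reading such a value back on the receiving side; SAI_Counter and get_sample_info are taken from this page, while the way the ISampleInfo is obtained (via ISample::GetSampleInfo into an object_ptr) is an assumption, so check the ISample reference for the exact signature:

// sketch: read the additional counter value from a received sample
object_ptr<const ISampleInfo> pSampleInfo;
if (IS_OK(pSample->GetSampleInfo(pSampleInfo)))   // assumed signature
{
    auto oCounter = get_sample_info(*pSampleInfo, SAI_Counter, adtf::util::cVariant(uint64_t(0)));
}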
Referenced Sample
There are possibilities to reference data instead of copying it, but usually a Filter receiving Samples will queue them. Therefore you have to make sure that the referenced data lives as long as the Sample lives within the Filter Graph. If you really need this, have a look at cReferenceSample.
- Note
- So please keep in mind: if you are using referenced sample buffers, your whole application must be able to deal with that!
Usually this does not work for Filters you did not program yourself.
Stream Type
A Stream Type has:
- a Stream Meta Type name, e.g. "adtf/video", "adtf/default", "adtf/audio" or "adtf/plaintype".
- Properties to describe the content of the Samples.
A Stream Meta Type defines which properties are set within the Stream Type.
- For "adtf/video" these are e.g. "width", "height", "bitsperpixel", "pixelformat".
- For "adtf/default" this is a structural description of the structured data via "md_struct" and "md_definitions"
- see also the Overview of writing and reading samples
See Chapter: Stream Meta Types for more information on existing Stream Types.
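As a small sketch of how Stream Types show up in Filter code (both helpers are also used in the reader examples below): stream_type_plain<T> generates a plain Stream Type for a trivial value type, and is_compatible checks a received type against it; the surrounding function is only an illustration:

// sketch: check whether a received Stream Type matches the expected plain uint64_t type
tResult CheckReceivedType(const IStreamType& oReceivedType)
{
    return is_compatible(oReceivedType, stream_type_plain<uint64_t>());
}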
In Pins and Sample Readers/Pin Readers
The incoming Samples can be read via In Pins and Sample Readers. In Pins identify a connection point of the Filter. The incoming stream contains Samples and Stream Types. Sample Readers (interface streaming::ISampleReader) are queues that buffer the incoming data.
Usually you will create one Reader per In Pin (and one In Pin per Reader). The simplest way to create an In Pin is to call the CreateInputPin function within the CTOR.
- Example to create an In Pin in the Filter's CTOR
#include <chrono>
#include <cinttypes>

// Filter with one In Pin and one timer Runner (sketch: class declaration, In Pin creation,
// Process signature and members are reconstructed; names are examples).
class cMyFilter: public cFilter
{
public:
    cMyFilter()
    {
        // In Pin with its Sample Reader
        m_pReader1 = CreateInputPin("InPin_1");

        using namespace std::chrono_literals;
        m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms), true);
    }

    tResult Process(tNanoSeconds /*tmTimeOfTriggering*/, IRunner* pRunner) override
    {
        if (m_pRunner == pRunner)
        {
            object_ptr<const ISample> pSample;
            while (
                IS_OK(m_pReader1->GetNextSample(pSample)))
            {
                // typed access to the sample content (see sample_data<T> of the Filter SDK)
                sample_data<uint64_t> oSampleCounter(pSample);
                auto nTime = oSampleCounter.GetTime();
                LOG_INFO(
                    "Received data on \"InPin_1\": %" PRIu64
                    " at time %" PRIi64, *oSampleCounter, nTime);
            }
        }
        else
        {
            RETURN_ERROR_DESC(ERR_INVALID_ARG, "unexpected Runner");
        }
        return {};
    }

private:
    cPinReader* m_pReader1 = nullptr;
    IRunner*    m_pRunner  = nullptr;
};
Examples for Reading Samples
The following examples of reading data from Samples are recommended:
- Reading Samples with GetNextSample
// sketch: the sample declaration and the typed access via sample_data<T> are reconstructed
object_ptr<const ISample> pSample;
while (
    IS_OK(m_pReader1->GetNextSample(pSample)))
{
    sample_data<uint64_t> oSampleCounter(pSample);
    auto nTime = oSampleCounter.GetTime();
    LOG_INFO(
        "Received data on \"InPin_1\": %" PRIu64
        " at time %" PRIi64, *oSampleCounter, nTime);
}
- Reading Samples with streaming operator
// sketch: the sample declaration and the typed access via sample_data<T> are reconstructed
object_ptr<const ISample> pSample;
bool bEndOfStream = false;
while (!bEndOfStream)
{
    *m_pReader1 >> pSample;
    if (pSample)
    {
        sample_data<uint64_t> oSampleCounter(pSample);
        auto nTime = oSampleCounter.GetTime();
        LOG_INFO(
            "Received data on \"InPin_1\": %" PRIu64
            " at time %" PRIi64, *oSampleCounter, nTime);
    }
    else
    {
        bEndOfStream = true;
    }
}
- Access Sample Buffer memory with static_cast
object_ptr<const ISample> pSample;
while (
    IS_OK(m_pReader1->GetNextSample(pSample)))
{
    object_ptr_shared_locked<const ISampleBuffer> oBuffer;
    // sketch: lock the sample buffer for shared read access (assumed call, see ISample::Lock)
    RETURN_IF_FAILED(pSample->Lock(oBuffer));
    auto nTime = pSample->GetTime();
    auto pValue = static_cast<const uint64_t*>(oBuffer->GetPtr());
    LOG_INFO(
        "Received data on \"InPin_1\": %" PRIu64
        " at time %" PRIi64, *pValue, nTime);
}
- Read Sample Buffers content with adtf_memory for trivial types
object_ptr<const ISample> pSample;
while (
    IS_OK(m_pReader1->GetNextSample(pSample)))
{
    object_ptr_shared_locked<const ISampleBuffer> oBuffer;
    auto nTime = pSample->GetTime();
    uint64_t nValue = {};
    // sketch: lock the buffer and copy its content into nValue via adtf_memory
    // (assumed calls, see ISample::Lock and ISampleBuffer::Read)
    RETURN_IF_FAILED(pSample->Lock(oBuffer));
    RETURN_IF_FAILED(oBuffer->Read(adtf_memory<uint64_t>(nValue)));
    LOG_INFO(
        "Received data on \"InPin_1\": %" PRIu64
        " at time %" PRIi64, nValue, nTime);
}
- Read Sample Buffers content with adtf_memory for string types
object_ptr<const ISample> pSample;
while (
    IS_OK(m_pReader1->GetNextSample(pSample)))
{
    object_ptr_shared_locked<const ISampleBuffer> oBuffer;
    auto nTime = pSample->GetTime();
    std::string strValue = {};
    // sketch: lock the buffer and copy its content into the string via adtf_memory
    // (assumed calls, see ISample::Lock and ISampleBuffer::Read)
    RETURN_IF_FAILED(pSample->Lock(oBuffer));
    RETURN_IF_FAILED(oBuffer->Read(adtf_memory<std::string>(strValue)));
    LOG_INFO(
        "Received data on \"InPin_1\": %s at time %" PRIi64, strValue.c_str(), nTime);
}
Reading Stream Types
All examples above use untyped data (anonymous data)! The Filter implementation receives data while the code always expects tUInt64
values. To make sure the Filter only accepts the expected sample content, the data must be classified by a Stream Type. The Stream Type itself is part of the stream. While receiving, you will always get a Stream Type first, then Samples, and a Stream Type again whenever it changes. For more detailed information see also Stream Type and Stream Meta Type.
Reading Samples by Data Trigger
The examples above used only Runners which should be connected within the Filter Graph to an active Timer Runner (see Active Runner). But we can also use a Data Trigger in our code, so that we are called after an incoming data event on a specified Pin.
#include <cinttypes>

// Filter whose Runner reacts on a Data Trigger of "InPin_1" (sketch: class declaration,
// the accept-type lambda, the Process signature and the members are reconstructed;
// a generic lambda is used since the exact callback parameter type is omitted in this sketch).
class cMyFilter: public cFilter
{
public:
    cMyFilter()
    {
        auto pCreatedReader1 = CreateInputPin("InPin_1",
                                              stream_type_plain<uint64_t>(),
                                              false,
                                              false);
        pCreatedReader1->SetAcceptTypeCallback(
            [](const auto& pStreamIncomingType) -> tResult
            {
                return is_compatible(pStreamIncomingType, stream_type_plain<uint64_t>());
            });
        m_pReader1 = pCreatedReader1;

        m_pRunner = CreateRunner("Runner_1", cDataTriggerHint("InPin_1"), true);
    }

    tResult Process(tNanoSeconds /*tmTimeOfTriggering*/, IRunner* pRunner) override
    {
        if (m_pRunner == pRunner)
        {
            object_ptr<const ISample> pSample;
            while (
                IS_OK(m_pReader1->GetNextSample(pSample)))
            {
                sample_data<uint64_t> oSampleCounter(pSample);
                auto nTime = oSampleCounter.GetTime();
                LOG_INFO(
                    "Received data on \"InPin_1\": %" PRIu64
                    " at time %" PRIi64, *oSampleCounter, nTime);
            }
        }
        else
        {
            RETURN_ERROR_DESC(ERR_INVALID_ARG, "unexpected Runner");
        }
        return {};
    }

private:
    cPinReader* m_pReader1 = nullptr;
    IRunner*    m_pRunner  = nullptr;
};
As you might see in the code example: there is no change to the Process()
implementation. This is one of the ADTF 3 benefits! It is easy to configure Data- or Time-Triggered behaviour at a very late stage.
- Data Trigger with ProcessInput
- For your convenience you are able to forward each Data Trigger to a single method, the ProcessInput method. Also one single AcceptType callback is possible.
// Filter that handles all Data Triggers in ProcessInput and all type updates in AcceptType
// (sketch: class declaration, pin creation, method signatures and members are reconstructed;
// the pin names and stream types are examples).
class cMyFilter: public cFilter
{
public:
    cMyFilter()
    {
        m_pReader1 = CreateInputPin("InPin_1",
                                    stream_type_plain<uint64_t>(),
                                    true,
                                    true);
        m_pReader2 = CreateInputPin("InPin_2",
                                    stream_type_plain<uint64_t>(),
                                    true,
                                    true);
    }

    tResult AcceptType(ISampleReader* pReader,
                       const iobject_ptr<const IStreamType>& pType) override
    {
        if (pReader == m_pReader1)
        {
            // check pType for "InPin_1" here
        }
        else if (pReader == m_pReader2)
        {
            // check pType for "InPin_2" here
        }
        return {};
    };

    tResult ProcessInput(tNanoSeconds tmTrigger, ISampleReader* pReader) override
    {
        if (pReader == m_pReader1)
        {
            // read and process samples of "InPin_1" here
        }
        else if (pReader == m_pReader2)
        {
            // read and process samples of "InPin_2" here
        }
        return {};
    }

private:
    cPinReader* m_pReader1 = nullptr;
    cPinReader* m_pReader2 = nullptr;
};
The good thing about this single point of processing is:
- It is easy to use.
- It is already synchronized: each ProcessInput and each AcceptType call locks the Filter, so there are no parallel calls within the Filter and no race conditions can appear.
But you should also consider CreateInputPinWithCallback before starting to implement huge if-else-if or switch-case cascades.
Properties
Properties give you the possibility to adjust some capabilities of the Filter without re-compiling. While using the Filter within the Filter Graph you can adjust its behaviour.
- Example for a property
// sketch: class declaration, Process signature, the write call and the property member are reconstructed
class cMyFilter: public cFilter
{
public:
    ADTF_CLASS_ID_NAME(cMyFilter,
                       "my_constant_data_generator.filter.example.cid",
                       "My Constant Data Generator");
    cMyFilter()
    {
        m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
        using namespace std::chrono_literals;
        m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms));
        RegisterPropertyVariable("constant", m_nConstantProperty);
    }

    tResult Process(tNanoSeconds tmTimeOfTriggering, IRunner* /*pRunner*/) override
    {
        // the property_variable converts implicitly to its value type
        uint32_t nCurrentValueToWrite = m_nConstantProperty;
        m_pWriter->Write(tmTimeOfTriggering, nCurrentValueToWrite);
        RETURN_NOERROR;
    }

private:
    adtf::filter::pin_writer<uint32_t>* m_pWriter = nullptr;
    IRunner* m_pRunner = nullptr;
    adtf::base::property_variable<uint32_t> m_nConstantProperty = 0;
};
For a full description see ADTF Base SDK.
Filter State Machine
The following state machine is provided by the Filter base class implementation.
Streaming Service
Streaming Services - Sources and Sinks - are components to define system entry points and system exit points. Usually these implementations are the first or last item of a Data Pipe or Trigger Pipe.
- Note
- Please note that as of ADTF 3.8 and later, we are no longer limiting the use of Streaming Services to Streaming Graphs, but you can use these components in a Filter Graph as well. You are encouraged to use Filter Graphs in favour of Streaming Graphs as they allow for greater flexibility and reusability.
The main difference between Streaming Services and Filters is:
Streaming Services are allowed to create threads and timers internally without a Runner! This is because they usually deal with devices and real hardware.
We recommend using system::kernel_thread or system::kernel_timer for thread and timer handling!
Streaming Source
The Streaming Source is a system entry point for Samples and Data Triggers coming e.g. from hardware. Usually you will implement a device link as a Streaming Source.
Streaming Sources can be used to
- read streaming data, e.g. video streams from cameras or CAN messages from CAN bus devices
- provide file-based simulation data via a hard disk reader
- receive samples over a network or interprocess connection from a distributed system like ROS (Robot Operating System) or FEP (Functional Engineering Platform).
- Coding Examples:
- Use the CMake macro adtf_add_streaming_service to create a plugin containing a Streaming Source.
- The following requirements exist for Streaming Sources:
- Only Out Pins are allowed!
- Sources are allowed to create threads and timers, if it is not possible to use a Runner
- Make sure to create Data Triggers (streaming::trigger, ManualTrigger) after finishing data transmission.
Streaming Sink
The Streaming Sink is an ADTF system exit point for Samples and Triggers. Usually you will implement a device link as a Streaming Sink.
Streaming Sinks can be used to
- write streaming data like raw CAN messages to a CAN-Bus device
- write FlexRay cycles or Automotive Ethernet PDUs to devices
- create hard disk access for file-based data logging and high performance recording
- Coding Examples:
- Use the CMake macro adtf_add_streaming_service to create a plugin containing a Streaming Sink.
- The following requirements exist for Streaming Sinks:
- Only In Pins are allowed
- Sinks are allowed to create own threads and timers, if it is not possible to use a Runner.
- Do not block an incoming Trigger within your implementation. As long as you block incoming Triggers you block the Trigger Pipe.
- You are advised to describe your runtime behaviour within your documentation.
Streaming Service State Machine (within Filter Graph)
Data Pipe
The Data Pipe within a Filter Graph is the connection between Sample Writers and Sample Readers. Within ADTF one common Data Pipe goes from the Sample Writer and its Out Pin of a sending Filter, over one Sample Stream, to one or more In Pins and their corresponding Sample Readers.
Substreams
Using Substreams is one possibility to reduce the number of Pins and Connections within the Filter Graph.
See Chapter: Substreams for more details and how to use them.
Trigger Pipe
The Trigger Pipe within ADTF is a connection based Trigger Path through the Filter Graph. Usually it starts with an Active Runner that triggers the connected items like a Runner of a Filter.
The above example of a Trigger Pipe shows the Active Timer Runner as a Trigger Pipe Source. Trigger Pipe Sources are the very beginning of a Trigger Pipe. Each connected item can be discovered with the help of the ITriggerPipeSource interface. The connected items of a Trigger Pipe are called Trigger Pipe Items. They are also sources for the further items in the Trigger Pipe. In the end, the collection of Trigger Pipes in your Filter Graph will show you a call graph and the critical paths.
- Note
- The Trigger event as seen in the picture above is not really an object, it is a Run(RUN_TRIGGER) call on Trigger Pipe Items.
The following components can be Trigger Pipe Sources only: Active Runners (Timer Runner, Thread Runner) and Streaming Sources.
The following components can be Trigger Pipe Items: Runners of Filters, In Pins, Out Pins, Sample Streams and Streaming Sinks.
Sample Stream
ADTF provides a default implementation of the ISampleStream. A Sample Stream is a mediator between Pins and forwards Samples and Data Triggers.
There are different operation modes intended within the current Streaming API design, but only two modes are realized yet. The delivered default Sample Stream can be used within the Filter Graph operating in Push mode.
PushRead Mode - Synchronous Data Pipes
With the Streaming API the ISampleStreamAccess::PushRead was defined.
While writing data onto a Stream the Sample Stream will have different states:
Sample Stream - write()
After writing data onto a Sample Stream the Samples will be queued within the writer's queue. No Trigger event has been emitted yet!
- Note
- Keep in mind: At least one Stream Type will be written onto the stream before any kind of data is transmitted.
Sample Stream - flush()
After flushing the Sample Writer the Samples will be queued in all Sample Reader queues. There is only a temporary reader queue within the Sample Stream to lock the content a short time.
Flushing the writer queue will temporarily forward the samples to the Reader Queues.
Still within the flush() call, the Samples (and Stream Types) will be read into the Sample Reader queue of Sample Reader 1 by a Push event. A Sample Reader using the PushRead mode MUST read the Samples immediately when the Push event appears, otherwise the Samples are lost for this Reader.
The Samples (and Stream Types) will be read into the Sample Reader queue of Sample Reader 2 by a Push event. This is the end state of the queues after flushing.
It is possible to use many different kinds of Sample Readers/Pin Readers: have a look at the ADTF Filter SDK! If you use the Filter implementation you do not need to call that flush separately!
Sample Stream - trigger()
The trigger call creates a Data Trigger on the Trigger Pipe. The call sequence is as follows:
- OutPin_1->Run(RUN_TRIGGER)
- Sample_Stream->Run(RUN_TRIGGER)
- InPin_1->Run(RUN_TRIGGER)
- InPin_2->Run(RUN_TRIGGER)
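If you do not use the Filter SDK convenience classes (which flush and emit the Data Trigger for you), writing, flushing and triggering via a plain Sample Writer might look like this sketch; flush and trigger are the global helper function templates of the Streaming SDK, and m_pWriter/pSample are taken from the examples above:

// sketch: write a sample, flush the writer queue into the reader queues, then emit the Data Trigger
*m_pWriter << pSample;
trigger(flush(*m_pWriter));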
Asynchronous Mode - Asynchronous Data Pipes
- Note
- This mode is not yet implemented in ADTF 3
In this mode there is no Push event. The reader queues within the Sample Stream (not within the Sample Readers) are used and each Sample Reader reads directly from there.
Push Mode - Queue Free Data Pipes
- Note
- detailed description in progress
Active Runner
Active Runners are the only self-active objects of the Filter Graph. They are the very beginning of a Trigger Pipe.
By default ADTF delivers 2 different kinds of Active Runners you may use in your Filter Graph:
- A Timer Runner will periodically activate a Trigger Pipe with timing constraints. It will send a Time Trigger. (see also Timer Runner)
- A Thread Runner will cyclically activate a Trigger Pipe with NO timing constraints (or only single shot). It will send a Thread Trigger. (see also Thread Runner)
See also Delivered Active Runners for base components and the Qt5 Key Event Runner Plugin as an example of how to implement your own Active Runner.
Inner Trigger Pipe of a Filter
Usually, a Filter works in the context of a RUN_TRIGGER call that is caught by a Runner. This Runner is part of the configured Trigger Pipe within the Filter Graph.
To describe the internal runtime behaviour of the Filter, you should use Inner Pipes. Inner Pipes are inside the Filter and describe how the Filter will forward the context of a Trigger to its Out Pins. Otherwise the Filter is the end point of a Trigger Pipe and might be the beginning of another one that is "hidden".
Hidden Trigger Pipes are trigger paths that are not discoverable as belonging to another trigger path. They appear if you create ManualTriggers within a Filter implementation.
The ADTF Filter SDK will help you to create valid Filters with well-formed Inner Pipes. Have a look at the following code examples to see what happens to the Inner Pipe of each particular Filter or Streaming Service.
Example 1: Simple Inner Pipe
cMyFilter1()
{
    m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
    using namespace std::chrono_literals;
    m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms));
}

// sketch: the Process signature is reconstructed; the fragment omits the surrounding class
tResult Process(tNanoSeconds tmTimeOfTriggering, IRunner* /*pRunner*/) override
{
    uint32_t nSampleValue = 123;
    m_pWriter->Write(tmTimeOfTriggering, nSampleValue);
    return {};
}
- Trigger Call Sequence
- (1) incoming RUN_TRIGGER call on "1" will call and wait for "Runner_1"
- (2) "Runner_1" will call and wait for Process()
- (3) after (2) returned, "Runner_1" will call and wait for a RUN_TRIGGER call at "OutPin_1"
- (4) only after (4) returned, (3) will return, (1) will return
Example 2: No Inner Pipe to the Outputs
cMyFilter2()
{
    m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
    using namespace std::chrono_literals;
    // last parameter false: as this example shows, no Inner Pipe to the Out Pins is created
    m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms), false);
}

// sketch: the Process signature is reconstructed
tResult Process(tNanoSeconds tmTimeOfTriggering, IRunner* /*pRunner*/) override
{
    uint32_t nSampleValue = 123;
    m_pWriter->Write(tmTimeOfTriggering, nSampleValue);
    return {};
}
- Trigger Call Sequence
- incoming Trigger on (1) will call and wait on "Runner_1"
- "Runner_1" will call and wait on (2) Process()
- after (2) returned, (1) returns
Example 3: Inner Pipe and second hidden Trigger Pipe
cMyFilter3()
{
    m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
    using namespace std::chrono_literals;
    m_pRunner = CreateRunner("Runner_1", cTimerTriggerHint(100ms), false);
}

// sketch: the Process signature is reconstructed; the manual flush/trigger call that creates
// the hidden Trigger Pipe was elided in the original listing and is re-added here as an assumption
tResult Process(tNanoSeconds tmTimeOfTriggering, IRunner* /*pRunner*/) override
{
    uint32_t nSampleValue = 123;
    m_pWriter->Write(tmTimeOfTriggering, nSampleValue);
    // manually flush the writer and trigger its sample stream -> starts a hidden Trigger Pipe
    trigger(flush(*m_pWriter));
    return {};
}
- Trigger Call Sequence
- (A1) incoming Trigger will call and wait for "Runner_1"
- (A2) "Runner_1" will call and wait for Process()
Example 4: Inner Pipe with Data Trigger
cMyFilter4()
{
    m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
    m_pReader1 = CreateInputPin("InPin_1", stream_type_plain<uint32_t>(), false, true);
    m_pRunner = CreateRunner("Runner_1", cDataTriggerHint("InPin_1"), true);
}

// sketch: the Process signature, the sample declaration and the forwarding Write call are reconstructed
tResult Process(tNanoSeconds /*tmTimeOfTriggering*/, IRunner* /*pRunner*/) override
{
    object_ptr<const ISample> pSample;
    while (
        IS_OK(m_pReader1->GetNextSample(pSample)))
    {
        uint32_t nSampleValue = sample_data<uint32_t>(pSample);
        // forward the value using the original sample time
        m_pWriter->Write(get_sample_time(pSample), nSampleValue);
    }
    return {};
}
- Trigger Call Sequence
- (1) incoming Data Trigger on "InPin_1" will call and wait for "Runner_1" on (2)
- (3) "Runner_1" will call and wait for Process()
- (4) after (3) returned, "Runner_1" will call and wait for a RUN_TRIGGER call at "OutPin_1"
- (5) only after (5) returned, (4) will return, (2) will return and last (1) will return
Example 5: Inner Pipe with only 1 Data Trigger caught
cMyFilter5()
{
    m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
    m_pReader1 = CreateInputPin("InPin_1", stream_type_plain<uint32_t>(), false, true);
    m_pReader2 = CreateInputPin("InPin_2", stream_type_plain<uint32_t>(), false, false);
    m_pRunner = CreateRunner("Runner_1", cDataTriggerHint("InPin_1"), true);
}

// sketch: the Process signature and the retrieval of the last samples are reconstructed
// (GetLastSample is an assumed call, see the Sample Reader reference)
tResult Process(tNanoSeconds tmTimeOfTriggering, IRunner* /*pRunner*/) override
{
    object_ptr<const ISample> oLastSample1, oLastSample2;
    m_pReader1->GetLastSample(oLastSample1);
    m_pReader2->GetLastSample(oLastSample2);
    uint32_t nSampleValue = sample_data<uint32_t>(oLastSample1) + sample_data<uint32_t>(oLastSample2);
    m_pWriter->Write(tmTimeOfTriggering, nSampleValue);
    return {};
}
- Trigger Call Sequence
- (A1) incoming Data Trigger on "InPin_1" will call and wait for "Runner_1" on (A2)
- (A3) "Runner_1" will call and wait for Process()
- (A4) after (A3) returned, "Runner_1" will call and wait for a RUN_TRIGGER call at "OutPin_1"
- (A5) only after (A5) returned, (A4) will return, (A2) will return and last (A1) will return
- (B1) The second incoming Trigger Pipe immediately returns, the Pipe ends here!
Example 6: Inner Pipe using ProcessInput
cMyFilter6()
{
    m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
    m_pReader1 = CreateInputPin("InPin_1", stream_type_plain<uint32_t>(), true, true);
    m_pReader2 = CreateInputPin("InPin_2", stream_type_plain<uint32_t>(), false, false);
}

// sketch: ProcessInput signature, reader check and sample retrieval reconstructed (GetLastSample is an assumed call)
tResult ProcessInput(tNanoSeconds tmTrigger, ISampleReader* pReader) override
{
    if (pReader == m_pReader1)
    {
        object_ptr<const ISample> pSample, oLastSample2;
        m_pReader1->GetNextSample(pSample);        // current sample of "InPin_1"
        m_pReader2->GetLastSample(oLastSample2);   // last known sample of "InPin_2"
        uint32_t nSampleValue = sample_data<uint32_t>(pSample) + sample_data<uint32_t>(oLastSample2);
        m_pWriter->Write(tmTrigger, nSampleValue);
    }
    else
    {
        RETURN_ERROR(ERR_INVALID_ARG);
    }
    return {};
}
- Trigger Call Sequence
- This is exactly the same as Example 5: Inner Pipe with only 1 Data Trigger caught, but Process is additionally forwarded to a ProcessInput() call.
- (A1) incoming Data Trigger on "InPin_1" will call and wait for "InPin_1_trigger" on (A2)
- (A3) "InPin_1_trigger" will call and wait for Process()
- (A4) after (A3) returned, "Runner_1" will call and wait for a RUN_TRIGGER call at "OutPin_1"
- (A5) only after (A5) returned, (A4) will return, (A2) will return and last (A1) will return
- (B1) The second incoming Trigger Pipe immediately returns, the Pipe ends here!
Example 7: Inner Pipe using ProcessInput and 2 Data Triggers
cMyFilter7()
{
    m_pWriter = CreateOutputPin<pin_writer<uint32_t>>("OutPin_1", stream_type_plain<uint32_t>());
    m_pReader1 = CreateInputPin("InPin_1", stream_type_plain<uint32_t>(), true, true);
    m_pReader2 = CreateInputPin("InPin_2", stream_type_plain<uint32_t>(), true, true);
}

// sketch: ProcessInput signature and sample retrieval reconstructed (GetLastSample is an assumed call)
tResult ProcessInput(tNanoSeconds tmTrigger, ISampleReader* pReader) override
{
    object_ptr<const ISample> pSample, oLastOtherSample;
    if (IS_OK(pReader->GetNextSample(pSample)))
    {
        if (pReader == m_pReader1)
        {
            m_pReader2->GetLastSample(oLastOtherSample);
            uint32_t nSampleValue = sample_data<uint32_t>(pSample) + sample_data<uint32_t>(oLastOtherSample);
            m_pWriter->Write(tmTrigger, nSampleValue);
        }
        else if (pReader == m_pReader2)
        {
            m_pReader1->GetLastSample(oLastOtherSample);
            uint32_t nSampleValue = sample_data<uint32_t>(pSample) + sample_data<uint32_t>(oLastOtherSample);
            m_pWriter->Write(tmTrigger, nSampleValue);
        }
        else
        {
            RETURN_ERROR(ERR_INVALID_ARG);
        }
    }
    else
    {
        // no sample available for this trigger
    }
    return {};
}
- Trigger Call Sequence on A
- (A1) incoming Data Trigger on "InPin_1" will call and wait for "InPin_1_trigger" on (A2)
- (A3) "InPin_1_trigger" will call and wait for Process() and thus the ProcessInput()
- As long as (B3) is currently working, this pipe is blocked
- (A4) after (A3) returned, "InPin_1_trigger" will call and wait for a RUN_TRIGGER call at "OutPin_1"
- (A5) only after (A5) returned, (A4) will return, (A2) will return and last (A1) will return
- Trigger Call Sequence on B
- (B1) incoming Data Trigger on "InPin_2" will call and wait for "InPin_2_trigger" on (B2)
- (B3) "InPin_2_trigger" will call and wait for Process() and thus the ProcessInput()
- As long as (A3) is currently working, this pipe is blocked
- (B4) after (B3) returned, "InPin_2_trigger" will call and wait for a RUN_TRIGGER call at "OutPin_1"
- (B5) only after (B5) returned, (B4) will return, (B2) will return and last (B1) will return
Example 8: Inner Pipe with one Runner and 2 Data Triggers
cMyFilter8()
{
    m_pWriter = CreateOutputPin<pin_writer<tUInt32>>("OutPin_1", stream_type_plain<tUInt32>());
    m_pReader1 = CreateInputPin("InPin_1", stream_type_plain<tUInt32>(), false, false);
    m_pReader2 = CreateInputPin("InPin_2", stream_type_plain<tUInt32>(), false, false);
    m_pRunner = CreateRunner("Runner_1", cDataTriggerHint({"InPin_1", "InPin_2"}), true);
}

// sketch: the Process signature, sample declaration and forwarding Write calls are reconstructed
tResult Process(tNanoSeconds tmTimeOfTriggering, IRunner* /*pRunner*/) override
{
    object_ptr<const ISample> pSample;
    while (
        IS_OK(m_pReader1->GetNextSample(pSample)))
    {
        tUInt32 nSampleValue = sample_data<tUInt32>(pSample);
        m_pWriter->Write(tmTimeOfTriggering, nSampleValue);
    }
    while (
        IS_OK(m_pReader2->GetNextSample(pSample)))
    {
        tUInt32 nSampleValue = sample_data<tUInt32>(pSample);
        m_pWriter->Write(tmTimeOfTriggering, nSampleValue);
    }
    return {};
}
- Trigger Call Sequence on A
- (A1) incoming Data Trigger on "InPin_1" will call and wait for "Runner_1" (A2) (without an "InPin_1_trigger" call in between)
- (A3) "Runner_1" will call and wait for Process() to return
- By using CreateRunner(), the Filter's Process() call is automatically synchronized against (B3) with a mutex
- After (A3) returned, (A4) and (A5) can be performed.
- (A5) only after (A5) returned, (A4) will return, (A2) will return and last (A1) will return
- Trigger Call Sequence on B
- (B1) incoming Data Trigger on "InPin_2" will also directly call and wait on "Runner_1" on (B2)
- (B3) "Runner_1" will call and wait for Process()
- (B3) is synchronized and protected against a parallel (A3) call.
- (B4) is called after (B3) returned from Process() and will call and wait for a RUN_TRIGGER call at "OutPin_1"
- (B5) only after (B5) returned, (B4) will return, (B2) will return and last (B1) will return
ManualTrigger call
It is possible to call ManualTrigger after writing Samples and Stream Types onto the Sample Writer. But you should avoid that.
The reasons for this are:
- Each ManualTrigger call might be the beginning of a new Trigger Pipe within the Filter Graph!
- Each ManualTrigger may double or at least raise the amount of Trigger calls in the system, if there are already Inner Pipes configured!
However, the ManualTrigger must be called by a Streaming Source, which is mostly the beginning of a Trigger Pipe with Data Triggers.
The design goal for a good Filter, and so a good Filter Graph, is to avoid the ManualTrigger call. Only Filters with a well-formed Inner Pipe are reconfigurable in their runtime behaviour.
Client/Server Connections
- The Filter's interface binding items are BindingObjects.
- A Filter can define some common functionality which must be used by others through an interface agreement.
- To publish such an interface you can define an IBindingServer
- To subscribe to and use such an interface you can define an IBindingClient
While interconnecting these items the client can obtain the server's interface.
- Note
- Do not use interface binding to add Samples to the Data Pipe or to trigger the Runner Pipe. Otherwise you could get undefined behaviour!
Binding Proxy
The Binding Proxy connects formal ADTF Client/Server interfaces between Filters. It always acts as a mediator between these clients and servers.
Have a look at the Demo Interface Binding Plugin example, to see how to program and use it.
Sub Graph
Sub Graphs are Filter Graph instances that are added to another Filter Graph.
- Note
- These Sub Graphs are treated like Filters!
On changing the Filter Graph's state, the Sub Graph's state will be set before all other objects as one specific characteristic. See States of the Filter Graph for more details.
Filter Graph Ports
Since Filter Graphs can be added as Sub Graphs, a Filter Graph must clearly define its data binding, interface binding and runtime behaviour towards the Filter Graph it is embedded into.
- The Filter Graph's data access points are Data In Ports and Data Out Ports.
- Data Out Ports are treated similar to Out Pins of the Sub Graph to define outgoing streaming data.
- Data In Ports are treated similar to In Pins of the Sub Graph to define incoming streaming data.
- The Filter Graph's runtime behaviour can be configured with the help of Runner Ports.
- Runner Ports are treated similar to Runners and will forward the Trigger Pipe.
- The Filter Graph's interface binding items are Client Ports and Server Ports.
- To forward a Server Object you can add a Server Port.
- To forward a Client Object you can add a Client Port.
States of the Filter Graph
The Filter Graph states can be set via the same interface function as for the Filter, the SetState(tFilterGraphState) method.
Usually the Filter Graph is managed together with the Streaming Graph by the Session Manager. It lives only during the runlevels RL_FilterGraph and RL_Running. Have a look at Chapter: Initialization Order to understand the states of the Filter Graph.
Streaming Graph as a Top-Level Graph
- Note
- Please note that as of ADTF 3.8 and later, we are no longer advocating the use of Streaming Graphs but encourage you to use a Filter Graph instead. They allow for greater flexibility and reusability and can now provide all the required functionality (Streaming Services, etc.).
The ADTF Streaming Graph is a Top-Level Graph to create well defined ADTF system borders and interconnections with the "outside" world. You may add
- Streaming Sources where samples are produced and data is read from devices or process borders
- Streaming Sinks where samples and data are sent to devices or process borders
- Exactly one Filter Graph (IFilterGraph) where samples are manipulated, functions are calculated and data is transformed.
- The following Named Graph Objects are accepted: Streaming Sources, Streaming Sinks and exactly one Filter Graph.
If a Streaming Service is added it will immediately be set to adtf::streaming::ant::IStreamingService::tStreamingState::State_Constructed.
An added Filter Graph will be set to IFilterGraph::tFilterGraphState::State_Constructed.
Stream Port - to embed a Filter Graph
The Streaming Graph is able to automatically connect an embedded Filter Graph. To do so, the Filter Graph has to define Sample Streaming Ports. These Sample Streaming Ports are data access points of the Filter Graph. Stream Ports define a Port ID (IStreamingPort::GetPortID). The Port ID will be used to connect to the Stream Port of the Streaming Graph with the same name.
- Note
- Example: If you create a Stream Port with the name "VIDEO" within the Streaming Graph and define a Sample Streaming Port with the same name in the embedded Filter Graph, there will be an automatic connection between them.
- It is possible to add more than one Sample Streaming Port with the same Port ID. If you add 3 Sample Streaming Ports you will get 3 connections!
Sample Streaming Ports in the Filter Graph
- The Filter Graph's streaming access points are Sample Streaming Ports.
Init Sequence
The Streaming Graph initializes in a well-defined order from "left" to "right". This offers the opportunity to discover connections from left to right.
For more detail see Reaching Runlevel Streaming Graph and Reaching Runlevel Filter Graph.
Start Streaming Sequence
The Streaming Graph starts streaming in the order from "right" to "left". This prevents any processing of Samples while a Filter or Streaming Sink is not yet in running state. This means: Streaming Sources always come last.
For more detail see Reaching Runlevel Running.
Connections within the Filter Graph
To build up a graph, synchronous connections are part of a Trigger Pipe. Connections have a category (left, middle or right) and some constraints on which item is connectable to which. Consider the following table:
Source | Dest | Sync / Async |
Client Port (IFilterGraphInterfaceClientPort) | Filter Binding Client (IBindingClient) | does not matter |
Data InPort (IFilterGraphDataInPort) | Sample Stream (ISampleStream) | always sync |
Sample Stream InPort (ISampleStreamInPort) | Sample Stream (ISampleStream) | always sync |
Sample Stream InPort (ISampleStreamInPort) | Filter InPin (IInPin) | sync or async |
Filter Binding Server (IBindingServer) | Binding Proxy (IBindingProxy) | does not matter |
Filter OutPin (IOutPin) | Sample Stream (ISampleStream) | sync or async |
Filter OutPin (IOutPin) | Sample Stream OutPort (ISampleStreamOutPort) | sync or async |
Filter InPin (IInPin) | Filter Runner (IRunner) | sync (defines a DataTrigger) |
Filter Runner (IRunner) | Filter OutPin (IOutPin) | sync (defines an internal DataTrigger on the OutPin) |
Active Runner (IActiveRunner) | Filter Runner (IRunner) | always sync (defines the TimeTrigger or ThreadTrigger) |
Active Runner (IActiveRunner) | Sample Stream (ISampleStream) | always sync |
Filter Runner (IRunner) | Filter Runner (IRunner) | sync (forwards the Trigger) |
Sample Stream (ISampleStream) | Filter InPin (IInPin) | sync or async |
Sample Stream (ISampleStream) | Sample Stream (ISampleStream) | sync or async |
Sample Stream (ISampleStream) | Sample Stream OutPort (ISampleStreamOutPort) | sync or async |
Sample Stream (ISampleStream) | Data OutPort (IFilterGraphDataOutPort) | always sync |
Binding Proxy (IBindingProxy) | Filter BindingClient (IBindingClient) | does not matter |
Binding Proxy (IBindingProxy) | Server Port (IFilterGraphInterfaceServerPort) | does not matter |
Runner Port (IFilterGraphRunnerPort) | Filter Runner (IRunner) | always sync |
- Note
- Note that the Substream Selector, as a special adaptation of the Sample Stream, also implements ISampleStream and can therefore replace any Sample Stream occurrence within this overview.
Additional Information
Dependency resolving
If a new Filter, Streaming Source or Streaming Sink should support automatic dependency resolving while building sessions within the ADTF Configuration Editor, follow the instructions in Setup dependencies between components.