3.5.2. App Functions SDK (Golang) - Beta¶
Welcome the App Functions SDK for EdgeX. This sdk is meant to provide all the plumbing necessary for developers to get started in processing/transforming/exporting data out of EdgeX.
3.5.3. Table of contents¶
- Getting Started
- Triggers
- Context API
- Built-In Functions
- Configuration
- Error Handling
- Advanced Topics
3.5.3.1. Getting Started¶
3.5.3.1.1. Build Prerequisites¶
Please see the edgex-go README.
3.5.3.1.2. The SDK¶
The SDK is built around the idea of a “Functions Pipeline”. A functions
pipeline is a collection of various functions that process the data in
the order that you’ve specified. The functions pipeline is executed by
the specified trigger in the configuration.toml
.
The first function in the pipeline is called with the event that
triggered the pipeline (ex. events.Model
). Each successive call in
the pipeline is called with the return result of the previous function.
Let’s take a look at a simple example that creates a pipeline to filter
particular device ids and subsequently transform the data to XML:
package main
import (
"fmt"
"github.com/edgexfoundry/app-functions-sdk-go/appsdk"
"github.com/edgexfoundry/app-functions-sdk-go/pkg/transforms"
"os"
)
func main() {
// 1) First thing to do is to create an instance of the EdgeX SDK, giving it a service key
edgexSdk := &appsdk.AppFunctionsSDK{
ServiceKey: "SimpleFilterXMLApp", // Key used by Registry (Aka Consul)
}
// 2) Next, we need to initialize the SDK
if err := edgexSdk.Initialize(); err != nil {
edgexSdk.LoggingClient.Error(fmt.Sprintf("SDK initialization failed: %v\n", err))
os.Exit(-1)
}
// 3) Since our FilterByDeviceName Function requires the list of Device Names we would
// like to search for, we'll go ahead and define that now.
deviceNames := []string{"Random-Float-Device"}
// 4) This is our pipeline configuration, the collection of functions to
// execute every time an event is triggered.
if err := edgexSdk.SetFunctionsPipeline(
transforms.NewFilter(deviceNames).FilterByDeviceName,
transforms.NewConversion().TransformToXML,
); err != nil {
edgexSdk.LoggingClient.Error(fmt.Sprintf("SDK SetPipeline failed: %v\n", err))
os.Exit(-1)
}
// 5) shows how to access the application's specific configuration settings.
appSettings := edgexSdk.ApplicationSettings()
if appSettings != nil {
appName, ok := appSettings["ApplicationName"]
if ok {
edgexSdk.LoggingClient.Info(fmt.Sprintf("%s now running...", appName))
} else {
edgexSdk.LoggingClient.Error("ApplicationName application setting not found")
os.Exit(-1)
}
} else {
edgexSdk.LoggingClient.Error("No application settings found")
os.Exit(-1)
}
// 6) Lastly, we'll go ahead and tell the SDK to "start" and begin listening for events to trigger the pipeline.
edgexSdk.MakeItRun()
}
The above example is meant to merely demonstrate the structure of your application. Notice that the output of the last function is not available anywhere inside this application. You must provide a function in order to work with the data from the previous function. Let’s go ahead and add the following function that prints the output to the console.
func printXMLToConsole(edgexcontext *appcontext.Context, params ...interface{}) (bool,interface{}) {
if len(params) < 1 {
// We didn't receive a result
return false, errors.New("No Data Received")
}
println(params[0].(string))
return true, nil
}
After placing the above function in your code, the next step is to modify the pipeline to call this function:
edgexSdk.SetFunctionsPipeline(
transforms.NewFilter(deviceNames).FilterByDeviceName,
transforms.NewConversion().TransformToXML,
printXMLToConsole //notice this is not a function call, but simply a function pointer.
)
After making the above modifications, you should now see data printing
out to the console in XML when an event is triggered. > You can find
this example in the /examples
directory located in this repository.
You can also use the provided `EdgeX Applications Function
SDK.postman_collection.json” file to load into postman to trigger the
sample pipeline.
Up until this point, the pipeline has been triggered by
an event over HTTP and the data at the end of that pipeline lands in the
last function specified. In the example, data ends up printed to the
console. Perhaps we’d like to send the data back to where it came from.
In the case of an HTTP trigger, this would be the HTTP response. In the
case of a message bus, this could be a new topic to send the data back
to for other applications that wish to receive it. To do this, simply
call edgexcontext.Complete([]byte outputData)
passing in the data
you wish to “respond” with. In the above printXMLToConsole(...)
function, replace println(params[0].(string))
with
edgexcontext.Complete([]byte(params[0].(string)))
. You should now
see the response in your postman window when testing the pipeline.
3.5.3.2. Examples¶
The App Service
Examples
repo contains a variety of simple to advanced example Application
Services built upon the App Functions SDK. Examples that once were
in the examples folder of the SDK have been moved to this Examples
repo.
3.5.3.3. Triggers¶
Triggers determine how the app functions pipeline begins execution. In
the simple example provided above, an HTTP trigger is used. The trigger
is determine by the configuration.toml
file located in the /res
directory under a section called [Binding]
. Check out the
Configuration Section for more information about
the toml file.
3.5.3.3.1. Message Bus Trigger¶
A message bus trigger will execute the pipeline every time data is received off of the configured topic.
3.5.3.3.1.1. Type and Topic configuration¶
Here’s an example:
Type="messagebus"
SubscribeTopic="events"
PublishTopic=""
The Type=
is set to “messagebus”. `EdgeX Core Data <>`__ is
publishing data to the events
topic. So to receive data from core
data, you can set your SubscribeTopic=
either to ""
or
"events"
. You may also designate a PublishTopic=
if you wish to
publish data back to the message bus.
edgexcontext.Complete([]byte outputData)
- Will send data back to
back to the message bus with the topic specified in the
PublishTopic=
property #### Message bus connection configuration The
other piece of configuration required are the connection settings:
[MessageBus]
Type = 'zero' #specifies of message bus (i.e zero for ZMQ)
[MessageBus.PublishHost]
Host = '*'
Port = 5564
Protocol = 'tcp'
[MessageBus.SubscribeHost]
Host = 'localhost'
Port = 5563
Protocol = 'tcp'
By default, EdgeX Core Data
publishes data to the events
topic
on port 5563. The publish host is used if publishing data back to the
message bus. >Important Note: Publish Host MUST be different
for every topic you wish to publish to since the SDK will bind to the
specific port. 5563 for example cannot be used to publish since
EdgeX Core Data
has bound to that port. Similarly, you cannot have
two separate instances of the app functions SDK running publishing to
the same port.
3.5.3.3.2. HTTP Trigger¶
Designating an HTTP trigger will allow the pipeline to be triggered by a
RESTful POST
call to http://[host]:[port]/trigger/
. The body of
the POST must be an EdgeX event.
edgexcontext.Complete([]byte outputData)
- Will send the specified
data as the response to the request that originally triggered the HTTP
Request.
3.5.3.4. Context API¶
The context parameter passed to each function/transform provides operations and data associated with each execution of the pipeline. Let’s take a look at a few of the properties that are available:
3.5.3.4.1. LoggingClient¶
The LoggingClient
exposed on the context is available to leverage
logging libraries/service utilized throughout the EdgeX framework. The
SDK has initialized everything so it can be used to log Trace
,
Debug
, Warn
, Info
, and Error
messages as appropriate.
See examples/simple-filter-xml/main.go
for an example of how to use
the LoggingClient
.
3.5.3.4.2. EventClient¶
The EventClient
exposed on the context is available to leverage Core
Data’s Event
API. See interface
definition
for more details. This client is useful for querying events and is used
by the MarkAsPushed convenience API described below.
3.5.3.4.3. ValueDescriptorClient¶
The ValueDescriptorClient
exposed on the context is available to
leverage Core Data’s ValueDescriptor
API. See interface
definition
for more details. Useful for looking up the value descriptor for a
reading received.
3.5.3.4.4. CommandClient¶
The CommandClient
exposed on the context is available to leverage
Core Command’s Command
API. See interface
definition
for more details. Useful for sending commands to devices.
3.5.3.4.5. NotificationsClient¶
The CommandClient
exposed on the context is available to leverage
Support Notifications’ Notifications
API. See
README
for more details. Useful for sending notifications.
3.5.3.4.6. Note about Clients¶
Each of the clients above is only initialized if the Clients section of
the configuration contains an entry for the service associated with the
Client API. If it isn’t in the configuration the client will be nil
.
Your code must check for nil
to avoid panic in case it is missing
from the configuration. Only add the clients to your configuration that
your Application Service will actually be using. All application
services need the Logging
and many will need Core-Data
. The
following is an example Clients
section of a configuration.toml with
all supported clients specified:
[Clients]
[Clients.Logging]
Protocol = "http"
Host = "localhost"
Port = 48061
[Clients.CoreData]
Protocol = 'http'
Host = 'localhost'
Port = 48080
[Clients.Command]
Protocol = 'http'
Host = 'localhost'
Port = 48082
[Clients.Notifications]
Protocol = 'http'
Host = 'localhost'
Port = 48060
3.5.3.4.7. .MarkAsPushed()¶
.MarkAsPushed()
is used to indicate to EdgeX Core Data that an event
has been “pushed” and is no longer required to be stored. The scheduler
service will purge all events that have been marked as pushed based on
the configured schedule. By default, it is once daily at midnight. If
you leverage the built in export functions (i.e. HTTP Export, or MQTT
Export), then the event will automatically be marked as pushed upon a
successful export. ### .PushToCore()
.PushToCore(string deviceName, string readingName, byte[] value)
is
used to push data to EdgeX Core Data so that it can be shared with other
applications that are subscribed to the message bus that core-data
publishes to. deviceName
can be set as you like along with the
readingName
which will be set on the EdgeX event sent to CoreData.
This function will return the new EdgeX Event with the ID populated,
however the CorrelationId will not be available.
NOTE: If validation is turned on in CoreServices then yourdeviceName
andreadingName
must exist in the CoreMetadata and be properly registered in EdgeX.
WARNING: Be aware that without a filter in your pipeline, it is possible to create an infinite loop when the messagebus trigger is used. Choose your device-name and reading name appropriately. ### .Complete().Complete([]byte outputData)
can be used to return data back to the configured trigger. In the case of an HTTP trigger, this would be an HTTP Response to the caller. In the case of a message bus trigger, this is how data can be published to a new topic per the configuration.
3.5.3.4.8. .SetRetryData()¶
.SetRetryData(payload []byte)
can be used to store data for later
retry. This is useful when creating a custom export function that needs
to retry on failure sending the data. The payload data will be stored
for later retry based on Store and Forward
configuration. When the
retry is triggered, the function pipeline will be re-executed starting
with the function that called this API. That function will be passed the
stored data, so it is important that all transformations occur in
functions prior to the export function. The Context
will also be
restored to the state when the function called this API. See Store and
Forward for more details.
NOTE:Store and Forward
be must enabled when calling this API.
3.5.3.5. Built-In Transforms/Functions¶
All transforms define a type and a New
function which is used to
initialize an instance of the type with the required parameters. These
instances returned by these New
functions give access to their
appropriate pipeline function pointers when build the function pipeline.
E.G. NewFilter([] {"Device1", "Device2"}).FilterByDeviceName
3.5.3.5.1. Filtering¶
There are two basic types of filtering included in the SDK to add to
your pipeline. Theses provided Filter functions return a type of
events.Model. If filtering results in no remaining data, the pipeline
execution for that pass is terminated. If no values are provided for
filtering, then data flows through unfiltered. -
NewFilter([]string filterValues)
- This function returns a
Filter
instance initialized with the passed in filter values. This
Filter
instance is used to access the following filter functions
that will operate using the specified filter values. -
FilterByDeviceName
- This function will filter the event data down
to the specified device names and return the filtered data to the
pipeline. - FilterByValueDescriptor
- This function will filter the
event data down to the specified device value descriptor and return the
filtered data to the pipeline.
3.5.3.5.2. Encryption¶
There is one encryption transform included in the SDK that can be added to your pipeline.
NewEncryption(key string, initializationVector string)
- This function returns aEncryption
instance initialized with the passed in key and initialization vector. ThisEncryption
instance is used to access the following encryption function that will use the specified key and initialization vector.EncryptWithAES
- This function receives a either astring
,[]byte
, orjson.Marshaller
type and encrypts it using AES encryption and returns a[]byte
to the pipeline.
3.5.3.5.3. Conversion¶
There are two conversions included in the SDK that can be added to your
pipeline. These transforms return a string
.
NewConversion()
- This function returns aConversion
instance that is used to access the following conversion functions:TransformToXML
- This function receives anevents.Model
type, converts it to XML format and returns the XML string to the pipeline.TransformToJSON
- This function receives anevents.Model
type and converts it to JSON format and returns the JSON string to the pipeline.
### Compressions There are two compression types included in the SDK
that can be added to your pipeline. These transforms return a
[]byte
.
NewCompression()
- This function returns aCompression
instance that is used to access the following compression functions:CompressWithGZIP
- This function receives either astring
,[]byte
, orjson.Marshaler
type, GZIP compresses the data, converts result to base64 encoded string, which is returned as a[]byte
to the pipeline.CompressWithZLIB
- This function receives either astring
,[]byte
, orjson.Marshaler
type, ZLIB compresses the data, converts result to base64 encoded string, which is returned as a[]byte
to the pipeline.
3.5.3.5.4. CoreData Functions¶
These are functions that enable interactions with the CoreData REST API.
- NewCoreData()
- This function returns a CoreData
instance.
This CoreData
instance is used to access the following function(s).
- MarkAsPushed
- This function provides the MarkAsPushed function
from the context as a First-Class Transform that can be called in your
pipeline. See Definition Above. The data passed
into this function from the pipeline is passed along unmodifed since all
required information is provided on the context (EventId,
CorrelationId,etc.. ) - PushToCore
- This function provides the
PushToCore function from the context as a First-Class Transform that can
be called in your pipeline. See Definition Above.
The data passed into this function from the pipeline is wrapped in an
EdgeX event with the deviceName
and readingName
that were set
upon instantiation and then sent to CoreData to be added as an event.
Returns the new EdgeX event with ID populated.
> NOTE: If validation is turned on in CoreServices then your `deviceName` and `readingName` must exist in the CoreMetadata and be properly registered in EdgeX.
3.5.3.5.5. Export Functions¶
There are two export functions included in the SDK that can be added to
your pipeline. -
NewHTTPSender(url string, mimeType string, persistOnError bool)
-
This function returns a HTTPSender
instance initialized with the
passed in url, mime type and persistOnError values. This HTTPSender
instance is used to access the following functions that will use the
required url and optional mime type and persistOnError:
HTTPPost
- This function receives either astring
,[]byte
, orjson.Marshaler
type from the previous function in the pipeline and posts it to the configured endpoint. If no previous function exists, then the event that triggered the pipeline, marshaled to json, will be used. Currently, only unauthenticated endpoints are supported. Authenticated endpoints will be supported in the future. If the post fails andpersistOnError
istrue
andStore and Forward
is enabled, the data will be stored for later retry. See Store and Forward for more detailsNewMQTTSender(logging logger.LoggingClient, addr models.Addressable, keyCertPair *KeyCertPair, mqttConfig MqttConfig, persistOnError bool)
- This function returns aMQTTSender
instance initialized with the passed in MQTT configuration . ThisMQTTSender
instance is used to access the following function that will use the specified MQTT configurationKeyCertPair
- This structure holds the Key and Certificate information for when using secure TLS connection to the broker. Can benil
if not using secure TLS connection.MqttConfig
- This structure holds addition MQTT configuration settings.Qos byte Retain bool AutoReconnect bool SkipCertVerify bool User string Password string
The
GO
complier will default these to0
,false
and""
, so you only need to set the fields that your usage requires that differ from the default.MQTTSend
- This function receives either astring
,[]byte
, orjson.Marshaler
type from the previous function in the pipeline and sends it to the specified MQTT broker. If no previous function exists, then the event that triggered the pipeline, marshaled to json, will be used. If the send fails andpersistOnError
istrue
andStore and Forward
is enabled, the data will be stored for later retry. See Store and Forward for more details
3.5.3.5.6. Output Functions¶
There is one output function included in the SDK that can be added to your pipeline.
- NewOutput() - This function returns a
Output
instance that is used to access the following output function:SetOutput
- This function receives either astring
,[]byte
, orjson.Marshaler
type from the previous function in the pipeline and sets it as the output data for the pipeline to return to the configured trigger. If configured to use message bus, the data will be published to the message bus as determined by theMessageBus
andBinding
configuration. If configured to use HTTP trigger the data is returned as the HTTP response. Note that calling Complete() from the Context API in a custom function can be used in place of adding this function to your pipeline
3.5.3.6. Configuration¶
Similar to other EdgeX services, configuration is first determined by
the configuration.toml
file in the /res
folder. If -r
is
passed to the application on startup, the SDK will leverage the provided
registry (i.e Consul) to push configuration from the file into the
registry and monitor configuration from there. You will find the
configuration under the edgex/appservices/1.0/
key. There are two
primary sections in the configuration.toml
file that will need to be
set that are specific to the AppFunctionsSDK. 1) [Binding]
- This
specifies the trigger type and associated data required
to configure a trigger.
[Binding]
Type=""
SubscribeTopic=""
PublishTopic=""
[ApplicationSettings]
- Is used for custom application settings and is accessed via the ApplicationSettings() API. The ApplicationSettings API returns amap[string] string
containing the contents on the ApplicationSetting section of theconfiguration.toml
file.
[ApplicationSettings]
ApplicationName = "My Application Service"
3.5.3.7. Error Handling¶
- Each transform returns a
true
orfalse
as part of the return signature. This is called thecontinuePipeline
flag and indicates whether the SDK should continue calling successive transforms in the pipeline. return false, nil
will stop the pipeline and stop processing the event. This is useful for example when filtering on values and nothing matches the criteria you’ve filtered on.return false, error
, will stop the pipeline as well and the SDK will log the errorString you have returned.- Returning
true
tells the SDK to continue, and will call the next function in the pipeline with your result. - The SDK will return control back to main when receiving a SIGTERM/SIGINT event to allow for custom clean up.
3.5.3.8. Advanced Topics¶
The following items discuss topics that are a bit beyond the basic use cases of the Application Functions SDK when interacting with EdgeX.
3.5.3.8.1. Configurable Functions Pipeline¶
This SDK provides the capability to define the functions pipeline via configuration rather than code using the app-service-configurable application service. See app-service-configurable README for more details.
3.5.3.8.2. Using The Webserver¶
It is not uncommon to require your own API endpoints when building an
app service. Rather than spin up your own webserver inside of your app
(alongside the already existing running webserver), we’ve exposed a
method that allows you add your own routes to the existing webserver. A
few routes are reserved and cannot be used: - /api/version -
/api/v1/ping - /api/v1/metrics - /api/v1/config - /api/v1/trigger To add
your own route, use the
AddRoute(route string, handler func(nethttp.ResponseWriter, *nethttp.Request), methods ...string)
function provided on the sdk. Here’s an example:
Under the hood, this simply adds the provided route, handler, and method
to the gorilla mux.Router
we use in the SDK. For more information
you can check out the github repo
here. You can access the resources
such as the logging client by accessing the context as shown above –
this is useful for when your routes might not be defined in your main.go
where you have access to the edgexSdk
instance.
3.5.3.8.3. Target Type¶
The target type is the object type of the incoming data that is sent to
the first function in the function pipeline. By default this is an EdgeX
Event
since typical usage is receiving events
from Core Data via
Message Bus.
For other usages where the data is not events
coming from Core Data,
the TargetType
of the accepted incoming data can be set when the SDK
instance is created. There are scenarios where the incoming data is not
an EdgeX Event
. One example scenario is 2 application services are
chained via the Message Bus. The output of the first service back to the
Messages Bus is inference data from analyzing the original input
Event
data. The second service needs to be able to let the SDK know
the target type of the input data it is expecting.
For usages where the incoming data is not events
, the TargetType
of the excepted incoming data can the set when the SDK instance is
created.
Example:
type Person struct {
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
}
edgexSdk := &appsdk.AppFunctionsSDK {
ServiceKey: serviceKey,
TargetType: &Person{},
}
Note that TargetType
must be set to a pointer to an instance of your
target type such as &Person{}
. The first function in your function
pipeline will be passed an instance of your target type, not a pointer
to it. In the example above the first function in the pipeline would
start something like:
func MyPersonFunction(edgexcontext *appcontext.Context, params ...interface{}) (bool, interface{}) {
edgexcontext.LoggingClient.Debug("MyPersonFunction")
if len(params) < 1 {
// We didn't receive a result
return false, nil
}
person, ok := params[0].(Person)
if !ok {
return false, errors.New("type received is not a Person")
}
....
The SDK supports unmarshaling JSON or CBOR encoded data into an instance
of the target type. If your incoming data is not JSON or CBOR encoded,
you then need to set the TargetType
to &[]byte
.
If the target type is set to &[]byte
the incoming data will not be
unmarshaled. The content type, if set, will be passed as the second
parameter to the first function in your pipeline. Your first function
will be responsible for decoding the data or not.
3.5.3.8.4. Command Line Options¶
The following command line options are available
-c=<path>
--confdir=<path>
Specify an alternate configuration directory.
-p=<profile>
--profile=<profile>
Specify a profile other than default.
-r
--registry
Indicates the service should use the registry.
-o
-overwrite
Overwrite configuration in the Registry with local values.
-s
-skipVersionCheck
Indicates the service should skip the Core Service's version compatibility check.
Examples:
simple-filter-xml -r -c=./res -p=docker
or
simple-filter-xml --registry --confdir=./res --profile=docker
3.5.3.8.5. Environment Variable Overrides¶
All the configuration settings from the configuration.toml file can be overridden by environment variables. Except for two special cases listed below, the overrides only occur when the configuration values are first pushed into the Registry. Once the values are in the Registry, the Registry values are always used.
The environment variable names have the following format:
<TOML Key>
<TOML Section>_<TOML Key>
<TOML Section>_<TOML Sub-Section>_<TOML Key>
Examples:
TOML : FailLimit = 30
ENVVAR : FailLimit=100
TOML : [Logging]
EnableRemote = false
ENVVAR : Logging.EnableRemote=true
TOML : [Clients]
[Clients.CoreData]
Host = 'localhost'
ENVVAR : Clients_CoreData_Host=edgex-core-data
3.5.3.8.5.1. edgex_registry¶
This environment variable overrides the Registry connection information and occurs every time the application service starts. The value is in the format of a URL.
edgex_registry=consul://edgex-core-consul:8500
This sets the Registry information fields as follows:
Type: consul
Host: edgex-core-consul
Port: 8500
3.5.3.8.5.2. edgex_service¶
This environment variable overrides the Service connection information and occurs every time the application service starts. The value is in the format of a URL.
edgex_service=http://192.168.1.2:4903
This sets the Service information fields as follows:
Protocol: http
Host: 192.168.1.2
Port: 4903
3.5.3.8.5.3. edgex_profile¶
This environment variable overrides the command line profile
argument. It will replace the current value passed via the -p
or
--profile
, if one exists. If not specified it will add the
--profile
argument. This is useful when running the service via
docker-compose.
Using docker-compose:
app-service-configurable-rules:
image: edgexfoundry/docker-app-service-configurable:1.1.0
environment:
- edgex_profile : "rules-engine"
ports:
- "48095:48095"
container_name: edgex-app-service-configurable
hostname: edgex-app-service-configurable
networks:
edgex-network:
aliases:
- edgex-app-service-configurable
depends_on:
- data
- command
This sets the --profile=docker-rules-engine
command line argument so
that the application service uses the docker-rules-engine
configuration profile which resides at
/res/docker-rules-engine/configuration.toml
Note that Application Services no longer use docker profiles. They use Environment Overrides in the docker compose file to make the necessary changes to the configuration for running in Docker. See theEnvironment Variable Overrides For Dockersection inApp Service Configurable’s README for more details and an example.
3.5.3.8.6. Store and Forward¶
The Store and Forward capability allows for export functions to persist data on failure and for the export of the data to be retried at a later time.
Note: The order the data exported via this retry mechanism is not guaranteed to be the same order in which the data was initial received from Core Data
3.5.3.8.6.1. Configuration¶
Two sections of configuration have been added for Store and Forward.
Writable.StoreAndForward
allows enabling, setting the interval
between retries and the max number of retries. If running with Registry,
these setting can be changed on the fly without having to restart the
service.
[Writable.StoreAndForward]
Enabled = false
RetryInterval = '5m'
MaxRetryCount = 10
Note: RetryInterval should be at least 1 second (eg. ‘1s’) or greater. If a value less than 1 second is specified, 1 second will be used.
Note: Endless retries will occur when MaxRetryCount is set to 0.
Note: If MaxRetryCount is set to less than 0, a default of 1 retry will be used.
Database describes which database type to use, mongodb
or
redisdb
, and the information required to connect to the database.
This section is required if Store and Forward is enabled, otherwise it
is currently optional.
[Database]
Type = "mongodb"
Host = "localhost"
Port = 27017
Timeout = '5s'
Username = ""
Password = ""
3.5.3.8.6.2. How it works¶
When an export function encounters an error sending data it can call
SetRetryData(payload []byte)
on the Context. This will store the
data for later retry. If the application service is stop and then
restarted while stored data hasn’t been successfully exported, the
export retry will resume once the service is up and running again.
Note: It is important that export functions return an error and stop pipeline execution after the call toSetRetryData
. See HTTPPost function in SDK as an example
When the RetryInterval
expires, the function pipeline will be
re-executed starting with the export function that saved the data. The
saved data will be passed to the export function which can then attempt
to resend the data.
NOTE: The export function will receive the data as it was stored, so it is important that any transformation of the data occur in functions prior to the export function. The export function should only export the data that it receives.
One of three out comes can occur after the export retried has completed.
Export retry was successful
In this case the stored data is removed from the database and the execution of the pipeline functions after the export function, if any, continues.
Export retry fails and retry count
has not been
exceededIn this case the store data is updated in the database with the incremented retry count
Export retry fails and retry count
has been
exceededIn this case the store data is removed from the database and never retried again.
NOTE: Changing Writable.Pipeline.ExecutionOrder will invalidate all currently stored data and result in it all being removed from the database on the next retry. This is because the position of the export function can no longer be guaranteed and no way to ensure it is properly executed on the retry.