The Cosmos Generic Enabler simplifies Big Data analysis of context data and integrates with some of the most popular Big Data platforms.
Cosmos is a FIWARE Generic Enabler. Therefore, it can be integrated as part of any platform “Powered by FIWARE”. FIWARE
is a curated framework of open source platform components which can be assembled together with other third-party
platform components to accelerate the development of Smart Solutions.
This project is part of FIWARE. For more information, check the FIWARE Catalogue entry for
Core Context Management.
| Documentation | Academy | Roadmap |
| --- | --- | --- |
The Cosmos Big Data Analysis GE is a set of tools that help with the tasks of streaming and batch processing over
context data, including the Orion Spark Connector described below.
As the state of the real world changes, the entities representing your IoT devices are constantly changing. Big data
analysis allows you to study datasets derived from your context data that are too large for traditional
data-processing software. You can apply predictive analytics or user-behaviour analytics to extract meaningful
conclusions about the state of your smart solution and bring value to it.
This is a Spark connector for the FIWARE Orion Context Broker. It has three parts:
-   `OrionReceiver`: Source for receiving NGSI v2 events in the shape of HTTP messages from subscriptions.
-   `NGSILDReceiver`: Source for receiving NGSI-LD events from subscriptions via HTTP.
-   `OrionSink`: Sink for writing back to the Context Broker.

Download the JAR from the latest release. In your project directory, run:
```bash
mvn install:install-file -Dfile=$(PATH_DOWNLOAD)/orion.spark.connector-1.2.2.jar -DgroupId=org.fiware.cosmos -DartifactId=orion.spark.connector -Dversion=1.2.2 -Dpackaging=jar
```
Add it to your `pom.xml` file inside the dependencies section:

```xml
<dependency>
    <groupId>org.fiware.cosmos</groupId>
    <artifactId>orion.spark.connector</artifactId>
    <version>1.2.2</version>
</dependency>
```
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.fiware.cosmos.orion.spark.connector.OrionReceiver

val sparkConf = new SparkConf().setAppName("CustomReceiver").setMaster("local[3]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val eventStream = ssc.receiverStream(new OrionReceiver(9001))

val processedDataStream = eventStream
    .flatMap(event => event.entities)
    // ... processing
```
The received data is a DataStream of objects of the class `NgsiEvent` (NGSI v2). This class has the following attributes:

-   `creationTime`: Timestamp of arrival.
-   `service`: FIWARE service extracted from the HTTP headers.
-   `servicePath`: FIWARE service path extracted from the HTTP headers.
-   `entities`: Sequence of entities included in the message. Each entity has the following attributes:
    -   `id`: Identifier of the entity.
    -   `type`: Node type.
    -   `attrs`: Map of attributes in which the key is the attribute name and the value is an object with the following fields:
        -   `type`: Type of the value (Float, Int, ...).
        -   `value`: Value of the attribute.
        -   `metadata`: Additional metadata.
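As a minimal sketch of how these attributes can be consumed (the `temperature` attribute name is a hypothetical example, not something the connector provides), the entities in each event can be unpacked like this:

```scala
// Hypothetical processing step: extract a numeric "temperature"
// attribute from every entity in the incoming NGSI v2 events.
val temperatures = eventStream
    .flatMap(event => event.entities)
    .flatMap(entity => entity.attrs.get("temperature")) // Option: skips entities without the attribute
    .map(attr => attr.value.toString.toFloat)

temperatures.print() // prints the first values of each batch to stdout
```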
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.fiware.cosmos.orion.spark.connector.NGSILDReceiver

val sparkConf = new SparkConf().setAppName("CustomReceiver").setMaster("local[3]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val eventStream = ssc.receiverStream(new NGSILDReceiver(9001))

val processedDataStream = eventStream
    .flatMap(event => event.entities)
    // ... processing
```
The received data is a DataStream of objects of the class `NgsiEventLD`. This class has the following attributes:

-   `creationTime`: Timestamp of arrival.
-   `service`: FIWARE service extracted from the HTTP headers.
-   `servicePath`: FIWARE service path extracted from the HTTP headers.
-   `entities`: Sequence of entities included in the message. Each entity has the following attributes:
    -   `id`: Identifier of the entity.
    -   `type`: Node type.
    -   `attrs`: Map of attributes in which the key is the attribute name and the value is an object with the following fields:
        -   `type`: Type of the value (Float, Int, ...).
        -   `value`: Value of the attribute.
    -   `@context`: Map of terms to URIs providing an unambiguous definition.
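For example (a hedged sketch based on the attribute list above; `type` must be written with backticks because it is a Scala keyword), you could count the received entities per entity type in each micro-batch:

```scala
// Count NGSI-LD entities per entity type within each batch.
val countsByType = eventStream
    .flatMap(event => event.entities)
    .map(entity => (entity.`type`, 1))
    .reduceByKey(_ + _)

countsByType.print()
```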
```scala
import org.fiware.cosmos.orion.spark.connector.{ContentType, HTTPMethod, OrionSink, OrionSinkObject}

val processedDataStream = eventStream
    // ... processing
    .map(obj =>
        new OrionSinkObject(
            "{\"temperature_avg\": { \"value\": " + obj.temperature + ", \"type\": \"Float\"}}", // Stringified JSON message
            "http://context-broker-url:8080/v2/entities/Room1", // URL
            ContentType.JSON, // Content type
            HTTPMethod.POST // HTTP method
        )
    )

OrionSink.addSink(processedDataStream)
```
The sink accepts a DataStream of objects of the class `OrionSinkObject`. This class has four mandatory attributes and one optional attribute:

-   `content`: Message content in String format. If it is a JSON, you need to make sure to stringify it before sending it.
-   `url`: URL to which the message should be sent.
-   `contentType`: Type of HTTP content of the message. It can be `ContentType.JSON` or `ContentType.Plain`.
-   `method`: HTTP method of the message. It can be `HTTPMethod.POST`, `HTTPMethod.PUT` or `HTTPMethod.PATCH`.
-   `headers` (Optional): String Map including any additional HTTP headers.
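For instance (a hypothetical sketch: the header names and values are illustrative, and it assumes the optional fifth constructor argument takes a `Map[String, String]`), the optional headers can carry FIWARE multi-tenancy headers:

```scala
// Hypothetical sink object carrying extra FIWARE headers.
val sinkObject = new OrionSinkObject(
    "{\"status\": {\"value\": \"on\", \"type\": \"Text\"}}", // Stringified JSON message
    "http://context-broker-url:8080/v2/entities/Room1/attrs", // URL
    ContentType.JSON,
    HTTPMethod.PATCH,
    Map("Fiware-Service" -> "openiot", "Fiware-ServicePath" -> "/") // additional HTTP headers
)
```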
**Warning**

When packaging your code in a JAR, it is common to exclude dependencies like Spark and Scala since they are typically
provided by the execution environment. Nevertheless, it is necessary to include this connector in your packaged code,
since it is not part of the Spark distribution.
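A minimal sketch of the corresponding Maven setup (the Spark artifact and version numbers are illustrative assumptions, not requirements of the connector): Spark is marked as `provided` so it stays out of the packaged JAR, while the connector keeps the default `compile` scope and gets bundled:

```xml
<dependencies>
    <!-- Provided by the Spark execution environment, so excluded from the packaged JAR -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.4.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- Not part of the Spark distribution, so it must be bundled with your code -->
    <dependency>
        <groupId>org.fiware.cosmos</groupId>
        <artifactId>orion.spark.connector</artifactId>
        <version>1.2.2</version>
    </dependency>
</dependencies>
```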
Some lessons on Big Data Fundamentals are offered in the
FIWARE Academy.
Several examples are provided to facilitate getting started with the connector. They are hosted in a separate
repository:
fiware-cosmos-orion-spark-connector-examples.
If you would like to see an example of a complete scenario using the FIWARE Orion Spark Connector with SparkML, check out
the project presented at the 2019 Summit in Berlin.
This project is part of FIWARE and has been rated as follows:
The list of features planned for the subsequent release is available in the
ROADMAP file.
Contribution guidelines are detailed in the
CONTRIBUTING file.
To test the code, run:

```bash
mvn clean test -Dtest=*Test cobertura:cobertura coveralls:report -Padd-dependencies-for-IDEA
```
Cosmos is licensed under the Affero General Public License (AGPL) version 3.
There is absolutely no problem in using a product licensed under AGPL 3.0. Issues with GPL (or AGPL) licenses mostly
stem from the fact that different people assign different interpretations to the meaning of the term “derivative work”
used in these licenses. Because of this, some people believe that there is a risk in merely using software under GPL or AGPL
licenses (even without modifying it).
For the avoidance of doubt, the owners of this software licensed under an AGPL-3.0 license wish to make a clarifying
public statement as follows:
Please note that software derived as a result of modifying the source code of this software in order to fix a bug or
incorporate enhancements is considered a derivative work of the product. Software that merely uses or aggregates (i.e.
links to) an otherwise unmodified version of existing software is not considered a derivative work, and therefore it
does not need to be released under the same license, or even released as open source.