Consume messages

For asynchronous message processing, the INVERS OneAPI supports push delivery via AMQP (Advanced Message Queuing Protocol). Messages can be consumed using AMQP version 0.9.1.

Configuration

To subscribe to message topics, you need several configuration values:

  • hostname of the AMQP broker (api.invers.com)
  • connection port (5671)
  • virtual host (used to isolate ‘environments’, ensuring that your messages are only visible to you)
  • username
  • password
  • queue name(s)

The first two values (host and port) are always the same and can be found in the API reference. The others are specific to your subscription; INVERS provides you with the data required to consume your messages. Once you have received all configuration values, you can start implementing your consuming application.
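The values above are typically combined into a single amqps:// connection URI. The following is a minimal sketch of that step, not an official helper: the class AmqpUri and its method Build are hypothetical names, and the sketch assumes that your AMQP client percent-decodes the user, password, and virtual host parts of the URI. Escaping them keeps characters such as @, / or : in a password from breaking the URI.

```csharp
using System;

public static class AmqpUri
{
    // Hypothetical helper: builds an amqps:// URI from the configuration values.
    // User, password and virtual host are percent-encoded so that reserved
    // characters (for example '@', '/' or ':') cannot be misread as URI syntax.
    public static string Build(string user, string pass, string host, int port, string vhost)
    {
        return "amqps://"
            + Uri.EscapeDataString(user) + ":" + Uri.EscapeDataString(pass)
            + "@" + host + ":" + port
            + "/" + Uri.EscapeDataString(vhost);
    }
}
```

For example, AmqpUri.Build("f-FLT23-c-CNSM23A", "p@ss/w:rd", "api.invers.com", 5671, "fleet-FLT23") leaves the user name and virtual host unchanged and encodes only the reserved characters in the made-up password.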

Multiple consumers

In many scenarios, it is sufficient to have one consuming application that receives all the messages you need for your workflows. However, there may be cases where you want more than one consuming application. For example, you might have one application that handles your vehicles' master data and another that processes vehicle state updates, e.g. to produce custom alerts. Or you might want to give another company that does analytics for you access to specific message topics.

To achieve this, create a separate message consumer for each application and subscribe it only to the topics that application needs to consume. You can subscribe to any subset of the available topics, thereby limiting each application's access to just the necessary topics.

Code example

Here is a code sample in C# that shows how to connect to our AMQP broker and receive messages using the configuration parameters above.

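using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

string user = "f-FLT23-c-CNSM23A"; // (1)
string pass = "❰consumer password❱"; // (2)
string hostName = "api.invers.com";
int port = 5671;
string vhost = "fleet-FLT23";  // (3)
string queueName = "VehicleState_v1.CNSM23A";  // (4)

// The amqps:// scheme selects TLS, which the broker enforces.
ConnectionFactory factory = new ConnectionFactory();
factory.Uri = new Uri($"amqps://{user}:{pass}@{hostName}:{port}/{vhost}");

IConnection conn = factory.CreateConnection();
IModel channel = conn.CreateModel();

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (ch, ea) =>
  {
    var body = ea.Body;
    try
    {
      // (5)
    }
    catch (Exception e)
    {
      // Log the failure; the message is still acknowledged below.
      Console.Error.WriteLine(e);
    }
    // Acknowledge this single delivery so the broker removes it from the queue.
    channel.BasicAck(ea.DeliveryTag, false);
  };

// autoAck is false, so every message has to be acknowledged explicitly.
string consumerTag = channel.BasicConsume(queueName, false, consumer); // (6)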
  1. Make sure to fill in the value for your consumer’s user_name.

    Learn more about consumers.

  2. Make sure to fill in the value for your consumer’s password.

    Learn more about consumers.

  3. Make sure to fill in the value for your consumer’s virtual_host.

    Learn more about consumers.

  4. Make sure to fill in the value for your consumer’s queue_name.

    Learn more about consumers.

  5. Here, you can parse the message’s body with JsonDocument.Parse(body) or JsonSerializer.Deserialize(body).

    Use the header information first if there are multiple message types within a topic.

  6. Starts consumption of the messages.

    consumerTag identifies the subscription when it has to be cancelled.

Consume

Once you are connected to our AMQP broker, your connection will remain alive. In default operation mode, new messages pushed to the queue are received by your system automatically.

  • Messages published to the topic queue will be kept for one week. After this period the messages will expire.
  • Upon consumption, your system has to acknowledge received messages. Acknowledged messages are removed from the queue. Unacknowledged consumed messages will be returned to the queue in the same order.
  • TLS is enforced for security reasons.
  • Permissions are set to basic.get and basic.consume only, so you cannot create new queues on the existing exchanges.
  • It is recommended to set prefetch-count to 50.
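The acknowledgement and prefetch points above can be sketched as follows. This is an illustration under assumptions, not a reference implementation: it assumes the RabbitMQ .NET client with the IModel API (as in the code example), and ConsumerSetup, Start, and the handle callback are hypothetical names. A message is acknowledged only after it was processed successfully; on failure it is handed back to the queue.

```csharp
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static class ConsumerSetup
{
    // Hypothetical helper. `channel` is an already opened channel and
    // `handle` stands for your own processing logic.
    public static string Start(IModel channel, string queueName, Action<ReadOnlyMemory<byte>> handle)
    {
        // Limit unacknowledged deliveries on this channel to the recommended 50
        // (prefetch size 0 = no byte limit, false = applies per consumer).
        channel.BasicQos(0, 50, false);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, ea) =>
        {
            try
            {
                handle(ea.Body);
                // Processing succeeded: remove this single message from the queue.
                channel.BasicAck(ea.DeliveryTag, false);
            }
            catch (Exception e)
            {
                Console.Error.WriteLine(e);
                // Processing failed: return this single message to the queue.
                channel.BasicNack(ea.DeliveryTag, false, true);
            }
        };

        // autoAck = false: the broker waits for the explicit ack or nack above.
        return channel.BasicConsume(queueName, false, consumer);
    }
}
```

Requeueing on every failure means a message that can never be processed will be redelivered repeatedly, so in practice you may prefer to log and acknowledge such messages after a few attempts.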

Parsing messages

When consuming messages, you receive the message content and additional headers via AMQP.

Example

This example shows the payload and headers of a message from the VehicleChange topic.

Message Headers

`Message-Type`:    `VehicleCreated`
`Message-Version`: `2.0.0`
`Flow-ID`:         `804f2161-a15c-420f-8f21-61a15c320f68`

Message Payload

{
  "metadata": {
    "message_id": "01F1FHJ7SHQK5MGF69B8K2346E",
    "message_type": "VehicleCreated",
    "occurred_at": "2020-01-30T13:22:05Z",
    "version": "2.0.0",
    "flow_id": "804f2161-a15c-420f-8f21-61a15c320f68"
  },
  "data": {
      // message's data content
  }
}

When you receive messages from a topic, you often want to parse the message payload and, depending on your software stack, convert the message to a data representation like a class. Because message topics can publish messages with different message types, it is recommended to use the message headers to determine the Message-Type. For example, the message topic VehicleChange publishes messages with message types VehicleCreated, VehicleDataChanged, and VehicleDeleted. The header allows you to determine which message type the received message is and parse it accordingly.
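The header-based dispatch described above could look roughly like the sketch below. It uses only System.Text.Json and a plain dictionary, so it is independent of the AMQP client; MessageDispatch, ReadMessageType, and Describe are hypothetical names, and the returned strings stand in for your own handling logic. The sketch assumes that string headers arrive as UTF-8 byte arrays, which is how the RabbitMQ .NET client usually delivers them.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

public static class MessageDispatch
{
    // Returns the Message-Type header as a string, or null if it is missing.
    // Accepts both byte[] (assumed wire form) and string values.
    public static string ReadMessageType(IDictionary<string, object> headers)
    {
        if (headers == null || !headers.TryGetValue("Message-Type", out var raw))
            return null;
        if (raw is byte[] bytes) return Encoding.UTF8.GetString(bytes);
        return raw as string;
    }

    // Chooses a branch based on the header, then reads the payload's metadata.
    public static string Describe(IDictionary<string, object> headers, ReadOnlyMemory<byte> body)
    {
        string messageType = ReadMessageType(headers);
        using JsonDocument doc = JsonDocument.Parse(body);
        string messageId = doc.RootElement
            .GetProperty("metadata")
            .GetProperty("message_id")
            .GetString();

        switch (messageType)
        {
            case "VehicleCreated":     return "created " + messageId;
            case "VehicleDataChanged": return "changed " + messageId;
            case "VehicleDeleted":     return "deleted " + messageId;
            default:                   return "unhandled " + messageId;
        }
    }
}
```

In a real consumer, each case would deserialize the data object into the class that matches the message type instead of returning a string.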

To find out more about the format of messages, go to message format.
