The publish-subscribe service enables an all-to-all multi directional exchange of data, by allowing nodes to subscribe and publish to topics.
After subscription, they receive all messages that are published into the topic by any remote node.
Internally, the data distribution does not rely on naive flooding, however employs a gossip protocol.
The GossipSub
Service
from hyveos_sdk import Connection
async with Connection () as connection:
gossip_sub = connection. get_gossip_sub_service ()
# continue with usage of gossip_sub
use hyveos_sdk :: Connection;
let connection = Connection :: new () . await . unwrap ();
let mut gossipsub = connection . gossipsub () ;
// continue with usage of gossipsub
import { Client } from ' hyveos-sdk ' ;
import { Connection } from ' hyveos-web '
const transport = new Connection ( ' http://localhost:8080 ' )
const client = new Client (transport)
const gossipSub = client . gossipsub
// continue with usage of gossipSub
Example 1. Obtain the GossipSub service handler. Note the async
environment.
Subscribing to a topic
Once a node is subscribed to a topic, it will receive any data published to that topic.
subscribe(topic)
will return an asynchronous stream of the eventually received messages.
from hyveos_sdk import Connection
async with Connection () as connection:
gossip_sub = connection. get_gossip_sub_service ()
# Handle the incoming stream of messages
async with await gossip_sub. subscribe ( " topic " ) as messages:
async for message in messages:
text = message.msg.data. decode ( " utf-8 " )
direct_source = message.propagation_source # -> Peer
print ( f "Received message from {message.source} via {direct_source} : {text} " )
print ( f "Received message from unknown source: {text} " )
use futures :: TryStreamExt as _;
use hyveos_sdk :: Connection;
let connection = Connection :: new () . await . unwrap ();
let mut gossipsub = connection . gossipsub ();
let mut messages = gossipsub . subscribe ( " topic " ) . await . unwrap ();
// Handle the incoming stream of messages
while let Some( message ) = messages . try_next () . await . unwrap () {
let string = String :: from_utf8 ( message . message . data) . unwrap ();
if let Some( source ) = message . source {
let direct_source = message . propagation_source; // -> PeerId
println! ( " Received message from {source} via {direct_source}: {string} " );
println! ( " Received message from unknown source: {string} " );
import { Client } from ' hyveos-sdk '
import { Connection } from ' hyveos-web '
async function main () : Promise < void > {
const transport = new Connection ( ' http://localhost:8080 ' )
const client = new Client (transport)
const gossipSub = connection . gossipsub ()
const subscription = gossipSub . subscribe ( " topic " )
// Handle the incoming stream of messages
for await ( const message of subscription) {
const text = new TextDecoder () . decode (message . msg )
const directSource = message . propagation_source
console . log ( ' Received message from ${message.source} via ${directSource}: ${text} ' )
console . log ( ' Received message from unknown source: ${text} ' )
main () . catch (console . error );
Example 2. Subscribe to "topic"
and continuously handle published messages.
Note the async context manager in Python for proper disposal of the stream.
Data Encoding Formats
The Rust SDK exposes seperate methods for encoding message data in either json
or cbor
format.
The choice depends on your use case: cbor is more encoding efficient, json is human-readable.
let mut messages = gossipsub . subscribe_json ( " topic " ) . await . unwrap ();
let mut messages = gossipsub . subscribe_cbor ( " topic " ) . await . unwrap ();
Example 3. Subscribing to a topic to then receive a stream of JSON or CBOR encoded messages published to that topic.
Publishing to a topic
Publishing to a topic will distribute the data to all nodes subscribed to the topic.
Messages can be distinguished by their id
.
publish(topic, data)
returns a message id
.
from hyveos_sdk import Connection
async with Connection () as connection:
gossip_sub = connection. get_gossip_sub_service ()
id = await gossip_sub. publish ( " Hello, world! " , " topic " )
print ( f "Published message with message_id: { id } " )
use hyveos_sdk :: Connection;
let connection = Connection :: new () . await . unwrap ();
let mut gossipsub = connection . gossipsub ();
let id = gossipsub . publish ( " topic " , " Hello, world! " ) . await . unwrap ();
println! ( " Published message with message_id: {id} " );
import { Client } from ' hyveos-sdk '
import { Connection } from ' hyveos-web '
const transport = new Connection ( ' http://localhost:8080 ' )
const client = new Client (transport)
const gossipSub = client . gossipsub ()
const id = await gossipsub . publish ( " topic " , " Hello, world! " )
console . log ( " Published message with message_id: ${id} " )
main () . catch ( ( error ) => {
Example 4. The node from Example 2 is now going to receive all messages that are published into
'topic'
.