Publish-Subscribe
The publish-subscribe service enables an all-to-all, multidirectional exchange of data by allowing nodes to subscribe and publish to topics. After subscribing, a node receives all messages that any remote node publishes to the topic.
Internally, the data distribution does not rely on naive flooding but instead employs a gossip protocol.
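To give a feel for why gossip is cheaper than naive flooding, here is a toy simulation. It is only a sketch under simplifying assumptions (a fully connected network, a fixed fanout, no message loss) and it is not the GossipSub algorithm itself; the function names `flood` and `gossip` are made up for this illustration.

```python
import random


def flood(n: int) -> int:
    """Naive flooding in a fully connected network of n nodes:
    every node forwards the message once to every other node."""
    return n * (n - 1)


def gossip(n: int, fanout: int, seed: int = 0) -> tuple[int, int]:
    """Push gossip: each newly informed node forwards the message to
    `fanout` randomly chosen peers. Returns (transmissions, nodes_reached)."""
    rng = random.Random(seed)
    informed = {0}   # node 0 publishes the message
    frontier = [0]   # nodes that still have to forward it
    sent = 0
    while frontier:
        next_frontier = []
        for node in frontier:
            candidates = [p for p in range(n) if p != node]
            for peer in rng.sample(candidates, fanout):
                sent += 1
                if peer not in informed:
                    informed.add(peer)
                    next_frontier.append(peer)
        frontier = next_frontier
    return sent, len(informed)


if __name__ == "__main__":
    sent, reached = gossip(n=50, fanout=4)
    print(f"flooding: {flood(50)} transmissions")
    print(f"gossip:   {sent} transmissions, {reached} of 50 nodes reached")
```

In this model every informed node sends exactly `fanout` messages, so the total grows linearly with the number of nodes, whereas flooding grows quadratically. The trade-off is that plain random gossip may miss some nodes; real protocols add further mechanisms to close that gap.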
The GossipSub Service
from hyveos_sdk import Connection
import asyncio

async def main():
    async with Connection() as connection:
        gossip_sub = connection.get_gossip_sub_service()

        # continue with usage of gossip_sub
        # TODO

asyncio.run(main())
use hyveos_sdk::Connection;

#[tokio::main]
async fn main() {
    let connection = Connection::new().await.unwrap();
    let mut gossipsub = connection.gossipsub();

    // continue with usage of gossipsub
    // todo!()
}
import { Client } from 'hyveos-sdk'
import { Connection } from 'hyveos-web'

async function main() {
  const transport = new Connection('http://localhost:8080')
  const client = new Client(transport)
  const gossipSub = client.gossipsub

  // continue with usage of gossipSub
  // TODO
}
Example 1. Obtain the GossipSub service handler. Note the async environment.
Subscribing to a topic
Once a node is subscribed to a topic, it will receive any data published to that topic.
subscribe(topic) returns an asynchronous stream of the messages received on that topic.
from hyveos_sdk import Connection
import asyncio

async def main():
    async with Connection() as connection:
        gossip_sub = connection.get_gossip_sub_service()

        # Handle the incoming stream of messages
        async with await gossip_sub.subscribe("topic") as messages:
            async for message in messages:
                text = message.msg.data.decode("utf-8")

                if message.source:
                    direct_source = message.propagation_source  # -> Peer
                    print(f"Received message from {message.source} via {direct_source}: {text}")
                else:
                    print(f"Received message from unknown source: {text}")

asyncio.run(main())
use futures::TryStreamExt as _;
use hyveos_sdk::Connection;

#[tokio::main]
async fn main() {
    let connection = Connection::new().await.unwrap();
    let mut gossipsub = connection.gossipsub();

    let mut messages = gossipsub.subscribe("topic").await.unwrap();

    // Handle the incoming stream of messages
    while let Some(message) = messages.try_next().await.unwrap() {
        let string = String::from_utf8(message.message.data).unwrap();
        if let Some(source) = message.source {
            let direct_source = message.propagation_source; // -> PeerId
            println!("Received message from {source} via {direct_source}: {string}");
        } else {
            println!("Received message from unknown source: {string}");
        }
    }
}
import { Client } from 'hyveos-sdk'
import { Connection } from 'hyveos-web'

async function main(): Promise<void> {
  const transport = new Connection('http://localhost:8080')
  const client = new Client(transport)
  const gossipSub = client.gossipsub()

  const subscription = gossipSub.subscribe("topic")

  // Handle the incoming stream of messages
  for await (const message of subscription) {
    const text = new TextDecoder().decode(message.msg)

    if (message.source) {
      const directSource = message.propagation_source
      console.log(`Received message from ${message.source} via ${directSource}: ${text}`)
    } else {
      console.log(`Received message from unknown source: ${text}`)
    }
  }
}

main().catch(console.error);
Example 2. Subscribe to "topic" and continuously handle published messages.
Note the async context manager in Python for proper disposal of the stream.
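The subscription pattern can be tried out without a running node by using a local stand-in. The sketch below models topics with `asyncio.Queue`; `LocalTopicBus` and its methods are hypothetical and not part of `hyveos_sdk`. It only mirrors the shape described here: subscribing yields an asynchronous stream, publishing delivers to every subscriber of that topic, and closing the stream removes the subscription.

```python
import asyncio
from collections import defaultdict
from typing import AsyncGenerator


class LocalTopicBus:
    """Hypothetical in-process stand-in for a pub-sub service (not hyveos_sdk)."""

    def __init__(self) -> None:
        self._queues: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, topic: str) -> AsyncGenerator[bytes, None]:
        queue: asyncio.Queue = asyncio.Queue()
        # Register right away so that nothing published after this call is missed.
        self._queues[topic].append(queue)

        async def stream() -> AsyncGenerator[bytes, None]:
            try:
                while True:
                    yield await queue.get()
            finally:
                # Runs when a started stream is closed: drop the subscription.
                self._queues[topic].remove(queue)

        return stream()

    def publish(self, topic: str, data: bytes) -> int:
        """Deliver data to every current subscriber of the topic.
        Returns the number of deliveries."""
        for queue in self._queues[topic]:
            queue.put_nowait(data)
        return len(self._queues[topic])


async def demo() -> list[str]:
    bus = LocalTopicBus()
    messages = bus.subscribe("topic")
    bus.publish("topic", b"Hello, world!")
    bus.publish("other", b"ignored")  # different topic: not delivered
    received = []
    async for data in messages:
        received.append(data.decode("utf-8"))
        break  # stop after the first message for this demo
    await messages.aclose()  # explicit disposal of the stream
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The explicit `aclose()` call plays the role that the async context manager plays in the Python example: without it, the queue would stay registered after the consumer stops reading.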
Publishing to a topic
Publishing to a topic will distribute the data to all nodes subscribed to the topic.
Messages can be distinguished by their id.
publish(topic, data) returns a message id.
from hyveos_sdk import Connection
import asyncio

async def main():
    async with Connection() as connection:
        gossip_sub = connection.get_gossip_sub_service()

        id = await gossip_sub.publish("Hello, world!", "topic")

        print(f"Published message with message_id: {id}")

asyncio.run(main())
use hyveos_sdk::Connection;

#[tokio::main]
async fn main() {
    let connection = Connection::new().await.unwrap();
    let mut gossipsub = connection.gossipsub();

    let id = gossipsub.publish("topic", "Hello, world!").await.unwrap();

    println!("Published message with message_id: {id}");
}
import { Client } from 'hyveos-sdk'
import { Connection } from 'hyveos-web'

async function main() {
  const transport = new Connection('http://localhost:8080')
  const client = new Client(transport)
  const gossipSub = client.gossipsub()

  const id = await gossipSub.publish("topic", "Hello, world!")

  console.log(`Published message with message_id: ${id}`)
}

main().catch((error) => {
  console.error(error)
  process.exit(1)
});
Example 4. The node from Example 2 will now receive all messages published to 'topic'.
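Because every message carries an id, an application can use it to recognise a message it has already handled. The following is a minimal sketch of such a check, assuming only that ids are hashable values; `SeenMessages` is a hypothetical helper, not part of `hyveos_sdk`, and the capacity of 1024 is an arbitrary choice.

```python
from collections import OrderedDict
from typing import Hashable


class SeenMessages:
    """Hypothetical bounded cache of message ids (not part of hyveos_sdk)."""

    def __init__(self, capacity: int = 1024) -> None:
        self._capacity = capacity
        self._ids: OrderedDict = OrderedDict()

    def first_time(self, message_id: Hashable) -> bool:
        """Return True the first time an id is seen, False for repeats."""
        if message_id in self._ids:
            self._ids.move_to_end(message_id)  # refresh recency
            return False
        self._ids[message_id] = None
        if len(self._ids) > self._capacity:
            self._ids.popitem(last=False)  # evict the least recently seen id
        return True
```

A receive loop could call `first_time(...)` with the id of each incoming message and skip the ones that return `False`. The bound keeps memory use constant, at the cost of forgetting very old ids.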
© 2025 P2P Industries. This documentation is licensed under the MIT License.