ZeroMQ

De Lea Linux
Aller à la navigation Aller à la recherche

ZeroMQ

ZeroMQ est une librairie de transfert de données asynchrone de haute-performance. Très utile, elle permet d'envoyer des données via des méthodes qui sort du cadre des sockets habituelles ou des IPC (même si elles sont utilisés).

Pour plus d'informations, nous vous conseillons le site officiel, ainsi qu'un excellent résumé sur le site de Amaury Bouchard.

Les différentes parties de cette documentation seront en constante construction, considérez-les comme des notes rapides de recherches. Les différents morceaux de code seront écrits en python (sauf indication contraire).

Il existe une multitude de code source d'exemple sur ce site: http://zguide.zeromq.org/page:all.

ZeroMQ et le contexte

Quand vous commencez, il faut définir un "contexte" à l'aide de *zmq.Context*.

Définition d'un nouveau contexte :

context = zmq.Context()

Réutilisation du contexte actuel :

context = zmq.Context.instance()

Attention, en "inproc", utilisez un seul contexte par processus, sinon vous ne pourrez communiquer. Sur les autres méthodes (tcp, ipc, egpm), vous pouvez initier de nouveau contexte.

Très important à retenir, surtout dans un environnement "fork" ou de "thread".


ZeroMQ avec une socket de type PAIR

La socket de type PAIR permet de joindre deux sockets de façon exclusive. Aucun autre "PAIR" ne peut se connecter en plus. Si vous avez plusieurs threads avec différents PAIR, il est normal qu'un seul marche. Les autres ne pourront jamais se connecter.

__Avantages__

  • PAIR ne supprime aucun paquet même en cas d'inondation (flood) de données, il garantit la réception des données
  • PAIR semble attendre que le sender et le receiver soit connectés pour s'envoyer des données. Vous avez donc la garantie de l'envoi et de la réception de vos données.(*1)

__Désavantages__

  • PAIR ne peuvent être que deux.

Exemple de code "serveur"

import zmq from zmq.eventloop import ioloop, zmqstream

context = zmq.Context.instance()

sender = context.socket(zmq.PAIR) sender.connect("inproc://mon_pairing")

with open("votre_fichier", 'rb') as file:

       for buffer in iter(lambda: file.read(65536), b):
               sender.send(buffer)

file.close() return(True)

Exemple de code "client"

import zmq from zmq.eventloop import ioloop, zmqstream

context = zmq.Context.instance()

receiver = context.socket(zmq.PAIR) receiver.bind("inproc://mon_pairing")

while receiver.poll():

     buffer = receiver.recv()
     print(buffer)

  • 1 = En tout cas, dans les nombreux tests effectués, il n'y a eu aucune perte de données ni même de retard dans la réception.

ZeroMQ avec une socket de type PUB/SUB

Si vous souhaitez avoir plusieurs sockets en écoute, vous pouvez utiliser PUBLISH et SUBSCRIBE.

Exemple de la partie serveur PUBLISHER (corps seulement)

context = zmq.Context.instance()

sender = context.socket(zmq.PUB) sender.bind("inproc://thread")

while (...)

Exemple de la partie client SUBSCRIBE (corps seulement)

context = zmq.Context.instance()

receiver = context.socket(zmq.SUB) receiver.connect("inproc://thread") receiver.setsockopt(zmq.SUBSCRIBE, b)

while (...)

Le setsockopt "SUBSCRIBE" à (vide) défini qu'on souhaite l'ensemble des données (aucun filtre) Vous pouvez définir plusieurs clients avec les mêmes éléments. Chaque client va recevoir les données du "PUBLISHER". Il est important de définir le setsockopt SUBSCRIBE après le connect.

Important à savoir, les sockets de type "PUB/SUB" comme ROUTER/DEALER/etc. vont supprimer des paquets si vous envoyez trop de données en même temps. Pour éviter ce genre de problème, définissez les "Watermarks" (HWM) à 0 (illimité). Il est important de définir les watermarks APRÉS la socket et AVANT le connect ou bind.

Exemple

context = zmq.Context.instance()

sender = context.socket(zmq.PUB) sender.setsockopt(zmq.SNDHWM, 0) sender.setsockopt(zmq.RCVHWM, 0) sender.bind("inproc://thread")

while (...) context = zmq.Context.instance()

receiver = context.socket(zmq.SUB) receiver.setsockopt(zmq.SNDHWM, 0) receiver.setsockopt(zmq.RCVHWM, 0) receiver.connect("inproc://thread") receiver.setsockopt(zmq.SUBSCRIBE, b)

while (...)

Il est important de respecter cette hiérarchie (socket, setsockopt HWM, connect/bind, setsockopt SUBSCRIBE). Sans cela, vous aurez soit des pertes de paquets, soit aucune information. Important aussi: il faut démarrer les instances clientes (SUB) AVANT le publisher. Sinon, il peut manquer des informations en tout début d'envoi ou de réception.


A suivre ...



@ Retour à la rubrique Réseau