Formation > Blog > Langage > Comment tester un stream Kafka ?

apache kafka guide

Vous avez développé une application utilisant Kafka Streams et vous vous demandez comment tester efficacement vos flux ? Ne cherchez pas plus loin !

Dans cet article, nous allons plonger dans les meilleures pratiques pour tester vos applications Kafka Streams, en utilisant le TopologyTestDriver et en explorant les techniques de test unitaire pour les processeurs Kafka. Accrochez-vous, car nous allons démystifier l’art de tester vos flux Kafka !

Si vous êtes intéressé par la gestion et l’analyse de données en temps réel, notre formation Kafka est faite pour vous. Avec cette formation, vous pourrez acquérir les compétences nécessaires pour maîtriser les outils puissants d’Apache Kafka, permettant l’ingestion, le traitement et la diffusion de flux de données à grande échelle.

L’équipe Ambient IT

Table des matières

1 – Importation des utilitaires de test

2 – Tests d’applications Kafka Streams

3 – Conclusion

Importation des utilitaires de test

Pour tester une application Kafka Streams, Kafka fournit un artefact test-utils qui peut être ajouté en tant que dépendance régulière à votre code de test. Voici un exemple de fragment de pom.xml lorsque vous utilisez Maven :

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>2.1.0</version>
    <scope>test</scope>
</dependency>

Tests d’applications Kafka Streams

Utilisation du TopologyTestDriver

Le package test-utils fournit un TopologyTestDriver qui peut être utilisé pour transmettre des données à travers une topologie qui est assemblée :

  • soit manuellement en utilisant l’API des processeurs
  • soit via le DSL en utilisant StreamsBuilder

Le test driver simule l’exécution de la bibliothèque qui récupère continuellement les enregistrements des sujets d’entrée et les traite en traversant la topologie.

Vous pouvez utiliser le test driver pour vérifier que votre topologie de processeur spécifiée calcule le résultat correct avec les enregistrements de données transmis manuellement.

Le test driver capture les enregistrements de résultats et permet d’interroger ses magasins d’état intégrés.

// API du processeur
Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
topology.addProcessor("processor", ..., "sourceProcessor");
topology.addSink("sinkProcessor", "output-topic", "processor");
// ou
// en utilisant le DSL
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").filter(...).to("output-topic");
Topology topology = builder.build();

// configuration du test driver
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

Le test driver accepte des ConsumerRecords avec des types de clé et de valeur byte[]. Parce que les types byte[] peuvent poser problème, vous pouvez utiliser ConsumerRecordFactory pour générer ces enregistrements en fournissant des types Java réguliers pour les clés et les valeurs et les sérialiseurs correspondants.

ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipe(factory.create("key", 42L));

Pour vérifier la sortie, le test driver produit des ProducerRecords avec des types de clé et de valeur byte[]. Pour la vérification des résultats, vous pouvez spécifier des désérialiseurs correspondants lors de la lecture de l’enregistrement de sortie à partir du driver.

ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());

Pour la vérification des résultats, vous pouvez utiliser OutputVerifier. Il offre des méthodes d’aide pour comparer uniquement certaines parties de l’enregistrement de résultat : par exemple, vous pouvez vous intéresser uniquement à la clé et à la valeur, mais pas à l’horodatage de l’enregistrement de résultat.

OutputVerifier.compareKeyValue(outputRecord, "key", 42L); // lance une AssertionError si la clé ou la valeur ne correspond pas

Le TopologyTestDriver prend également en charge les ponctuations. Les ponctuations basées sur le temps d’événement sont déclenchées automatiquement en fonction des horodatages des enregistrements traités. Les ponctuations basées sur l’horloge murale peuvent également être déclenchées en faisant avancer le temps de l’horloge murale du test driver (le driver simule l’horloge murale en interne pour donner aux utilisateurs un contrôle sur elle).

testDriver.advanceWallClockTime(20L);

De plus, vous pouvez accéder aux magasins d’état via le test driver avant ou après un test. L’accès aux magasins avant un test est utile pour préremplir un magasin avec certaines valeurs initiales. Après le traitement des données, les mises à jour attendues du magasin peuvent être vérifiées.

KeyValueStore store = testDriver.getKeyValueStore("store-name");

Il est important de noter que vous devez toujours fermer le test driver à la fin pour vous assurer que toutes les ressources sont libérées correctement.

testDriver.close();

Conclusion

En conclusion, bien que tester des applications Kafka Streams puisse être complexe en raison de la nature distribuée et asynchrone de la plateforme, l’utilisation des outils de test appropriés et la mise en place de bonnes pratiques de test peuvent grandement faciliter le processus de développement et garantir la qualité et la robustesse des applications produites.

BONUS : Exemple de test de processor avec MockProcessorContext

Voici un exemple illustrant comment tester un Processor en utilisant MockProcessorContext fourni par test-utils. Cet exemple suppose que vous avez développé un Processor personnalisé qui effectue un traitement sur les données entrantes.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.PunctuationType;
import org.junit.Test;

import java.util.Iterator;
import java.util.List;

import static org.junit.Assert.*;

public class CustomProcessorTest {

    @Test
    public void shouldProcessDataCorrectly() {
        // Création du Processor à tester
        Processor<String, String> processorUnderTest = new CustomProcessor();

        // Initialisation du mock context
        MockProcessorContext context = new MockProcessorContext();
        processorUnderTest.init(context);

        // Passage de données au Processor
        processorUnderTest.process("key", "value");

        // Vérification des résultats
        List<MockProcessorContext.CapturedForward> captures = context.forwarded();
        assertEquals(1, captures.size());
        assertEquals(new KeyValue<>("processed-key", "processed-value"), captures.get(0).keyValue());

        assertTrue(context.committed());
    }

    @Test
    public void shouldSchedulePunctuatorCorrectly() {
        // Création du Processor à tester
        Processor<String, String> processorUnderTest = new CustomProcessor();

        // Initialisation du mock context
        MockProcessorContext context = new MockProcessorContext();
        processorUnderTest.init(context);

        // Vérification de la planification du punctuator
        List<MockProcessorContext.CapturedPunctuator> punctuators = context.scheduledPunctuators();
        assertEquals(1, punctuators.size());
        assertEquals(PunctuationType.STREAM_TIME, punctuators.get(0).getType());
        assertFalse(punctuators.get(0).cancelled());

        // Exécution du punctuator
        punctuators.get(0).getPunctuator().punctuate(0L);

        // Vérification des résultats
        List<MockProcessorContext.CapturedForward> captures = context.forwarded();
        assertEquals(1, captures.size());
        assertEquals(new KeyValue<>("punctuated-key", "punctuated-value"), captures.get(0).keyValue());

        assertTrue(context.committed());
    }
}

Dans cet exemple, nous testons un Processor personnalisé appelé CustomProcessor. Nous initialisons un MockProcessorContext, passons des données au Processor, puis vérifions les résultats en inspectant ce que le Processor a transmis au contexte et en vérifiant si le contexte a été commis.

Nous testons également la planification d’un punctuateur dans le Processor et vérifions si le punctuateur a été correctement planifié en inspectant le contexte, puis exécutons le punctuateur et vérifions les résultats similaires à l’exemple précédent.

Ce processus permet un test efficace et complet des Processors Kafka Streams, garantissant qu’ils fonctionnent comme prévu dans différentes situations et scénarios.

UNE QUESTION ? UN PROJET ? UN AUDIT DE CODE / D'INFRASTRUCTURE ?

Pour vos besoins d’expertise que vous ne trouvez nulle part ailleurs, n’hésitez pas à nous contacter.

ILS SE SONT FORMÉS CHEZ NOUS

partenaire sncf
partenaire hp
partenaire allianz
partenaire sfr
partenaire engie
partenaire boursorama
partenaire invivo
partenaire orange
partenaire psa
partenaire bnp
partenaire sncf
partenaire hp
partenaire allianz
partenaire sfr
partenaire engie
partenaire boursorama
partenaire invivo
partenaire orange
partenaire psa
partenaire bnp