Collecter des données vers Redis à la manière d'un écureuil
Apache Flink et Redis sont deux outils puissants qui peuvent être utilisés ensemble pour créer des pipelines de traitement de données en temps réel capables de gérer de gros volumes de données. Flink fournit une plate-forme hautement évolutive et tolérante aux pannes pour le traitement des flux de données, tandis que Redis fournit une base de données en mémoire hautes performances qui peut être utilisée pour stocker et interroger des données. Dans cet article, nous allons explorer comment Flink peut être utilisé pour appeler Redis à l'aide de fonctions asynchrones et montrer comment cela peut être utilisé pour transmettre des données à Redis de manière non bloquante.
Conte de Redis

"Redis : plus qu'un simple cache
Redis est un puissant magasin de structure de données en mémoire NoSQL qui est devenu un outil incontournable pour les développeurs. Bien qu'il soit souvent considéré comme un simple cache, Redis est bien plus que cela. Il peut fonctionner comme une base de données, un courtier de messages et un cache tout en un.
L'une des forces de Redis est sa polyvalence. Il prend en charge divers types de données, notamment les chaînes, les listes, les ensembles, les ensembles triés, les hachages, les flux, les hyperloglogs et les bitmaps. Redis propose également des index géospatiaux et des requêtes de rayon, ce qui en fait un outil précieux pour les applications basées sur la localisation.
Les fonctionnalités de Redis vont au-delà de son modèle de données. Il intègre la réplication, les scripts Lua et les transactions, et peut partitionner automatiquement les données avec Redis Cluster. De plus, Redis offre une haute disponibilité via Redis Sentinel.
Remarque : dans cet article, nous nous concentrerons davantage sur le mode cluster Redis

Redis Cluster utilise le partitionnement algorithmique avec Hashslots pour déterminer quel fragment contient une clé donnée et simplifie l'ajout de nouvelles instances. Pendant ce temps, il utilise Gossiping pour déterminer la santé du cluster, et si un nœud principal ne répond pas, un nœud secondaire peut être promu pour maintenir le cluster en bonne santé. Il est essentiel d'avoir un nombre impair de nœuds principaux et deux répliques pour une configuration robuste afin d'éviter le phénomène de cerveau divisé (où les clusters sont incapables de décider qui promouvoir et se retrouvent avec une décision partagée)
Pour parler à Redis Cluster, nous utiliserons laitue un client Redis Async Java.
Conte de Flink

Apache Flink est un framework de traitement de flux et de traitement par lots unifié et open source conçu pour gérer le traitement de données en temps réel, à haut débit et tolérant aux pannes. Il est construit sur le framework Apache Gelly et est conçu pour prendre en charge le traitement d'événements complexes et les calculs avec état sur les flux limités et illimités. Ce qui le rend rapide, c'est son exploitation des performances en mémoire et le point de contrôle asynchrone de l'état local.
Le héros de l'histoire

L'interaction asynchrone avec les bases de données change la donne pour les applications de traitement de flux. Avec cette approche, une seule instance de fonction peut gérer plusieurs requêtes à la fois, ce qui permet des réponses simultanées et une augmentation significative du débit. En chevauchant le temps d'attente avec d'autres demandes et réponses, le pipeline de traitement devient beaucoup plus efficace.
Nous allons prendre un exemple de données de commerce électronique pour calculer le nombre de ventes pour chaque catégorie dans la fenêtre glissante de 24 heures avec une diapositive de 30 secondes et l'envoyer à Redis pour une recherche plus rapide d'un service en aval.
Exemple d' ensemble de données
Category, TimeStamp
Electronics,1679832334
Furniture,1679832336
Fashion,1679832378
Food,16798323536
package Aysnc_kafka_redis;
import AsyncIO.RedisSink;
import akka.japi.tuple.Tuple3;
import deserializer.Ecommdeserialize;
import model.Ecomm;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;
public class FlinkAsyncRedis {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Ecommdeserialize jsonde = new Ecommdeserialize();
KafkaSource<Ecomm> source = KafkaSource.<Ecomm>builder()
.setTopics("{dummytopic}")
.setBootstrapServers("{dummybootstrap}")
.setGroupId("test_flink")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(jsonde)
.build();
DataStream<Ecomm> orderData = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
orderData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Ecomm>(Time.seconds(10)) {
@Override
public long extractTimestamp(Ecomm element) {
return element.getEventTimestamp(); // extract watermark column from stream
}
});
SingleOutputStreamOperator<Tuple3<String, Long, Long>> aggregatedData = orderData.keyBy(Ecomm::getCategory)
.window(SlidingEventTimeWindows.of(Time.hours(24),Time.seconds(30)))
.apply((WindowFunction<Ecomm, Tuple3<String, Long, Long>, String, TimeWindow>) (key, window, input, out) -> {
long count = 0;
for (Ecomm event : input) {
count++; // increment the count for each event in the window
}
out.collect(new Tuple3<>(key, window.getEnd(), count)); // output the category, window end time, and count
});
// calling async I/0 operator to sink data to redis in UnOrdered way
SingleOutputStreamOperator<String> sinkResults = AsyncDataStream.unorderedWait(aggregatedData,new RedisSink(
"{redisClusterUrl}"),
1000, // the timeout defines how long an asynchronous operation take before it is finally considered failed
TimeUnit.MILLISECONDS,
100); //capacity This parameter defines how many asynchronous requests may be in progress at the same time.
sinkResults.print(); // print out the redis set response stored in the future for every key
env.execute("RedisAsyncSink"); // you will be able to see your job running on cluster by this name
}
}
package AsyncIO;
import akka.japi.tuple.Tuple3;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import lombok.AllArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import scala.collection.immutable.List;
import java.util.ArrayList;
import java.util.Collections;
@AllArgsConstructor
public class RedisSink extends RichAsyncFunction<Tuple3<String, Long, Long>, String> {
String redisUrl;
public RedisSink(String redisUrl){
this.redisUrl=redisUrl;
}
private transient RedisClusterClient client = null;
private transient StatefulRedisClusterConnection<String, String> clusterConnection = null;
private transient RedisAdvancedClusterAsyncCommands<String, String> asyncCall = null;
// method executes any operator-specific initialization
@Override
public void open(Configuration parameters) {
if (client == null ) {
client = RedisClusterClient.create(redisUrl);
}
if (clusterConnection == null) {
clusterConnection = client.connect();
}
if (asyncCall == null) {
asyncCall = clusterConnection.async();
}
}
// core logic to set key in redis using async connection and return result of the call via ResultFuture
@Override
public void asyncInvoke(Tuple3<String, Long, Long> stream, ResultFuture<String> resultFuture) {
String productKey = stream.t1();
System.out.println("RedisKey:" + productKey); //for logging
String count = stream.t3().toString();
System.out.println("Redisvalue:" + count); //for logging
RedisFuture<String> setResult = asyncCall.set(productKey,count);
setResult.whenComplete((result, throwable) -> {if(throwable!=null){
System.out.println("Callback from redis failed:" + throwable);
resultFuture.complete(new ArrayList<>());
}
else{
resultFuture.complete(new ArrayList(Collections.singleton(result)));
}});
}
// method closes what was opened during initialization to free any resources
// held by the operator (e.g. open network connections, io streams)
@Override
public void close() throws Exception {
client.close();
}
}
- Les données diffusées sur Redis peuvent être utilisées par le modèle de science des données pour rechercher et générer plus de produits pour les catégories qui se vendent fréquemment pendant la saison des soldes.
- Il peut être utilisé pour présenter des graphiques et des chiffres en tant que statistiques de vente sur la page Web, afin de créer une motivation parmi les utilisateurs pour des achats agressifs.
- Flink fournit une plate-forme hautement évolutive et tolérante aux pannes pour le traitement des flux de données, tandis que Redis fournit une base de données en mémoire hautes performances qui peut être utilisée pour stocker et interroger des données.
- La programmation asynchrone peut être utilisée pour améliorer les performances des pipelines de traitement de données en autorisant des appels non bloquants vers des systèmes externes tels que Redis.
- Une combinaison des deux pourrait aider à apporter une culture de décision en temps réel.

https://architecturenotes.co/redis/.
https://www.baeldung.com/java-redis-lettuce
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/operators/asyncio/