2023年6月21日发(作者:)
⽤Java来测试Avro数据格式在Kafka的传输,及测试AvroSchema的兼容性为了测试Avro Schema的兼容性,新建2个Java project,其中v1代表的是第⼀个版本, v2代表的是第⼆个版本。2个project结构如下
v1的主要代码:
{ "type": "record", "namespace": "", "name": "Test", "fields": [ { "name": "a", "type": "string"}, { "name": "b", "type": "string", "default":"v1"}, { "name": "c", "type": "string", "default":"v1"} ] }View Code
ckage ;import ;import roducer;import er;import erRecord;import Metadata;import ties;import ionException;/** * ⽰範如何使⽤SchemaRegistry與KafkaAvroSerializer來傳送資料進Kafka */public class TestV1Producer { private static String KAFKA_BROKER_URL = "localhost:9092"; // Kafka集群在那裡? //private static String SCHEMA_REGISTRY_URL = "10.37.35.115:9086"; // SchemaRegistry的服務在那裡? private static String SCHEMA_REGISTRY_URL = "/api/schema-registry"; public static void main(String[] args) throws ExecutionException, InterruptedException { // 步驟1. 設定要連線到Kafka集群的相關設定 Properties props = new Properties(); ("s", KAFKA_BROKER_URL); // Kafka集群在那裡? ("izer", "Serializer"); // 指定msgKey的序列化器 ("izer", "vroSerializer"); // <-- 指定msgValue的序列化器 //("izer", "Serializer"); ("", SCHEMA_REGISTRY_URL);// SchemaRegistry的服務在那裡? ("acks","all"); ("tion","1"); ("retries",_VALUE+""); // 步驟2. 產⽣⼀個Kafka的Producer的實例 <-- 注意 Producer
ckage ;import ;import er;import erRecord;import erRecords;import onsumer;import ampType;import ;import ties;/** * ⽰範如何使⽤SchemaRegistry與KafkaAvroDeserializer來從Kafka裡讀取資料 */public class TestV1Consumer { private static String KAFKA_BROKER_URL = "localhost:9092"; // Kafka集群在那裡? private static String SCHEMA_REGISTRY_URL = "10.37.35.115:9086"; // SchemaRegistry的服務在那裡? public static void main(String[] args) { // 步驟1. 設定要連線到Kafka集群的相關設定 Properties props = new Properties(); ("s", KAFKA_BROKER_URL); // Kafka集群在那裡? ("", "ak05-v1"); // <-- 這就是ConsumerGroup ("alizer", "Deserializer"); // 指定msgKey的反序列化器 ("alizer", "vroDeserializer"); // 指定msgValue的反序列化器 ("", SCHEMA_REGISTRY_URL); // <-- SchemaRegistry的服務在那裡? ("", "true"); // <-- 告訴KafkaAvroDeserializer來反序列成Avro產⽣的specific物件類別 // (如果沒有設定, 則都會以GenericRecord⽅法反序列) ("", "earliest"); // 是否從這個ConsumerGroup尚未讀取的partition/offset開始讀 ("", "false"); // 步驟2. 產⽣⼀個Kafka的Consumer的實例 Consumer
v2的主要代码:与v1⼀致{ "type": "record", "namespace": "", "name": "Test", "fields": [ { "name": "a", "type": "string"}, { "name": "c", "type": "string", "default": "v2"}, { "name": "d", "type": "string", "default": "v2"}, { "name": "e", "type": "string", "default": "v2"} ]}View Code
ckage ;import ;import roducer;import er;import erRecord;import Metadata;import ties;import ionException;/** * ⽰範如何使⽤SchemaRegistry與KafkaAvroSerializer來傳送資料進Kafka */public class TestV2Producer { private static String KAFKA_BROKER_URL = "localhost:9092"; // Kafka集群在那裡? //private static String SCHEMA_REGISTRY_URL = "10.37.35.115:9086"; // SchemaRegistry的服務在那裡? private static String SCHEMA_REGISTRY_URL = "/api/schema-registry"; public static void main(String[] args) throws ExecutionException, InterruptedException { // 步驟1. 設定要連線到Kafka集群的相關設定 Properties props = new Properties(); ("s", KAFKA_BROKER_URL); // Kafka集群在那裡? ("izer", "Serializer"); // 指定msgKey的序列化器 ("izer", "vroSerializer"); // <-- 指定msgValue的序列化器 //("izer", "Serializer"); ("", SCHEMA_REGISTRY_URL);// SchemaRegistry的服務在那裡? ("acks","all"); ("tion","1"); ("retries",_VALUE+""); // 步驟2. 產⽣⼀個Kafka的Producer的實例 <-- 注意 Producer
ckage ;import ;import er;import erRecord;import erRecords;import onsumer;import ampType;import ;import ties;/** * ⽰範如何使⽤SchemaRegistry與KafkaAvroDeserializer來從Kafka裡讀取資料 */public class TestV2Consumer { private static String KAFKA_BROKER_URL = "localhost:9092"; // Kafka集群在那裡? private static String SCHEMA_REGISTRY_URL = "10.37.35.115:9086"; // SchemaRegistry的服務在那裡? public static void main(String[] args) { // 步驟1. 設定要連線到Kafka集群的相關設定 Properties props = new Properties(); ("s", KAFKA_BROKER_URL); // Kafka集群在那裡? ("", "ak05-v2"); // <-- 這就是ConsumerGroup ("alizer", "Deserializer"); // 指定msgKey的反序列化器 ("alizer", "vroDeserializer"); // 指定msgValue的反序列化器 ("", SCHEMA_REGISTRY_URL); // <-- SchemaRegistry的服務在那裡? ("", "true"); // <-- 告訴KafkaAvroDeserializer來反序列成Avro產⽣的specific物件類別 // (如果沒有設定, 則都會以GenericRecord⽅法反序列) ("", "earliest"); // 是否從這個ConsumerGroup尚未讀取的partition/offset開始讀 ("", "false"); // 步驟2. 產⽣⼀個Kafka的Consumer的實例 Consumer
测试步骤:1. Run producer-v1,去schema registry UI看schema版本2. Run producer-v2,去schema registry UI看schema版本3. Run consumer-v1,旧schema读新数据,演⽰forward4. Run consumer-v2,新schema读旧数据,演⽰ TestV1Producer,发送成功
去schema registry UI查看schema信息,此时schema版本是v.1 TestV2Producer,发送成功
去schema registry UI查看schema信息,此时schema版本是v.2 TestV1Consumer,⽤旧schema去读新数据,测试forward(向前兼容),可以看到,新旧资料都读取了
TestV2Consumer,⽤新schema去读旧数据,测试backward(向后兼容)
2023年6月21日发(作者:)
⽤Java来测试Avro数据格式在Kafka的传输,及测试AvroSchema的兼容性为了测试Avro Schema的兼容性,新建2个Java project,其中v1代表的是第⼀个版本, v2代表的是第⼆个版本。2个project结构如下
v1的主要代码:
{ "type": "record", "namespace": "", "name": "Test", "fields": [ { "name": "a", "type": "string"}, { "name": "b", "type": "string", "default":"v1"}, { "name": "c", "type": "string", "default":"v1"} ] }View Code
ckage ;import ;import roducer;import er;import erRecord;import Metadata;import ties;import ionException;/** * ⽰範如何使⽤SchemaRegistry與KafkaAvroSerializer來傳送資料進Kafka */public class TestV1Producer { private static String KAFKA_BROKER_URL = "localhost:9092"; // Kafka集群在那裡? //private static String SCHEMA_REGISTRY_URL = "10.37.35.115:9086"; // SchemaRegistry的服務在那裡? private static String SCHEMA_REGISTRY_URL = "/api/schema-registry"; public static void main(String[] args) throws ExecutionException, InterruptedException { // 步驟1. 設定要連線到Kafka集群的相關設定 Properties props = new Properties(); ("s", KAFKA_BROKER_URL); // Kafka集群在那裡? ("izer", "Serializer"); // 指定msgKey的序列化器 ("izer", "vroSerializer"); // <-- 指定msgValue的序列化器 //("izer", "Serializer"); ("", SCHEMA_REGISTRY_URL);// SchemaRegistry的服務在那裡? ("acks","all"); ("tion","1"); ("retries",_VALUE+""); // 步驟2. 產⽣⼀個Kafka的Producer的實例 <-- 注意 Producer
ckage ;import ;import er;import erRecord;import erRecords;import onsumer;import ampType;import ;import ties;/** * ⽰範如何使⽤SchemaRegistry與KafkaAvroDeserializer來從Kafka裡讀取資料 */public class TestV1Consumer { private static String KAFKA_BROKER_URL = "localhost:9092"; // Kafka集群在那裡? private static String SCHEMA_REGISTRY_URL = "10.37.35.115:9086"; // SchemaRegistry的服務在那裡? public static void main(String[] args) { // 步驟1. 設定要連線到Kafka集群的相關設定 Properties props = new Properties(); ("s", KAFKA_BROKER_URL); // Kafka集群在那裡? ("", "ak05-v1"); // <-- 這就是ConsumerGroup ("alizer", "Deserializer"); // 指定msgKey的反序列化器 ("alizer", "vroDeserializer"); // 指定msgValue的反序列化器 ("", SCHEMA_REGISTRY_URL); // <-- SchemaRegistry的服務在那裡? ("", "true"); // <-- 告訴KafkaAvroDeserializer來反序列成Avro產⽣的specific物件類別 // (如果沒有設定, 則都會以GenericRecord⽅法反序列) ("", "earliest"); // 是否從這個ConsumerGroup尚未讀取的partition/offset開始讀 ("", "false"); // 步驟2. 產⽣⼀個Kafka的Consumer的實例 Consumer
v2的主要代码:与v1⼀致{ "type": "record", "namespace": "", "name": "Test", "fields": [ { "name": "a", "type": "string"}, { "name": "c", "type": "string", "default": "v2"}, { "name": "d", "type": "string", "default": "v2"}, { "name": "e", "type": "string", "default": "v2"} ]}View Code
ckage ;import ;import roducer;import er;import erRecord;import Metadata;import ties;import ionException;/** * ⽰範如何使⽤SchemaRegistry與KafkaAvroSerializer來傳送資料進Kafka */public class TestV2Producer { private static String KAFKA_BROKER_URL = "localhost:9092"; // Kafka集群在那裡? //private static String SCHEMA_REGISTRY_URL = "10.37.35.115:9086"; // SchemaRegistry的服務在那裡? private static String SCHEMA_REGISTRY_URL = "/api/schema-registry"; public static void main(String[] args) throws ExecutionException, InterruptedException { // 步驟1. 設定要連線到Kafka集群的相關設定 Properties props = new Properties(); ("s", KAFKA_BROKER_URL); // Kafka集群在那裡? ("izer", "Serializer"); // 指定msgKey的序列化器 ("izer", "vroSerializer"); // <-- 指定msgValue的序列化器 //("izer", "Serializer"); ("", SCHEMA_REGISTRY_URL);// SchemaRegistry的服務在那裡? ("acks","all"); ("tion","1"); ("retries",_VALUE+""); // 步驟2. 產⽣⼀個Kafka的Producer的實例 <-- 注意 Producer
ckage ;import ;import er;import erRecord;import erRecords;import onsumer;import ampType;import ;import ties;/** * ⽰範如何使⽤SchemaRegistry與KafkaAvroDeserializer來從Kafka裡讀取資料 */public class TestV2Consumer { private static String KAFKA_BROKER_URL = "localhost:9092"; // Kafka集群在那裡? private static String SCHEMA_REGISTRY_URL = "10.37.35.115:9086"; // SchemaRegistry的服務在那裡? public static void main(String[] args) { // 步驟1. 設定要連線到Kafka集群的相關設定 Properties props = new Properties(); ("s", KAFKA_BROKER_URL); // Kafka集群在那裡? ("", "ak05-v2"); // <-- 這就是ConsumerGroup ("alizer", "Deserializer"); // 指定msgKey的反序列化器 ("alizer", "vroDeserializer"); // 指定msgValue的反序列化器 ("", SCHEMA_REGISTRY_URL); // <-- SchemaRegistry的服務在那裡? ("", "true"); // <-- 告訴KafkaAvroDeserializer來反序列成Avro產⽣的specific物件類別 // (如果沒有設定, 則都會以GenericRecord⽅法反序列) ("", "earliest"); // 是否從這個ConsumerGroup尚未讀取的partition/offset開始讀 ("", "false"); // 步驟2. 產⽣⼀個Kafka的Consumer的實例 Consumer
测试步骤:1. Run producer-v1,去schema registry UI看schema版本2. Run producer-v2,去schema registry UI看schema版本3. Run consumer-v1,旧schema读新数据,演⽰forward4. Run consumer-v2,新schema读旧数据,演⽰ TestV1Producer,发送成功
去schema registry UI查看schema信息,此时schema版本是v.1 TestV2Producer,发送成功
去schema registry UI查看schema信息,此时schema版本是v.2 TestV1Consumer,⽤旧schema去读新数据,测试forward(向前兼容),可以看到,新旧资料都读取了
TestV2Consumer,⽤新schema去读旧数据,测试backward(向后兼容)
发布评论