
Testing Avro data transport through Kafka with Java, and testing Avro schema compatibility

To test Avro schema compatibility, create two Java projects: v1 holds the first version of the schema, v2 the second. The two projects share the same layout; the key files of each are shown below.

v1's main code:

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- the groupId was lost in extraction; a placeholder is used here -->
    <groupId>com.example</groupId>
    <artifactId>ak05v1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <avro.version>1.8.2</avro.version>
        <kafka.version>1.1.0</kafka.version>
        <confluent.version>5.3.0</confluent.version>
    </properties>

    <repositories>
        <repository>
            <id>confluent</id>
            <!-- the URL was lost in extraction; Confluent's public Maven repo is assumed -->
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- generates Java classes from the .avsc files under src/main/resources/avro -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <stringType>String</stringType>
                            <createSetters>false</createSetters>
                            <enableDecimalLogicalType>true</enableDecimalLogicalType>
                            <fieldVisibility>private</fieldVisibility>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- adds target/generated-sources/avro to the compile source roots -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>target/generated-sources/avro</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

{ "type": "record", "namespace": "", "name": "Test", "fields": [ { "name": "a", "type": "string"}, { "name": "b", "type": "string", "default":"v1"}, { "name": "c", "type": "string", "default":"v1"} ] }View Code

TestV1Producer.java:

// the package name was lost in extraction; a placeholder is used here
package com.example.kafka.avro;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Demonstrates how to use the Schema Registry and KafkaAvroSerializer to send data into Kafka.
 */
public class TestV1Producer {

    private static String KAFKA_BROKER_URL = "localhost:9092";          // where the Kafka cluster is
    //private static String SCHEMA_REGISTRY_URL = "10.37.35.115:9086";  // where the Schema Registry service is
    private static String SCHEMA_REGISTRY_URL = "/api/schema-registry"; // host part lost in extraction

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Step 1. Configure the connection to the Kafka cluster
        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_BROKER_URL);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  // serializer for the message key
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");    // <-- serializer for the message value
        props.put("schema.registry.url", SCHEMA_REGISTRY_URL);  // where the Schema Registry service is
        props.put("acks", "all");
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("retries", Integer.MAX_VALUE + "");

        // Step 2. Create a Kafka producer instance
        Producer<String, Test> producer = new KafkaProducer<>(props); // msgKey is a String, msgValue is a Test

        // Step 3. The topic to publish to
        String topicName = "002";

        try {
            // Step 4. Use the classes Maven generated from the schema as the data containers
            // Send the 1st record (schema v1)
            Test test = Test.newBuilder()
                    .setA("001")
                    .setB("Jack")
                    .setC("Ma")
                    .build();
            // the key getter was garbled in the source; getA() (the record id) is assumed
            RecordMetadata metaData = producer.send(
                    new ProducerRecord<>(topicName, test.getA(), test)).get();
            System.out.println(metaData.offset() + " --> " + test);

            // Send the 2nd record (schema v1)
            test = Test.newBuilder()
                    .setA("002")
                    .setB("Pony")
                    .setC("Ma")
                    .build();
            metaData = producer.send(new ProducerRecord<>(topicName, test.getA(), test)).get();
            System.out.println(metaData.offset() + " --> " + test);

            // Send the 3rd record (schema v1)
            test = Test.newBuilder()
                    .setA("003")
                    .setB("Robin")
                    .setC("Li")
                    .build();
            metaData = producer.send(new ProducerRecord<>(topicName, test.getA(), test)).get();
            System.out.println(metaData.offset() + " --> " + test);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.flush();
            producer.close();
        }
    }
}

TestV1Consumer.java:

// the package name was lost in extraction; a placeholder is used here
package com.example.kafka.avro;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.record.TimestampType;

/**
 * Demonstrates how to use the Schema Registry and KafkaAvroDeserializer to read data from Kafka.
 */
public class TestV1Consumer {

    private static String KAFKA_BROKER_URL = "localhost:9092";        // where the Kafka cluster is
    private static String SCHEMA_REGISTRY_URL = "10.37.35.115:9086";  // where the Schema Registry service is

    public static void main(String[] args) {
        // Step 1. Configure the connection to the Kafka cluster
        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_BROKER_URL);
        props.put("group.id", "ak05-v1"); // <-- this is the consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // deserializer for the message key
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");   // deserializer for the message value
        props.put("schema.registry.url", SCHEMA_REGISTRY_URL); // <-- where the Schema Registry service is
        props.put("specific.avro.reader", "true");  // <-- tell KafkaAvroDeserializer to deserialize into the
                                                    //     Avro-generated specific classes (without this,
                                                    //     everything comes back as GenericRecord)
        props.put("auto.offset.reset", "earliest"); // start from the offsets this consumer group has not read yet
        props.put("enable.auto.commit", "false");

        // Step 2. Create a Kafka consumer instance
        Consumer<String, Test> consumer = new KafkaConsumer<>(props); // msgKey is a String, msgValue is a Test

        // Step 3. The topic to subscribe to
        String topicName = "002";

        // Step 4. Subscribe to the topic (SeekToListener moves the consumer group's offsets
        // back to the beginning of the topic on every restart)
        consumer.subscribe(Arrays.asList(topicName), new SeekToListener(consumer));

        // Step 5. Keep pulling incoming messages from Kafka
        try {
            System.out.println("Start listen incoming messages ...");
            while (true) {
                // ask Kafka for new messages
                ConsumerRecords<String, Test> records = consumer.poll(1000);
                // iterate over any new messages
                for (ConsumerRecord<String, Test> record : records) {
                    // ** business logic and message handling go here **
                    // pull out the metadata
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();
                    TimestampType timestampType = record.timestampType();
                    long timestamp = record.timestamp();
                    // pull out the key and value
                    String msgKey = record.key();
                    Test msgValue = record.value(); // <-- note: a specific record, not a GenericRecord
                    // print the metadata plus the key and value
                    System.out.println(topic + "-" + partition + "-" + offset
                            + " : (" + record.key() + ", " + msgValue + ")");
                }
                consumer.commitAsync();
            }
        } finally {
            // Step 6. Close the consumer connection on shutdown
            consumer.close();
            System.out.println("Stop listen incoming messages");
        }
    }
}
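SeekToListener is referenced above, but its source never appears in the post. The following is a minimal sketch of what the step-4 comment describes (rewind every newly assigned partition to the beginning); the class body is an assumption:

import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class SeekToListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;

    public SeekToListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to do before a rebalance in this demo
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // re-read the topic from the start every time the app (re)starts
        consumer.seekToBeginning(partitions);
    }
}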

v2's main code (the pom.xml is identical to v1's). The v2 Avro schema:

{
  "type": "record",
  "namespace": "",
  "name": "Test",
  "fields": [
    { "name": "a", "type": "string" },
    { "name": "c", "type": "string", "default": "v2" },
    { "name": "d", "type": "string", "default": "v2" },
    { "name": "e", "type": "string", "default": "v2" }
  ]
}

Compared with v1, field b has been removed and fields d and e have been added. Every added field carries a default, and the removed field b had a default in v1; this is exactly what keeps the two versions mutually compatible.
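Before involving Kafka or the registry at all, the two versions can be checked against each other locally with Avro's built-in SchemaCompatibility API. A minimal sketch, with both schemas inlined for brevity:

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class CompatCheck {
    public static void main(String[] args) {
        Schema v1 = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Test\",\"fields\":["
                + "{\"name\":\"a\",\"type\":\"string\"},"
                + "{\"name\":\"b\",\"type\":\"string\",\"default\":\"v1\"},"
                + "{\"name\":\"c\",\"type\":\"string\",\"default\":\"v1\"}]}");
        Schema v2 = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Test\",\"fields\":["
                + "{\"name\":\"a\",\"type\":\"string\"},"
                + "{\"name\":\"c\",\"type\":\"string\",\"default\":\"v2\"},"
                + "{\"name\":\"d\",\"type\":\"string\",\"default\":\"v2\"},"
                + "{\"name\":\"e\",\"type\":\"string\",\"default\":\"v2\"}]}");

        // forward compatibility: old reader (v1) reading data written with the new schema (v2)
        System.out.println(SchemaCompatibility.checkReaderWriterCompatibility(v1, v2).getType());
        // backward compatibility: new reader (v2) reading data written with the old schema (v1)
        System.out.println(SchemaCompatibility.checkReaderWriterCompatibility(v2, v1).getType());
        // both print COMPATIBLE, because every missing field has a default
    }
}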

TestV2Producer.java:

// the package name was lost in extraction; a placeholder is used here
package com.example.kafka.avro;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Demonstrates how to use the Schema Registry and KafkaAvroSerializer to send data into Kafka.
 */
public class TestV2Producer {

    private static String KAFKA_BROKER_URL = "localhost:9092";          // where the Kafka cluster is
    //private static String SCHEMA_REGISTRY_URL = "10.37.35.115:9086";  // where the Schema Registry service is
    private static String SCHEMA_REGISTRY_URL = "/api/schema-registry"; // host part lost in extraction

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Step 1. Configure the connection to the Kafka cluster
        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_BROKER_URL);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  // serializer for the message key
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");    // <-- serializer for the message value
        props.put("schema.registry.url", SCHEMA_REGISTRY_URL);  // where the Schema Registry service is
        props.put("acks", "all");
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("retries", Integer.MAX_VALUE + "");

        // Step 2. Create a Kafka producer instance
        Producer<String, Test> producer = new KafkaProducer<>(props); // msgKey is a String, msgValue is a Test

        // Step 3. The topic to publish to
        String topicName = "002";

        try {
            // Step 4. Use the classes Maven generated from the schema as the data containers
            // Send the 1st record (schema v2)
            Test test = Test.newBuilder()
                    .setA("a1")
                    .setC("c1")
                    .setD("d1")
                    .setE("e1")
                    .build();
            // the key getter was garbled in the source; getA() is assumed
            RecordMetadata metaData = producer.send(
                    new ProducerRecord<>(topicName, test.getA(), test)).get();
            System.out.println(metaData.offset() + " --> " + test);

            // Send the 2nd record (schema v2)
            test = Test.newBuilder()
                    .setA("a2")
                    .setC("c2")
                    .setD("d2")
                    .setE("e2")
                    .build();
            metaData = producer.send(new ProducerRecord<>(topicName, test.getA(), test)).get();
            System.out.println(metaData.offset() + " --> " + test);

            // Send the 3rd record (schema v2)
            test = Test.newBuilder()
                    .setA("a3")
                    .setC("c3")
                    .setD("d3")
                    .setE("e3")
                    .build();
            metaData = producer.send(new ProducerRecord<>(topicName, test.getA(), test)).get();
            System.out.println(metaData.offset() + " --> " + test);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.flush();
            producer.close();
        }
    }
}

TestV2Consumer.java (identical to TestV1Consumer except for the consumer group):

// the package name was lost in extraction; a placeholder is used here
package com.example.kafka.avro;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.record.TimestampType;

/**
 * Demonstrates how to use the Schema Registry and KafkaAvroDeserializer to read data from Kafka.
 */
public class TestV2Consumer {

    private static String KAFKA_BROKER_URL = "localhost:9092";        // where the Kafka cluster is
    private static String SCHEMA_REGISTRY_URL = "10.37.35.115:9086";  // where the Schema Registry service is

    public static void main(String[] args) {
        // Step 1. Configure the connection to the Kafka cluster
        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_BROKER_URL);
        props.put("group.id", "ak05-v2"); // <-- this is the consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // deserializer for the message key
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");   // deserializer for the message value
        props.put("schema.registry.url", SCHEMA_REGISTRY_URL); // <-- where the Schema Registry service is
        props.put("specific.avro.reader", "true");  // <-- deserialize into the Avro-generated specific classes
                                                    //     (without this, everything comes back as GenericRecord)
        props.put("auto.offset.reset", "earliest"); // start from the offsets this consumer group has not read yet
        props.put("enable.auto.commit", "false");

        // Step 2. Create a Kafka consumer instance
        Consumer<String, Test> consumer = new KafkaConsumer<>(props); // msgKey is a String, msgValue is a Test

        // Step 3. The topic to subscribe to
        String topicName = "002";

        // Step 4. Subscribe to the topic (SeekToListener moves the consumer group's offsets
        // back to the beginning of the topic on every restart)
        consumer.subscribe(Arrays.asList(topicName), new SeekToListener(consumer));

        // Step 5. Keep pulling incoming messages from Kafka
        try {
            System.out.println("Start listen incoming messages ...");
            while (true) {
                // ask Kafka for new messages
                ConsumerRecords<String, Test> records = consumer.poll(1000);
                // iterate over any new messages
                for (ConsumerRecord<String, Test> record : records) {
                    // ** business logic and message handling go here **
                    // pull out the metadata
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();
                    TimestampType timestampType = record.timestampType();
                    long timestamp = record.timestamp();
                    // pull out the key and value
                    String msgKey = record.key();
                    Test msgValue = record.value(); // <-- note: a specific record, not a GenericRecord
                    // print the metadata plus the key and value
                    System.out.println(topic + "-" + partition + "-" + offset
                            + " : (" + record.key() + ", " + msgValue + ")");
                }
                consumer.commitAsync();
            }
        } finally {
            // Step 6. Close the consumer connection on shutdown
            consumer.close();
            System.out.println("Stop listen incoming messages");
        }
    }
}

Test steps (a programmatic way to inspect the registry follows this list):
1. Run producer-v1, then check the schema version in the Schema Registry UI.
2. Run producer-v2, then check the schema version in the Schema Registry UI.
3. Run consumer-v1: the old schema reads the new data, demonstrating forward compatibility.
4. Run consumer-v2: the new schema reads the old data, demonstrating backward compatibility.
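If no Schema Registry UI is at hand, the registered versions can also be listed programmatically. A sketch against Confluent's 5.3.x client API; the registry URL and the subject name 002-value (the default TopicNameStrategy subject for topic 002) are assumptions:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class ListVersions {
    public static void main(String[] args) throws Exception {
        // assumed registry address; 100 is the client-side cache capacity
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);
        // after running both producers this should print [1, 2]
        System.out.println(client.getAllVersions("002-value"));
    }
}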

Run TestV1Producer: the messages are sent successfully. The Schema Registry UI now shows the schema at version v.1. Next, run TestV2Producer: the messages are also sent successfully.

The Schema Registry UI now shows the schema at version v.2. Run TestV1Consumer, which reads the new data with the old schema, testing forward compatibility. As the output shows, both the old and the new records are read.

Run TestV2Consumer, which reads the old data with the new schema, testing backward compatibility.
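The registry itself enforces a per-subject compatibility rule (BACKWARD by default) and can be asked whether a candidate schema passes it before anything is produced. A sketch against the 5.3.x client API, with the same assumed URL and subject name as above:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.avro.Schema;

public class RegistryCompatCheck {
    public static void main(String[] args) throws Exception {
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);
        Schema v2 = new Schema.Parser().parse(
                RegistryCompatCheck.class.getResourceAsStream("/avro/test-v2.avsc"));
        // true if v2 is compatible with the latest registered version
        // under the subject's configured compatibility level
        System.out.println(client.testCompatibility("002-value", v2));
    }
}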
