Installing and Using the Confluent Schema Registry for Kafka
1. Installing the Confluent Schema Registry
- Download links for each Schema Registry release
- Upload the archive to a Linux machine, extract it, and install.
- This tutorial uses an external, already-installed Kafka cluster rather than the bundled default.
- Edit the confluent-5.3.1/etc/schema-registry/schema-registry.properties configuration file:
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## The address the socket server listens on.
# FORMAT:
#     listeners = listener_name://host_name:port
# EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# The address and port the Schema Registry listens on
listeners=http://0.0.0.0:8081

# Zookeeper connection string for the Zookeeper cluster used by your Kafka cluster
# (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# There are two ways to connect to the external cluster: 1) via Zookeeper, or 2) via the
# Kafka brokers. This tutorial connects via Zookeeper.
kafkastore.connection.url=henghe-042:2181

# Alternatively, Schema Registry can now operate without Zookeeper, handling all coordination via
# Kafka brokers. Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the master schema registry instance and for storing the data for
# registered schemas.
# (Note that you cannot mix the two modes; use this mode only on new deployments or by shutting down
# all instances, switching to the new configuration, and then starting the schema registry
# instances again.)
#kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092

# The name of the topic to store schemas in
kafkastore.topic=_schemas

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false
- Start the Schema Registry server:
../../bin/schema-registry-start -daemon ../../etc/schema-registry/schema-registry.properties
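If the registry came up cleanly, it listens on port 8081 and creates the _schemas storage topic in the Kafka cluster. A quick sanity check, assuming the listener and Zookeeper addresses configured above (a fresh instance returns an empty subject list):

# A fresh registry returns an empty list of subjects
$ curl http://localhost:8081/subjects
[]

# Confirm the schema storage topic exists (Kafka 2.x tooling)
$ kafka-topics.sh --zookeeper henghe-042:2181 --describe --topic _schemas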
- Schema Registry REST API examples:
# Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/Kafka-key/versions
{"id":1}

# Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/Kafka-value/versions
{"id":1}

# List all subjects
$ curl -X GET http://localhost:8081/subjects
["Kafka-value","Kafka-key"]

# List all schema versions registered under the subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
[1]

# Fetch a schema by globally unique id 1
$ curl -X GET http://localhost:8081/schemas/ids/1
{"schema":"\"string\""}

# Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Delete version 3 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/3
3

# Delete all versions of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
[1, 2, 3, 4, 5]

# Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/Kafka-key
{"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}

# Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
{"is_compatible":true}

# Get top level config
$ curl -X GET http://localhost:8081/config
{"compatibilityLevel":"BACKWARD"}

# Update compatibility requirements globally
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "NONE"}' \
  http://localhost:8081/config
{"compatibility":"NONE"}

# Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "BACKWARD"}' \
  http://localhost:8081/config/Kafka-value
{"compatibility":"BACKWARD"}
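The compatibility endpoints are more interesting with record schemas. A sketch under the default BACKWARD level (demo-value is a hypothetical subject; a new field is backward-compatible only if it declares a default):

# Register a record schema under the hypothetical subject "demo-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"}' \
  http://localhost:8081/subjects/demo-value/versions

# Adding a field with a default passes the BACKWARD check
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0}]}"}' \
  http://localhost:8081/compatibility/subjects/demo-value/versions/latest
{"is_compatible":true}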
2. Using the Confluent Schema Registry
- Create a Java project with the following pom.xml dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.yss</groupId>
    <artifactId>Kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- This dependency is resolved from the local repository; search for how to install
             a jar into the local repository. In my case the jar was installed into the local
             repository automatically while building the source, so it is referenced directly. -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.3.2</version>
        </dependency>
    </dependencies>
</project>
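If you would rather not install the jar into the local repository by hand, the kafka-avro-serializer artifact is also published in Confluent's public Maven repository; adding a repositories block like the following lets Maven resolve it directly:

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>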
- Producer example:
package com.registry;

import java.util.Properties;
import java.util.Random;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @description:
 * @author: wangshuai
 * @create: 2020-01-03 14:17
 **/
public class ConfluentProducer {

    public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
            "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " +
            "{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.101.42:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use Confluent's KafkaAvroSerializer for values
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Address of the Schema Registry, used to register/fetch schemas
        props.put("schema.registry.url", "http://192.168.101.42:8081");

        Producer<String, GenericRecord> producer = new KafkaProducer<>(props);

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Random rand = new Random();
        int id = 0;
        while (id < 100) {
            id++;
            String name = "name" + id;
            int age = rand.nextInt(40) + 1;
            GenericRecord user = new GenericData.Record(schema);
            user.put("id", id);
            user.put("name", name);
            user.put("age", age);
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("test-topic", user);
            producer.send(record);
            Thread.sleep(1000);
        }
        producer.close();
    }
}
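When the first record is sent, KafkaAvroSerializer registers USER_SCHEMA with the registry automatically. Under the default subject naming strategy (TopicNameStrategy), value schemas are registered under the subject <topic>-value, which you can verify with the API from section 1 (addresses as configured above):

$ curl -X GET http://192.168.101.42:8081/subjects/test-topic-value/versions/latest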
- Consumer example:
package com.registry;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @description:
 * @author: wangshuai
 * @create: 2020-01-03 14:34
 **/
public class ConfluentConsumer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.101.42:9092");
        props.put("group.id", "test1");
        // Disable auto-commit so every run re-consumes from the beginning, for testing
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use Confluent's KafkaAvroDeserializer for values
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Address of the Schema Registry, used to fetch schemas
        props.put("schema.registry.url", "http://192.168.101.42:8081");

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
                            + user.get("name") + ", " + "user.age = " + user.get("age") + "], "
                            + "partition = " + record.partition() + ", " + "offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
- Consumer output example:
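Illustrative console output, derived from the println format in the consumer above (ages are random; partition and offset values depend on the topic configuration):

value = [user.id = 1, user.name = name1, user.age = 21], partition = 0, offset = 0
value = [user.id = 2, user.name = name2, user.age = 13], partition = 0, offset = 1
value = [user.id = 3, user.name = name3, user.age = 35], partition = 0, offset = 2

How does the consumer know which schema each record was written with? KafkaAvroSerializer prepends a small header to every message: a magic byte (0) followed by the 4-byte schema ID, with the Avro-encoded payload after it. KafkaAvroDeserializer reads that ID, fetches the writer schema from the registry, and caches it. A minimal sketch of peeking at that header (WireFormatPeek is a hypothetical helper, not part of the Confluent API):

package com.registry;

import java.nio.ByteBuffer;

public class WireFormatPeek {
    // Extracts the schema ID from a raw Kafka message value in Confluent wire format:
    // byte 0 is the magic byte (0), bytes 1-4 are the schema ID (big-endian int).
    public static int schemaId(byte[] kafkaValue) {
        ByteBuffer buf = ByteBuffer.wrap(kafkaValue);
        if (buf.get() != 0) {
            throw new IllegalArgumentException("Not in Confluent wire format");
        }
        return buf.getInt();
    }
}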