转自:
springboot-starter如何整合阿里云datahub呢?
下文笔者讲述springboot整合datahub的方法分享,如下所示
Datahub简介说明
DataHub的功能:
1.与大数据解决方案中Kafka具有相同的角色
同时还提供数据队列功能
2.DataHub还可与阿里云其它上下游产品对接
其中有一个交换数据的功能,称之为数据交换
DataHub 简介
datahub对外提供开发者生产和消费的sdk 在springboot中,我们也可以使用自定义starter的方式加载sdk
实现思路:
1.引入相应的starter器
2.application.yml中加入相应的配置信息
3.编写相应的代码
引入相应的starter器
cry-starters-projects
cn.com.cry.starters
2022-1.0.0
启动客户端
配置阿里云DataHub的endpoint以及AK信息
aliyun:
datahub:
# 开启功能
havingValue: true
#是否为私有云
isPrivate: false
accessId: xxx
accessKey: xxx
endpoint: xxx
#连接DataHub客户端超时时间
conn-timeout: 10000
获取DataHub客户端
DatahubClient datahubClient=DataHubTemplate.getDataHubClient();
写数据
/**
 * Writes a batch of demo Student records to DataHub via DataHubTemplate.
 *
 * @param shardId target shard, bound from request parameter "id"
 * @return number of records successfully written (batch size minus failures)
 */
public int write(@RequestParam("id") Integer shardId) {
    // Build 10 sample students; bean property names must match the topic's record schema.
    List<Student> datas = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Student s = new Student();
        s.setAge(i);
        s.setName("name-" + i);
        s.setAddress("address-" + i);
        datas.add(s);
    }
    // project "my_test", topic "student"; template returns the count of successful puts.
    return DataHubTemplate.write("my_test", "student", datas, shardId);
}
上述代码说明:
projectName为my_test
topicName为student
向shardId为N的shard里写数据
且返回插入成功的条数
读数据
读数据开发的逻辑类似RabbitMq的starter
使用@DataHubListener和@DataHubHandler处理器注解进行使用
// Listener bean: consumes DataHub project "my_test" via the starter's annotation-driven workers.
@Component
@DataHubListener(projectName = "my_test")
public class ReadServiceImpl {
    /**
     * Handles records read from topic "student", shard 0, starting at the LATEST
     * cursor — i.e. only data written after this listener starts is delivered.
     *
     * @param message wrapper around one batch of records read from DataHub
     */
    @DataHubHandler(topicName = "student", shardId = 0, cursorType = CursorTypeWrapper.LATEST)
    public void handler(Message message) {
        System.out.println("读取到shardId=0的消息");
        System.out.println(message.getData());
        // NOTE(review): "getCreateTsime" looks like a typo for getCreateTime —
        // confirm against the Message class in the starter before renaming.
        System.out.println(message.getCreateTsime());
        System.out.println(message.getSize());
        System.out.println(message.getConfig());
        System.out.println(message.getMessageId());
    }
}
以上代码
通过LATEST游标的方式
监听 project=my_test
topicName=student
shardId=0
最终使用Message的包装类
获取dataHub实时写入的数据
此处可设置多种游标类型
例:根据最新的系统时间、最早录入的序号等
核心代码
需一个DataHubClient增强类
在SpringBoot启动时开启一个线程来监听对应的project-topic-shardingId
根据游标规则来读取当前的cursor进行数据的读取
/**
 * Spring lifecycle wrapper that creates the DataHub client on startup and
 * shuts down the listener worker threads on container destruction.
 */
public class DataHubClientWrapper implements InitializingBean, DisposableBean {

    // Endpoint / accessId / accessKey values bound from application.yml (aliyun.datahub.*)
    @Autowired
    private AliyunAccountProperties properties;

    @Autowired
    private ApplicationContext context;

    private DatahubClient datahubClient;

    public DataHubClientWrapper() {
    }

    /**
     * Stops the listener worker threads when the Spring container shuts down.
     *
     * @throws Exception propagated from worker shutdown
     */
    @Override
    public void destroy() throws Exception {
        WorkerResourceExecutor.shutdown();
    }

    /**
     * Startup sequence — the order of these calls matters:
     * 1. create the DatahubClient from the configured credentials,
     * 2. print the banner,
     * 3. publish the client through DataHubTemplate's static field,
     * 4. initialize and start the listener worker threads (which read the
     *    client back via DataHubTemplate).
     *
     * @throws Exception propagated from client creation or worker startup
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Create the DataHub client
        this.datahubClient = DataHubClientFactory.create(properties);
        // Print banner
        BannerUtil.printBanner();
        // Assign the template's static dataHubClient
        DataHubTemplate.setDataHubClient(datahubClient);
        // Initialize worker threads
        WorkerResourceExecutor.initWorkerResource(context);
        // Start worker threads
        WorkerResourceExecutor.start();
    }
}
//写数据
//构建了一个类似RedisDataTemplate的模板类
//封装了write的逻辑
//调用时只需要用DataHubTemplate.write调用
public class DataHubTemplate {
private static DatahubClient dataHubClient;
private final static Logger logger = LoggerFactory.getLogger(DataHubTemplate.class);
/**
* 默认不开启重试机制
*
* @param projectName
* @param topicName
* @param datas
* @param shardId
* @return
*/
public static int write(String projectName, String topicName, List> datas, Integer shardId) {
return write(projectName, topicName, datas, shardId, false);
}
/**
* 往指定的projectName以及topic和shard下面写数据
*
* @param projectName
* @param topicName
* @param datas
* @param shardId
* @param retry
* @return
*/
private static int write(String projectName, String topicName, List> datas, Integer shardId, boolean retry) {
RecordSchema recordSchema = dataHubClient.getTopic(projectName, topicName).getRecordSchema();
List recordEntries = new ArrayList<>();
for (Object o : datas) {
RecordEntry entry = new RecordEntry();
Map data = BeanUtil.beanToMap(o);
TupleRecordData tupleRecordData = new TupleRecordData(recordSchema);
for (String key : data.keySet()) {
tupleRecordData.setField(key, data.get(key));
}
entry.setRecordData(tupleRecordData);
entry.setShardId(String.valueOf(shardId));
recordEntries.add(entry);
}
PutRecordsResult result = dataHubClient.putRecords(projectName, topicName, recordEntries);
int failedRecordCount = result.getFailedRecordCount();
if (failedRecordCount > 0 && retry) {
retry(dataHubClient, result.getFailedRecords(), 1, projectName, topicName);
}
return datas.size() - failedRecordCount;
}
/**
* @param client
* @param records
* @param retryTimes
* @param project
* @param topic
*/
private static void retry(DatahubClient client, List records, int retryTimes, String project, String topic) {
boolean suc = false;
List failedRecords = records;
while (retryTimes != 0) {
logger.info("the time to send message has [{}] records failed, is starting retry", records.size());
retryTimes = retryTimes - 1;
PutRecordsResult result = client.putRecords(project, topic, failedRecords);
int failedNum = result.getFailedRecordCount();
if (failedNum > 0) {
failedRecords = result.getFailedRecords();
continue;
}
suc = true;
break;
}
if (!suc) {
logger.error("DataHub send message retry failure");
}
}
public static DatahubClient getDataHubClient() {
return dataHubClient;
}
public static void setDataHubClient(DatahubClient dataHubClient) {
DataHubTemplate.dataHubClient = dataHubClient;
}
}
//读数据
//需要在Spring启动时开启一个监听线程DataListenerWorkerThread
//执行一个死循环不停轮询DataHub下的对应通道
public class DataListenerWorkerThread extends Thread {
private final static Logger logger = LoggerFactory.getLogger(DataListenerWorkerThread.class);
private volatile boolean init = false;
private DatahubConfig config;
private String workerKey;
private int recordLimits;
private int sleep;
private RecordSchema recordSchema;
private RecordHandler recordHandler;
private CursorHandler cursorHandler;
    /**
     * Builds a polling worker bound to one project/topic/shard channel.
     *
     * @param projectName    DataHub project to listen on
     * @param topicName      topic within the project
     * @param shardId        shard index to poll
     * @param cursorType     cursor strategy (e.g. LATEST, or time/sequence based)
     * @param recordLimits   max records fetched per poll
     * @param sleep          milliseconds to sleep when a poll returns no data
     * @param sequenceOffset offset used by sequence-based cursors
     * @param startTime      start time used by time-based cursors
     * @param redisTemplate  handed to the cursor handler — presumably to persist
     *                       cursor position under workerKey; confirm in CursorHandler
     */
    public DataListenerWorkerThread(String projectName, String topicName, int shardId, CursorTypeWrapper cursorType, int recordLimits, int sleep, int sequenceOffset, String startTime, StringRedisTemplate redisTemplate) {
        this.config = new DatahubConfig(projectName, topicName, shardId);
        // workerKey uniquely identifies this channel: project-topic-shard
        this.workerKey = projectName + "-" + topicName + "-" + shardId;
        this.cursorHandler = new CursorHandler(cursorType, sequenceOffset, startTime, redisTemplate, workerKey);
        this.recordLimits = recordLimits;
        this.sleep = sleep;
        this.setName("DataHub-Worker");
        // Daemon thread so the JVM can exit even while this polling loop runs.
        this.setDaemon(true);
    }
@Override
public void run() {
initRecordSchema();
String cursor = cursorHandler.positioningCursor(config);
for (; ; ) {
try {
GetRecordsResult result = DataHubTemplate.getDataHubClient().getRecords(config.getProjectName(), config.getTopicName(), String.valueOf(config.getShardId()), recordSchema, cursor, recordLimits);
if (result.getRecordCount() <= 0) {
// 无数据,sleep后读取
Thread.sleep(sleep);
continue;
}
List
最后记得要做成一个starter
在resources下新建一个META-INF文件夹
新建一个spring.factories文件
org.springframework.boot.autoconfigure.EnableAutoConfiguration= \
cry.starter.datahub.DataHubClientAutoConfiguration