Skip to content
This repository has been archived by the owner on Oct 13, 2024. It is now read-only.

alpha的akka模式下,事务条目是否有写入redis #732

Open
chensk0601 opened this issue Feb 2, 2022 · 12 comments
Open

alpha的akka模式下,事务条目是否有写入redis #732

chensk0601 opened this issue Feb 2, 2022 · 12 comments

Comments

@chensk0601
Copy link

看设计图,akka模式下,事务条目信息是有追加到redis中的,但是实现上并没有看到,只有看到事务结束后才把数据写入ES

@coolbeevip
Copy link
Member

Akka Persistence Redis Plugin is a plugin for Akka persistence

@chensk0601
Copy link
Author

chensk0601 commented Feb 3, 2022 via email

@chensk0601 chensk0601 reopened this Feb 8, 2022
@chensk0601
Copy link
Author

明白了,还有一个疑问,如果将这个akka persistence用于生产系统,已经完成的事务数据怎么进行老化处理,有没有这方面的一些经验和实践

@WillemJiang
Copy link
Member

你说的老化是redis的数据老化,还是ES的数据老化?

@chensk0601
Copy link
Author

chensk0601 commented Feb 8, 2022 via email

@coolbeevip
Copy link
Member

老化处理是什么意思?

@chensk0601
Copy link
Author

chensk0601 commented Feb 10, 2022 via email

@coolbeevip
Copy link
Member

就是数据怎么清理的意思

2022年2月10日 下午1:36,Lei Zhang @.***> 写道: 老化处理是什么意思? — Reply to this email directly, view it on GitHub <#732 (comment)>, or unsubscribe https://github.com/notifications/unsubscribe-auth/ANVDS4QFTZDHKSOHW6XWXTTU2NFFJANCNFSM5NMM3IZA. Triage notifications on the go with GitHub Mobile for iOS https://apps.apple.com/app/apple-store/id1477376905?ct=notification-email&mt=8&pt=524675 or Android https://play.google.com/store/apps/details?id=com.github.android&referrer=utm_campaign%3Dnotification-email%26utm_medium%3Demail%26utm_source%3Dgithub. You are receiving this because you modified the open/close state.

actor 在停止后会调用以下方法清理 Redis 中的数据和快照

deleteMessages(lastSequenceNr());
deleteSnapshot(snapshotSequenceNr());

但是 highestSequenceNr 不会自动清理,你可以看这个注释(我在注释里提供了一个 Lua 脚本用来清理)

// 已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理
// 以下基于 journal-redis 说明:
// 假设 globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, 那么在 Redis 中会生成三个 key
// journal:persistenceIds
// journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694
// journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr
//
// 1. journal:persistenceIds 是 set 类型, 记录了所有的 globalTxId, 使用 smembers journal:persistenceIds 可以看到
// 2. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 是 zset 类型, 记录了这个事务的所有事件
// 使用 zrange journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 1 -1 可以看到
// 3. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr 是 string 类型, 里面记录这序列号
//
// 何如清理:
// 通过 deleteMessages 和 deleteSnapshot 可以清理部分数据,但是 highestSequenceNr 还是无法自动删除,需要定期手动清理
// 遍历 journal:persistenceIds 集合,用每一条数据item拼接成key journal:persisted:item 和 journal:persisted:item:highestSequenceNr
// 如果没有成对出现就说明是已经终止的actor 那么可以将 journal:persisted:item 从 journal:persistenceIds 删除
// 并删除 journal:persisted:item:highestSequenceNr
//
// 目前可以看到的解释是 https://github.com/akka/akka/issues/21181
//
// Lua script akka-persistence-redis-clean.lua
// local ids = redis.call('smembers','journal:persistenceIds');
// local delkeys = {};
// for k, v in pairs(ids) do
// local jpid = 'journal:persisted:' .. v;
// local jpidnr = 'journal:persisted:' .. v .. ':highestSequenceNr';
// local hasjpid = redis.call('exists',jpid);
// if(hasjpid == 0)
// then
// local hasjpidnr = redis.call('exists',jpidnr);
// if(hasjpidnr == 1)
// then
// redis.call('del',jpidnr);
// table.insert(delkeys,jpid);
// end
// end
// end
// return delkeys;

ES 中的数据你可以根据 事务数据持久化 中的数据样例了解JSON 接口,可以使用 ES API 自行删除不需要的数据

@chensk0601
Copy link
Author

chensk0601 commented Feb 10, 2022 via email

@coolbeevip
Copy link
Member

coolbeevip commented Feb 10, 2022

这里有一个事件定义文档

还有一个状态机设计

你可以了解一下,SataTimeoutEvent 时间不是由 omega 发起的,我印象中是使用了 Akka 的 goTo()...forMax() 实现的,在相关状态 goto 的时候都要超时设置

.forMax(Duration.create(event.getTimeout(), TimeUnit.SECONDS));

通过内置 StateTimeout() 实现超时后发送状态机停止事件,并挂起事物

).event(Collections.singletonList(StateTimeout()), SagaData.class,

@chensk0601
Copy link
Author

chensk0601 commented Feb 10, 2022 via email

@coolbeevip
Copy link
Member

coolbeevip commented Feb 10, 2022

你说的这个forMax超时的时候,应该是.event(Collections.singletonList(StateTimeout())

2022年2月10日 下午3:40,Lei Zhang @.***> 写道: 这里有一个事件定义文档 https://github.com/apache/servicecomb-pack/blob/master/docs/fsm/design_fsm_zh.md#%E4%BA%8B%E4%BB%B6%E5%AE%9A%E4%B9%89 还有一个状态机设计 https://github.com/apache/servicecomb-pack/blob/master/docs/fsm/design_fsm_zh.md#%E7%8A%B6%E6%80%81%E6%9C%BA%E8%AE%BE%E8%AE%A1 你可以了解一下,SataTimeoutEvent 时间不是由 omega 发起的,我印象中是使用了 Akka 的 goTo()...forMax() 实现的,在相关状态 goto 的时候都要超时设置

.forMax(Duration.create(event.getTimeout(), TimeUnit.SECONDS));
.forMax(Duration.create(event.getTimeout(), TimeUnit.SECONDS));
— Reply to this email directly, view it on GitHub <#732 (comment)>, or unsubscribe https://github.com/notifications/unsubscribe-auth/ANVDS4REIYGK6RACPDM7LH3U2NTX3ANCNFSM5NMM3IZA. Triage notifications on the go with GitHub Mobile for iOS https://apps.apple.com/app/apple-store/id1477376905?ct=notification-email&mt=8&pt=524675 or Android https://play.google.com/store/apps/details?id=com.github.android&referrer=utm_campaign%3Dnotification-email%26utm_medium%3Demail%26utm_source%3Dgithub. You are receiving this because you modified the open/close state.

是的,时间比较长了,我还有点印象不太深了,你可以跑以下单元测试

和这个集成测试

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants