• .NET分布式Orleans - 6 - 事件溯源


    基本概念

    事件溯源(Event Sourcing)是一种设计模式,它记录并存储了应用程序状态变化的所有事件。

    其核心思想是将系统中的每次状态变化都视为一个事件,并将这些事件以时间顺序的方式持久化存储。

    这样,通过重放这些事件,我们可以重建系统在任何特定时间点的状态。

    每个事件通常都包含了描述状态变化的必要信息,以及发生状态变化的原因和时间戳。

    工作原理

    工作原理方面,事件溯源主要依赖于两个关键部分:事件生成和事件存储。

    当系统中发生状态变化时,会生成一个或多个事件,这些事件随后被存储到事件存储中。

    事件存储需要设计成高可用、高一致且可伸缩的,以支持大规模的系统操作。

    之后,当需要重建系统状态时,只需从事件存储中按顺序读取事件,并依次应用这些事件到系统状态即可。

    使用场景

    在Orleans7中,事件溯源主要应用在以下几个场景:

    1. 分布式系统状态同步:在分布式系统中,各个节点之间的状态同步是一个重要问题。通过事件溯源,每个节点都可以记录并发送自己的状态变化事件,其他节点则可以通过订阅这些事件来同步自己的状态。

    2. 历史数据追踪和审计:在某些业务场景下,需要追踪系统的历史操作记录,以进行审计或分析。事件溯源提供了完整的操作历史,可以方便地查询和回放历史事件。

    3. 容错和恢复:当系统发生故障时,通过事件溯源可以方便地恢复到故障发生前的状态,或者根据事件日志进行故障排查。

    优势

    事件溯源在Orleans7中带来了以下优势:

    1. 数据完整性和一致性:由于事件溯源记录了所有状态变化的历史,因此可以确保数据的完整性和一致性。

    2. 灵活性和可扩展性:事件溯源的设计使得系统可以很容易地添加新的状态变化事件,同时也支持大规模的系统扩展。

    3. 容错和恢复能力:通过事件溯源,可以轻松地恢复到系统的任何历史状态,大大提高了系统的容错和恢复能力。

    4. 清晰的业务逻辑:每个事件都代表了一个具体的业务操作,因此通过查看事件日志,可以清晰地了解系统的业务逻辑和操作流程。

    总的来说,事件溯源是一种强大而灵活的设计模式,它在Orleans7中的应用为分布式系统带来了诸多优势。对于软件开发者来说,理解和掌握事件溯源机制,将有助于构建更加健壮、可靠和可扩展的分布式系统。

    示例

    下面使用事件溯源,来跟踪一个账户的变更记录。

    首先需要安装必须的nuget包

    "Microsoft.Orleans.EventSourcing" Version="8.0.0" />
    "Microsoft.Orleans.Clustering.AdoNet" Version="8.0.0" />
    "Microsoft.Orleans.Persistence.AdoNet" Version="8.0.0" />
    "Microsoft.Orleans.Server" Version="8.0.0" />

    然后设置Orleans,除了Orleans的常规设置外,还需要 siloHostBuilder.AddLogStorageBasedLogConsistencyProvider("LogStorage") 来设置LogConsistencyProvider

    复制代码
    builder.Host.UseOrleans(static siloHostBuilder =>
    {
        var invariant = "System.Data.SqlClient";
    
        var connectionString = "Data Source=localhost\\SQLEXPRESS;Initial Catalog=orleanstest;User Id=sa;Password=12334;";
    
        siloHostBuilder.AddLogStorageBasedLogConsistencyProvider("LogStorage");
    
        // Use ADO.NET for clustering
        siloHostBuilder.UseAdoNetClustering(options =>
        {
            options.Invariant = invariant;
            options.ConnectionString = connectionString;
        }).ConfigureLogging(logging => logging.AddConsole());
    
        siloHostBuilder.Configure(options =>
        {
            options.ClusterId = "my-first-cluster";
            options.ServiceId = "SampleApp";
        });
    
        // Use ADO.NET for persistence
        siloHostBuilder.AddAdoNetGrainStorage("GrainStorageForTest", options =>
        {
            options.Invariant = invariant;
            options.ConnectionString = connectionString;
            //options.GrainStorageSerializer = new JsonGrainStorageSerializer()
         
        });
    });
    复制代码

    定义账户的存储和提取事件类

    复制代码
    // the classes below represent events/transactions on the account
    // all fields are user-defined (none have a special meaning),
    // so these can be any type of object you like, as long as they are serializable
    // (so they can be sent over the wire and persisted in a log).
    
    [Serializable]
    [GenerateSerializer]
    public abstract class Transaction
    {
        ///  A unique identifier for this transaction  
        [Id(0)]
        public Guid Guid { get; set; }
    
        ///  A description for this transaction  
        [Id(1)]
        public string Description { get; set; }
    
        ///  time on which the request entered the system  
        [Id(2)]
        public DateTime IssueTime { get; set; }
    }
    
    [Serializable]
    [GenerateSerializer]
    public class DepositTransaction : Transaction
    {
        [Id(0)]
        public uint DepositAmount { get; set; }
    }
    
    [Serializable]
    [GenerateSerializer]
    public class WithdrawalTransaction : Transaction
    {
        [Id(0)]
        public uint WithdrawalAmount { get; set; }
    }
    复制代码

    再定义账户的Grain,其中有存钱,取钱,获取余额,与变更记录操作

    Grain类必须具有 LogConsistencyProviderAttribute 才能指定日志一致性提供程序。 还需要 StorageProviderAttribute设置存储。

    复制代码
    /// 
    /// An example of a journaled grain that models a bank account.
    /// 
    /// Configured to use the default storage provider.
    /// Configured to use the LogStorage consistency provider.
    /// 
    /// This provider persists all events, and allows us to retrieve them all.
    /// 
    
    /// 
    /// A grain that models a bank account
    /// 
    public interface IAccountGrain : IGrainWithStringKey
    {
        Task<uint> Balance();
    
        Task Deposit(uint amount, Guid guid, string desc);
    
        Task<bool> Withdraw(uint amount, Guid guid, string desc);
    
        Task> GetTransactionLog();
    }
    
    [StorageProvider(ProviderName = "GrainStorageForTest")]
    [LogConsistencyProvider(ProviderName = "LogStorage")]
    
    public class AccountGrain : JournaledGrain, IAccountGrain
    {
        /// 
        /// The state of this grain is just the current balance.
        /// 
        [Serializable]
        [Orleans.GenerateSerializer]
        public class GrainState
        {
            [Orleans.Id(0)]
            public uint Balance { get; set; }
    
            public void Apply(DepositTransaction d)
            {
                Balance = Balance + d.DepositAmount;
            }
    
            public void Apply(WithdrawalTransaction d)
            {
                if (d.WithdrawalAmount > Balance)
                    throw new InvalidOperationException("we make sure this never happens");
    
                Balance = Balance - d.WithdrawalAmount;
            }
        }
    
        public Task<uint> Balance()
        {
            return Task.FromResult(State.Balance);
        }
    
        public Task Deposit(uint amount, Guid guid, string description)
        {
            RaiseEvent(new DepositTransaction()
            {
                Guid = guid,
                IssueTime = DateTime.UtcNow,
                DepositAmount = amount,
                Description = description
            });
    
            // we wait for storage ack
            return ConfirmEvents();
        }
    
        public Task<bool> Withdraw(uint amount, Guid guid, string description)
        {
            // if the balance is too low, can't withdraw
            // reject it immediately
            if (State.Balance < amount)
                return Task.FromResult(false);
    
            // use a conditional event for withdrawal
            // (conditional events commit only if the version hasn't already changed in the meantime)
            // this is important so we can guarantee that we never overdraw
            // even if racing with other clusters, of in transient duplicate grain situations
            return RaiseConditionalEvent(new WithdrawalTransaction()
            {
                Guid = guid,
                IssueTime = DateTime.UtcNow,
                WithdrawalAmount = amount,
                Description = description
            });
        }
    
        public Task> GetTransactionLog()
        {
            return RetrieveConfirmedEvents(0, Version);
        }
    }
    复制代码

    最后即可通过client生成grain,并获取账户变动记录

    var palyer = client.GetGrain("zhangsan");
    await palyer.Deposit(1000, Guid.NewGuid(), "aaa");
    var logs = await palyer.GetTransactionLog();
    return Results.Ok(logs);

     


  • 相关阅读:
    upp(统一流程平台)项目,如果需要项目章程会怎么写
    js基础笔记学习256navigator
    微信小程序基于java实现v2支付,提现,退款
    一文学会Spring,Spring最简单的入门教程(万字好文)
    clip-path图片裁剪
    iOS自动化测试方案(一):MacOS虚拟机保姆级安装Xcode教程
    SpringBoot整合Swagger2
    【Python】PaddleOCR文字识别国产之光 从安装到pycharm中测试 (保姆级图文)
    xf86-video-intel源码分析5 —— intel_options.c和intel_options.h(2)
    玩转NVIDIA Jetson (25)--- jetson 安装pytorch和torchvision
  • 原文地址:https://www.cnblogs.com/chenyishi/p/18099103