• 关于响应式编程ReactiveX,RxGo


    ReactiveX,简称为 Rx,是一个异步编程的 API。与 callback(回调)、promise(JS 提供这种方式)和 deferred(Python 的 twisted 网络编程库就是使用这种方式)这些异步编程方式有所不同,Rx 是基于事件流的。这里的事件可以是系统中产生或变化的任何东西,在代码中我们一般用对象表示。在 Rx 中,事件流被称为 Observable(可观察的,被观察者)。事件流需要被 Observer(观察者)处理才有意义。

    ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的API。它组合了观察者模式,迭代器模式和函数式编程的优秀思想。它是一套API,针对不同的编程语言会有不同的实现,比如 RxJS, RxJava, RxGo

    ReactiveX官网:https://reactivex.io/

    ReactiveX仓库:https://github.com/ReactiveX

    RxGo 是 Rx 的 Go 语言实现。借助于 Go 语言简洁的语法和强大的并发支持(goroutine、channel),Rx 与 Go 语言的结合非常完美。

    pipelines (官方博客:https://blog.golang.org/pipelines)是 Go 基础的并发编程模型。其中包含,fan-in——多个 goroutine 产生数据,一个goroutine 处理数据,fan-out——一个 goroutine 产生数据,多个 goroutine 处理数据,fan-inout——多个 goroutine 产生数据,多个 goroutine 处理数据。它们都是通过 channel 连接。RxGo 的实现就是基于 pipelines 的理念,并且提供了方便易用的包装和强大的扩展。

    通常来说,Go写异步程序很容易,完全可以自己封装实现,而RxGo的封装更加标准化,遵循了 ReactiveX 规范,易于理解。

    在这里插入图片描述

    RxGo: https://github.com/ReactiveX/RxGo

    go get -u github.com/reactivex/rxgo/v2

    简单使用
    func t1() {
    	observable := rxgo.Just(1, 2, 3, 4, 5)()
    	ch := observable.Observe()
    	for item := range ch {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    使用 RxGo 的一般流程如下:

    • 使用相关的 Operator 创建 ObservableOperator 就是用来创建 Observable 的。这些术语都比较难贴切地翻译,而且英文也很好懂,就不强行翻译了;
    • 中间各个阶段可以使用过滤操作筛选出我们想要的数据,使用转换操作对数据进行转换;
    • 调用 ObservableObserve()方法,该方法返回一个<- chan rxgo.Item。然后for range遍历即可。

    实际上rxgo.Item还可以包含错误。所以在使用时,我们应该做一层判断

    func t2() {
    	observable := rxgo.Just(1, 2, errors.New("unknown"), 4, 5)()
    	ch := observable.Observe()
    	for item := range ch {
    		if item.Error() {
    			fmt.Println("Error:", item.E)
    		} else {
    			fmt.Println(item.V)
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    除了使用for range之外,我们还可以调用 ObservableForEach()方法来实现遍历。ForEach()接受 3 个回调函数:

    • NextFunc:类型为func (v interface {}),处理数据;
    • ErrFunc:类型为func (err error),处理错误;
    • CompletedFunc:类型为func ()Observable 完成时调用。
    func t3() {
    	observable := rxgo.Just(1, 2, 3, 4, 5)()
    	<-observable.ForEach(func(v interface{}) {
    		fmt.Println("onNext:", v)
    	}, func(err error) {
    		fmt.Println("onError:", err)
    	}, func() {
    		fmt.Println("onComplete")
    	})
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    onNext: 1
    onNext: 2
    onNext: 3
    onNext: 4
    onNext: 5
    onComplete
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    ForEach()实际上是在 goroutine 里执行的,它返回一个接收通知的 channel。当 Observable 数据发送完毕时,该 channel 会关闭。所以如果要等待ForEach()执行完成,我们需要使用<-。上面的示例中如果去掉<-,可能就没有输出了,因为主 goroutine 结束了,整个程序就退出了。

    创建 Observable

    上面使用最简单的方式创建 Observable:直接调用Just()方法传入一系列数据。下面再介绍几种创建 Observable 的方式。

    Create

    传入一个[]rxgo.Producer的切片,其中rxgo.Producer的类型为func(ctx context.Context, next chan<- Item)。我们可以在代码中调用rxgo.Of(value)生成数据,rxgo.Error(err)生成错误,然后发送到next通道中:

    func t4() {
    	observable := rxgo.Create([]rxgo.Producer{
    		func(ctx context.Context, next chan<- rxgo.Item) {
    			next <- rxgo.Of(1)
    		},
    		func(ctx context.Context, next chan<- rxgo.Item) {
    			next <- rxgo.Of(2)
    			next <- rxgo.Error(errors.New("unknown"))
    			next <- rxgo.Of(4)
    			next <- rxgo.Of(5)
    		},
    	})
    	<-observable.ForEach(func(v interface{}) {
    		fmt.Println("onNext:", v)
    	}, func(err error) {
    		fmt.Println("onError:", err)
    	}, func() {
    		fmt.Println("onComplete")
    	})
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    FromChannel

    直接从一个已存在的<-chan rxgo.Item对象中创建 Observable

    func t5() {
    	ch := make(chan rxgo.Item) // no buffer
    	go func() {
    		for i := 1; i <= 5; i++ {
    			ch <- rxgo.Of(i)
    		}
    		close(ch)
    	}()
    
    	observable := rxgo.FromChannel(ch)
    	<-observable.ForEach(func(v interface{}) {
    		fmt.Println("onNext:", v)
    	}, func(err error) {
    		fmt.Println("onError:", err)
    	}, func() {
    		fmt.Println("onComplete")
    	})
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    注意:通道需要手动调用close()关闭,上面Create()方法内部rxgo自动帮我们执行了这个步骤。

    Interval

    以传入的时间间隔生成一个无穷的数字序列,从 0 开始

    func t6() {
    	observable := rxgo.Interval(rxgo.WithDuration(3 * time.Second))
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    Range

    生成一个范围内的数字,达到最大值就结束,不包含右值

    func t7() {
    	observable := rxgo.Range(0, 5)
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    Repeat

    每隔指定时间,重复一次该序列,一共重复指定次数:

    func t8() {
    	observable := rxgo.Just(1, 2, 3)()
    	// 每隔指定时间,重复一次该序列,一共重复指定次数
    	observable = observable.Repeat(5, rxgo.WithDuration(5*time.Second))
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    Start

    可以给Start方法传入[]rxgo.Supplier作为参数,它可以包含任意数量的rxgo.Supplier类型。rxgo.Supplier的底层类型为 func(ctx context.Context) Item

    func t9() {
    	observable := rxgo.Start([]rxgo.Supplier{
    		func(ctx context.Context) rxgo.Item {
    			return rxgo.Of(1)
    		},
    		func(ctx context.Context) rxgo.Item {
    			return rxgo.Of(2)
    		},
    		func(ctx context.Context) rxgo.Item {
    			return rxgo.Of(3)
    		},
    	})
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    Observable 分类

    根据数据在何处生成,Observable 被分为 HotCold 两种类型(类比热启动和冷启动)。数据在其它地方生成的被成为 Hot Observable。相反,在 Observable 内部生成数据的就是 Cold Observable

    使用上面介绍的方法创建的实际上都是 Hot Observable

    ch := make(chan rxgo.Item)
    go func() {
        for i := 0; i < 3; i++ {
            ch <- rxgo.Of(i)
        }
        close(ch)
    }()
    
    observable := rxgo.FromChannel(ch)
    
    for item := range observable.Observe() {
        fmt.Println(item.V)
    }
    
    for item := range observable.Observe() {
        fmt.Println(item.V)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    上面创建的是 Hot Observable。但是有个问题,第一次Observe()消耗了所有的数据,第二个就没有数据输出了。

    Cold Observable 就不会有这个问题,因为它创建的流是独立于每个观察者的。即每次调用Observe()都创建一个新的 channel。我们使用Defer()方法创建 Cold Observable,它的参数与Create()方法一样。

    Defer
    func t10() {
    	// Defer does not create the Observable until the observer subscribes,
    	// and creates a fresh Observable for each observer.
    	//
    	// Cold Observable: 也就是在 subscribe 的时候才去生产数据流;
    	// 与之相反的是 Hot Observable,也就是在创建 Observable 的时候就同时创建了数据流,
    	// 这样第一次 subscribe 的时候就把数据消耗完了,再次 subscribe 是没有数据的,前面的
    	// 例子都是 Hot Observable.
    	observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
    		for i := 0; i < 3; i++ {
    			ch <- rxgo.Of(i)
    		}
    	}})
    
    	// 有数据
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    	// 有数据
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    0
    1
    2
    0
    1
    2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    可连接的 Observable

    可连接的(Connectable)Observable 对普通的 Observable 进行了一层组装。调用它的Observe()方法时并不会立刻产生数据。使用它,我们可以等所有的观察者都准备就绪了(即调用了Observe()方法)之后,再调用其Connect()方法开始生成数据。我们通过两个示例比较使用普通的 Observable 和可连接的 Observable 有何不同。

    普通的:

    func t11() {
    	ch := make(chan rxgo.Item)
    	go func() {
    		for i := 1; i <= 3; i++ {
    			ch <- rxgo.Of(i)
    		}
    		close(ch)
    	}()
    
    	// 普通的 Observable,只要有一个观察者注册成功就会释放数据
    	observable := rxgo.FromChannel(ch)
    
    	// 注册观察者1
    	observable.DoOnNext(func(i interface{}) {
    		fmt.Printf("First observer: %d\n", i)
    	})
    
    	time.Sleep(3 * time.Second)
    	fmt.Println("before subscribe second observer")
    
    	// 注册观察者2
    	observable.DoOnNext(func(i interface{}) {
    		fmt.Printf("Second observer: %d\n", i)
    	})
    
    	time.Sleep(3 * time.Second)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    上例中我们使用DoOnNext()方法来注册观察者。由于DoOnNext()方法是异步执行的,所以为了等待结果输出,在最后增加了一行time.Sleep。运行:

    First observer: 1
    First observer: 2
    First observer: 3
    before subscribe second observer
    
    • 1
    • 2
    • 3
    • 4

    由输出可以看出,注册第一个观察者之后就开始产生数据了。

    我们通过在创建 Observable 的方法中指定rxgo.WithPublishStrategy()选项就可以创建可连接的 Observable

    func t12() {
    	ch := make(chan rxgo.Item)
    	go func() {
    		for i := 1; i <= 3; i++ {
    			ch <- rxgo.Of(i)
    		}
    		close(ch)
    	}()
    
    	// 可连接的 Observable
    	observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())
    
    	// 注册观察者1
    	observable.DoOnNext(func(i interface{}) {
    		fmt.Printf("First observer: %d\n", i)
    	})
    
    	time.Sleep(3 * time.Second)
    	fmt.Println("before subscribe second observer")
    
    	// 注册观察者2
    	observable.DoOnNext(func(i interface{}) {
    		fmt.Printf("Second observer: %d\n", i)
    	})
    
    	// 通知 Observable,意味着所有的观察者已经注册完毕,才开始释放数据
    	// 另外,可连接的 Observable 是 Cold Observable,即每个观察者都会收到一份相同的拷贝。
    	observable.Connect(context.Background())
    
    	time.Sleep(5 * time.Second)
    
    	fmt.Println("over.")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    before subscribe second observer
    Second observer: 1
    First observer: 1
    First observer: 2
    First observer: 3
    Second observer: 2
    Second observer: 3
    over.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    上面是等两个观察者都注册之后,并且手动调用了 Observable 的Connect()方法才产生数据。而且可连接的 Observable 有一个特性:它是 Cold Observable !!!,即每个观察者都会收到一份相同的拷贝。

    其他常用操作符
    Map

    转换操作符,Map()方法简单修改它收到的rxgo.Item然后发送到下一个阶段(转换或过滤)。Map()接受一个类型为func (context.Context, interface{}) (interface{}, error)的函数。第二个参数就是rxgo.Item中的数据,返回转换后的数据。如果出错,则返回错误。

    func t13() {
    	observable := rxgo.Just(1, 2, 3)()
    	// Map 转换或者过滤
    	// 如果出现一个 error,整个数据流都是无效的
    	observable = observable.Map(func(ctx context.Context, v interface{}) (interface{}, error) 	  {
    		// vv := v.(int)
    		// if vv%2 == 0 {
    		// 	return vv * 2, nil
    		// } else {
    		// 	return vv, errors.New("Error")
    		// }
    
    		return v.(int) * 2, nil
    	}).Map(func(ctx context.Context, v interface{}) (interface{}, error) {
    		return v.(int) + 1, nil
    	})
    
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    Marshal

    Marshal对经过它的数据进行一次Marshal。这个Marshal可以是json.Marshal/proto.Marshal,甚至我们自己写的Marshal函数。它接受一个类型为func(interface{}) ([]byte, error)的函数用于对数据进行处理。

    type User struct {
    	Name string `json:"name"`
    	Age  int    `json:"age"`
    }
    
    func t14() {
    	observable := rxgo.Just(
    		User{
    			Name: "dj",
    			Age:  18,
    		},
    		User{
    			Name: "jw",
    			Age:  20,
    		},
    	)()
    
    	observable = observable.Marshal(json.Marshal)
    
    	for item := range observable.Observe() {
    		fmt.Println(string(item.V.([]byte)))
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    Unmarshal
    func t15() {
    	observable := rxgo.Just(
    		`{"name":"dj","age":18}`,
    		`{"name":"jw","age":20}`,
    	)()
    
    	observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
    		return []byte(i.(string)), nil
    	}).Unmarshal(json.Unmarshal, func() interface{} {
    		return &User{}
    	})
    
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    Buffer

    Buffer按照一定的规则收集接收到的数据,然后一次性发送出去(作为切片),而不是收到一个发送一个。有 3 种类型的Buffer

    • BufferWithCount(n):每收到n个数据发送一次,最后一次可能少于n个;
    • BufferWithTime(n):发送在一个时间间隔n内收到的数据;
    • BufferWithTimeOrCount(d, n):收到n个数据,或经过d时间间隔,发送当前收到的数据。
    func t16() {
    	observable := rxgo.Just(1, 2, 3, 4)().BufferWithCount(3)
    	for item := range observable.Observe() {
    		fmt.Println(item.V) // item.V 此时是切片类型
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    [1 2 3]
    [4]
    
    • 1
    • 2
    GroupBy

    GroupBy根据传入一个 Hash 函数,为每个不同的结果分别创建新的 Observable。换句话说,GroupBy生成一个数据类型为 ObservableObservable

    func t17() {
    	count := 3
    
    	observable := rxgo.Range(0, 10).GroupBy(count, func(item rxgo.Item) int {
    		return item.V.(int) % count
    	}, rxgo.WithBufferedChannel(10))
    
    	for subObservable := range observable.Observe() {
    		fmt.Println("New observable:")
    
    		for item := range subObservable.V.(rxgo.Observable).Observe() {
    			fmt.Printf("item: %v\n", item.V)
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    New observable:
    item: 0
    item: 3
    item: 6
    item: 9
    New observable:
    item: 1
    item: 4
    item: 7
    New observable:
    item: 2
    item: 5
    item: 8
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    注意rxgo.WithBufferedChannel(10)的使用,由于我们的数字是连续生成的,依次为 0->1->2->…->9->10。而 Observable 默认是惰性的,即由Observe()驱动。内层的Observe()在返回一个 0 之后就等待下一个数,但是下一个数 1 不在此 Observable 中。所以会陷入死锁。使用rxgo.WithBufferedChannel(10),设置它们之间的连接 channel 缓冲区大小为 10,这样即使我们未取出 channel 里面的数字,上游还是能发送数字进来。

    并行操作

    默认情况下,这些转换操作都是串行的,即只有一个 goroutine 负责执行转换函数。我们也可以使用rxgo.WithPool(n)选项设置运行n个 goroutine,或者rxgo.WitCPUPool()选项设置运行与逻辑 CPU 数量相等的 goroutine。

    func t18() {
    	observable := rxgo.Range(1, 20)
    
    	observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
    		time.Sleep(time.Duration(rand.Int31()))
    		return i.(int)*2 + 1, nil
    	}, rxgo.WithCPUPool())
    
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    由于是并行,所以输出顺序就不确定了。为了让不确定性更明显一点,我在代码中加了一行time.Sleep

    Filter

    Filter()接受一个类型为func (i interface{}) bool的参数,通过的数据使用这个函数断言,返回true的将发送给下一个阶段。否则,丢弃。

    func t19() {
    	observable := rxgo.Range(1, 10)
    
    	observable = observable.Filter(func(i interface{}) bool {
    		return i.(int)%2 == 0
    	})
    
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    ElementAt

    ElementAt()只发送指定索引的数据,如ElementAt(2)只发送索引为 2 的数据,即第 3 个数据。

    func t20() {
    	observable := rxgo.Just(0, 1, 2, 3, 4)().ElementAt(2)
    
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    Debounce

    Debounce()比较有意思,它收到数据后还会等待指定的时间间隔,后续间隔内没有收到其他数据才会发送刚开始的数据。

    func t21() {
    	ch := make(chan rxgo.Item)
    
    	go func() {
    		ch <- rxgo.Of(1)
    		time.Sleep(2 * time.Second)
    		ch <- rxgo.Of(2)
    		ch <- rxgo.Of(3)
    		time.Sleep(2 * time.Second)
    		close(ch)
    	}()
    
    	observable := rxgo.FromChannel(ch).Debounce(rxgo.WithDuration(1 * time.Second))
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    上面示例,先收到 1,然后 2s 内没收到数据,所以发送 1。接着收到了数据 2,由于马上又收到了 3,所以 2 不会发送。收到 3 之后 2s 内没有收到数据,发送了 3。所以最后输出为 1,3。

    Distinct

    Distinct()会记录它发送的所有数据,它不会发送重复的数据。由于数据格式多样,Distinct()要求我们提供一个函数,根据原数据返回一个唯一标识码(有点类似哈希值)。基于这个标识码去重。

    func t22() {
    	observable := rxgo.Just(1, 2, 2, 3, 3, 4, 4)().
    		Distinct(func(_ context.Context, i interface{}) (interface{}, error) {
    			return i, nil
    		})
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    Skip

    跳过前面若干个数据

    func t23() {
    	observable := rxgo.Just(1, 2, 3, 4, 5)().Skip(2)
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    Take

    只取前面若干个数据

    func t24() {
    	observable := rxgo.Just(1, 2, 3, 4, 5)().Take(2)
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    选项 rxgo.Option

    rxgo 提供的大部分方法的最后一个参数是一个可变长的选项类型。这是 Go 中特有的、经典的选项设计模式。我们前面已经使用了:

    • rxgo.WithBufferedChannel(10):设置 channel 的缓存大小;
    • rxgo.WithPool(n) / rxgo.WithCpuPool():使用多个 goroutine 执行转换操作;
    • rxgo.WithPublishStrategy():使用发布策略,即创建可连接的 Observable

    除此之外,rxgo 还提供了很多其他选项。

  • 相关阅读:
    可以直接调用 Thread 类的 run 方法吗?
    聊一聊UDF/UDTF/UDAF是什么,开发要点及如何使用?
    jvm安全退出
    最后写入胜利(丢弃并发写入)
    docker目录挂载失败:Check if the specified host path exists and is the expected type
    27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2-1)
    LeetCode题练习与总结:搜索插入位置
    Java 通配符 在短信发送之中 通配符参数动态获取解决方案
    web应用及微信小程序版本更新检测方案实践
    【iOS开发】- Block传值的基础学习
  • 原文地址:https://blog.csdn.net/raoxiaoya/article/details/134444558