eventbus 一些框架的引入实践

2022/04/19 eventbus

Vertx EventBus

源代码

local eventbus

依赖 implementation 'io.vertx:vertx-core:4.2.7' 即可

package io.github.viakiba.hinx.eventbus.vertx;

import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;

/**
 * Minimal local (single JVM) Vert.x event bus demo.
 *
 * <p>Two local consumers are registered on the same address, then one message is
 * published to it and one message is sent to it.
 */
public class VertxEventBus {

    // https://vertx.io/docs/vertx-core/java/#_the_event_bus_api
    public static void main(String[] args) {
        final String address = "test";
        Vertx vertx = Vertx.vertx();
        EventBus bus = vertx.eventBus();

        // Both consumers listen on the same address.
        bus.localConsumer(address, message -> System.out.println("receive msg: " + message.body()));
        bus.localConsumer(address, message -> System.out.println("receive msg1: " + message.body()));

        // One-to-many: publish the message to the address.
        bus.publish(address, "hello world1");
        // One-to-one: send the message to the address.
        bus.send(address, "hello world");
    }

}

remote eventbus

依赖

远端 eventbus 依赖 vertx 的集群实现,接下来的用例以 zookeeper 为集群注册中心为例。增加下述依赖:

implementation 'io.vertx:vertx-zookeeper:4.2.7'   exclude group: 'org.apache.zookeeper', module: 'zookeeper'
    implementation 'org.apache.zookeeper:zookeeper:3.5.8'

并增加 logback 支持,并使用默认配置。

implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.11'

zookeeper server

https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/

启动 zkServer.sh start xxxx配置(conf)
这里的 server 版本要与上面依赖的 zookeeper 包版本(3.5.8)对应。

创建集群

文件层级关系

.
├── config
│   └── zookeeper.json
└── src
    ├── main
    │   ├── kotlin
    │   │   ├── Main.kt
    │   │   ├── MainEventBusSend.kt
    │   │   ├── MainEventBusSub.kt
    │   │   └── verticle
    │   │       ├── Verticle.kt
    │   │       ├── VerticleSend.kt
    │   │       └── VerticleSub.kt
    │   └── resources
    │       └── logback.xml

基础的集群实现

// kotlin代码
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.core.spi.cluster.ClusterManager
import io.vertx.spi.cluster.zookeeper.ZookeeperClusterManager

/**
 * Starts a clustered Vert.x node that uses ZooKeeper as its cluster manager and
 * deploys `verticle.Verticle` on it.
 *
 * JVM startup argument: `-Dvertx.zookeeper.config=./config/zookeeper.json`
 */
fun main() {
    val mgr: ClusterManager = ZookeeperClusterManager()
    val options = VertxOptions().setClusterManager(mgr)
    Vertx.clusteredVertx(options)
        // Chain the deployment onto the cluster start-up so that a failure of either
        // step reaches the single failure handler below instead of being dropped.
        .compose { vertx ->
            // Fully qualified name: package + class name.
            vertx.deployVerticle("verticle.Verticle")
        }
        .onFailure { cause ->
            println("Failed to deploy verticle: ${cause.message}")
        }
}

eventbus 生产者 verticle

// VerticleSend.kt 文件
package verticle

import io.vertx.core.*
import io.vertx.core.Verticle

/**
 * Producer verticle.
 *
 * Registers [MyCodec] on the event bus and starts an HTTP server on port 8080.
 * Every incoming HTTP request triggers a `send`, a `publish` and a `request` on
 * `verticle.publish`, plus a `send` on `verticle.publish.codec` that uses the
 * custom codec, and is then answered with a fixed text body.
 */
class VerticleSend : AbstractVerticle() {

    override fun start() {
        val eventBus = vertx.eventBus()
        val myCodec = MyCodec()
        eventBus.registerCodec(myCodec)
        vertx.createHttpServer().requestHandler { req ->
            // Delivered to a single subscriber only.
            eventBus.send("verticle.publish", "send Hello from VerticleSub")
            // Delivered to every subscriber.
            eventBus.publish("verticle.publish", "publish Hello from VerticleSub")
            // Request/reply: print the reply body, or the failure message if no reply arrives.
            eventBus.request<String>("verticle.publish", "send Hello from VerticleSub") { reply ->
                if (reply.succeeded()) {
                    println(reply.result().body())
                } else {
                    println(reply.cause().message)
                }
            }
            // Select the custom codec by name. DeliveryOptions is fully qualified because
            // it lives in io.vertx.core.eventbus, which `import io.vertx.core.*` does not cover.
            val options = io.vertx.core.eventbus.DeliveryOptions().setCodecName(myCodec.name())
            eventBus.send("verticle.publish.codec", "codec send Hello from VerticleSub", options)

            req.response().end("Hello from Vert.x-Web!")
        }.listen(8080).onFailure { cause ->
            // Report a failed bind (e.g. port already in use) instead of ignoring it.
            println("Failed to start HTTP server on port 8080: ${cause.message}")
        }
    }
}

/**
 * Custom event bus codec that passes `String` bodies through unchanged.
 *
 * `Buffer` is referenced by its fully qualified name because it lives in
 * `io.vertx.core.buffer`, which a star import of `io.vertx.core` does not cover.
 */
class MyCodec : io.vertx.core.eventbus.MessageCodec<String, String> {

    /** Appends the string to the wire buffer as-is; no length prefix is written. */
    override fun encodeToWire(buffer: io.vertx.core.buffer.Buffer, s: String) {
        buffer.appendString(s)
    }

    /**
     * Reads everything from [pos] to the end of the buffer as the body.
     * Since no length prefix is written, this only works while the body is the
     * last thing in the buffer.
     */
    override fun decodeFromWire(pos: Int, buffer: io.vertx.core.buffer.Buffer): String =
        buffer.getString(pos, buffer.length())

    /** Returns the same string instance for local delivery. */
    override fun transform(s: String): String = s

    /** Name under which the codec is registered and selected via `DeliveryOptions.setCodecName`. */
    override fun name(): String = "myCodec"

    /** Always -1, the value used for user-defined (non-system) codecs. */
    override fun systemCodecID(): Byte = -1
}

eventbus 消费者 verticle

// VerticleSub.kt 文件
package verticle

import io.vertx.core.*
import io.vertx.core.Verticle

/**
 * Consumer verticle.
 *
 * Subscribes to `verticle.publish` (printing and replying to every message) and,
 * after registering [MyCodec], to `verticle.publish.codec` (printing only).
 */
class VerticleSub : AbstractVerticle() {

    override fun start() {
        val bus = vertx.eventBus()

        // Plain string messages: print the body and acknowledge it to the sender.
        bus.consumer<String>("verticle.publish") { message ->
            println("Received message: ${message.body()}")
            message.reply("Ok Received message: ${message.body()}")
        }

        // Messages sent with the custom codec: the codec is registered before subscribing.
        bus.registerCodec(MyCodec())
        bus.consumer<String>("verticle.publish.codec") { message ->
            println("1111111111111111111 Received message: ${message.body()}")
        }
    }
}

两个进程分开部署生产者与消费者 verticle

// MainEventBusSend.kt 文件
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.core.spi.cluster.ClusterManager
import io.vertx.spi.cluster.zookeeper.ZookeeperClusterManager


/**
 * Starts a clustered Vert.x node that uses ZooKeeper as its cluster manager and
 * deploys the producer verticle `verticle.VerticleSend` on it.
 */
fun main() {
    val mgr: ClusterManager = ZookeeperClusterManager()
    val options = VertxOptions().setClusterManager(mgr)
    Vertx.clusteredVertx(options)
        // Chain the deployment onto the cluster start-up so that a failure of either
        // step reaches the single failure handler below instead of being dropped.
        .compose { vertx -> vertx.deployVerticle("verticle.VerticleSend") }
        .onFailure { cause ->
            println("Failed to deploy verticle: ${cause.message}")
        }
}
// MainEventBusSub.kt 文件
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.core.spi.cluster.ClusterManager
import io.vertx.spi.cluster.zookeeper.ZookeeperClusterManager


/**
 * Starts a clustered Vert.x node that uses ZooKeeper as its cluster manager and
 * deploys the consumer verticle `verticle.VerticleSub` on it.
 */
fun main() {
    val mgr: ClusterManager = ZookeeperClusterManager()
    val options = VertxOptions().setClusterManager(mgr)
    Vertx.clusteredVertx(options)
        // Chain the deployment onto the cluster start-up so that a failure of either
        // step reaches the single failure handler below instead of being dropped.
        .compose { vertx -> vertx.deployVerticle("verticle.VerticleSub") }
        .onFailure { cause ->
            println("Failed to deploy verticle: ${cause.message}")
        }
}

启动上述两个 main 方法,然后访问 http://127.0.0.1:8080 即可触发消息发送。

Guava EventBus

手动注册或者扫描包注册
package io.github.viakiba.hinx.eventbus.guava;

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import org.reflections.scanners.Scanners;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;

import java.lang.reflect.Method;
import java.util.Set;
import java.util.concurrent.Executors;

/**
 * Event payload posted on the Guava event buses in this example.
 * Both properties are plain strings and are {@code null} until set.
 */
class LoginEvent {
    private String username;
    private String age;

    /** @return the user name, or {@code null} if none has been set */
    public String getUsername() {
        return this.username;
    }

    /** @param username the user name to store */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the age as a string, or {@code null} if none has been set */
    public String getAge() {
        return this.age;
    }

    /** @param age the age to store, kept as a string */
    public void setAge(String age) {
        this.age = age;
    }
}

/**
 * Event bus subscriber that prints the user name of every {@link LoginEvent} it receives.
 */
class LoginEventHandler {

    /** Public no-arg constructor; the reflective registration in this example relies on it. */
    public LoginEventHandler() {
    }

    /**
     * Subscriber method. It shares its name with the class but is a regular method,
     * not a constructor.
     *
     * @param msg the posted login event
     */
    @Subscribe
    public void LoginEventHandler(LoginEvent msg) {
        String username = msg.getUsername();
        System.out.println("String msg: " + username);
    }
}

/**
 * Second event bus subscriber for {@link LoginEvent}; it prints the user name with a
 * leading "1" so its output can be told apart from the other handler's.
 */
class LoginEventHandler1 {

    /** Public no-arg constructor; the reflective registration in this example relies on it. */
    public LoginEventHandler1() {
    }

    /**
     * Subscriber method.
     *
     * @param msg the posted login event
     */
    @Subscribe
    public void LoginEventHandler(LoginEvent msg) {
        String username = msg.getUsername();
        System.out.println("1String msg: " + username);
    }
}

/**
 * Demonstrates Guava's synchronous {@link EventBus} and {@link AsyncEventBus} with
 * subscribers registered by hand ({@code test1}) and subscribers discovered by scanning
 * a package for {@link Subscribe}-annotated methods ({@code test2}).
 */
public class GuavaEventBusTest {
    private static final EventBus eventBus = new EventBus();

    // Kept in a field so main() can drain it before the JVM exits.
    private static final java.util.concurrent.ExecutorService asyncExecutor =
            Executors.newSingleThreadExecutor();

    private static final EventBus asyncEventBus = new AsyncEventBus("xxx", asyncExecutor);

    /**
     * Runs both demos, waits for asynchronously dispatched events to be handled, then exits.
     *
     * @param args unused
     * @throws Exception if reflective handler creation fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        try {
            test1();
            test2();
        } finally {
            // Without this, System.exit(0) could terminate the JVM before the executor
            // thread has delivered the events posted on the async bus.
            asyncExecutor.shutdown();
            if (!asyncExecutor.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
                System.err.println("Timed out waiting for asynchronous event delivery");
            }
        }
        System.exit(0);
    }

    /** Registers both handlers by hand on both buses and posts one event on each bus. */
    private static void test1() {
        eventBus.register(new LoginEventHandler());
        eventBus.register(new LoginEventHandler1());
        asyncEventBus.register(new LoginEventHandler());
        asyncEventBus.register(new LoginEventHandler1());
        LoginEvent loginEvent = new LoginEvent();
        loginEvent.setAge("12");
        loginEvent.setUsername("viakiba");
        asyncEventBus.post(loginEvent);
        eventBus.post(loginEvent);
    }

    /**
     * Scans the example package for {@link Subscribe}-annotated methods, registers one
     * instance of each declaring class on both buses and posts one event on each bus.
     * Handlers registered earlier on the shared static buses stay registered, so they
     * receive these events as well.
     *
     * @throws Exception if a handler class cannot be instantiated through its public
     *                   no-arg constructor
     */
    private static void test2() throws Exception {
        org.reflections.Reflections reflections = new org.reflections.Reflections(
                new ConfigurationBuilder()
                        .setUrls(ClasspathHelper.forPackage("io.github.viakiba.hinx.eventbus.guava"))
                        .addScanners(Scanners.MethodsAnnotated));
        Set<Method> annotatedWith = reflections.getMethodsAnnotatedWith(Subscribe.class);

        // Collect each declaring class once: a class with several @Subscribe methods
        // would otherwise be instantiated and registered once per method.
        Set<Class<?>> handlerTypes = new java.util.LinkedHashSet<>();
        for (Method p : annotatedWith) {
            handlerTypes.add(p.getDeclaringClass());
        }
        for (Class<?> handlerType : handlerTypes) {
            Object o = handlerType.getConstructor().newInstance();
            eventBus.register(o);
            asyncEventBus.register(o);
        }

        LoginEvent loginEvent = new LoginEvent();
        loginEvent.setAge("12");
        loginEvent.setUsername("viakiba");
        asyncEventBus.post(loginEvent);
        eventBus.post(loginEvent);
    }
}

Go 手动实现

package observer_test

import (
	"fmt"
	"testing"
)

// TestObserver registers two handlers for the "test" event on the shared
// ObserverInstance, publishes one matching event and checks that both
// handlers ran.
func TestObserver(t *testing.T) {
	// Notify calls the handlers one after another in the calling goroutine,
	// so a plain counter is enough.
	called := 0
	observer.ObserverInstance.Register("test", func(args observer.Event) {
		called++
		t.Log("test", args)
		fmt.Println("xxxxxxxx")
	})
	observer.ObserverInstance.Register("test", func(args observer.Event) {
		called++
		t.Log("test", args)
		fmt.Println("YYYYYYYY")
	})
	event := observer.LoginEvent{EventNameStr: "test", UserIdStr: "xxxx"}
	observer.ObserverInstance.Notify(event)
	if called != 2 {
		t.Errorf("expected both handlers to be invoked once each, got %d invocations", called)
	}
}

实现

package observer

// Event is the contract every event passed to an Observer must satisfy.
type Event interface {
	// EventName returns the name used to look up the handlers registered for this event.
	EventName() string
	// UserId returns the user identifier carried by the event.
	UserId() string
}

// LoginEvent is a sample Event implementation whose event name and user id
// are stored in plain exported fields.
type LoginEvent struct {
	UserIdStr    string
	EventNameStr string
}

// EventName returns the value of the EventNameStr field.
func (e LoginEvent) EventName() string { return e.EventNameStr }

// UserId returns the value of the UserIdStr field.
func (e LoginEvent) UserId() string { return e.UserIdStr }

// Observer is the event listener registry interface.
type Observer interface {
	// Register adds a handler for the given event name.
	Register(string, ExecuteFunction)

	// Notify passes the event to the handlers registered for it.
	Notify(Event)
}

// ExecuteFunction is the handler signature: a callback that receives the published event.
type ExecuteFunction func(event Event)

// ExecuteCollection holds the handlers registered for one event name.
type ExecuteCollection struct {
	// Collection lists the handlers in registration order.
	Collection []ExecuteFunction
}

// ObserverImpl is a map-backed registry of handlers keyed by event name.
type ObserverImpl struct {
	// observers maps an event name to its handler collection. It must be
	// non-nil before Register is called, because writing to a nil map panics.
	observers map[string]*ExecuteCollection
}

// Register appends executeFunction to the handlers stored for eventName,
// creating the collection on first use.
//
// A nil executeFunction is ignored: storing it would make a later Notify
// panic when it tries to call the nil function value.
//
// The receiver is a value, but the observers map it carries is shared with
// the original, so the registration is visible through every copy. The map
// must already be initialised; writing to a nil map panics.
func (o ObserverImpl) Register(eventName string, executeFunction ExecuteFunction) {
	if executeFunction == nil {
		return
	}
	collection, ok := o.observers[eventName]
	if !ok {
		collection = &ExecuteCollection{}
		o.observers[eventName] = collection
	}
	collection.Collection = append(collection.Collection, executeFunction)
}

// Notify looks up the handlers registered under event.EventName() and calls
// each of them with the event, in registration order, in the calling
// goroutine. It does nothing when no handler is registered for that name.
func (o ObserverImpl) Notify(event Event) {
	if handlers, found := o.observers[event.EventName()]; found {
		for _, handle := range handlers.Collection {
			handle(event)
		}
	}
}

// ObserverInstance is the package-level registry, created with an empty,
// ready-to-use handler map.
var ObserverInstance = ObserverImpl{
	observers: map[string]*ExecuteCollection{},
}

Search

    Table of Contents