feat: update event mechanism

add event type local and global, clients have to use contractID and topic to subscribe local event; allow clients to subscribe topics (will not be recorded)
This commit is contained in:
Frank.R.Wu 2021-10-31 23:07:14 +08:00
parent 877dcfe661
commit ad2b0fb6c3
3 changed files with 69 additions and 31 deletions

View File

@ -7,7 +7,7 @@ contract EventPublisher {
* function te: publish an event with topic 'te' and default semantics AT_LEAST_ONCE * function te: publish an event with topic 'te' and default semantics AT_LEAST_ONCE
* function tes: publish an event with topic 'te' and another semantics * function tes: publish an event with topic 'te' and another semantics
*/ */
event te; event global te;
/* /*
* event declaration with semantics * event declaration with semantics
* valid semantics: AT_LEASE_ONCE, AT_MOST_ONCE, and ONLY_ONCE * valid semantics: AT_LEASE_ONCE, AT_MOST_ONCE, and ONLY_ONCE
@ -15,7 +15,7 @@ contract EventPublisher {
* function tews: publish an event with topic 'tews' and semantics AT_MOST_ONCE * function tews: publish an event with topic 'tews' and semantics AT_MOST_ONCE
* function tewss: publish an event with topic 'tews' and another semantics * function tewss: publish an event with topic 'tews' and another semantics
*/ */
event AT_MOST_ONCE tews; event global tews(AT_MOST_ONCE);
// publish an event with declared topic and default semantics AT_LEAST_ONCE // publish an event with declared topic and default semantics AT_LEAST_ONCE
export function pub1(e) { export function pub1(e) {
var content = YancloudUtil.random() + ' EventPublisher.pub1'; var content = YancloudUtil.random() + ' EventPublisher.pub1';

View File

@ -56,7 +56,9 @@ public class ContractManagerFrameHandler extends SimpleChannelInboundHandler<Web
userManagerAction, userManagerAction,
new MasterWSAction(userManagerAction), // 多节点执行 new MasterWSAction(userManagerAction), // 多节点执行
new CMLogAction(), new CMLogAction(),
new ProcessAction(), GRPCPool.instance) { new ProcessAction(),
GRPCPool.instance,
new EventActions()) {
@Override @Override
public boolean checkPermission(Action a, JsonObject arg, long permission) { public boolean checkPermission(Action a, JsonObject arg, long permission) {
long val = a.userPermission(); long val = a.userPermission();
@ -74,25 +76,21 @@ public class ContractManagerFrameHandler extends SimpleChannelInboundHandler<Web
flag = false; flag = false;
} }
LOGGER.debug( LOGGER.debug(
"[checkpermission] " String.format(
+ action "%s val:%d permission:%d status:%s",
+ " val:" action,
+ val val,
+ " permission:" permission,
+ permission status));
+ " status:"
+ status);
String sb = "{\"action\":\"" + CMHttpServer.nodeLogDB.put(
action + action,
"\",\"pubKey\":\"" + String.format(
userManagerAction.getPubKey() + "{\"action\":\"%s\",\"pubKey\":\"%s\",\"status\":\"%s\",\"date\":%d}",
"\",\"status\":\"" + action,
status + userManagerAction.getPubKey(),
"\",\"date\":" + status,
System.currentTimeMillis() + System.currentTimeMillis()));
"}";
CMHttpServer.nodeLogDB.put(action, sb);
// TimeDBUtil.instance.put(CMTables.LocalNodeLogDB.toString(), // TimeDBUtil.instance.put(CMTables.LocalNodeLogDB.toString(),
// sb.toString()); // sb.toString());
return flag; return flag;
@ -113,8 +111,7 @@ public class ContractManagerFrameHandler extends SimpleChannelInboundHandler<Web
} }
@Override @Override
protected void channelRead0(final ChannelHandlerContext ctx, WebSocketFrame frame) protected void channelRead0(final ChannelHandlerContext ctx, WebSocketFrame frame) {
throws Exception {
// ping and pong frames already handled // ping and pong frames already handled
if (frame instanceof TextWebSocketFrame) { if (frame instanceof TextWebSocketFrame) {
// Send the uppercase string back. // Send the uppercase string back.
@ -166,7 +163,7 @@ public class ContractManagerFrameHandler extends SimpleChannelInboundHandler<Web
ae.handle( ae.handle(
action, action,
map, map,
new ResultCallback() { new ResultCallback(ctx.channel()) {
@Override @Override
public void onResult(String ret) { public void onResult(String ret) {
if (ret != null) { if (ret != null) {
@ -181,7 +178,6 @@ public class ContractManagerFrameHandler extends SimpleChannelInboundHandler<Web
ctx.channel().writeAndFlush(new TextWebSocketFrame((JsonUtil.toJson(response)))); ctx.channel().writeAndFlush(new TextWebSocketFrame((JsonUtil.toJson(response))));
} catch (Exception e) { } catch (Exception e) {
ByteArrayOutputStream bo = new ByteArrayOutputStream(); ByteArrayOutputStream bo = new ByteArrayOutputStream();
e.printStackTrace();
e.printStackTrace(new PrintStream(bo)); e.printStackTrace(new PrintStream(bo));
response = new Response(); response = new Response();
response.action = "onException"; response.action = "onException";
@ -189,12 +185,24 @@ public class ContractManagerFrameHandler extends SimpleChannelInboundHandler<Web
StringBuilder ret = new StringBuilder(); StringBuilder ret = new StringBuilder();
int count = 0; int count = 0;
for (String s : strs) { for (String s : strs) {
if (s.contains("sun.reflect")) continue; if (s.contains("sun.reflect")) {
if (s.contains("java.lang.reflect")) continue; continue;
if (s.contains("org.apache")) continue; }
if (s.contains("java.util")) continue; if (s.contains("java.lang.reflect")) {
if (s.contains("java.lang")) continue; continue;
if (s.contains("io.netty")) continue; }
if (s.contains("org.apache")) {
continue;
}
if (s.contains("java.util")) {
continue;
}
if (s.contains("java.lang")) {
continue;
}
if (s.contains("io.netty")) {
continue;
}
ret.append(s); ret.append(s);
ret.append("\n"); ret.append("\n");
if (count++ > 5) break; if (count++ > 5) break;

View File

@ -0,0 +1,30 @@
package org.bdware.server.ws;
import com.google.gson.JsonObject;
import org.bdware.sc.ContractClient;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.util.HashUtil;
import org.bdware.server.action.Action;
import org.bdware.server.action.CMActions;
public class EventActions {
@Action(async = true, userPermission = 0)
public void subEvent(JsonObject args, final ResultCallback rcb) {
if (!args.has("topic")) {
rcb.onResult("{\"status\":\"Error\",\"data\":\"no topic arg!\"}");
return;
}
String topic = args.get("topic").getAsString();
if (args.has("contractID")) {
String argCID = args.get("contractID").getAsString();
ContractClient client = CMActions.manager.getClient(argCID);
if (null == client) {
rcb.onResult("{\"status\":\"Error\",\"data\":\"invalid contract ID or Name!\"}");
return;
}
String contractID = client.getContractID();
topic = HashUtil.sha3(contractID + topic);
}
CMActions.manager.subEventByClient(topic, rcb.getChannel());
}
}