带着问题看源码7-NodeRed中的context模块[通俗易懂]

带着问题看源码 7 - NodeRed 中的 context 模块[通俗易懂]:1. 此模块的意义:NodeRed context 模块是运行期做数据保存及共享的一种机制。当节点间有相同内容时,可以考虑将相同的内容放入 context 以实现共享,既减少了空间,也有利于修改。2. NodeRed 此模块功能:context 有三类:global、flow、node,其中 global 是随着 context 模块初始化时创建,全局唯一;flow context 以 flowId 为标识,为所有具有相同 flowId 的节点做记录;node context 以 nodeId:flowId 为标识。

大家好,欢迎来到IT知识分享网。

1. 此模块的意义

NodeRed context 模块是运行期做数据保存及共享的一种机制。当节点间有相同内容时,可以考虑将相同的内容放入 context 以实现共享,既减少了空间,也有利于修改。

2. NodeRed 此模块功能

context 有三类:global、flow、node,其中 global 是随着 context 模块初始化时创建,全局唯一;flow context 以 flowId 为标识,为所有具有相同 flowId 的节点做记录;node context 以 nodeId:flowId(节点 id 与所在流 id 的组合)为标识。三者作用域不相同:全局、流、节点,不同流 context 数据不共享,不同节点 context 数据不共享。

context 支持以配置文件插件化方式实现功能:插件实现要求的接口,修改配置文件实现自定义功能。

3. NodeRed 中此模块的实现及参与者

  1. global context
/**
 * Reset the context module state and create the global context.
 *
 * @param {Object} _settings - runtime settings; `functionGlobalContext`, when
 *     present, is used as the seed object of the global context.
 */
function init(_settings) {
    // Drop everything left over from a previous initialisation.
    settings = _settings;
    hasConfiguredStore = false;
    storeList = [];
    stores = {};
    contexts = {};

    // The global context is created once, here, and seeded from the settings.
    const globalSeed = settings.functionGlobalContext || {};
    contexts.global = createContext("global", globalSeed);

    // Register a memory store under the reserved "_" name so code that never
    // runs the full `load()` sequence (for example unit tests) still has a
    // working default. Any store configured by the user replaces it.
    stores["_"] = new memory();
    defaultStore = "memory";
}

global context 在初始化时创建,默认使用 'memory' 方式实现 context 的读写。contexts 对象保存了所有的 context。

  1. flow context 实现
/**
 * Return the context of a flow, creating it on first use.
 *
 * @param {string} flowId - id of the flow whose context is wanted.
 * @param {string} parentFlowId - id of the flow whose context becomes the
 *     parent of a newly created flow context.
 * @returns {Object} the cached or newly created flow context.
 */
function getFlowContext(flowId,parentFlowId) {
    if (!contexts.hasOwnProperty(flowId)) {
        let parentCtx = contexts[parentFlowId];
        if (!parentCtx) {
            // A missing parent is tolerated rather than treated as an error:
            // a root context is created and registered under the parent id.
            parentCtx = createRootContext();
            contexts[parentFlowId] = parentCtx;
        }
        contexts[flowId] = createContext(flowId, undefined, parentCtx);
    }
    return contexts[flowId];
}
  1. 节点 context 实现
/**
 * Return the context of a node, creating it on first use.
 *
 * The context is cached under `nodeId:flowId` when a flow id is given and
 * under `nodeId` alone otherwise. A new context gets a `global` property and,
 * when a flow id is given, a `flow` property; both are defined with default
 * descriptors, so they are read-only and non-enumerable.
 *
 * @param {string} nodeId - id of the node.
 * @param {string} [flowId] - id of the flow the node belongs to.
 * @returns {Object} the cached or newly created node context.
 */
function getContext(nodeId, flowId) {
    const cacheKey = flowId ? nodeId+":"+flowId : nodeId;
    if (contexts.hasOwnProperty(cacheKey)) {
        return contexts[cacheKey];
    }

    const nodeContext = createContext(cacheKey);
    const link = function(name, target) {
        Object.defineProperty(nodeContext, name, { value: target });
    };

    if (flowId) {
        if (!contexts[flowId]) {
            // The flow context is normally created before any of its nodes ask
            // for theirs. Reaching this point most likely means a unit test
            // skipped flow initialisation, so create the missing flow context
            // instead of failing.
            contexts[flowId] = createContext(flowId, undefined, createRootContext());
        }
        link('flow', contexts[flowId]);
    }
    link('global', contexts['global']);

    contexts[cacheKey] = nodeContext;
    return nodeContext;
}

节点 context 可以通过 global 属性获取到 global context;当节点属于某个流时,还可以通过 flow 属性获取到该流的 flow context。

  1. 插件加载
/**
 * Instantiate and open the context stores declared in `settings.contextStorage`.
 *
 * Each entry is either `{module, config}` (a store to create) or, for the
 * `default` entry only, a string naming another entry to use as the default.
 * With no entries configured, the memory store already registered under "_"
 * is opened and used as the default.
 *
 * @returns {Promise} resolves with the result of `Promise.all` over every
 *     store's `open()`; rejects with an Error built from the
 *     "context.error-loading-module" message when validation, module loading,
 *     store construction or opening fails.
 */
function load() {
    return new Promise(function(resolve,reject) {
        // load & init plugins in settings.contextStorage
        var plugins = settings.contextStorage || {};
        // True when the default store is just another name for a named store
        var defaultIsAlias = false;
        var promises = [];
        if (plugins && Object.keys(plugins).length > 0) {
            var hasDefault = plugins.hasOwnProperty('default');
            // First non-alias store seen; only tracked when no 'default' is listed
            var defaultName;
            for (var pluginName in plugins) {
                if (plugins.hasOwnProperty(pluginName)) {
                    // "_" is a reserved name - do not allow it to be overridden
                    if (pluginName === "_") {
                        continue;
                    }
                    // Store names are restricted to letters, digits and underscore
                    if (!/^[a-zA-Z0-9_]+$/.test(pluginName)) {
                        return reject(new Error(log._("context.error-invalid-module-name", {name:pluginName})));
                    }

                    // Check if this is setting the 'default' context to be a named plugin
                    if (pluginName === "default" && typeof plugins[pluginName] === "string") {
                        // Check the 'default' alias exists before initialising anything
                        if (!plugins.hasOwnProperty(plugins[pluginName])) {
                            return reject(new Error(log._("context.error-invalid-default-module", {storage:plugins["default"]})));
                        }
                        defaultIsAlias = true;
                        // An alias creates no store of its own
                        continue;
                    }
                    if (!hasDefault && !defaultName) {
                        defaultName = pluginName;
                    }
                    var plugin;
                    if (plugins[pluginName].hasOwnProperty("module")) {
                        // Get the provided config and copy in the 'approved' top-level settings (eg userDir)
                        var config = plugins[pluginName].config || {};
                        copySettings(config, settings);

                        if (typeof plugins[pluginName].module === "string") {
                            // This config identifies the module by name - assume it is a built-in one
                            // TODO: check it exists locally, if not, try to require it as-is
                            try {
                                plugin = require("./"+plugins[pluginName].module);
                            } catch(err) {
                                return reject(new Error(log._("context.error-loading-module2", {module:plugins[pluginName].module,message:err.toString()})));
                            }
                        } else {
                            // Assume `module` is an already-required module we can use
                            plugin = plugins[pluginName].module;
                        }
                        try {
                            // Create a new instance of the plugin by calling its module function
                            stores[pluginName] = plugin(config);
                            // Work out a printable description of the module for the log line
                            var moduleInfo = plugins[pluginName].module;
                            if (typeof moduleInfo !== 'string') {
                                // Only use toString() when the module defines its own
                                if (moduleInfo.hasOwnProperty("toString")) {
                                    moduleInfo = moduleInfo.toString();
                                } else {
                                    moduleInfo = "custom";
                                }
                            }
                            log.info(log._("context.log-store-init", {name:pluginName, info:"module="+moduleInfo}));
                        } catch(err) {
                            return reject(new Error(log._("context.error-loading-module2",{module:pluginName,message:err.toString()})));
                        }
                    } else {
                        // Plugin does not specify a 'module'
                        return reject(new Error(log._("context.error-module-not-defined", {storage:pluginName})));
                    }
                }
            }

            // Open every store currently registered; this includes whatever is
            // under "_" at this point, as the default is only re-pointed below
            for (var plugin in stores) {
                if (stores.hasOwnProperty(plugin)) {
                    promises.push(stores[plugin].open());
                }
            }
            // There is a 'default' listed in the configuration
            if (hasDefault) {
                // If 'default' is an alias, point it at the right module - we have already
                // checked that it exists. If it isn't an alias, then it will
                // already be set to a configured store
                if (defaultIsAlias) {
                    stores["_"] =  stores[plugins["default"]];
                    defaultStore = plugins["default"];
                } else {
                    stores["_"] = stores["default"];
                    defaultStore = "default";
                }
            } else if (defaultName) {
                // No 'default' listed, so pick first in list as the default
                stores["_"] = stores[defaultName];
                defaultStore = defaultName;
                defaultIsAlias = true;
            } else {
                // No usable store was listed in the config object, so the
                // memory store stays the default.
                // NOTE(review): the storeList assigned here is overwritten by
                // the unconditional assignment after this if/else chain.
                storeList = ["memory"];
                defaultStore = "memory";
            }
            hasConfiguredStore = true;
            // Publish the store names, hiding the reserved "_" entry and, when
            // the default is an alias, the "default" name as well
            storeList = Object.keys(stores).filter(n=>!(defaultIsAlias && n==="default") && n!== "_");
        } else {
            // No configured plugins
            log.info(log._("context.log-store-init", {name:"default", info:"module=memory"}));
            promises.push(stores["_"].open())
            storeList = ["memory"];
            defaultStore = "memory";
        }
        // Resolving with a promise makes load() settle only once every open() has
        return resolve(Promise.all(promises));
    }).catch(function(err) {
        // Wrap any failure above in a single, uniform load error
        throw new Error(log._("context.error-loading-module",{message:err.toString()}));
    });
}

在 settings.js 中配置 contextStorage 属性,通过此函数加载并初始化其中声明的存储插件;未配置任何插件时,使用默认的 memory 存储。

  1. context 实例化 的方法
/**
 * Build a context object exposing `get`, `set` and `keys`, backed by the
 * configured context stores.
 *
 * All three methods work synchronously when no callback is given and pass
 * their outcome to the callback otherwise. Keys are first parsed with
 * `util.parseContextStore` (so a key may name its store) and then offered to
 * `followParentContext`, which can redirect the access to another context.
 *
 * @param {string} id - scope under which values are kept in the stores.
 * @param {Object} [seed] - only set for the global context (sourced from
 *     functionGlobalContext). It becomes the context object itself and
 *     supplies fallback values for keys the store does not hold.
 * @param {Object} [parent] - parent context, exposed as `$parent`.
 * @returns {Object} the context object.
 */
function createContext(id,seed,parent) {
    // Seed is only set for global context - sourced from functionGlobalContext
    var scope = id;
    var obj = seed || {};
    var seedKeys;
    var insertSeedValues;
    if (seed) {
        seedKeys = Object.keys(seed);
        // Fill every slot the store left undefined with the seed's value.
        // `values` is modified in place; for a single key only values[0] is used.
        insertSeedValues = function(keys,values) {
            if (!Array.isArray(keys)) {
                if (values[0] === undefined) {
                    try {
                        values[0] = util.getObjectProperty(seed,keys);
                    } catch(err) {
                        // A malformed key expression is reported to the caller;
                        // any other lookup failure just means "no seed value"
                        if (err.code === "INVALID_EXPR") {
                            throw err;
                        }
                        values[0] = undefined;
                    }
                }
            } else {
                for (var i=0;i<keys.length;i++) {
                    if (values[i] === undefined) {
                        try {
                            values[i] = util.getObjectProperty(seed,keys[i]);
                        } catch(err) {
                            if (err.code === "INVALID_EXPR") {
                                throw err;
                            }
                            values[i] = undefined;
                        }
                    }
                }
            }
        }
    }

    Object.defineProperties(obj, {
        get: {
            /**
             * Read one key, or an array of keys, from the context.
             *
             * @param {string|string[]} key
             * @param {string} [storage] - store name; defaults to the store
             *     named in the key, then to "_".
             * @param {Function} [callback] - called as (err, ...values).
             * @returns {*} the value(s) when called without a callback.
             * @throws {Error} on a non-function callback, or - without a
             *     callback - on an invalid key.
             */
            value: function(key, storage, callback) {
                var context;
                // Support the (key, callback) call form
                if (!callback && typeof storage === 'function') {
                    callback = storage;
                    storage = undefined;
                }
                if (callback && typeof callback !== 'function'){
                    throw new Error("Callback must be a function");
                }
                if (!validateContextKey(key)) {
                    var err = new Error("Invalid context key");
                    if(callback) {
                        return callback(err);
                    } else {
                        throw err;
                    }
                }
                if (!Array.isArray(key)) {
                    var keyParts = util.parseContextStore(key);
                    key = keyParts.key;
                    if (!storage) {
                        storage = keyParts.store || "_";
                    }
                    // The key may address another context; if so, delegate to it
                    var result = followParentContext(parent, key);
                    if (result) {
                        var [ctx, new_key] = result;
                        if (ctx && new_key) {
                            return ctx.get(new_key, storage, callback);
                        }
                        else {
                            if (callback) {
                                return callback(undefined);
                            }
                            else {
                                return undefined;
                            }
                        }
                    }
                } else {
                    if (!storage) {
                        storage = "_";
                    }
                }
                context = getContextStorage(storage);

                if (callback) {
                    if (!seed) {
                        context.get(scope,key,callback);
                    } else {
                        // Intercept the store's answer so seed values can be merged in
                        context.get(scope,key,function(storeErr, ...results) {
                            if (storeErr) {
                                callback(storeErr);
                                return;
                            }
                            try {
                                insertSeedValues(key,results);
                            } catch(err) {
                                // Pass the error as the first argument. The previous
                                // `callback.apply(err)` used it as `this` and called
                                // the callback with no arguments, hiding the failure.
                                callback(err);
                                return;
                            }
                            // Put the err arg back
                            results.unshift(undefined);
                            callback.apply(null,results);
                        });
                    }
                } else {
                    // No callback, attempt to do this synchronously
                    var results = context.get(scope,key);
                    if (seed) {
                        if (Array.isArray(key)) {
                            insertSeedValues(key,results);
                        } else if (results === undefined){
                            try {
                                results = util.getObjectProperty(seed,key);
                            } catch(err) {
                                if (err.code === "INVALID_EXPR") {
                                    throw err;
                                }
                                results = undefined;
                            }
                        }
                    }
                    return results;
                }
            }
        },
        set: {
            /**
             * Write a value (or values, for an array of keys) to the context.
             *
             * @param {string|string[]} key
             * @param {*} value
             * @param {string} [storage] - store name; defaults to the store
             *     named in the key, then to "_".
             * @param {Function} [callback] - forwarded to the store's set().
             * @throws {Error} on a non-function callback, or - without a
             *     callback - on an invalid key.
             */
            value: function(key, value, storage, callback) {
                var context;
                // Support the (key, value, callback) call form
                if (!callback && typeof storage === 'function') {
                    callback = storage;
                    storage = undefined;
                }
                if (callback && typeof callback !== 'function'){
                    throw new Error("Callback must be a function");
                }
                if (!validateContextKey(key)) {
                    var err = new Error("Invalid context key");
                    if(callback) {
                        return callback(err);
                    } else {
                        throw err;
                    }
                }
                if (!Array.isArray(key)) {
                    var keyParts = util.parseContextStore(key);
                    key = keyParts.key;
                    if (!storage) {
                        storage = keyParts.store || "_";
                    }
                    // The key may address another context; if so, delegate to it
                    var result = followParentContext(parent, key);
                    if (result) {
                        var [ctx, new_key] = result;
                        if (ctx && new_key) {
                            return ctx.set(new_key, value, storage, callback);
                        }
                        else {
                            if (callback) {
                                return callback();
                            }
                            return undefined;
                        }
                    }
                } else {
                    if (!storage) {
                        storage = "_";
                    }
                }
                context = getContextStorage(storage);

                context.set(scope, key, value, callback);
            }
        },
        keys: {
            /**
             * List the keys held for this context. For a seeded context the
             * seed's own keys are included unless
             * `settings.exportGlobalContextKeys` is false.
             *
             * @param {string} [storage] - store name; "_" when omitted.
             * @param {Function} [callback] - called as (err, keys).
             * @returns {string[]} the keys when called without a callback.
             * @throws {Error} on a non-function callback.
             */
            value: function(storage, callback) {
                var context;
                if (!storage && !callback) {
                    context = stores["_"];
                } else {
                    // Support the (callback) call form
                    if (typeof storage === 'function') {
                        callback = storage;
                        storage = "_";
                    }
                    if (callback && typeof callback !== 'function') {
                        throw new Error("Callback must be a function");
                    }
                    context = getContextStorage(storage);
                }
                if (seed && settings.exportGlobalContextKeys !== false) {
                    // Merge seed keys with stored keys, dropping duplicates
                    if (callback) {
                        context.keys(scope, function(err,keys) {
                            callback(err,Array.from(new Set(seedKeys.concat(keys)).keys()));
                        });
                    } else {
                        var keys = context.keys(scope);
                        return Array.from(new Set(seedKeys.concat(keys)).keys())
                    }
                } else {
                    return context.keys(scope, callback);
                }
            }
        }
    });
    if (parent) {
        Object.defineProperty(obj, "$parent", {
            value: parent
        });
    }
    return obj;
}

有三个方法 get、set、keys。

4. NodeRed 为什么这么设计,这种设计的优劣有哪些

  1. 与 Log 模块类似,留下接口给外部扩展,虽然复杂度略高,但扩展性好
  2. NodeRed 中大量使用的插件机制,利用的 JS 的模块加载,实现起来比静态语言方便很多。
  3. 若将 context 外置,存在运行数据被修改的风险

5. 应用场景分析

  1. 以文件形式存储,不会因进程关闭而影响 context,可对 context 进行恢复
  2. 以文件形式存储,可提供运行时在外部修改进程 context,比如直接修改文件,改变 context 变量
  3. 以 RESTful 形式发送到服务端,服务端进行处理和存储。

6. 实践

存储

  1. settings.js
  // Use the 'localfilesystem' module as the default context store.
  contextStorage: {
    default: {
      module: 'localfilesystem',
      config: {
        // The base directory to use. Default: "context"
        base: 'context',
        // The directory to create the base directory in. Default: settings.userDir
        dir: '/home/freeman/.node-red/',
        // Whether to cache contents in memory. Default: true
        cache: false,
        // If cache is enabled, the minimum interval between writes to
        // storage, in seconds.
        flushInterval: 30,
      },
    },
  },

  1. flow
[
    {
        "id": "cbd2b3b600a928eb",
        "type": "tab",
        "label": "流程 1",
        "disabled": false,
        "info": ""
    },
    {
        "id": "e7b11ff31b0737d4",
        "type": "inject",
        "z": "cbd2b3b600a928eb",
        "name": "",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": "0",
        "topic": "",
        "payloadType": "date",
        "x": 120,
        "y": 80,
        "wires": [
            [
                "216def814448f2b8"
            ]
        ]
    },
    {
        "id": "216def814448f2b8",
        "type": "function",
        "z": "cbd2b3b600a928eb",
        "name": "",
        "func": "var processSwapInt = function processSwapInt(id, buffer) {\n    let wordlength = 2; //一个字占2字节\n    \n    var buf = Buffer.alloc(4);\n    buf[0] = buffer[id * wordlength + 2];\n    buf[1] = buffer[id * wordlength + 3];\n    buf[2] = buffer[id * wordlength + 0];\n    buf[3] = buffer[id * wordlength + 1];\n    return buf.readIntBE(0,4);\n}\n\nvar processNoSwapInt = function (id, buffer) {\n    let wordlength = 2; //一个字占2字节\n    let value;\n    var buf = buffer.subarray(id * wordlength, id * wordlength + 4);\n    return buf.readIntBE(0,4);\n}\n\nvar processSwapFloat = function (id, buffer){\n    let wordlength = 2; //一个字占2字节\n    var buf = Buffer.alloc(4);\n    buf[0] = msg.payload.buffer[id * wordlength + 2];\n    buf[1] = msg.payload.buffer[id * wordlength + 3];\n    buf[2] = msg.payload.buffer[id * wordlength + 0];\n    buf[3] = msg.payload.buffer[id * wordlength + 1];\n    return buf.readFloatBE(0);\n}\n\nvar processNoSwapFloat = function (id, buffer){\n    let wordlength = 2; //一个字占2字节\n\tvar buf = buffer.subarray(id*wordlength,id*wordlength+4);\n\t\n\treturn buf.readFloatBE(0);\n}\n\nvar processNoSwapShort = function (id, buffer){\n    let wordlength = 2; //一个字占2字节\n    var buf = buffer.subarray(id * wordlength, id * wordlength + 2);\n    return buf.readInt16BE(0,2);\n}\n\nglobal.set('processSwapInt', processSwapInt);\nglobal.set('processNoSwapInt', processNoSwapInt);\n\nglobal.set('processSwapFloat', processNoSwapFloat);\nglobal.set('processNoSwapFloat', processNoSwapFloat);\n\nglobal.set('processNoSwapShort', processNoSwapShort);\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 350,
        "y": 80,
        "wires": [
            [
                "3f820a34fd1a8f6d"
            ]
        ]
    },
    {
        "id": "3f820a34fd1a8f6d",
        "type": "debug",
        "z": "cbd2b3b600a928eb",
        "name": "",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "false",
        "statusVal": "",
        "statusType": "auto",
        "x": 640,
        "y": 80,
        "wires": []
    },
    {
        "id": "5d1a746cb00206ce",
        "type": "function",
        "z": "cbd2b3b600a928eb",
        "name": "",
        "func": "\n\nreturn new Promise((reslove,reject)=>{\n    global.get('processNoSwapInt',function(error, processNoSwapInt){\n        \n      var buf = Buffer.alloc(4);\n      buf[0] = 1;\n      buf[1] = 2;\n      buf[2] = 3;\n      buf[3] = 4;\n      \n    if(processNoSwapInt){\n        msg.payload = processNoSwapInt(0, buf)\n    }\n    reslove(msg);\n    })\n    \n})\n\n        ",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 300,
        "y": 320,
        "wires": [
            [
                "7dc9a6ff3a7be0b0"
            ]
        ]
    },
    {
        "id": "dc0bde4b0474c180",
        "type": "inject",
        "z": "cbd2b3b600a928eb",
        "name": "",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "",
        "payloadType": "date",
        "x": 110,
        "y": 320,
        "wires": [
            [
                "5d1a746cb00206ce"
            ]
        ]
    },
    {
        "id": "7dc9a6ff3a7be0b0",
        "type": "debug",
        "z": "cbd2b3b600a928eb",
        "name": "",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "false",
        "statusVal": "",
        "statusType": "auto",
        "x": 540,
        "y": 320,
        "wires": []
    }
]
  1. 源码(修改部分代码,实现函数的读写)
packages/node_modules/@node-red/runtime/lib/nodes/context/localfilesystem.js
  1. 结果

截图_选择区域_20220322155310

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/25493.html

(0)

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信