作者:joohhnnn

optimism中的libp2p运用

在本节中,首要用于解说optimism是怎么运用libp2p来完结op-node中的p2p网络树立的。
p2p网络首要是用于在不同的node中传递信息,例如sequencer完结unsafe的区块构建后,经过p2p的gossiphub的pub/sub进行传达。libp2p还处理了其他,例如网络,寻址等在p2p网络中的根底件层。

了解libp2p

libp2p(简称来自“库对等”或“library peer-to-peer”)是一个面向对等(P2P)网络的结构,能够帮助开发P2P运用程序。它包含了一套协议、规范和库,使网络参与者(也称为“对等体”或“peers”)之间的P2P通讯变得更为简便 (source[2])。libp2p最初是作为IPFS(InterPlanetary File System,星际文件体系)项目的一部分,后来它演化成了一个独立的项目,成为了分布式网络的模块化网络堆栈 (source[3])。

libp2p是IPFS社区的一个开源项目,欢迎广泛社区的贡献,包含帮助编写规范、编码完结以及创立示例和教程 (source[4])。libp2p是由多个构建模块组成的,每个模块都有非常明确、有文档记载且经过测验的接口,使得它们可组合、可替换,因而可升级 (source[5])。libp2p的模块化特性使得开发人员能够挑选并运用仅对他们的运用程序必要的组件,从而在构建P2P网络运用程序时促进了灵敏性和效率。

相关资源

  • libp2p的官方文档[6]

  • libp2p的GitHub库房[7]

  • 在ProtoSchool上的libp2p简介[8]

libp2p的模块化架构和开源特性为开发强壮、可扩展和灵敏的P2P运用程序供给了杰出的环境,使其成为分布式网络和网络运用程序开发领域的重要参与者。

libp2p完结办法

在运用libp2p时,你会需求完结和装备一些核心组件以构建你的P2P网络。以下是libp2p在运用中的一些首要完结方面:

1. 节点创立与装备:

  • 创立和装备libp2p节点是最基本的进程,这包含设置节点的网络地址、身份和其他基本参数。
    要害运用代码:

libp2p.New()

2. 传输协议:

  • 挑选和装备你的传输协议(例如TCP、WebSockets等)以保证节点之间的通讯。
    要害运用代码:

tcpTransport := tcp.NewTCPTransport()

3. 多路复用和流操控:

  • 完结多路复用来答应在单一的衔接上处理多个并发的数据流。

  • 完结流量操控来办理数据的传输速率和处理速率。
    要害运用代码:

yamuxTransport := yamux.New()

4. 安全和加密:

  • 装备安全传输层以保证通讯的安全性和隐私。

  • 完结加密和身份验证机制以保护数据和验证通讯方。
    要害运用代码:

tlsTransport := tls.New()

5. 协议和音讯处理:

  • 界说和完结自界说协议来处理特定的网络操作和音讯交流。

  • 处理接纳到的音讯并依据需求发送呼应。
    要害运用代码:

host.SetStreamHandler("/my-protocol/1.0.0", myProtocolHandler)

6. 发现和路由:

  • 完结节点发现机制来找到网络中的其他节点。

  • 完结路由逻辑以确定怎么将音讯路由到网络中的正确节点。
    要害运用代码:

dht := kaddht.NewDHT(ctx, host, datastore.NewMapDatastore())

7. 网络行为和战略:

  • 界说和完结网络的行为和战略,例如衔接办理、过错处理和重试逻辑。
    要害运用代码:

connManager := connmgr.NewConnManager(lowWater, highWater, gracePeriod)

8. 状态办理和存储:

  • 办理节点和网络的状态,包含衔接状态、节点列表和数据存储。
    要害运用代码:

peerstore := pstoremem.NewPeerstore()

9. 测验和调试:

  • 为你的libp2p运用编写测验以保证其正确性和可靠性。

  • 运用调试工具和日志来确诊和处理网络问题。
    要害运用代码:

logging.SetLogLevel("libp2p", "DEBUG")

10. 文档和社区支撑:

- 查阅libp2p的文档以了解其各种组件和API。
- 与libp2p社区交流以获取支撑和处理问题。

}

以上是运用libp2p时需求考虑和完结的一些首要方面。每个项目的具体完结或许会有所不同,但这些基本方面是构建和运转libp2p运用所必需的。在完结这些功用时,能够参阅libp2p的官方文档[9]和GitHub库房[10]中的示例代码和教程。

在OP-node中libp2p的运用

为了弄清楚op-node和libp2p的关系,咱们有必要弄清楚几个问题

- 为什么挑选libp2p?为什么不挑选devp2p(geth运用devp2p)
- OP-node有哪些数据或者流程和p2p网络严密相关
- 这些功用是如安在代码层完结的

op-node需求libp2p网络的原因

首要咱们要了解为什么optimism需求p2p网络libp2p是一个模块化的网络协议,答应开发人员构建去中心化的点对点运用,适用于多种用例 (source[11])(source[12])。而devp2p首要用于以太坊生态体系,专为以太坊运用定制 (source[13])。libp2p的灵敏性和广泛适用性或许使其成为开发人员的首选。

op-node首要运用libp2p的功用点

- 用于sequencer将发生的unsafe的block传递到其他非sequencer节点
- 用于非sequencer形式下的其他节点当出现gap时进行快速同步(反向链同步)
- 用于选用积分声誉体系来规范全体节点的杰出环境

代码完结

host自界说初始化

host能够理解为是p2p的节点,当开启这个节点的时分,需求针对自己的项目进行一些特殊的初始化装备

现在让咱们看一下 op-node/p2p/host.go文件中的Host办法,

该函数首要用于设置 libp2p 主机并进行各种装备。以下是该函数的要害部分以及各部分的简略中文描述:

  1. 查看是否禁用 P2P
    假如 P2P 被禁用,函数会直接回来。

  2. 从公钥获取 Peer ID
    运用装备中的公钥来生成 Peer ID。

  3. 初始化 Peerstore
    创立一个根底的 Peerstore 存储。

  4. 初始化扩展 Peerstore
    在根底 Peerstore 的根底上,创立一个扩展的 Peerstore。

  5. 将私钥和公钥增加到 Peerstore
    在 Peerstore 中存储 Peer 的私钥和公钥。

  6. 初始化衔接操控器(Connection Gater)
    用于操控网络衔接。

  7. 初始化衔接办理器(Connection Manager)
    用于办理网络衔接。

  8. 设置传输和监听地址
    设置网络传输协议和主机的监听地址。

  9. 创立 libp2p 主机
    运用前面的一切设置来创立一个新的 libp2p 主机。

  10. 初始化静态 Peer
    假如有装备静态 Peer,进行初始化。

  11. 回来主机
    最终,函数回来创立好的 libp2p 主机。

这些要害部分负责 libp2p 主机的初始化和设置,每个部分都负责主机装备的一个特定方面。

    func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error) {
        if conf.DisableP2P {
            return nil, nil
        }
        pub := conf.Priv.GetPublic()
        pid, err := peer.IDFromPublicKey(pub)
        if err != nil {
            return nil, fmt.Errorf("failed to derive pubkey from network priv key: %w", err)
        }
        basePs, err := pstoreds.NewPeerstore(context.Background(), conf.Store, pstoreds.DefaultOpts())
        if err != nil {
            return nil, fmt.Errorf("failed to open peerstore: %w", err)
        }
        peerScoreParams := conf.PeerScoringParams()
        var scoreRetention time.Duration
        if peerScoreParams != nil {
            // Use the same retention period as gossip will if available
            scoreRetention = peerScoreParams.PeerScoring.RetainScore
        } else {
            // Disable score GC if peer scoring is disabled
            scoreRetention = 0
        }
        ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store, scoreRetention)
        if err != nil {
            return nil, fmt.Errorf("failed to open extended peerstore: %w", err)
        }
        if err := ps.AddPrivKey(pid, conf.Priv); err != nil {
            return nil, fmt.Errorf("failed to set up peerstore with priv key: %w", err)
        }
        if err := ps.AddPubKey(pid, pub); err != nil {
            return nil, fmt.Errorf("failed to set up peerstore with pub key: %w", err)
        }
        var connGtr gating.BlockingConnectionGater
        connGtr, err = gating.NewBlockingConnectionGater(conf.Store)
        if err != nil {
            return nil, fmt.Errorf("failed to open connection gater: %w", err)
        }
        connGtr = gating.AddBanExpiry(connGtr, ps, log, clock.SystemClock, metrics)
        connGtr = gating.AddMetering(connGtr, metrics)
        connMngr, err := DefaultConnManager(conf)
        if err != nil {
            return nil, fmt.Errorf("failed to open connection manager: %w", err)
        }
        listenAddr, err := addrFromIPAndPort(conf.ListenIP, conf.ListenTCPPort)
        if err != nil {
            return nil, fmt.Errorf("failed to make listen addr: %w", err)
        }
        tcpTransport := libp2p.Transport(
            tcp.NewTCPTransport,
            tcp.WithConnectionTimeout(time.Minute*60)) // break unused connections
        // TODO: technically we can also run the node on websocket and QUIC transports. Maybe in the future?
        var nat lconf.NATManagerC // disabled if nil
        if conf.NAT {
            nat = basichost.NewNATManager
        }
        opts := []libp2p.Option{
            libp2p.Identity(conf.Priv),
            // Explicitly set the user-agent, so we can differentiate from other Go libp2p users.
            libp2p.UserAgent(conf.UserAgent),
            tcpTransport,
            libp2p.WithDialTimeout(conf.TimeoutDial),
            // No relay services, direct connections between peers only.
            libp2p.DisableRelay(),
            // host will start and listen to network directly after construction from config.
            libp2p.ListenAddrs(listenAddr),
            libp2p.ConnectionGater(connGtr),
            libp2p.ConnectionManager(connMngr),
            //libp2p.ResourceManager(nil), // TODO use resource manager interface to manage resources per peer better.
            libp2p.NATManager(nat),
            libp2p.Peerstore(ps),
            libp2p.BandwidthReporter(reporter), // may be nil if disabled
            libp2p.MultiaddrResolver(madns.DefaultResolver),
            // Ping is a small built-in libp2p protocol that helps us check/debug latency between peers.
            libp2p.Ping(true),
            // Help peers with their NAT reachability status, but throttle to avoid too much work.
            libp2p.EnableNATService(),
            libp2p.AutoNATServiceRateLimit(10, 5, time.Second*60),
        }
        opts = append(opts, conf.HostMux...)
        if conf.NoTransportSecurity {
            opts = append(opts, libp2p.Security(insecure.ID, insecure.NewWithIdentity))
        } else {
            opts = append(opts, conf.HostSecurity...)
        }
        h, err := libp2p.New(opts...)
        if err != nil {
            return nil, err
        }
        staticPeers := make([]*peer.AddrInfo, len(conf.StaticPeers))
        for i, peerAddr := range conf.StaticPeers {
            addr, err := peer.AddrInfoFromP2pAddr(peerAddr)
            if err != nil {
                return nil, fmt.Errorf("bad peer address: %w", err)
            }
            staticPeers[i] = addr
        }
        out := &extraHost{
            Host:        h,
            connMgr:     connMngr,
            log:         log,
            staticPeers: staticPeers,
            quitC:       make(chan struct{}),
        }
        out.initStaticPeers()
        if len(conf.StaticPeers) > 0 {
            go out.monitorStaticPeers()
        }
        out.gater = connGtr
        return out, nil
    }

gossip下的区块传达

gossip在分布式体系中用于保证数据一致性,并修复由多播引起的问题。它是一种通讯协议,其中信息从一个或多个节点发送到网络中的其他节点集,当网络中的一组客户端一起需求相同的数据时,这会很有用。当sequencer发生出unsafe状态的区块的时分,就是经过gossip网络传递给其他节点的。

首要让咱们来看看节点是在哪里参加gossip网络的,op-node/p2p/node.go中的init办法,在节点初始化的时分,调用JoinGossip办法参加了gossip网络

    func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) error {
            …
            // note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
            n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log)
            if err != nil {
                return fmt.Errorf("failed to start gossipsub router: %w", err)
            }
            n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
            …
    }

接下来来到op-node/p2p/gossip.go

以下是 JoinGossip 函数中履行的首要操作的简略概述:

  1. 验证器创立

  • val 被赋予 guardGossipValidator 函数调用的成果,目的是为八卦音讯创立验证器,该验证器查看网络中传达的区块的有用性。

  • 区块主题称号生成

    • 运用 blocksTopicV1 函数生成 blocksTopicName,该函数依据装备(cfg)中的 L2ChainID 格式化字符串。格式化的字符串遵循特定的结构:/optimism/{L2ChainID}/0/blocks

  • 主题验证器注册

    • 调用 psRegisterTopicValidator 办法,以将 val 注册为区块主题的验证器。还指定了一些验证器的装备选项,例如3秒的超时和4的并发等级。

  • 参加主题

    • 函数经过调用 ps.Join(blocksTopicName) 测验参加区块八卦主题。假如出现过错,它将回来一个过错音讯,指示无法参加主题。

  • 事情处理器创立

    • 经过调用 blocksTopic.EventHandler() 测验为区块主题创立事情处理器。假如出现过错,它将回来一个过错音讯,指示无法创立处理器。

  • 记载主题事情

    • 生成了一个新的goroutine来运用 LogTopicEvents 函数记载主题事情。

  • 主题订阅

    • 函数经过调用 blocksTopic.Subscribe() 测验订阅区块八卦主题。假如出现过错,它将回来一个过错音讯,指示无法订阅。

  • 订阅者创立

    • 运用 MakeSubscriber 函数创立了一个 subscriber,该函数封装了一个 BlocksHandler,该处理器处理来自 gossipInOnUnsafeL2Payload 事情。生成了一个新的goroutine来运转供给的 subscription

  • 创立并回来发布者

    • 创立了一个 publisher 实例并回来,该实例装备为运用供给的装备和区块主题。

    func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) {
        val := guardGossipValidator(log, logValidationResult(self, "validated block", log, BuildBlocksValidator(log, cfg, runCfg)))
        blocksTopicName := blocksTopicV1(cfg) // return fmt.Sprintf("/optimism/%s/0/blocks", cfg.L2ChainID.String())
        err := ps.RegisterTopicValidator(blocksTopicName,
            val,
            pubsub.WithValidatorTimeout(3*time.Second),
            pubsub.WithValidatorConcurrency(4))
        if err != nil { 
            return nil, fmt.Errorf("failed to register blocks gossip topic: %w", err)
        }
        blocksTopic, err := ps.Join(blocksTopicName)
        if err != nil {
            return nil, fmt.Errorf("failed to join blocks gossip topic: %w", err)
        }
        blocksTopicEvents, err := blocksTopic.EventHandler()
        if err != nil {
            return nil, fmt.Errorf("failed to create blocks gossip topic handler: %w", err)
        }
        go LogTopicEvents(p2pCtx, log.New("topic", "blocks"), blocksTopicEvents)
        subscription, err := blocksTopic.Subscribe()
        if err != nil {
            return nil, fmt.Errorf("failed to subscribe to blocks gossip topic: %w", err)
        }
        subscriber := MakeSubscriber(log, BlocksHandler(gossipIn.OnUnsafeL2Payload))
        go subscriber(p2pCtx, subscription)
        return &publisher{log: log, cfg: cfg, blocksTopic: blocksTopic, runCfg: runCfg}, nil
    }

这样,一个非sequencer节点的订阅就现已树立了,接下来让咱们把目光移到sequencer形式的节点傍边,然后看看他是假如将区块播送出去的。

op-node/rollup/driver/state.go

在eventloop中经过循环来等待sequencer形式中新的payload的发生(unsafe区块),然后将这个payload经过PublishL2Payload传达到gossip网络中

    func (s *Driver) eventLoop() {
        …
        for(){
            …
            select {
            case <-sequencerCh:
                payload, err := s.sequencer.RunNextSequencerAction(ctx)
                if err != nil {
                    s.log.Error("Sequencer critical error", "err", err)
                    return
                }
                if s.network != nil && payload != nil {
                    // Publishing of unsafe data via p2p is optional.
                    // Errors are not severe enough to change/halt sequencing but should be logged and metered.
                    if err := s.network.PublishL2Payload(ctx, payload); err != nil {
                        s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err)
                        s.metrics.RecordPublishingError()
                    }
                }
                planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
                …
                }
        …
        }
        …
    }

这样,一个新的payload的就进入到gossip网络中了。

在libp2p的pubsub体系中,节点首要从其他节点接纳音讯,然后查看音讯的有用性。假如音讯有用而且符合节点的订阅规范,节点会考虑将其转发给其他节点。根据某些战略,如网络拓扑和节点的订阅状况,节点会决议是否将音讯转发给其它节点。假如决议转发,节点会将音讯发送给与其衔接并订阅了相同主题的一切节点。在转发进程中,为避免音讯在网络中无限循环,一般会有机制来跟踪已转发的音讯,并保证不会多次转发同一音讯。一起,音讯或许具有“生存时刻”(TTL)属性,界说了音讯能够在网络中转发的次数或时刻,每逢音讯被转发时,TTL值都会递减,直到音讯不再被转发停止。在验证方面,音讯一般会经过一些验证进程,例如查看音讯的签名和格式,以保证音讯的完整性和真实性。在libp2p的pubsub模型中,这个进程保证了音讯能够广泛传达到网络中的许多节点,一起避免了无限循环和网络拥塞,完结了有用的音讯传递和处理。

当存在缺失区块,经过p2p快速同步

当节点由于特殊状况,比如宕机后重新链接,或许会发生一些没有同步上的区块(gaps),当遇到这种状况时,能够经过p2p网络的反向链的办法快速同步。

咱们来看一下op-node/rollup/driver/state.go中的checkForGapInUnsafeQueue函数

该代码段界说了一个名为 checkForGapInUnsafeQueue 的办法,属于 Driver 结构体。它的目的是查看一个名为 "unsafe queue" 的行列中是否存在数据缺口,并测验经过一个名为 altSync 的备用同步办法来检索缺失的负载。这儿的要害点是,该办法是为了保证数据的连续性,并在检测到数据缺失时测验从其他同步办法中检索缺失的数据。以下是函数的首要进程:

  1. 函数首要从 s.derivation 中获取 UnsafeL2HeadUnsafeL2SyncTarget 作为查看规模的起始和完毕点。

  2. 函数查看在 startend 之间是否存在缺失的数据块,这是经过比较 endstartNumber 值来完结的。

  3. 假如检测到数据缺口,函数会经过调用 s.altSync.RequestL2Range(ctx, start, end) 来恳求缺失的数据规模。假如 end 是一个空引证(即 eth.L2BlockRef{}),函数将恳求一个开放完毕规模的同步,从 start 开端。

  4. 在恳求数据时,函数会记载一个调试日志,说明它正在恳求哪个规模的数据。

  5. 函数最终回来一个过错值。假如没有过错,它会回来 nil

    // checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from an alt-sync method.
    // WARNING: This is only an outgoing signal, the blocks are not guaranteed to be retrieved.
    // Results are received through OnUnsafeL2Payload.
    func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error {
        start := s.derivation.UnsafeL2Head()
        end := s.derivation.UnsafeL2SyncTarget()
        // Check if we have missing blocks between the start and end. Request them if we do.
        if end == (eth.L2BlockRef{}) {
            s.log.Debug("requesting sync with open-end range", "start", start)
            return s.altSync.RequestL2Range(ctx, start, eth.L2BlockRef{})
        } else if end.Number > start.Number+1 {
            s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end.Number-start.Number)
            return s.altSync.RequestL2Range(ctx, start, end)
        }
        return nil
    }

RequestL2Range函数向requests通道里传递恳求区块的开端和完毕信号。

然后经过onRangeRequest办法来对恳求向peerRequests通道分发,peerRequests通道会被多个peer开启的loop所等待,即每一次分发都只有一个peer会去处理这个request。

    func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
            …
            for i := uint64(0); ; i++ {
            num := req.end.Number - 1 - i
            if num <= req.start {
                return
            }
            // check if we have something in quarantine already
            if h, ok := s.quarantineByNum[num]; ok {
                if s.trusted.Contains(h) { // if we trust it, try to promote it.
                    s.tryPromote(h)
                }
                // Don't fetch things that we have a candidate for already.
                // We'll evict it from quarantine by finding a conflict, or if we sync enough other blocks
                continue
            }
            if _, ok := s.inFlight[num]; ok {
                log.Debug("request still in-flight, not rescheduling sync request", "num", num)
                continue // request still in flight
            }
            pr := peerRequest{num: num, complete: new(atomic.Bool)}
            log.Debug("Scheduling P2P block request", "num", num)
            // schedule number
            select {
            case s.peerRequests <- pr:
                s.inFlight[num] = pr.complete
            case <-ctx.Done():
                log.Info("did not schedule full P2P sync range", "current", num, "err", ctx.Err())
                return
            default: // peers may all be busy processing requests already
                log.Info("no peers ready to handle block requests for more P2P requests for L2 block history", "current", num)
                return
            }
        }
    }

接下来咱们看看,当peer收到这个request的时分会怎么处理。

首要咱们要知道的是,peer和恳求节点之间的链接,或者音讯传递是经过libp2p的stream来传递的。stream的处理办法由接纳peer节点完结,stream的创立由发送节点来开启。

咱们能够在之前的init函数中看到这样的代码,这儿MakeStreamHandler回来了一个处理函数,SetStreamHandler将协议id和这个处理函数绑定,因而,每逢发送节点创立并运用这个stream的时分,都会触发回来的处理函数。

    n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics)
    // register the sync protocol with libp2p host
    payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest)
    n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)

接下来让咱们看看处理函数里边是怎么处理的
函数首要进行全局和个人的速率限制查看,以操控处理恳求的速度。然后,它读取并验证了恳求的区块号,保证它在合理的规模内。之后,函数从 L2 层获取恳求的区块负载,并将其写入到呼应流中。在写入呼应数据时,它设置了写入截止时刻,以避免在写入进程中被慢速的 peer 衔接堵塞。最终,函数回来恳求的区块号和或许的过错。

    func (srv *ReqRespServer) handleSyncRequest(ctx context.Context, stream network.Stream) (uint64, error) {
        peerId := stream.Conn().RemotePeer()
        // take a token from the global rate-limiter,
        // to make sure there's not too much concurrent server work between different peers.
        if err := srv.globalRequestsRL.Wait(ctx); err != nil {
            return 0, fmt.Errorf("timed out waiting for global sync rate limit: %w", err)
        }
        // find rate limiting data of peer, or add otherwise
        srv.peerStatsLock.Lock()
        ps, _ := srv.peerRateLimits.Get(peerId)
        if ps == nil {
            ps = &peerStat{
                Requests: rate.NewLimiter(peerServerBlocksRateLimit, peerServerBlocksBurst),
            }
            srv.peerRateLimits.Add(peerId, ps)
            ps.Requests.Reserve() // count the hit, but make it delay the next request rather than immediately waiting
        } else {
            // Only wait if it's an existing peer, otherwise the instant rate-limit Wait call always errors.
            // If the requester thinks we're taking too long, then it's their problem and they can disconnect.
            // We'll disconnect ourselves only when failing to read/write,
            // if the work is invalid (range validation), or when individual sub tasks timeout.
            if err := ps.Requests.Wait(ctx); err != nil {
                return 0, fmt.Errorf("timed out waiting for global sync rate limit: %w", err)
            }
        }
        srv.peerStatsLock.Unlock()
        // Set read deadline, if available
        _ = stream.SetReadDeadline(time.Now().Add(serverReadRequestTimeout))
        // Read the request
        var req uint64
        if err := binary.Read(stream, binary.LittleEndian, &req); err != nil {
            return 0, fmt.Errorf("failed to read requested block number: %w", err)
        }
        if err := stream.CloseRead(); err != nil {
            return req, fmt.Errorf("failed to close reading-side of a P2P sync request call: %w", err)
        }
        // Check the request is within the expected range of blocks
        if req < srv.cfg.Genesis.L2.Number {
            return req, fmt.Errorf("cannot serve request for L2 block %d before genesis %d: %w", req, srv.cfg.Genesis.L2.Number, invalidRequestErr)
        }
        max, err := srv.cfg.TargetBlockNumber(uint64(time.Now().Unix()))
        if err != nil {
            return req, fmt.Errorf("cannot determine max target block number to verify request: %w", invalidRequestErr)
        }
        if req > max {
            return req, fmt.Errorf("cannot serve request for L2 block %d after max expected block (%v): %w", req, max, invalidRequestErr)
        }
        payload, err := srv.l2.PayloadByNumber(ctx, req)
        if err != nil {
            if errors.Is(err, ethereum.NotFound) {
                return req, fmt.Errorf("peer requested unknown block by number: %w", err)
            } else {
                return req, fmt.Errorf("failed to retrieve payload to serve to peer: %w", err)
            }
        }
        // We set write deadline, if available, to safely write without blocking on a throttling peer connection
        _ = stream.SetWriteDeadline(time.Now().Add(serverWriteChunkTimeout))
        // 0 - resultCode: success = 0
        // 1:5 - version: 0
        var tmp [5]byte
        if _, err := stream.Write(tmp[:]); err != nil {
            return req, fmt.Errorf("failed to write response header data: %w", err)
        }
        w := snappy.NewBufferedWriter(stream)
        if _, err := payload.MarshalSSZ(w); err != nil {
            return req, fmt.Errorf("failed to write payload to sync response: %w", err)
        }
        if err := w.Close(); err != nil {
            return req, fmt.Errorf("failed to finishing writing payload to sync response: %w", err)
        }
        return req, nil
    }

至此,反向链同步恳求和处理的大致流程现已解说完毕

p2p节点中的积分声誉体系

为了避免某些节点进行恶意的恳求与呼应来破坏整个网络的安全性,optimism还运用了一套积分体系。

例如在op-node/p2p/app_scores.go 中存在一系列函数对peer的分数进行设置

    func (s *peerApplicationScorer) onValidResponse(id peer.ID) {
        _, err := s.scorebook.SetScore(id, store.IncrementValidResponses{Cap: s.params.ValidResponseCap})
        if err != nil {
            s.log.Error("Unable to update peer score", "peer", id, "err", err)
            return
        }
    }
    func (s *peerApplicationScorer) onResponseError(id peer.ID) {
        _, err := s.scorebook.SetScore(id, store.IncrementErrorResponses{Cap: s.params.ErrorResponseCap})
        if err != nil {
            s.log.Error("Unable to update peer score", "peer", id, "err", err)
            return
        }
    }
    func (s *peerApplicationScorer) onRejectedPayload(id peer.ID) {
        _, err := s.scorebook.SetScore(id, store.IncrementRejectedPayloads{Cap: s.params.RejectedPayloadCap})
        if err != nil {
            s.log.Error("Unable to update peer score", "peer", id, "err", err)
            return
        }
    }

然后在增加新的节点前会查看其积分状况

    func AddScoring(gater BlockingConnectionGater, scores Scores, minScore float64) *ScoringConnectionGater {
        return &ScoringConnectionGater{BlockingConnectionGater: gater, scores: scores, minScore: minScore}
    }
    func (g *ScoringConnectionGater) checkScore(p peer.ID) (allow bool) {
        score, err := g.scores.GetPeerScore(p)
        if err != nil {
            return false
        }
        return score >= g.minScore
    }
    func (g *ScoringConnectionGater) InterceptPeerDial(p peer.ID) (allow bool) {
        return g.BlockingConnectionGater.InterceptPeerDial(p) && g.checkScore(p)
    }
    func (g *ScoringConnectionGater) InterceptAddrDial(id peer.ID, ma multiaddr.Multiaddr) (allow bool) {
        return g.BlockingConnectionGater.InterceptAddrDial(id, ma) && g.checkScore(id)
    }
    func (g *ScoringConnectionGater) InterceptSecured(dir network.Direction, id peer.ID, mas network.ConnMultiaddrs) (allow bool) {
        return g.BlockingConnectionGater.InterceptSecured(dir, id, mas) && g.checkScore(id)
    }

总结

libp2p的高度可装备性使得整个项目的p2p具有高度的可自界说化和模块话,以上是optimsim对libp2p进行个性化完结的首要逻辑,还有其他细节能够在p2p目录下经过阅览源码的办法来具体学习。

此时快讯

【DYdX Chain作为独立的Cosmos Layer 1在主网正式启动】金色财经报道,dYdX V4及其独立的Cosmos区块链已经推出了alpha主网阶段,DYdX Chain作为独立的Cosmos Layer 1在主网正式启动,截至美东时间1:00PM,dYdX Chain验证者创建了dYdX Chain的创世区块。
在alpha阶段,主网的主要目的是对网络进行压力测试。它正在招募60多个验证者以确保网络安全。计划随后推出测试版,等待社区治理投票的批准,最终将在网络上进行交易。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注