【转帖】基于paramiko的二次封装

基于,paramiko,二次,封装 · 浏览次数 : 0

小编点评

## Summary of the provided code This code defines a Python class `SSHPool` that manages SSH connections to multiple remote hosts. It uses the `paramiko` library to establish and manage the connections. **Main features:** * **Pool management:** It maintains a pool of active SSH clients, allowing multiple connections to the same host. * **Connection establishment:** It uses `paramiko` to establish SSH connections to the remote hosts with specified usernames, passwords, and private keys. * **Communication:** Once a connection is established, it allows communication with the remote host through methods like `exec_command()`. * **Connection closing:** When a client closes, it is removed from the pool and the connection is closed. * **Dynamic sizing:** The pool size can be dynamically adjusted based on the number of available connections. **Additional details:** * The code assumes that the `paramiko` library is installed and available. * The `get()` method returns a new `SSHPool` instance if no connection is found in the pool. * The `remove()` method closes the specified client and removes it from the pool. * The `current_size` variable likely keeps track of the number of active connections. * This code demonstrates a basic implementation of managing SSH connections with `paramiko` in a Python class. **Usage:** The code can be used to manage SSH connections to multiple remote servers. It can be run directly or imported into a Python script. Once imported, it can be used to access the available servers and establish connections. **Overall, this code provides a convenient and efficient way to manage SSH connections in a Python application.**

正文

https://www.jianshu.com/p/944674f44b24

 

paramiko 是 Python 中的一个用来连接远程主机的第三方工具,通过使用 paramiko 可以用来代替以 ssh 连接到远程主机执行命令。

paramiko 模块提供了两个核心组件,分别是 SSHClient 和 SFTPClient。前者用于在远程主机上执行命令,是对于 ssh 会话的封装;后者用于对资源上传下载等操作,是对 sftp 的封装。

paramiko 模块提供了一些SSH核心组件

  • Channel: 建立 ssh 连接后,paramiko会调用底层的 channel 类来打开一个socket连接,后续发送的命令会通过 channel 进行发送,当接收到命令的返回结果(标准输出或标准错误)后,结果又将通过 channel 发送到连接的客户端。
  • Client: 建立的 ssh 会话的客户端。通过客户端可以去连接多个远程主机,并将执行的命令通过 client.exec_command() 方法发送到远程主机。
  • Message: 一个用来将传输过程中的字符、整形、布尔及长类型的字符组合进行编码组成的字节流信息。
  • PacketizerImplementation of the base SSH packet protocol.
  • Transport: 封装了一个加密的会话,调用该会话时会创建一个流式隧道(通常称为 channel)

封装的具体逻辑,为了保持功能的纯粹性,仅通过 paramiko 实现客户端的的建立及命令的发送。

以下为实现逻辑

#!/usr/bin/env python3
# coding=utf-8

import sys
import socket
import paramiko
from paramiko.ssh_exception import NoValidConnectionsError, AuthenticationException

class Connect(object):
    
    def __init__(self, username: str, 
                 password: str, 
                 port: int = 22,
                 hostname: str,
                 pkey: str = None,
                 look_for_keys: bool = True,
                 allow_agent: bool = True,
                 timeout: float = None
                ):
        self.hostname = hostname
        self.username = suername
        self.port = port
        self.password = password
        self.pkey = pkey
        self.allow_agent = allow_agent
        self.look_for_keys = look_for_keys
        self.timeout = timeout
        
    def _connect(self):
        self.client = SSHClient()
        self.client.load_system_host_keys()
        self.client.set_missing_host_key_policy(paramiko.AutoAddpolicy())
        try:
            self.client.connect(hostname=self.hostname,
                               port=self.port,
                               username=self.username,
                               password=self.password,
                               pkey=self.pkey,
                               allow_agent=self.allow_agent,
                               look_for_keys=self.look_for_keys
                               )
        except BadHostKeyException as badKey:
            msg = "Receive a bad key"
            sys.exit()
        except AuthenticationException as auth:
            msg = "Invalid username or password"
            sys.exit()
        except SSHExpection as ssh:
            msg = "Establish ssh session error"
            sys.exit()
        exception scoket.error as sock:
            msg = "Connecting has a socket error"
            sys.exit()

        
    def run(self, command: str):
        stdin, stdout, stderr = self.client.exec_command(command, timeout=self.timeout)
        stdin.close()
        stdout.flush()
        
        try:
            output = stdout.read()
            err_msg = stderr.read()
            
            output = output.decode("utf-8") if isinstance(output, bytes) else output
            err_msg = err_msg.decode("utf-8") if isinstance(err_msg, bytes) else err_msg
            return output, err_msg
            
        except socket.timeout:
            raise(f"Exec Command {command} timeout")
            
    def close(self):
        self.client.close()
               

将ssh会话保存到会话池中

#!/usr/bin/env python3
# encoding=utf-8

import os
import socket
import paramiko
from collections import deque
from paramiko.ssh_exception import SSHException
from paramiko.ssh_exception import NoValidConnectionsError, AuthenticationException
from eventlet import pools


class SSHPool(pools.Pool):
    
    """创建 SSH 对象池 """
    _pool = deque()
    
    def __init__(self,
                 ip: str,
                 username: str,
                 password: str = None,
                 port: int = 22,
                 privatekey: str = None,
                 timeout: float = None,
                 **kwargs
                ):
        self.ip = ip
        self.port = port
        self.password = password
        self.username = username
        self.privatekey = privatekey
        self.timeout = timeout
        super(SSHPool, self).__init__(**kwargs)
        
    def create(self):
        try:
            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            
            if self.password:
                client.connect(self.ip, 
                            port=self.port,
                            username=self.username,
                            password=self.password,
                            timeout=self.timeout
                           )
            elif self.privatekey:
                if isinstance(self.privatekey, paramiko.rsakey.RSAKey):
                    key = self.privatekey
                else:
                    keyfile = os.path.expanduser(self.privatekey)
                    key = paramiko.RSAKey.from_private_key_file(keyfile)
                client.connect(self.ip,
                            port=self.port,
                            username=self.username,
                            look_for_keys=True,
                            pkey=key,
                            timeout=self.timeout
                           )
            else:
                raise SSHException("Invalid username or password")
                
            
            # Paramiko by default sets the socket timeout to 0.1 seconds,
            # ignoring what we set through the sshclient. This doesn't help for
            # keeping long lived connections. Hence we have to bypass it, by
            # overriding it after the transport is initialized. We are setting
            # the sockettimeout to None and setting a keepalive packet so that,
            # the server will keep the connection open. All that does is send
            # a keepalive packet every ssh_conn_timeout seconds.
            
            if self.timeout:
                transport = client.get_transport()
                transport.sock.settimeout(None)
                transport.set_keepalive(self.timeout)
            return client
        
        except socket.timeout:
            raise SSHException("Connect timeout")
        except NoValidConnectionsError as novalid:
            raise SSHException("Connect valid failed")
        except AuthenticationException as auth:
            raise SSHException("Invalid username or password")
        except Exception as e:
            raise SSHException("An exception happened")

    def get(self):
        """从会话池中获取 SSH 会话
        1. 返回一个存活的会话
        2. 不存在的会话或已经失效的会话, 会新建一个会话并返回该会话
        """
        conn = super(SSHPool, self).get()
        print("get: [%s]" % conn)
        if conn:
            if conn.get_transport().is_active():
                return conn
            else:
                conn.close()
        return self.create()
    
    #def put(self, ssh):
    #    """将 SSH 会话添加到会话池中"""
    #    conn = super(SSHPool, self).get()
    #    if ssh not in self._pool:
    #        self._pool.append(ssh)
            
    def remove(self, ssh):
        """关闭 SSH 会话,并将其从会话池中移除"""
        ssh.close()
        ssh = None
        print("remove: [%s]" % ','.join(self.free_items))
        if ssh in self.free_items:
            self.free_items.pop(ssh)
        if self.current_size > 0:
            self.current -= 1


if __name__ == "__main__":
    client = SSHPool(username='root', password='****!', ip='****')
    with client.get() as conn:
        _, stdout, stderr = conn.exec_command("ls -l")
        print(stdout.read().decode())     

# 参考自:  https://opendev.org/openstack/cinder/commit/75ef446fef63320e9c1ed4a04e59ffbbb62b5cef?style=unified 

与【转帖】基于paramiko的二次封装相似的内容:

【转帖】基于paramiko的二次封装

https://www.jianshu.com/p/944674f44b24 paramiko 是 Python 中的一个用来连接远程主机的第三方工具,通过使用 paramiko 可以用来代替以 ssh 连接到远程主机执行命令。 paramiko 模块提供了两个核心组件,分别是 SSHClient 

[转帖]基于eBPF的微服务网络安全

https://www.cnblogs.com/charlieroro/p/12724848.html 翻译自:Network security for microservices with eBPF 一些开源的kubernetes工具已经开始使用eBPF,这些工具大多数与网络,监控和安全相关。 本

[转帖]基于 Nginx 实现 10万+ 并发,Linux 内核优化

来源:http://t.cn/EyQTMwG 由于默认的Linux内核参数考虑的是最通用场景,这明显不符合用于支持高并发访问的Web服务器的定义,所以需要修改Linux内核参数,是的Nginx可以拥有更高的性能; 在优化内核时,可以做的事情很多,不过,我们通常会根据业务特点来进行调整,当Nginx作

[转帖]基于腾讯云微服务引擎(TSE) ,轻松实现云上全链路灰度发布

https://my.oschina.net/u/4587289/blog/8570699 1. 概述 软件开发过程中,应用发布非常频繁,通常情况下,开发或运维人员会将系统里所有服务同时上线,使得所有用户都使用上新版本。这样的操作时常会导致发布失败,或因发布前修改代码,线上出现 Bug。 假设一个在

【转帖】基于官方rpm包方式安装Oracle19c

https://blog.whsir.com/post-5489.html 本文基于Centos7.x环境,通过官方提供的rpm包来安装19c 1、下载Oracle19c安装包 https://www.oracle.com/database/technologies/oracle-database-

[转帖]基于 Skywalking 部署应用性能监控

https://www.jianshu.com/p/50627b9ab0be 今天我们就着重讲一讲如何基于 Skywalking 来快速搭建一套应用性能监控平台 walkingfunny.com.png 一、Skywaling 介绍 Skywalking是由国内开源爱好者吴晟开源并提交到Apache

[转帖]基于Fuse的用户态文件系统性能优化几点建议

https://zhuanlan.zhihu.com/p/68085075 目前很多文件系统基于Fuse( http://fuse.sourceforge.net/ )开发,在较为深入钻研Fuse实现后,总结出开发此类文件系统时可考虑的优化方案,拿出来与大家讨论讨论,如有不准确的地方,还望大家不吝赐

[转帖][译] 基于 Envoy、Cilium 和 eBPF 实现透明的混沌测试(KubeCon, 2019)

http://arthurchiao.art/blog/transparent-chaos-testing-with-envoy-cilium-ebpf-zh/ 译者序 本文内容来自 2019 年的一个技术分享 Transparent Chaos Testing with Envoy, Cilium

[转帖][译] Cilium:基于 BPF+EDT+FQ+BBR 实现更好的带宽管理(KubeCon, 2022)

http://arthurchiao.art/blog/better-bandwidth-management-with-ebpf-zh/ Published at 2022-10-30 | Last Update 2022-10-30 译者序 本文翻译自 KubeCon+CloudNativeCo

[转帖][译] 如何基于 Cilium 和 eBPF 打造可感知微服务的 Linux(InfoQ, 2019)

http://arthurchiao.art/blog/how-to-make-linux-microservice-aware-with-cilium-zh/ 译者序 本文内容来自 2019 年的一个技术分享 How to Make Linux Microservice-Aware with Ci