Paramiko는 Python 기반의 SSHv2 프로토콜 구현체로, 원격 서버에 대한 안전한 연결과 명령 실행, 파일 전송 기능을 제공합니다. 암호화된 인증 방식을 지원하며, 크로스 플랫폼 환경에서 동작하여 다양한 운영체제(Linux, macOS, Windows 등)에서 원격 관리 작업을 자동화하는 데 활용됩니다.
환경 구성
Paramiko는 내부적으로 암호화 라이브러리에 의존성을 가지므로, 사전에 개발 환경을 준비해야 합니다.
# 시스템 수준 의존성 설치
yum -y install python-devel
# Python 패키지 설치
pip3 install pycryptodome
pip3 install paramiko
SSH 세션 기반 명령 실행
원격 호스트에 연결하여 셸 명령을 실행하고 결과를 수신하는 기본 패턴입니다. 호스트 키 검증 정책을 설정하여 초기 연결 시 대화형 프롬프트를 방지합니다.
import paramiko
def execute_remote_command(target_ip, user, secret, shell_cmd):
session = paramiko.SSHClient()
session.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
session.connect(
hostname=target_ip,
port=22,
username=user,
password=secret,
timeout=10
)
channel_in, channel_out, channel_err = session.exec_command(shell_cmd)
output = channel_out.read().decode('utf-8')
error = channel_err.read().decode('utf-8')
return {'stdout': output, 'stderr': error}
finally:
session.close()
# 실행 예시
result = execute_remote_command('192.168.1.100', 'admin', 'SecurePass123!', 'uptime')
print(result['stdout'])
SFTP 파일 업로드 구현
Transport 계층을 직접 활용하여 파일을 원격 경로로 전송합니다. 대용량 파일 전송에 적합한 방식으로, 진행 상황 콜백도 지원할 수 있습니다.
import paramiko
def upload_file_to_server(host, port, credentials, local_source, remote_dest):
transport_layer = paramiko.Transport((host, port))
try:
transport_layer.connect(
username=credentials['user'],
password=credentials['pass']
)
file_client = paramiko.SFTPClient.from_transport(transport_layer)
file_client.put(local_source, remote_dest)
print(f"업로드 완료: {local_source} → {remote_dest}")
finally:
transport_layer.close()
# 사용 예시
auth_info = {'user': 'deploy', 'pass': 'AppSecret99'}
upload_file_to_server(
'10.0.0.50', 22, auth_info,
'./build/app.tar.gz',
'/opt/releases/app.tar.gz'
)
원격 파일 스트리밍 읽기
파일을 로컬로 다운로드하지 않고 원격에서 직접 읽어 처리하는 방식입니다. 로그 파일 분석이나 대용량 파일의 부분 처리에 효율적입니다.
import paramiko
def stream_remote_file(server_config, file_path):
conn = paramiko.SSHClient()
conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
conn.connect(
server_config['host'],
server_config['port'],
server_config['username'],
server_config['password'],
compress=True
)
sftp_session = conn.open_sftp()
remote_handle = sftp_session.open(file_path)
try:
for line in remote_handle:
yield line.strip()
finally:
remote_handle.close()
finally:
conn.close()
# 적용 예시 - 로그 실시간 분석
config = {
'host': 'db-server.internal',
'port': 22,
'username': 'logger',
'password': 'LogAccess2024'
}
for log_line in stream_remote_file(config, '/var/log/application/error.log'):
if 'CRITICAL' in log_line:
print(f"심각 오류 발견: {log_line}")
SFTP 파일 다운로드
SSHClient의 Transport를 재활용하여 파일을 가져오는 방식입니다. 연결 리소스를 효율적으로 관리합니다.
import paramiko
def fetch_remote_file(remote_addr, auth, remote_path, local_path):
ssh_conn = paramiko.SSHClient()
ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh_conn.connect(remote_addr, username=auth['id'], password=auth['key'])
transport = ssh_conn.get_transport()
sftp = paramiko.SFTPClient.from_transport(transport)
sftp.get(remote_path, local_path)
print(f"다운로드 성공: {remote_path} → {local_path}")
finally:
ssh_conn.close()
# 실행
fetch_remote_file(
'backup.server.com',
{'id': 'backup_user', 'key': 'Bkp#Secure7'},
'/backups/database/latest.sql',
'./restores/db_backup.sql'
)
다중 호스트 일괄 관리
인벤토리 파일을 기반으로 여러 서버에 명령을 분산 실행하는 패턴입니다. 예외 유형별로 세분화된 오류 처리를 포함합니다.
from paramiko.ssh_exception import (
NoValidConnectionsError,
AuthenticationException,
SSHException
)
def batch_execute(command, inventory_file):
import paramiko
def single_host_exec(cmd, host, port, user, pwd):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(host, port=int(port), username=user, password=pwd)
stdin, stdout, stderr = ssh.exec_command(cmd)
return {
'host': host,
'output': stdout.read().decode(),
'error': stderr.read().decode(),
'status': 'success'
}
except NoValidConnectionsError:
return {'host': host, 'status': 'unreachable'}
except AuthenticationException:
return {'host': host, 'status': 'auth_failed'}
except SSHException as e:
return {'host': host, 'status': 'ssh_error', 'detail': str(e)}
finally:
ssh.close()
results = []
with open(inventory_file, 'r') as inv:
for entry in inv:
if ':' not in entry:
continue
host, port, user, pwd = entry.strip().split(':')
print(f"처리 중: {host}")
results.append(single_host_exec(command, host, port, user, pwd))
return results
# 실행 예시 - 서버 목록에서 OS 버전 확인
outcomes = batch_execute('cat /etc/os-release', 'production_hosts.txt')
for item in outcomes:
print(f"{item['host']}: {item.get('status', 'unknown')}")
키 기반 인증 연결
비밀번호 대신 RSA 개인 키를 사용한 무인 연결 방식입니다. 자동화 환경에서 권장되는 보안 접근법입니다.
import paramiko
from paramiko.ssh_exception import AuthenticationException
def key_based_connection(target, key_path, cmd, user='deploy'):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
private_key = paramiko.RSAKey.from_private_key_file(key_path)
client.connect(
hostname=target,
username=user,
pkey=private_key,
look_for_keys=False
)
stdin, stdout, stderr = client.exec_command(cmd)
return stdout.read().decode().strip()
except AuthenticationException:
return "키 인증 실패"
finally:
client.close()
# 서브넷 스캔 예시
for idx in range(1, 255):
ip = f"10.0.1.{idx}"
response = key_based_connection(ip, '~/.ssh/id_rsa_deploy', 'hostname')
if '실패' not in response:
print(f"{ip}: {response}")
키 인증 기반 파일 전송
개인 키를 활용한 양방향 파일 전송 구현입니다.
import paramiko
def secure_file_transfer(host, key_file, operations):
signer = paramiko.RSAKey.from_private_key_file(key_file)
tunnel = paramiko.Transport((host, 22))
try:
tunnel.connect(username='automation', pkey=signer)
sftp = paramiko.SFTPClient.from_transport(tunnel)
for op in operations:
if op['type'] == 'upload':
sftp.put(op['source'], op['destination'])
print(f"업로드: {op['source']}")
elif op['type'] == 'download':
sftp.get(op['source'], op['destination'])
print(f"다운로드: {op['source']}")
finally:
tunnel.close()
# 작업 정의
tasks = [
{'type': 'upload', 'source': './config/nginx.conf', 'destination': '/etc/nginx/nginx.conf'},
{'type': 'download', 'source': '/var/log/nginx/access.log', 'destination': './logs/access.log'}
]
secure_file_transfer('web-01.prod.local', '/keys/auto_id_rsa', tasks)
객체지향 SSH 관리 클래스
명령 실행, 파일 업로드, 다운로드를 통합 관리하는 재사용 가능한 클래스 설계입니다. 리플렉션을 활용한 동적 메서드 호출 패턴을 적용합니다.
import os
import paramiko
from paramiko.ssh_exception import (
AuthenticationException,
NoValidConnectionsError,
SSHException
)
class RemoteNodeManager:
def __init__(self, address, port, identity, secret, instruction):
self.address = address
self.port = int(port)
self.identity = identity
self.secret = secret
self.instruction = instruction
def dispatch(self):
verb = self.instruction.split()[0].lower()
handler_name = f"handle_{verb}"
if hasattr(self, handler_name):
getattr(self, handler_name)()
else:
print(f"지원하지 않는 작업 유형: {verb}")
def handle_exec(self):
conn = paramiko.SSHClient()
conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
conn.connect(
self.address,
self.port,
self.identity,
self.secret
)
actual_cmd = ' '.join(self.instruction.split()[1:])
_, out, err = conn.exec_command(actual_cmd)
output = out.read().decode()
error = err.read().decode()
if output:
print(f"[{self.address}] 출력:\n{output}")
if error:
print(f"[{self.address}] 오류:\n{error}")
except (AuthenticationException, NoValidConnectionsError) as e:
print(f"연결 실패 ({self.address}): {type(e).__name__}")
finally:
conn.close()
def handle_send(self):
self._transfer_file('upload')
def handle_receive(self):
self._transfer_file('download')
def _transfer_file(self, direction):
try:
pipe = paramiko.Transport((self.address, self.port))
pipe.connect(username=self.identity, password=self.secret)
agent = paramiko.SFTPClient.from_transport(pipe)
parts = self.instruction.split()[1:]
if len(parts) != 2:
print("파일 경로 2개 필요")
return
if direction == 'upload':
agent.put(parts[0], parts[1])
print(f"업로드 완료: {parts[0]} → {parts[1]}")
else:
agent.get(parts[0], parts[1])
print(f"다운로드 완료: {parts[0]} → {parts[1]}")
except SSHException as e:
print(f"전송 오류: {e}")
finally:
pipe.close()
def load_host_groups(config_dir='conf'):
return [f.replace('.conf', '')
for f in os.listdir(config_dir)
if f.endswith('.conf')]
def interactive_batch_control():
groups = load_host_groups()
print("=" * 50)
print("사용 가능한 호스트 그룹:")
for g in groups:
print(f" - {g}")
selected = input("\n그룹 선택: ").strip()
config_path = f"conf/{selected}.conf"
if not os.path.exists(config_path):
print("존재하지 않는 그룹")
return
hosts = []
print(f"\n[{selected}] 그룹의 호스트:")
with open(config_path) as cfg:
for line in cfg:
if ':' in line:
info = line.strip().split(':')
print(f" {info[0]}")
hosts.append(line.strip())
print("\n명령 형식:")
print(" exec ")
print(" send ")
print(" receive ")
print(" exit/quit")
while True:
user_input = input("\n>> ").strip()
if user_input in ('exit', 'quit'):
break
if not user_input:
continue
for host_entry in hosts:
addr, prt, usr, pwd = host_entry.split(':')
manager = RemoteNodeManager(addr, prt, usr, pwd, user_input)
manager.dispatch()
if __name__ == '__main__':
interactive_batch_control()