Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class Directory:
"objectguid",
"objectsid",
"entitytypename",
"name",
}

def get_dn_prefix(self) -> DistinguishedNamePrefix:
Expand Down
14 changes: 6 additions & 8 deletions app/ldap_protocol/ldap_requests/modify.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,23 +208,21 @@ async def handle(
and await ctx.password_use_cases.is_password_change_restricted(
directory.id,
)
) or (
not can_modify and not (password_change_requested and self_modify)
):
yield ModifyResponse(
result_code=LDAPCodes.INSUFFICIENT_ACCESS_RIGHTS,
)
return

if directory.rdname in names:
yield ModifyResponse(result_code=LDAPCodes.NOT_ALLOWED_ON_RDN)
return

before_attrs = self.get_directory_attrs(directory)
entity_type = directory.entity_type
try:
if not can_modify and not (
password_change_requested and self_modify
):
yield ModifyResponse(
result_code=LDAPCodes.INSUFFICIENT_ACCESS_RIGHTS,
)
return

for change in self.changes:
if change.l_type in Directory.ro_fields:
continue
Expand Down
29 changes: 0 additions & 29 deletions tests/test_api/test_main/test_router/test_add.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,35 +43,6 @@ async def test_api_correct_add(http_client: AsyncClient) -> None:
assert data.get("errorMessage") == ""


@pytest.mark.asyncio
@pytest.mark.usefixtures("session")
async def test_api_add_incorrect_computer_name(
http_client: AsyncClient,
) -> None:
"""Test api incorrect (name) add."""
response = await http_client.post(
"/entry/add",
json={
"entry": "cn=test,dc=md,dc=test",
"password": None,
"attributes": [
{"type": "name", "vals": [" test;incorrect"]},
{"type": "cn", "vals": ["test"]},
{"type": "objectClass", "vals": ["computer", "top"]},
{
"type": "memberOf",
"vals": ["cn=domain admins,cn=Groups,dc=md,dc=test"],
},
],
},
)

data = response.json()

assert isinstance(data, dict)
assert data.get("resultCode") == LDAPCodes.UNDEFINED_ATTRIBUTE_TYPE


@pytest.mark.asyncio
@pytest.mark.usefixtures("session")
async def test_api_add_incorrect_user_samaccount_with_dot(
Expand Down
65 changes: 0 additions & 65 deletions tests/test_api/test_main/test_router/test_modify.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,71 +288,6 @@ async def test_api_incorrect_modify_computer_samaccountname_add(
assert data.get("resultCode") == LDAPCodes.OPERATIONS_ERROR


@pytest.mark.asyncio
@pytest.mark.usefixtures("setup_session")
@pytest.mark.usefixtures("session")
async def test_api_duplicate_with_spaces_modify(
http_client: AsyncClient,
) -> None:
"""Test API for modify duplicated object name."""
entry_dn = "cn=new_test,dc=md,dc=test"
response = await http_client.post(
"/entry/add",
json={
"entry": entry_dn,
"password": None,
"attributes": [
{
"type": "objectClass",
"vals": ["organization", "top"],
},
],
},
)
data = response.json()
assert data.get("resultCode") == LDAPCodes.SUCCESS

response = await http_client.patch(
"/entry/update",
json={
"object": entry_dn,
"changes": [
{
"operation": Operation.REPLACE,
"modification": {
"type": "cn",
"vals": [" test"],
},
},
],
},
)

data = response.json()

assert isinstance(data, dict)
assert data.get("resultCode") == LDAPCodes.SUCCESS

response = await http_client.post(
"entry/search",
json={
"base_object": entry_dn,
"scope": 0,
"deref_aliases": 0,
"size_limit": 1000,
"time_limit": 10,
"types_only": True,
"filter": "(objectClass=*)",
"attributes": [],
"page_number": 1,
},
)

data = response.json()
assert isinstance(data, dict)
assert data["search_result"][0]["object_name"] == entry_dn


@pytest.mark.asyncio
@pytest.mark.usefixtures("adding_test_user")
@pytest.mark.usefixtures("setup_session")
Expand Down
138 changes: 0 additions & 138 deletions tests/test_api/test_main/test_router/test_rename.py

This file was deleted.

98 changes: 98 additions & 0 deletions tests/test_ldap/test_util/test_modify.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,104 @@ async def try_modify() -> int:
assert "posixEmail" not in attributes


@pytest.mark.asyncio
@pytest.mark.usefixtures("setup_session")
async def test_ldap_modify_rdn(
settings: Settings,
creds: TestCreds,
) -> None:
"""Test modify RDN."""
dn = "cn=user0,cn=Users,dc=md,dc=test"

async def try_modify() -> int:
with tempfile.NamedTemporaryFile("w") as file:
file.write(
(f"dn: {dn}\nchangetype: modify\nreplace: cn\ncn: modme\n-\n"),
)
file.seek(0)
proc = await asyncio.create_subprocess_exec(
"ldapmodify",
"-vvv",
"-H",
f"ldap://{settings.HOST}:{settings.PORT}",
"-D",
"user_admin",
"-x",
"-w",
creds.pw,
"-f",
file.name,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

await proc.communicate()
return await proc.wait()

assert await try_modify() == LDAPCodes.NOT_ALLOWED_ON_RDN


@pytest.mark.asyncio
@pytest.mark.usefixtures("setup_session")
async def test_ldap_modify_name(
session: AsyncSession,
settings: Settings,
creds: TestCreds,
) -> None:
"""Test modify name."""
dn = "cn=user0,cn=Users,dc=md,dc=test"

query = (
select(Directory)
.options(
subqueryload(qa(Directory.attributes)),
joinedload(qa(Directory.user)),
)
.filter(get_filter_from_path(dn))
)

old_directory = await session.scalar(query)
assert old_directory

async def try_modify() -> int:
with tempfile.NamedTemporaryFile("w") as file:
file.write(
(
f"dn: {dn}\n"
"changetype: modify\n"
"replace: name\n"
"name: changename\n"
"-\n"
),
)
file.seek(0)
proc = await asyncio.create_subprocess_exec(
"ldapmodify",
"-vvv",
"-H",
f"ldap://{settings.HOST}:{settings.PORT}",
"-D",
"user_admin",
"-x",
"-w",
creds.pw,
"-f",
file.name,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

await proc.communicate()
return await proc.wait()

assert await try_modify() == LDAPCodes.SUCCESS

new_directory = await session.scalar(query)
assert new_directory

assert old_directory.name == new_directory.name


async def run_single_modify(
settings: Settings,
operation: Literal["add", "delete", "replace"],
Expand Down